启动模式
local方式:
该方式是在Java虚拟机上运行Flink程序,或者是在正在运行程序的Java虚拟机上,像我们在IDE上直接运行就是采用的local方式,这种方式会获取到一个LocalExecutionEnvironment(或者CollectionEnvironment)类的环境上下文对象,默认并行度是当前可用处理器的Java虚拟机的数量
standalone方式:
配置一个或多个JobManager(HA模式),和一台或多台TaskManager,通过flink中bin下面的start-cluster.sh启动,关于这种方式的启动,后面会进行进一步详解。
Yarn方式:
Flink在Yarn上运行,通过Yarn来调度。
kubernetes方式:
容器化部署时目前业界很流行的一项技术,基于Docker镜像运行能够让用户更加方便地对应用进行管理和运维。容器管理工具中最为流行的就是Kubernetes(k8s),而Flink也在最近的版本中支持了k8s部署模式
taskmanager 和slots 点击链接查看(了解)
配置
conf/flink-conf.yaml
配置文件中taskmanager.numberOfTaskSlots
slot数量
parallelism.default
默认并行度
WEB UI提交任务
Flink Savepoint简单介绍(点击连接了解)
启动Flink后,可以在Web UI的
Submit New Job
提交jar包,然后指定Job参数。- Entry Class
程序的入口,指定入口类(类的全限制名)
- Program Arguments
程序启动参数,例如
--host localhost --port 7777
- Parallelism
- 代码中算子
setParallelism()
ExecutionEnvironment env.setMaxParallelism()
- 设置的Job并行度
- 集群conf配置文件中的
parallelism.default
设置Job并行度。
Ps:并行度优先级(从上到下优先级递减)
ps:socket等特殊的IO操作,本身不能并行处理,并行度只能是1
- Savepoint Path
savepoint是通过checkpoint机制为streaming job创建的一致性快照,比如数据源offset,状态等。
(savepoint可以理解为手动备份,而checkpoint为自动备份)
ps:提交job要注意分配的slot总数是否足够使用,如果slot总数不够,那么job执行失败。(资源不够调度)
这里提交前面demo项目的StreamWordCount,在本地socket即
nc -lk 7777
中输入字符串,查看结果输入:
输出:
可以看出来输出的顺序并不是和输入的字符串严格相同的,因为是多个线程并行处理的。
命令行提交job
- 查看已提交的所有job
- 提交job
c
指定入口类p
指定job的并行度
bin/flink run -c <入口类> -p <并行度> <jar包路径> <启动参数>
- 取消job
bin/flink cancel <Job的ID>
注:Total Task Slots只要不小于Job中Parallelism最大值即可。