Environment
1.getExecutionEnvironment
创建一个执行环境,表示当前执行程序的上下文。如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。
如果没有设置并行度,会以flink-conf.yaml中的配置为准,默认是1。
2.createLocalEnvironment
返回本地执行环境,需要在调用时指定默认的并行度。
3.createRemoteEnvironment
返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。
Source
1.从集合读取数据
输出
从文件读取数据
输出
从Kafka读取数据
安装kafka
点击链接根据博客安装,找其他的也可以 能安装上就行
添加依赖
启动zookeeper
启动kafka服务
启动kafka生产者
java程序
运行java代码,在Kafka生产者console中输入
控制台输出
自定义数据源
java
结果
Transform(转换算子)
基本转换算子
map、flatMap、filter通常被统一称为基本转换算子(简单转换算子)。
输出
聚合操作算子
- DataStream里没有reduce和sum这类聚合操作的方法,因为Flink设计中,所有数据必须先分组才能做聚合操作。
- 先keyBy得到KeyedStream,然后调用其reduce、sum等聚合操作方法。(先分组后聚合)
常见的聚合操作算子主要有:
- keyBy
- 滚动聚合算子Rolling Aggregation
- reduce
keyBy
DataStream -> KeyedStream:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的。
1、KeyBy会重新分区; 2、不同的key有可能分到一起,因为是通过hash原理实现的;
Rolling Aggregation
这些算子可以针对KeyedStream的每一个支流做聚合。
- sum()
- min()
- max()
- minBy()
- maxBy()
输出
result> SensorReading{id='sensor_1', timestamp=1547718199, temperature=35.8}
result> SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
result> SensorReading{id='sensor_7', timestamp=1547718202, temperature=6.7}
result> SensorReading{id='sensor_10', timestamp=1547718205, temperature=38.1}
result> SensorReading{id='sensor_1', timestamp=1547718207, temperature=36.3}
result> SensorReading{id='sensor_1', timestamp=1547718207, temperature=36.3}
result> SensorReading{id='sensor_1', timestamp=1547718212, temperature=37.1}
reduce
Reduce适用于更加一般化的聚合操作场景。java中需要实现ReduceFunction函数式接口。
在前面Rolling Aggregation的前提下,对需求进行修改。获取同组历史温度最高的传感器信息,同时要求实时更新其时间戳信息。
输出
支持的数据类型
Flink流应用程序处理的是以数据对象表示的事件流。所以在Flink内部,我们需要能够处理这些对象。它们需要被序列化和反序列化,以便通过网络传送它们;或者从状态后端、检查点和保存点读取它们。为了有效地做到这一点,Flink需要明确知道应用程序所处理的数据类型。Flink使用类型信息的概念来表示数据类型,并为每个数据类型生成特定的序列化器、反序列化器和比较器。
Flink还具有一个类型提取系统,该系统分析函数的输入和返回类型,以自动获取类型信息,从而获得序列化器和反序列化器。但是,在某些情况下,例如lambda函数或泛型类型,需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。
Flink支持Java和Scala中所有常见数据类型。使用最广泛的类型有以下几种。
基础数据类型
Flink支持所有的Java和Scala基础数据类型,Int, Double, Long, String, …
Java和Scala元组(Tuples)
java不像Scala天生支持元组Tuple类型,java的元组类型由Flink的包提供,默认提供Tuple0~Tuple25
Scala样例类(case classes)
case class Person(name:String,age:Int)
val numbers: DataStream[(String,Integer)] = env.fromElements(
Person("张三",12),
Person("李四",23)
)
Java简单对象(POJO)
java的POJO这里要求必须提供无参构造函数
- 成员变量要求都是public(或者private但是提供get、set方法)
其他(Arrays, Lists, Maps, Enums,等等)
Flink对Java和Scala中的一些特殊目的的类型也都是支持的,比如Java的ArrayList,HashMap,Enum等等。