Lazy loaded image
7.流处理API(数据源+算子)
00 分钟
2024-10-9

Environment

1.getExecutionEnvironment

创建一个执行环境,表示当前执行程序的上下文。如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。
 
如果没有设置并行度,会以flink-conf.yaml中的配置为准,默认是1。
notion image

2.createLocalEnvironment

返回本地执行环境,需要在调用时指定默认的并行度。

3.createRemoteEnvironment

返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。

Source

1.从集合读取数据

输出

notion image

从文件读取数据

输出

notion image

从Kafka读取数据

安装kafka

点击链接根据博客安装,找其他的也可以 能安装上就行
 

添加依赖

启动zookeeper

启动kafka服务

启动kafka生产者

java程序

运行java代码,在Kafka生产者console中输入

notion image

控制台输出

notion image

自定义数据源

java

结果

notion image

Transform(转换算子)

基本转换算子

map、flatMap、filter通常被统一称为基本转换算子(简单转换算子)。

输出

聚合操作算子

  • DataStream里没有reduce和sum这类聚合操作的方法,因为Flink设计中,所有数据必须先分组才能做聚合操作
  • 先keyBy得到KeyedStream,然后调用其reduce、sum等聚合操作方法。(先分组后聚合)
常见的聚合操作算子主要有:
  • keyBy
  • 滚动聚合算子Rolling Aggregation
  • reduce

keyBy

notion image
DataStream -> KeyedStream:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的。
1、KeyBy会重新分区; 2、不同的key有可能分到一起,因为是通过hash原理实现的;

Rolling Aggregation

这些算子可以针对KeyedStream的每一个支流做聚合。
  • sum()
  • min()
  • max()
  • minBy()
  • maxBy()

输出

result> SensorReading{id='sensor_1', timestamp=1547718199, temperature=35.8} result> SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4} result> SensorReading{id='sensor_7', timestamp=1547718202, temperature=6.7} result> SensorReading{id='sensor_10', timestamp=1547718205, temperature=38.1}
result> SensorReading{id='sensor_1', timestamp=1547718207, temperature=36.3} result> SensorReading{id='sensor_1', timestamp=1547718207, temperature=36.3}
result> SensorReading{id='sensor_1', timestamp=1547718212, temperature=37.1}

reduce

Reduce适用于更加一般化的聚合操作场景。java中需要实现ReduceFunction函数式接口。
在前面Rolling Aggregation的前提下,对需求进行修改。获取同组历史温度最高的传感器信息,同时要求实时更新其时间戳信息。

输出

支持的数据类型

Flink流应用程序处理的是以数据对象表示的事件流。所以在Flink内部,我们需要能够处理这些对象。它们需要被序列化和反序列化,以便通过网络传送它们;或者从状态后端、检查点和保存点读取它们。为了有效地做到这一点,Flink需要明确知道应用程序所处理的数据类型。Flink使用类型信息的概念来表示数据类型,并为每个数据类型生成特定的序列化器、反序列化器和比较器。
Flink还具有一个类型提取系统,该系统分析函数的输入和返回类型,以自动获取类型信息,从而获得序列化器和反序列化器。但是,在某些情况下,例如lambda函数或泛型类型,需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。
Flink支持Java和Scala中所有常见数据类型。使用最广泛的类型有以下几种。

基础数据类型

Flink支持所有的Java和Scala基础数据类型,Int, Double, Long, String, …

Java和Scala元组(Tuples)

java不像Scala天生支持元组Tuple类型,java的元组类型由Flink的包提供,默认提供Tuple0~Tuple25

Scala样例类(case classes)

case class Person(name:String,age:Int) val numbers: DataStream[(String,Integer)] = env.fromElements( Person("张三",12), Person("李四",23) )

Java简单对象(POJO)

java的POJO这里要求必须提供无参构造函数
  • 成员变量要求都是public(或者private但是提供get、set方法)

其他(Arrays, Lists, Maps, Enums,等等)

Flink对Java和Scala中的一些特殊目的的类型也都是支持的,比如Java的ArrayList,HashMap,Enum等等。
上一篇
flowable获取下一步会创建的用户任务
下一篇
Kafka实现oracle的CDC数据实时变更

评论
Loading...