目录
一、Flink SQL 开发步骤概述
二、简单 Demo 演示剖析
FlinkSQL-API :
三、Flink SQL - Kafka To Kafka 实战
四、Flink SQL - Kafka To MySQL 深度解析
五、Flink SQL - Read MySQL 深度解析
六、总结
在大数据处理领域,Apache Flink 凭借其强大的流批一体处理能力备受青睐,而 Flink SQL 更是为开发者提供了高效便捷的数据处理方式,能以类 SQL 的语法轻松应对复杂的数据场景。今天,就让我们深入探究 Flink SQL 的开发步骤、核心概念以及常见的使用案例,包括与 Kafka 和 MySQL 的联动操作。
添加依赖:
要开启 Flink SQL 之旅,第一步便是添加合适的依赖。在 Flink 生态中,涉及到 DataStream 与表相互转换,有两种常见方案(此处暂不详述具体方案内容,后续结合代码深入讲解),不同方案对应不同的依赖需求与处理逻辑,这是搭建 Flink SQL 项目架构基础。
DataStream 与表的转换
DataStream -> 表:
存在多种实现途径,两种方案各有优劣与适用场景,关乎后续数据处理能否高效进行。比如在面对不同的数据结构、实时性要求时,灵活选择合适方案能减少不必要的性能损耗与代码复杂度。
第一种方案:
第二种方案:
表 -> DataStream:
此转换也需依据业务需求,考虑数据更新特性来抉择合适的转换方法,像 与 方法就有着截然不同的功能侧重。
查询操作:
支持 Table 风格(DSL 风格)和 SQL 风格查询。SQL 风格对于熟悉传统数据库 SQL 语法开发者来说上手极快,能凭借过往经验快速编写查询逻辑;而 Table 风格(DSL 风格)在与 Flink 生态深度融合上更具优势,可利用其提供的函数、算子精细控制数据处理流程,实现复杂业务逻辑。
Table风格/DSL风格
SQL风格
在实际演示中,我们会发现 DataStream 里若为 Row 类型,打印格式由 Row 类 toString 方法决定,并且输出标识有特殊含义,像 “+I” 表示新增数据。最初编写代码时,使用 方法可能遇到分组操作受限问题,因为它只适用于单纯生成新计算结果且不修改老结果场景。当业务需求涉及对已有计算结果更新时,就得切换到 方法,其返回 类型,其中布尔值为 true 代表新增、更新,false 代表遗弃,对应输出标识分别为 “+U”(更新后)、“-U”(更新前),这样精细的状态标识助力开发者清晰把控数据动态变化。
因为DataStream中是Row 类型,所以打印的格式是Row 这个类中的toString方法决定的。这个地方的 +I 的意思是新增的数据。
因为我们经常编写flinksql,所以创建一个模板:
根据这个可以做一个单词统计的案例:
第一版代码:
报错:
解决方案:
//DataStream<Row> resultStream = tEnv.toAppendStream(tableResult, Row.class);
修改为:
DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(tableResult, Row.class);
第二版代码:
toAppendStream: 适用于生成新的计算结果,并不会对老的计算结果进行修改。使用这个SQL语句中是不能出现分组的。
toRetractStream : 适用于对已经计算的结果进行更新,如果是true 代表新增,更新 false 代表遗弃
+ I 表示新增
-U 更新前
+U 更新后
需求: 使用SQL和Table(DSL)两种方式对DataStream中的单词进行统计
以上代码,作为了解,DataStream需要变为Table,计算结果还要变为DataStream,过于麻烦了。
需求: 从Kafka的topic1中消费数据并过滤出状态为success的数据再写入到Kafka的topic2
代码实现
//nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/
注意:如果最后一句写了:
其实会进行报错,这句话其实已经没有用处了,但是这个错误不影响最终的结果。
将最后一句env.execute(); 删除即可。
假如报以下错误:说明没有导入json的包:
需要导入这个包:
这个jar包可以在此处找到:
测试一下:
先启动生产者,
再启动消费者:
从上面可以看到,生产者向topic1中发送数据,topic2中只有success数据。
需求: 从Kafka的topic1中消费数据并过滤出状态为success的数据再写入到MySQL
在mysql中需要有一个数据库test1 ,需要由一个表 t_success
代码实现
代码实现