Flink SQL 实战:从基础开发到 Kafka 与 MySQL 交互

   日期:2024-12-11     作者:g22go       评论:0    移动:http://mip.riyuangf.com/mobile/news/6984.html
核心提示:目录 一、Flink SQL 开发步骤概述 二、简单 Demo 演示剖析 FlinkSQL-API : 三、Flink SQL - Kafka To Kafka 实战 四、Fl

目录

一、Flink SQL 开发步骤概述

二、简单 Demo 演示剖析

FlinkSQL-API

三、Flink SQL - Kafka To Kafka 实战

四、Flink SQL - Kafka To MySQL 深度解析

 五、Flink SQL - Read MySQL 深度解析

六、总结


        在大数据处理领域,Apache Flink 凭借其强大的流批一体处理能力备受青睐,而 Flink SQL 更是为开发者提供了高效便捷的数据处理方式,能以类 SQL 的语法轻松应对复杂的数据场景。今天,就让我们深入探究 Flink SQL 的开发步骤、核心概念以及常见的使用案例,包括与 Kafka 和 MySQL 的联动操作。

添加依赖

        要开启 Flink SQL 之旅,第一步便是添加合适的依赖。在 Flink 生态中,涉及到 DataStream 与表相互转换,有两种常见方案(此处暂不详述具体方案内容,后续结合代码深入讲解,不同方案对应不同的依赖需求与处理逻辑,这是搭建 Flink SQL 项目架构基础。

 

DataStream 与表的转换

DataStream -> 表

        存在多种实现途径,两种方案各有优劣与适用场景,关乎后续数据处理能否高效进行。比如在面对不同的数据结构、实时性要求时,灵活选择合适方案能减少不必要的性能损耗与代码复杂度。

第一种方案

第二种方案

表 -> DataStream

        此转换也需依据业务需求,考虑数据更新特性来抉择合适的转换方法,像  与  方法就有着截然不同的功能侧重。

查询操作

        支持 Table 风格(DSL 风格)和 SQL 风格查询。SQL 风格对于熟悉传统数据库 SQL 语法开发者来说上手极快,能凭借过往经验快速编写查询逻辑;而 Table 风格(DSL 风格)在与 Flink 生态深度融合上更具优势,可利用其提供的函数、算子精细控制数据处理流程,实现复杂业务逻辑。

Table风格/DSL风格

SQL风格

        在实际演示中,我们会发现 DataStream 里若为 Row 类型,打印格式由 Row 类 toString 方法决定,并且输出标识有特殊含义,像 “+I” 表示新增数据。最初编写代码时,使用  方法可能遇到分组操作受限问题,因为它只适用于单纯生成新计算结果且不修改老结果场景。当业务需求涉及对已有计算结果更新时,就得切换到  方法,其返回  类型,其中布尔值为 true 代表新增、更新,false 代表遗弃,对应输出标识分别为 “+U”(更新后)、“-U”(更新前,这样精细的状态标识助力开发者清晰把控数据动态变化。

 

        因为DataStream中是Row 类型,所以打印的格式是Row 这个类中的toString方法决定的。这个地方的 +I 的意思是新增的数据。

因为我们经常编写flinksql,所以创建一个模板

 

根据这个可以做一个单词统计的案例

第一版代码

 

报错: 

 

解决方案

//DataStream<Row> resultStream = tEnv.toAppendStream(tableResult, Row.class);

修改为

DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(tableResult, Row.class);

第二版代码

 

toAppendStream: 适用于生成新的计算结果,并不会对老的计算结果进行修改。使用这个SQL语句中是不能出现分组的。

toRetractStream : 适用于对已经计算的结果进行更新,如果是true 代表新增,更新 false 代表遗弃

+ I 表示新增

-U 更新前

+U 更新后

需求: 使用SQL和Table(DSL)两种方式对DataStream中的单词进行统计

 
 

        以上代码,作为了解,DataStream需要变为Table,计算结果还要变为DataStream,过于麻烦了。

需求: 从Kafka的topic1中消费数据并过滤出状态为success的数据再写入到Kafka的topic2

 

代码实现

//nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/

 

 

注意:如果最后一句写了

 

其实会进行报错,这句话其实已经没有用处了,但是这个错误不影响最终的结果。

将最后一句env.execute(); 删除即可。

假如报以下错误说明没有导入json的包

 

需要导入这个包

 

这个jar包可以在此处找到

测试一下

先启动生产者

 

再启动消费者

 

从上面可以看到,生产者向topic1中发送数据,topic2中只有success数据。

需求: 从Kafka的topic1中消费数据并过滤出状态为success的数据再写入到MySQL

 

在mysql中需要有一个数据库test1 ,需要由一个表 t_success

 

代码实现

 

代码实现

 

 
特别提示:本信息由相关用户自行提供,真实性未证实,仅供参考。请谨慎采用,风险自负。

举报收藏 0打赏 0评论 0
 
更多>同类最新资讯
0相关评论

相关文章
最新文章
推荐文章
推荐图文
最新资讯
点击排行
{
网站首页  |  关于我们  |  联系方式  |  使用协议  |  隐私政策  |  版权隐私  |  网站地图  |  排名推广  |  广告服务  |  积分换礼  |  网站留言  |  RSS订阅  |  违规举报  |  鄂ICP备2020018471号