Flink SQL kafka连接器

版本说明

Flink和kafka的版本号有一定的匹配关系,操作成功的版本:

  • Flink1.17.1
  • kafka_2.12-3.3.1

添加kafka连接器依赖

将flink-sql-connector-kafka-1.17.1.jar上传到flink的lib目录下

下载flink-sql-connector-kafka连接器jar包

https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka/1.17.1

上传到flink的lib目录下

[hadoop@node2 ~]$ cp flink-connector-kafka-1.17.1.jar $FLINK_HOME/lib

分发flink-connector-kafka-1.17.1.jar

xsync $FLINK_HOME/lib/flink-connector-kafka-1.17.1.jar

启动yarn-session

[hadoop@node2 ~]$ myhadoop.sh start
[hadoop@node2 ~]$ yarn-session.sh -d

启动kafka集群

[hadoop@node2 ~]$ zk.sh start
[hadoop@node2 ~]$ kf.sh start

创建kafka主题

查看主题
[hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --list
​
如果没有ws1,则创建
[hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --create --replication-factor 1 --partitions 1 --topic ws1
​

普通Kafka表

'connector' = 'kafka'

进入Flink SQL客户端

[hadoop@node2 ~]$ sql-client.sh embedded -s yarn-session
...
省略若干日志输出
...
Flink SQL> 

创建Kafka的映射表

CREATE TABLE t1( 
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  --列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
id int, 
ts bigint , 
vc int )
WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9094',
  'properties.group.id' = 'test',
-- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
  'scan.startup.mode' = 'earliest-offset',
  -- fixed为flink实现的分区器,一个并行度只写往kafka一个分区
'sink.partitioner' = 'fixed',
  'topic' = 'ws1',
  'format' = 'json'
);

可以往kafka读数据,也可以往kafka写数据。

插入数据到Kafka表

如果没有source表,先创建source表,如果source表存在则不需要再创建。

CREATE TABLE source ( 
    id INT, 
    ts BIGINT, 
    vc INT
) WITH ( 
    'connector' = 'datagen', 
    'rows-per-second'='1', 
    'fields.id.kind'='random', 
    'fields.id.min'='1', 
    'fields.id.max'='10', 
    'fields.ts.kind'='sequence', 
    'fields.ts.start'='1', 
    'fields.ts.end'='1000000', 
    'fields.vc.kind'='random', 
    'fields.vc.min'='1', 
    'fields.vc.max'='100'
);

把source表插入t1表

insert into t1(id,ts,vc) select * from source;

如果报错

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer

依然同样错误,还不行,把kafka libs目录下的kafka-clients-3.3.1.jar,把jar包发到Flink的lib目录,同时也注意重启sql-client、yarn-session也要重启(重要)

cp $KAFKA_HOME/libs/kafka-clients-3.3.1.jar $FLINK_HOME/lib

查看是否复制成功

$ ls $FLINK_HOME/lib

重启sql-client重新操作,成功如下:

Flink SQL> CREATE TABLE t1( 
>   `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
>   --列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读
>   `partition` BIGINT METADATA VIRTUAL,
>   `offset` BIGINT METADATA VIRTUAL,
> id int, 
> ts bigint , 
> vc int )
> WITH (
>   'connector' = 'kafka',
>   'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9094',
>   'properties.group.id' = 'test',
> -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
>   'scan.startup.mode' = 'earliest-offset',
>   -- fixed为flink实现的分区器,一个并��度只写往kafka一个分区
> 'sink.partitioner' = 'fixed',
>   'topic' = 'ws1',
>   'format' = 'json'
> );
[INFO] Execute statement succeed.
​
Flink SQL> CREATE TABLE source ( 
>     id INT, 
>     ts BIGINT, 
>     vc INT
> ) WITH ( 
>     'connector' = 'datagen', 
>     'rows-per-second'='1', 
>     'fields.id.kind'='random', 
>     'fields.id.min'='1', 
>     'fields.id.max'='10', 
>     'fields.ts.kind'='sequence', 
>     'fields.ts.start'='1', 
>     'fields.ts.end'='1000000', 
>     'fields.vc.kind'='random', 
>     'fields.vc.min'='1', 
>     'fields.vc.max'='100'
> );
[INFO] Execute statement succeed.
​
Flink SQL> insert into t1(id,ts,vc) select * from source;2024-06-14 10:45:30,125 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 10:45:30,673 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 10:45:31,027 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 10:45:31,227 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node3:41749 of application 'application_1718331886020_0001'.
insert into t1(id,ts,vc) select * from source;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: b1765f969c3ae637bd4c8100efbb0c4e
​

查询Kafka表

select * from t1;

报错

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.ConsumerRecord

​

重启yarn session,重新操作,成功如下:

Flink SQL> CREATE TABLE t1( 
>   `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
>   --列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读
>   `partition` BIGINT METADATA VIRTUAL,
>   `offset` BIGINT METADATA VIRTUAL,
> id int, 
> ts bigint , 
> vc int )
> WITH (
>   'connector' = 'kafka',
>   'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9094',
>   'properties.group.id' = 'test',
> -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
>   'scan.startup.mode' = 'earliest-offset',
>   -- fixed为flink实现的分区器,一个并??度只写往kafka一个分区
> 'sink.partitioner' = 'fixed',
>   'topic' = 'ws1',
>   'format' = 'json'
> );
[INFO] Execute statement succeed.
​
Flink SQL> CREATE TABLE source ( 
>     id INT, 
>     ts BIGINT, 
>     vc INT
> ) WITH ( 
>     'connector' = 'datagen', 
>     'rows-per-second'='1', 
>     'fields.id.kind'='random', 
>     'fields.id.min'='1', 
>     'fields.id.max'='10', 
>     'fields.ts.kind'='sequence', 
>     'fields.ts.start'='1', 
>     'fields.ts.end'='1000000', 
>     'fields.vc.kind'='random', 
>     'fields.vc.min'='1', 
>     'fields.vc.max'='100'
> );
[INFO] Execute statement succeed.
​
Flink SQL> insert into t1(id,ts,vc) select * from source;2024-06-14 11:22:17,971 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 11:22:18,422 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 11:22:18,895 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 11:22:19,052 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node4:38788 of application 'application_1718331886020_0002'.
insert into t1(id,ts,vc) select * from source;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 84292f84d1fce4756ccd8ae294b6163a
​
​
Flink SQL> select * from t1;2024-06-14 11:23:38,338 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 11:23:38,606 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 11:23:38,617 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 11:23:38,649 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node4:38788 of application 'application_1718331886020_0002'.
select * from t1;
[INFO] Result retrieval cancelled.
​
Flink SQL> 
​

 

upsert-kafka表

'connector' = 'upsert-kafka'

如果当前表存在更新操作,那么普通的kafka连接器将无法满足,此时可以使用Upsert Kafka连接器。

创建upsert-kafka的映射表(必须定义主键)

CREATE TABLE t2( 
    id int , 
    sumVC int ,
    primary key (id) NOT ENFORCED 
)
WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'node2:9092',
  'topic' = 'ws2',
  'key.format' = 'json',
  'value.format' = 'json'
);

如果没有kafka名为ws2的topic,将自动被创建。

插入upsert-kafka表

insert into t2 select id,sum(vc) sumVC  from source group by id;

查询upsert-kafka表

upsert-kafka 无法从指定的偏移量读取,只会从主题的源读取。如此,才知道整个数据的更新过程。并且通过 -U,+U,+I 等符号来显示数据的变化过程。

设置显示模式

SET sql-client.execution.result-mode=tableau;

 查询t2表数据

select * from t2;

如果发现没有输出数据,原因是之前的source表已经生成到end(1000000)就不再生成数据了。

进入Flink Web UI,cancel掉所有running job,重新操作成功如下:

删除表

Flink SQL> show tables;
+------------+
| table name |
+------------+
|     source |
|         t1 |
|         t2 |
+------------+
3 rows in set
​
Flink SQL> drop table source;
Flink SQL> drop table t1;
Flink SQL> drop table t2;

创建表

CREATE TABLE source ( 
    id INT, 
    ts BIGINT, 
    vc INT
) WITH ( 
    'connector' = 'datagen', 
    'rows-per-second'='1', 
    'fields.id.kind'='random', 
    'fields.id.min'='1', 
    'fields.id.max'='10', 
    'fields.ts.kind'='sequence', 
    'fields.ts.start'='1', 
    'fields.ts.end'='1000000', 
    'fields.vc.kind'='random', 
    'fields.vc.min'='1', 
    'fields.vc.max'='100'
);
CREATE TABLE t2( 
    id int , 
    sumVC int ,
    primary key (id) NOT ENFORCED 
)
WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'node2:9092',
  'topic' = 'ws2',
  'key.format' = 'json',
  'value.format' = 'json'
);

设置显示模式

SET sql-client.execution.result-mode=tableau;

查询表

select * from t2;

 

完成!enjoy it!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/781960.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

python实现接口自动化

代码实现自动化相关理论 代码编写脚本和工具实现脚本区别是啥? 代码: 优点:代码灵活方便缺点:学习成本高 工具: 优点:易上手缺点:灵活度低,有局限性。 总结: 功能脚本:工…

找不到x3daudio1_7.dll怎么修复?一招搞定x3daudio1_7.dll丢失问题

当你的电脑突然弹出提示,“找不到x3daudio1_7.dll”,这时候你就需要警惕了。这往往意味着你的电脑中的程序出现了问题,你可能会发现自己无法打开程序,或者即便打开了程序也无法正常使用。因此,接下来我们要一起学习一下…

07浅谈大语言模型可调节参数tempreture

浅谈temperature 什么是temperature? temperature是大预言模型生成文本时常用的两个重要参数。它的作用体现在控制模型输出的确定性和多样性: 控制确定性: temperature参数可以控制模型生成文本的确定性,大部分模型中temperatur…

1、Java入门(cmd使用)+ jdk的配置

文章目录 前言一、常见的CMD命令1 盘符+冒号:D:---- 切换到D盘根目录下(注意要英文冒号才行)2 查看目录下内容dir --- 查看当前目录下的所有内容(包括文件夹、各种文件、exe程序、隐藏文件等所有都会查看到)dir 目录名(或路径)3 cd 目录(或者路径)--- 进入到指定目录…

探索人工智能在电子商务平台与游戏发行商竞争中几种应用方式

过去 12 年来,电脑和视频游戏的发行策略发生了巨大变化。数字游戏的销量首次超过实体游戏的销量 在20132020 年的封锁进一步加速了这一趋势。例如,在意大利,封锁的第一周导致数字游戏下载量 暴涨174.9%. 展望未来,市场有望继续增…

【若依前后端分离】通过输入用户编号自动带出部门名称(部门树)

一、部门树 使用 <treeselect v-model"form.deptId" :options"deptOptions" :show-count"true" placeholder"请选择归属部门"/> <el-col :span"12"><el-form-item label"归属部门" prop"dept…

QT5.14.2与Mysql8.0.16配置笔记

1、前言 我的QT版本为 qt-opensource-windows-x86-5.14.2。这是QT官方能提供的自带安装包的最近版本&#xff0c;更新的版本需要自己编译源代码&#xff0c;可点击此链接进行下载&#xff1a;Index of /archive/qt/5.14/5.14.2&#xff0c;选择下载 qt-opensource-windows-x86…

【机器学习】基于线性回归的医疗费用预测模型

文章目录 一、线性回归定义和工作原理假设表示 二、导入库和数据集矩阵表示可视化 三、成本函数向量的内积 四、正态方程五、探索性数据分析描述性统计检查缺失值数据分布图相关性热图保险费用分布保险费用与性别和吸烟情况的关系保险费用与子女数量的关系保险费用与地区和性别…

Halcon 铣刀刀口破损缺陷检测

一 OTSU OTSU&#xff0c;是一种自适应阈值确定的方法,又叫大津法&#xff0c;简称OTSU&#xff0c;是一种基于全局的二值化算法,它是根据图像的灰度特性,将图像分为前景和背景两个部分。当取最佳阈值时&#xff0c;两部分之间的差别应该是最大的&#xff0c;在OTSU算法中所采…

张量分解(2)——张量运算(内积、外积、直积、范数)

&#x1f345; 写在前面 &#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;这里是hyk写算法了吗&#xff0c;一枚致力于学习算法和人工智能领域的小菜鸟。 &#x1f50e;个人主页&#xff1a;主页链接&#xff08;欢迎各位大佬光临指导&#xff09; ⭐️近…

Stream流真的很好,但答应我别用toMap()

你可能会想&#xff0c;toList 和 toSet 都这么便捷顺手了&#xff0c;当又怎么能少得了 toMap() 呢。 答应我&#xff0c;一定打消你的这个想法&#xff0c;否则这将成为你噩梦的开端。 让我们先准备一个用户实体类。 Data AllArgsConstructor public class User { priv…

【C#】函数方法、属性分文件编写

1.思想 分文件编写是面向对象编程的重要思想&#xff0c;没有实际项目作为支撑很难理解该思想的精髓&#xff0c;换言之&#xff0c;一两个函数代码量因为太少无法体现分文件编写减少大量重复代码的优势。 2.项目结构介绍 整项目的名称叫AutoMetadata&#xff0c;是一个基于W…

【第三版 系统集成项目管理工程师】第4章 信息系统架构

持续更新。。。。。。。。。。。。。。。 【第三版】系统集成项目管理工程师 考情分析4.1架构基础4.1.1指导思想&#xff08;非重点&#xff09; P1364.1.2设计原则&#xff08;非重点&#xff09; P1364.1.3建设目标&#xff08;非重点&#xff09; P1374.1.4总体框架 P138练习…

【web前端HTML+CSS+JS】--- CSS学习笔记02

一、CSS&#xff08;层叠样式表&#xff09;介绍 1.优势 2.定义解释 如果有多个选择器共同作用的话&#xff0c;只有优先级最高那层样式决定最终的效果 二、无语义化标签 div和span&#xff1a;只起到描述的作用&#xff0c;不带任何样式 三、标签选择器 1.标签/元素选择器…

什么牌子的头戴式蓝牙耳机好性价比高?

说起性价比高的头戴式蓝牙耳机,就不得不提倍思H1s,作为倍思最新推出的新款,在各项功能上都实现了不错的升级,二字开头的价格,配置却毫不含糊, 倍思H1s的音质表现堪称一流。它采用了40mm天然生物纤维振膜,这种振膜柔韧而有弹性,能够显著提升低音的量感。无论是深沉的低音还是清…

Android 10年,35岁,该往哪个方向发力

网上看到个网友发的帖子&#xff0c;觉的这个可能是很多开发人员都会面临和需要思考的问题。 不管怎样&#xff0c; 要对生活保持乐观&#xff0c;生活还是有很多的选择和出路的。 &#xff08;内容来自网络&#xff0c;不代表个人观点&#xff09; 《Android Camera开发入门》…

机器人动力学模型及其线性化阻抗控制模型

机器人动力学模型 机器人动力学模型描述了机器人的运动与所受力和力矩之间的关系。这个模型考虑了机器人的质量、惯性、关节摩擦、重力等多种因素&#xff0c;用于预测和解释机器人在给定输入下的动态行为。动力学模型是设计机器人控制器的基础&#xff0c;它可以帮助我们理解…

element-plus的文件上传组件el-upload

el-upload组件 支持多种风格&#xff0c;如文件列表&#xff0c;图片&#xff0c;图片卡片&#xff0c;支持多种事件&#xff0c;预览&#xff0c;删除&#xff0c;上传成功&#xff0c;上传中等钩子。 file-list&#xff1a;上传的文件集合&#xff0c;一定要用v-model:file-…

基于B/S模式和Java技术的生鲜交易系统

你好呀&#xff0c;我是计算机学姐码农小野&#xff01;如果有相关需求&#xff0c;可以私信联系我。 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;B/S模式、Java技术 工具&#xff1a;Visual Studio、MySQL数据库开发工具 系统展示 首页 用户注册…

RAG综述汇总

第一篇&#xff1a;Retrieval-Augmented Generation for Large Language Models: A Survey(同济/复旦) 论文链接 1.简介 这篇全面的综述论文详细研究了 RAG 范式的发展&#xff0c;包括 Naive RAG、Advanced RAG 和 Modular RAG。介绍了 RAG 框架的三个基础技术&#xff0c;…