type
status
date
slug
summary
tags
category
icon
password
步骤
实验一:
linux:
1.hadoop,zookeeper和kafka启动
2.创建flink需要的存档点
更具代码的路径创建对应的目录
3.创建对应需要的kafka主题
4.需要两个窗口界面:
1).窗口1:
2).窗口2:
win:
运行
KafkaEOSDemo
代码结果:
实验二:
只需要在实验一的基础上修改kafkf消费主题命令就行
linux
win
运行
KafkaEOSDemo2
代码结果:
代码
KafkaEOSDemo
KafkaEOSDemo2
pom.xml
文档
KafkaEOSDemo
1. 创建环境
这行代码创建了一个Flink执行环境,它是所有Flink程序的开始。
2. 启用检查点
这行代码启用了检查点,并设置了检查点的模式为精准一次。检查点间隔为5000毫秒。
3. 设置检查点存储路径
这行代码设置了检查点的存储路径,这里我们选择了HDFS作为存储介质。
4. 从Kafka读取数据
- 调用
setBootstrapServers(hadoop102:9092,hadoop103:9092,hadoop104:9092)
设置Kafka的服务器地址,也就是Kafka broker的地址。
- 调用
setGroupId(test)
设置消费者组ID,Kafka使用消费者组来区分不同的消费者。
- 调用
setTopics(topic_1)
设置要订阅的Kafka主题。
- 调用
setValueOnlyDeserializer(new SimpleStringSchema())
设置反序列化器,这里使用的是简单字符串反序列化器。
- 调用
setStartingOffsets(OffsetsInitializer.latest())
设置起始偏移量,这里设置为从最新的数据开始消费。
- 调用
build()
构建KafkaSource实例。
5. 将数据写入到另一个Kafka主题
- 同KafkaSource的
setBootstrapServers
。
setTopic(ws)
:设置要写入的Kafka主题。
setValueSerializationSchema(new SimpleStringSchema())
:设置序列化器,这里我们使用的是简单字符串序列化器。
setRecordSerializer(...)
:设置记录序列化器,用于将Flink的数据记录序列化为Kafka可以接受的格式。
setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
:设置交付保证,这里我们设置为精确一次,也就是说每条记录将被精确地写入一次。
setTransactionalIdPrefix(prefix-)
:设置事务ID前缀,这是用于Kafka事务的标识。
setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + )
:设置事务超时时间,这是Kafka事务的一个重要参数,它决定了一个事务可以保持打开状态的最长时间。间。
build()
:构建KafkaSink实例。
KafkaEOSDemo2
1. 创建环境
同KafkaEOSDemo。
2. 从Kafka读取数据
setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
:设置Kafka的服务器地址,也就是Kafka broker的地址。
setGroupId("test")
:设置消费者组ID,Kafka使用消费者组来区分不同的消费者。
setTopics("ws")
:设置要订阅的Kafka主题。
setValueOnlyDeserializer(new SimpleStringSchema())
:设置反序列化器,这里我们使用的是简单字符串反序列化器。
setStartingOffsets(OffsetsInitializer.latest())
:设置起始偏移量,这里我们设置为从最新的数据开始消费。
setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
:设置事务的隔离级别为读已提交,这是Kafka事务的一个重要配置。读已提交的隔离级别表示消费者在读取数据时,只能读取到已经被提交的事务。
build()
:构建KafkaSource实例。
- Author:YXH1024
- URL:http://bk.yxh666.top/article/e5b1ce05-51ec-4c7b-8bf5-74af88160f0e
- Copyright:All articles in this blog, except for special statements, adopt BY-NC-SA agreement. Please indicate the source!