type
status
date
slug
summary
tags
category
icon
password

步骤

实验一:

linux:

1.hadoop,zookeeper和kafka启动
2.创建flink需要的存档点
更具代码的路径创建对应的目录
3.创建对应需要的kafka主题
4.需要两个窗口界面: 1).窗口1:
2).窗口2:
 

win:

运行KafkaEOSDemo 代码
 

结果:

notion image

实验二:

只需要在实验一的基础上修改kafkf消费主题命令就行

linux

win

运行KafkaEOSDemo2 代码
 

结果:

notion image
 

代码

KafkaEOSDemo

KafkaEOSDemo2

 

pom.xml

文档

 

KafkaEOSDemo

1. 创建环境

这行代码创建了一个Flink执行环境,它是所有Flink程序的开始。

2. 启用检查点

这行代码启用了检查点,并设置了检查点的模式为精准一次。检查点间隔为5000毫秒。

3. 设置检查点存储路径

这行代码设置了检查点的存储路径,这里我们选择了HDFS作为存储介质。

4. 从Kafka读取数据

 
  1. 调用setBootstrapServers(hadoop102:9092,hadoop103:9092,hadoop104:9092)设置Kafka的服务器地址,也就是Kafka broker的地址。
  1. 调用setGroupId(test)设置消费者组ID,Kafka使用消费者组来区分不同的消费者。
  1. 调用setTopics(topic_1)设置要订阅的Kafka主题。
  1. 调用setValueOnlyDeserializer(new SimpleStringSchema())设置反序列化器,这里使用的是简单字符串反序列化器。
  1. 调用setStartingOffsets(OffsetsInitializer.latest())设置起始偏移量,这里设置为从最新的数据开始消费。
  1. 调用build()构建KafkaSource实例。
 

5. 将数据写入到另一个Kafka主题

 
  1. 同KafkaSource的setBootstrapServers
  1. setTopic(ws):设置要写入的Kafka主题。
  1. setValueSerializationSchema(new SimpleStringSchema()):设置序列化器,这里我们使用的是简单字符串序列化器。
  1. setRecordSerializer(...):设置记录序列化器,用于将Flink的数据记录序列化为Kafka可以接受的格式。
  1. setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE):设置交付保证,这里我们设置为精确一次,也就是说每条记录将被精确地写入一次。
  1. setTransactionalIdPrefix(prefix-):设置事务ID前缀,这是用于Kafka事务的标识。
  1. setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + ):设置事务超时时间,这是Kafka事务的一个重要参数,它决定了一个事务可以保持打开状态的最长时间。间。
  1. build():构建KafkaSink实例。

KafkaEOSDemo2

1. 创建环境

同KafkaEOSDemo。

2. 从Kafka读取数据

 
  1. setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092"):设置Kafka的服务器地址,也就是Kafka broker的地址。
  1. setGroupId("test"):设置消费者组ID,Kafka使用消费者组来区分不同的消费者。
  1. setTopics("ws"):设置要订阅的Kafka主题。
  1. setValueOnlyDeserializer(new SimpleStringSchema()):设置反序列化器,这里我们使用的是简单字符串反序列化器。
  1. setStartingOffsets(OffsetsInitializer.latest()):设置起始偏移量,这里我们设置为从最新的数据开始消费。
  1. setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"):设置事务的隔离级别为读已提交,这是Kafka事务的一个重要配置。读已提交的隔离级别表示消费者在读取数据时,只能读取到已经被提交的事务。
  1. build():构建KafkaSource实例。
 
大数据面试题汇总Hive的安装
Loading...
YXH1024
YXH1024
一个普通的干饭人🍚
Announcement
🎉YXH1024公告🎉
👏欢迎您的到来👏
我会带这里分享我的技术