type
status
date
slug
summary
tags
category
icon
password
AI 摘要
股票平均价格计算
代码详解
这四个类一起实现了对股票平均价格的计算处理,并展示了Apache Flink在实时流处理中的应用。
类名 | 功能 |
Stock | 表示股票交易数据的实体类 |
StockStream | 创建流处理环境,读取股票交易数据,并计算每分钟股票平均价格的类 |
ResultWindowFunction | 汇总每分钟内股票平均价格的计算结果,并输出为字符串的类 |
AvgStockAggregateFunction | 计算每个股票代码在每分钟内的平均交易价格的类 |
Stock
股票价格的实体类
根据数据来看:
US2.AAPL,20200108,093003,297.260000000,100
可以分析为
股票名字 | 交易日期 | 交易时间 | 股票价格 | 股票数量 |
US2.AAPL | 20200108 | 093003 | 297.260000000 | 100 |
那么定义一个股票实体类
StockStream
首先,定义了一个名为
StockStream
的类,并在该类中定义了一个 main
方法作为程序的入口点。在
main
方法中,首先获取了一个 StreamExecutionEnvironment
对象,这是 Flink 流处理的上下文环境。然后,使用
readTextFile
方法从一个名为 "input/stock.txt" 的文本文件中读取数据,返回一个 DataStreamSource<String>
对象。接着,使用
map
方法将每一行字符串转换为 Stock
对象,并使用 assignTimestampsAndWatermarks
方法为每个事件分配时间戳和水印。然后,使用
keyBy
方法按照股票代码进行分组,返回一个 KeyedStream<Stock, String>
对象。接着,使用
window
方法定义了一个滚动窗口,窗口大小为 60 秒,然后使用 aggregate
方法聚合窗口内的数据,这里传入了 AvgStockAggregateFunction
和 ResultWindowFunction
两个函数,前者用于计算平均价格,后者用于格式化输出。最后,使用
print
方法将结果打印到控制台,然后调用 StreamExecutionEnvironment
的 execute
方法启动流处理。parseTimeToSeconds
是一个辅助方法,用于将形式为 "HHmmss" 的时间字符串解析为一天中的秒数。这个程序的主要目的是读取股票数据,按照股票代码进行分组,然后在每个 60 秒的窗口内计算每种股票的平均价格,并打印到控制台。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
:获取 Flink 流处理的环境。
env.setParallelism(1);
:这是设置并行度为 1,设置在一个线程中或者后续输出到一个同一个文件中。
DataStreamSource<String> source = env.readTextFile("input/stock.txt");
:这是从一个名为 "input/stock.txt" 的文本文件中读取数据,返回一个DataStreamSource<String>
对象。每一行字符串就是一个数据项。
DataStream<Stock> stockStream = source.map(...)
:这是将每一行字符串转换为Stock
对象。转换的逻辑是先将字符串按照逗号分割,然后取出各个字段的值,创建Stock
对象。
.assignTimestampsAndWatermarks(WatermarkStrategy.<Stock>forMonotonousTimestamps()...)
:这是为每个事件分配时间戳和水印。时间戳是事件的发生时间,水印是用于处理事件时间乱序的机制。
KeyedStream<Stock, String> keyedStream = stockStream.keyBy((KeySelector<Stock, String>) Stock::getStockCode);
:这是按照股票代码进行分组,返回一个KeyedStream<Stock, String>
对象。每个股票代码对应的所有Stock
对象会被分到同一个组。
SingleOutputStreamOperator<String> result = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(60)))...
:这是定义了一个滚动窗口,窗口大小为 60 秒,然后聚合窗口内的数据。这里传入了AvgStockAggregateFunction
和ResultWindowFunction
两个函数,前者用于计算平均价格,后者用于格式化输出。
result.print();
:将结果打印到控制台。
result.writeAsText("output/avgStock.txt")
:将平均价格写入到avgStock.txt文件内。
private static long parseTimeToSeconds(String time) {...}
:这是一个辅助方法,用于将形式为 "HHmmss" 的时间字符串解析为一天中的秒数。这个方法被assignTimestampsAndWatermarks
方法调用,用于分配时间戳。
ResultWindowFunction
定义了一个名为
ResultWindowFunction
的类,这个类实现了 WindowFunction
接口。接口的四个类型参数分别代表:
输入类型 | 输出类型 | 键类型 | 窗口类型 |
Double | String | String | TimeWindow |
在这个类中,实现了 apply 方法,这是窗口函数的核心方法,它会在每个窗口结束时被调用。方法参数包括:
键 | 窗口 | 输入数据 | 输出收集器 |
s | window | input (Double 类型迭代器) | out |
在
apply
方法中,首先获取了输入数据的第一个元素作为平均价格,然后通过输出收集器 out
发出一个格式化的字符串,这个字符串包含了股票代码和平均价格。
AvgStockAggregateFunction
这个 代码定义了一个名为
AvgStockAggregateFunction
的聚合函数,该函数用于计算股票的平均价格。首先,这个类实现了 AggregateFunction 接口,这个接口有三个类型参数:
输入类型 | 累加器类型 | 输出类型 |
Stock | Tuple2<Double, Integer> | Double |
在这个类中,实现了
AggregateFunction
接口的五个方法:createAccumulator()
:创建一个新的累加器,这里是一个包含两个元素的元组,第一个元素是总价格(初始化为 0.0),第二个元素是总数量(初始化为 0)。
add(Stock value, Tuple2<Double, Integer> accumulator)
:将输入数据添加到累加器,这里是将股票的价格乘以数量加到总价格上,将股票的数量加到总数量上。
getResult(Tuple2<Double, Integer> accumulator)
:从累加器获取结果,这里是计算平均价格,即总价格除以总数量。
merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b)
:合并两个累加器,这里是将两个累加器的总价格和总数量分别相加。
流程文档
本文主要介绍如何使用Apache Flink的AggregateFunction接口实现对股票平均价格的计算。整个流程包括读取股票交易数据,对数据进行处理和聚合,最后输出每个股票代码在每分钟内的平均交易价格。
1. 创建流处理环境
首先,我们需要创建一个StreamExecutionEnvironment对象,这是所有Flink程序的基础。然后,设置并行度为1,表示程序的并行级别。
2. 读取数据
使用env.readTextFile方法从文本文件中读取数据。这里我们假设数据文件名为"input/stock.txt"。
3. 数据处理
接着,使用map函数将读取的每行数据转换为Stock对象,并使用assignTimestampsAndWatermarks方法分配时间戳和水印。其中,我们使用了WatermarkStrategy.forMonotonousTimestamps策略,表示事件时间戳是单调递增的。
4. 数据分区
使用keyBy方法按照股票代码进行分区,这样每个股票代码的数据都会被分到同一个分区进行处理。
5. 数据聚合
在每个分区上,我们定义一个滑动窗口,窗口大小为60秒,然后使用AggregateFunction进行聚合计算。我们定义了一个AvgStockAggregateFunction,用于计算每个股票代码的平均价格。
6. 结果输出
最后,我们将计算结果打印出来,并执行任务。
以上就是使用AggregateFunction计算股票平均价格的整个流程。通过这个流程,我们可以实时计算每个股票代码在每分钟内的平均交易价格,为股票交易提供有价值的信息。
- Author:YXH1024
- URL:http://bk.yxh666.top/article/88e80e1a-f87e-4b24-bdbe-6743c0bf10a3
- Copyright:All articles in this blog, except for special statements, adopt BY-NC-SA agreement. Please indicate the source!