nexthexonextbutterflyvolantisyearnyiliashokaindigoapollolandscapecactusmateryicarusfluidmaterial

运行源码

我们将运行1.13.0版本的Flink,其scala环境为2.12

Step1. 获取学习项目

1
git clone https://github.com/fightinggg/flink-src-study.git --recursive

在这个项目中,笔者把flink源码作为了一个git submodule放置于文件夹flink中,用来临时查看,当然我个人不建议看这些代码,因为这个文件夹太大了,IDE都不能很好的处理他。

然后就可以直接运行了

Step2. Enjoy It

现在你可以直接从这里进入flink的控制台http://localhost:8081, 你也可以直接在ideal中调试flink。

Step3. Debug

自己设断点就好了。

使用Flink

下面使用flink-examples-streaming_2.12-1.13.0来演示

执行环境

这个包下的所有的example在main函数的第一行全部首先选择获取环境,代码如下。

1
env = StreamExecutionEnvironment.getExecutionEnvironment();

数据源

当我们获取运行环境以后,紧接着就需要拿到数据源,examples中的各个例子获取数据源的方案如下。

方案 example
从数组获取 1. WordCount
2. WindowWordCount
从文件按行获取 1. WordCount
2. WindowWordCount
从自定义Source获取 1. TopSpeedWindowing
2. KafkaEventsGeneratorJob
从Kafka获取 1. StateMachineExample
从Socket获取 1. SocketWindowWordCount
从集合获取 1. WindowJoin

算子

第一个问题就是:什么是算子?

算子描述了一系列的计算操作,他告诉计算机一个数据应该如何处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
graph LR  
%% style
classDef green fill:#a3e4d7,stroke:#333,stroke-width:1px
classDef blue fill:#d6eaf8,stroke:#333,stroke-width:1px
classDef brown fill:#edbb99,stroke:#333,stroke-width:1px
classDef grey fill:#f2f3f4,stroke:#333,stroke-width:1px

%% point
start((数据源)):::green
op1(算子1):::blue
op2(算子1):::blue
op3(算子1):::blue
op4(算子2):::blue
op5(算子2):::blue
op6(算子2):::blue
output((输出)):::brown
shuffle((shuffle)):::grey

%% edge
start --> op1 & op2 & op3 --- shuffle --> op4 & op5 & op6 --> output

一旦我们有了数据源以后,数据源源源不断的产生数据,我们可以把它当作一个流,可以进行计算了,DataStream被flatMap以后是SingleOutputStreamOperator,实际上这个类和DataSream区别并不是特别大,SingleOutputStreamOperator继承自DataStream且没有重写任何函数。

KeyedStream则提供了一些聚合函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
graph LR
%% style
classDef green fill:#a3e4d7,stroke:#333,stroke-width:1px

%% point
DataStream(DataStream<br/>数据源):::green
SingleOutputStreamOperator(SingleOutputStreamOperator<br/>简单的输出流):::green
KeyedStream(KeyedStream<br/>被Key分组的流):::green

%% edge
DataStream -->|flatMap| SingleOutputStreamOperator
DataStream -->|keyBy| KeyedStream

更具体一点,如wordCount,他经过flatMap分词,然后使用词进行Key,最后聚合,代码如下。

1
2
3
4
5
6
DataStream<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
.keyBy(value -> value.f0)
.sum(1);

窗口

当然复杂一点点的如WindowWordCount,中间穿插了一个计数窗口,代码如下。

1
2
3
4
5
6
7
8
DataStream<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new WordCount.Tokenizer())
// create windows of windowSize records slided every slideSize records
.keyBy(value -> value.f0)
.countWindow(windowSize, slideSize)
// group by the tuple field "0" and sum up tuple field "1"
.sum(1);

Socket数据源

最复杂的SocketWindowWordCount,首先执行nc -l 12345,然后启动此类的main函数,nc可以直接输入,我们能发每5秒输出了一次实时计算结果,代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
DataStream<WordWithCount> windowCounts =
text.flatMap(
new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(
String value, Collector<WordWithCount> out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy(value -> value.word)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.reduce(
new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});

异步数据源

首先提出一个背景,有一条来自Kafka的数据,由于某些原因,该数据中暂时不包含完整的字段,当我们使用Flink从Kafka读取数据以后,还需要查询Mysql补全其字段,此后才能使用Flink做接下来的操作。

如果自定义数据源,从Kafka消费数据,然后查询Mysql,最后输出,这其中其实涉及到很多问题,一个最简单的想法是从Kafka单线程消费数据,然后阻塞住,去查询Mysql。

1
2
3
4
5
6
7
8
9
10
11
sequenceDiagram
rect rgb(0, 0, 255, .1)
loop
DataStream ->> + Kafka : 拉取数据
Kafka --) - DataStream : 返回数据
DataStream ->> + Mysql : 查询未知字段
Mysql --) - DataStream : 返回未知字段
DataStream ->> + out : 产生一条数据
out --) - DataStream : 数据生产成功
end
end

这样做无可厚非,但是效率堪忧,Kafka拉取数据要快于Mysql,所以补全字段以及输出结果可以异步完成,基于第二个点,引入了RichAsyncFunction

1
2
3
4
5
6
7
8
9
10
sequenceDiagram
rect rgb(0, 0, 255, .1)
loop
DataStream ->> + Kafka : 拉取数据
Kafka --) - DataStream : 返回数据
DataStream -) + 线程池 : 后续工作委托给线程池
end
end
线程池 ->> - 线程池 : 完成剩下的工作

当然RichAsyncFunction做的工作不仅仅是这些,实际上处理流程也比这个复杂很多,这里从中挑几个出来聊一聊。

首先是顺序问题,由于后续工作委托给了线程池,线程池内部当然可以并发执行,那么我们就没办法保证有哪些数据先处理完毕,Async I/O 给出的第一个解决方案是通过队列保证顺序,哪个任务先执行完我不管,最终按入队顺序取结果;第二个解决方案是完全不理会顺序,谁先执行完就取出谁的结果;第三个解决方案是关注watermark,对于当前watermark下的数据,执行完就直接取出结果,对于下一个watermark的数据,将其缓存,直到他的watermark抵达。读者可以在这里看到更加详细的过程http://wuchong.me/blog/2017/05/17/flink-internals-async-io/

GPU计算

MatrixVectorMul是一个GPU计算例子,其中主要的内容在Multiplier中。这里 不做过多介绍。

Iterator模型

试想,如果有一些元素需要进行迭代计算,比如说我们计算两个元素进行斐波拉契数列的第n项,是不是可以写一个递归?

1
2
3
int fib(int a,int b,int n){
return n<=0 ? b : fib(b,a+b,n-1);
}

如果某些算子也需要进行这些操作,我们可以使用ProcessFunction, 下面这个代码和上面的代码的思想异曲同工。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public void processElement(
Tuple5<Integer, Integer, Integer, Integer, Integer> value,
Context ctx,
Collector<Tuple5<Integer, Integer, Integer, Integer, Integer>> out)
throws Exception {
Tuple5<Integer, Integer, Integer, Integer, Integer> element =
new Tuple5<>(value.f0, value.f1, value.f3, value.f2 + value.f3, ++value.f4);

if (value.f2 < BOUND && value.f3 < BOUND) {
ctx.output(ITERATE_TAG, element);
} else {
out.collect(element);
}
}

WindowJoin模型

参考SQL语法中的Join操作,两个stream将按照指定的key进行聚合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static DataStream<Tuple3<String, Integer, Integer>> runWindowJoin(
DataStream<Tuple2<String, Integer>> grades,
DataStream<Tuple2<String, Integer>> salaries,
long windowSize) {

return grades.join(salaries)
.where(new NameKeySelector())
.equalTo(new NameKeySelector())
.window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
.apply(
new JoinFunction<
Tuple2<String, Integer>,
Tuple2<String, Integer>,
Tuple3<String, Integer, Integer>>() {

@Override
public Tuple3<String, Integer, Integer> join(
Tuple2<String, Integer> first, Tuple2<String, Integer> second) {
return new Tuple3<String, Integer, Integer>(
first.f0, first.f1, second.f1);
}
});
}