瑞A耗高原因是什么?如何改善?,

性能优化:一个Flink参数节省50%的CPU消耗

导读:本文属于 Flink 在生产环境的大规模 CPU 优化实战,大并发任务预计节省 30~50% 的 CPU 消耗。下文会详细分析优化相关的实现原理、问题定位以及优化过程。往往在做性能优化时就会发现:当已经定位到性能瓶颈时,很容易想到优化思路去解决或优化。但定位问题的过程其实是最难的,也就是找性能瓶颈的这个过程更具有意义。本文问题定位的过程以及用到的性能分析工具、命令可能对于广大的技术同学更有收益。

0、 结论

Flink 大并发任务(超过 500 并发)在使用 keyBy 或者 rebalance 的情况下,将 bufferTimeout 设置为 1s 可以节省 30~50% 的 CPU 消耗。中等并发任务也会有不少收益。

1、 Flink 如何权衡吞吐与延迟的关系

TM 是真正处理数据的进程,如下图所示,上游 Task 2 的输出做为下游 Task 1 的输入,必然会产生 shuffle,也就是网络间数据交换。

如果 Task2 每处理一条数据就通过网络发送一条数据给 Task1,显然数据传输效率比较低。为了提高吞吐,Flink 设计了 NetworkBuffer 来实现攒批。即:Task2 的每个 subtask 内部都会缓存一批数据再发送给下游 Task1。

1.1 网络传输 buffer 实现原理

假设一个任务有两个 Task,分别是 TaskA 和 TaskB,TaskA 的并行度为 2,TaskB 的并行度为 3,且 TaskA 和 TaskB 之间的连接方式是 keyBy。如下图所以,总共有 2 * 3 = 6 个连接。

详细的数据传输如下图所示。上游每个 subtask 中会有 3 个 resultSubPartition,连接下游算子的 3 个 subtask。下游每个 subtask 中会有 2 个 InputChannel,连接上游算子的 2 个 subtask。一个个彩色的小圆圈就是一个个 buffer,buffer 生成以后,就可以发送到下游 TaskB。

Note:Subtask 中 buffer 生成好以后,会被 Netty 消费,由 Netty Server 负责将数据发送给下游 Subtask B。SubtaskB 的 Netty Client 将接收到的 buffer 写到 InputChannel 中,下游的 Subtask 就可以开始反序列化处理数据了。具体参考官网图示:

1.2 生成 buffer 的规则

Netty Server 会消费 SubtaskA resultSubPartition 产生的 buffer 数据,那 NettyServer 什么时候能消费到 buffer 呢?

正在写入的 buffer 肯定不能被 Netty 消费到,只有写完的 buffer 才能被消费到。在 Flink 中有三种情况会认为 buffer 写完了,可以被 Netty Server 消费:

  • buffer 写满了
  • buffer 超时了
  • 遇到特殊的 Event,例如:Checkpoint barrier

默认 buffer 大小是 32KB,如果 32KB 写满就认为当前 buffer 写完了,可以将当前 buffer 发送到下游 Task,也就是条件1。

如果低峰期数据量比较小,1 分钟也没有将 32KB 的 buffer 写满,那么数据将一直缓存在 buffer 中,从而导致大量的延迟。为了解决数据延迟的问题,Flink 增加了一个时间策略,默认是 100ms。如果 100ms buffer 还没写满,为了保障数据的实时性,也认为 buffer 写完了,可以将当前 buffer 发送到下游 Task,也就是条件 2。

遇到一些特殊的事件,例如遇到 Checkpoint barrier,为了加快 Checkpoint 效率,会直接认为 buffer 写完了,可以将当前 buffer 发送到下游 Task,也就是条件 3。

当然条件 2 的 100ms 是可以调节的,Flink 1.10 及以后的版本直接通过配置参数 execution.buffer-timeout: 100ms 可以设置,Flink 1.10 之前通过代码 env.setBufferTimeout(100); 可以设置。

1.3 小结

Flink 通过 buffer-timeout 参数对吞吐和延迟做权衡,当设置为 0 时表示没有 timeout 策略,即:每条数据来了都认为 buffer 满了,将这一条数据单独发送给下游。保障了实时性,但吞吐可能会下降。要想吞吐不下降,就需要消耗更多的资源。

100ms 是 Flink 权衡过的 timeout 默认值,既能保证吞吐,又能保障延迟控制在 100ms 以内。

2、 大并发任务为什么会消耗更多的 CPU?

2.1 大并发的 resultSubPartition 图

假设 TaskA 有 1000 个并发,下游 TaskB 也有1000 个并发,意味着 SubtaskA0 总共有 1000 个 resultSubPartition,分别对应 SubtaskB0、B1 ... B999。如下所示是 SubtaskA0 的 resultSubPartition 图,其他的 SubtaskA1、A2 ... A999 也是同样的图示,图中可以看到 SubtaskA0 中会有 1000 个 resultSubPartition。

2.2 大并发情况,每个 buffer 中会缓存多少条数据?

buffer 是为了缓存一批数据,批量发送,提高效率。如果是大并发情况,到底每个 buffer 会缓存多少条数据做为一个批次呢?

默认 bufferTimeout = 100ms 意味着 100ms 必须将 buffer 中的数据发送到下游 Task。假设 SubtaskA0 每秒处理 1 万条数据,则 100ms 平均处理 10000 / 10 = 1000 条数据。这 1000 条数据要发送给 1000 个 resultSubPartition,所以平均一个 resultSubPartition 中只会有 1 条数据,每个 resultSubPartition 有自己独立的 buffer,也就是说每个 buffer 其实只缓存了 1 条数据。跟没有 buffer 没什么区别。。。

如果 TaskB 的并发为 2000,则 SubtaskA0 内部会有 2000 个 resultSubPartition。100ms 产生 1000 条数据,则只有 1000 个 resultSubPartition 中有数据,仍然是 100ms 发送 1000个 buffer 给下游 TaskB,每个 buffer 中只有 1 条数据。

2.3 大并发任务消耗 CPU 小结

生产环境很多任务的并发远大于 1000,所以造成很多 buffer 仅仅只缓存 1 条数据就被 timeout 策略触发发送给下游 Task。每条数据做为 1 个 buffer,每秒处理 1 万条数据,则后台线程每秒需要 flush 1 万个 buffer 到 NettyServer,从而大量消耗 CPU。

不仅是发送方效率降低,下游的 Subtask B 接受数据的效率也会降低。每秒接受 1 万的 buffer,每个 buffer 里 1 条数据。大量的小 buffer,大量的读取小数据,消耗大量的 CPU 资源。

2.4 小并发任务会存在 buffer 攒批效果不好的问题吗?

假设 TaskA 和 Task B 的并发是 50,SubtaskA 对应 50 个 resultSubPartition。每秒产生 1 万条数据,则 100ms 需要会处理 1000 条数据,平均每个 resultSubPartition 能攒 1000 / 50 = 20 条数据。相对而言攒批效果还算可以。

如果单条消息比较大,可能 20 条数据早就将 buffer 占满了,buffer 如果占满会直接发送到下游 Task,无需等待 timeout 超时才发送。

已经举了多个案例,大家可以使用上述的计算方式计算一下,生产环境自己开发的任务是否存在:因为攒批效果不好导致 CPU 消耗较多的任务?

3、 优化思路、测试结果及其优化收益

3.1 优化思路

经过上面的理论分析,很容易想到的优化思路就是:调大 bufferTimeout 参数。

TaskA 和 Task B 的并发仍然是 1000,将 bufferTimeout 调大到 1s。SubtaskA 对应 1000 个 resultSubPartition,每秒产生 1万条数据,则每个 resultSubPartition 平均分配 10 条数据,即:每个 buffer 平均缓存了 10 条数据,相比之前 1 条来讲,buffer 攒批的效果明显变好。

3.2 测试结果

测试条件:业务逻辑简单,keyBy 后不做任何业务处理,并发 1200。

因为没有业务处理,所以从业务角度来讲,CPU 应该主要消耗在序列化和反序列化 protobuf。从引擎角度来讲,TM 之间数据 flush、传输需要消耗不少的 CPU。

基于上述条件,仅调节 bufferTimeout 参数,观察单个 TM 的平均 CPU 消耗。

  • bufferTimeout = 100ms 时,TM 平均 CPU 使用 0.59 core
  • bufferTimeout = 1s 时,TM 平均 CPU 使用 0.39 core,CPU 节省了 33%
  • bufferTimeout = 10s 时,TM 平均 CPU 使用 0.33 core,CPU 节省了 44%

bufferTimeout 无限扩大并不能一直提高性能,因为 3 秒可能 buffer 就满了,此时设置 3 s 和 10s 并没有任何区别。

不同业务方的多个同学反馈:大多数业务场景都可以接受 1s 延迟,所以生产环境将大并发任务的 bufferTimeout 设置为 1s 是个不错的选择。既能节省不少的 CPU 消耗,又能保证延迟在 1s 以内。

3.3 生产环境任务的实际收益

测试结果毕竟没有生产环境更具有说服力。与多个业务线的同学沟通后,调整了 3 个大任务,CPU 最少的节省了 30%,最多的节省了 50%。

4、 问题分析定位的过程

上述部分都是在讲原理、优化过程以及最后的测试结果和优化后的收益。往往在做性能优化时就会发现:当已经定位到性能瓶颈时,很容易想到优化思路去解决或优化。但定位问题的过程其实是最难的,也就是找性能瓶颈的这个过程更具有意义。

下面主要讲述:如何发现调节 bufferTimeout 参数会节省很多的 CPU?一些性能优化的方法论。

其实很多同学都知道 bufferTimeout 参数调大可能会减少资源消耗,但却没想到大并发任务调大 bufferTimeout 后会有如此大的资源收益。

4.1 火焰图

业务方反馈有个 数千并发的 Flink 任务高峰期单个 TM CPU 使用 150% 以上,需要架构组同学辅助做优化。

划重点:CPU 使用比较高,要对 CPU 使用做优化,首先需要知道 CPU 大部分时间在做什么工作?假设 CPU 有 1% 的时间在执行 A,30% 的时间在执行 B,69% 的时间在执行 C。那么应该花更多的精力去优化 C。

至于那 1% 的 A 可能也有优化空间,优化后可能也有收益,但是由于占得比例特别低,就算优化了,收益也比较小,没必要耗费大量的精力去做 A(性能优化也要考虑 ROI)。

那有没有工具能分析进程的 CPU 在频繁执行哪些代码吗?有,那就是《火焰图》。

火焰图能呈现出应用程序在执行哪些方法,每个方法执行的时长占了整个 CPU 的消耗的百分比。由于篇幅原因,本文不会详细介绍火焰图,关于火焰图的生成以及如何读懂火焰图,大家可以自行学习。

下图是一张火焰图,可以看到 netty 相关线程的方法占了 29.38% 的 CPU。

使用火焰图对进程分析以后,发现有几大块 CPU 消耗比较大:

  • netty 相关
  • kafka 序列化压缩消息
  • protobuf 序列化反序列化

此时也没办法确定 netty 这块可以优化,以为大数据量下数据传输正常就需要这么多 CPU。至于后两者属于业务角度的 CPU 消耗,暂且忽略。

4.2 创建测试任务用于性能分析

线上任务没办法用于测试,所以需要创建一个新的副本任务用于测试。如果消费全量数据测试,则需要相同的资源量(几千的 TM),显然没有那么多的资源。

幸运的是我们团队的刚哥开发了一个 Feature:通过参数控制只消费 Kafka 部分的 partition。假设 kafka 有 2000 个 partition,可以通过参数指定只消费十分之一的 partition,也就是 200 个 partition。这样就只需要十分之一的资源用于测试即可。感谢刚哥开发的超级实用的 Feature。

有些任务消费十分之一的 partition,资源并不能减少为原来的十分之一,例如计算 uv 的场景,如果仅仅消费十分之一的数据,uv 应该不能减少原来的十分之一。有些任务消费十分之一的 partition,资源可以减少为原来的十分之一,例如计算 pv 的场景。

幸运的是:当前要优化的任务虽然不是简单的 pv 场景,但如果消费十分之一的 partition,资源理论来讲应该是原来的十分之一左右。

所以就创建了一个测试任务,相比线上任务做了以下三个改动:

  • Sink 替换成测试的 topic
  • Source 仅消费十分之一的 partition
  • 并行度改为生产环境的十分之一

4.3 分析性能

理论来讲,测试任务的 TM 平均 CPU 使用率与生产环境基本一样。因为数据量少了十倍,资源少了十倍。

但是发现测试任务的 CPU 使用率要比生产环境低 30%,不可思议。继续使用火焰图分析,对比测试任务和线上任务的 CPU 使用到底差在哪里?发现主要区别在于 netty 网络相关的 CPU 消耗:

  • 线上任务的 netty 占整个 CPU 使用率的 30%
  • 测试任务的 netty 占整个 CPU 使用率的 6%

两个任务的区别主要在于并发数,测试任务的 TM 数量是线上任务的十分之一。

所以初步怀疑:大并发情况,单个 TM 处理的数据量虽然不变,但网络间数据传输消耗的 CPU 会变多。

根据之前的计算公式:如果 SubtaskA 处理的数据量不变,下游并发数越多对应的 resultSubPartition 就会越多,导致每个 resultSubPartition 中的 buffer 攒批效果不好。反之下游并发数越少对应的 resultSubPartition 就会越少,则每个 resultSubPartition 中的 buffer 攒批变好,从而提高了性能。

于是猜想:任务的并发数不变,如果调大 bufferTimeout 时间,攒批效果也会变好,是不是性能也会有提升呢?

4.4 验证猜想

于是就有了本文 3.2 部分的测试结果。写了一个业务逻辑特别简单的任务,仅调节 bufferTimeout 参数,观察单个 TM 的平均 CPU 消耗。

  • bufferTimeout = 100ms 时,TM 平均 CPU 使用 0.59 core
  • bufferTimeout = 1s 时,TM 平均 CPU 使用 0.39 core,CPU 节省了 33%
  • bufferTimeout = 10s 时,TM 平均 CPU 使用 0.33 core,CPU 节省了 44%

于是就确信了:优化攒批效果会节省很多的 CPU 消耗。这也就是整个性能分析优化的过程。

5、 总结

Flink 依赖 NetworkBuffer 的 timeout 参数对吞吐与延迟做权衡。大并发任务如果使用 keyBy 或者 rebalance,则 buffer 的攒批效果会下降,每个 buffer 里存放一条数据,导致整体资源消耗变大。可以调大 bufferTimeout 将 buffer 中数据变多,提高攒批效果,从而优化资源消耗。

做性能分析和性能优化要合理使用科学的性能分析工具,合理的工具能实现事半功倍。当有很多优化思路时,性能分析工具可以在优化之前分析出每个优化项预期有多大收益。优先去做那些 ROI 比较高的优化。

感谢您的阅读,如果喜欢本文欢迎关注和转发,本头条号将坚持持续分享IT技术知识。对于文章内容有其他想法或意见建议等,欢迎提出共同讨论共同进步。

本文转自: 大数据渣渣瑞 https://mp.weixin.qq.com/s/UjCGiGTT2h0c9iZyoOo3tA

2024-02-28

后面没有了,返回>>电动车百科