生产问题总结及性能优化
# Kafka可视化管理工具kafka-manager (opens new window)
# 线上环境规划
- 网卡
- 硬盘,默认7日日志留存和存储的副本数
- CPU和内存要大一点
# JVM参数设置
- 用G1可以设置GC最大停顿时间
- 写数据到磁盘会用到操作系统的page cache,所以JVM内存不宜分配过大,需要给操作系统的缓存留出几个G
# 消息丢失问题
# 消息发送端
# acks=0
- 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息
- 性能最高,但是最容易丢消息
# acks=1
- 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入
- 如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失
# acks=-1或all
这意味着leader需要等待所有备份(min.insync.replicas配置的备份个数)都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。
如果此时断电,producer可以知道消息没有被发送成功,将会重新发送。如果在follower收到数据以后,成功返回ack,leader断电,broker发送ack之前,数据将存在于原来的follower中(客户端重复发送,会造成数据重复)。在重新选举以后,新的leader会持有该部分数据。
ISR中的follower为空,还是会丢失的。
# 总结
- 客户端设置重试、ack=-1、配置kafka中的最小副本(保证ISR里面有follower)、还有做try/catch做业务容错。
- 思路: ISR>0防止ack降级为1,其他情况利用ack和重试机制保证、最后用业务容错做支撑。
# 消息消费端
如果消费这边配置的是自动提交,万一消费到数据还没处理完,就自动提交offset了,但是此时你consumer直接宕机了,未处理完的数据丢失了,下次也消费不到了。所以设置手动提交
# 总结
这里一般都是看场景设置ack和手动提交
# 消息重复消费
# 消息发送端
发送消息如果配置了重试机制,比如网络抖动时间过长导致发送端发送超时,实际broker可能已经接收到消息,但发送方会重新发送消息
# 消息消费端
如果消费这边配置的是自动提交,刚拉取了一批数据处理了一部分,但还没来得及提交,服务挂了,下次重启又会拉取相同的一批数据重复处理。
- 手动提交也有可能会出现上面的情况
# 总结
一般都是消费端都是要做消费幂等处理的。
# 消息乱序
# 消息发送端
- 如果发送端配置了重试机制,kafka不会等之前那条消息完全发送成功才去发送下一条消息,这样可能会出现,发送了1,2,3条消息,第一条超时了,后面两条发送成功,再重试发送第1条消息,这时消息在broker端的顺序就是2,3,1了。
- 是否一定要配置重试要根据业务情况而定。也可以用同步发送的模式去发消息,当然acks不能设置为0,这样也能保证消息发送的有序。
# 保证全链路消息顺序消费
- 从发送端开始,将所有有序消息发送到同一个分区,然后用一个消费者去消费,但是这种性能比较低。
- 或者: 消费者端接收到消息后将需要保证顺序消费的几条消费发到内存队列(可以搞多个),一个内存队列开启一个线程顺序处理消息
# 消息积压
# 实时/消费任务挂掉导致的消费滞后
线上有时因为发送方发送消息速度过快,或者消费方处理消息过慢,可能会导致broker积压大量未消费消息
方案一: 此种情况如果积压了上百万未消费消息需要紧急处理,可以修改消费端程序,让其将收到的消息快速转发到其他topic(可以设置很多分区),然后再启动多个消费者同时消费新主题的不同分区。
方案二: 临时扩容新节点,但是小心触发重平衡,导致消费者不消费。
# Kafka分区少了
如果数据量很大,合理的增加Kafka分区数是关键。但是小心触发重平衡,导致消费者不消费。
# 由于Kafka消息key设置的不合理,导致分区数据不均衡
可以在Kafka producer处,给key加随机后缀,使其均衡。
# 程序有bug/数据问题
- 由于消息数据格式变动或消费者程序有bug,导致消费者一直消费不成功,也可能导致broker积压大量未消费消息。
此种情况可以将这些消费不成功的消息转发到其它队列里去(类似死信队列),后面再慢慢分析死信队列里的消息处理问题。
# 延时队列的实现
# 延时队列存储的对象是延时消息。
- 订单完成1小时后通知用户进行评价。
- 30 分钟后未支付,取消订单
# 实现思路
- 发送延时消息时先把消息按照不同的延迟时间段发送到指定的队列中(topic_1s,topic_5s,topic_10s,...topic_2h,这个一般不能支持任意时间段的延时)
- 然后通过定时器进行轮训消费这些topic,查看消息是否到期,如果到期就把这个消息发送到具体业务处理的topic中,队列中消息越靠前的到期时间越早,具体来说就是定时器在一次消费过程中,对消息的发送时间做判断,看下是否延迟到对应时间了,如果到了就转发,如果还没到这一次定时任务就可以提前结束了,并把偏移量seek到上一次消费的位置以便等待下一个周期再次消费这条消息。
- 一个消费者对应一个分区,而一个分区是局部有序的,所以也不用考虑,分区的顺序问题,但是如果重试机制导致的顺序,这里会导致业务部分的延迟,但是延迟可接受。
- 这里需要注意spring-boot,多线程多消费者,的定时器编写。
# 消息回溯
可以用consumer的offsetsForTimes、seek等方法指定从某个offset偏移的消息开始消费
# 分区数越多吞吐量越高吗
如果分区数设置过大,比如设置10000,可能会设置不成功,后台会报错"java.io.IOException : Too many open files"
这是linux下用ulimit设置打开文件的数量,默认是1024
ulimit -n 65535
# 消息传递保障
# at most once
- 消费者最多收到一次消息,0--1次
- acks = 0
# at least once
- 消费者至少收到一次消息,1--多次
- ack = all 可以实现
# exactly once
- 消费者刚好收到一次消息
- at least once 加上消费者幂等性可以实现
# kafka生产者的幂等性
- 只需在生产者加上参数 props.put(“enable.idempotence”, true)
# 实现原理
kafka每次发送消息会生成PID和Sequence Number,并将这两个属性一起发送给broker,broker会将PID和Sequence Number跟消息绑定一起存起来,下次如果生产者重发相同消息,broker会检查PID和Sequence Number,如果相同不会再接收。
# kafka消费者的幂等性
见RabbitMQ的章节.
# kafka的事务
- Kafka的事务主要是保障一次发送多条消息的事务一致性(要么同时成功要么同时失败)
- 不同于Rocketmq,Rocketmq是保障本地事务(比如数据库)与mq消息发送的事务一致性
# kafka高性能的原因
- 磁盘顺序读写:kafka消息不能修改以及不会从文件中间删除保证了磁盘顺序读,kafka的消息写入文件都是追加在文件末尾,不会写入文件中的某个位置(随机写)保证了磁盘顺序写
- 数据传输的零拷贝
- 读写数据的批量batch处理以及压缩传输
# 参考资料
Kafka集群消息积压问题及处理策略 (opens new window)
基于kafka实现延迟队列 (opens new window)
Kafka学习(一):消费者实现对分区的并发消费 (opens new window)
在spring中实现消息的并发消费采用的是线程封闭的策略,具体实现是在创建监听器容器时,会根据配置的concurrency来创建多个KafkaMessageListenerContainer,在该类中又有内部类ListenerConsumer,在该内部类中封闭创建了consumer对象。以此来实现主题消息的并发消费。也就是多线程多消费者。
因为一次 Kafka 宕机,我明白了 Kafka 高可用原理! (opens new window)
Asks=All就不会出现丢失消息的情况吗?答案是否。当ISR列表只剩Leader的情况下,Asks=All相当于Asks=1,这种情况下如果节点宕机了,还能保证数据不丢失吗?因此只有在Asks=All并且有ISR中有两个副本的情况下才能保证数据不丢失。设置
replication-factor副本数为3。 通过设置最少的ISR,可以解决这个问题。
解决kafka ISR缺失严重导致消费异常的方法 (opens new window)
对应无法消费的业务topic存在部分分区的ISR列表丢失2/3,且随着时间的推移,isr缺失的分区占比在增加。
导致无法消费kafka,且往kafka写消息报错 调整这些参数 (opens new window): replica..... 的配置,如 响应leader的最长等待时间、落后消息数量、socket超时时间、socket缓存大小、每次获取数据的最大字节数、通信之间的最大等待时间、复制的线程数、将最高水位进行flush的时间间隔等等
Kafka 会不会丢消息?怎么处理的? (opens new window)
Kafka——一致性重要机制之ISR(kafka replica) (opens new window)
min.insync.replicas=1 # 需要保证ISR中至少有多少个replica