kafka消息转本地队列多线程消费下发现的问题
这个问题来源于对于trace系统的改造。
之前的trace消费其实是没有问题的,但是整个trade结构被改造成了本地消息队列的形式,所以trace迁移新库的同时决定统一采用消息队列的形式来进行消费。
trace本地队列主要由两部分组成:
- 核心线程池master:负责开两个线程池,一个是dispatch,一个是monitor。是一个ScheduledThreadPool,时延由apollo配置控制。
- dispatch线程池:负责从kafka拉取信息放入本地队列中,循环拉取,在执行完一批后将会拉去新的一批,跳出条件为running为false。
- monitor线程池:负责监控本地队列的占用情况
- worker:负责本地队列信息的消费,是一个ThreadPoolExecutor
- 本地队列:是一个blocking queue
1 | this.consumer.subscribe(this.getTopic()); |
一(消费线程退出版)
第一版的构成是不太一样的,思想是队列进来一个Runnable任务,就开启一个消费者线程进行消费,worker会创建一个新的Runnable匿名类消费这一条信息。
上线后发现功能无异常,但是会有巨大的创建销毁线程的开销,导致本地消费能力很低
二(消费线程不退出版)
因为第一版的问题,为了加大本地消费能力,修改:
1修改消费线程为一个while循环,唯一出口为running参数为false
2在初始化的时候创建消费线程,交给worker去执行
这样的好处是:在队列中有无消息的情况下,这些消费线程都不会退出。节省了线程创建销毁的开销。
上线后发现消费能力从3k上涨到5k左右
三
第二版上线后发现有服务停止运行的情况。
刚开始排查的问题以为是服务问题,以为服务down掉了,检查之后发现状况为:dispatch停止运行,但是monitor却正常运行,日志发现本地队列为空,kafka topic lag持续增长。
这么说只有两个原因:
kafka将consumer踢出去
dispatch线程池ScheduledThreadPool没有定时执行
加了日志排除了2. 所以只能是1。再进行细化。
kafka为什么要踢出我这个消费者?
稍微查了一下资料,发现==kafka在消费者长期未commit的情况下,将该消费者认为无法消费==,将进行rebalance。在之前代码的逻辑,为了满足业务需求,关闭了kafka自动提交。在trace放入本地队列失败的情况下(采用BlockingQueue的offer方法),将会重新拉取同一个offset这一批数据,重新消费。但是由于仍然无法消费,所以在这个地方卡住了,kafka就将该消费者踢出去了。
发现问题原因后有以下几个方法能解决(事后总结):
- 开启监控本地队列占用情况的线程,在本地队列将满的情况下,sleep dispatch线程。等待consumer消费本地队列。
- 原有的offer方法更改为put阻塞方法的执行。(本地队列是一个blockingQueue)
- 增加offer方法的等待时间
- 每次不再拉取一批数据放入队列, 而是一个逐一手动提交
四
没有采用上述2的原因是,put会严重降低效率,也测试了1方法,发现在业务上能实现,但是没有必要
尝试更改加长了offer时间,从50ms升高到500ms,发现报错基本消失
目前为止应用运行一段时间被kafka踢出消费者组的情况已经解决了。
但是又发现了新的问题。
消息有重复消费,因为每次拉取的数目是500条,如果有一条无法消费报错(如400),整段都无法commit。下次将会消费之前的offset。这样第1-399条将会重复消费。
如果解决这个问题。需要大改。将消费者改为每次拉取一条的情况,手动指定offset提交,可以完全避免。但是由于成本太大,放弃这个方案。
1 | // 放入队列的那段代码 |
五 最终版
吸取了之前的经验。最终版做了以下优化:
- 合理调节offer时间,使本地引用的消费能力与kafka本地队列的存放能力达到平衡。
- 在dispatch方法中加入错误跳出机制。
- 之前dispatch是一直循环拉kafka数据,就算error,也会立即重新拉,这样在本地线程阻塞的时候,会在瞬间频繁出现error。导致kafka踢出consumer。(也就是在拥塞的时候还会放东西进去)
- 修改为: 一旦发现不能添加,直接跳出整个dispatch方法(之前是只要拉取的数据不为空一直在dispatch,没有起到定时的作用)。这样可以在ScheduledThreadPool下次开启dispatch之间(dispatch前文说过是定时的)不再拉取新的kafka数据。给与本地队列喘息时间被消费。
- 结合2通过设置dispatch线程池重启时间进行合理调节
1 | while (!records.isEmpty()) { |
六 最终版plus
没想到
还是运行了一段时间就down掉了。检查了jvm和容器状态推测,在down掉的时间cpu很高,基本在200左右。
所以是因为cpu使用率太高导致容器重启。cpu和线程直接挂钩,于是降低了数据库连接池个数(之前错误开了很大的连接池),将cpu使用率由200降到了150。
但是核心原因是:为什么重启后consumer就不能消费了呢?
**分析: **容器在重启前会调用shutdown方法。之前的shutdown方法只关闭了master线程池,并没有关闭consumer线程池。这导致容器关闭前consumer拉取的那一批数据无法被消费。kafka将流量压力又只给了一个容器(容器重启前的最后一个,基于重启前的rebalance结果)
查阅kafka官方文档:
如果你没有在程序退出前很好关闭consumer,最明显的行为主是在下次启动程序消费数据时会发现consumer分配分区的过程可能非常慢,还有一个问题是Kafka不能立即知道consumer已经退出,如果同组在其他地方有其他消费者,rebalance要在心跳超时后才分触发
在shutdown方法中增加了consumer.close()
再结合上述的最终版, 最终这个服务达到了稳定.