storm 1.0.1 bug哪个版本比较问题点,bug少

Storm 集群空闲 CPU 飙高问题排查 - 推酷
Storm 集群空闲 CPU 飙高问题排查
最近将公司的在线业务迁移到Storm集群上,上线后遇到低峰期CPU耗费严重的情况。在解决问题的过程中深入了解了storm的内部实现原理,并且解决了一个storm0.9-0.10版本一直存在的严重bug,目前代码已经合并到了storm新版本中,在这篇文章里会介绍这个问题出现的场景、分析思路、解决的方式和一些个人的收获。
首先简单介绍一下Storm,熟悉的同学可以直接跳过这段。
Storm是Twitter开源的一个大数据处理框架,专注于流式数据的处理。Storm通过创建拓扑结构(Topology)来转换数据流。和Hadoop的作业(Job)不同,Topology会持续转换数据,除非被集群关闭。
下图是一个简单的Storm Topology结构图。
可以看出Topology是由不同组件(Component)串/并联形成的有向图。数据元组(Tuple)会在Component之间通过数据流的形式进行有向传递。Component有两种
Spout:Tuple来源节点,持续不断的产生Tuple,形成数据流
Bolt:Tuple处理节点,处理收到的Tuple,如果有需要,也可以生成新的Tuple传递到其他Bolt
目前业界主要在离线或者对实时性要求不高业务中使用Storm。随着Storm版本的更迭,可靠性和实时性在逐渐增强,已经有运行在线业务的能力。因此我们尝试将一些实时性要求在百毫秒级的在线业务迁入Storm集群。
某次高峰时,Storm上的一个业务拓扑频繁出现消息处理延迟。延时达到了10s甚至更高。查看高峰时的物理机指标监控,CPU、内存和IO都有很大的余量。判断是随着业务增长,服务流量逐渐增加,某个Bolt之前设置的并行度不够,导致消息堆积了。
临时增加该Bolt并行度,解决了延迟的问题,但是第二天的低峰期,服务突然报警,CPU负载过高,达到了100%。
用Top看了下CPU占用,系统调用占用了70%左右。再用
对Storm的工作进程进行分析,找到了CPU占用最高的线程 java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for
&0xa248f8& (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
at com.lmax.disruptor.BlockingWaitStrategy.waitFor(BlockingWaitStrategy.java:87)
at com.lmax.disruptor.ProcessingSequenceBarrier.waitFor(ProcessingSequenceBarrier.java:54)
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:97)
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
at backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748)
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:745)
我们可以看到这些线程都在信号量上等待。调用的来源是disruptor$consume_batch_when_available。
disruptor是Storm内部消息队列的封装。所以先了解了一下Storm内部的消息传输机制。
(图片来源
Storm的工作节点称为Worker(其实就是一个JVM进程)。不同Worker之间通过Netty(旧版Storm使用ZeroMQ)进行通讯。 每个Worker内部包含一组Executor。Strom会为拓扑中的每个Component都分配一个Executor。在实际的数据处理流程中,数据以消息的形式在Executor之间流转。Executor会循环调用绑定的Component的处理方法来处理收到的消息。 Executor之间的消息传输使用队列作为消息管道。Storm会给每个Executor分配两个队列和两个处理线程。
工作线程:读取接收队列,对消息进行处理,如果产生新的消息,会写入发送队列
发送线程:读取发送队列,将消息发送其他Executor
当Executor的发送线程发送消息时,会判断目标Executor是否在同一Worker内,如果是,则直接将消息写入目标Executor的接收队列,如果不是,则将消息写入Worker的传输队列,通过网络发送。 Executor工作/发送线程读取队列的代码如下,这里会循环调用consume-batch-when-available读取队列中的消息,并对消息进行处理。 (async-loop
(disruptor/consume-batch-when-available receive-queue event-handler)
我们再来看一下consume_batch_when_available这个函数里做了什么。 (defn consume-batch-when-available
[^DisruptorQueue queue handler]
(.consumeBatchWhenAvailable queue handler))
前面提到Storm使用队列作为消息管道。Storm作为流式大数据处理框架,对消息传输的性能很敏感,因此使用了高效内存队列Disruptor Queue作为消息队列。
Disruptor Queue是LMAX开源的一个无锁内存队列。内部实现如下。
(图片来源
Disruptor Queue通过Sequencer来管理队列,Sequencer内部使用RingBuffer存储消息。RingBuffer中消息的位置使用Sequence表示。队列的生产消费过程如下
Sequencer使用一个Cursor来保存写入位置。
每个Consumer都会维护一个消费位置,并注册到Sequencer。
Consumer通过SequenceBarrier和Sequencer进行交互。Consumer每次消费时,SequenceBarrier会比较消费位置和Cursor来判断是否有可用消息:如果没有, 会按照设定的策略等待消息 ;如果有,则读取消息,修改消费位置。
Producer在写入前会查看所有消费者的消费位置,在有可用位置时会写入消息,更新Cursor。
查看DisruptorQueue.consumeBatchWhenAvailable实现如下 final long nextSequence = _consumer.get() + 1;
final long availableSequence = _barrier.waitFor(nextSequence, 10, TimeUnit.MILLISECONDS);
if (availableSequence &= nextSequence) {
consumeBatchToCursor(availableSequence, handler);
继续查看_barrier.waitFor方法 public long waitFor(final long sequence, final long timeout, final TimeUnit units) throws AlertException, InterruptedException {
checkAlert();
return waitStrategy.waitFor(sequence, cursorSequence, dependentSequences, this, timeout, units);
Disruptor Queue为消费者提供了若干种消息等待策略
BlockingWaitStrategy:阻塞等待,CPU占用小,但是会切换线程,延迟较高
BusySpinWaitStrategy:自旋等待,CPU占用高,但是无需切换线程,延迟低
YieldingWaitStrategy:先自旋等待,然后使用Thread.yield()唤醒其他线程,CPU占用和延迟比较均衡
SleepingWaitStrategy:先自旋,然后Thread.yield(),最后调用LockSupport.parkNanos(1L),CPU占用和延迟比较均衡
Storm的默认等待策略为BlockingWaitStrategy。BlockingWaitStrategy的waitFor函数实现如下 if ((availableSequence = cursor.get()) & sequence) {
lock.lock();
while ((availableSequence = cursor.get()) & sequence) {
barrier.checkAlert();
if (!processorNotifyCondition.await(timeout, sourceUnit)) {
lock.unlock();
BlockingWaitStrategy内部使用信号量来阻塞Consumer,当await超时后,Consumer线程会被自动唤醒,继续循环查询可用消息。
而DisruptorQueue.consumeBatchWhenAvailable方法中可以看到,Storm此处设置超时为10ms。推测在没有消息或者消息量较少时,Executor在消费队列时会被阻塞,由于超时时间很短,工作线程会频繁超时然后重新阻塞,导致CPU占用飙高。 尝试将10ms修改成100ms,编译Storm后重新部署集群,使用Storm的demo拓扑,将bolt并发度调到1000,修改spout代码为10s发一条消息。经测试CPU占用大幅减少。 再将100ms改成1s,测试CPU占用基本降为零。
但是随着调高超时,测试时并没有发现消息处理有延时。继续查看BlockingWaitStrategy代码,发现Disruptor Queu的Producer在写入消息后会唤醒等待的Consumer。 if (0 != numWaiters)
lock.lock();
processorNotifyCondition.signalAll();
lock.unlock();
这样,Storm的10ms超时就很奇怪了,没有减少消息延时,反而增加了系统负载。带着这个疑问查看代码的上下文,发现在构造DisruptorQueue对象时有这么一句注释 ;; :block strategy requires using a timeout on waitFor (implemented in DisruptorQueue),
as sometimes the consumer stays blocked even when there's an item on the queue.
(defnk disruptor-queue
[^String queue-name buffer-size :claim-strategy :multi-threaded :wait-strategy :block]
(DisruptorQueue. queue-name
((CLAIM-STRATEGY claim-strategy) buffer-size)
(mk-wait-strategy wait-strategy)))
Storm使用的Disruptor Queue版本为2.10.1。查看Disruptor Queue的change log,发现该版本的BlockingWaitStrategy有潜在的并发问题,可能导致某条消息在写入时没有唤醒等待的消费者。
2.10.2 Released (21-Aug-2012)
Bug fix, potential race condition in BlockingWaitStrategy.
Bug fix set initial SequenceGroup value to -1 (Issue #27).
Deprecate timeout methods that will be removed in version 3.
因此Storm使用了短超时,这样在出现并发问题时,没有被唤醒的消费方也会很快因为超时重新查询可用消息,防止出现消息延时。 这样如果直接修改超时到1000ms,一旦出现并发问题,最坏情况下消息会延迟1000ms。在权衡性能和延时之后,我们在Storm的配置文件中增加配置项来修改超时参数。这样使用者可以自己选择保证低延时还是低CPU占用率。
就BlockingWaitStrategy的潜在并发问题咨询了Disruptor Queue的作者,得知2.10.4版本已经修复了这个并发问题(
将Storm依赖升级到此版本。但是对Disruptor Queue的2.10.1做了并发测试,无法复现这个并发问题,因此也无法确定2.10.4是否彻底修复。谨慎起见,在升级依赖的同时保留了之前的超时配置项,并将默认超时调整为1000ms。经测试,在集群空闲时CPU占用正常,并且压测也没有出现消息延时。
关于集群空闲CPU反而飙高的问题,已经向Storm社区提交PR并且已被接受
。在线业务流量通常起伏很大,如果被这个问题困扰,可以考虑应用此patch。
Storm UI中可以看到很多有用的信息,但是缺乏记录,最好对其进行二次开发(或者直接读取ZooKeeper中信息),记录每个时间段的数据,方便分析集群和拓扑运行状况。
已发表评论数()
请填写推刊名
描述不能大于100个字符!
权限设置: 公开
仅自己可见
正文不准确
标题不准确
排版有问题
主题不准确
没有分页内容
图片无法显示
视频无法显示
与原文不一致9/6 QA 9.10版本BUG较多 等待进一步修复
作者:EnglishWorld1
来源:贴吧
发布时间: 08:09
内容简介: 今天内容不多。回答的问题基本都是和bug有关,因为Storm正在收集关于9.10 bug的反馈(显然有不少)。
  Not much going on - the answers are usually bug oriented because Storm is collecting player feedback on 9.10 bugs (apparently there are quite a few).
  今天内容不多。回答的问题基本都是和bug有关,因为Storm正在收集关于9.10 bug的反馈(显然有不少)。
  - Storm won't comment on supertest physics leaks - it's too early to publish anything concrete
  Storm不会对泄露的物理移动系统超测情报进行评论。现在就公开具体细节还太早。
  - Storm confirms they know about the sniper mode camera bug (red: how camera moved before, green: how it does now)
  Storm确认他们知道开镜模式下镜头移动的bug(红色:以前镜头的移动方向;绿色:现在的方向)。
  - 9.10 apparently also has a nasty bug where the enemy silhouette sometimes disappear even if you should be able to see it (model is visible, just the silhouette does not activate)
  显然9.10还有个恶心的bug,有时候在该显示敌方车辆轮廓的时候却不显示(模型是可见的,但不显示高亮轮廓)。
  近期精彩推荐:  
  [Top 1]
  [Top 2]
  [Top 3]
  [Top 4]
  [Top 5]
更多精彩欢迎访问扫描下载MIUI论坛APP
经验164 米
在线时间31 小时
积分 242, 距离下一级还需 258 积分
积分 242, 距离下一级还需 258 积分
机型红米手机3
发表于 2&小时前
通过手机发布
分享到微信朋友圈
打开微信,点击底部的“发现”,使用 “扫一扫” 即可将网页分享到我的朋友圈。
经验282 米
在线时间4 小时
版本V8.0.5.0.MALCNDG
积分 331, 距离下一级还需 169 积分
积分 331, 距离下一级还需 169 积分
机型红米手机3S/3X
MIUI版本V8.0.5.0.MALCNDG
发表于 1&小时前
果断7.3.11
经验548 米
在线时间55 小时
版本6.9.22
积分 599, 距离下一级还需 1401 积分
积分 599, 距离下一级还需 1401 积分
机型红米手机3S/3X
签到次数47
MIUI版本6.9.22
发表于 半小时前
不解锁卡刷不回去,烦
经验677 米
在线时间13 小时
版本6.9.22
积分 769, 距离下一级还需 1231 积分
积分 769, 距离下一级还需 1231 积分
机型红米手机3S
签到次数14
MIUI版本6.9.22
发表于 半小时前
通过手机发布
最新版的开发版还可以
经验677 米
在线时间13 小时
版本6.9.22
积分 769, 距离下一级还需 1231 积分
积分 769, 距离下一级还需 1231 积分
机型红米手机3S
签到次数14
MIUI版本6.9.22
发表于 半小时前
通过手机发布
最新版的开发版还可以
MIUI 300周
MIUI 300周更新纪念勋章
已关注微信
已关注极客秀微信
Copyright (C) 2016 MIUI
京ICP备号 | 京公网安备34号 | 京ICP证110507号

参考资料

 

随机推荐