RocketMQ源码-顺序消费实现原理_天天看点
1. 介绍
全局有序
在RocketMQ中,如果使消息全局有序,可以为Topic设置一个消息队列,使用一个生产者单线程发送数据,消费者端也使用单线程进行消费,从而保证消息的全局有序,但是这种方式效率低,一般不使用。
(相关资料图)
局部有序
假设一个Topic分配了两个消息队列,生产者在发送消息的时候,可以对消息设置一个路由ID,比如想保证一个订单的相关消息有序,那么就使用订单ID当做路由ID,在发送消息的时候,通过订单ID对消息队列的个数取余,根据取余结果选择消息队列,这样同一个订单的数据就可以保证发送到一个消息队列中,消费者端使用MessageListenerOrderly处理有序消息,这就是RocketMQ的局部有序,保证消息在某个消息队列中有序。
接下来看RoceketMQ源码中提供的顺序消息例子(稍微做了一些修改):
生产者
public class Producer { public static void main(String[] args) throws UnsupportedEncodingException { try { // 创建生产者 DefaultMQProducer producer = new DefaultMQProducer("生产者组"); // 启动 producer.start(); // 创建TAG String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 100; i++) { // 生成订单ID int orderId = i % 10; // 创建消息 Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { // 获取订单ID Integer id = (Integer) arg; // 对消息队列个数取余 int index = id % mqs.size(); // 根据取余结果选择消息要发送给哪个消息队列 return mqs.get(index); } }, orderId); // 这里传入了订单ID System.out.printf("%s%n", sendResult); } producer.shutdown(); } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { e.printStackTrace(); } }}复制代码消费者
public class Consumer { public static void main(String[] args) throws MQClientException { // 创建消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("消费者组"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 订阅主题 consumer.subscribe("TopicTest", "TagA || TagC || TagD"); // 注册消息监听器,使用的是MessageListenerOrderly consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); // 打印消息 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); }}复制代码从例子中可以看出生产者在发送消息的时候,通过订单ID作为路由信息,将同一个订单ID的消息发送到了同一个消息队列中,保证同一个订单ID的消息有序,那么消费者端是如何保证消息的顺序读取呢?接下来就去看下源码。
2. 顺序消息实现原理
在RocketMQ源码16-consumer消费流程一文中讲到,消费者在启动时会调用DefaultMQPushConsumerImpl的start方法:
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer { /** * 默认的消息推送实现类 */ protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; /** * 启动 */ @Override public void start() throws MQClientException { setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup)); // 启动消费者 this.defaultMQPushConsumerImpl.start(); // ... }}复制代码在DefaultMQPushConsumerImpl的start方法中,对消息监听器类型进行了判断,如果类型是MessageListenerOrderly表示要进行顺序消费,此时使用ConsumeMessageOrderlyService对ConsumeMessageService进行实例化,然后调用它的start方法进行启动:
public class DefaultMQPushConsumerImpl implements MQConsumerInner { // 消息消费service private ConsumeMessageService consumeMessageService; public synchronized void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST: // ... // 如果是顺序消费 if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { // 设置顺序消费标记 this.consumeOrderly = true; // 创建consumeMessageService,使用的是ConsumeMessageOrderlyService this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { this.consumeOrderly = false; // 并发消费使用ConsumeMessageConcurrentlyService this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); } // 启动ConsumeMessageService this.consumeMessageService.start(); // ... break; // ... } // ... }}复制代码2.1 加锁定时任务
进入到ConsumeMessageOrderlyService的start方法中,可以看到,如果是集群模式,会启动一个定时加锁的任务,周期性的对订阅的消息队列进行加锁,具体是通过调用RebalanceImpl的lockAll方法实现的:
public class ConsumeMessageOrderlyService implements ConsumeMessageService { public void start() { // 如果是集群模式 if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { // 周期性的执行加锁方法 ConsumeMessageOrderlyService.this.lockMQPeriodically(); } catch (Throwable e) { log.error("scheduleAtFixedRate lockMQPeriodically exception", e); } } }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS); } } public synchronized void lockMQPeriodically() { if (!this.stopped) { // 进行加锁 this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll(); } }}复制代码2.1.1 消息队列加锁
在RebalanceImpl的lockAll方法中,首先从处理队列表中获取当前消费者订阅的所有消息队列MessageQueue信息,返回数据是一个MAP,key为broker名称,value为broker下的消息队列,接着对MAP进行遍历,处理每一个broker下的消息队列:
获取broker名称,根据broker名称查找broker的相关信息;构建加锁请求,在请求中设置要加锁的消息队列,然后将请求发送给broker,表示要对这些消息队列进行加锁;加锁请求返回的响应结果中包含了加锁成功的消息队列,此时遍历加锁成功的消息队列,将消息队列对应的ProcessQueue中的locked属性置为true表示该消息队列已加锁成功;处理加锁失败的消息队列,如果响应中未包含某个消息队列的信息,表示此消息队列加锁失败,需要将其对应的ProcessQueue对象中的locked属性置为false表示加锁失败;public abstract class RebalanceImpl { public void lockAll() { // 从处理队列表中获取broker对应的消息队列,key为broker名称,value为broker下的消息队列 HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName(); // 遍历订阅的消息队列 Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator(); while (it.hasNext()) { Entry<String, Set<MessageQueue>> entry = it.next(); // broker名称 final String brokerName = entry.getKey(); // 获取消息队列 final Set<MessageQueue> mqs = entry.getValue(); if (mqs.isEmpty()) continue; // 根据broker名称获取broker信息 FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true); if (findBrokerResult != null) { // 构建加锁请求 LockBatchRequestBody requestBody = new LockBatchRequestBody(); // 设置消费者组 requestBody.setConsumerGroup(this.consumerGroup); // 设置ID requestBody.setClientId(this.mQClientFactory.getClientId()); // 设置要加锁的消息队列 requestBody.setMqSet(mqs); try { // 批量进行加锁,返回加锁成功的消息队列 Set<MessageQueue> lockOKMQSet = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000); // 遍历加锁成功的队列 for (MessageQueue mq : lockOKMQSet) { // 从处理队列表中获取对应的处理队列对象 ProcessQueue processQueue = this.processQueueTable.get(mq); // 如果不为空,设置locked为true表示加锁成功 if (processQueue != null) { if (!processQueue.isLocked()) { log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq); } // 设置加锁成功标记 processQueue.setLocked(true); processQueue.setLastLockTimestamp(System.currentTimeMillis()); } } // 处理加锁失败的消息队列 for (MessageQueue mq : mqs) { if (!lockOKMQSet.contains(mq)) { ProcessQueue processQueue = this.processQueueTable.get(mq); if (processQueue != null) { // 设置加锁失败标记 processQueue.setLocked(false); log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, mq); } } } } catch (Exception e) { log.error("lockBatchMQ exception, " + mqs, e); } } } }}复制代码在RocketMQ源码16-consumer消费流程一文中讲到,消费者需要先向Broker发送拉取消息请求,从Broker中拉取消息,拉取消息请求构建在RebalanceImpl的updateProcessQueueTableInRebalance方法中,拉取消息的响应结果处理在PullCallback的onSuccess方法中,接下来看下顺序消费时在这两个过程中是如何处理的。
2.2 拉取消息
上面已经知道,在使用顺序消息时,会周期性的对订阅的消息队列进行加锁,不过由于负载均衡等原因,有可能给当前消费者分配新的消息队列,此时可能还未来得及通过定时任务加锁,所以消费者在构建消息拉取请求前会再次进行判断,如果processQueueTable中之前未包含某个消息队列,会先调用lock方法进行加锁,lock方法的实现逻辑与lockAll基本一致,如果加锁成功构建拉取请求进行消息拉取,如果加锁失败,则跳过继续处理下一个消息队列:
public abstract class RebalanceImpl { private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) { // ... List<PullRequest> pullRequestList = new ArrayList<PullRequest>(); // 遍历队列集合 for (MessageQueue mq : mqSet) { // 如果processQueueTable之前不包含当前的消息队列 if (!this.processQueueTable.containsKey(mq)) { // 如果是顺序消费,调用lock方法进行加锁,如果加锁失败不往下执行,继续处理下一个消息队列 if (isOrder && !this.lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; } // ... // 如果偏移量大于等于0 if (nextOffset >= 0) { // 放入处理队列表中 ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if (pre != null) { log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); } else { // 如果之前不存在,构建PullRequest,之后对请求进行处理,进行消息拉取 log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true; } } else { log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); } } } // 添加消息拉取请求 this.dispatchPullRequest(pullRequestList); return changed; } public boolean lock(final MessageQueue mq) { // 获取broker信息 FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true); if (findBrokerResult != null) { // 构建加锁请求 LockBatchRequestBody requestBody = new LockBatchRequestBody(); requestBody.setConsumerGroup(this.consumerGroup); requestBody.setClientId(this.mQClientFactory.getClientId()); // 设置要加锁的消息队列 requestBody.getMqSet().add(mq); try { // 发送加锁请求 Set<MessageQueue> lockedMq = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000); for (MessageQueue mmqq : lockedMq) { ProcessQueue processQueue = this.processQueueTable.get(mmqq); // 如果加锁成功设置成功标记 if (processQueue != null) { processQueue.setLocked(true); processQueue.setLastLockTimestamp(System.currentTimeMillis()); } } boolean lockOK = lockedMq.contains(mq); log.info("the message queue lock {}, {} {}", lockOK ? "OK" : "Failed", this.consumerGroup, mq); return lockOK; } catch (Exception e) { log.error("lockBatchMQ exception, " + mq, e); } } return false; }}复制代码2.3 顺序消息消费
在PullCallback的onSuccess方法中可以看到,如果从Broker拉取到消息,会调用ConsumeMessageService的submitConsumeRequest方法将消息提交到ConsumeMessageService中进行消费:
public class DefaultMQPushConsumerImpl implements MQConsumerInner { public void pullMessage(final PullRequest pullRequest) { // ... // 拉取消息回调函数 PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { if (pullResult != null) { // 处理拉取结果 pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData); // 判断拉取结果 switch (pullResult.getPullStatus()) { case FOUND: // ... // 如果未拉取到消息 if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { // 将拉取请求放入到阻塞队列中再进行一次拉取 DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } else { // ... // 如果拉取到消息,将消息提交到ConsumeMessageService中进行消费 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); // ... } // ... } } } }; }}复制代码前面知道顺序消费时使用的是ConsumeMessageOrderlyService,首先在ConsumeMessageOrderlyService的构造函数中可以看到
初始化了一个消息消费线程池,也就是说顺序消费时也是开启多线程进行消费的:
public class ConsumeMessageOrderlyService implements ConsumeMessageService { public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerOrderly messageListener) { // ... // 设置消息消费线程池 this.consumeExecutor = new ThreadPoolExecutor( this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 1000 * 60, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl(consumeThreadPrefix)); this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_")); }}复制代码接下来看submitConsumeRequest方法,可以看到构建了ConsumeRequest对象,将拉取的消息提交到了消息消费线程池中进行消费:
public class ConsumeMessageOrderlyService implements ConsumeMessageService { @Override public void submitConsumeRequest( final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispathToConsume) { if (dispathToConsume) { // 构建ConsumeRequest ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue); this.consumeExecutor.submit(consumeRequest); } } }复制代码2.3.1 消费时的消息队列锁
ConsumeRequest是ConsumeMessageOrderlyService的内部类,它有两个成员变量,分别为MessageQueue消息队列和它对应的处理队列ProcessQueue对象。
在run方法中,对消息进行消费,处理逻辑如下:
public class ConsumeMessageOrderlyService implements ConsumeMessageService { class ConsumeRequest implements Runnable { private final ProcessQueue processQueue; // 消息队列对应的处理队列 private final MessageQueue messageQueue; // 消息队列 public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) { this.processQueue = processQueue; this.messageQueue = messageQueue; } @Override public void run() { // 处理队列如果已经被置为删除状态,跳过不进行处理 if (this.processQueue.isDropped()) { log.warn("run, the message queue not be able to consume, because it"s dropped. {}", this.messageQueue); return; } // 获取消息队列的对象锁 final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue); // 对象消息队列的对象锁加锁 synchronized (objLock) { // 如果是广播模式、或者当前的消息队列已经加锁成功并且加锁时间未过期 if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) { final long beginTime = System.currentTimeMillis(); for (boolean continueConsume = true; continueConsume; ) { // 判断processQueue是否删除 if (this.processQueue.isDropped()) { log.warn("the message queue not be able to consume, because it"s dropped. {}", this.messageQueue); break; } // 如果是集群模式并且processQueue的加锁失败 if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) && !this.processQueue.isLocked()) { log.warn("the message queue not locked, so consume later, {}", this.messageQueue); // 稍后进行加锁 ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10); break; } // 如果是集群模式并且消息队列加锁时间已经过期 if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) && this.processQueue.isLockExpired()) { log.warn("the message queue lock expired, so consume later, {}", this.messageQueue); // 稍后进行加锁 ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10); break; } long interval = System.currentTimeMillis() - beginTime; // 如果当前时间距离开始处理的时间超过了最大消费时间 if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) { // 稍后重新进行处理 ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10); break; } // 批量消费消息个数 final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); // 获取消息内容 List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize); defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup()); if (!msgs.isEmpty()) { // ... try { // 加消费锁 this.processQueue.getConsumeLock().lock(); if (this.processQueue.isDropped()) { log.warn("consumeMessage, the message queue not be able to consume, because it"s dropped. {}", this.messageQueue); break; } // 消费消息 status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s", RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageOrderlyService.this.consumerGroup, msgs, messageQueue), e); hasException = true; } finally { // 释放消息消费锁 this.processQueue.getConsumeLock().unlock(); } // ... ConsumeMessageOrderlyService.this.getConsumerStatsManager() .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT); continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this); } else { continueConsume = false; } } } else { if (this.processQueue.isDropped()) { log.warn("the message queue not be able to consume, because it"s dropped. {}", this.messageQueue); return; } ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100); } } } }}复制代码MessageQueueLock中使用了ConcurrentHashMap存储每个消息队列对应的对象锁,对象锁实际上是一个Object类的对象,从Map中获取消息队列的对象锁时,如果对象锁不存在,则新建一个Object对象,并放入Map集合中:
public class MessageQueueLock { private ConcurrentMap<MessageQueue, Object> mqLockTable = new ConcurrentHashMap<MessageQueue, Object>(); public Object fetchLockObject(final MessageQueue mq) { // 获取消息队列对应的对象锁,也就是一个Object类型的对象 Object objLock = this.mqLockTable.get(mq); // 如果获取为null if (null == objLock) { // 创建对象 objLock = new Object(); // 加入到Map中 Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock); if (prevLock != null) { objLock = prevLock; } } return objLock; }}复制代码2.3.2 消息消费锁
ProcessQueue中持有一个消息消费锁,消费者调用consumeMessage进行消息前,会添加消费锁,上面已经知道在处理拉取到的消息时就已经调用messageQueueLock的fetchLockObject方法获取消息队列的对象锁然后使用syncronized对其加锁,那么为什么在消费之前还要再加一个消费锁呢?
public class ProcessQueue { // 消息消费锁 private final Lock consumeLock = new ReentrantLock(); public Lock getConsumeLock() { return consumeLock; }}复制代码这里讲一个小技巧,如果在查看源码的时候对某个方法有疑问,可以查看一下这个方法在哪里被调用了,结合调用处的代码处理逻辑进行猜测。
那么就来看下getConsumeLock在哪里被调用了,可以看到除了C的run方法中调用了之外,RebalancePushImpl中的removeUnnecessaryMessageQueue方法也调用了getConsumeLock方法:
removeUnnecessaryMessageQueue方法从名字上可以看出,是移除不需要的消息队列,RebalancePushImpl是与负载均衡相关的类,所以猜测有可能在负载均衡时,需要移除某个消息队列,那么消费者在进行消费的时候就要获取ProcessQueue的consumeLock进行加锁,防止正在消费的过程中,消费队列被移除:
public class RebalancePushImpl extends RebalanceImpl { @Override public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) { this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq); this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq); // 如果是顺序消费并且是集模式 if (this.defaultMQPushConsumerImpl.isConsumeOrderly() && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) { try { // 进行加锁 if (pq.getConsumeLock().tryLock(1000, TimeUnit.MILLISECONDS)) { try { return this.unlockDelay(mq, pq); } finally { pq.getConsumeLock().unlock(); } } else { log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", mq, pq.getTryUnlockTimes()); pq.incTryUnlockTimes(); } } catch (Exception e) { log.error("removeUnnecessaryMessageQueue Exception", e); } return false; } return true; }}复制代码不过在消费者在消费消息前已经对队列进行了加锁,负载均衡的时候为什么不使用队列锁而要使用消费锁?
这里应该是为了减小锁的粒度,因为消费者在对消息队列加锁后,还进行了一系列的判断,校验都成功之后从处理队列中获取消息内容,之后才开始消费消息,如果负载均衡使用消息队列锁就要等待整个过程完成才有可能加锁成功,这样显然会降低性能,而如果使用消息消费锁,就可以减少等待的时间,并且消费者在进行消息消费前也会判断ProcessQueue是否被移除,所以只要保证consumeMessage方法在执行的过程中,ProcessQueue不被移除即可。
3. 总结
目前一共涉及了三把锁,它们分别对应不同的情况:
向Broker申请的消息队列锁
集群模式下一个消息队列同一时刻只能被同一个消费组下的某一个消费者进行,为了避免负载均衡等原因引起的变动,消费者会向Broker发送请求对消息队列进行加锁,如果加锁成功,记录到消息队列对应的ProcessQueue中的locked变量中,它是boolean类型的:
public class ProcessQueue { private volatile boolean locked = false;}复制代码消费者处理拉取消息时的消息队列锁
消费者在处理拉取到的消息时,由于可以开启多线程进行处理,所以处理消息前通过MessageQueueLock中的mqLockTable获取到了消息队列对应的锁,锁住要处理的消息队列,这里加消息队列锁主要是处理多线程之间的竞争:
public class MessageQueueLock { private ConcurrentMap<MessageQueue, Object> mqLockTable = new ConcurrentHashMap<MessageQueue, Object>();复制代码消息消费锁
消费者在调用consumeMessage方法之前会加消费锁,主要是为了避免在消费消息时,由于负载均衡等原因,ProcessQueue被删除:
public class ProcessQueue { private final Lock consumeLock = new ReentrantLock();}复制代码
标签:
- 黄山提升政府采购透明度 助力实体经济持续健康稳定发展
- 1-2月黄山新签“双招双引”项目103个 总投资额139.7亿元
- 池州海关共签发RCEP原产地证书22份 签证金额92.7万美元
- 宿州泗县深入推进文旅融合发展 擦亮城市品牌
- 河北工业生产平稳开局 固定资产投资较快增长
- 昆明西山区深入实施人才强区战略 建立健全招商招才引资并轨新模式
- 2月唐山新建商品住宅销售价格与上月持平 同比下降0.8%
- 去年河北电子信息产业实现主营业务收入2367.7亿元 同比增长22.4%
- 绥化望奎以工业化思维为引领 推动肉类加工制造产业腾飞
- 绥化市监局推出多项举措 大力促进有机产品产业发展
- 半路出家无心插柳 杭州西湖区“菌菇宝宝”变废为宝
- 温州鹿城区藤桥主打藤桥熏鸡 近两年销售额年均增长20%以上
- 1-2月安徽限额以上消费品零售额1030.4亿元 同比增长10.4%
- 2021年合肥市茶园面积13.6万亩 产值176350万元
- 淡季不忘引流 京郊民宿市场将很快迎来回暖
- 郴州安仁文旅项目集中开工 总投资1000万元
- 江苏服务业继续保持发展强劲势头 为高质量发展提供有力支撑
- 黄山休宁经开区摸排基础设施项目17个 计划总投资29709万元
- 前两个月宣城出口31亿元 增长34.4%
- 滁州凤阳2021年共接待游客225万人次 旅游综合收入20.25亿元
- 1-2月宿州市民间投资同比增长19.2% 居全省第3位
- 宿州严格审核把关 抢抓发行地方政府专项债券政策机遇
- 1-2月亳州市进出口总值5.5亿元 同比下降17.4%
- 合肥:弘扬茶文化 初步形成一条生态发展之路
- 合肥:建成“数字中国”领先城市 推进城市数字化转型
- 甘肃再续“艾黎情”:探职业教育德技并修
- 【城市守望者】致敬抗“疫”一线的“拆弹专家”
- 浙江绍兴越城区核酸检测结果公布 除1例阳性外其余均为阴性
- 内地首例奥密克戎变异株感染者身体状况如何?来自哪里?专家解读→
- 对变异病毒已有准备!关于中国新冠药物,钟南山发声→
- 江苏睢宁小网格大担当 织就乡村振兴“幸福网”
- 改造老旧小区 共享幸福生活
- 天津静海:群众在哪里,文明实践就延伸到哪里
- 齐齐哈尔:初步判断疫情感染来源为接触新冠病毒污染环境和物品
- 重庆大竹林派出所副所长因对群众态度简单粗暴被停职
- 黑龙江讷河病例感染源初步判断为新冠病毒污染的环境和物品
- 致敬2021
- 浙江瑞安民警捐献造血干细胞:14年前的心愿终将如愿
- “考研房”涨价离谱 律师:借机宰客有违市场伦理
- 广州白云机场:14天内有东莞旅居史的旅客须凭48小时核酸阴性证明乘机
- 浙江绍兴本轮疫情已报告确诊病例145例 无症状感染者1例
- 福建龙岩一男子和前妻斗气 扛着126斤硬币到法院“还钱”
- 重庆这座立交酷似“悟空” 走红 设计师揭秘(图)
- 青海警方破获特大电诈案 涉案流水高达1.7亿
- 云南新增境外输入确诊病例3例
- 黑龙江讷河市5名核酸阳性人员流调溯源:接触被新冠病毒污染的环境和物品
- 男子爱上女主播 假扮女主播闺蜜教其他男粉丝刷单
- 广西三市警方联手破获毒品案 全链条摧毁跨境贩毒团伙
- 广东东莞发现2例无症状感染者,部分镇今起全员筛查
- 从百二秦关到闻道凯旋 一个殉职医生最后的朋友圈
- 浙江发补充说明:三地铁路出行政策随风险等级同步调整
- 内蒙古新增本土确诊病例5例 均在呼伦贝尔满洲里市
- 陕西新增本土确诊病例1例 系隔离酒店工作人员
- 31省份新增新冠肺炎确诊病例76例 其中本土51例
- 浙江新增新冠肺炎确诊病例45例 其中本土44例
- 技能就是财富 技工也是人才
- 黑龙江新增本土确诊病例1例、本土无症状感染者4例
- 冷空气“调休”!我国大部陆续迎回暖 中东部雨雪稀少
- 华北黄淮等地大气扩散条件转差 冷空气将影响中东部
- 别误读了野猪或将不再是“三有”动物
- 您的ETC已到期?当心这个诈骗短信!
- 对回家的“宝贝”少一些关注,也是一种帮助
- 升温!北京今日阳光在线 最高气温将升至8℃
- 那年今日 | 一张漫画涨知识之12月14日
- 40岁男子一觉醒来突然听不见了 原因是……
- 本年度星空压轴大赏上演 双子座流星雨观赏地图来了
- 广东东莞大朗镇报告2例新冠肺炎无症状感染者
- 商丘4885份被盗出生证去哪了?10年“悬案”引关注
- 浙江海宁警方通报国家公祭日女子穿和服逛街
- 厨艺不精调料凑?懒人调料:年轻人的“下厨神器”
- “您的ETC已到期?”警方提醒:当心这个诈骗短信
- “网红”局长的热度 自述:走红后我就没有周末了
- 寻回被拐10年的儿子后又送走 儿子:害我没家了
- 小城里的三张面孔和警号301137
- 倡导“就地过年”,需因地制宜科学防疫
- 别用“入乡随俗”为星巴克找借口
- 北京地铁14号线年底全线贯通运营
- 天津市从入境人员中检出奥密克戎变异株
- “外滩活地图”黄俊:一个不想出圈的段子手交警
- 寻找一双儿女的25年
- 无锡市场监管部门责成星巴克涉事门店停业整改
- 海岛警事:为了一座岛和2900平方公里的海
- 北京民警宏福苑抗疫26天:“今夜我和雪花一起出发”
- 星巴克的“金标准”缘何败给了“潜规则”
- 患者被低价药“惊呆”的场面应该更多些
- 影视剧“超前点评”不止是“低级错误”
- “南昌鹦鹉案”下发不起诉决定书 网店上架费氏牡丹鹦鹉被拒
- 河南商丘4885份出生医学证明被盗始末追踪
- 绍兴市病例62-109活动轨迹公布
- 12月7日以来,杭州累计报告新冠肺炎确诊病例19例
- 浙江绍兴新增确诊病例37例 上虞区占36例
- 河南高院对张成功案作出死刑判决
- 四川一滑雪场停电游客被困索道 官方回应
- 浙江绍兴越城区新增1例新冠肺炎确诊病例 当地对防控区域划分进行调整
- 中国内地首次检出新冠病毒奥密克戎变异株
- 知网除了涉及著作权纠纷,是否涉嫌违反《反垄断法》?
- 浙江绍兴越城区新增1例新冠肺炎确诊病例
- 四川眉山千箱柑橘送往呼和浩特市抗疫一线
- 两名青年男女探险三亚落笔洞遗址被困沼泽 消防成功救援
- 中国地理学大会在福州发布《中国地理学界碳中和科技行动福州宣言》
广告
广告
- RocketMQ源码-顺序消费实现原理_天天看点
- 为乡村注入活力 90后乡村规划师驻村记
- 最新免费qq号和密码大全_免费qq号带密码大全
- 吃什么降胆固醇高的食物有哪些_吃什么降胆固醇
- “五一”连珠穆拉玛峰都堵了?视频发布者:系误传!
- 劳动者的十二时辰 致敬全国各地辛勤工作的工程建设者们 当前热门
- 共同配送也称越库配送 共同配送
- 五一出游玩法多 哪一种你最心仪? 全球快播报
- 净水器十大品牌排名有哪些牌子_净水器十大品牌排名有哪些
- 今头条!190万人次!再创历史新高
- 当前热点-预计4月30日发送旅客360万人次左右,长三角铁路全力应对持续大客流
- 鸽子拉水便怎么治疗_具体操作方法总结
- 天天新动态:克洛普:若能在积分上领先布莱顿和维拉,这已经算是成功了
- 每日快播:石首市金象矿山机械有限公司
- 这些藏在海南山川湖海里的美食,跋山涉水也要吃到!
- 2023年5月1日广东省绿豆芽批发价格行情 每日讯息
- 是非经过不知难的上一句是什么_事非经过不知难的上一句|新消息
- “庆五一·迎亚运”主题灯光秀闪耀杭城 连续5天
- 一季度业绩暴增!全面注册制下 券商要“翻身”?
- 小型气泵空压机租赁-小型气泵空压机





