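A Detailed Look at Consumer and ConsumerGroup in RocketMQ
1. Basic Concepts
Message consumer (Consumer)
A consumer is responsible for consuming messages; it is typically a backend system that consumes them asynchronously. A consumer pulls messages from the Broker server and hands them to the application. From the application's point of view there are two consumption styles: pull consumption and push consumption.
Consumer group (Consumer Group)
A collection of Consumers of the same kind, which usually consume the same kind of messages with the same consumption logic. Consumer groups make it easy to achieve load balancing and fault tolerance in message consumption. Note that all consumer instances in a consumer group must subscribe to exactly the same Topics. RocketMQ supports two message models: clustering consumption (Clustering) and broadcasting consumption (Broadcasting).
Clustering consumption (Clustering)
In clustering mode, the Consumer instances of the same Consumer Group share the messages evenly, so each message is normally handled by only one instance.
Broadcasting consumption (Broadcasting)
In broadcasting mode, every Consumer instance of the same Consumer Group receives the full set of messages.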
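The difference between the two modes can be sketched with a toy model. The snippet below is not RocketMQ client code: the class MessageModelSketch and its methods are hypothetical, and plain round-robin stands in for the queue-based load balancing that RocketMQ actually performs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the two message models; not RocketMQ client code. */
class MessageModelSketch {

    /** Clustering: each message is delivered to exactly one instance of the group. */
    static Map<String, List<String>> clustering(List<String> consumers, List<String> messages) {
        Map<String, List<String>> delivered = emptyInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            // Round-robin is an assumption standing in for queue-based load balancing.
            String owner = consumers.get(i % consumers.size());
            delivered.get(owner).add(messages.get(i));
        }
        return delivered;
    }

    /** Broadcasting: every instance of the group receives every message. */
    static Map<String, List<String>> broadcasting(List<String> consumers, List<String> messages) {
        Map<String, List<String>> delivered = emptyInboxes(consumers);
        for (String consumer : consumers) {
            delivered.get(consumer).addAll(messages);
        }
        return delivered;
    }

    private static Map<String, List<String>> emptyInboxes(List<String> consumers) {
        Map<String, List<String>> inboxes = new LinkedHashMap<>();
        for (String consumer : consumers) {
            inboxes.put(consumer, new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("C1", "C2");
        List<String> messages = Arrays.asList("m0", "m1", "m2", "m3");
        System.out.println("clustering   = " + clustering(consumers, messages));
        System.out.println("broadcasting = " + broadcasting(consumers, messages));
    }
}
```

With two consumers and four messages, clustering gives each instance two of the messages, while broadcasting gives each instance all four.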
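2. Consistent Subscription Relationships
A consistent subscription relationship means that all Consumer instances under the same consumer Group ID subscribe to exactly the same Topics and Tags. If the subscriptions are inconsistent, the consumption logic becomes confused and messages may even be lost.
Background
In Message Queue for Apache RocketMQ, a consumer Group ID represents a group of Consumer instances. In most distributed applications, several Consumer instances are attached to one consumer Group ID.
Because a subscription in Message Queue for Apache RocketMQ is defined mainly by Topic plus Tag, keeping subscriptions consistent means that all Consumer instances under the same consumer Group ID must agree on both of the following:
- The subscribed Topics must be identical. For example, if Consumer1 subscribes to TopicA and TopicB, Consumer2 must also subscribe to TopicA and TopicB; it must not subscribe only to TopicA, only to TopicB, or to TopicA and TopicC.
- The Tags subscribed within the same Topic must be identical, including the number of Tags and their order. For example, if Consumer1 subscribes to TopicB with Tag1||Tag2, Consumer2 must also subscribe to TopicB with Tag1||Tag2; it must not subscribe only to Tag1, only to Tag2, or to Tag2||Tag1.
In a correct setup, different Group IDs may subscribe to different Topics, but the Consumer instances C1, C2, and C3 under the same Group ID all subscribe to the same Topics and Tags.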
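The rule can be expressed as a small check. This is a minimal sketch, assuming each instance's subscription is represented as a map from Topic to tag expression; SubscriptionConsistencySketch is a hypothetical helper, not part of RocketMQ. Tag expressions are compared as plain strings, so a different tag order counts as a mismatch, following the rule stated above.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper for checking one consumer group; not a RocketMQ API. */
class SubscriptionConsistencySketch {

    /** Builds a Topic -> tag expression map from alternating topic/expression arguments. */
    static Map<String, String> subscription(String... topicAndTags) {
        Map<String, String> map = new LinkedHashMap<>();
        for (int i = 0; i + 1 < topicAndTags.length; i += 2) {
            map.put(topicAndTags[i], topicAndTags[i + 1]);
        }
        return map;
    }

    /** True when every instance subscribes to the same Topics with identical tag expressions. */
    static boolean isConsistent(List<Map<String, String>> instances) {
        for (Map<String, String> instance : instances) {
            if (!instance.equals(instances.get(0))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> c1 = subscription("TopicA", "*", "TopicB", "Tag1||Tag2");
        Map<String, String> c2 = subscription("TopicA", "*", "TopicB", "Tag1||Tag2");
        Map<String, String> c3 = subscription("TopicA", "*", "TopicB", "Tag2||Tag1");
        System.out.println(isConsistent(Arrays.asList(c1, c2))); // true
        System.out.println(isConsistent(Arrays.asList(c1, c3))); // false
    }
}
```

A check of this kind could run at deployment time, before any instance registers with the Broker.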
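3. Inconsistent Subscription Relationships
When subscriptions are inconsistent in RocketMQ, the consumption logic becomes confused and messages may even be lost. The following test case and source code walk-through verify this.
RocketMQ version: 4.8.0
Consumers in the same group subscribe to the same Topic with different Tags
Basic setup for the test in which consumers of the same group subscribe to the same Topic with different Tags:
- Topic name: SUBSCRIBE_TEST
- Topic read queues: 4
- Topic write queues: 4
- Topic read/write permission (perm): 6
- Consumer group name: SUBSCRIBE_TEST_CONSUMER_GROUP
- Message tags: tagA, tagB
- Consumer for tagA messages: ConsumerForTagA
- Consumer for tagB messages: ConsumerForTagB
Consumers
ConsumerForTagA
Subscription: Topic SUBSCRIBE_TEST with tag tagA
ConsumerForTagB
Subscription: Topic SUBSCRIBE_TEST with tag tagB
Consumer code examples: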
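import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class ConsumerForTagA {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SUBSCRIBE_TEST_CONSUMER_GROUP");
        consumer.setNamesrvAddr("172.16.1.15:9876");
        // This instance subscribes to tagA only.
        consumer.subscribe("SUBSCRIBE_TEST", "tagA");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("Receive New Messages: %s %n", msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("ConsumerForTagA started!");
    }
}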
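import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class ConsumerForTagB {
    public static void main(String[] args) throws MQClientException {
        // Same consumer group as ConsumerForTagA.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SUBSCRIBE_TEST_CONSUMER_GROUP");
        consumer.setNamesrvAddr("172.16.1.15:9876");
        // This instance subscribes to tagB only.
        consumer.subscribe("SUBSCRIBE_TEST", "tagB");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("Receive New Messages: %s %n", msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("ConsumerForTagB started!");
    }
}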
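Start ConsumerForTagA first, then start ConsumerForTagB.
Producer
The producer writes 8 messages to the Topic SUBSCRIBE_TEST: the first 4 messages carry the tag tagA and the last 4 carry the tag tagB.
Producer code example: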
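import java.io.UnsupportedEncodingException;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class SyncProducer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
        DefaultMQProducer producer = new DefaultMQProducer("SUBSCRIBE_TEST_PRODUCER_GROUP");
        producer.setNamesrvAddr("172.16.1.15:9876");
        producer.setSendMsgTimeout(15000);
        producer.start();
        for (int i = 0; i < 8; i++) {
            // Messages 0-3 are tagged tagA, messages 4-7 are tagged tagB.
            String tag = (i < 4) ? "tagA" : "tagB";
            Message msg = new Message("SUBSCRIBE_TEST", tag, ("MsgStr" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}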
Start the producer and send the messages:
SendResult [sendStatus=SEND_OK, msgId=AC10DFD6121618B4AAC216EE4B9C0000, offsetMsgId=AC10010F00002A9F00000017038663BE, messageQueue=MessageQueue [topic=SUBSCRIBE_TEST, brokerName=broker-172.16.1.15:10911, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC10DFD6121618B4AAC216EE4C040001, offsetMsgId=AC10010F00002A9F0000001703866483, messageQueue=MessageQueue [topic=SUBSCRIBE_TEST, brokerName=broker-172.16.1.15:10911, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC10DFD6121618B4AAC216EE4C760002, offsetMsgId=AC10010F00002A9F0000001703866548, messageQueue=MessageQueue [topic=SUBSCRIBE_TEST, brokerName=broker-172.16.1.15:10911, queueId=0], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC10DFD6121618B4AAC216EE4C890003, offsetMsgId=AC10010F00002A9F000000170386660D, messageQueue=MessageQueue [topic=SUBSCRIBE_TEST, brokerName=broker-172.16.1.15:10911, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC10DFD6121618B4AAC216EE4CBE0004, offsetMsgId=AC10010F00002A9F00000017038666D2, messageQueue=MessageQueue [topic=SUBSCRIBE_TEST, brokerName=broker-172.16.1.15:10911, queueId=2], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=AC10DFD6121618B4AAC216EE4D190005, offsetMsgId=AC10010F00002A9F0000001703866797, messageQueue=MessageQueue [topic=SUBSCRIBE_TEST, brokerName=broker-172.16.1.15:10911, queueId=3], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=AC10DFD6121618B4AAC216EE4D380006, offsetMsgId=AC10010F00002A9F000000170386685C, messageQueue=MessageQueue [topic=SUBSCRIBE_TEST, brokerName=broker-172.16.1.15:10911, queueId=0], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=AC10DFD6121618B4AAC216EE4DE50007, offsetMsgId=AC10010F00002A9F0000001703866921, messageQueue=MessageQueue [topic=SUBSCRIBE_TEST, brokerName=broker-172.16.1.15:10911, queueId=1], queueOffset=1]
Consumption results
ConsumerForTagA consumed no messages.
ConsumerForTagB consumed 2 messages, shown below:
Receive New Messages: [MessageExt [brokerName=broker-172.16.1.15:10911, queueId=2, storeSize=197, queueOffset=1, sysFlag=0, bornTimestamp=1630810315966, bornHost=/172.16.0.103:62659, storeTimestamp=1630810316147, storeHost=/172.16.1.15:10911, msgId=AC10010F00002A9F00000017038666D2, commitLogOffset=98843387602, bodyCRC=686395636, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='SUBSCRIBE_TEST', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1630810316068, UNIQ_KEY=AC10DFD6121618B4AAC216EE4CBE0004, CLUSTER=DefaultCluster, WAIT=true, TAGS=tagB}, body=[77, 115, 103, 83, 116, 114, 52], transactionId='null'}]]
Receive New Messages: [MessageExt [brokerName=broker-172.16.1.15:10911, queueId=3, storeSize=197, queueOffset=1, sysFlag=0, bornTimestamp=1630810316057, bornHost=/172.16.0.103:62659, storeTimestamp=1630810316223, storeHost=/172.16.1.15:10911, msgId=AC10010F00002A9F0000001703866797, commitLogOffset=98843387799, bodyCRC=1609474146, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='SUBSCRIBE_TEST', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1630810316088, UNIQ_KEY=AC10DFD6121618B4AAC216EE4D190005, CLUSTER=DefaultCluster, WAIT=true, TAGS=tagB}, body=[77, 115, 103, 83, 116, 114, 53], transactionId='null'}]]
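From these test results we can conclude the following:
- The test Topic has 4 queues, and the producer wrote 2 messages to each queue, one tagged tagA and one tagged tagB.
- The consumer group SUBSCRIBE_TEST_CONSUMER_GROUP has two consumers; ConsumerForTagA started first and ConsumerForTagB started second.
- ConsumerForTagA was assigned queues 0 and 1, and ConsumerForTagB was assigned queues 2 and 3.
- ConsumerForTagA consumed none of the messages in its queues 0 and 1, neither those tagged tagA nor those tagged tagB.
- ConsumerForTagB consumed the tagB messages in its queues 2 and 3, but not the tagA messages.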
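The queue assignment observed in this test (queues 0 and 1 for one consumer, queues 2 and 3 for the other) is what an even, contiguous split of the queues would produce. The sketch below illustrates such a split under the assumption that the group uses an average allocation strategy and that ConsumerForTagA comes first in the group's consumer list; AverageAllocationSketch is a hypothetical class, not RocketMQ's implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical even split of queue IDs among the consumers of one group. */
class AverageAllocationSketch {

    /**
     * Returns the queue IDs for the consumer at position index (0-based)
     * when queueCount queues are divided among consumerCount consumers.
     */
    static List<Integer> allocate(int queueCount, int consumerCount, int index) {
        int base = queueCount / consumerCount;
        int extra = queueCount % consumerCount;
        // The first `extra` consumers take one additional queue each.
        int size = base + (index < extra ? 1 : 0);
        int start = index * base + Math.min(index, extra);
        List<Integer> queues = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            queues.add(start + i);
        }
        return queues;
    }

    public static void main(String[] args) {
        // Assumed order: ConsumerForTagA at index 0, ConsumerForTagB at index 1.
        System.out.println("ConsumerForTagA -> " + allocate(4, 2, 0));
        System.out.println("ConsumerForTagB -> " + allocate(4, 2, 1));
    }
}
```

Because each queue belongs to exactly one instance of the group, messages in queues 0 and 1 can only ever be consumed by the first instance, whatever tag they carry.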
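Summary
ConsumerForTagB started after ConsumerForTagA and overwrote the subscription that the Broker holds for the consumer group SUBSCRIBE_TEST_CONSUMER_GROUP. Once ConsumerForTagB had started, the group's subscribed tag became tagB, so when ConsumerForTagA pulled messages, the Broker filtered out the tagA messages because the group's subscription was tagB. The tagB messages in its queues pass the Broker filter but do not match ConsumerForTagA's own tagA subscription; since the client also filters by tag, they are discarded there, which is why ConsumerForTagA consumed nothing.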
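The observed result can be reproduced with a small simulation. This is a sketch under stated assumptions, not Broker code: the queue assignment and the queue order of the 8 messages are taken from the test output, the Broker is assumed to keep only the subscription of the consumer that started last, and both the Broker-side and the client-side filter are reduced to a comparison on the tag. InconsistentTagSimulation is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy simulation of the test scenario; all names are illustrative. */
class InconsistentTagSimulation {

    /** Runs the scenario and returns consumer name -> message bodies it consumed. */
    static Map<String, List<String>> run() {
        // Queue assignment observed in the test.
        Map<String, List<Integer>> queuesOf = new LinkedHashMap<>();
        queuesOf.put("ConsumerForTagA", Arrays.asList(0, 1));
        queuesOf.put("ConsumerForTagB", Arrays.asList(2, 3));

        // Each instance's own subscription, used for client-side filtering.
        Map<String, String> ownTag = new LinkedHashMap<>();
        ownTag.put("ConsumerForTagA", "tagA");
        ownTag.put("ConsumerForTagB", "tagB");

        // Assumption: one subscription per group on the Broker, the later start wins.
        String groupTagOnBroker = null;
        for (String consumer : ownTag.keySet()) { // start order: A, then B
            groupTagOnBroker = ownTag.get(consumer);
        }

        Map<String, List<String>> consumed = new LinkedHashMap<>();
        for (String consumer : queuesOf.keySet()) {
            List<String> bodies = new ArrayList<>();
            for (int i = 0; i < 8; i++) {
                String tag = (i < 4) ? "tagA" : "tagB";
                int queueId = (i + 2) % 4; // queue order in the send log: 2, 3, 0, 1, ...
                if (!queuesOf.get(consumer).contains(queueId)) {
                    continue; // the queue belongs to the other instance
                }
                if (tag.hashCode() != groupTagOnBroker.hashCode()) {
                    continue; // Broker-side filter against the group's subscription
                }
                if (!tag.equals(ownTag.get(consumer))) {
                    continue; // client-side filter against the instance's own subscription
                }
                bodies.add("MsgStr" + i);
            }
            consumed.put(consumer, bodies);
        }
        return consumed;
    }

    public static void main(String[] args) {
        // Prints {ConsumerForTagA=[], ConsumerForTagB=[MsgStr4, MsgStr5]}
        System.out.println(run());
    }
}
```

The two bodies that survive, MsgStr4 and MsgStr5, correspond to the two messages received by ConsumerForTagB in the test (body bytes ending in 52 and 53, the characters 4 and 5).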
Source Code Analysis
Source code for fetching messages on the Broker
The message-fetching code in DefaultMessageStore.java, with some boundary checks removed:
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
final int maxMsgNums,
final MessageFilter messageFilter) {
long beginTime = this.getSystemClock().now();
GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
long nextBeginOffset = offset;
long minOffset = 0;
long maxOffset = 0;
GetMessageResult getResult = new GetMessageResult();
final long maxOffsetPy = this.commitLog.getMaxOffset();
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
if (consumeQueue != null) {
minOffset = consumeQueue.getMinOffsetInQueue();
maxOffset = consumeQueue.getMaxOffsetInQueue();
if (maxOffset == 0) {
status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset, 0);
} else if (offset < minOffset) {
status = GetMessageStatus.OFFSET_TOO_SMALL;
nextBeginOffset = nextOffsetCorrection(offset, minOffset);
} else if (offset == maxOffset) {
status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
nextBeginOffset = nextOffsetCorrection(offset, offset);
} else if (offset > maxOffset) {
status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
if (0 == minOffset) {
nextBeginOffset = nextOffsetCorrection(offset, minOffset);
} else {
nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
}
} else {
SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
if (bufferConsumeQueue != null) {
try {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
long nextPhyFileStartOffset = Long.MIN_VALUE;
long maxPhyOffsetPulling = 0;
int i = 0;
final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
maxPhyOffsetPulling = offsetPy;
if (nextPhyFileStartOffset != Long.MIN_VALUE) {
if (offsetPy < nextPhyFileStartOffset)
continue;
}
boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
isInDisk)) {
break;
}
boolean extRet = false, isTagsCodeLegal = true;
if (consumeQueue.isExtAddr(tagsCode)) {
extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
if (extRet) {
tagsCode = cqExtUnit.getTagsCode();
} else {
// can't find ext content.Client will filter messages by tag also.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",
tagsCode, offsetPy, sizePy, topic, group);
isTagsCodeLegal = false;
}
}
// messageFilter filters on the tag code stored in the consume queue; if the entry's tag does not match the messageFilter, the message is skipped
if (messageFilter != null
&& !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
continue;
}
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
if (null == selectResult) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.MESSAGE_WAS_REMOVING;
}
nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
continue;
}
if (messageFilter != null
&& !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
// release...
selectResult.release();
continue;
}
this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
getResult.addMessage(selectResult);
status = GetMessageStatus.FOUND;
nextPhyFileStartOffset = Long.MIN_VALUE;
}
if (diskFallRecorded) {
long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
}
nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
* (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);
} finally {
bufferConsumeQueue.release();
}
} else {
status = GetMessageStatus.OFFSET_FOUND_NULL;
nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
+ maxOffset + ", but access logic queue failed.");
}
}
} else {
status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset, 0);
}
if (GetMessageStatus.FOUND == status) {
this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
} else {
this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
}
long elapsedTime = this.getSystemClock().now() - beginTime;
this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime);
getResult.setStatus(status);
getResult.setNextBeginOffset(nextBeginOffset);
getResult.setMaxOffset(maxOffset);
getResult.setMinOffset(minOffset);
return getResult;
}
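The scan loop in getMessage reads fixed-size entries from the consume queue: an 8-byte commit log offset, a 4-byte message size, and an 8-byte tag code, 20 bytes in total. The sketch below reproduces only that decoding and the tag-code comparison on an in-memory buffer. ConsumeQueueScanSketch, encode, and matchingOffsets are hypothetical names, and treating the tag code as the hash code of the tag string is an assumption made for illustration.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Toy decoder for consume-queue-like entries held in memory; not Broker code. */
class ConsumeQueueScanSketch {

    /** One entry: 8-byte offset + 4-byte size + 8-byte tag code. */
    static final int CQ_STORE_UNIT_SIZE = 20;

    /** Writes one entry per message, using tag.hashCode() as the tag code (assumption). */
    static ByteBuffer encode(long[] offsets, int[] sizes, String[] tags) {
        ByteBuffer buf = ByteBuffer.allocate(offsets.length * CQ_STORE_UNIT_SIZE);
        for (int i = 0; i < offsets.length; i++) {
            buf.putLong(offsets[i]);
            buf.putInt(sizes[i]);
            buf.putLong(tags[i].hashCode());
        }
        buf.flip();
        return buf;
    }

    /** Returns the commit log offsets of the entries whose tag code matches the subscribed tag. */
    static List<Long> matchingOffsets(ByteBuffer consumeQueue, String subscribedTag) {
        ByteBuffer view = consumeQueue.duplicate(); // leave the caller's position untouched
        List<Long> result = new ArrayList<>();
        for (int i = 0; i < view.limit(); i += CQ_STORE_UNIT_SIZE) {
            long offsetPy = view.getLong();
            int sizePy = view.getInt(); // read to advance the cursor; unused in this sketch
            long tagsCode = view.getLong();
            if ((int) tagsCode != subscribedTag.hashCode()) {
                continue; // skipped without touching the message body
            }
            result.add(offsetPy);
        }
        return result;
    }

    public static void main(String[] args) {
        ByteBuffer cq = encode(new long[] {0L, 197L}, new int[] {197, 197}, new String[] {"tagA", "tagB"});
        System.out.println(matchingOffsets(cq, "tagB")); // [197]
    }
}
```

Filtering on the index entry means a non-matching message is rejected before its body is ever read from the commit log.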
Tag matching in the message filter
The implementation of messageFilter.isMatchedByConsumeQueue. As the code shows, matching is done against the subscribed tags held in the messageFilter's subscriptionData.
@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
if (null == subscriptionData) {
return true;
}
if (subscriptionData.isClassFilterMode()) {
return true;
}
// by tags code.
if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
if (tagsCode == null) {
return true;
}
if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
return true;
}
// decide the match from the subscribed tags held in subscriptionData
return subscriptionData.getCodeSet().contains(tagsCode.intValue());
} else {
// no expression or no bloom
if (consumerFilterData == null || consumerFilterData.getExpression() == null
|| consumerFilterData.getCompiledExpression() == null || consumerFilterData.getBloomFilterData() == null) {
return true;
}
// message is before consumer
if (cqExtUnit == null || !consumerFilterData.isMsgInLive(cqExtUnit.getMsgStoreTime())) {
log.debug("Pull matched because not in live: {}, {}", consumerFilterData, cqExtUnit);
return true;
}
byte[] filterBitMap = cqExtUnit.getFilterBitMap();
BloomFilter bloomFilter = this.consumerFilterManager.getBloomFilter();
if (filterBitMap == null || !this.bloomDataValid
|| filterBitMap.length * Byte.SIZE != consumerFilterData.getBloomFilterData().getBitNum()) {
return true;
}
BitsArray bitsArray = null;
try {
bitsArray = BitsArray.create(filterBitMap);
boolean ret = bloomFilter.isHit(consumerFilterData.getBloomFilterData(), bitsArray);
log.debug("Pull {} by bit map:{}, {}, {}", ret, consumerFilterData, bitsArray, cqExtUnit);
return ret;
} catch (Throwable e) {
log.error("bloom filter error, sub=" + subscriptionData
+ ", filter=" + consumerFilterData + ", bitMap=" + bitsArray, e);
}
}
return true;
}
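For tag subscriptions, the check in isMatchedByConsumeQueue reduces to a set lookup on hash codes. The following sketch assumes that the code set is built by splitting the expression on || and hashing each tag with String.hashCode(), and that * stands for SUB_ALL; TagCodeSetSketch is a hypothetical class, not the Broker's implementation.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy tag matcher built on hash codes; all names are illustrative. */
class TagCodeSetSketch {

    /** True when the expression subscribes to every tag. */
    static boolean isSubAll(String subExpression) {
        return subExpression == null || subExpression.isEmpty() || "*".equals(subExpression);
    }

    /** Splits an expression such as "tagA || tagB" and collects the hash code of each tag. */
    static Set<Integer> codeSet(String subExpression) {
        Set<Integer> codes = new HashSet<>();
        if (isSubAll(subExpression)) {
            return codes;
        }
        for (String tag : subExpression.split("\\|\\|")) {
            String trimmed = tag.trim();
            if (!trimmed.isEmpty()) {
                codes.add(trimmed.hashCode());
            }
        }
        return codes;
    }

    /** Decides whether a message with the given tag passes the subscription. */
    static boolean matches(String subExpression, String messageTag) {
        if (isSubAll(subExpression)) {
            return true;
        }
        return codeSet(subExpression).contains(messageTag.hashCode());
    }

    public static void main(String[] args) {
        System.out.println(matches("tagB", "tagA"));         // false
        System.out.println(matches("tagA || tagB", "tagA")); // true
        System.out.println(matches("*", "tagA"));            // true
    }
}
```

With the group's subscription set to tagB, a tagA message fails this lookup, which matches the behavior seen in the test.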
Where the message filter's subscription data comes from
How PullMessageProcessor.java obtains the Consumer's subscription data:
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
throws RemotingCommandException {
RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
final PullMessageRequestHeader requestHeader =
(PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
response.setOpaque(request.getOpaque());
log.debug("receive PullMessage request command, {}", request);
// some code omitted here
final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());
final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (null == topicConfig) {
log.error("the topic {} not exist, consumer: {}", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
return response;
}
if (!PermName.isReadable(topicConfig.getPerm())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
return response;
}
if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
log.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorInfo);
return response;
}
SubscriptionData subscriptionData = null;
ConsumerFilterData consumerFilterData = null;
if (hasSubscriptionFlag) {
try {
subscriptionData = FilterAPI.build(
requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType()
);
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
consumerFilterData = ConsumerFilterManager.build(
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
requestHeader.getExpressionType(), requestHeader.getSubVersion()
);
assert consumerFilterData != null;
}
} catch (Exception e) {
log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),
requestHeader.getConsumerGroup());
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
response.setRemark("parse the consumer's subscription failed");
return response;
}
} else {
ConsumerGroupInfo consumerGroupInfo =
this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
// some guard clauses omitted here
// subscriptionData is fetched from the group's subscriptionTable here
subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
if (null == subscriptionData) {
log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
return response;
}
if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
log.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
subscriptionData.getSubString());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
response.setRemark("the consumer's subscription not latest");
return response;
}
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(),
requestHeader.getConsumerGroup());
if (consumerFilterData == null) {
response.setCode(ResponseCode.FILTER_DATA_NOT_EXIST);
response.setRemark("The broker's consumer filter data is not exist!Your expression may be wrong!");
return response;
}
if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion()) {
log.warn("The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}",
requestHeader.getConsumerGroup(), requestHeader.getTopic(), consumerFilterData.getClientVersion(), requestHeader.getSubVersion());
response.setCode(ResponseCode.FILTER_DATA_NOT_LATEST);
response.setRemark("the consumer's consumer filter data not latest");
return response;
}
}
}
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())
&& !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());
return response;
}
MessageFilter messageFilter;
if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
this.brokerController.getConsumerFilterManager());
} else {
messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
this.brokerController.getConsumerFilterManager());
}
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
// subsequent handling code omitted here
return response;
}
Updating the Consumer's subscription data
ConsumerGroupInfo.java
public boolean updateSubscription(final Set<SubscriptionData> subList) {
boolean updated = false;
for (SubscriptionData sub : subList) {
// when a Consumer subscribes to a new Topic, update the subscription
SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
if (old == null) {
SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
if (null == prev) {
updated = true;
log.info("subscription changed, add new topic, group: {} {}",
this.groupName,
sub.toString());
}
} else if (sub.getSubVersion() > old.getSubVersion()) {
if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
log.info("subscription changed, group: {} OLD: {} NEW: {}",
this.groupName,
old.toString(),
sub.toString()
);
}
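/**
* When the Consumer's subVersion is greater than the subVersion held by the Broker, the subscription is updated.
* old.getSubVersion() here is the start time of the last consumer to start in the group.
* The subscription is updated only when a new consumer starts or the consumers rebalance.
*/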
this.subscriptionTable.put(sub.getTopic(), sub);
}
}
Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, SubscriptionData> next = it.next();
String oldTopic = next.getKey();
boolean exist = false;
for (SubscriptionData sub : subList) {
if (sub.getTopic().equals(oldTopic)) {
exist = true;
break;
}
}
if (!exist) {
log.warn("subscription changed, group: {} remove topic {} {}",
this.groupName,
oldTopic,
next.getValue().toString()
);
it.remove();
updated = true;
}
}
this.lastUpdateTimestamp = System.currentTimeMillis();
return updated;
}
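Summary
When fetching messages, the Broker filters them by the consumer group's subscribed Tags held in subscriptionTable. Within one consumer group, the Broker keeps the subscription of the Consumer that started last; afterwards the subscription is updated only when a new consumer joins or the group's consumers rebalance (possible causes of a rebalance: consumers added or removed, or a change in the number of queues).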
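The overwrite behavior can be sketched as a version-checked map keyed by Topic. This is a simplified illustration, assuming one entry per Topic for the whole group and a subVersion that grows with time; SubscriptionTableSketch and its methods are hypothetical, and the Topic-removal logic of the real updateSubscription is left out.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Toy per-group subscription table keyed by Topic; not Broker code. */
class SubscriptionTableSketch {

    private static final class Entry {
        final String subString;
        final long subVersion;

        Entry(String subString, long subVersion) {
            this.subString = subString;
            this.subVersion = subVersion;
        }
    }

    private final ConcurrentMap<String, Entry> table = new ConcurrentHashMap<>();

    /** Registers the subscription reported by one consumer instance of the group. */
    void update(String topic, String subString, long subVersion) {
        Entry incoming = new Entry(subString, subVersion);
        Entry old = table.putIfAbsent(topic, incoming);
        if (old != null && subVersion > old.subVersion) {
            // A newer version replaces the stored subscription for the whole group.
            table.put(topic, incoming);
        }
    }

    /** Returns the tag expression currently stored for the Topic, or null if none. */
    String expressionFor(String topic) {
        Entry entry = table.get(topic);
        return entry == null ? null : entry.subString;
    }

    public static void main(String[] args) {
        SubscriptionTableSketch group = new SubscriptionTableSketch();
        group.update("SUBSCRIBE_TEST", "tagA", 1000L); // ConsumerForTagA starts first
        group.update("SUBSCRIBE_TEST", "tagB", 2000L); // ConsumerForTagB starts later
        System.out.println(group.expressionFor("SUBSCRIBE_TEST")); // tagB
    }
}
```

Since the table holds a single entry per Topic, two instances with different Tags cannot both be represented; whichever reports the higher version determines what the Broker filters for every instance in the group.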