RocketMQ中ConsumeFromWhere详解
一、ConsumeFromWhere
RocketMQ的Consumer有一个consumeFromWhere
属性,表示Consumer启动后将从哪个位置开始消费消息,该属性取值有以下情况:
/**
* Consuming point on consumer booting. There are three consuming points:
* CONSUME_FROM_LAST_OFFSET: consumer clients pick up where it stopped previously. If it were a newly booting up consumer client, according aging of the consumer group, there are two cases:
* if the consumer group is created so recently that the earliest message being subscribed has yet expired, which means the consumer group represents a lately launched business, consuming will start from the very beginning;
* if the earliest message being subscribed has expired, consuming will start from the latest messages, meaning messages born prior to the booting timestamp would be ignored.
* CONSUME_FROM_FIRST_OFFSET: Consumer client will start from earliest messages available.
* CONSUME_FROM_TIMESTAMP: Consumer client will start from specified timestamp, which means messages born prior to consumeTimestamp will be ignored
*/
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
二、源码分析
consumeFromWhere在使用过程中,发现设置CONSUME_FROM_LAST_OFFSET后,有些情况仍然从头开始消费,接下来结合源码分析一下consumeFromWhere处理逻辑。
2.1 RocketMQ客户端consumeFromWhere处理逻辑
Consumer启动是调用computePullFromWhere(MessageQueue mq)
方法获取offset,以下为该方法的源码实现:org.apache.rocketmq.client.impl.consumer.RebalancePushImpl.java
@Override
public long computePullFromWhere(MessageQueue mq) {
long result = -1;
final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
switch (consumeFromWhere) {
case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
case CONSUME_FROM_MIN_OFFSET:
case CONSUME_FROM_MAX_OFFSET:
case CONSUME_FROM_LAST_OFFSET: {
// 从broker获取消费者组的offset,此时分为两种情况
// 1. 该消费者组之前已经存在,则broker返回该消费者组的最后的offset,此时offset > 0
// 2. 如果是一个新的消费者组,则又分为两种情况:
// 2.1 如果消费者组订阅的消息仍在broker的内存中,则broker返回offset=0
// 2.2 如果消费者组订阅的消息不在broker的内存中,则broker返回未查找到,客户端将offset设置为-1
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
// 消费者组之前已经存在(offset>0),或新消费者组且订阅的历史消息仍存在broker内存中(offset=0)
if (lastOffset >= 0) {
result = lastOffset;
}
// 新消费者组且历史消息不在broker的内存中
// First start,no offset
else if (-1 == lastOffset) {
if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
result = 0L;
} else {
// 从最新的消息开始消费
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
result = -1;
}
}
} else {
result = -1;
}
break;
}
case CONSUME_FROM_FIRST_OFFSET: {
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
// 消费者组之前已经存在(offset>0),或新消费者组且历史消息仍存在broker的内存中(offset=0)
if (lastOffset >= 0) {
result = lastOffset;
} else if (-1 == lastOffset) {
result = 0L;
} else {
result = -1;
}
break;
}
case CONSUME_FROM_TIMESTAMP: {
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
// 消费者组之前已经存在(offset>0),或新消费者组且历史消息仍存在broker的内存中(offset=0)
if (lastOffset >= 0) {
result = lastOffset;
} else if (-1 == lastOffset) {
if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
result = -1;
}
} else {
// 从时间戳对应的offset开始消费
try {
long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
UtilAll.YYYYMMDDHHMMSS).getTime();
result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
} catch (MQClientException e) {
result = -1;
}
}
} else {
result = -1;
}
break;
}
default:
break;
}
return result;
}
2.2 broker端查询消费者组offset的处理逻辑
broker处理queryConsumerOffset请求源码实现
org.apache.rocketmq.broker.processor.ConsumerManageProcessor
// broker处理客户端查找Consumer的offset的请求
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
return this.getConsumerListByGroup(ctx, request);
case RequestCode.UPDATE_CONSUMER_OFFSET:
return this.updateConsumerOffset(ctx, request);
case RequestCode.QUERY_CONSUMER_OFFSET:
return this.queryConsumerOffset(ctx, request);
default:
break;
}
return null;
}
private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
final QueryConsumerOffsetResponseHeader responseHeader =
(QueryConsumerOffsetResponseHeader) response.readCustomHeader();
final QueryConsumerOffsetRequestHeader requestHeader =
(QueryConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);
// 从ConsumerOffsetManager中获取offset
long offset =
this.brokerController.getConsumerOffsetManager().queryOffset(
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
if (offset >= 0) {
responseHeader.setOffset(offset);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
} else { // offset小于0,即broker中不存在该消费者组的offset信息(是一个新启动的消费者组)
// 获取消费队列的最小的offset,如果该队列文件未进行清除,则offset=0
long minOffset =
this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
requestHeader.getQueueId());
// 如果minOffset小于等于0,且未被消费的消息仍在内存中则返回offset=0
if (minOffset <= 0
&& !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
responseHeader.setOffset(0L);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
} else { // 新启动的消费者组,且积压的消息不在内存中,则返回QUERY_NOT_FOUND
response.setCode(ResponseCode.QUERY_NOT_FOUND);
response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");
}
}
return response;
}
broker查找消费者组offset源码实现
org.apache.rocketmq.broker.offset.ConsumerOffsetManager.java
// broker启动时加载启动用户目录下的store/config/consumerOffset.json文件,该文件保存了对应Topic下的消费者组在队列的offset
@Override
public void decode(String jsonString) {
if (jsonString != null) {
ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);
if (obj != null) {
this.offsetTable = obj.offsetTable;
}
}
}
// 根据Topic、消费者组、队列Id从broker获取offset,如果不存在则返回-1
public long queryOffset(final String group, final String topic, final int queueId) {
// topic@group
String key = topic + TOPIC_GROUP_SEPARATOR + group;
ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
if (null != map) {
Long offset = map.get(queueId);
if (offset != null)
return offset;
}
return -1;
}
总结
- 如果一个消费者组之前已经启动过,再次启动后,无论consumeFromWhere设置何值,都会根据在broker记录的offset进行消费
- 如果是一个新启动的消费者组,且消费者组订阅的消息仍在broker的内存中,无论consumeFromWhere设置何值,都将从0开始消费
- 如果是一个新启动的消费者组,且消费者组订阅的消息不在broker的内存中,根据consumeFromWhere设置的获取对应的offset进行消费
克隆或重置消费者组的offset
根据分析结果,如果是一个已经存在的消费者组,如果想要跳过积压的消息,设置consumeFromWhere是无效的,但是可用通过rocketmq-dashboard或mqadmin重置offset。
如果是新建消费者组,不想要重头开始消费,可以在代码中做时间校验跳过历史消息,或者通过rocketmq-dashboard或mqadmin新建消费者组并重置offset。
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 逐光の博客!
评论