一、ConsumeFromWhere

RocketMQ的Consumer有一个consumeFromWhere属性,表示Consumer启动后将从哪个位置开始消费消息,该属性取值有以下情况:

/**
 * Consuming point on consumer booting. There are three consuming points:
 * CONSUME_FROM_LAST_OFFSET: consumer clients pick up where it stopped previously. If it were a newly booting up consumer client, according aging of the consumer group, there are two cases:
 * if the consumer group is created so recently that the earliest message being subscribed has yet expired, which means the consumer group represents a lately launched business, consuming will start from the very beginning;
 * if the earliest message being subscribed has expired, consuming will start from the latest messages, meaning messages born prior to the booting timestamp would be ignored.
 * CONSUME_FROM_FIRST_OFFSET: Consumer client will start from earliest messages available.
 * CONSUME_FROM_TIMESTAMP: Consumer client will start from specified timestamp, which means messages born prior to consumeTimestamp will be ignored
 */
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;

二、源码分析

consumeFromWhere在使用过程中,发现设置CONSUME_FROM_LAST_OFFSET后,有些情况仍然从头开始消费,接下来结合源码分析一下consumeFromWhere处理逻辑。

2.1 RocketMQ客户端consumeFromWhere处理逻辑

Consumer启动是调用computePullFromWhere(MessageQueue mq)方法获取offset,以下为该方法的源码实现:
org.apache.rocketmq.client.impl.consumer.RebalancePushImpl.java

@Override
public long computePullFromWhere(MessageQueue mq) {
    long result = -1;
    final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
    final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
    switch (consumeFromWhere) {
        case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
        case CONSUME_FROM_MIN_OFFSET:
        case CONSUME_FROM_MAX_OFFSET:
        case CONSUME_FROM_LAST_OFFSET: {
            
            // 从broker获取消费者组的offset,此时分为两种情况
            // 1. 该消费者组之前已经存在,则broker返回该消费者组的最后的offset,此时offset > 0
            // 2. 如果是一个新的消费者组,则又分为两种情况:
            // 2.1 如果消费者组订阅的消息仍在broker的内存中,则broker返回offset=0
            // 2.2 如果消费者组订阅的消息不在broker的内存中,则broker返回未查找到,客户端将offset设置为-1
            long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
            
            // 消费者组之前已经存在(offset>0),或新消费者组且订阅的历史消息仍存在broker内存中(offset=0)
            if (lastOffset >= 0) {
                result = lastOffset;
            }
            
            // 新消费者组且历史消息不在broker的内存中
            // First start,no offset
            else if (-1 == lastOffset) {
                if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    result = 0L;
                } else {
                    // 从最新的消息开始消费
                    try {
                        result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                    } catch (MQClientException e) {
                        result = -1;
                    }
                }
            } else {
                result = -1;
            }
            break;
        }
        case CONSUME_FROM_FIRST_OFFSET: {
            long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
            
            // 消费者组之前已经存在(offset>0),或新消费者组且历史消息仍存在broker的内存中(offset=0)
            if (lastOffset >= 0) {
                result = lastOffset;
            } else if (-1 == lastOffset) {
                result = 0L;
            } else {
                result = -1;
            }
            break;
        }
        case CONSUME_FROM_TIMESTAMP: {
            long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
            
             // 消费者组之前已经存在(offset>0),或新消费者组且历史消息仍存在broker的内存中(offset=0)
            if (lastOffset >= 0) {
                result = lastOffset;
            } else if (-1 == lastOffset) {
                if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    try {
                        result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                    } catch (MQClientException e) {
                        result = -1;
                    }
                } else {
                    // 从时间戳对应的offset开始消费
                    try {
                        long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
                            UtilAll.YYYYMMDDHHMMSS).getTime();
                        result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
                    } catch (MQClientException e) {
                        result = -1;
                    }
                }
            } else {
                result = -1;
            }
            break;
        }

        default:
            break;
    }

    return result;
}

2.2 broker端查询消费者组offset的处理逻辑

broker处理queryConsumerOffset请求源码实现

org.apache.rocketmq.broker.processor.ConsumerManageProcessor

// broker处理客户端查找Consumer的offset的请求
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    switch (request.getCode()) {
        case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
            return this.getConsumerListByGroup(ctx, request);
        case RequestCode.UPDATE_CONSUMER_OFFSET:
            return this.updateConsumerOffset(ctx, request);
        case RequestCode.QUERY_CONSUMER_OFFSET:
            return this.queryConsumerOffset(ctx, request);
        default:
            break;
    }
    return null;
}

private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
        final QueryConsumerOffsetResponseHeader responseHeader =
            (QueryConsumerOffsetResponseHeader) response.readCustomHeader();
        final QueryConsumerOffsetRequestHeader requestHeader =
            (QueryConsumerOffsetRequestHeader) request
                .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);

        // 从ConsumerOffsetManager中获取offset
        long offset =
            this.brokerController.getConsumerOffsetManager().queryOffset(
                requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());

        if (offset >= 0) {
            responseHeader.setOffset(offset);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
        } else { // offset小于0,即broker中不存在该消费者组的offset信息(是一个新启动的消费者组)
            // 获取消费队列的最小的offset,如果该队列文件未进行清除,则offset=0
            long minOffset =
                this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
                    requestHeader.getQueueId());
            // 如果minOffset小于等于0,且未被消费的消息仍在内存中则返回offset=0
            if (minOffset <= 0
                && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
                requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
                responseHeader.setOffset(0L);
                response.setCode(ResponseCode.SUCCESS);
                response.setRemark(null);
            } else { // 新启动的消费者组,且积压的消息不在内存中,则返回QUERY_NOT_FOUND
                response.setCode(ResponseCode.QUERY_NOT_FOUND);
                response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");
            }
        }

        return response;
    }

broker查找消费者组offset源码实现

org.apache.rocketmq.broker.offset.ConsumerOffsetManager.java

// broker启动时加载启动用户目录下的store/config/consumerOffset.json文件,该文件保存了对应Topic下的消费者组在队列的offset
@Override
public void decode(String jsonString) {
    if (jsonString != null) {
        ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);
        if (obj != null) {
            this.offsetTable = obj.offsetTable;
        }
    }
}

// 根据Topic、消费者组、队列Id从broker获取offset,如果不存在则返回-1
public long queryOffset(final String group, final String topic, final int queueId) {
    // topic@group
    String key = topic + TOPIC_GROUP_SEPARATOR + group;
    ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
    if (null != map) {
        Long offset = map.get(queueId);
        if (offset != null)
            return offset;
    }

    return -1;
}

总结

  • 如果一个消费者组之前已经启动过,再次启动后,无论consumeFromWhere设置何值,都会根据在broker记录的offset进行消费
  • 如果是一个新启动的消费者组,且消费者组订阅的消息仍在broker的内存中,无论consumeFromWhere设置何值,都将从0开始消费
  • 如果是一个新启动的消费者组,且消费者组订阅的消息不在broker的内存中,根据consumeFromWhere设置的获取对应的offset进行消费

克隆或重置消费者组的offset

根据分析结果,如果是一个已经存在的消费者组,如果想要跳过积压的消息,设置consumeFromWhere是无效的,但是可用通过rocketmq-dashboard或mqadmin重置offset。
如果是新建消费者组,不想要重头开始消费,可以在代码中做时间校验跳过历史消息,或者通过rocketmq-dashboard或mqadmin新建消费者组并重置offset。