Kafka如何实现顺序消费?

Kafka的消息是存储在指定的topic中的某个partition中的。并且一个topic是可以有多个partition的。同一个partition中的消息是有序的,但是跨partition,或者跨topic的消息就是无序的了。

为什么同一个partition的消息是有序的?

因为当生产者向某个partition发送消息时,消息会被追加到该partition的日志文件(log)中,并且被分配一个唯一的 offset,文件的读写是有顺序的。而消费者在从该分区消费消息时,会从该分区的最早 offset 开始逐个读取消息,保证了消息的顺序性。

基于此,想要实现消息的顺序消费,可以有以下几个办法:

  1. 在一个topic中,只创建一个partition,这样这个topic下的消息都会按照顺序保存在同一个partition中,这就保证了消息的顺序消费。
  2. 发送消息的时候指定partition,如果一个topic下有多个partition,那么我们可以把需要保证顺序的消息都发送到同一个partition中,这样也能做到顺序消费。

RocketMQ如何保证消息的顺序性?

和Kafka只支持同一个Partition内消息的顺序性一样,RocketMQ中也提供了基于队列(分区)的顺序消费。即同一个队列内的消息可以做到有序,但是不同队列内的消息是无序的!
当我们作为MQ的生产者需要发送顺序消息时,需要在send方法中,传入一个MessageQueueSelector。
MessageQueueSelector中需要实现一个select方法,这个方法就是用来定义要把消息发送到哪个MessageQueue的,通常可以使用取模法进行路由:

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
     @Override
   //mqs:该Topic下所有可选的MessageQueue
     //msg:待发送的消息
     //arg:发送消息时传递的参数
     public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        //根据参数,计算出一个要接收消息的MessageQueue的下标
        int index = id % mqs.size();
        //返回这个MessageQueue
        return mqs.get(index);
    }
}, orderId);

通过以上形式就可以将需要有序的消息发送到同一个队列中。需要注意的时候,这里需要使用同步发送的方式!
消息按照顺序发送的消息队列中之后,那么,消费者如何按照发送顺序进行消费呢?
RocketMQ的MessageListener回调函数提供了两种消费模式,有序消费模式MessageListenerOrderly和并发消费模式MessageListenerConcurrently。所以,想要实现顺序消费,需要使用MessageListenerOrderly模式接收消息:

consumer.registerMessageListener(new MessageListenerOrderly() {
    Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs ,ConsumeOrderlyContext context) {
         System.out.printf("Receive order msg:" + new String(msgs.get(0).getBody()));
         return ConsumeOrderlyStatus.SUCCESS ; 
    }
});

当我们用以上方式注册一个消费之后,为了保证同一个队列中的有序消息可以被顺序消费,就要保证RocketMQ的Broker只会把消息发送到同一个消费者上,这时候就需要加锁了。
在实现中,ConsumeMessageOrderlyService 初始化的时候,会启动一个定时任务,会尝试向 Broker 为当前消费者客户端申请分布式锁。如果获取成功,那么后续消息将会只发给这个Consumer。
接下来在消息拉取的过程中,消费者会一次性拉取多条消息的,并且会将拉取到的消息放入 ProcessQueue,同时将消息提交到消费线程池进行执行。
那么拉取之后的消费过程,怎么保证顺序消费呢?这里就需要更多的锁了。
RocketMQ在消费的过程中,需要申请 MessageQueue 锁,确保在同一时间,一个队列中只有一个线程能处理队列中的消息。
获取到 MessageQueue 的锁后,就可以从ProcessQueue中依次拉取一批消息处理了,但是这个过程中,为了保证消息不会出现重复消费,还需要对ProcessQueue进行加锁。(这个在扩展知识中展开)
然后就可以开始处理业务逻辑了。
总结下来就是三次加锁,先锁定Broker上的MessageQueue,确保消息只会投递到唯一的消费者,对本地的MessageQueue加锁,确保只有一个线程能处理这个消息队列。对存储消息的ProcessQueue加锁,确保在重平衡的过程中不会出现消息的重复消费。

第三把锁有什么用?

前面介绍客户端加锁过程中,一共加了三把锁,那么,有没有想过这样一个问题,第三把锁如果不加的话,是不是也没问题?
因为我们已经对MessageQueue加锁了,为啥还需要对ProcessQueue再次加锁呢?
这里其实主要考虑的是重平衡的问题。
当我们的消费者集群,新增了一些消费者,发生重平衡的时候,某个队列可能会原来属于客户端A消费的,但是现在要重新分配给客户端B了。
这时候客户端A就需要把自己加在Broker上的锁解掉,而在这个解锁的过程中,就需要确保消息不能在消费过程中就被移除了,因为如果客户端A可能正在处理一部分消息,但是位点信息还没有提交,如果客户端B立马去消费队列中的消息,那存在一部分数据会被重复消费。
那么如何判断消息是否正在消费中呢,就需要通过这个ProcessQueue上面的锁来判断了,也就是说在解锁的线程也需要尝试对ProcessQueue进行加锁,加锁成功才能进行解锁操作。以避免过程中有消息消费。

从业务角度思考

......

最后修改:2025 年 04 月 26 日 10 : 01 PM
如果我的文章对你有用,请随意赞赏