1.1:MQ如何保证消息不丢失
1.1.1:哪些环节会有丢消息的可能?
其中,1(发送消息的时候),2(消息到达服务端持久化的时候),4(消费组消费消息的时候)
三个场景都是跨网络的,而跨网络就肯定会有丢消息的可能。
然后关于3这个环节,通常MQ存盘时都会先写入操作系统的缓存page cache中,然后再由操作系统异步的将消息写入硬盘。这个中间有个时间差,就可能会造成消息丢失。如果服务挂了,缓存中还没有来得及写入硬盘的消息就会丢失。
1.1.2:生产者发送消息如何保证消息不丢失
生产者确认,生成者发送消息后,服务端给生产者一个确认的通知,告知生产者这个消息在broker是否写入完成了。
1:rocketmq
rocketmq中,提供了三种不同的发送消息的方式:
//异步发送,这种不需要broker确认,会有丢失消息的可能 myProducer.sendOneway(msg); System.out.println("异步发送成功sendOneway"); //同步发送,需要等待broker的确认,消息最安全但是效率低 sendResult = myProducer.send(msg, 20 * 1000); System.out.println("发送成功同步send:" + sendResult); // 这种为异步发生方式,不会阻塞,会返回结果,收到broker确认后会调用回调函回。在效率与安全之间比较,均衡但不是这种就是最好的。因为使用这种方式那么主线程就不能立即杀掉,也就是说不能立马调用myProducer.shutdown();因为主线程一旦杀掉,那么子线程获取返回结果的也就不能获取到结果了,对性能消耗比较大,会增加客服端的负担。 myProducer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("发送成功异步:" + sendResult); } @Override public void onException(Throwable e) { System.out.println("发送失败异步:" + e.getMessage()); } });2:kafka
kafka同样也提供了这种异步和同步的发送方式
//直接send发送消息,返回的是一个Future。这就相当于异步调用 Future<RecordMetadata> future = producer.send(record); //调用future的get方法才会实际获取到发送的结果,生产者收到这个结果后,就可以指定消息是否成功发送到broker了,这个过程就变成了一个同步的过程。 RecordMetadata recordData = producer.send(record).get();3:RabbitMQ
RabbitMQ则是提供了一个Publisher Confirms生产者确认机制,Publisher收到Broker响应后再触发对应的回调方法。
//获取Channel Channel channel = ? //添加两个回调,一个处理ack响应,一个nack响应 channel.addConfirmListener(ConfirmCallback ackCallBack, ConfirmCallback nackCallBack);这些不同的处理方法的背后,都是一个思路,那就是给生产者响,让生产者知道消息有没有发送成功,如果没有,也可以由生产者自行补救重发,也可以抛出异常,反正都是让生产者自行处理。
4:RocketMq的事务消息机制
RocketMQ的事务消息机制就是为了保证零丢失来设计的,并且经过阿里的验证,肯定是非常靠谱的。
详情见本章节:5.8
1:为什么要发送个half消息?有什么用?
这个half消息是在订单系统进行下单操作前发送,并且对下游服务的消费者是不可见的。那这个消息的作用更多的体现在确认RocketMQ的服务端是否正常。相当于嗅探下RocketMQ服务是否正常,并且通知RocketMQ,我马上就要发一个很重要的消息了,你做好准备。
2:half消息如果写入失败了怎么办?
如果没有half消息这个流程,那我们通常是会在订单系统中先完成下单,再发送消息给MQ。这时候写入消息到MQ如果失败就会非常尴尬了。
而half消息如果写入失败,我们就可以认为MQ的服务是有问题的,这时,就不能通知下游服务了。我们可以在下单时给订单一个状态标记,然后等待MQ服务正常后再进行补偿操作,等MQ服务正常后重新下单通知下游服务。
3:订单系统写数据库失败了怎么办
如果没有使用事务消息,我们只能判断下单失败,抛出了异常,那就不往MQ发消息了,这样至少保证不会对下游服务进行错误的通知。但是这样的话,如果过一段时间数据库恢复过来了,这个消息就无法再次发送了。当然,也可以设计另外的补偿机制,例如将订单数据缓存起来,再启动一个线程定时尝试往数据库写。而如果使用事务消息机制,就可以有一种更优雅的方案。
如果下单时,写数据库失败(可能是数据库崩了,需要等一段时间才能恢复)。那我们可以另外找个地方把订单消息先缓存起来(Redis、文本或者其他方式),然后给RocketMQ返回一个UNKNOWN状态。这样RocketMQ就会过一段时间来回查事务状态。我们就可以在回查事务状态时再尝试把订单数据写入数据库,如果数据库这时候已经恢复了,那就能完整正常的下单,再继续后面的业务。这样这个订单的消息就不会因为数据库临时崩了而丢失。
4:half消息写入成功后RocketMQ挂了怎么办?
在事务消息的处理机制中,未知状态的事务状态回查是由RocketMQ的Broker主动发起的。也就是说如果出现了这种情况,那RocketMQ就不会回调到事务消息中回查事务状态的服务。这时,我们就可以将订单一直标记为"新下单"的状态。而等RocketMQ恢复后,只要存储的消息没有丢失,RocketMQ就会再次继续状态回查的流程。
5:下单成功后如何优雅的等待支付成功?
最简单的方式是启动一个定时任务,每隔一段时间扫描订单表,比对未支付的订单的下单时间,将超过时间的订单回收。这种方式显然是有很大问题的,需要定时扫描很庞大的一个订单信息,这对系统是个不小的压力。
其次是可以使用RocketMQ提供的延迟消息机制。往MQ发一个延迟1分钟的消息,消费到这个消息后去检查订单的支付状态,如果订单已经支付,就往下游发送下单的通知。而如果没有支付,就再发一个延迟1分钟的消息。最终在第十个消息时把订单回收。这个方案就不用对全部的订单表进行扫描,而只需要每次处理一个单独的订单消息。
如果使用上了事务消息。我们就可以用事务消息的状态回查机制来替代定时的任务。在下单时,给Broker返回一个UNKNOWN的未知状态。而在状态回查的方法中去查询订单的支付状态。这样整个业务逻辑就会简单很多。我们只需要配置RocketMQ中的事务消息回查次数(默认15次)和事务回查间隔时间(messageDelayLevel),就可以更优雅的完成这个支付状态检查的需求。
6:事务消息机制的作用
整体来说,在订单这个场景下,消息不丢失的问题实际上就还是转化成了下单这个业务与下游服务的业务的分布式事务一致性问题。而事务一致性问题一直以来都是一个非常复杂的问题。而RocketMQ的事务消息机制,实际上只保证了整个事务消息的一半,他保证的是订单系统下单和发消息这两个事件的事务一致性,而对下游服务的事务并没有保证。但是即便如此,也是分布式事务的一个很好的降级方案。
1.1.3:Broker刷盘写入数据如何保证不丢失
1:RocketMq
保证消息在broker端不丢失就涉及到一个概念,就是PageCache缓存。
数据会先写入缓存再刷到磁盘,这个过程中如果服务器发生了异常比如断电等等,缓存中的数据还没写入到磁盘,数据就会丢失。
所以我们就可以采用同步刷盘的方式,可以调用操作系统的提供的sync系统调用,申请一次刷盘操作,主动的将PageCache中的数据写入到磁盘,RocketMq的broker提供了配置项flushDiskType,有两个可选项,分别是SYNC_FLUSH(同步刷盘)和ASYNC_FLUSH(异步刷盘)
SYNC_FLUSH(同步刷盘)
Broker每往日志文件中写入一条数据,就会申请一次刷盘操作。
ASYNC_FLUSH(异步刷盘)
Broker每隔固定的时间(可以配置,默认200ms),才会去调用一次刷盘操作。
异步刷盘性能更加文档,但是会有丢失消息的可能。同步刷盘安全性更高,但是操作系统的压力会更大。
在RocketMq中,就算是同步刷盘,其实也不是写一次消息就刷一次盘,其同步刷盘方式实现方式其实是以10ms的间隔去调用的刷盘操作,从理论上来说,还是会有丢失消息的可能,但是这一套同步刷盘机制已经很不错了,可以满足绝大部分业务场景。
2:Kafka
在kafka中,并没有同步刷盘和异步刷盘的区别,不过可以使用一些参数来管理刷盘的频率
flsh.ms:多久进行一次强制刷盘 log.flsh.interval.message:表示的那个同一个Partiton的消息数量达到这个数量时,就会申请一次刷盘操作,默认是Long.MAX log.flsh.interval.ms:当一个消息在内存中保留的时间达到这个值的时候,就会申请一次刷盘操作,默认值是空的,如果这个为空,那么下一个参数就会生效 log.flsh.scheduler.interval.ms:检查是否有日志文件需要进行刷盘的评率,默认值也是Long.MAX3:RabbitMq
对于Classic经典队列,即便声明了持久化,RabbitMQ服务端也不会实时调用fsync,因此无法保证服务端消息不丢失,对于Stream流式队列,则更加直接,不会调用fsync进行刷盘,而是交由操作系统自行刷盘。
1.1.4:Broker主从同步如何保证消息不丢失
对于RocketMq的broker来说,通常slave的作用就是做数据备份,的那个master节点失效宕机甚至是磁盘坏了后,就可以从slave子节点获取信息,但是如果主从同步的时候失败了,那么在broker中这一层保证就会失效,因此主从同步也有可能会造成数据丢失。这里我们就可以用Dleger高可用集群。
1:RocketMQ的消息持久化机制
- CommitLog:消息真正的存储文件,所有的消息都存在CommitLog文件中。
RoktMO默认会将数据先存储在内存中俗一个缓存区中,每当缓冲区中的数据积累到一定的数量或者一定的时间后,就会将缓存区中的消品批量的写入到磁盘上的CommitLog文件中。消息写入CommitLog文件中后就可以被消费者消费了。
Commitlog文件的大小固定1G,写满之后生成新的文件,并且采用的是顺序写的方式。
- ConsumeQueue:消息消费的逻辑队列,类似数据库的索引文件。
RoketMQ 中每个主题下的每个消息队列都会对应一个 ConsumeQueue,ConsumeQueu存储了消息的ofise以及该offset对应的消息在CommitLog文件中的位置信息,便于消费者速定位并消费消息。
每个ConsumeQueue文件固定由30万个固定大小2obyte的数据块组成。内容包括msgPhyOffset(8byte,消息在文件中的其实位置)+msgSize(4byte,消息在文件中占用的长度)+msgTagCode(8byte,消息tag的hash值)。
- IndexFile:消息索引文件,主要存储消息Key与offset的对应关系,提升消息检索速度。
如果生产者在发送消息时设置了消息Key,那么RocketMQ会将消息Key值和消息的物理偏移量(offset)存储在IndexFile文件中,这样当消费者需要根据消息Key查消息时,就可以直接在IndexFile文件中查找对应的offset,然后通过ConsumeQueue文件快速定位并消费消息。
2:三个角色构成的消息储存结构如下:
3:消息存储过程:
4:Dleger高可用集群
dleger文件同步过程
在使用Dledger技术搭建的RocketMQ集群中,Dledger会通过两阶段提交的方式保证文件在主从之间成功同步。
简单来说,数据同步会通过两个阶段,一个是uncommitted阶段,一个是commited阶段。
Leader Broker上的Dledger收到一条数据后,会标记为uncommitted状态,然后他通过自己的DledgerServer组件把这个uncommitted数据发给Follower Broker的DledgerServer组件。
接着Follower Broker的DledgerServer收到uncommitted消息之后,必须返回一个ack给Leader Broker的Dledger。然后如果Leader Broker收到超过半数的Follower Broker返回的ack之后,就会把消息标记为committed状态。
再接下来, Leader Broker上的DledgerServer就会发送committed消息给Follower Broker上的DledgerServer,让他们把消息也标记为committed状态。这样,就基于Raft协议完成了两阶段的数据同步。
1.1.5:消费者如何保证消息不丢失
消费组在消息处理完成后,需要给broker一个响应,表示消息被正常处理了。如果broker端没有收到这个响应,不管是consumer没有拿到这个消息,还是处理完成了没有给出响应,broker端都会认为没有处理成功,就会重新投递这些消息。rockermq和kafka是依据offset机制来重新投递的,而Rabbitmq消息重新入队来处理的。
正常情况下,消费者端都是需要先处理本地事务,然后再给MQ一个ACK响应,这时MQ就会修改Offset,将消息标记为已消费,从而不再往其他消费者推送消息。所以在Broker的这种重新推送机制下,消息是不会在传输过程中丢失的。但是如果消费者端启动新线程来处理业务逻辑,然后主线程中给broker响应CONSUM_SUCCESS,结果处理业务逻辑的线程执行失败了,也是会造成消息丢失的,所以不建议这么做,或者要控制好线程并发的异常情况。
1.1.6:MQ服务全挂了如何保证不丢失
设计一个降级方案来处理这个问题了。例如在订单系统中,如果多次尝试发送RocketMQ不成功,那就只能另外找给地方(Redis、文件或者内存等)把订单消息缓存下来,然后起一个线程定时的扫描这些失败的订单消息,尝试往RocketMQ发送。这样等RocketMQ的服务恢复过来后,就能第一时间把这些消息重新发送出去。
1.1.7:MQ消息零丢失方案总结
生产者使用事务消息机制。
Broker配置同步刷盘+Dledger主从架构
消费者不要使用异步消费。
整个MQ挂了之后准备降级方案
1.2:MQ如何保证消息顺序
代码见博客5.5章节
MQ的顺序问题分为全局有序和局部有序。
全局有序:整个MQ系统的所有消息严格按照队列先入先出顺序进行消费。
局部有序:只保证一部分关键消息的消费顺序。
其实在大部分的MQ业务场景,我们只需要能够保证局部有序就可以了。例如我们用QQ聊天,只需要保证一个聊天窗口里的消息有序就可以了。而对于电商订单场景,也只要保证一个订单的所有消息是有序的就可以了。至于全局消息的顺序,并不会太关心。而通常意义下,全局有序都可以压缩成局部有序的问题。例如以前我们常用的聊天室,就是个典型的需要保证消息全局有序的场景。但是这种场景,通常可以压缩成只有一个聊天窗口的QQ来理解。即整个系统只有一个聊天通道,这样就可以用QQ那种保证一个聊天窗口消息有序的方式来保证整个系统的全局消息有序。
会通过MessageQueue轮询的方式保证消息尽量均匀的分布到所有的MessageQueue上,而消费者也就同样需要从多个MessageQueue上消费消息。而MessageQueue是RocketMQ存储消息的最小单元,他们之间的消息都是互相隔离的,在这种情况下,是无法保证消息全局有序的。
而对于局部有序的要求,只需要将有序的一组消息都存入同一个MessageQueue里,这样MessageQueue的FIFO设计天生就可以保证这一组消息的有序。
RocketMQ中,可以在发送者发送消息时指定一个MessageSelector对象,让这个对象来决定消息发入哪一个MessageQueue。这样就可以保证一组有序的消息能够发到同一个MessageQueue里。
1.2.1:生产者端发送消息
使用MessageQueueSelector编写有序消息生产者,有序消息生产者会按照一定的规则将消息发送到同一个队列中,从而保证同一个队列中的消息是有序的。RocketMQ并不保证整个主题内所有队列的消息都是按照发送顺序排列的。
public class Producer { public static void main(String[] args) throws MQClientException { try { DefaultMQProducer producer = new DefaultMQProducer("OrderlyMessageTest"); producer.setNamesrvAddr("localhost:9876"); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 20; i++) { int orderId = i % 10; Message msg = new Message("OrderlyTopic", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId); System.out.printf("%s%n", sendResult); } producer.shutdown(); } catch (Exception e) { e.printStackTrace(); throw new MQClientException(e.getMessage(), null); } } }1.2.2:消费者端消费消息
1:push模式
使用MessageListenerOrderly进行顺序消费与之对应的MessageListenerConcurrently并行消费(这种不能保证消费的顺序)。
MessageListenerOrderly是RocketMq专门提供的一种顺序消费的接口,他可以让消费组按照消息发送的顺序,一个一个的处理。
通过加队列锁的方式实现(有超时机制),一个队列同时只有一个消费者;并且存在一个定时任务,每隔一段时间就会延长锁的时间,直到整个消息队列全部消费结束。
public class Consumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderlyMessageTest"); consumer.setNamesrvAddr("localhost:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("OrderlyTopic", "TagA || TagC || TagD"); consumer.registerMessageListener(new MessageListenerOrderly() { AtomicLong consumeTimes = new AtomicLong(0); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); this.consumeTimes.incrementAndGet(); if ((this.consumeTimes.get() % 2) == 0) { return ConsumeOrderlyStatus.SUCCESS; } else if ((this.consumeTimes.get() % 5) == 0) { context.setSuspendCurrentQueueTimeMillis(3000); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }这个时候如果消费失败了,如果选中的消费模式为MessageListenerConcurrently,那么就会返回RECONSUME_LATER
将失败的这个消息转发到重试队列中,然后接着消费后面的消息,这就无法保证消费的顺序了。
相反如果选择的是MessageListenerOrderly,那么就会返回SUSPEND_CURRENT_QUEUE_A_MOMENT。
意思就是阻塞一段时间当前的队列然后继续从失败的哪里开始消费。
2:pull模式
消费者端自己保证消费的顺序,消费组并发消费时保证消费线程数为1。
RocketMq的消费者可以开启多个消费线程同时消费一个队列中的消息,如果要保证消费的顺序,需要加消费线程数设置为1,这样,在同一个队列中,每个消息只会被单个消线程消费,从而保证消息的顺序性。
另外,通常所谓的保证Topic全局消息有序的方式,就是将Topic配置成只有一个MessageQueue队列(默认是4个)。这样天生就能保证消息全局有序了。这个说法其实就是我们将聊天室场景压缩成只有一个聊天窗口的QQ一样的理解方式。而这种方式对整个Topic的消息吞吐影响是非常大的,如果这样用,基本上就没有用MQ的必要了。这个接口支持按照消息的重试次数进行顺序消费、订单ID等作为消息键来实现顺序消费、批量消费等操作。