Skip to content

Jsran/RMQ

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

16 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

背景

何为延迟队列?

顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。

先看看一下业务场景:

  • 1.订单超时,用户下单后进入支付页面(通常会有超时限制)超过15分钟没有进行操作,那么这个订单就需要作废处理。
  • 2.如何定期检查处于退款状态的订单是否已经退款成功?
  • 3.注册后到现在已经一周的用户,如何发短信撩动。
  • 4.交易信息双重效验防止因系统级/应用级/用户级等各种异常情况发生后导致的全部/部分丢失的订单信息。
  • 5.实现重复通知,默认失败连续通知10次(通知间隔为n*2+1/min),直到消费方正确响应,超出推送上限次数后标记为异常状态,可进行恢复!

延迟队列能做什么?

延迟队列多用于需要延迟工作的场景。最常见的是以下两种场景:

  • 1.延迟消费。比如:

    1.用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单。
    2.用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用。
    
  • 2.延迟重试。比如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试。

如果不使用延迟队列,那么我们只能通过一个轮询扫描程序去完成。

扫表存在的问题是:

  • 1.扫表与数据库长时间连接,在数量量大的情况容易出现连接异常中断,需要更多的异常处理,对程序健壮性要求高
  • 2.在数据量大的情况下延时较高,规定内处理不完,影响业务,虽然可以启动多个进程来处理,这样会带来额外的维护成本,不能从根本上解决。
  • 3.每个业务都要维护一个自己的扫表逻辑。 当业务越来越多时,发现扫表部分的逻辑会重复开发,但是非常类似

延时队列的实现

整个延迟队列主要由4个部分

  • JobInfo用来存放所有Job的元信息。
  • Bucket是一组以时间为维度的有序队列,用来存放所有需要延迟的Job(这里只存放Job Key)。
  • Timer负责实时扫描各个Bucket,并将delay时间大于等于当前时间的Job放入到对应的Ready Queue。
  • Ready Queue存放处于Ready状态的Job(这里只存放Job Key),以供消费程序消费。

image

消息结构 每个Job必须包含一下几个属性:

  1. topic:Job类型。可以理解成具体的业务名称。
  2. orderid:Job的唯一标识。用来检索和删除指定的Job信息。
  3. body:Job的内容,供消费者做具体的业务处理,以json格式存储。

除此之外job的body中还可以设置 notify_time 为指定时间进行消费,如果不设置的话将采用默认的delay(创建topic时配置)时间。

架构设计

整体架构

image

采用master-work架构模式,主要包括4个模块:

  • 1.mster: 主进程,负责管理子进程的创建,销毁,回收以及信号通知
  • 2.timer-N: 负责从redis的zset结构中扫描到期的消息,并负责写入ready 队列,个数可配置,一般2个就行了,因为消息在zset结构是按时间有序的
  • 3.consume-N: 负责从ready队列中读取消息并通知给对应回调接口,个数可配置
  • 4.RedisState: 负责检查redis的服务状态,如果redis宕机,发送告警邮件
流程图

DEMO 体验地址

http://39.106.98.102:8930/RMQS/main/home.do

~~

感谢chenlinzhong提供的开源项目 delayqueue,对本项目的开发提供了非常有意义的参考!

About

Redis Delay Message Queue

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published