博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spring Boot(十四)RabbitMQ延迟队列
阅读量:6962 次
发布时间:2019-06-27

本文共 4326 字,大约阅读时间需要 14 分钟。

一、前言

延迟队列的使用场景:1.未按时支付的订单,30分钟过期之后取消订单;2.给活跃度比较低的用户间隔N天之后推送消息,提高活跃度;3.过1分钟给新注册会员的用户,发送注册邮件等。

<!--more-->

实现延迟队列的方式有两种:

  1. 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能;
  2. 使用rabbitmq-delayed-message-exchange插件实现延迟功能;

注意: 延迟插件rabbitmq-delayed-message-exchange是在RabbitMQ 3.5.7及以上的版本才支持的,依赖Erlang/OPT 18.0及以上运行环境。

由于使用死信交换器相对曲折,本文重点介绍第二种方式,使用rabbitmq-delayed-message-exchange插件完成延迟队列的功能。

二、安装延迟插件

1.1 下载插件

打开官网下载:

选择相应的对应的版本“3.7.x”点击下载。

注意: 下载的是.zip的安装包,下载完之后需要手动解压。

1.2 安装插件

拷贝插件到Docker:

docker cp D:\rabbitmq_delayed_message_exchange-20171201-3.7.x.ez rabbit:/plugins

RabbitMQ在Docker的安装,请参照本系列的上一篇文章:

1.3 启动插件

进入docker内部:

docker exec -it rabbit /bin/bash

开启插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

查询安装的所有插件:

rabbitmq-plugins list

安装正常,效果如下图:

重启RabbitMQ,使插件生效

docker restart rabbit

三、代码实现

3.1 配置队列

import com.example.rabbitmq.mq.DirectConfig;import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configurationpublic class DelayedConfig {    final static String QUEUE_NAME = "delayed.goods.order";    final static String EXCHANGE_NAME = "delayedec";    @Bean    public Queue queue() {        return new Queue(DelayedConfig.QUEUE_NAME);    }    // 配置默认的交换机    @Bean    CustomExchange customExchange() {        Map
args = new HashMap<>(); args.put("x-delayed-type", "direct"); //参数二为类型:必须是x-delayed-message return new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args); } // 绑定队列到交换器 @Bean Binding binding(Queue queue, CustomExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs(); }}

3.2 发送消息

import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessagePostProcessor;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;import java.util.Date;@Componentpublic class DelayedSender {    @Autowired    private AmqpTemplate rabbitTemplate;    public void send(String msg) {        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");        System.out.println("发送时间:" + sf.format(new Date()));        rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() {            @Override            public Message postProcessMessage(Message message) throws AmqpException {                message.getMessageProperties().setHeader("x-delay", 3000);                return message;            }        });    }}

3.3 消费消息

import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;import java.util.Date;@Component@RabbitListener(queues = "delayed.goods.order")public class DelayedReceiver {    @RabbitHandler    public void process(String msg) {        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");        System.out.println("接收时间:" + sdf.format(new Date()));        System.out.println("消息内容:" + msg);    }}

3.4 测试队列

import com.example.rabbitmq.RabbitmqApplication;import com.example.rabbitmq.mq.delayed.DelayedSender;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;import java.text.SimpleDateFormat;import java.util.Date;@RunWith(SpringRunner.class)@SpringBootTestpublic class DelayedTest {    @Autowired    private DelayedSender sender;    @Test    public void Test() throws InterruptedException {        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");        sender.send("Hi Admin.");        Thread.sleep(5 * 1000); //等待接收程序执行之后,再退出测试    }}

执行结果如下:

发送时间:2018-09-11 20:47:51接收时间:2018-09-11 20:47:54消息内容:Hi Admin.

完整代码访问我的GitHub:

四、总结

到此为止我们已经使用“rabbitmq-delayed-message-exchange”插件实现了延迟功能,但是需要注意的一点是,如果使用命令“rabbitmq-plugins disable rabbitmq_delayed_message_exchange”禁用了延迟插件,那么所有未发送的延迟消息都将丢失。

转载地址:http://ohmil.baihongyu.com/

你可能感兴趣的文章
Android平台<软硬整合实践技术>_答问集
查看>>
云计算数据中心:想好了再做
查看>>
将一些类型设计成集合模式
查看>>
吉林艺术学院监考人员被指为考生改画 学校回应
查看>>
为什么程序员应该有一台 Mac 个人电脑
查看>>
iOS 开发中 runtime 常用的几种方法
查看>>
JS执行机制(浏览器事件环 vs Node事件环)
查看>>
树形数据结构总结一(堆,Trie,并查集)
查看>>
企业上云的四个阶段
查看>>
九、 一个简单的播放器(各自同步)
查看>>
一步一图,带你了解分布式架构的前世今生!
查看>>
转行程序员深漂的这三年 #3
查看>>
[转载]责任链模式
查看>>
揭秘!双11万亿流量下的分布式缓存系统 Tair
查看>>
《图解HTTP》第4章_返回结果的HTTP状态码-思维导图
查看>>
[译] iPhone X 网页设计
查看>>
webpack入门及踩坑应对指南
查看>>
为什么你学Python效率会这么低?
查看>>
深圳晶泰科技招聘后端工程师,获腾讯Google红杉投资
查看>>
对于数据库优化的理解
查看>>