RabbitMQ 延迟队列

Eave 2019.10.23 23:01

TODO RabbitMQ 实现延迟队列的两种方式

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
rabbitmq-plugins disable rabbitmq_delayed_message_exchange
package com.qianyan.rabbitmq;

import java.util.Map;

import org.apache.commons.collections4.map.HashedMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
import com.qianyan.constant.RabbitmqConstant;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * RabbitMQ延时队列
 * 插件:rabbitmq-delayed-message-exchange
 * 插件地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
 * 交换机名:exchange: x-delayed-message
 * 消息头配置:x-delayed-type fanout
 * @author 路遥
 */
public class XDelayedMessageProducer
{
    private static Logger logger = LoggerFactory.getLogger(XDelayedMessageProducer.class);

    public static void producer(String exchange, Object data)
    {
        String message = data instanceof String ? (String) data : JSON.toJSONString(data);

        logger.info("Exchange: {}, Message: {}", exchange, message);

        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        // 设置RabbitMQ相关信息
        factory.setHost(RabbitmqConstant.HOST);
        factory.setPort(RabbitmqConstant.PORT);
        factory.setUsername(RabbitmqConstant.USERNAME);
        factory.setPassword(RabbitmqConstant.PASSWORD);
        factory.setVirtualHost(RabbitmqConstant.VHOST);

        try
        {
            // 创建一个新的连接
            Connection connection = factory.newConnection();

            // 创建一个通道
            Channel channel = connection.createChannel();

            // 发送消息到队列中
            // basicPublish第一个参数为交换机名称
            // 第二个参数为队列映射的路由key
            // 第三个参数为消息的其他属性
            // 第四个参数为发送信息的主体
            // 广播模式的routingKey为空

            Map headers = new HashedMap();
            // 延迟5秒
            headers.put("x-delay", 5000);
            AMQP.BasicProperties properties = new AMQP.BasicProperties("text/plain", "UTF-8", headers, 2, 0, null, null, null, null, null, null, null, null, null);

            // 消息需要持久化到硬盘
            channel.basicPublish(exchange, "", properties, message.getBytes("UTF-8"));

            //关闭通道和连接
            channel.close();
            connection.close();
        }
        catch(Exception e)
        {
            // TODO: handle exception
            e.printStackTrace();
        }
    }
}