RabbitMQ 延迟队列
Eave
2019.10.23 23:01
TODO RabbitMQ 实现延迟队列的两种方式
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
rabbitmq-plugins disable rabbitmq_delayed_message_exchange
package com.qianyan.rabbitmq;
import java.util.Map;
import org.apache.commons.collections4.map.HashedMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.qianyan.constant.RabbitmqConstant;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* RabbitMQ延时队列
* 插件:rabbitmq-delayed-message-exchange
* 插件地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
* 交换机名:exchange: x-delayed-message
* 消息头配置:x-delayed-type fanout
* @author 路遥
*/
public class XDelayedMessageProducer
{
private static Logger logger = LoggerFactory.getLogger(XDelayedMessageProducer.class);
public static void producer(String exchange, Object data)
{
String message = data instanceof String ? (String) data : JSON.toJSONString(data);
logger.info("Exchange: {}, Message: {}", exchange, message);
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置RabbitMQ相关信息
factory.setHost(RabbitmqConstant.HOST);
factory.setPort(RabbitmqConstant.PORT);
factory.setUsername(RabbitmqConstant.USERNAME);
factory.setPassword(RabbitmqConstant.PASSWORD);
factory.setVirtualHost(RabbitmqConstant.VHOST);
try
{
// 创建一个新的连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 发送消息到队列中
// basicPublish第一个参数为交换机名称
// 第二个参数为队列映射的路由key
// 第三个参数为消息的其他属性
// 第四个参数为发送信息的主体
// 广播模式的routingKey为空
Map headers = new HashedMap();
// 延迟5秒
headers.put("x-delay", 5000);
AMQP.BasicProperties properties = new AMQP.BasicProperties("text/plain", "UTF-8", headers, 2, 0, null, null, null, null, null, null, null, null, null);
// 消息需要持久化到硬盘
channel.basicPublish(exchange, "", properties, message.getBytes("UTF-8"));
//关闭通道和连接
channel.close();
connection.close();
}
catch(Exception e)
{
// TODO: handle exception
e.printStackTrace();
}
}
}