安装
RabbitMQ基于erlang,我们需要先安装erlang。
erlang:https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-21.3.8.21-1.el7.x86_64.rpm
RabbitMQ:https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.8.8-1.el7.noarch.rpm
也可以使用费其他版本,centos8就要使用el8,按自己情况来,然后进入界面右上角点击下载(最开始找半天,没看到下载按钮😂),然后可以使用xftp传到linux的文件夹中,我放在/usr/local/software。
生产消费者简单模式(Hello World)
一个生产者P通过队列queue发送信息,消费者C接收信息
具体流程:
- 创建连接工厂对象
- 创建连接获取信道
- 通过信道创建消息队列
- 进行消息的发送与接收操作
生产者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public class Producer { public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.158.135"); factory.setUsername("admin"); factory.setPassword("111");
Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false,null);
String message = "hello world";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("消息发送成功"); } }
|
消费者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class Consumer { public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.158.135"); factory.setUsername("admin"); factory.setPassword("111");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> { System.out.println(new String(message.getBody())); };
CancelCallback cancelCallback = (String consumerTag) -> { System.out.println("消息被中断"); };
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }
|
工作队列模式(Work Queues)
一个生产者通过队列发送大量消息,有多个消费者进行消息接收,其中队列唯一,而消费者不会重复处理消息,通过轮询的方式对消息进行处理。
我们可以在idea里选择多次运行消费者,然后在生产者中手动发送信息,会发现多个消费者轮询获取信息。
通用信道工具类
1 2 3 4 5 6 7 8 9 10 11
| public class ChannelUtils { public static Channel getChannel() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.158.135"); factory.setUsername("admin"); factory.setPassword("111"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class workProducer { public static final String QUEUE_NAME = "WorkQueues";
public static void main(String[] args) throws IOException, TimeoutException { Channel channel = ChannelUtils.getChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Scanner scanner = new Scanner(System.in); while(scanner.hasNext()){ String message = scanner.next(); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("发送消息成功:" + message); } } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class workConsumer { public static final String QUEUE_NAME = "WorkQueues";
public static void main(String[] args) throws IOException, TimeoutException { Channel channel = ChannelUtils.getChannel(); DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> { System.out.println("接收的消息:" + new String(message.getBody())); };
CancelCallback cancelCallback = (String consumerTag) -> { System.out.println("消息被中断"); };
System.out.println("------------C2等待接收信息------------"); channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }
|
消息应答
为了保证MQ的消息在发送过程中不丢失,需要进行消息应答,即消费者在接收消息并处理消息后,会告诉MQ一处理消息,此时MQ才可以将消息进行删除。
一般分为自动应答与手动应答,我们推荐使用手动应答。
应答的方式
1 2 3 4 5 6
| channel.basicAck(···);
channel.basicNack(···);
channel.basicReject(···);
|
消息手动应答(推荐)
可以发现消费者1、2和最开始的工作队列模式一样是轮询获取信息,但一个处理快,一个处理慢,如果我们把处理慢的消费者宕机,由于打开了消息手动应答,本应由消费者2处理的消息不会丢失,而是由消费者1进行处理。生产者和一开始使用的工作队列模式的一致,只需要改变消费者。
消费者1,处理时间短,使用手动应答与线程休眠
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class ackConsumer1 { public static final String QUEUE_NAME = "WorkQueues";
public static void main(String[] args) throws IOException, TimeoutException { Channel channel = ChannelUtils.getChannel(); System.out.println("C1等待处理,处理时间短");
boolean autoAck = false; DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("接收的消息:" + new String(message.getBody(), "UTF-8"));
channel.basicAck(message.getEnvelope().getDeliveryTag(), false); };
CancelCallback cancelCallback = (String consumerTag) -> { System.out.println("消息被中断"); }; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback); } }
|
消费者2,处理时间长
1 2 3 4 5 6 7 8 9 10
| public class ackConsumer2 { public static final String QUEUE_NAME = "WorkQueues";
public static void main(String[] args) throws IOException, TimeoutException { Channel channel = ChannelUtils.getChannel(); System.out.println("C2等待处理,处理时间长"); ··· } }
|
不公平分发(开启预取值)
默认prefetchCount = 0,此时消费者对信息进行轮询处理,自定义prefetchCount>0时,并通过信道的basicQos进行设置,则规定了每一个队列的当前看处理信息的最大数。也就说还是执行轮询操作,但某个消费者信息处理时不应答造成信息阻塞,这个阻塞队列的最大值即prefetchCount,当阻塞数据超过最大值,则该消费者不再接收新消息。而后续消息全由有空闲的消费者处理。
1 2 3
| int prefetchCount = 1; channel.basicQos(prefetchCount);
|
持久化
1 2 3 4
| channel.queueDeclare(queueName, true, false, false, null);
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
|
发布与确认
当我们将队列以及消息进行持久化操作后,信息需要存储到磁盘上才可完成持久化操作,若存储到磁盘的过程发生了宕机,那么持久化也是失败的,所以我们还需要对信息进行发布与确认,保证信息持久化成功。
一般分为以下三种模式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
| public class confirmMessage { public static final int MESSAGE_COUNT = 1000;
public static void main(String[] args) throws Exception { confirmMessage.singlePublish(); confirmMessage.batchPublish(); confirmMessage.asynchronousAllPublish(); }
public static void singlePublish() throws Exception{ Channel channel = ChannelUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); channel.confirmSelect();
long begin = System.currentTimeMillis();
for(int i = 0; i < MESSAGE_COUNT; i++){ String message = i + ""; channel.basicPublish("", queueName, null, message.getBytes()); channel.waitForConfirms(); }
long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "条数据,单一处理需要耗时" + (end - begin) + "ms"); }
public static void batchPublish() throws Exception { ··· int batchSize = 100; for(int i = 0; i < MESSAGE_COUNT; i++){ String message = i + ""; channel.basicPublish("", queueName, null, message.getBytes()); if((i + 1) % batchSize == 0){ channel.waitForConfirms(); } } long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "条数据,批量处理需要耗时" + (end - begin) + "ms"); }
public static void asynchronousAllPublish() throws Exception{ ··· ConcurrentSkipListMap<Long, String> map = new ConcurrentSkipListMap<>(); ConfirmCallback ackCallback = (long deliveryTag, boolean mutiple) -> { if(mutiple){ ConcurrentNavigableMap<Long, String> confirm = map.headMap(deliveryTag); confirm.clear(); }else { map.remove(deliveryTag); } System.out.println("确认的消息" + deliveryTag); }; ConfirmCallback nackCallback = (long deliveryTag, boolean mutiple) -> { System.out.println("未确认的消息" + deliveryTag); }; channel.addConfirmListener(ackCallback, nackCallback); for(int i = 0; i < MESSAGE_COUNT; i++){ String message = "消息" + i; channel.basicPublish("", queueName, null, message.getBytes()); map.put(channel.getNextPublishSeqNo(), message); } long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "条数据,异步处理需要耗时" + (end - begin) + "ms"); } }
|
Exchanges交换机
- 默认交换机:代码中就是使用””空字符串,即不使用
- 直连交换机direct:对应路由模式,使用RoutingKey匹配队列
- 主题交换机topic:对应主题模式
- 首部交换机headers
- 扇出交换机fanout :对应发布与订阅模式,相当于广播
临时队列
也就是没有实现持久化的队列,其名字随机,断开消费者连接后,队列会自动删除。用完即删。
1
| String queueName = channel.queueDeclare().getQueue();
|
订阅与发布模式(Publish/Subscribe)
一个生产者,一个交换机,多个队列以及多个消费者。生产者的消息不会直接发送给队列,而是通过交换机对消息完成分配。将接收的消息通过广播的形式发送给所有队列。也就是不考虑RoutingKey的影响。没有设置RoutingKey。也就是交换机的fanout扇出类型的实现。
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class fanoutProducer { public static final String EXCHANGE_NAME = "fanout";
public static void main(String[] args) throws Exception{ Channel channel = ChannelUtils.getChannel(); Scanner scanner = new Scanner(System.in); while(scanner.hasNext()){ String message = scanner.next(); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println("生产者发出消息:" + message); } } }
|
消费者
多个相同的消费者,会通过交换机同时接收到消息,也就是fanout交换机实现了广播。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public class fanoutConsumer1 { public static final String EXCHANGE_NAME = "fanout"; public static void main(String[] args) throws Exception{ Channel channel = ChannelUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println("等待消息进行接收···");
DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> { System.out.println("1号测试接收的消息:" + new String(message.getBody(), "UTF-8")); };
CancelCallback cancelCallback = (String consumerTag) -> { System.out.println("消息被中断"); }; channel.basicConsume(queueName, true, deliverCallback, cancelCallback); } }
|
路由模式(Routing)
基于发布与订阅模式,只是交换机类型改为使用directt,交换机发送消息根据绑定的RoutingKey来分配队列。主要RoutingKey匹配的队列都会接收消息,所以若全部队列的RoutingKey一致,就相当于fanout类型进行广播。
生产者
我们发送信息时,给交换机分配不同的RoutingKey,就会发送给相对应的队列。
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class directProducer { public static final String EXCHANGE_NAME = "direct";
public static void main(String[] args) throws Exception{ Channel channel = ChannelUtils.getChannel(); Scanner scanner = new Scanner(System.in); while(scanner.hasNext()){ String message = scanner.next(); channel.basicPublish(EXCHANGE_NAME, "222", null, message.getBytes()); System.out.println("生产者发出消息:" + message); } } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class directConsumer1 { public static final String EXCHANGE_NAME = "direct"; public static void main(String[] args) throws Exception{ Channel channel = ChannelUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "111"); System.out.println("等待消息进行接收···"); DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> { System.out.println("1号测试接收的消息:" + new String(message.getBody(), "UTF-8")); }; CancelCallback cancelCallback = (String consumerTag) -> { System.out.println("消息被中断"); }; channel.basicConsume(queueName, true, deliverCallback, cancelCallback); } }
|
主题模式(Topics)
由于路由模式使用direct,其消息是一一对应发送的,有一定的局限性,因为RoutingKey统一,我们队列不能同时接收多个不同的RoutingKey。例如我们队列想接收 tc.name 与 tc.age 这两个消息,但使用direct只能明确指明其中一种,此时就要使用topic主题模式,相当于对RoutingKey进行模糊化处理,匹配全部相关联的RoutingKey进行发送。
即使一个队列可以匹配多个RoutingKey,但同一个队列每次只能接收一次消息。
定义以及模糊匹配
主题模式的RoutingKey必须是一个单词列表,通过 . 进行分割,最多255字节。例如 tc.name、tc.age这样定义,一般带有一定的指向性。然后topic中就可以使用类似sql的通配符进行模糊匹配。
也就是说RoutingKey定义为 tc.* ,可以同时匹配tc.name 和 tc.age。
*注意若只绑定#,就相当于fanout类型,因为都可以匹配。而绑定的RoutingKey没有出现 或#,说明没有做模糊匹配处理,就相当于direct类型。可以说topic包括了fanout与direct。
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class topicProducer { public static final String EXCHANGE_NAME = "topic";
public static void main(String[] args) throws Exception{ Channel channel = ChannelUtils.getChannel(); HashMap<String, String> bindMap = new HashMap<>(); bindMap.put("quick.orange.rabbit", "Q1、Q2"); bindMap.put("lazy.orange.elephant", "Q1、Q2"); bindMap.put("quick.orange.fox", "Q1"); bindMap.put("lazy.brown.fox", "Q2"); bindMap.put("lazy.pink.rabbit", "Q2"); bindMap.put("quick.brown.fox", "null"); bindMap.put("quick.orange.male.rabit", "null"); bindMap.put("lazy.orange.male.rabbit", "Q2");
for(Map.Entry<String, String> entry : bindMap.entrySet()){ String RoutingKey = entry.getKey(); String info = entry.getValue(); channel.basicPublish(EXCHANGE_NAME, RoutingKey, null, info.getBytes() ); System.out.println("生产者发出消息:" + info); } } }
|
消费者
测试后可以发现交换机按照模糊匹,向队列发送消息。
一个消费者可以绑定多个RoutingKey。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public class topicConsumer1 { public static final String EXCHANGE_NAME = "topic";
public static void main(String[] args) throws Exception{ Channel channel = ChannelUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = "Q1"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*"); System.out.println("等待接收消息······"); DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> { System.out.println("Q1队列接收的消息:" + new String(message.getBody(), "UTF-8")); System.out.println("接收的队列是" + queueName + ",绑定的Key是" + message.getEnvelope().getRoutingKey()); }; CancelCallback cancelCallback = (String consumerTag) -> { System.out.println("消息被中断"); }; channel.basicConsume(queueName, true, deliverCallback, cancelCallback); } }
|
RPC模式
死信队列(DLX)
死信(dead-letter-exchange)即无法被消费的消息,例如我们微信付款,有一个付款时间,超过时间的付款应归为死信,需要由原队列推给死信队列进行处理。一般成为死信有以下几种情况:
- 消息被拒绝:被正常队列拒绝,则归为死信
- 消息TTL过期:直到时间过期,仍未接收到消息则归为死信
- 队列达到最大长度:当消息超过队列的最大长度时,归为死信
测试时,先将正常消费者启动创建需要的队列,然后停掉消费者,启动生产者,最后启动死信队列消费者查看死信队列的情况。
生产者
1 2 3 4 5 6 7 8 9 10 11 12
| public class DLXProducer { public static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] args) throws Exception{ Channel channel = ChannelUtils.getChannel(); AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build(); for(int i = 1; i < 11; i++){ String message = "info" + i; channel.basicPublish(NORMAL_EXCHANGE, "normal", properties, message.getBytes()); } } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
|
public class DLXConsumer1 { public static final String NORMAL_EXCHANGE = "normal_exchange"; public static final String DEAD_EXCHANGE = "dead_exchange"; public static final String NORMAL_QUEUE = "normal_queue"; public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{ Channel channel = ChannelUtils.getChannel(); channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
Map<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); arguments.put("x-dead-letter-routing-key", "dead"); channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments); channel.queueDeclare(DEAD_QUEUE, false, false, false, null); channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal"); channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead"); System.out.println("等待接收消息");
DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> { System.out.println("Consumer1接收的消息是:" + new String(message.getBody(), "UTF-8")); }; CancelCallback cancelCallback = (String consumerTag) -> {}; channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, cancelCallback); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public class DLXConsumer2 { public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{ Channel channel = ChannelUtils.getChannel(); System.out.println("等待接收消息"); DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> { System.out.println("Consumer2接收的消息是:" + new String(message.getBody(), "UTF-8")); }; CancelCallback cancelCallback = (String consumerTag) -> {}; channel.basicConsume(DEAD_QUEUE, true, deliverCallback, cancelCallback); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> { String msg = new String(message.getBody(), "UTF-8"); if(msg.equals("info5") || msg.equals("info8") || msg.equals("info9")){ System.out.println("Consumer1拒绝的消息是:" + msg); channel.basicReject(message.getEnvelope().getDeliveryTag(), false); }else{ System.out.println("Consumer1接收的消息是:" + msg); channel.basicAck(message.getEnvelope().getDeliveryTag(), false); } };
channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);
|
延迟队列
延迟队列可理解为死信队列中的TTL过期情况。具体场景如我们网上购票,先确定订单,此时我们占用了一张票,若规定时间内我们没有付款,那么超时后就无法对订单付款,这个票也重新回到代售的状态。限时的短信验证也是延迟队列的运用。
而我们有两种方式实现延迟队列:
- 基于死信队列的TTL实现
- 基于官方插件的交换机延迟实现
整合Spring Boot依赖以及配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.50</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.9.2</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies>
|
1 2 3 4 5 6 7
| spring: rabbitmq: host: 192.168.158.136 port: 5672 username: admin password: 111
|
基于死信实现的延迟队列(队列延迟)
整体流程图如下:
我们通过设置队列的TTL失效时间来模拟延迟队列,但有一个缺陷,也就是我们不可能每次都规定好TTL,那么每有一个新TTL就要新增一个队列,这个问题可以使用自定义延时也就是queueC解决,但又有新问题,也就是多个自定义延迟是走同一个队列的,会有队列阻塞的情况发生,也就是当我们先发送一个延时时长较长的消息,后发送一个延时时长较短的消息,那么较短的消息需要等待较长消息处理完后再处理,严格遵守队列的先进先出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| @Configuration public class TTLQueueConfig { public static final String NORMAL_EXCHANGE = "normal_exchange"; public static final String DEAD_EXCHANGE = "dead_exchange"; public static final String QUEUE_A = "queueA"; public static final String QUEUE_B = "queueB"; public static final String QUEUE_C = "queueC"; public static final String DEAD_QUEUE = "deadQueue";
@Bean("nExchange") public DirectExchange nExchange(){ return new DirectExchange(NORMAL_EXCHANGE); } @Bean("dExchange") public DirectExchange dExchange(){ return new DirectExchange(DEAD_EXCHANGE); } @Bean("queueA") public Queue queueA(){ Map<String, Object> arguments = new HashMap<>(4); arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); arguments.put("x-dead-letter-routing-key", "dead"); arguments.put("x-message-ttl", 10000); return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build(); } @Bean("queueB") public Queue queueB(){ Map<String, Object> arguments = new HashMap<>(4); arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); arguments.put("x-dead-letter-routing-key", "dead"); arguments.put("x-message-ttl", 30000); return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build(); } @Bean("queueC") public Queue queueC(){ Map<String, Object> arguments = new HashMap<>(4); arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); arguments.put("x-dead-letter-routing-key", "dead"); return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build(); } @Bean("deadQueue") public Queue deadQueue(){ return QueueBuilder.durable(DEAD_QUEUE).build(); } @Bean public Binding queueABindNormal(@Qualifier("queueA") Queue queueA, @Qualifier("nExchange") DirectExchange nExchange){ return BindingBuilder.bind(queueA).to(nExchange).with("normalA"); } @Bean public Binding queueBBindNormal(@Qualifier("queueB") Queue queueB, @Qualifier("nExchange") DirectExchange nExchange){ return BindingBuilder.bind(queueB).to(nExchange).with("normalB"); } @Bean public Binding queueCBindNormal(@Qualifier("queueC") Queue queueC, @Qualifier("nExchange") DirectExchange nExchange){ return BindingBuilder.bind(queueC).to(nExchange).with("normalC"); } @Bean public Binding deadQueueBindDead(@Qualifier("deadQueue") Queue deadQueue, @Qualifier("dExchange") DirectExchange dExchange){ return BindingBuilder.bind(deadQueue).to(dExchange).with("dead"); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Slf4j @RestController @RequestMapping("/ttl") public class sendMsgController { @Resource private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}") public void sendMsg(@PathVariable String message){ log.info("开始时间:{},发送消息给两个TTL队列:{}", new Date().toString(), message); rabbitTemplate.convertAndSend("normal_exchange", "normalA", "10s的TTL队列:" + message); rabbitTemplate.convertAndSend("normal_exchange", "normalB", "40s的TTL队列:" + message); }
@GetMapping("/sendTTLMsg/{message}/{TTL}") public void sendMsg(@PathVariable String message, @PathVariable String TTL){ log.info("开始时间:{},发送自定义TTL为{}ms的消息给队列:{}", new Date().toString(), TTL, message); rabbitTemplate.convertAndSend("normal_exchange", "normalC", message, msg ->{ msg.getMessageProperties().setExpiration(TTL); return msg; }); } }
|
1 2 3 4 5 6 7 8 9
| @Slf4j @Component public class deadQueueConsumer { @RabbitListener(queues = "deadQueue") public void receive(Message message) throws Exception{ String msg = new String(message.getBody()); log.info("接收时间:{},收到死信队列的消息:{}", new Date().toString(), msg); } }
|
RabbitMQ插件实现延迟队列(交换机延迟,推荐)
github:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v3.8.0
下载官方插件,将压缩包放到MQ的插件目录 /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins ,然后重启MQ,随后我们在Web管理界面的交换机类型中看见 x-delayed-message ,也就是官方插件的延迟队列实现。
插件实现延迟,整体流程更简单,仅仅只需要一个队列和一个交换机。由交换机内部进行延时的设定,所以我们要对交换机进行参数配置。
完成插件延迟队列后,不会出现TTL那种同一个队列拥塞的情况。所以推荐使用插件实现延迟队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| @Configuration public class DelayedQueueConfig { public static final String DELAYED_QUEUE = "delayed_queue"; public static final String DELAYED_EXCHANGE = "delayed_exchange"; public static final String DELAYED_ROUTING_KEY = "delayed_routing_key";
@Bean public Queue delayedQueue() { return new Queue(DELAYED_QUEUE); } @Bean public CustomExchange delayedExchange(){ Map<String, Object> arguments = new HashMap<>(); arguments.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, arguments); } @Bean public Binding delayedBind(@Qualifier("delayedQueue") Queue delayedQueue, @Qualifier("delayedExchange") CustomExchange delayedExchange){ return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs(); } }
|
1 2 3 4 5 6 7 8 9
| @GetMapping("/sendDelayMsg/{message}/{delayTime}") public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime){ log.info("开始时间:{},发送插件延时为{}ms的消息给队列:{}", new Date().toString(), delayTime, message); rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE, DelayedQueueConfig.DELAYED_ROUTING_KEY, message, msg ->{ msg.getMessageProperties().setDelay(delayTime); return msg; }); }
|
1 2 3 4 5 6 7 8 9
| @Slf4j @Component public class delayQueueConsumer { @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE) public void receive(Message message) throws Exception{ String msg = new String(message.getBody()); log.info("接收时间:{},收到死信队列的消息:{}", new Date().toString(), msg); } }
|
发布与确认进阶
若RabbitMQ服务器发生宕机,其重启期间生产者消息发送失败,导致消息丢失,此时我们需要手动恢复消息。也就是说生产者发送消息时,交换机或队列收不到消息该如何处理。
就是当交换机与RoutingKey有错误时,可自定义错误的输出,方便我们查看哪一步出错,进而执行消息重发操作。
交换机确认消息(交换机匹配错误返回失败情况)
交换机判断消息的接收情况,执行回调方法的确认消息
1
| publisher-confirm-type: correlated
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Configuration public class AdvancedConfig { public static final String CONFIRM_EXCHANGE = "confirm_exchange"; public static final String CONFIRM_QUEUE = "confirm_queue"; public static final String ROUTING_KEY = "routing1";
@Bean public DirectExchange confirmExchange(){ return new DirectExchange(CONFIRM_EXCHANGE); } @Bean public Queue confirmQueue(){ return QueueBuilder.durable(CONFIRM_QUEUE).build(); } @Bean public Binding bind(Queue confirmQueue,DirectExchange confirmExchange){ return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @Slf4j @Component public class myCallBack implements RabbitTemplate.ConfirmCallback { @Resource private RabbitTemplate rabbitTemplate;
@PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(this); }
@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String ID = correlationData != null ? correlationData.getId() : ""; if(ack){ log.info("交换机已收到ID为{}的消息", ID); }else { log.info("交换机没有收到ID为{}的消息,其原因是{}", ID, cause); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Slf4j @RestController @RequestMapping("/confirm") public class AdvancedController { @Resource private RabbitTemplate rabbitTemplate; @GetMapping("/sendConfirmMsg/{message}") public void sendMsg(@PathVariable String message){ CorrelationData correlationData = new CorrelationData("1"); log.info("发送消息为:{}", message); rabbitTemplate.convertAndSend(AdvancedConfig.CONFIRM_EXCHANGE, AdvancedConfig.ROUTING_KEY, message, correlationData); } }
|
1 2 3 4 5 6 7 8 9
| @Slf4j @Component public class confirmConsumer { @RabbitListener(queues = AdvancedConfig.CONFIRM_QUEUE) public void recive (Message message){ String msg = new String(message.getBody()); log.info("接收发布队列的消息:{}", msg); } }
|
回退消息(返回key失败消息的信息)
1 2 3 4 5 6 7 8 9 10
| @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnsCallback(this); } @Override public void returnedMessage(ReturnedMessage message) { log.error("消息:{},被交换机{}回退,回退原因:{},路由key:{}", new String(message.getMessage().getBody()), message.getExchange(), message.getReplyText(), message.getRoutingKey()); }
|
此时routingKey有错误,会进行回退操作,也就是返回失败消息的信息
1 2 3 4 5 6
| @GetMapping("/sendConfirmMsg/{message}") public void sendMsg(@PathVariable String message){ CorrelationData correlationData = new CorrelationData("1"); log.info("发送消息为:{}", message); rabbitTemplate.convertAndSend(AdvancedConfig.CONFIRM_EXCHANGE, AdvancedConfig.ROUTING_KEY + "1", message, correlationData); }
|
备份交换机
备份交换机优先级高于回退,优先备份,失败则进行回退。
在之前的配置中补上即可,注意备份模式是fanout,广播给备份队列和警告队列,无需RoutingKey。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @Configuration public class AdvancedConfig { ····· public static final String BACK_UP_EXCHANGE = "back_up_exchange"; public static final String BACK_UP_QUEUE = "back_up_queue"; public static final String WARNING_QUEUE = "warning_queue"; @Bean public FanoutExchange backupExchange(){ return new FanoutExchange(BACK_UP_EXCHANGE); } @Bean public Queue backupQueue(){ return QueueBuilder.durable(BACK_UP_EXCHANGE).build(); } @Bean public Queue warningQueue(){ return QueueBuilder.durable(WARNING_QUEUE).build(); } @Bean public Binding backupBind(Queue backupQueue,FanoutExchange backupExchange){ return BindingBuilder.bind(backupQueue).to(backupExchange); } @Bean public Binding warningBind(Queue warningQueue,FanoutExchange backupExchange){ return BindingBuilder.bind(warningQueue).to(backupExchange); } }
|
备份队列消费者就是之前的发布确认的消费者,这里警告队列需要一个新的警告消费者。
然后通过错误的路由key查看警告处理
1 2 3 4 5 6 7 8 9
| @Slf4j @Component public class warningConsumer { @RabbitListener(queues = AdvancedConfig.WARNING_QUEUE) public void recive (Message message){ String msg = new String(message.getBody()); log.error("警告:发现消息{},存在路由问题", msg); } }
|