安装

RabbitMQ基于erlang,我们需要先安装erlang。

erlang:https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-21.3.8.21-1.el7.x86_64.rpm

RabbitMQ:https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.8.8-1.el7.noarch.rpm

也可以使用费其他版本,centos8就要使用el8,按自己情况来,然后进入界面右上角点击下载(最开始找半天,没看到下载按钮😂),然后可以使用xftp传到linux的文件夹中,我放在/usr/local/software。

  • 安装命令,按序执行

    1
    2
    3
    rpm -ivh erlang-21.3.8.21-1.el7.x86_64.rpm
    yum install socat -y
    rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
  • 基本指令

    1
    2
    3
    4
    chkconfig rabbitmq-server on			# 开机启动服务设置
    /sbin/service rabbitmq-server start # 自行启动服务
    /sbin/service rabbitmq-server status # 查看服务状态
    /sbin/service rabbitmq-server stop # 停止运行
  • 启动web管理插件

    1
    2
    # 要在mq服务停止时启动,然后再开启mq则插件生效
    rabbitmq-plugins enable rabbitmq_management

    然后访问本机地址的15672端口,http://192.168.158.134:15672/,注意需要先开放端口,或关闭防火墙。

  • 用户管理

    1
    2
    3
    4
    5
    6
    7
    8
    # 设置账号密码
    rabbitmqctl add_user admin 111
    # 设置账号的角色
    rabbitmqctl set_user_tags admin administrator
    # 设置角色的权限,根目录下全部可读可写可执行
    rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
    # 查看所有账号
    rabbitmqctl list_users
  • 代码实现

    我们通过Java来具体实现MQ的操作流程,先导入相关依赖。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    <dependencies>
    <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.8.0</version>
    </dependency>

    <dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>2.6</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-nop -->
    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-nop</artifactId>
    <version>1.7.30</version>
    </dependency>
    </dependencies>

生产消费者简单模式(Hello World)

27.png

一个生产者P通过队列queue发送信息,消费者C接收信息

具体流程:

  • 创建连接工厂对象
  • 创建连接获取信道
  • 通过信道创建消息队列
  • 进行消息的发送与接收操作

生产者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class Producer {
//队列name
public static final String QUEUE_NAME = "hello";

public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.158.135");
factory.setUsername("admin");
factory.setPassword("111");

//创建连接
Connection connection = factory.newConnection();
//获取连接的信道
Channel channel = connection.createChannel();
/**
* 通过信道创建队列声明queueDeclare
* 1、队列名称
* 2、durable:队列是否持久化
* 3、exclusive:队列是否排他
* 4、autoDelete:队列是否自动删除
* 5、argurnents:其他参数
*/
channel.queueDeclare(QUEUE_NAME, false, false, false,null);

String message = "hello world";

/**
* 发送消息
* 1、交换机名,不使用交换机则为""
* 2、发送信息的队列(RoutingKey)
* 3、其他参数
* 4、发送的消息体,必须是byte数组
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("消息发送成功");
}
}

消费者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Consumer {
public static final String QUEUE_NAME = "hello";

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.158.135");
factory.setUsername("admin");
factory.setPassword("111");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

//使用Lambda表达式简单编写回调接口
//只需要返回消息体,若返回message则是一个地址
DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
System.out.println(new String(message.getBody()));
};

CancelCallback cancelCallback = (String consumerTag) -> {
System.out.println("消息被中断");
};
/**
* 消费者接收消息
* 1、队列名称
* 2、autoAck:是否自动接收消息
* 3、deliverCallback:消息的回调接口
* 4、cancelCallback:消费者取消订阅时的回调函数
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}

工作队列模式(Work Queues)

28.png

一个生产者通过队列发送大量消息,有多个消费者进行消息接收,其中队列唯一,而消费者不会重复处理消息,通过轮询的方式对消息进行处理。

我们可以在idea里选择多次运行消费者,然后在生产者中手动发送信息,会发现多个消费者轮询获取信息。

通用信道工具类

1
2
3
4
5
6
7
8
9
10
11
public class ChannelUtils {
public static Channel getChannel() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.158.135");
factory.setUsername("admin");
factory.setPassword("111");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class workProducer {
public static final String QUEUE_NAME = "WorkQueues";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = ChannelUtils.getChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("发送消息成功:" + message);
}
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class workConsumer {
public static final String QUEUE_NAME = "WorkQueues";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = ChannelUtils.getChannel();
DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
System.out.println("接收的消息:" + new String(message.getBody()));
};

CancelCallback cancelCallback = (String consumerTag) -> {
System.out.println("消息被中断");
};

System.out.println("------------C2等待接收信息------------");
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}

消息应答

为了保证MQ的消息在发送过程中不丢失,需要进行消息应答,即消费者在接收消息并处理消息后,会告诉MQ一处理消息,此时MQ才可以将消息进行删除。

一般分为自动应答与手动应答,我们推荐使用手动应答。

  • 应答的方式

    1
    2
    3
    4
    5
    6
    //确认
    channel.basicAck(···);
    //不确认
    channel.basicNack(···);
    //不确认,相比Nack,Reject少了一个参数
    channel.basicReject(···);

消息手动应答(推荐)

可以发现消费者1、2和最开始的工作队列模式一样是轮询获取信息,但一个处理快,一个处理慢,如果我们把处理慢的消费者宕机,由于打开了消息手动应答,本应由消费者2处理的消息不会丢失,而是由消费者1进行处理。生产者和一开始使用的工作队列模式的一致,只需要改变消费者。

  • 消费者1,处理时间短,使用手动应答与线程休眠

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    public class ackConsumer1 {
    public static final String QUEUE_NAME = "WorkQueues";

    public static void main(String[] args) throws IOException, TimeoutException {
    Channel channel = ChannelUtils.getChannel();
    System.out.println("C1等待处理,处理时间短");

    boolean autoAck = false;
    DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
    //休眠1s
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println("接收的消息:" + new String(message.getBody(), "UTF-8"));

    /**
    * 手动应答,basicAck肯定
    * 1、消息的标记,类似数组索引
    * 2、是否批量应答
    */
    channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
    };

    CancelCallback cancelCallback = (String consumerTag) -> {
    System.out.println("消息被中断");
    };
    channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
    }
    }
  • 消费者2,处理时间长

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public class ackConsumer2 {
    public static final String QUEUE_NAME = "WorkQueues";

    public static void main(String[] args) throws IOException, TimeoutException {
    Channel channel = ChannelUtils.getChannel();
    System.out.println("C2等待处理,处理时间长");
    //改为休眠20s,其余与消费者1一致
    ···
    }
    }

不公平分发(开启预取值)

默认prefetchCount = 0,此时消费者对信息进行轮询处理,自定义prefetchCount>0时,并通过信道的basicQos进行设置,则规定了每一个队列的当前看处理信息的最大数。也就说还是执行轮询操作,但某个消费者信息处理时不应答造成信息阻塞,这个阻塞队列的最大值即prefetchCount,当阻塞数据超过最大值,则该消费者不再接收新消息。而后续消息全由有空闲的消费者处理。

1
2
3
//不公平分发
int prefetchCount = 1;
channel.basicQos(prefetchCount);

持久化

1
2
3
4
//创建队列时,第二个参数durable设为true开启持久化
channel.queueDeclare(queueName, true, false, false, null);
//发布消息时,第二个参数进行响应的设置,实现消息持久化
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

发布与确认

当我们将队列以及消息进行持久化操作后,信息需要存储到磁盘上才可完成持久化操作,若存储到磁盘的过程发生了宕机,那么持久化也是失败的,所以我们还需要对信息进行发布与确认,保证信息持久化成功。

一般分为以下三种模式:

  • 单一确认:每发送一条消息就进行一次确认,效率低

  • 批量确认:发送多条消息后,再进行一次确认

  • 异步确认:先将消息全部发送,随后由broker进行确认并返回信息,相当于一个消息的监听器,生产者再将未确认的消息进行重新发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public class confirmMessage {
//消息的数据量
public static final int MESSAGE_COUNT = 1000;

public static void main(String[] args) throws Exception {
confirmMessage.singlePublish(); //单一
confirmMessage.batchPublish(); //批量
confirmMessage.asynchronousAllPublish(); //异步批量
}

//单一确认
public static void singlePublish() throws Exception{
Channel channel = ChannelUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, true, false, false, null);
//开启发布与确认
channel.confirmSelect();

//开始时间
long begin = System.currentTimeMillis();

for(int i = 0; i < MESSAGE_COUNT; i++){
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
channel.waitForConfirms();
}

long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "条数据,单一处理需要耗时" + (end - begin) + "ms");
}

//批量确认
public static void batchPublish() throws Exception {
···
//批量确认的消息数量限制
int batchSize = 100;
for(int i = 0; i < MESSAGE_COUNT; i++){
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
//消息发送达到限制时,需进行一次确认
if((i + 1) % batchSize == 0){
channel.waitForConfirms();
}
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "条数据,批量处理需要耗时" + (end - begin) + "ms");
}

//异步确认
public static void asynchronousAllPublish() throws Exception{
···
ConcurrentSkipListMap<Long, String> map = new ConcurrentSkipListMap<>();
//设置成功的消息监听器
ConfirmCallback ackCallback = (long deliveryTag, boolean mutiple) -> {
//过滤确认消息,留下未确认的
if(mutiple){
ConcurrentNavigableMap<Long, String> confirm = map.headMap(deliveryTag);
confirm.clear();
}else {
map.remove(deliveryTag);
}
System.out.println("确认的消息" + deliveryTag);
};
//设置失败的消息监听器
ConfirmCallback nackCallback = (long deliveryTag, boolean mutiple) -> {
System.out.println("未确认的消息" + deliveryTag);
};
//异步通知
channel.addConfirmListener(ackCallback, nackCallback);
for(int i = 0; i < MESSAGE_COUNT; i++){
String message = "消息" + i;
channel.basicPublish("", queueName, null, message.getBytes());
//记录所有要发送的消息,随后在监听器中删除一确认的消息,留下未确认的
map.put(channel.getNextPublishSeqNo(), message);
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "条数据,异步处理需要耗时" + (end - begin) + "ms");
}
}

Exchanges交换机

  • 默认交换机:代码中就是使用””空字符串,即不使用
  • 直连交换机direct:对应路由模式,使用RoutingKey匹配队列
  • 主题交换机topic:对应主题模式
  • 首部交换机headers
  • 扇出交换机fanout :对应发布与订阅模式,相当于广播

临时队列

也就是没有实现持久化的队列,其名字随机,断开消费者连接后,队列会自动删除。用完即删。

1
String queueName = channel.queueDeclare().getQueue();

订阅与发布模式(Publish/Subscribe)

29.png

一个生产者,一个交换机,多个队列以及多个消费者。生产者的消息不会直接发送给队列,而是通过交换机对消息完成分配。将接收的消息通过广播的形式发送给所有队列。也就是不考虑RoutingKey的影响。没有设置RoutingKey。也就是交换机的fanout扇出类型的实现。

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
public class fanoutProducer {
public static final String EXCHANGE_NAME = "fanout";

public static void main(String[] args) throws Exception{
Channel channel = ChannelUtils.getChannel();
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println("生产者发出消息:" + message);
}
}
}

消费者

多个相同的消费者,会通过交换机同时接收到消息,也就是fanout交换机实现了广播。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class fanoutConsumer1 {
public static final String EXCHANGE_NAME = "fanout";
public static void main(String[] args) throws Exception{
Channel channel = ChannelUtils.getChannel();
//声明交换机,1、交换机名,2、交换机类型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//临时队列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机与队列,1、队列名,2、交换机名,3、RoutingKey
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("等待消息进行接收···");

DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
System.out.println("1号测试接收的消息:" + new String(message.getBody(), "UTF-8"));
};

CancelCallback cancelCallback = (String consumerTag) -> {
System.out.println("消息被中断");
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
}
}

路由模式(Routing)

30.png

基于发布与订阅模式,只是交换机类型改为使用directt,交换机发送消息根据绑定的RoutingKey来分配队列。主要RoutingKey匹配的队列都会接收消息,所以若全部队列的RoutingKey一致,就相当于fanout类型进行广播。

生产者

我们发送信息时,给交换机分配不同的RoutingKey,就会发送给相对应的队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class directProducer {
public static final String EXCHANGE_NAME = "direct";

public static void main(String[] args) throws Exception{
Channel channel = ChannelUtils.getChannel();
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME, "222", null, message.getBytes());
System.out.println("生产者发出消息:" + message);
}
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class directConsumer1 {
public static final String EXCHANGE_NAME = "direct";
public static void main(String[] args) throws Exception{
Channel channel = ChannelUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "111");
System.out.println("等待消息进行接收···");
DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
System.out.println("1号测试接收的消息:" + new String(message.getBody(), "UTF-8"));
};
CancelCallback cancelCallback = (String consumerTag) -> {
System.out.println("消息被中断");
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
}
}

主题模式(Topics)

31.png

由于路由模式使用direct,其消息是一一对应发送的,有一定的局限性,因为RoutingKey统一,我们队列不能同时接收多个不同的RoutingKey。例如我们队列想接收 tc.name 与 tc.age 这两个消息,但使用direct只能明确指明其中一种,此时就要使用topic主题模式,相当于对RoutingKey进行模糊化处理,匹配全部相关联的RoutingKey进行发送。

即使一个队列可以匹配多个RoutingKey,但同一个队列每次只能接收一次消息。

定义以及模糊匹配

主题模式的RoutingKey必须是一个单词列表,通过 . 进行分割,最多255字节。例如 tc.name、tc.age这样定义,一般带有一定的指向性。然后topic中就可以使用类似sql的通配符进行模糊匹配。

1
2
* 代表任意一个词
# 可代表0或多个单词

也就是说RoutingKey定义为 tc.* ,可以同时匹配tc.name 和 tc.age。

*注意若只绑定#,就相当于fanout类型,因为都可以匹配。而绑定的RoutingKey没有出现 或#,说明没有做模糊匹配处理,就相当于direct类型。可以说topic包括了fanout与direct。

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class topicProducer {
public static final String EXCHANGE_NAME = "topic";

public static void main(String[] args) throws Exception{
Channel channel = ChannelUtils.getChannel();
HashMap<String, String> bindMap = new HashMap<>();
bindMap.put("quick.orange.rabbit", "Q1、Q2");
bindMap.put("lazy.orange.elephant", "Q1、Q2");
bindMap.put("quick.orange.fox", "Q1");
bindMap.put("lazy.brown.fox", "Q2");
bindMap.put("lazy.pink.rabbit", "Q2");
bindMap.put("quick.brown.fox", "null");
bindMap.put("quick.orange.male.rabit", "null");
bindMap.put("lazy.orange.male.rabbit", "Q2");

for(Map.Entry<String, String> entry : bindMap.entrySet()){
String RoutingKey = entry.getKey();
String info = entry.getValue();
channel.basicPublish(EXCHANGE_NAME, RoutingKey, null, info.getBytes() );
System.out.println("生产者发出消息:" + info);
}
}
}

消费者

测试后可以发现交换机按照模糊匹,向队列发送消息。

一个消费者可以绑定多个RoutingKey。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class topicConsumer1 {
public static final String EXCHANGE_NAME = "topic";

public static void main(String[] args) throws Exception{
Channel channel = ChannelUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = "Q1";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
System.out.println("等待接收消息······");
DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
System.out.println("Q1队列接收的消息:" + new String(message.getBody(), "UTF-8"));
System.out.println("接收的队列是" + queueName + ",绑定的Key是" + message.getEnvelope().getRoutingKey());
};
CancelCallback cancelCallback = (String consumerTag) -> {
System.out.println("消息被中断");
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
}
}

RPC模式

32.png

死信队列(DLX)

死信(dead-letter-exchange)即无法被消费的消息,例如我们微信付款,有一个付款时间,超过时间的付款应归为死信,需要由原队列推给死信队列进行处理。一般成为死信有以下几种情况:

  • 消息被拒绝:被正常队列拒绝,则归为死信
  • 消息TTL过期:直到时间过期,仍未接收到消息则归为死信
  • 队列达到最大长度:当消息超过队列的最大长度时,归为死信

测试时,先将正常消费者启动创建需要的队列,然后停掉消费者,启动生产者,最后启动死信队列消费者查看死信队列的情况。

生产者

1
2
3
4
5
6
7
8
9
10
11
12
public class DLXProducer {
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception{
Channel channel = ChannelUtils.getChannel();
// 死信消息,设置TTL时间,若测试最大长度则要注释掉TTL
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
for(int i = 1; i < 11; i++){
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE, "normal", properties, message.getBytes());
}
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 正常队列消费者,需要关联死信队列
// 这里模拟了TTL和最大长度
public class DLXConsumer1 {
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String DEAD_EXCHANGE = "dead_exchange";
public static final String NORMAL_QUEUE = "normal_queue";
public static final String DEAD_QUEUE = "dead_queue";

public static void main(String[] args) throws Exception{
Channel channel = ChannelUtils.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

// 现在要将普通队列与死信队列关联起来,方便死信消息的处理
// 需要对最后一个参数进行设置
Map<String, Object> arguments = new HashMap<>();
// 为普通队列设置对应的死信交换机以及死信的RoutingKey
// 注意这里键是规定好的
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
arguments.put("x-dead-letter-routing-key", "dead");

// 可设置最大长度,超过长度的消息归为死信
// arguments.put("x-max-length", 6);

channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal");
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead");
System.out.println("等待接收消息");

DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
System.out.println("Consumer1接收的消息是:" + new String(message.getBody(), "UTF-8"));
};
CancelCallback cancelCallback = (String consumerTag) -> {};
channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, cancelCallback);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 死信队列消费者
public class DLXConsumer2 {
public static final String DEAD_QUEUE = "dead_queue";

public static void main(String[] args) throws Exception{
Channel channel = ChannelUtils.getChannel();
System.out.println("等待接收消息");
DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
System.out.println("Consumer2接收的消息是:" + new String(message.getBody(), "UTF-8"));
};
CancelCallback cancelCallback = (String consumerTag) -> {};
channel.basicConsume(DEAD_QUEUE, true, deliverCallback, cancelCallback);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 手动拒绝消息,模拟消息拒绝的情况,替换正常消费者的DeliverCallback接口实现即可
DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
String msg = new String(message.getBody(), "UTF-8");
if(msg.equals("info5") || msg.equals("info8") || msg.equals("info9")){
System.out.println("Consumer1拒绝的消息是:" + msg);
// 拒绝消息,false代表不返回队列
channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
}else{
System.out.println("Consumer1接收的消息是:" + msg);
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
};
// 注意此时需要使用手动应答
channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);

延迟队列

延迟队列可理解为死信队列中的TTL过期情况。具体场景如我们网上购票,先确定订单,此时我们占用了一张票,若规定时间内我们没有付款,那么超时后就无法对订单付款,这个票也重新回到代售的状态。限时的短信验证也是延迟队列的运用。

而我们有两种方式实现延迟队列:

  • 基于死信队列的TTL实现
  • 基于官方插件的交换机延迟实现

整合Spring Boot依赖以及配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.50</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
1
2
3
4
5
6
7
spring:
rabbitmq:
host: 192.168.158.136
# 注意端口不是15672
port: 5672
username: admin
password: 111

基于死信实现的延迟队列(队列延迟)

整体流程图如下:

33.png

我们通过设置队列的TTL失效时间来模拟延迟队列,但有一个缺陷,也就是我们不可能每次都规定好TTL,那么每有一个新TTL就要新增一个队列,这个问题可以使用自定义延时也就是queueC解决,但又有新问题,也就是多个自定义延迟是走同一个队列的,会有队列阻塞的情况发生,也就是当我们先发送一个延时时长较长的消息,后发送一个延时时长较短的消息,那么较短的消息需要等待较长消息处理完后再处理,严格遵守队列的先进先出。

  • 所有的队列以及交换机的设置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
@Configuration
public class TTLQueueConfig {
// 普通交换机与死信交换机
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String DEAD_EXCHANGE = "dead_exchange";
// 多个普通队列,延时不一样
public static final String QUEUE_A = "queueA";
public static final String QUEUE_B = "queueB";
public static final String QUEUE_C = "queueC"; //自定义延时
// 死信队列
public static final String DEAD_QUEUE = "deadQueue";

// 将两个交换机、3个队列全部进行组件声明
@Bean("nExchange")
public DirectExchange nExchange(){
return new DirectExchange(NORMAL_EXCHANGE);
}
@Bean("dExchange")
public DirectExchange dExchange(){
return new DirectExchange(DEAD_EXCHANGE);
}
@Bean("queueA")
public Queue queueA(){
Map<String, Object> arguments = new HashMap<>(4);
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
arguments.put("x-dead-letter-routing-key", "dead");
arguments.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
}
@Bean("queueB")
public Queue queueB(){
Map<String, Object> arguments = new HashMap<>(4);
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
arguments.put("x-dead-letter-routing-key", "dead");
arguments.put("x-message-ttl", 30000);
return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
}
// 自定义队列,可满足不同的TTL
@Bean("queueC")
public Queue queueC(){
Map<String, Object> arguments = new HashMap<>(4);
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
arguments.put("x-dead-letter-routing-key", "dead");
return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
}
@Bean("deadQueue")
public Queue deadQueue(){
return QueueBuilder.durable(DEAD_QUEUE).build();
}
// 普通队列与死信队列进行绑定
@Bean
public Binding queueABindNormal(@Qualifier("queueA") Queue queueA,
@Qualifier("nExchange") DirectExchange nExchange){
return BindingBuilder.bind(queueA).to(nExchange).with("normalA");
}
@Bean
public Binding queueBBindNormal(@Qualifier("queueB") Queue queueB,
@Qualifier("nExchange") DirectExchange nExchange){
return BindingBuilder.bind(queueB).to(nExchange).with("normalB");
}
@Bean
public Binding queueCBindNormal(@Qualifier("queueC") Queue queueC,
@Qualifier("nExchange") DirectExchange nExchange){
return BindingBuilder.bind(queueC).to(nExchange).with("normalC");
}
@Bean
public Binding deadQueueBindDead(@Qualifier("deadQueue") Queue deadQueue,
@Qualifier("dExchange") DirectExchange dExchange){
return BindingBuilder.bind(deadQueue).to(dExchange).with("dead");
}
}
  • 生产者P
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Slf4j
@RestController
@RequestMapping("/ttl")
public class sendMsgController {
@Resource
private RabbitTemplate rabbitTemplate;

@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message){
log.info("开始时间:{},发送消息给两个TTL队列:{}", new Date().toString(), message);
rabbitTemplate.convertAndSend("normal_exchange", "normalA", "10s的TTL队列:" + message);
rabbitTemplate.convertAndSend("normal_exchange", "normalB", "40s的TTL队列:" + message);
}

@GetMapping("/sendTTLMsg/{message}/{TTL}")
public void sendMsg(@PathVariable String message, @PathVariable String TTL){
log.info("开始时间:{},发送自定义TTL为{}ms的消息给队列:{}", new Date().toString(), TTL, message);
rabbitTemplate.convertAndSend("normal_exchange", "normalC", message, msg ->{
// 自定义延迟时长
msg.getMessageProperties().setExpiration(TTL);
return msg;
});
}
}
  • 消费者C
1
2
3
4
5
6
7
8
9
@Slf4j
@Component
public class deadQueueConsumer {
@RabbitListener(queues = "deadQueue")
public void receive(Message message) throws Exception{
String msg = new String(message.getBody());
log.info("接收时间:{},收到死信队列的消息:{}", new Date().toString(), msg);
}
}

RabbitMQ插件实现延迟队列(交换机延迟,推荐)

github:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v3.8.0

下载官方插件,将压缩包放到MQ的插件目录 /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins ,然后重启MQ,随后我们在Web管理界面的交换机类型中看见 x-delayed-message ,也就是官方插件的延迟队列实现。

插件实现延迟,整体流程更简单,仅仅只需要一个队列和一个交换机。由交换机内部进行延时的设定,所以我们要对交换机进行参数配置。

完成插件延迟队列后,不会出现TTL那种同一个队列拥塞的情况。所以推荐使用插件实现延迟队列。

  • 插件声明队列与延迟交换机
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Configuration
public class DelayedQueueConfig {
public static final String DELAYED_QUEUE = "delayed_queue";
public static final String DELAYED_EXCHANGE = "delayed_exchange";
public static final String DELAYED_ROUTING_KEY = "delayed_routing_key";

// 声明队列
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE);
}
// 声明自定义交换机
@Bean
public CustomExchange delayedExchange(){
// 这里其他参数代表直连的延迟类型,下面指这个是延迟消息
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-delayed-type", "direct");
/**
* 1、交换机名称
* 2、交换机类型
* 3、是否持久化
* 4、是否自动删除
* 5、其他参数
*/
return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, arguments);
}
// 完成绑定
@Bean
public Binding delayedBind(@Qualifier("delayedQueue") Queue delayedQueue,
@Qualifier("delayedExchange") CustomExchange delayedExchange){
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
  • 生产者
1
2
3
4
5
6
7
8
9
@GetMapping("/sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime){
log.info("开始时间:{},发送插件延时为{}ms的消息给队列:{}", new Date().toString(), delayTime, message);
rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE, DelayedQueueConfig.DELAYED_ROUTING_KEY, message, msg ->{
// 自定义延迟时长ms
msg.getMessageProperties().setDelay(delayTime);
return msg;
});
}
  • 插件延迟消费者
1
2
3
4
5
6
7
8
9
@Slf4j
@Component
public class delayQueueConsumer {
@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE)
public void receive(Message message) throws Exception{
String msg = new String(message.getBody());
log.info("接收时间:{},收到死信队列的消息:{}", new Date().toString(), msg);
}
}

发布与确认进阶

若RabbitMQ服务器发生宕机,其重启期间生产者消息发送失败,导致消息丢失,此时我们需要手动恢复消息。也就是说生产者发送消息时,交换机或队列收不到消息该如何处理。

就是当交换机与RoutingKey有错误时,可自定义错误的输出,方便我们查看哪一步出错,进而执行消息重发操作。

交换机确认消息(交换机匹配错误返回失败情况)

交换机判断消息的接收情况,执行回调方法的确认消息

  • yml配置文件开启发布确认模式
1
publisher-confirm-type: correlated
  • 配置交换机与队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Configuration
public class AdvancedConfig {
public static final String CONFIRM_EXCHANGE = "confirm_exchange";
public static final String CONFIRM_QUEUE = "confirm_queue";
public static final String ROUTING_KEY = "routing1";

@Bean
public DirectExchange confirmExchange(){
return new DirectExchange(CONFIRM_EXCHANGE);
}
@Bean
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE).build();
}
@Bean
public Binding bind(Queue confirmQueue,DirectExchange confirmExchange){
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY);
}
}
  • 回调方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 消息的回调接口,判断消息的接收情况
@Slf4j
@Component
public class myCallBack implements RabbitTemplate.ConfirmCallback {
@Resource
private RabbitTemplate rabbitTemplate;

// 本方法是RabbitTemplate的一个实现类,我们只是实现了接口
// 但是RabbitTemplate并没有注入这个实现类
// 所以我们使用@PostConstruct注解在进行注入
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
}

/**
* 交换机的确认回调方法
* 1、保存回调消息的ID及相关信息
* 2、是否接收到消息
* 3、失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String ID = correlationData != null ? correlationData.getId() : "";
if(ack){
log.info("交换机已收到ID为{}的消息", ID);
}else {
log.info("交换机没有收到ID为{}的消息,其原因是{}", ID, cause);
}
}
}
  • 生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
@Slf4j
@RestController
@RequestMapping("/confirm")
public class AdvancedController {
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendConfirmMsg/{message}")
public void sendMsg(@PathVariable String message){
CorrelationData correlationData = new CorrelationData("1");
log.info("发送消息为:{}", message);
rabbitTemplate.convertAndSend(AdvancedConfig.CONFIRM_EXCHANGE, AdvancedConfig.ROUTING_KEY, message, correlationData);
}
}
  • 消费者
1
2
3
4
5
6
7
8
9
@Slf4j
@Component
public class confirmConsumer {
@RabbitListener(queues = AdvancedConfig.CONFIRM_QUEUE)
public void recive (Message message){
String msg = new String(message.getBody());
log.info("接收发布队列的消息:{}", msg);
}
}

回退消息(返回key失败消息的信息)

  • yml配置文件开启回退
1
publisher-returns: true
  • 回退接口,在myCallBack中增加方法
1
2
3
4
5
6
7
8
9
10
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
// 注入一下
rabbitTemplate.setReturnsCallback(this);
}
@Override
public void returnedMessage(ReturnedMessage message) {
log.error("消息:{},被交换机{}回退,回退原因:{},路由key:{}", new String(message.getMessage().getBody()), message.getExchange(), message.getReplyText(), message.getRoutingKey());
}
  • 生产者

此时routingKey有错误,会进行回退操作,也就是返回失败消息的信息

1
2
3
4
5
6
@GetMapping("/sendConfirmMsg/{message}")
public void sendMsg(@PathVariable String message){
CorrelationData correlationData = new CorrelationData("1");
log.info("发送消息为:{}", message);
rabbitTemplate.convertAndSend(AdvancedConfig.CONFIRM_EXCHANGE, AdvancedConfig.ROUTING_KEY + "1", message, correlationData);
}

备份交换机

34.png

备份交换机优先级高于回退,优先备份,失败则进行回退。

  • 队列与交换机配置

在之前的配置中补上即可,注意备份模式是fanout,广播给备份队列和警告队列,无需RoutingKey。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Configuration
public class AdvancedConfig {
·····
public static final String BACK_UP_EXCHANGE = "back_up_exchange";
public static final String BACK_UP_QUEUE = "back_up_queue";
public static final String WARNING_QUEUE = "warning_queue";
@Bean
public FanoutExchange backupExchange(){
return new FanoutExchange(BACK_UP_EXCHANGE);
}
@Bean
public Queue backupQueue(){
return QueueBuilder.durable(BACK_UP_EXCHANGE).build();
}
@Bean
public Queue warningQueue(){
return QueueBuilder.durable(WARNING_QUEUE).build();
}
@Bean
public Binding backupBind(Queue backupQueue,FanoutExchange backupExchange){
return BindingBuilder.bind(backupQueue).to(backupExchange);
}
@Bean
public Binding warningBind(Queue warningQueue,FanoutExchange backupExchange){
return BindingBuilder.bind(warningQueue).to(backupExchange);
}
}
  • 消费者

备份队列消费者就是之前的发布确认的消费者,这里警告队列需要一个新的警告消费者。

然后通过错误的路由key查看警告处理

1
2
3
4
5
6
7
8
9
@Slf4j
@Component
public class warningConsumer {
@RabbitListener(queues = AdvancedConfig.WARNING_QUEUE)
public void recive (Message message){
String msg = new String(message.getBody());
log.error("警告:发现消息{},存在路由问题", msg);
}
}