目录

RabbitMQ - 死信队列

# 死信的概念

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

# 死信的来源

  • 消息 TTL 过期

    TTL是 Time To Live 的缩写, 也就是生存时间

  • 队列达到最大长度

    队列满了,无法再添加数据到 MQ 中

  • 消息被拒绝

    (basic.reject 或 basic.nack) 并且 requeue = false

# 死信实战

交换机类型是 direct,两个消费者,一个生产者,两个队列:消息队列和死信队列

image-20211110190646829

# 消息TTL过期

生产者代码

package com.kbt.demo08;

import com.kbt.utils.RabbitMQUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Young Kbt
 * @date 2021/11/10 19:10
 * @description 生产者
 */
public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

        // 设置消息的 TTL 时间 10s
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
        // 该信息是用作演示队列个数限制
        for (int i = 1; i <= 10; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
            System.out.println("生产者发送消息:" + message);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

消费者 C1 代码(启动之后关闭该消费者 模拟其接收不到消息)

package com.kbt.demo08;

import com.kbt.utils.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * @author Young Kbt
 * @date 2021/11/10 19:13
 * @description 消费者 C1
 */
public class Consumer01 {
    // 普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    // 死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        
        // 声明死信和普通交换机 类型为 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        
        // 声明私信队列
        String deadQueueName = "dead-queue";
        channel.queueDeclare(deadQueueName, false, false, false, null);
        // 死信队列绑定:队列、交换机、路由键(routingKey)
        channel.queueBind(deadQueueName, DEAD_EXCHANGE, "lisi");

        // 正常队列绑定死信队列信息
        Map<String, Object> params = new HashMap<>();
        // 正常队列设置死信交换机 参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //正常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");
        
        // 正常队列
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
        System.out.println("等待接收消息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println("Consumer01 接收到消息" + message);
        };
        channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {});
        
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

先启动消费者 C1,创建出队列,然后停止该 C1 的运行,则 C1 将无法收到队列的消息,无法收到的消息 10 秒后进入死信队列。启动生产者 producer 生产消息

image-20211110192243321

生产者生产消息完成,10 秒后启动 C2 消费者,它消费死信队列里面的消息,如果消费成功,则代表原本 C1 消费的消息 10 秒进入了死信队列

消费者 C2 代码

package com.kbt.demo08;

import com.kbt.utils.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * @author Young Kbt
 * @date 2021/11/10 19:13
 * @description 消费者 C1
 */
public class Consumer02 {
    // 死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        
        // 声明交换机
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 声明队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

        System.out.println("等待接收死信消息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println("Consumer02 接收到消息" + message);
        };
        channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {});
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

效果演示

image-20211110192743276

image-20211110192651293

# 死信最大长度

  1. 消息生产者代码去掉 TTL 属性,basicPublish 的第三个参数改为 null








     
     




     





    public class Producer {
        private static final String NORMAL_EXCHANGE = "normal_exchange";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMQUtils.getChannel();
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
    
            // 设置消息的 TTL 时间 10s
            // AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
    
            // 该信息是用作演示队列个数限制
            for (int i = 1; i <= 10; i++) {
                String message = "info" + i;
                channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
                System.out.println("生产者发送消息:" + message);
            }
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
  2. C1 消费者修改以下代码(启动之后关闭该消费者 模拟其接收不到消息)



























     















    public class Consumer01 {
        // 普通交换机名称
        private static final String NORMAL_EXCHANGE = "normal_exchange";
        // 死信交换机名称
        private static final String DEAD_EXCHANGE = "dead_exchange";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMQUtils.getChannel();
    
            // 声明死信和普通交换机 类型为 direct
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
    
            // 声明私信队列
            String deadQueueName = "dead-queue";
            channel.queueDeclare(deadQueueName, false, false, false, null);
            // 死信队列绑定:队列、交换机、路由键(routingKey)
            channel.queueBind(deadQueueName, DEAD_EXCHANGE, "lisi");
    
            // 正常队列绑定死信队列信息
            Map<String, Object> params = new HashMap<>();
            // 正常队列设置死信交换机 参数 key 是固定值
            params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
            //正常队列设置死信 routing-key 参数 key 是固定值
            params.put("x-dead-letter-routing-key", "lisi");
            // 设置正常队列长度的限制,例如发送 10 个消息,6 个位正常,4 个则为死信
            params.put("x-max-length", 6);
    
            // 正常队列
            String normalQueue = "normal-queue";
            channel.queueDeclare(normalQueue, false, false, false, params);
            channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
            System.out.println("等待接收消息........... ");
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                System.out.println("Consumer01 接收到消息" + message);
            };
            channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {});
    
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41

    注意

    因为参数改变了,所以需要把原先队列删除

    2021-11-10 @Young Kbt

  3. C2 消费者代码不变(启动 C2 消费者)

image-20211110193547802

# 死信消息被拒

1、消息生产者代码同上生产者一致

2、需求:消费者 C1 拒收消息 "info5"

消费者 C1 代码









































 
 
 
 
 
 
 
 







/**
 * @author Young Kbt
 * @date 2021/11/10 19:13
 * @description 消费者 C1
 */
public class Consumer01 {
    // 普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    // 死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        
        // 声明死信和普通交换机 类型为 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        
        // 声明私信队列
        String deadQueueName = "dead-queue";
        channel.queueDeclare(deadQueueName, false, false, false, null);
        // 死信队列绑定:队列、交换机、路由键(routingKey)
        channel.queueBind(deadQueueName, DEAD_EXCHANGE, "lisi");

        // 正常队列绑定死信队列信息
        Map<String, Object> params = new HashMap<>();
        // 正常队列设置死信交换机 参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //正常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");
        // 设置正常队列长度的限制,例如发送 10 个消息,6 个位正常,4 个则为死信
        // params.put("x-max-length", 6);
        
        // 正常队列
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
        System.out.println("等待接收消息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            if (message.equals("info5")) {
                System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
                //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
                channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
            } else {
                System.out.println("Consumer01 接收到消息" + message);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        // 开启手动应答
        channel.basicConsume(normalQueue, false, deliverCallback, consumerTag -> {});
        
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

image-20211110194202134

  1. C2 消费者代码不变

    /**
     * @author Young Kbt
     * @date 2021/11/10 19:13
     * @description 消费者 C1
     */
    public class Consumer02 {
        // 死信交换机名称
        private static final String DEAD_EXCHANGE = "dead_exchange";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMQUtils.getChannel();
            
            // 声明交换机
            channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
            // 声明队列
            String deadQueue = "dead-queue";
            channel.queueDeclare(deadQueue, false, false, false, null);
            channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
    
            System.out.println("等待接收死信消息........... ");
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                System.out.println("Consumer02 接收到消息" + message);
            };
            channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
            });
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28

启动消费者 C1 等待 10 秒,再启动消费者 C2

效果演示

image-20211110194727053

更新时间: 2024/01/17, 05:48:13
最近更新
01
JVM调优
12-10
02
jenkins
12-10
03
Arthas
12-10
更多文章>