主页 > 游戏开发  > 

[RabbitMQ]常见面试题汇总|工作流程|消息可靠性|消息顺序性|幂等性|高级特性|延迟队列|仲裁队列|工作

[RabbitMQ]常见面试题汇总|工作流程|消息可靠性|消息顺序性|幂等性|高级特性|延迟队列|仲裁队列|工作

 Author:MTingle major:人工智能


Build your hopes like a tower!



目录

一.MQ的作用及应用场景

二.常用MQ之间的区别

1. Kafka

2. RabbitMQ

3.RocketMQ

三.RabbitMQ的核心概念和工作流程

核心概念

工作流程

工作模式

简单模式

工作队列

发布/订阅模式

路由模式

topic通配符模式

RPC通信

发布确认模式

四.RabbitMQ消息可靠性保证​编辑

五.RabbitMQ保证消息顺序性

六.RabbitMQ消息积压问题

常见的消息积压原因

解决方案

怎么实现work模式能者多劳

七.如何保证消息消费时的幂等性

幂等性概念

解决方案

全局唯一ID

业务逻辑判断

八.RabbitMQ的死信队列

死信队列概念

消息变死信场景

九.介绍下RabbitMQ的延迟队列

应用场景

延迟队列实现方式

十.推拉模式

推拉模式概念

推拉模式实现


一.MQ的作用及应用场景

消息队列(MQ)是⼀种应用程序间的通信方法,它允许系统组件以异步的方式进行交互,在不同的应用场景下可以展现不同的作用.常见的应用场景如下:

异步解耦: 在一些业务场景中,一些操作可能比较耗时,但不需要立即返回结果,比如用户注册的时候,需要给用户发送短信,此时我们就可以作为异步处理,而不必等待所有操作都完成后,才告诉用户注册成功.

使用消息队列进行异步解耦后,当用户注册请求到来时,系统首先将用户信息存储到数据库,然后将发送短信或邮件的任务封装成消息发送到消息队列中,接着立即返回注册成功的结果给用户。之后,会有专门的消费者从消息队列中取出消息,并执行发送短信或邮件的操作。这样,发送短信或邮件的操作就与用户注册的主流程解耦开来,实现了异步处理。

流量削峰: 在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样突发的流量并不常见,如果因为这些特殊情况增加太多的机器,会造成巨大的浪费.于是我们可以使用MQ这种组件,支撑突发的访问压力,不会让我们的数据库因为突发流量而崩溃.比如双十一等活动的特价秒杀,就可以使用MQ来控制流量,让请求在队列中排好队,系统根据自己的处理能力逐步处理这些请求.

异步通信: 在很多时候我们并不需要立即处理消息,MQ为我们提供了异步机制,允许应用把一些消息放到MQ中,而不用立即处理它,在需要的时候再慢慢处理.就像我们在购买电影票的时候,程序会为我们锁票,我们不需要立即付款,而有一个订单失效时间,在我们确认付款后,系统才会处理我们的订单信息.

消息分发: 当多个系统需要对同一数据做出响应时,可以使用MQ进行消息分发,比如支付成功后,支付系统可以向MQ发送信息,其他系统订阅该信息,无需轮询数据库.还可以起到一个类似广播的效果,通知所有订阅MQ的程序.

延迟通知: 在需要在特定的时间后发送通知的场景中,可以使用MQ的延迟信息功能.比如在电子商务平台中,如果用户下单后⼀定时间内未支付,可以使用延迟队列在超时后自动取消订单

二.常用MQ之间的区别

目前业界有很多的MQ产品,例如RabbitMQ,RocketMQ,Kafka等.

1. Kafka

Kafka⼀开始的目的就是用于日志收集和传输,追求高吞吐量,性能卓越,单机吞吐达到十万级,在日志领域比较成熟,功能较为简单,主要支持简单的MQ功能.适合大数据处理,日志聚合,实时分析等场景.

2. RabbitMQ

采用Erlang语言开发,MQ功能比较完善,且几乎支持所有主流语言,开源提供的界面也非常友好,性能较好,吞吐量达到万级,社区活跃度高,文档更新频繁.比较适合数据量没那么大,并发量没那么高的场景.

3.RocketMQ

采用Java语言开发,在可用性,可靠性以及稳定性等方面都非常出色,吞吐量能达到十万级,在Alibaba集团内部广泛使用,但支持的客户端语言不多,产品较新文档较少, 且社区活跃度⼀般.适合于大规模分布式系统,可靠性要求高,且并发量大的场景,比如互联网金融.

这些消息队列,各有侧重,没有好坏之分,要看使用的场景,具体问题具体分析.

三.RabbitMQ的核心概念和工作流程

核心概念

RabbitMQ是一个消息中间件,也是一个生产者消费者模型,他负责接收消息,转发消息.

Producer: 生产者, 向RabbitMQ发送消息.

Consumer: 消费者, 从RabbitMQ接收消息.

Broker: 消息队列服务器或服务实例,也就是RabbitMQServer.接收消息并转发消息.

Connection: 网络连接,它允许客户端与RabbitMQ通信.

Channel: 连接里的⼀个虚拟通道,发送或者接收消息都是通过通道进行的.

Exchange: 交换机,负责接收生产者发送的消息,并根据路由算法和规则将消息路由到⼀个或多个队 列.接收消息并路由消息.

Queue: 消息队列,存储消息直到它们被消费者消费.

工作流程

1. 创建连接(Connection),开启信道(Channel).

2. 声明交换机和队列,以及绑定规则(RountingKey)

3. 发布消息.

4. 消息存入消息队列,如果未找到相应的队列,则根据生产者的配置,选择丢弃或者退回给生产者.

5. 消费消息,有手动消息确认和自动消息确认.

6. 消息被确认后,会从队列中删除.

工作模式 public class Contants { public static final String HOST = "123.207.74.192"; public static final Integer PORT = 5672; public static final String USER_NAME = "study"; public static final String PASSWORD = "study"; public static final String VIRTUAL_HOST = "bite"; // 工作队列模式 public static final String WORK_QUEUE = "work.queue"; // 发布订阅模式 public static final String FANOUT_EXCHANGE = "fanout.exchange"; public static final String FANOUT_QUEUE1 = "fanout.queue1"; public static final String FANOUT_QUEUE2 = "fanout.queue2"; // 路由模式 public static final String DIRECT_EXCHANGE = "direct.exchange"; public static final String DIRECT_QUEUE1 = "direct.queue1"; public static final String DIRECT_QUEUE2 = "direct.queue2"; // 通配符模式 public static final String TOPIC_EXCHANGE = "topic.exchange"; public static final String TOPIC_QUEUE1 = "topic.queue1"; public static final String TOPIC_QUEUE2 = "topic.queue2"; // RPC模式 public static final String RPC_REQUEST_QUEUE = "rpc.request"; public static final String RPC_RESPONSE_QUEUE = "rpc.response"; //publisher confirms public static final String PUBLISHER_CONFIRMS_QUEUE1 = "publisher.confirms.queue1"; public static final String PUBLISHER_CONFIRMS_QUEUE2 = "publisher.confirms.queue2"; public static final String PUBLISHER_CONFIRMS_QUEUE3 = "publisher.confirms.queue3"; } 简单模式

public class ConsumerDemo { public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { // 1.声明队列 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("123.207.74.192"); connectionFactory.setPort(5672); connectionFactory.setUsername("study"); connectionFactory.setPassword("study"); connectionFactory.setVirtualHost("bite"); Connection connection = connectionFactory.newConnection(); // 2.创建Channel Channel channel = connection.createChannel(); // 3.声明队列(可以省略,但不建议) channel.queueDeclare("hello",true,false,false,null); // 已存在则不会创建,不存在则会创建, // 不声明若队列不存在,则会报错,影响生产环境 // 4.消费消息 /** * String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException; * 参数说明 * queue: 队列名称 * autoAck: 是否自动确认 * callback: 接收到消息后,执行的逻辑 */ DefaultConsumer consumer = new DefaultConsumer(channel) { // 从队列中接收到信息就会执行的内容 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { /** * consumerTag: 消费者标签 * envelope: 封装信息,队列名称,交换机等 * propertie: 配置信息 * body: 消息内容 */ // TODO System.out.println("接收消息: "+new String(body)); } }; channel.basicConsume("hello",true,consumer); // 等待程序执行完成 Thread.sleep(2000); // 5.释放资源 // channel.close(); // connection.close(); } } public class ProducerDemo { public static void main(String[] args) throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Contants.HOST); connectionFactory.setPort(Contants.PORT); connectionFactory.setUsername(Contants.USER_NAME); connectionFactory.setPassword(Contants.PASSWORD); connectionFactory.setVirtualHost(Contants.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); // 2.开启信道 Channel channel = connection.createChannel(); // 3.声明交换机 使用内置交换机 // 4.声明队列 /** * Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, * Map<String, Object> arguments) throws IOException; * String queue: 队列名称 * boolean durable: 是否可以持久化 * boolean exclusive: 是否独占 * boolean autoDelete: 是否自动删除 * Map<String, Object> arguments: 参数 */ channel.queueDeclare("hello",true,false,false,null); // 5.发送消息 /** * String exchange, String routingKey, BasicProperties props, byte[] body * exchange: 交换机名称 * routingKey: 使用内置交换机,routingKey和队列名称保持一致 * props: 属性 * body: 发送的消息 */ for (int i=0;i<10;i++){ String msg = "hello RabbitMQ: "+i; channel.basicPublish("","hello",null,msg.getBytes()); } System.out.println("消息发送成功"); // // 6.资源释放(有前后顺序) // channel.close(); // 可以省略 // connection.close(); } } 工作队列

简单模式的增强版,和简单模式的区别就是:简单模式有⼀个消费者,工作队列模式支持多个消费者接收消息,消费者之间是竞争关系,每个消息只能被⼀个消费者接收

public class Producer { public static void main(String[] args) throws Exception { // 1.建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Contants.HOST); connectionFactory.setPort(Contants.PORT); connectionFactory.setUsername(Contants.USER_NAME); connectionFactory.setPassword(Contants.PASSWORD); connectionFactory.setVirtualHost(Contants.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); // 2.开启信道 Channel channel = connection.createChannel(); // 3.声明交换机 使用内置交换机 channel.queueDeclare(Contants.WORK_QUEUE,true,false,false,null); // 4.发送消息 for (int i=0;i<10;i++){ String msg = "hello work queue.... "+i; channel.basicPublish("",Contants.WORK_QUEUE,null,msg.getBytes()); } System.out.println("消息发送成功"); // // 6.资源释放(有前后顺序) // channel.close(); // 可以省略 // connection.close(); } } public class Consumer1 { public static void main(String[] args) throws Exception { // 1.声明队列 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Contants.HOST); connectionFactory.setPort(Contants.PORT); connectionFactory.setUsername(Contants.USER_NAME); connectionFactory.setPassword(Contants.PASSWORD); connectionFactory.setVirtualHost(Contants.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); // 2.创建Channel Channel channel = connection.createChannel(); // 3.声明队列(使用内置交换机) channel.queueDeclare(Contants.WORK_QUEUE,true,false,false,null);// 已存在则不会创建,不存在则会创建, // 不声明若队列不存在,则会报错,影响生产环境 // 4.消费消息 DefaultConsumer consumer = new DefaultConsumer(channel) { // 从队列中接收到信息就会执行的内容 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // TODO System.out.println("接收消息: "+new String(body)); } }; channel.basicConsume(Contants.WORK_QUEUE,true,consumer); // 等待程序执行完成 // 5.释放资源 // channel.close(); // connection.close(); } } public class Consumer2 { public static void main(String[] args) throws Exception { // 1.声明队列 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Contants.HOST); connectionFactory.setPort(Contants.PORT); connectionFactory.setUsername(Contants.USER_NAME); connectionFactory.setPassword(Contants.PASSWORD); connectionFactory.setVirtualHost(Contants.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); // 2.创建Channel Channel channel = connection.createChannel(); // 3.声明队列(使用内置交换机) channel.queueDeclare(Contants.WORK_QUEUE,true,false,false,null);// 已存在则不会创建,不存在则会创建, // 不声明若队列不存在,则会报错,影响生产环境 // 4.消费消息 DefaultConsumer consumer = new DefaultConsumer(channel) { // 从队列中接收到信息就会执行的内容 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // TODO System.out.println("接收消息: "+new String(body)); } }; channel.basicConsume(Contants.WORK_QUEUE,true,consumer); // 等待程序执行完成 // 5.释放资源 // channel.close(); // connection.close(); } } 发布/订阅模式

public class Producer { public static void main(String[] args) throws Exception { // 1.建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Contants.HOST); connectionFactory.setPort(Contants.PORT); connectionFactory.setUsername(Contants.USER_NAME); connectionFactory.setPassword(Contants.PASSWORD); connectionFactory.setVirtualHost(Contants.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); // 2.开启信道 Channel channel = connection.createChannel(); // 3.声明交换机 channel.exchangeDeclare(Contants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT,true); // 4.声明队列 channel.queueDeclare(Contants.FANOUT_QUEUE1,true,false,false,null); channel.queueDeclare(Contants.FANOUT_QUEUE2,true,false,false,null); // 5.交换机和队列绑定 channel.queueBind(Contants.FANOUT_QUEUE1,Contants.FANOUT_EXCHANGE,""); channel.queueBind(Contants.FANOUT_QUEUE2,Contants.FANOUT_EXCHANGE,""); // 6.发送消息 String msg = "hello fanout..."; channel.basicPublish(Contants.FANOUT_EXCHANGE,"",null,msg.getBytes()); // 7.释放资源 channel.close(); connection.close(); } } public class Consumer1 { public static void main(String[] args) throws Exception { // 1.声明队列 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Contants.HOST); connectionFactory.setPort(Contants.PORT); connectionFactory.setUsername(Contants.USER_NAME); connectionFactory.setPassword(Contants.PASSWORD); connectionFactory.setVirtualHost(Contants.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); // 2.创建Channel Channel channel = connection.createChannel(); // 3.声明队列(使用内置交换机) channel.queueDeclare(Contants.FANOUT_QUEUE1,true,false,false,null);// 已存在则不会创建,不存在则会创建, // 不声明若队列不存在,则会报错,影响生产环境 // 4.消费消息 DefaultConsumer consumer = new DefaultConsumer(channel) { // 从队列中接收到信息就会执行的内容 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // TODO System.out.println("接收消息: "+new String(body)); } }; channel.basicConsume(Contants.FANOUT_QUEUE1,true,consumer); // 等待程序执行完成 // 5.释放资源 // channel.close(); // connection.close(); } } public class Consumer2 { public static void main(String[] args) throws Exception { // 1.声明队列 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Contants.HOST); connectionFactory.setPort(Contants.PORT); connectionFactory.setUsername(Contants.USER_NAME); connectionFactory.setPassword(Contants.PASSWORD); connectionFactory.setVirtualHost(Contants.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); // 2.创建Channel Channel channel = connection.createChannel(); // 3.声明队列(使用内置交换机) channel.queueDeclare(Contants.FANOUT_QUEUE2,true,false,false,null);// 已存在则不会创建,不存在则会创建, // 不声明若队列不存在,则会报错,影响生产环境 // 4.消费消息 DefaultConsumer consumer = new DefaultConsumer(channel) { // 从队列中接收到信息就会执行的内容 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // TODO System.out.println("接收消息: "+new String(body)); } }; channel.basicConsume(Contants.FANOUT_QUEUE2,true,consumer); // 等待程序执行完成 // 5.释放资源 // channel.close(); // connection.close(); } } 路由模式

public class Producer { public static void main(String[] args) throws Exception { // 1.建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Contants.HOST); connectionFactory.setPort(Contants.PORT); connectionFactory.setUsername(Contants.USER_NAME); connectionFactory.setPassword(Contants.PASSWORD); connectionFactory.setVirtualHost(Contants.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); // 2.开启信道 Channel channel = connection.createChannel(); // 3.声明交换机 channel.exchangeDeclare(Contants.DIRECT_EXCHANGE,BuiltinExchangeType.DIRECT,true); // 4.声明队列 channel.queueDeclare(Contants.DIRECT_QUEUE1,true,false,false,null); channel.queueDeclare(Contants.DIRECT_QUEUE2,true,false,false,null); // 5.交换机和队列绑定 channel.queueBind(Contants.DIRECT_QUEUE1,Contants.DIRECT_EXCHANGE,"a",null); channel.queueBind(Contants.DIRECT_QUEUE2,Contants.DIRECT_EXCHANGE,"a",null); channel.queueBind(Contants.DIRECT_QUEUE2,Contants.DIRECT_EXCHANGE,"b",null); channel.queueBind(Contants.DIRECT_QUEUE2,Contants.DIRECT_EXCHANGE,"c",null); // 6.发送消息 String msg_a = "hello direct,my routingkey is a..."; channel.basicPublish(Contants.DIRECT_EXCHANGE,"a",null,msg_a.getBytes()); String msg_b = "hello direct,my routingkey is b..."; channel.basicPublish(Contants.DIRECT_EXCHANGE,"b",null,msg_b.getBytes()); String msg_c = "hello direct,my routingkey is c..."; channel.basicPublish(Contants.DIRECT_EXCHANGE,"c",null,msg_c.getBytes()); System.out.println("消息发送成功"); // 7.释放资源 channel.close(); connection.close(); } } public class Consumer1 { public static void main(String[] args) throws Exception { // 1.建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Contants.HOST); connectionFactory.setPort(Contants.PORT); connectionFactory.setUsername(Contants.USER_NAME); connectionFactory.setPassword(Contants.PASSWORD); connectionFactory.setVirtualHost(Contants.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); // 2.开启信道 Channel channel = connection.createChannel(); // 3.声明队列(使用内置交换机) channel.queueDeclare(Contants.DIRECT_QUEUE1,true,false,false,null);// 已存在则不会创建,不存在则会创建, // 不声明若队列不存在,则会报错,影响生产环境 // 4.消费消息 DefaultConsumer consumer = new DefaultConsumer(channel) { // 从队列中接收到信息就会执行的内容 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // TODO System.out.println("接收消息: "+new String(body)); } }; channel.basicConsume(Contants.DIRECT_QUEUE1,true,consumer); // 等待程序执行完成 // 5.释放资源 // channel.close(); // connection.close(); } } public class Consumer2 { public static void main(String[] args) throws Exception { // 1.建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Contants.HOST); connectionFactory.setPort(Contants.PORT); connectionFactory.setUsername(Contants.USER_NAME); connectionFactory.setPassword(Contants.PASSWORD); connectionFactory.setVirtualHost(Contants.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); // 2.开启信道 Channel channel = connection.createChannel(); // 3.声明队列(使用内置交换机) channel.queueDeclare(Contants.DIRECT_QUEUE2,true,false,false,null);// 已存在则不会创建,不存在则会创建, // 不声明若队列不存在,则会报错,影响生产环境 // 4.消费消息 DefaultConsumer consumer = new DefaultConsumer(channel) { // 从队列中接收到信息就会执行的内容 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // TODO System.out.println("接收消息: "+new String(body)); } }; channel.basicConsume(Contants.DIRECT_QUEUE2,true,consumer); // 等待程序执行完成 // 5.释放资源 // channel.close(); // connection.close(); } } topic通配符模式

public class Producer { public static void main(String[] args) throws Exception { // 1.建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Contants.HOST); connectionFactory.setPort(Contants.PORT); connectionFactory.setUsername(Contants.USER_NAME); connectionFactory.setPassword(Contants.PASSWORD); connectionFactory.setVirtualHost(Contants.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); // 2.开启信道 Channel channel = connection.createChannel(); // 3.声明交换机 channel.exchangeDeclare(Contants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC,true); // 4.声明队列 channel.queueDeclare(Contants.TOPIC_QUEUE1,true,false,false,null); channel.queueDeclare(Contants.TOPIC_QUEUE2,true,false,false,null); // 5.交换机和队列绑定 channel.queueBind(Contants.TOPIC_QUEUE1,Contants.TOPIC_EXCHANGE,"*.a.*",null); channel.queueBind(Contants.TOPIC_QUEUE2,Contants.TOPIC_EXCHANGE,"*.*.b",null); channel.queueBind(Contants.TOPIC_QUEUE2,Contants.TOPIC_EXCHANGE,"c.#",null); // 6.发送消息 String msg_a = "hello topic,my routingkey is ae.a.f"; channel.basicPublish(Contants.TOPIC_EXCHANGE,"ae.a.f",null,msg_a.getBytes()); // 转发的Q1 String msg_b = "hello topic,my routingkey is ef.a.b"; channel.basicPublish(Contants.TOPIC_EXCHANGE,"ef.a.b",null,msg_b.getBytes()); // 转发到Q1和Q2 String msg_c = "hello topic,my routingkey is c.ef.d"; channel.basicPublish(Contants.TOPIC_EXCHANGE,"c.ef.d",null,msg_c.getBytes()); // 转发到Q2 System.out.println("消息发送成功"); // 7.释放资源 channel.close(); connection.close(); } } public class Consumer1 { public static void main(String[] args) throws Exception { // 1.建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Contants.HOST); connectionFactory.setPort(Contants.PORT); connectionFactory.setUsername(Contants.USER_NAME); connectionFactory.setPassword(Contants.PASSWORD); connectionFactory.setVirtualHost(Contants.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); // 2.开启信道 Channel channel = connection.createChannel(); // 3.声明队列(使用内置交换机) channel.queueDeclare(Contants.TOPIC_QUEUE1,true,false,false,null);// 已存在则不会创建,不存在则会创建, // 不声明若队列不存在,则会报错,影响生产环境 // 4.消费消息 DefaultConsumer consumer = new DefaultConsumer(channel) { // 从队列中接收到信息就会执行的内容 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // TODO System.out.println("接收消息: "+new String(body)); } }; channel.basicConsume(Contants.TOPIC_QUEUE1,true,consumer); // 等待程序执行完成 // 5.释放资源 // channel.close(); // connection.close(); } } public class Consumer2 { public static void main(String[] args) throws Exception { // 1.建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Contants.HOST); connectionFactory.setPort(Contants.PORT); connectionFactory.setUsername(Contants.USER_NAME); connectionFactory.setPassword(Contants.PASSWORD); connectionFactory.setVirtualHost(Contants.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); // 2.开启信道 Channel channel = connection.createChannel(); // 3.声明队列(使用内置交换机) channel.queueDeclare(Contants.TOPIC_QUEUE2,true,false,false,null);// 已存在则不会创建,不存在则会创建, // 不声明若队列不存在,则会报错,影响生产环境 // 4.消费消息 DefaultConsumer consumer = new DefaultConsumer(channel) { // 从队列中接收到信息就会执行的内容 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // TODO System.out.println("接收消息: "+new String(body)); } }; channel.basicConsume(Contants.TOPIC_QUEUE2,true,consumer); // 等待程序执行完成 // 5.释放资源 // channel.close(); // connection.close(); } } RPC通信

public class RpcClient { public static void main(String[] args) throws Exception{ // 1.建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Contants.HOST); connectionFactory.setPort(Contants.PORT); connectionFactory.setUsername(Contants.USER_NAME); connectionFactory.setPassword(Contants.PASSWORD); connectionFactory.setVirtualHost(Contants.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); // 2.开启信道 Channel channel = connection.createChannel(); channel.queueDeclare(Contants.RPC_REQUEST_QUEUE,true,false,false,null); channel.queueDeclare(Contants.RPC_RESPONSE_QUEUE,true,false,false,null); // 3.发送请求 // 设置请求的相关属性 String msg = "hello rpc"; String correlationId = UUID.randomUUID().toString(); AMQP.BasicProperties props = new AMQP.BasicProperties().builder() .correlationId(correlationId) // 设置请求的唯一标识 .replyTo(Contants.RPC_RESPONSE_QUEUE) // 告诉服务端(Server)完成响应后将结果放在 Contants.RPC_RESPONSE_QUEUE 中 .build(); channel.basicPublish("",Contants.RPC_REQUEST_QUEUE,props,msg.getBytes()); // 4.接受响应 // 使用阻塞队列,存储响应信息 final BlockingQueue<String> response = new ArrayBlockingQueue<>(1); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String respMsg = new String(body); System.out.println("接收到回调消息: " + respMsg); if (correlationId.equals(properties.getCorrelationId())){ // 如果correlationId一致,校验成功 response.offer(respMsg); } } }; channel.basicConsume(Contants.RPC_RESPONSE_QUEUE,true,consumer); // 客户端的消费队列是 Contants.RPC_RESPONSE_QUEUE String result = response.take(); System.out.println("RPC Client 响应结果: "+result); } } public class RpcServer { public static void main(String[] args) throws Exception{ // 1.建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Contants.HOST); connectionFactory.setPort(Contants.PORT); connectionFactory.setUsername(Contants.USER_NAME); connectionFactory.setPassword(Contants.PASSWORD); connectionFactory.setVirtualHost(Contants.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); // 2.开启信道 Channel channel = connection.createChannel(); // 3.接受请求 channel.basicQos(1); // 一次只能接收一条信息 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String request = new String(body,"UTF-8"); System.out.println("接收到请求: "+request); String response = "针对request: "+request+" 响应成功"; AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder() .correlationId(properties.getCorrelationId()) .build(); channel.basicPublish("",Contants.RPC_RESPONSE_QUEUE,basicProperties,response.getBytes()); // getDeliveryTag:开启手动确认会生成,明确确认的信息是哪一条 false -> 不设置批量确认 channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(Contants.RPC_REQUEST_QUEUE,false,consumer); } } 发布确认模式

public class PublisherConfirms { private static final Integer MESSAGE_COUNT = 10000; static Connection createConnection() throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); //需要提前开放端口号 connectionFactory.setUsername(Constants.USER_NAME);//账号 connectionFactory.setPassword(Constants.PASSWORD); //密码 connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机 return connectionFactory.newConnection(); } public static void main(String[] args) throws Exception { //Strategy #1: Publishing Messages Individually //单独确认 // publishingMessagesIndividually(); //Strategy #2: Publishing Messages in Batches //批量确认 publishingMessagesInBatches(); //Strategy #3: Handling Publisher Confirms Asynchronously //异步确认 handlingPublisherConfirmsAsynchronously(); } /** * 异步确认 */ private static void handlingPublisherConfirmsAsynchronously() throws Exception{ try (Connection connection = createConnection()){ //1. 开启信道 Channel channel = connection.createChannel(); //2. 设置信道为confirm模式 channel.confirmSelect(); //3. 声明队列 channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null); //4. 监听confirm //集合中存储的是未确认的消息ID long start = System.currentTimeMillis(); SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>()); channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { if (multiple){ confirmSeqNo.headSet(deliveryTag+1).clear(); }else { confirmSeqNo.remove(deliveryTag); } } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { if (multiple){ confirmSeqNo.headSet(deliveryTag+1).clear(); }else { confirmSeqNo.remove(deliveryTag); } //业务需要根据实际场景进行处理, 比如重发, 此处代码省略 } }); //5. 发送消息 for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "hello publisher confirms"+i; long seqNo = channel.getNextPublishSeqNo(); channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes()); confirmSeqNo.add(seqNo); } while (!confirmSeqNo.isEmpty()){ Thread.sleep(10); } long end = System.currentTimeMillis(); System.out.printf("异步确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start); } } /** * 批量确认 * @throws Exception */ private static void publishingMessagesInBatches() throws Exception{ try(Connection connection = createConnection()) { //1. 开启信道 Channel channel = connection.createChannel(); //2. 设置信道为confirm模式 channel.confirmSelect(); //3. 声明队列 channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null); //4. 发送消息, 并进行确认 long start = System.currentTimeMillis(); int batchSize = 100; int outstandingMessageCount = 0; for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "hello publisher confirms"+i; channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes()); outstandingMessageCount++; if (outstandingMessageCount==batchSize){ channel.waitForConfirmsOrDie(5000); outstandingMessageCount = 0; } } if (outstandingMessageCount>0){ channel.waitForConfirmsOrDie(5000); } long end = System.currentTimeMillis(); System.out.printf("批量确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start); } } /** * 单独确认 */ private static void publishingMessagesIndividually() throws Exception { try(Connection connection = createConnection()) { //1. 开启信道 Channel channel = connection.createChannel(); //2. 设置信道为confirm模式 channel.confirmSelect(); //3. 声明队列 channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null); //4. 发送消息, 并等待确认 long start = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "hello publisher confirms"+i; channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes()); //等待确认 channel.waitForConfirmsOrDie(5000); } long end = System.currentTimeMillis(); System.out.printf("单独确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start); } } } 四.RabbitMQ消息可靠性保证

1. 生产者问题.因为应用程序故障,网络抖动等各种原因,生产者没有成功向broker发送消息.

2. 消息中间件自身问题.生产者成功发送给了Broker,但是Broker没有把消息保存好,导致消息丢失.

3. 消费者问题.Broker发送消息到消费者,消费者在消费消息时,因为没有处理好,导致broker将消费 失败的消息从队列中删除了.

针对 Producer -> Broker ,我们可以采用发送方确认机制实现.

        1) Producer -> Exchange confirm模式

        2) Exchang -> Queue return模式 

        3) 队列设置长度: 死信

针对 Broker ,交换机,队列,消息持久化.

针对 Broker -> Consumer 消费者确认

        1) 自动确认,消息到达消费者之后,如果消费者有逻辑错误,处理失败,此时消息已经从队列中删除,会导致消息丢失.

        2) 手动确认,提高消息可靠性.

        3) 重试机制,当消息消费失败可以重复消费来确保消息可靠性,不过要设置最大重试次数避免CPU资源的浪费.

五.RabbitMQ保证消息顺序性

消息顺序性保障分为:局部顺序性保证和全局顺序性保证. 局部顺序性通常指的是在单个队列内部保证消息的顺序.全局顺序性是指在多个队列或多个消费者之间保证消息的顺序.

在实际应用中,全局顺序性很难实现,可以考虑使用业务逻辑来保证顺序性,比如在消息中嵌入全局唯一序列号, 并在消费端进行排序处理.比如在购买商品的时候商品有唯一的可以排序订单号,我们可以通过查看该订单号的前一条和后一条是否已经被消费,再考虑是否消费该信息.(业务逻辑控制)

也可以考虑单队列,单消费者,不过全局顺序性这种情况下效率太低,我们再生产环境中要尽量避免.

相对而言, 局部顺序性更常见,也更容易实现.单个消费者的吞吐太低了,当需要多个消费者以提高处理速度时,可以使用分区消费.把⼀个队列分割成 多个分区,每个分区由⼀个消费者处理,以此来保持每个分区内消息的顺序性.(分区消费)

还可以考虑采用消息确认机制,消费者在处理完⼀条消息后,显式地发送确认,这样RabbitMQ才会移除并继续发送下一条消息.

六.RabbitMQ消息积压问题

消息积压是指在消息队列(如RabbitMQ)中,待处理的消息数量超过了消费者处理能力,导致消息在队列中不断堆积的现象.

常见的消息积压原因

1. 消息生产过快.在高流量或者高负载的情况下,生产者以极高的速率发送消息,超过了消费者的处理 能力.

2.消费者能力不足.消费者处理处理消息的速度跟不上消息生产的速度,也会导致消息在队列中积压.比如消费端业务逻辑复杂,耗时长,消费端性能低,系统资源限制,异常处理不当等

3.网络问题.出现网络延迟或者网络抖动,消费者无法及时或确认消息,最终导致消息积压.

4.RabbitMQ服务配置过低.

解决方案

1. 提高消费者效率.

a. 增加消费者实例数量,比如新增机器.

b. 优化业务逻辑,比如使用多线程来处理业务.

c. 设置prefetchCount,当⼀个消费者阻塞时,消息转发到其他未阻塞的消费者.

d. 消息发生异常时,设置合适的重试策略,或者转入到死信队列.

2. 限制生产者速率:流量控制,限流算法.

a. 流量控制:在消息生产者中实现流量控制逻辑,根据消费者处理能力动态调整发送速率.

b. 限流:使用限流工具,为消息发送速率设置⼀个上限.

c. 设置过期时间.如果消息过期未消费,可以配置死信队列,以避免消息丢失,并减少对主队列的压力.

3. 资源与配置优化

升级RabbitMQ服务器硬件,调整配置参数等.

怎么实现work模式能者多劳

在 Work 模式中,默认情况下 RabbitMQ 采用轮询(round-robin)的方式将消息依次分发给各个消费者,即每个消费者会轮流接收到消息。但这种方式可能会导致性能不均衡,能力强的消费者可能会闲置,而能力弱的消费者可能会堆积大量任务。为了实现能者多劳,可以通过设置消费者的预取计数(prefetch count)来实现。

七.如何保证消息消费时的幂等性 幂等性概念

在应用程序中,幂等性就是指对⼀个系统进行重复调用(相同参数),不论请求多少次,这些请求对系统的影响都是相同的效果.

比如数据库的 select 操作.不同时间两次查询的结果可能不同,但是这个操作是符合幂等性的.幂等 性指的是对资源的影响,而不是返回结果.查询操作对数据资源本身不会产生影响,之所以结果不同,可 能是因为两次查询之间有其他操作对资源进行了修改.

对于MQ而言,幂等性是指同⼀条消息,多次消费,对系统的影响是相同的.

一般消息中间件的消息传输保障分为三个层级.

1. Atmostonce:最多⼀次.消息可能会丢失,但绝不会重复传输.

2. Atleast once:最少⼀次.消息绝不会丢失,但可能会重复传输.

3. Exactly once:恰好⼀次.每条消息肯定会被传输⼀次且仅传输⼀次. RabbitMQ支持"最多一次"和"最少一次".

解决方案

MQ消费者的幂等性解决方案,一般有以下几种:

全局唯一ID

1. 为每条消息分配⼀个唯一标识符,比如UUID或者MQ消息中的唯一ID,但是⼀定要保证唯一性.

2. 消费者收到消息后,先用该id判断该消息是否已经消费过,如果已经消费过,则放弃处理.

3. 如果未消费过,消费者开始消费消息,业务处理成功后,把唯一ID保存起来(数据库或Redis等)

业务逻辑判断

在业务逻辑层面实现消息处理的幂等性. 例如:通过检查数据库中是否已存在相关数据记录,或者使用乐观锁机制来避免更新已被其他事务更改的数据,再或者在处理消息之前,先检查相关业务的状态,确保消息对应的操作尚未执行,然后才进行处 理,具体根据业务场景来处理.

八.RabbitMQ的死信队列 死信队列概念

死信(deadmessage) 简单理解就是因为种种原因,无法被消费的信息,就是死信. 有死信,自然就有死信队列.当消息在⼀个队列中变成死信之后,它能被重新被发送到另⼀个交换器中,这个交换器就是DLX( Dead Letter Exchange ),绑定DLX的队列,就称为死信队列(Dead Letter Queue,简称DLQ).

消息变死信场景

消息变成死信⼀般是由于以下几种情况:

1. 消息被拒绝( Basic.Reject/Basic.Nack ),并且设置 requeue 参数为 false (放弃重试)

2. 消息过期.

3. 队列达到最大长度.

九.介绍下RabbitMQ的延迟队列

延迟队列(DelayedQueue),即消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后, 消费者才能拿到这个消息进行消费.

应用场景

1. 智能家居:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作.这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到智能设备.

2. 日常管理:预定会议后,需要在会议开始前十五分钟提醒参会人参加会议.

3. 用户注册成功后,7天后发送短信,提高用户活跃度等.

延迟队列实现方式

1. 使用死信队列达到延迟队列的效果.

2. 使用RabbitMQ官方提供的延迟队列插件.

1. 基于死信实现的延迟队列.

        a. 优点: 1) 灵活不需要额外的插件支持.

        b. 缺点: 1) 存在消息顺序问题.

                     2) 需要额外的逻辑来处理死信队列的消息,增加了系统的复杂性.

2. 基于插件实现的延迟队列.

        a. 优点: 1) 通过插件可以直接创建延迟队列,简化延迟消息的实现.

                     2) 避免了DLX的时序问题.(使用死信作为延迟队列,消息一30s过期,消息二10s过期,消息一在消息二前面,此时消息一过期了,后面的消息二也跟着过期了).

        b. 缺点: 1) 需要依赖特定的插件,有运维工作.

                     2) 只适⽤特定版本.

十.推拉模式

RabbitMQ支持两种消息传递模式:推模式(push)和拉模式(pull).

推拉模式概念

推模式:消息中间件主动将消息推送给消费者.

拉模式:消费者主动从消息中间件拉取消息.

RabbitMQ主要是基于推模式工作的,它的核心设计是让消息队列中的消费者接收到由生产者发送的消 息.使用channel.basicConsume方法订阅队列,RabbitMQ就会把消息推送到订阅该队列的消费者,如 果只想从队列中获取单条消息而不是持续订阅,则可以使⽤channel.basicGet方法来进行消费消息.

推拉模式实现 public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); //需要提前开放端口号 connectionFactory.setUsername(Constants.USER_NAME);//账号 connectionFactory.setPassword(Constants.PASSWORD); //密码 connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机 Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); //3. 声明队列 使用内置的交换机 //如果队列不存在, 则创建, 如果队列存在, 则不创建 channel.queueDeclare(Constants.MESSAGE_QUEUE, true, false, false, null); //4. 发送消息 for (int i = 0; i < 10; i++) { String msg = "hello message queue...."+i; channel.basicPublish("",Constants.MESSAGE_QUEUE, null, msg.getBytes()); } System.out.println("消息发送成功~"); //6. 资源释放 channel.close(); connection.close(); } } public class PullConsumer { public static void main(String[] args) throws IOException, TimeoutException { //1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); //需要提前开放端口号 connectionFactory.setUsername(Constants.USER_NAME);//账号 connectionFactory.setPassword(Constants.PASSWORD); //密码 connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机 Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); //3. 声明队列 使用内置的交换机 //如果队列不存在, 则创建, 如果队列存在, 则不创建 channel.queueDeclare(Constants.MESSAGE_QUEUE, true, false, false, null); //4. pull 模式获取消息 GetResponse getResponse = channel.basicGet(Constants.MESSAGE_QUEUE, true); System.out.println("拉模式获取消息: "+ new String(getResponse.getBody(),"UTF-8")); //6. 资源释放 // channel.close(); // connection.close(); } } public class PushConsumer { public static void main(String[] args) throws IOException, TimeoutException { //1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); //需要提前开放端口号 connectionFactory.setUsername(Constants.USER_NAME);//账号 connectionFactory.setPassword(Constants.PASSWORD); //密码 connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机 Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); //3. 声明队列 使用内置的交换机 //如果队列不存在, 则创建, 如果队列存在, 则不创建 channel.queueDeclare(Constants.MESSAGE_QUEUE, true, false, false, null); //4. 消费消息 DefaultConsumer consumer = new DefaultConsumer(channel){ //从队列中收到消息, 就会执行的方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //TODO System.out.println("接收到消息:"+ new String(body)); } }; channel.basicConsume(Constants.MESSAGE_QUEUE, true, consumer); // //6. 资源释放 // channel.close(); // connection.close(); } }

标签:

[RabbitMQ]常见面试题汇总|工作流程|消息可靠性|消息顺序性|幂等性|高级特性|延迟队列|仲裁队列|工作由讯客互联游戏开发栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“[RabbitMQ]常见面试题汇总|工作流程|消息可靠性|消息顺序性|幂等性|高级特性|延迟队列|仲裁队列|工作