博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ 一二事(3) - 订阅模式(微信公众号模式)的应用
阅读量:5988 次
发布时间:2019-06-20

本文共 3395 字,大约阅读时间需要 11 分钟。

之前讲的消费者互相可以把队列中的消息全部读取,但是不是读完整的所有信息

那么采用订阅模式就行,这就是微信公众号的模式,

比如10个人订阅了我的公众号"BeJavaGod",当我发送一条消息的时候,

那么这10个人都能收到我的消息并且查看,比如本条消息,对吧?

生产者制造消息发送给交换机X,而不是发送给队列,队列和交换机绑定,消费者从各自的队列中获得消息

这样则实现一个生产者发送的所有消息都能被所有的消费者同时接收到

需要注意的地方是,在生产者创建消息发送到交换机时,此时没有队列,那么消息则丢失,消费者的队列绑定后再次发送则消息传达,原理是消息必须存放在队列中

生产者:

1 public class Send { 2  3     private final static String EXCHANGE_NAME = "test_exchange_fanout"; 4  5     public static void main(String[] argv) throws Exception { 6         // 获取到连接以及mq通道 7         Connection connection = ConnectionUtil.getConnection(); 8         Channel channel = connection.createChannel(); 9 10         // 声明exchange11         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");12 13         // 消息内容14         String message = "id=1001";15         channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());16         System.out.println(" [x] Sent '" + message + "'");17 18         channel.close();19         connection.close();20     }21 }

消费者1

1 public class Recv { 2  3     private final static String QUEUE_NAME = "test_queue_fanout_1"; 4  5     private final static String EXCHANGE_NAME = "test_exchange_fanout"; 6  7     public static void main(String[] argv) throws Exception { 8  9         // 获取到连接以及mq通道10         Connection connection = ConnectionUtil.getConnection();11         Channel channel = connection.createChannel();12 13         // 声明队列14         channel.queueDeclare(QUEUE_NAME, false, false, false, null);15 16         // 绑定队列到交换机17         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");18 19         // 同一时刻服务器只会发一条消息给消费者20         channel.basicQos(1);21 22         // 定义队列的消费者23         QueueingConsumer consumer = new QueueingConsumer(channel);24         // 监听队列,手动返回完成25         channel.basicConsume(QUEUE_NAME, false, consumer);26 27         // 获取消息28         while (true) {29             QueueingConsumer.Delivery delivery = consumer.nextDelivery();30             String message = new String(delivery.getBody());31             System.out.println(" [x] Received '" + message + "'");32             Thread.sleep(10);33 34             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);35         }36     }37 }

消费者2

1 public class Recv2 { 2  3     private final static String QUEUE_NAME = "test_queue_fanout_2"; 4  5     private final static String EXCHANGE_NAME = "test_exchange_fanout"; 6  7     public static void main(String[] argv) throws Exception { 8  9         // 获取到连接以及mq通道10         Connection connection = ConnectionUtil.getConnection();11         Channel channel = connection.createChannel();12 13         // 声明队列14         channel.queueDeclare(QUEUE_NAME, false, false, false, null);15 16         // 绑定队列到交换机17         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");18 19         // 同一时刻服务器只会发一条消息给消费者20         channel.basicQos(1);21 22         // 定义队列的消费者23         QueueingConsumer consumer = new QueueingConsumer(channel);24         // 监听队列,手动返回完成25         channel.basicConsume(QUEUE_NAME, false, consumer);26 27         // 获取消息28         while (true) {29             QueueingConsumer.Delivery delivery = consumer.nextDelivery();30             String message = new String(delivery.getBody());31             System.out.println(" [x] Received '" + message + "'");32             Thread.sleep(10);33 34             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);35         }36     }37 }

ok,这样就是最简单的订阅demo

转载地址:http://hvjlx.baihongyu.com/

你可能感兴趣的文章
如何使用Android MediaStore裁剪大图片
查看>>
重绘和回流
查看>>
ORA-03113:通信通道的文件结尾
查看>>
Django自带分页
查看>>
CefSharp High DPI问题的解决
查看>>
关于SQL Server服务占用内存过大---限制数据库内存使用
查看>>
解析ASP.NET Mvc开发之查询数据实例
查看>>
小程序传入两个值
查看>>
使用GRPC远程服务调用
查看>>
selenium自动化之显式等待和EC(expected_conditions)模块
查看>>
Python库安装相关问题
查看>>
speech to text
查看>>
java多态成员的特点总结
查看>>
20172328 2018-2019《Java软件结构与数据结构》第七周学习总结
查看>>
工作总结--琐碎操作和业务
查看>>
benchmarks
查看>>
Hibernate体系结构(入门)
查看>>
python学习12
查看>>
[记]创建常量、原子性的值类型
查看>>
Hadoop单机模式集群搭建
查看>>