博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ学习之Publish/Subscribe(3)
阅读量:4518 次
发布时间:2019-06-08

本文共 4872 字,大约阅读时间需要 16 分钟。

 上一个教程中,我们创建了一个work queue. 其中的每个task都会被精确的传送到一个worker. 这节,我们将会讲把一个message传送到多个consumers. 这种模式叫做publish/subscribe(发布/订阅).

为了说明这种模式,我们将创建一个简单的日志系统(logging system. 它由两个程序组成,一个是发送日志message并且另一个接收。

最重要的,发布的日志message将会被广播到所有的receivers

Exchangs

前面我们讲的包含下面的:producer,queue,consumer

它的主要思想是producer绝不直接发送任何messagequeue. 很多情况下,producer甚至不知道一个message是否会被发送到任何queue.

如图,它会直接发送messages到一个exchange. 而对于exchange,一方面它接收来自producermessage,另一方面它把这些message推送到queues. 至于,messages是否会被发送一个特定的queue或者发送到很多queue或者丢弃,这些规则都由exchange type定义。

Exchange type: direct , topic , headers , fanout.

我们这节主要讲fanout,它会控制广播。

channel.ExchangeDeclare("logs", "fanout");

对于fanout exchange ,它会广播它收到的所有的messages 到它知道的所有的queue.

Listing exchanges

对于列出服务器上的exchanges , 你可以使用rabbitmqctl

sudo rabbitmqctl list_exchanges
The default exchange

在前面的教程中,我们不知道exchanges,但是我们仍然可以发送messages queues. 因为我们使用到了一个默认的exchange(a default exchange).这个默认的exchange是被空字符串(“”)定义。

回想下,我们之前怎样发送message

var message = GetMessage(args);    var body = Encoding.UTF8.GetBytes(message);    channel.BasicPublish(exchange: "",  //默认的exchange                         routingKey: "hello",                         basicProperties: null,                         body: body);

此时,messages会根据指定的routingKey被路由到queue.

现在,我们可以发布到指定的exchange.

var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: "logs",                     routingKey: "",                     basicProperties: null,                     body: body);

Temporary queues

之前我们使用过很多指定名称的queues(例如hellotask_queue). 可以命名一个queue是很重要的,我们可以指定workers到同一个queue。 而且使你可以在多个producersconsumers之前共享这个queue. 

We’re also interested only in currently flowing messages not in the old ones. 我们想要最新的message而不是仅仅之前的。

这需要解决两个事情。

  1. 首先,无论什么时候我们连接Rabbit,我们需要一个新的,空的queue。为了达到这个目的,我们可以创建一个带随机名称的queue。更好的办法,我们可以让服务器给我们选择一个随机的queue名称。
  2. 第二,一旦我们断开与consumer的连接,这个queue应该被自动删除。 

.NET客户端中,我们使用下面的语句创建一个带随机名称的queue (when we supply no parameters to QueueDeclare() we create a non-durable, exclusive, autodelete queue with a generated name)

var queueName = channel.QueueDeclare().QueueName;

Bindings

我们已经创建好了exchangequeue,它们之间的关系我们叫做binding. 用来告诉exchange发送messagesqueue. 

channel.QueueBind(queue: queueName,  //绑定                  exchange: "logs",                  routingKey: "");

现在,在logs exchange上会把messages发到我们的queue

Listing bindings
rabbitmqctl list_bindings

代码

这种fanout exchanges ,在发送时,会忽视routingKey的值。

EmitLog.cs(发送)

using System;using RabbitMQ.Client;using System.Text;class EmitLog{    public static void Main(string[] args)    {        var factory = new ConnectionFactory() { HostName = "localhost" };        using(var connection = factory.CreateConnection())        using(var channel = connection.CreateModel())        {            channel.ExchangeDeclare(exchange: "logs", type: "fanout");  //声明exchange            var message = GetMessage(args);            var body = Encoding.UTF8.GetBytes(message);            channel.BasicPublish(exchange: "logs",  //发送到logs exchange                                 routingKey: "",                                 basicProperties: null,                                 body: body);            Console.WriteLine(" [x] Sent {0}", message);        }        Console.WriteLine(" Press [enter] to exit.");        Console.ReadLine();    }    private static string GetMessage(string[] args)    {        return ((args.Length > 0)               ? string.Join(" ", args)               : "info: Hello World!");    }}

不允许发送到一个不存在的exchange.

如果没有queue绑定到exchangemessages将会丢失。如果没有consumer正在监听,我们可以安全的丢弃这些message.

ReceiveLogs.cs

using System;using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Text;class ReceiveLogs{    public static void Main()    {        var factory = new ConnectionFactory() { HostName = "localhost" };        using(var connection = factory.CreateConnection())        using(var channel = connection.CreateModel())        {            channel.ExchangeDeclare(exchange: "logs", type: "fanout"); //声明exchange            var queueName = channel.QueueDeclare().QueueName;  //获得随机queue name            channel.QueueBind(queue: queueName,  //定义queue和exchange的关系                              exchange: "logs",                              routingKey: "");            Console.WriteLine(" [*] Waiting for logs.");            var consumer = new EventingBasicConsumer(channel);  //回调            consumer.Received += (model, ea) =>            {                var body = ea.Body;                var message = Encoding.UTF8.GetString(body);                Console.WriteLine(" [x] {0}", message);            };            channel.BasicConsume(queue: queueName,                                 autoAck: true,                                 consumer: consumer);            Console.WriteLine(" Press [enter] to exit.");            Console.ReadLine();        }    }}

参考网址:

 

转载于:https://www.cnblogs.com/Vincent-yuan/p/10940726.html

你可能感兴趣的文章
简单的横向ListView实现(version 4.0)
查看>>
【转】jbdc程序启动报错:ORA-12505;PL/SQL却可以登录的解决方法
查看>>
Java Spring学习笔记03.@component
查看>>
(十)桥接模式-代码实现
查看>>
Windows Phone开发(29):隔离存储C 转:http://blog.csdn.net/tcjiaan/article/details/7447469...
查看>>
循环单链表操作
查看>>
iOS --- Touch ID指纹解锁
查看>>
强制命令-hdfs 主备间切换
查看>>
echarts使用记录(三):x/y轴数据和刻度显示及坐标中网格显示、格式化x/y轴数据...
查看>>
Spring事务
查看>>
修改Arduino串口缓冲区大小(转)
查看>>
深入解读键值产生原理,linux中的软链接和硬链接(转)
查看>>
CodeForces 591A
查看>>
super 、static、final关键字加深记忆哦!还有父子类构造函数调用问题
查看>>
JDBC之java数据库的连接与简单的sql语句执行
查看>>
图形验证码如何美化?
查看>>
「题解」:[组合数学][DP]:地精部落
查看>>
两个input之间有空隙,处理方法
查看>>
让你秒懂什么是 SEM、EDM、CPS、CPA、ROI、SEO……
查看>>
nginx connect failed (110- Connection timed out) 问题排查
查看>>