博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
.NET Core RabbitMQ探索(2)——RabbitMQ的Exchange
阅读量:4678 次
发布时间:2019-06-09

本文共 10807 字,大约阅读时间需要 36 分钟。

  实际上,RabbitMQ的生产者并不会直接把消息发送给队列,甚至生产者都不知道消息是否会被发送给一个队列。对于生产者而言,它们只能把消息发送到Exchange,一个Exchange所完成的工作相当简单,一方面,它从生产者那里接收消息;另一方面它将消息存入队列中。一个Exchange需要准确的知道它要如何处理它接收到的消息,例如,它需要把消息转发到特定的队列,还是进行广播处理,或者直接将它丢弃。可以通过exchange type来定义Exchange处理消息的规则。

  整个框架结构图如图所示。

  Exchange types有以下几种:direct、topic、headers和fanout。如果我们没有定义Exchange,那么系统就会默认使用一个默认的Exchange,名为:"",就像我们入门篇里的一样,它会自己创建一个""的默认Exchange,然后将消息转发给特定routingKey的队列。

  • Direct Exchange

  使用direct exchange时,会将exchange与特定的队列进行绑定,转发时由routingkey进行队列的匹配,如图所示。

  在direct类型的exchange中,只有这两个routingkey完全相同,exchange才会选择对应的binding进行消息路由,代码示例如下所示:

  1. 首先我们需要将exchange和queue进行binding
channel.QueueBind(queue: "create_pdf_queue",                    exchange: "pdf_events",                    routingKey: "pdf_create",                    arguments: null);

  绑定时需要设置:队列名、exchange名和它们的routingkey。

  1. 在发送消息到exchange时会设置对应的routingkey
channel.BasicPublish(exchange: "pdf_events",                        routingKey: "pdf_create",                        basicProperties: properties,                        body: body);

  生产者发布消息时,需要设置exchange名和routingKey,如果exchange名和routingKey都与上述绑定的完全一致,那么该exchange就会将这条消息路由到队列。

  • Topic Exchange

  此类exchange与direct类似,唯一不同的地方是,direct类型要求routingKey完全一致,而这里可以可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“JiangYuZhou.#”能够匹配到“JiangYuZhou.pets.cat”,但是“JiangYuZhou.*” 只会匹配到“JiangYuZhou.money”。

  所以,Topic Exchange 使用非常灵活,topic exchange如图所示。

  例如,我们首先声明一个topic exchange,它的名称为"agreements":

// Topic类型的exchange, 名称 agreements    channel.ExchangeDeclare(exchange: "agreements",                            type: ExchangeType.Topic,                            durable: true,                            autoDelete: false,                            arguments: null);

  然后,我们声明三个队列,它们分别如下:

// 创建berlin_agreements队列    channel.QueueDeclare(queue: "berlin_agreements",                            durable: true,                            exclusive: false,                            autoDelete: false,                            arguments: null);    //创建 all_agreements 队列    channel.QueueDeclare(queue: "all_agreements",                            durable: true,                            exclusive: false,                            autoDelete: false,                            arguments: null);    //创建 headstore_agreements 队列    channel.QueueDeclare(queue: "headstore_agreements",                            durable: true,                            exclusive: false,                            autoDelete: false,                            arguments: null);

  最后,我们将agreements exchange分别与上面的三个队列以不同通配符的routingKey进行绑定:

//绑定 agreements --> berlin_agreements 使用routingkey:agreements.eu.berlin.#    channel.QueueBind(queue: "berlin_agreements",                        exchange: "agreements",                        routingKey: "agreements.eu.berlin.#",                        arguments: null);    //绑定 agreements --> all_agreements 使用routingkey:agreements.#    channel.QueueBind(queue: "all_agreements",                        exchange: "agreements",                        routingKey: "agreements.#",                        arguments: null);    //绑定 agreements --> headstore_agreements 使用routingkey:agreements.eu.*.headstore    channel.QueueBind(queue: "headstore_agreements",                        exchange: "agreements",                        routingKey: "agreements.eu.*.headstore",                        arguments: null);

  这时我们如果发送下列消息:

var message = "hello world"; var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.Persistent = true;channel.BasicPublish(exchange: "agreements",    routingKey: "agreements.eu.berlin",    basicProperties: properties,    body: body);

  该消息设置的exchange为"agreements",routingKey为"agreements.eu.berlin",所以它可以匹配上面的"agreements.eu.berlin.#"和"agreements.#",消息被转发到了"berlin_agreements"和"all_agreements"队列。

  • Fanout Exchange

  该exchange无需对routingKey进行匹配操作,而是很简单的直接将消息路由到所有绑定的队列中,如图所示。

  • Header Exchange

  此类型的路由规是根据header来判断的,首先需要以键值对的形式设置header的参数,在绑定exchange的时候将header以arguments的形式传递进去,传递参数时,键为"x-match"的header可以设置它的值为all或any,其中,all表示只有当发布的消息匹配该header中除"x-match"以外的所有值时,消息才会被转发到该队列;any表示当发布的消息匹配该header种除"x-match"外的任意值时,该消息会被转发到匹配队列。

 

代码操练

  最后我们以header exchange为例,演示我们的Exchange。首先我们创建四个项目,其中一个作为生产者,另作三个均作为消费者,并且使用:

dotnet add package RabbitMQ.Client

  给四个项目均安装上RabbitMQ的.NET包,并进行restore,项目结构如图所示:

  开始编写Send端的代码,其中,RabbitMQ还是使用我们在上一章种使用的Docker中RabbitMQ,程序如下:

using System;using System.Collections.Generic;using System.Text;using RabbitMQ.Client;namespace Send{    class Program    {        static void Main(string[] args)        {            var factory = new ConnectionFactory() { HostName = "148.70.210.208" };            using (var connection = factory.CreateConnection())            {                using (var channel = connection.CreateModel())                {                    //声明Headers类型的exchange,名称为agreements                    channel.ExchangeDeclare(exchange: "agreements",                        type: ExchangeType.Headers,                        autoDelete: false,                        arguments: null);                    //创建队列queue.A                    channel.QueueDeclare(queue: "queue.A",                        durable: true,                        exclusive: false,                        autoDelete: false,                        arguments: null);                    //创建队列queue.B                    channel.QueueDeclare(queue: "queue.B",                        durable: true,                        exclusive: false,                        autoDelete: false,                        arguments: null);                    //创建队列queue.C                    channel.QueueDeclare(queue: "queue.C",                        durable: true,                        exclusive: false,                        autoDelete: false,                        arguments: null);                    //绑定agreements=>queue.A,使用arguments(format=pdf、type=report、x-match=all)                    //只有当header中同时满足format=pdf、type=report时,消息才会被转发到队列A                    Dictionary
aHeader = new Dictionary
(); aHeader.Add("format", "pdf"); aHeader.Add("type", "report"); aHeader.Add("x-match", "all"); channel.QueueBind(queue: "queue.A", exchange: "agreements", routingKey: string.Empty, arguments: aHeader); //绑定agreements=>queue.B,使用arguments(format=pdf、type=log、x-match=any) //当header中满足format=pdf或type=log任意一个时,消息就会被转发到队列B Dictionary
bHeader = new Dictionary
(); bHeader.Add("format", "pdf"); bHeader.Add("type", "log"); bHeader.Add("x-match", "any"); channel.QueueBind(queue: "queue.B", exchange: "agreements", routingKey: string.Empty, arguments: bHeader); //绑定agreements=>queue.C,使用arguments(format=zip、type=report、x-match=all) //当header中同时满足format=zip和type=report时,消息会被转发到队列C Dictionary
cHeader = new Dictionary
(); cHeader.Add("format", "zip"); cHeader.Add("type", "report"); cHeader.Add("x-match", "all"); channel.QueueBind(queue: "queue.C", exchange: "agreements", routingKey: string.Empty, arguments: cHeader); string message1 = "hello world From 1"; var body = Encoding.UTF8.GetBytes(message1); var properties1 = channel.CreateBasicProperties(); properties1.Persistent = true; Dictionary
mHeader1 = new Dictionary
(); mHeader1.Add("format", "pdf"); mHeader1.Add("type", "report"); properties1.Headers = mHeader1; //这条消息会被转发到queue.A和queue.B //queue.A 的binding (format=pdf, type=report, x-match=all) //queue.B 的binding (format=pdf, type=log, x-match=any) channel.BasicPublish(exchange: "agreements", routingKey: string.Empty, basicProperties: properties1, body: body); string message2 = "hello world From 2"; body = Encoding.UTF8.GetBytes(message2); var properties2 = channel.CreateBasicProperties(); properties2.Persistent = true; Dictionary
mHeader2 = new Dictionary
(); mHeader2.Add("type", "log"); properties2.Headers = mHeader2; //这条消息会被转发到queue.B //queue.B 的binding (format = pdf, type = log, x-match = any) channel.BasicPublish(exchange: "agreements", routingKey: string.Empty, basicProperties: properties2, body: body); string message3 = "hello world From 3"; body = Encoding.UTF8.GetBytes(message3); var properties3 = channel.CreateBasicProperties(); properties3.Persistent = true; Dictionary
mHeader3 = new Dictionary
(); mHeader3.Add("format", "zip"); properties3.Headers = mHeader3; //这条消息不会被路由 //队列C要求同时满足两个条件,这里只满足了一个,没有匹配的队列 channel.BasicPublish(exchange: "agreements", routingKey: string.Empty, basicProperties: properties3, body: body); } } } }}

  运行程序后,可以看到,queue.A中匹配了三条消息、queue.B中匹配了两条、queue.C中没有匹配到任何消息。

  可以看到,队列A中匹配了一条信息,即Message 1,队列B中匹配了两条信息,即Message 1和Message2,队列C中没有匹配信息,符合我们程序的编写,下面用接收端进行接收。

  接收端分别写了三个程序,分别接收队列A、B、C的消息,它们除了绑定队列名称不同外,其余全部相同,下面是绑定队列A的接收程序:

using System;using System.Text;using RabbitMQ.Client;using RabbitMQ.Client.Events;namespace Recieve1{    class Program    {        static void Main(string[] args)        {            var factory = new ConnectionFactory() { HostName = "148.70.210.208" };            using (var connection = factory.CreateConnection())            {                using (var channel = connection.CreateModel())                {                    //注意要与发送端的声明一致                    channel.ExchangeDeclare(exchange: "agreements",                        type: ExchangeType.Headers,                        autoDelete: false,                        arguments: null);                    //绑定了queue.C和agreements Exchange                    channel.QueueBind(queue: "queue.A",                        exchange: "agreements",                        routingKey: string.Empty);                    Console.WriteLine("Waiting for messages");                    var consumer = new EventingBasicConsumer(channel);                    //绑定接收完成事件                    consumer.Received += (model, ea) =>                    {                        var body = ea.Body;                        var message = Encoding.UTF8.GetString(body);                        Console.WriteLine($"Recieve Message:{message}");                    };                    channel.BasicConsume(queue: "queue.A",                        autoAck: true,                        consumer: consumer);                    Console.WriteLine("Press [enter] to exit");                    Console.ReadLine();                }            }        }    }}

  最后,我们分别运行这三个接收程序:

  符合程序设计。

  参考:JulyLuo——

转载于:https://www.cnblogs.com/cquptjyz/p/10888923.html

你可能感兴趣的文章
WPF 大数据加载过程中的等待效果——圆圈转动
查看>>
ASP.NET Core 搭配 Nginx 的真实IP问题
查看>>
WPF下载远程文件,并显示进度条和百分比
查看>>
WPF设计の画刷(Brush)
查看>>
3.1依赖注入「深入浅出ASP.NET Core系列」
查看>>
WindowsXamlHost:在 WPF 中使用 UWP 的控件(Windows Community Toolkit)
查看>>
SQL Server 2012:SQL Server体系结构——一个查询的生命周期(第3部分)(完结)...
查看>>
基于MMSeg算法的中文分词类库
查看>>
《Programming WPF》翻译 第6章 2.资源与样式
查看>>
ASP.NET MVC+EF框架+EasyUI实现权限管理系列(1)-框架搭建
查看>>
【MS SQL】通过执行计划来分析SQL性能
查看>>
js 正则学习小记之NFA引擎
查看>>
REST接口POST方法发送文件到服务器(C#)
查看>>
关于Oracle Varchar2(120) 的疑惑
查看>>
Cygwin
查看>>
linux常用命令(三)
查看>>
ios发短信
查看>>
项目杂记(MONTHS_BETWEEN,Having ,Spool)
查看>>
cmd ora-12560协议适配器错误
查看>>
Linux压缩解压缩(unzip,tar)
查看>>