netcore下RabbitMQ佇列、死信佇列、延時佇列及小應用

2023-01-03 06:00:59

關於rabbitmq的安裝,這裡就一筆帶過了。

下面進入正題:

1.新建aspnetcorewebapi空專案NormalQueue,刪除controllers資料夾以及無關的檔案,這裡為了偷懶不用console控制檯:

public class Program
    {
        /// <summary>
        /// Entry point of the demo web API. Registers the three RabbitMQ consumer
        /// hosted services and maps one GET endpoint per queue flavour
        /// (normal, dead-letter, delay) that publishes the route value as a message.
        /// </summary>
        /// <param name="args">Command-line arguments passed to the web application builder.</param>
        public static void Main(string[] args)
        {
            var builder = WebApplication.CreateBuilder(args);
            var services = builder.Services;

            // MVC controllers plus Swagger/OpenAPI support
            // (see https://aka.ms/aspnetcore/swashbuckle).
            services.AddControllers();
            services.AddEndpointsApiExplorer();
            services.AddSwaggerGen();

            // Background consumers, one per queue flavour.
            services.AddHostedService<ConsumerService>();
            services.AddHostedService<DeadLetterExchangeConsuerService>();
            services.AddHostedService<DelayExchangeConsumerService>();

            var app = builder.Build();

            // Swagger UI is only exposed in the Development environment.
            if (app.Environment.IsDevelopment())
            {
                app.UseSwagger();
                app.UseSwaggerUI();
            }

            // Publishing endpoints.
            app.MapGet("/normal/{message}", ([FromRoute] string message) => PublishToNormalQueue(message));
            app.MapGet("/deadletterexchange/{message}", ([FromRoute] string message) => DeadLetterExchange.Send(message));
            app.MapGet("/delayexchange/{message}", ([FromRoute] string message) => DelayExchange.SendMessage(message));

            app.UseHttpsRedirection();
            app.UseAuthorization();
            app.MapControllers();

            app.Run();
        }

        /// <summary>
        /// Publishes "Message_" followed by <paramref name="message"/> as a persistent
        /// message to the durable queue "rbTest202301" through the default exchange.
        /// </summary>
        /// <param name="message">Text taken from the request route.</param>
        private static void PublishToNormalQueue(string message)
        {
            const string queueName = "rbTest202301";

            var factory = new ConnectionFactory { HostName = "localhost", Port = 5672 };
            using IConnection connection = factory.CreateConnection();
            using IModel channel = connection.CreateModel();

            channel.QueueDeclare(queueName, true, false, false, null);

            string payload = $"Message_{message}";
            byte[] body = Encoding.UTF8.GetBytes(payload);

            IBasicProperties props = channel.CreateBasicProperties();
            props.DeliveryMode = 2; // 2 = persistent, 1 = non-persistent

            channel.BasicPublish("", queueName, props, body);
            Console.WriteLine($"訊息傳送成功:{payload}");
        }
    }

大概的介紹一下program檔案:

這裡有三個mini控制器,從這裡傳送對應的訊息到rabbitmq

"/normal/{message}"   普通佇列,
"/deadletterexchange/{message}" 死信佇列
"/delayexchange/{message}"   延時佇列

 

        // Excerpt from Program.Main: registers the three consumer classes as hosted services.
        builder.Services.AddHostedService<ConsumerService>();
            builder.Services.AddHostedService<DeadLetterExchangeConsuerService>();
            builder.Services.AddHostedService<DelayExchangeConsumerService>();

這裡就是消費的服務,註冊成HostedService。

ConsumerService程式碼如下:
 /// <summary>
 /// Hosted service that consumes the durable queue "rbTest202301", prints every
 /// message to the console and acknowledges it.
 /// </summary>
 public class ConsumerService : BackgroundService
    {
        /// <summary>
        /// Opens a single connection and channel, subscribes one consumer and then
        /// waits until the host requests shutdown. The connection and channel are
        /// disposed when the method exits.
        /// </summary>
        /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
        /// <returns>A task that completes once shutdown has been requested.</returns>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            Console.WriteLine("normal Rabbitmq消費端開始工作!");

            var factory = new ConnectionFactory { HostName = "localhost", Port = 5672 };

            // One connection/channel for the lifetime of the service. Creating them
            // inside a polling loop would open a new connection and register a new
            // consumer on every iteration without ever closing the previous ones.
            using IConnection connection = factory.CreateConnection();
            using IModel channel = connection.CreateModel();

            var queueName = "rbTest202301";
            channel.QueueDeclare(queueName, true, false, false, null);

            // Prefetch 1: the broker delivers the next message only after the
            // previous one has been acknowledged.
            channel.BasicQos(0, 1, false);

            var consumer = new EventingBasicConsumer(channel);

            // Attach the handler before subscribing so no delivery can arrive
            // while the consumer has no handler.
            consumer.Received += (ch, ea) =>
            {
                string text = Encoding.UTF8.GetString(ea.Body.ToArray());
                Console.WriteLine("佇列訊息:" + text);

                // Manual acknowledgement (autoAck is false below).
                channel.BasicAck(ea.DeliveryTag, false);
            };
            channel.BasicConsume(queueName, false, consumer);

            try
            {
                // Deliveries are pushed to the handler above; just keep the
                // connection alive until the host shuts down.
                await Task.Delay(Timeout.Infinite, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                // Expected on shutdown; fall through so the using declarations
                // close the channel and the connection.
            }
        }
    }

 

DeadLetterExchangeConsuerService程式碼如下:

 /// <summary>
 /// Hosted service that starts the consumer defined by
 /// <see cref="DeadLetterExchange.Consumer"/> when the application starts.
 /// </summary>
 public class DeadLetterExchangeConsuerService : BackgroundService
    {
        /// <summary>
        /// Starts the consumer once and then waits for the host to shut down.
        /// </summary>
        /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
        /// <returns>A task that completes once shutdown has been requested.</returns>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            Console.WriteLine("RabbitMQ消費端死信佇列開始工作");

            // Consumer() opens a connection and channel and does not close them, so it
            // must be called once rather than from a polling loop.
            DeadLetterExchange.Consumer();

            try
            {
                await Task.Delay(Timeout.Infinite, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                // Expected on shutdown.
            }
        }
    }
  /// <summary>
  /// Dead-letter demo: messages are published to the work queue "queue_Testdlx";
  /// the consumer rejects each one without requeue so the broker dead-letters it
  /// to "dlx.exchange" with routing key "dlx.queue".
  /// </summary>
  public class DeadLetterExchange
    {
        public static string dlxExchange = "dlx.exchange";
        public static string dlxQueueName = "dlx.queue";
        static string exchange = "direct-exchange";
        static string queueName = "queue_Testdlx";
        static string dlxExchangeKey = "x-dead-letter-exchange";

        // Was misspelled "x-dead-letter-rounting-key", so the dead-letter routing key
        // was never applied and rejected messages kept their original routing key,
        // which has no binding on the dead-letter exchange.
        // NOTE: a queue that already exists with the misspelled argument must be
        // deleted first; redeclaring it with different arguments fails with
        // PRECONDITION_FAILED.
        static string dlxQueueKey = "x-dead-letter-routing-key";

        // Guards Consumer() so repeated calls do not open additional connections.
        static readonly object consumerGate = new object();
        static bool consumerStarted;

        /// <summary>
        /// Publishes <paramref name="message"/> as a persistent message to the work
        /// queue, declaring the exchange, queue and binding first.
        /// </summary>
        /// <param name="message">Message text, encoded as UTF-8.</param>
        public static void Send(string message)
        {
            using (var connection = new ConnectionFactory() { HostName = "localhost", Port = 5672 }.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                DeclareWorkQueue(channel);

                var properties = channel.CreateBasicProperties();
                properties.Persistent = true; // persistent delivery mode
                channel.BasicPublish(exchange, queueName, properties, Encoding.UTF8.GetBytes(message));
                Console.WriteLine($"向佇列:{queueName}傳送訊息:{message}");
            }
        }

        /// <summary>
        /// Declares the dead-letter topology and the work queue, then subscribes a
        /// consumer to the work queue that rejects every message without requeue.
        /// The first successful call keeps its connection open for the rest of the
        /// process; later calls return immediately.
        /// </summary>
        public static void Consumer()
        {
            lock (consumerGate)
            {
                if (consumerStarted)
                {
                    return;
                }

                var connection = new ConnectionFactory() { HostName = "localhost", Port = 5672 }.CreateConnection();
                try
                {
                    var channel = connection.CreateModel();

                    // Dead-letter exchange, queue and binding.
                    channel.ExchangeDeclare(dlxExchange, ExchangeType.Direct, true, false);
                    channel.QueueDeclare(dlxQueueName, true, false, false);
                    channel.QueueBind(dlxQueueName, dlxExchange, dlxQueueName);

                    DeclareWorkQueue(channel);

                    var consumer = new EventingBasicConsumer(channel);
                    channel.BasicQos(0, 1, false);
                    consumer.Received += (model, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine($"佇列{queueName}消費訊息:{message},不做ack確認");

                        // Reject without requeue so the message is dead-lettered.
                        channel.BasicNack(ea.DeliveryTag, false, requeue: false);
                    };
                    channel.BasicConsume(queueName, autoAck: false, consumer);

                    consumerStarted = true;
                }
                catch
                {
                    // Do not leave a half-configured connection behind; a later
                    // call can retry from scratch.
                    connection.Dispose();
                    throw;
                }
            }
        }

        // Declares the work exchange, the work queue (with its dead-letter arguments) and their binding.
        static void DeclareWorkQueue(IModel channel)
        {
            channel.ExchangeDeclare(exchange, ExchangeType.Direct, true, false);
            channel.QueueDeclare(queueName, true, false, false, new Dictionary<string, object>
            {
                { dlxExchangeKey, dlxExchange },
                { dlxQueueKey, dlxQueueName }
            });
            channel.QueueBind(queueName, exchange, queueName);
        }
    }

 

DelayExchangeConsumerService程式碼如下:

 /// <summary>
 /// Hosted service that starts the consumer defined by
 /// <see cref="DelayExchange.Consumer"/> when the application starts.
 /// </summary>
 public class DelayExchangeConsumerService : BackgroundService
    {
        /// <summary>
        /// Starts the consumer once and then waits for the host to shut down.
        /// </summary>
        /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
        /// <returns>A task that completes once shutdown has been requested.</returns>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            Console.WriteLine("RabbitMQ消費端延遲佇列開始工作");

            // Consumer() opens a connection and channel and does not close them, so it
            // must be called once rather than from a polling loop.
            DelayExchange.Consumer();

            try
            {
                await Task.Delay(Timeout.Infinite, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                // Expected on shutdown.
            }
        }
    }
 /// <summary>
 /// Delay-queue demo built from a queue-level TTL plus dead-lettering: messages are
 /// published to "delay_queue" (which has no consumer here), expire after the
 /// queue's "x-message-ttl" and are dead-lettered to "dlx.exchange" with routing
 /// key "dlx.queue", where <see cref="Consumer"/> picks them up.
 /// </summary>
 public class DelayExchange
    {
        // Dead-letter exchange and queue.
        private const string DlxExchangeName = "dlx.exchange";
        private const string DlxQueueName = "dlx.queue";

        // Exchange and queue the messages are first published to.
        private const string MessageExchangeName = "direct-exchange";
        private const string DelayQueueName = "delay_queue";

        // Guards Consumer() so repeated calls do not open additional connections.
        private static readonly object ConsumerGate = new object();
        private static bool consumerStarted;

        /// <summary>
        /// Publishes <paramref name="message"/> as a persistent message to the delay
        /// queue, declaring the exchange, queue and binding first. The dead-letter
        /// exchange and queue are declared by <see cref="Consumer"/>, not here.
        /// </summary>
        /// <param name="message">Message text, encoded as UTF-8.</param>
        public static void SendMessage(string message)
        {
            using (var connection = new ConnectionFactory() { HostName = "localhost", Port = 5672 }.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(MessageExchangeName, type: ExchangeType.Direct, durable: true, autoDelete: false);

                // Queue with a dead-letter target and a 10 s message TTL.
                channel.QueueDeclare(DelayQueueName, durable: true, exclusive: false, autoDelete: false, arguments:
                    new Dictionary<string, object>
                    {
                        { "x-dead-letter-exchange", DlxExchangeName },   // exchange that receives expired messages
                        { "x-dead-letter-routing-key", DlxQueueName },   // routing key used when dead-lettering
                        { "x-message-ttl", 10000 }                       // queue-level TTL in milliseconds
                    });
                channel.QueueBind(DelayQueueName, MessageExchangeName, routingKey: DelayQueueName);

                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;
                // A per-message delay could be set with properties.Expiration = "5000" (5 s).

                channel.BasicPublish(exchange: MessageExchangeName,
                                     routingKey: DelayQueueName,
                                     basicProperties: properties,
                                     body: Encoding.UTF8.GetBytes(message));
                Console.WriteLine($"{DateTime.Now},向佇列:{DelayQueueName}傳送訊息:{message}");
            }
        }

        /// <summary>
        /// Declares the dead-letter exchange, queue and binding and subscribes a
        /// consumer to the dead-letter queue that prints and acknowledges each
        /// message. The first successful call keeps its connection open for the rest
        /// of the process; later calls return immediately.
        /// </summary>
        public static void Consumer()
        {
            lock (ConsumerGate)
            {
                if (consumerStarted)
                {
                    return;
                }

                var connection = new ConnectionFactory() { HostName = "localhost", Port = 5672 }.CreateConnection();
                try
                {
                    var channel = connection.CreateModel();

                    channel.ExchangeDeclare(DlxExchangeName, type: ExchangeType.Direct, durable: true, autoDelete: false);
                    channel.QueueDeclare(DlxQueueName, durable: true, exclusive: false, autoDelete: false);
                    channel.QueueBind(DlxQueueName, DlxExchangeName, routingKey: DlxQueueName);

                    var consumer = new EventingBasicConsumer(channel);
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);
                    consumer.Received += (model, ea) =>
                    {
                        // Business handling would go here.
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine($"{DateTime.Now},佇列{DlxQueueName}消費訊息:{message}");
                        channel.BasicAck(ea.DeliveryTag, false);
                    };
                    channel.BasicConsume(DlxQueueName, autoAck: false, consumer);

                    consumerStarted = true;
                }
                catch
                {
                    // Do not leave a half-configured connection behind; a later
                    // call can retry from scratch.
                    connection.Dispose();
                    throw;
                }
            }
        }
    }

 

 

延時佇列實際應用場景可能比較複雜,比如每條訊息的過期時間不一樣,收到的訊息的順序有可能會亂掉。這些不做深究,自行百度。

關於死信佇列常見應用場景之一下單,支付,支付超時的各種場景,下面通過一個簡單的例子模擬一下

同樣的新建一個空的webapi專案DeadLetterQueue,

program程式碼如下:

public class Program
    {
        /// <summary>
        /// Entry point of the DeadLetterQueue demo. Registers the normal and the
        /// dead-letter consumer as hosted services and maps a GET endpoint that
        /// publishes the route value to the fanout exchange "exchange.normal".
        /// </summary>
        /// <param name="args">Command-line arguments passed to the web application builder.</param>
        public static void Main(string[] args)
        {
            var builder = WebApplication.CreateBuilder(args);
            var services = builder.Services;

            // MVC controllers plus Swagger/OpenAPI support
            // (see https://aka.ms/aspnetcore/swashbuckle).
            services.AddControllers();
            services.AddEndpointsApiExplorer();
            services.AddSwaggerGen();

            // Background consumers.
            services.AddHostedService<ConsumerService>();
            services.AddHostedService<DeadConsumerService>();

            var app = builder.Build();

            // Swagger UI is only exposed in the Development environment.
            if (app.Environment.IsDevelopment())
            {
                app.UseSwagger();
                app.UseSwaggerUI();
            }

            app.MapGet("/normal/{message}", ([FromRoute] string message) => PublishToNormalExchange(message));

            app.UseHttpsRedirection();
            app.UseAuthorization();
            app.MapControllers();

            app.Run();
        }

        /// <summary>
        /// Declares "exchange.normal" and the queue "rbTest2023010" (10 s TTL,
        /// dead-lettering to "exchange.dlx" with routing key "routingkey"), binds
        /// them, and publishes "Message_" followed by <paramref name="message"/> as a
        /// persistent message.
        /// </summary>
        /// <param name="message">Text taken from the request route.</param>
        private static void PublishToNormalExchange(string message)
        {
            const string queueName = "rbTest2023010";
            const string normalExchange = "exchange.normal";

            var factory = new ConnectionFactory { HostName = "localhost", Port = 5672 };
            using IConnection connection = factory.CreateConnection();
            using IModel channel = connection.CreateModel();

            var queueArguments = new Dictionary<string, object>
            {
                { "x-message-ttl", 10000 },
                { "x-dead-letter-exchange", "exchange.dlx" },
                { "x-dead-letter-routing-key", "routingkey" }
            };

            channel.ExchangeDeclare(normalExchange, ExchangeType.Fanout, true);
            channel.QueueDeclare(queueName, true, false, false, queueArguments);
            channel.QueueBind(queueName, normalExchange, "");

            string payload = $"Message_{message}";
            byte[] body = Encoding.UTF8.GetBytes(payload);

            IBasicProperties props = channel.CreateBasicProperties();
            props.DeliveryMode = 2; // 2 = persistent, 1 = non-persistent

            channel.BasicPublish(normalExchange, queueName, props, body);
            Console.WriteLine($"{DateTime.Now}訊息傳送成功:{payload}");
        }
    }

 

下單後消費程式碼ConsumerService如下

 /// <summary>
 /// Hosted service that consumes the queue "rbTest2023010". Messages whose text
 /// contains "跳過" are rejected without requeue (and are therefore dead-lettered
 /// according to the queue arguments); all other messages are acknowledged.
 /// </summary>
 public class ConsumerService : BackgroundService
    {
        /// <summary>
        /// Opens a single connection and channel, subscribes one consumer and then
        /// waits until the host requests shutdown. The connection and channel are
        /// disposed when the method exits.
        /// </summary>
        /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
        /// <returns>A task that completes once shutdown has been requested.</returns>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            Console.WriteLine("normal Rabbitmq消費端開始工作!");

            var factory = new ConnectionFactory { HostName = "localhost", Port = 5672 };

            // One connection/channel for the lifetime of the service. Creating them
            // inside a polling loop would open a new connection and register a new
            // consumer on every iteration without ever closing the previous ones.
            using IConnection connection = factory.CreateConnection();
            using IModel channel = connection.CreateModel();

            var queueName = "rbTest2023010";
            channel.ExchangeDeclare("exchange.normal", ExchangeType.Fanout, true);
            channel.QueueDeclare(queueName, true, false, false, new Dictionary<string, object>
            {
                { "x-message-ttl", 10000 },
                { "x-dead-letter-exchange", "exchange.dlx" },
                { "x-dead-letter-routing-key", "routingkey" }
            });
            channel.QueueBind(queueName, "exchange.normal", "");

            // Prefetch 1: the broker delivers the next message only after the
            // previous one has been acknowledged or rejected.
            channel.BasicQos(0, 1, false);

            var consumer = new EventingBasicConsumer(channel);

            // Attach the handler before subscribing so no delivery can arrive
            // while the consumer has no handler.
            consumer.Received += (ch, ea) =>
            {
                string text = Encoding.UTF8.GetString(ea.Body.ToArray());
                Console.WriteLine($"{DateTime.Now}來自死信佇列獲取的訊息: {text}");

                if (text.Contains("跳過"))
                {
                    // Simulates an order that is not handled in time: reject without
                    // requeue so DeadConsumerService can deal with it.
                    Console.WriteLine($"{DateTime.Now}來自死信佇列獲取的訊息: {text},該訊息被拒絕");
                    channel.BasicNack(ea.DeliveryTag, false, false);
                }
                else
                {
                    // Normal message: acknowledge.
                    Console.WriteLine($"{DateTime.Now}來自死信佇列獲取的訊息: {text},該訊息被接受");
                    channel.BasicAck(ea.DeliveryTag, false);
                }
            };
            channel.BasicConsume(queueName, false, consumer);

            try
            {
                // Deliveries are pushed to the handler above; just keep the
                // connection alive until the host shuts down.
                await Task.Delay(Timeout.Infinite, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                // Expected on shutdown; fall through so the using declarations
                // close the channel and the connection.
            }
        }
    }

通過模擬傳送的訊息加入跳過兩個字會拒收這條訊息,這樣就會跳到設定的exchange.dlx交換機佇列去,如果沒有跳過那麼這條訊息就正常處理掉,消費確認。

超時不處理後,我們通過新的消費服務DeadConsumerService來處理這些異常的消費,比如恢復庫存、將訂單狀態改為取消等等

/// <summary>
/// Hosted service that consumes the dead-letter queue "queue.dlx", bound to the
/// direct exchange "exchange.dlx", and prints and acknowledges every message.
/// </summary>
public class DeadConsumerService:BackgroundService
    {
        /// <summary>
        /// Opens a single connection and channel, declares the dead-letter topology,
        /// subscribes one consumer and then waits until the host requests shutdown.
        /// The connection and channel are disposed when the method exits.
        /// </summary>
        /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
        /// <returns>A task that completes once shutdown has been requested.</returns>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            Console.WriteLine("normal Rabbitmq消費端開始工作!");

            var factory = new ConnectionFactory { HostName = "localhost", Port = 5672 };

            // One connection/channel for the lifetime of the service. Creating them
            // inside a polling loop would open a new connection and register a new
            // consumer on every iteration without ever closing the previous ones.
            using IConnection connection = factory.CreateConnection();
            using IModel channel = connection.CreateModel();

            var queueName = "queue.dlx";
            channel.ExchangeDeclare("exchange.dlx", ExchangeType.Direct, true);
            channel.QueueDeclare(queueName, true, false, false, null);

            // "exchange.dlx" is a direct exchange and the source queue dead-letters
            // with "x-dead-letter-routing-key" = "routingkey", so the binding key must
            // be "routingkey"; the previous empty key never matched and the
            // dead-lettered messages were not routed to this queue.
            channel.QueueBind(queueName, "exchange.dlx", "routingkey");

            // Prefetch 1: the broker delivers the next message only after the
            // previous one has been acknowledged.
            channel.BasicQos(0, 1, false);

            var consumer = new EventingBasicConsumer(channel);

            // Attach the handler before subscribing so no delivery can arrive
            // while the consumer has no handler.
            consumer.Received += (ch, ea) =>
            {
                string text = Encoding.UTF8.GetString(ea.Body.ToArray());
                Console.WriteLine($"{DateTime.Now}超時未處理的訊息: {text}");

                // Manual acknowledgement (autoAck is false below).
                channel.BasicAck(ea.DeliveryTag, false);
            };
            channel.BasicConsume(queueName, false, consumer);

            try
            {
                // Deliveries are pushed to the handler above; just keep the
                // connection alive until the host shuts down.
                await Task.Delay(Timeout.Infinite, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                // Expected on shutdown; fall through so the using declarations
                // close the channel and the connection.
            }
        }
    }

 

執行結果:

關於rabbitmq的死信佇列和延時佇列的介紹什麼的這裡不去貼baidu了,應用demo就這麼多了,程式碼在這裡:exercisebook/RabbitMQ.Test at main · liuzhixin405/exercisebook (github.com) 。下面分享一個完整一點的例子。

exercisebook/cat.seckill/cat.seckill at main · liuzhixin405/exercisebook (github.com)

感覺自己還是不合適寫這些玩意兒,沒有那麼細心和耐心,有這時間真不如寫寫demo。