Subscriber

    0

    1

    This code uses Polly, RabbitMQ.Client, RabbitMQ.Client.Events, RabbitMQ.Client.Exceptions, Shared.OptionsBinding, System.Net.Sockets, and System.Text.

    The Subscriber class is a background service that consumes messages from a RabbitMQ queue. It also retries failed connection attempts and logs connection shutdown, blocked-connection, and callback-exception events.

    The StartConnection() method opens the RabbitMQ connection (retrying on failure), creates the channel, and registers the connection event handlers.

    The DeclareChannel() method declares the exchange.

    The DeclareQueue() method declares the queue.

    The QueueBind() method binds the queue to the exchange.

    The ProcessEvent() method is a virtual hook that subclasses override to process each received message; the subscription itself is set up in ExecuteAsync().

    The Dispose() method releases all the resources used by the Subscriber.

    Shortcut: subscriber

    
    using Polly;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using RabbitMQ.Client.Exceptions;
    using Shared.OptionsBinding;
    using System.Net.Sockets;
    using System.Text;
    
    
    namespace
    {
      /// <summary>
      /// Background service that consumes messages from a RabbitMQ queue and hands each
      /// message body, decoded as UTF-8 text, to <see cref="ProcessEvent"/>.
      /// The connection and channel are opened in the constructor and closed in <see cref="Dispose"/>.
      /// </summary>
      public class Subscriber : BackgroundService, IDisposable
      {

        private readonly Serilog.ILogger _log;
        private IModel _channel;
        private IConnection _connection;

        private readonly SubscriberConfiguration _config;

        /// <summary>
        /// Stores the dependencies and immediately opens the RabbitMQ connection.
        /// </summary>
        /// <param name="log">Logger used for connection and processing diagnostics.</param>
        /// <param name="configuration">Broker, exchange, queue and consumer settings.</param>
        /// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
        protected Subscriber(Serilog.ILogger log, SubscriberConfiguration configuration)
        {
          _log = log ?? throw new ArgumentNullException(nameof(log));
          _config = configuration ?? throw new ArgumentNullException(nameof(configuration));

          StartConnection();
        }

        /// <summary>
        /// Opens the connection (retrying up to three times with exponential back-off when the
        /// broker is unreachable), creates the channel and subscribes to connection events.
        /// If every retry fails, the last exception propagates to the caller.
        /// </summary>
        public void StartConnection()
        {
          var factory = new ConnectionFactory()
          {
            UserName = _config.UserName,
            Password = _config.Password,
            HostName = _config.Hostname,
            Port = _config.Port,
            VirtualHost = _config.VirtualHost,
          };

          // Waits 2s, 4s, 8s between attempts; each failed attempt is logged as a warning.
          var policy = Policy.Handle<BrokerUnreachableException>()
            .Or<SocketException>()
            .WaitAndRetry(3, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
            {
              _log.Warning(ex, "Could not connect to RabbitMQ after {Timeout}s ({ExceptionMessage})",
                $"{time.TotalSeconds:n1}", ex.Message);
            });

          policy.Execute(() => _connection = factory.CreateConnection());

          _channel = _connection.CreateModel();

          _connection.ConnectionShutdown += OnConnectionShutdown;
          _connection.CallbackException += OnCallbackException;
          _connection.ConnectionBlocked += OnConnectionBlocked;
        }

        /// <summary>
        /// Declares the configured exchange on the channel.
        /// </summary>
        // The exchange type is normalised with the invariant culture: a culture-sensitive
        // ToLower() can corrupt it (e.g. "DIRECT" -> "dırect" under tr-TR).
        // NOTE(review): this reads _config.Exchange while QueueBind reads _config.ExchangeName —
        // confirm both refer to the same exchange.
        protected void DeclareChannel() =>
          _channel.ExchangeDeclare(
          exchange: _config.Exchange,
          durable: _config.EIsDurable,
          autoDelete: _config.EAutoDelete,
          type: _config.ExchangeType.Trim().ToLowerInvariant());

        /// <summary>
        /// Declares the configured queue on the channel.
        /// </summary>
        // NOTE(review): 'exclusive' and 'autoDelete' are taken from E-prefixed settings
        // (EIsExclusive / EAutoDelete) while 'durable' uses QIsDurable — confirm this is intended.
        protected void DeclareQueue() =>
          _channel.QueueDeclare(
          queue: _config.QueueName,
          durable: _config.QIsDurable,
          exclusive: _config.EIsExclusive,
          autoDelete: _config.EAutoDelete);

        /// <summary>
        /// Binds the configured queue to the configured exchange using the configured routing key.
        /// </summary>
        protected void QueueBind() =>
          _channel.QueueBind(
          queue: _config.QueueName,
          exchange: _config.ExchangeName,
          routingKey: _config.RoutingKey);

        /// <summary>
        /// Registers a consumer on the configured queue and returns immediately; messages are
        /// delivered through the consumer's <c>Received</c> event from then on.
        /// </summary>
        /// <param name="stoppingToken">Checked once on entry; a cancelled token aborts start-up.</param>
        /// <returns>A completed task.</returns>
        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
          stoppingToken.ThrowIfCancellationRequested();

          var consumer = new EventingBasicConsumer(_channel);
          consumer.Received += OnMessageReceived;

          _channel.BasicConsume(queue: _config.QueueName, autoAck: _config.AutoAck, consumer: consumer);

          return Task.CompletedTask;
        }

        /// <summary>
        /// Handles one delivery: decodes the body, runs <see cref="ProcessEvent"/> and, when
        /// manual acknowledgement is configured, acknowledges the message on success.
        /// </summary>
        // This is an event handler and therefore 'async void'; every exception must be caught
        // here, otherwise it is unobserved and can terminate the process.
        private async void OnMessageReceived(object sender, BasicDeliverEventArgs ea)
        {
          try
          {
            // Copy the body before the first await so the text does not depend on the
            // delivery buffer after the handler yields.
            var notificationMessage = Encoding.UTF8.GetString(ea.Body.ToArray());

            await ProcessEvent(notificationMessage);

            // With autoAck the broker has already considered the message delivered;
            // acknowledging again would be a protocol error, so only ack in manual mode.
            if (!_config.AutoAck)
            {
              _channel.BasicAck(ea.DeliveryTag, multiple: false);
            }
          }
          catch (Exception ex)
          {
            // The message is deliberately left unacknowledged (manual mode), so the broker
            // can redeliver it once this channel closes instead of it being lost.
            _log.Error(ex, "Failed to process RabbitMQ message with delivery tag {DeliveryTag}", ea.DeliveryTag);
          }
        }

        /// <summary>
        /// Hook for subclasses: processes one received message. The base implementation does nothing.
        /// </summary>
        /// <param name="notificationMessage">The message body decoded as UTF-8.</param>
        /// <returns>A task that completes when processing has finished.</returns>
        protected virtual Task ProcessEvent(string notificationMessage) => Task.CompletedTask;

        /// <summary>Logs that the broker has blocked the connection.</summary>
        private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e) =>
            _log.Warning("A RabbitMQ connection is blocked by the broker: {Reason}", e.Reason);

        /// <summary>Logs an exception raised inside a connection callback.</summary>
        private void OnCallbackException(object sender, CallbackExceptionEventArgs e) =>
            _log.Warning(e.Exception, "A RabbitMQ connection callback threw an exception.");

        /// <summary>Logs that the connection is shutting down.</summary>
        private void OnConnectionShutdown(object sender, ShutdownEventArgs reason) =>
            _log.Information("A RabbitMQ connection is on shutdown.");

        /// <summary>
        /// Closes and disposes the channel and the connection, then disposes the base service.
        /// Failures while closing are logged and never thrown.
        /// </summary>
        public override void Dispose()
        {
          // Channel and connection are released independently so that a failure (or an
          // already-closed channel) does not prevent the connection from being closed.
          try
          {
            if (_channel is { IsOpen: true })
            {
              _channel.Close();
            }
            _channel?.Dispose();
          }
          catch (Exception ex)
          {
            _log.Fatal(ex, "Failed to close the RabbitMQ channel.");
          }

          try
          {
            if (_connection is { IsOpen: true })
            {
              _connection.Close();
            }
            _connection?.Dispose();
          }
          catch (Exception ex)
          {
            _log.Fatal(ex, "Failed to close the RabbitMQ connection.");
          }

          base.Dispose();
        }

      }
    }
    
    
    
    Codiga Logo
    Codiga Hub
    • Rulesets
    • Playground
    • Snippets
    • Cookbooks
    soc-2 icon

    We are SOC-2 Compliance Certified

    G2 high performer medal

    Codiga – All rights reserved 2022.