Publisher

    0

    0

    Publisher creates a connection to RabbitMQ and initializes a model representing the message queues and exchanges in the broker. It defines a RetryPolicy object, which controls how often Publisher attempts to connect to RabbitMQ. It publishes messages to a queue, using a routing key and a mandatory flag. Finally, it calls Dispose to release the resources used by the Publisher.

    Shortcut: publisher

    using Newtonsoft.Json;
    using Polly;
    using RabbitMQ.Client.Events;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Exceptions;
    using Shared.Messaging.Payloads;
    using Shared.OptionsBinding;
    using System.Net.Sockets;
    using System.Text;
    
    using Polly.Retry;
    
    namespace
    {
    
      public abstract class Publisher : IDisposable
      {
    
        private IConnection _connection;
        private IModel _channel;
        private Serilog.ILogger _log;
    
        protected RetryPolicy Policy;
        protected PublisherConfiguration _config;
        public Publisher(Serilog.ILogger log, PublisherConfiguration configuration)
        {
          _log = log;
          _config = configuration;
    
          this.Policy = Polly.Policy.Handle<BrokerUnreachableException>()
            .Or<SocketException>()
            .WaitAndRetry(3, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
            {
              _log.Warning(ex, "Could not connect to RabbitMQ after {Timeout}s ({ExceptionMessage})",
                $"{time.TotalSeconds:n1}", ex.Message);
            });
    
    
          StartConnection();
        }
    
    
    
        public void StartConnection()
        {
          var factory = new ConnectionFactory()
          {
            UserName = _config.UserName,
            Password = _config.Password,
            HostName = _config.Hostname,
            Port = _config.Port,
            VirtualHost = _config.VirtualHost,
          };
    
          Policy.Execute(() => _connection = factory.CreateConnection());
    
          _channel = _connection.CreateModel();
    
          _connection.ConnectionShutdown += OnConnectionShutdown;
          _connection.CallbackException += OnCallbackException;
          _connection.ConnectionBlocked += OnConnectionBlocked;
        }
    
        public void Publish<T>(Message<T> msg)
        {
          var message = JsonConvert.SerializeObject(msg);
          var body = Encoding.UTF8.GetBytes(message);
    
    
          Policy.Execute(() =>
          {
            var properties = _channel.CreateBasicProperties();
            properties.DeliveryMode = 1;
    
            _channel.BasicPublish(
              exchange: msg.ExchangeName,
              routingKey: msg.RoutingKey,
              mandatory: true,
              basicProperties: properties,
              body: body);
          });
    
        }
    
    
        private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e) =>
            _log.Warning("A RabbitMQ connection is shutdown. Trying to re-connect...");
    
        private void OnCallbackException(object sender, CallbackExceptionEventArgs e) =>
            _log.Warning("A RabbitMQ connection throw exception. Trying to re-connect...");
    
        private void OnConnectionShutdown(object sender, ShutdownEventArgs reason) =>
            _log.Information("A RabbitMQ connection is on shutdown.");
    
    
        public void Dispose()
        {
          if (_channel.IsOpen)
          {
            try
            {
              _channel.Close();
              _connection.Close();
    
            }
            catch (IOException ex)
            {
              _log.Fatal(ex.ToString());
            }
          }
          Dispose();
        }
    
      }
    }
    Codiga Logo
    Codiga Hub
    • Rulesets
    • Playground
    • Snippets
    • Cookbooks
    soc-2 icon

    We are SOC-2 Compliance Certified

    G2 high performer medal

    Codiga – All rights reserved 2022.