Publisher is an abstract base class that opens a connection to RabbitMQ and creates a channel (model) on it for talking to the broker. Its constructor defines a Polly RetryPolicy that controls how many times, and with what back-off, the Publisher retries when the broker is unreachable. Publish serializes a message to JSON and publishes it to the message's exchange with its routing key and the mandatory flag set. Finally, Dispose releases the channel and connection held by the Publisher.

    Shortcut: publisher

    using Newtonsoft.Json;
    using Polly;
    using RabbitMQ.Client.Events;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Exceptions;
    using Shared.Messaging.Payloads;
    using Shared.OptionsBinding;
    using System.Net.Sockets;
    using System.Text;
    using Polly.Retry;
      /// <summary>
      /// Base class for RabbitMQ publishers. Owns one connection and one channel,
      /// and publishes JSON-serialized <see cref="Message{T}"/> instances under a
      /// Polly retry policy.
      /// </summary>
      public abstract class Publisher : IDisposable
      {
        private IConnection _connection;
        private IModel _channel;
        private readonly Serilog.ILogger _log;

        /// <summary>Retry policy wrapped around connecting and publishing.</summary>
        protected RetryPolicy Policy;

        /// <summary>Broker connection settings supplied at construction.</summary>
        protected PublisherConfiguration _config;

        /// <summary>
        /// Stores the logger and configuration and builds the default retry policy:
        /// on <see cref="BrokerUnreachableException"/>, retry up to 3 times, waiting
        /// 2^attempt seconds (2s, 4s, 8s) and logging a warning before each retry.
        /// </summary>
        /// <param name="log">Logger used for connection diagnostics.</param>
        /// <param name="configuration">Broker host, port, credentials and virtual host.</param>
        public Publisher(Serilog.ILogger log, PublisherConfiguration configuration)
        {
          _log = log;
          _config = configuration;
          this.Policy = Polly.Policy.Handle<BrokerUnreachableException>()
            .WaitAndRetry(3, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
            {
              _log.Warning(ex, "Could not connect to RabbitMQ after {Timeout}s ({ExceptionMessage})",
                $"{time.TotalSeconds:n1}", ex.Message);
            });
        }

        /// <summary>
        /// Opens the connection (retrying per <see cref="Policy"/>), creates the
        /// channel and subscribes to the connection's lifecycle events.
        /// </summary>
        /// <exception cref="BrokerUnreachableException">
        /// Propagated by the policy once all retries are exhausted.
        /// </exception>
        public void StartConnection()
        {
          var factory = new ConnectionFactory()
          {
            UserName = _config.UserName,
            Password = _config.Password,
            HostName = _config.Hostname,
            Port = _config.Port,
            VirtualHost = _config.VirtualHost,
          };

          Policy.Execute(() => _connection = factory.CreateConnection());
          _channel = _connection.CreateModel();

          _connection.ConnectionShutdown += OnConnectionShutdown;
          _connection.CallbackException += OnCallbackException;
          _connection.ConnectionBlocked += OnConnectionBlocked;
        }

        /// <summary>
        /// Serializes <paramref name="msg"/> to UTF-8 JSON and publishes it to
        /// <c>msg.ExchangeName</c> with <c>msg.RoutingKey</c>, under <see cref="Policy"/>.
        /// </summary>
        /// <typeparam name="T">Payload type carried by the message.</typeparam>
        /// <param name="msg">Message to publish; must not be null.</param>
        /// <exception cref="ArgumentNullException"><paramref name="msg"/> is null.</exception>
        /// <exception cref="InvalidOperationException">
        /// <see cref="StartConnection"/> has not been called successfully.
        /// </exception>
        public void Publish<T>(Message<T> msg)
        {
          if (msg == null)
          {
            throw new ArgumentNullException(nameof(msg));
          }

          if (_channel == null)
          {
            // Fail with a descriptive error instead of a NullReferenceException inside the policy.
            throw new InvalidOperationException("StartConnection must be called before Publish.");
          }

          var message = JsonConvert.SerializeObject(msg);
          var body = Encoding.UTF8.GetBytes(message);

          Policy.Execute(() =>
          {
            var properties = _channel.CreateBasicProperties();
            // AMQP delivery-mode 1 = non-persistent (2 would be persistent).
            // NOTE(review): confirm transient delivery is intended.
            properties.DeliveryMode = 1;

            // mandatory: true asks the broker to return unroutable messages.
            // NOTE(review): no BasicReturn handler is registered in this class,
            // so returned messages are not observed here.
            _channel.BasicPublish(
              exchange: msg.ExchangeName,
              routingKey: msg.RoutingKey,
              mandatory: true,
              basicProperties: properties,
              body: body);
          });
        }

        /// <summary>Logs that the broker has blocked the connection, with the broker's reason.</summary>
        private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e) =>
            _log.Warning("A RabbitMQ connection is blocked ({Reason}).", e.Reason);

        /// <summary>Logs an exception raised from a connection callback.</summary>
        private void OnCallbackException(object sender, CallbackExceptionEventArgs e) =>
            _log.Warning(e.Exception, "A RabbitMQ connection throw exception. Trying to re-connect...");

        /// <summary>Logs that the connection is shutting down.</summary>
        private void OnConnectionShutdown(object sender, ShutdownEventArgs reason) =>
            _log.Information("A RabbitMQ connection is on shutdown.");

        /// <summary>
        /// Unsubscribes the event handlers and closes the channel and connection
        /// if they are open. Safe to call when <see cref="StartConnection"/> was
        /// never called.
        /// </summary>
        public void Dispose()
        {
          // NOTE(review): the original snippet was truncated here. Only
          // "if (_channel.IsOpen)" and "catch (IOException ex)" survived, so the
          // close calls and the catch handler below are a reconstruction.
          try
          {
            if (_connection != null)
            {
              // Detach handlers so the connection no longer references this publisher.
              _connection.ConnectionShutdown -= OnConnectionShutdown;
              _connection.CallbackException -= OnCallbackException;
              _connection.ConnectionBlocked -= OnConnectionBlocked;
            }

            if (_channel != null && _channel.IsOpen)
            {
              _channel.Close();
            }

            if (_connection != null && _connection.IsOpen)
            {
              _connection.Close();
            }
          }
          catch (IOException ex)
          {
            // Dispose should not throw; record the failure instead.
            _log.Warning(ex, "Failed to close the RabbitMQ connection cleanly.");
          }
        }
      }
    Codiga Logo
    Codiga Hub
    • Rulesets
    • Playground
    • Snippets
    • Cookbooks
    soc-2 icon

    We are SOC-2 Compliance Certified

    G2 high performer medal

    Codiga – All rights reserved 2022.