Publisher creates a connection to RabbitMQ and opens a channel (model) on it for talking to the broker. It defines a RetryPolicy that retries failed connection attempts up to three times with exponential back-off. It publishes JSON-serialized messages to an exchange, using a routing key and the mandatory flag. Finally, it implements Dispose to close the channel and the connection and release the resources used by the Publisher.
Shortcut: publisher
using Newtonsoft.Json;
using Polly;
using RabbitMQ.Client.Events;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;
using Shared.Messaging.Payloads;
using Shared.OptionsBinding;
using System.Net.Sockets;
using System.Text;
using Polly.Retry;
namespace
{
/// <summary>
/// Base class for RabbitMQ publishers. Opens a connection and a channel when
/// constructed, publishes JSON-serialized messages through that channel and
/// closes both when disposed.
/// </summary>
/// <remarks>
/// The retry policy only handles <see cref="BrokerUnreachableException"/> and
/// <see cref="SocketException"/>; any other exception propagates to the caller.
/// NOTE(review): the RabbitMQ client documentation advises against sharing a
/// channel between threads that publish concurrently; this class adds no
/// synchronization of its own, so confirm how callers use it.
/// </remarks>
public abstract class Publisher : IDisposable
{
    // AMQP delivery mode 1 marks a message as non-persistent (transient).
    private const byte NonPersistentDeliveryMode = 1;

    private readonly Serilog.ILogger _log;
    private IConnection _connection;
    private IModel _channel;
    private bool _disposed;

    /// <summary>Retry policy wrapped around connecting and publishing.</summary>
    protected RetryPolicy Policy;

    /// <summary>Broker connection settings supplied to the constructor.</summary>
    protected PublisherConfiguration _config;

    /// <summary>
    /// Stores the dependencies, builds the retry policy (three retries, waiting
    /// 2, 4 and 8 seconds) and immediately connects to the broker.
    /// </summary>
    /// <param name="log">Logger used for connection diagnostics.</param>
    /// <param name="configuration">Credentials and endpoint of the broker.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="log"/> or <paramref name="configuration"/> is null.
    /// </exception>
    public Publisher(Serilog.ILogger log, PublisherConfiguration configuration)
    {
        _log = log ?? throw new ArgumentNullException(nameof(log));
        _config = configuration ?? throw new ArgumentNullException(nameof(configuration));

        // "this." and the fully-qualified Polly.Policy are both required here:
        // the field named Policy hides the Polly.Policy type inside this class.
        this.Policy = Polly.Policy.Handle<BrokerUnreachableException>()
            .Or<SocketException>()
            .WaitAndRetry(3, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
            {
                _log.Warning(ex, "Could not connect to RabbitMQ after {Timeout}s ({ExceptionMessage})",
                    $"{time.TotalSeconds:n1}", ex.Message);
            });

        StartConnection();
    }

    /// <summary>
    /// Opens a new connection and channel using the configured credentials.
    /// If a previous connection exists it is released once the new one is in
    /// place, so calling this method again does not leak the old connection.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The publisher was disposed.</exception>
    public void StartConnection()
    {
        ThrowIfDisposed();

        var factory = new ConnectionFactory()
        {
            UserName = _config.UserName,
            Password = _config.Password,
            HostName = _config.Hostname,
            Port = _config.Port,
            VirtualHost = _config.VirtualHost,
        };

        IConnection connection = Policy.Execute(() => factory.CreateConnection());

        IModel channel;
        try
        {
            channel = connection.CreateModel();
        }
        catch
        {
            // Do not leak the freshly opened connection when no channel can be created.
            ReleaseConnection(connection);
            throw;
        }

        connection.ConnectionShutdown += OnConnectionShutdown;
        connection.CallbackException += OnCallbackException;
        connection.ConnectionBlocked += OnConnectionBlocked;

        // Publish uses mandatory: true; without this handler the broker's
        // "unroutable" returns would be dropped without any trace.
        channel.BasicReturn += OnBasicReturn;

        // Swap first, then release the old pair, so a failed reconnect above
        // leaves the previous connection untouched.
        var previousChannel = _channel;
        var previousConnection = _connection;
        _connection = connection;
        _channel = channel;

        ReleaseChannel(previousChannel);
        ReleaseConnection(previousConnection);
    }

    /// <summary>
    /// Serializes <paramref name="msg"/> to UTF-8 encoded JSON and publishes it
    /// to <c>msg.ExchangeName</c> with <c>msg.RoutingKey</c>, as a non-persistent
    /// message with the mandatory flag set.
    /// </summary>
    /// <typeparam name="T">Payload type carried by the message.</typeparam>
    /// <param name="msg">Message envelope to publish.</param>
    /// <exception cref="ArgumentNullException"><paramref name="msg"/> is null.</exception>
    /// <exception cref="ObjectDisposedException">The publisher was disposed.</exception>
    public void Publish<T>(Message<T> msg)
    {
        if (msg is null)
        {
            throw new ArgumentNullException(nameof(msg));
        }

        ThrowIfDisposed();

        var message = JsonConvert.SerializeObject(msg);
        var body = Encoding.UTF8.GetBytes(message);

        Policy.Execute(() =>
        {
            var properties = _channel.CreateBasicProperties();
            properties.DeliveryMode = NonPersistentDeliveryMode;

            _channel.BasicPublish(
                exchange: msg.ExchangeName,
                routingKey: msg.RoutingKey,
                mandatory: true,
                basicProperties: properties,
                body: body);
        });
    }

    private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e) =>
        _log.Warning("A RabbitMQ connection is blocked ({Reason})", e.Reason);

    private void OnCallbackException(object sender, CallbackExceptionEventArgs e) =>
        _log.Warning(e.Exception, "A RabbitMQ connection threw an exception in a callback.");

    private void OnConnectionShutdown(object sender, ShutdownEventArgs reason) =>
        _log.Information("A RabbitMQ connection is on shutdown. ({ReplyText})", reason.ReplyText);

    private void OnBasicReturn(object sender, BasicReturnEventArgs e) =>
        _log.Warning(
            "RabbitMQ returned an unroutable message (exchange {Exchange}, routing key {RoutingKey}): {ReplyCode} {ReplyText}",
            e.Exchange, e.RoutingKey, e.ReplyCode, e.ReplyText);

    /// <summary>
    /// Closes the channel and the connection. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Releases the broker resources held by this publisher.
    /// </summary>
    /// <param name="disposing">
    /// True when called from <see cref="Dispose()"/>; only then are the managed
    /// channel and connection touched.
    /// </param>
    protected virtual void Dispose(bool disposing)
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        if (disposing)
        {
            ReleaseChannel(_channel);
            ReleaseConnection(_connection);
        }
    }

    // Best-effort channel teardown: never throws, because it runs from Dispose.
    private void ReleaseChannel(IModel channel)
    {
        if (channel == null)
        {
            return;
        }

        try
        {
            if (channel.IsOpen)
            {
                channel.Close();
            }

            channel.BasicReturn -= OnBasicReturn;
            channel.Dispose();
        }
        catch (Exception ex)
        {
            _log.Error(ex, "Failed to close the RabbitMQ channel cleanly.");
        }
    }

    // Best-effort connection teardown: never throws, because it runs from Dispose.
    // Handlers are detached after Close so the shutdown event is still logged.
    private void ReleaseConnection(IConnection connection)
    {
        if (connection == null)
        {
            return;
        }

        try
        {
            if (connection.IsOpen)
            {
                connection.Close();
            }

            connection.ConnectionShutdown -= OnConnectionShutdown;
            connection.CallbackException -= OnCallbackException;
            connection.ConnectionBlocked -= OnConnectionBlocked;
            connection.Dispose();
        }
        catch (Exception ex)
        {
            _log.Error(ex, "Failed to close the RabbitMQ connection cleanly.");
        }
    }

    private void ThrowIfDisposed()
    {
        if (_disposed)
        {
            throw new ObjectDisposedException(GetType().Name);
        }
    }
}
}