RabbitMQ, .NET Core, Nancy Fx, MongoDb – przykład kolejkowania zdarzeń.

Siemanko.

Jak już wspominałem we wcześniejszych postach od pewnego czasu staram się zgłebiać wiedzę na temat systemów rozproszonych i podejścia DDD. Im głebiej w las tym bardziej się jaram i zarazem dostrzegam ułomności standardowego monolitycznego podejścia do budowania aplikacji z pseudo warstwami abstrakcji, które i tak w końcu zamieniają się w spaghetti code (niestety). W tym poście opiszę przykład asynchronicznej komunikacji między aplikacjami poprzez serwer RabbitMQ.

Założenia

Dwie osobne aplikacje .NET Core z wykorzystaniem Nancy FX (bo tak, ale może być Mvc). Pierwsza aplikacja po wejściu na „index” publikuje zdarzenie do kolejki RabbitMQ z DateTime.Now jako danymi (może być cokolowiek).

Druga aplikacja po uruchomieniu „rejestruje” się w kolejce RabbitMQ jako subskrybent i zapisuje każde dane z odczytanego zdarzenia (czyli DateTime.Now w moemencie publikacji). Po wejściu na jej „index” wyświetla listę wszystkich DateTime.Now z odczytanych zdarzeń.

Crew propgramu to to, że druga aplikacja wcale nie musi być uruchomiona aby pierwsza mogła działać i robić swoje (czyli w tym przypadku tylko publikować zdarzenia na każde wejście na „index”) i żeby nic nie zostało utracone.  W momencie kiedy druga aplikacja wystartuje odczyta wszystkie wiadomości z kolejki i zapisze je w swojej bazie danych.

rabbit1

Instalacja

  1. Pobrać i zainstalować Erlang (potrzebne do rabita) – http://www.erlang.org/downloads
  2. Pobrac i zainstalować RabbitMQ – https://www.rabbitmq.com/download.html
  3. Po instalacji Rabbita upewnić się czy masz ustawione wszystkie zmienne – https://www.rabbitmq.com/install-windows-manual.html
  4. Odpalić w konsoli – rabbitmq-plugins enable rabbitmq_management
  5. Odpalić w przeglądarce – http://localhost:15672
  6. Zalogować się – login: guest, hasło: guest

Jesli wszystko poszło dobrze to powinieneś zobaczyć panel zarządzania

2

Producent

To co musi zrobić producent to podłączyć się do kolejki (zostanie utworzona jeśli jej nie ma) i opublikować zdarzenie:

private readonly IModel channel;
private readonly IConnection connection;
private readonly string queueName;

public EventSender()
{
   queueName = "helloWorldQueue";
   var factory = new ConnectionFactory() { HostName = "localhost" };
   this.connection = factory.CreateConnection();
   this.channel = connection.CreateModel();
   this.channel.QueueDeclare(queue: queueName,
   durable: false,
   exclusive: false,
   autoDelete: false,
   arguments: null);
}

public void SendEvent(string message)
{
   var body = Encoding.UTF8.GetBytes(message);

   this.channel.BasicPublish(exchange: "",
                             routingKey: queueName,
                             basicProperties: null,
                             body: body);
}

Po uruchomieniu i wejściu na „index”

3

wystarczy wywołać seriwisik publikujący zdarzenie. Po klikukrotnym odświeżeniu zajrzyjmy do rabbita:

4

Jest 8 wiadomości w kolejce, a aplikacja obsługująca te wiadomości nie jest jeszcze uruchomiona – ba! nawet jeszcze jej nie ma ;).

Subskrybent

Aplikacja przy starcie rejestruje się w kolejce

private readonly IMessagesService messagesService;
private IModel channel;
private IConnection connection;
private readonly string queueName;
private EventingBasicConsumer consumer;

		public EventConsumer(IMessagesService messagesService)
		{
			this.messagesService = messagesService;
			this.queueName = "helloWorldQueue";
		}

		public void Start()
		{
			var factory = new ConnectionFactory() { HostName = "localhost" };
			this.connection = factory.CreateConnection();
			this.channel = connection.CreateModel();
			this.channel.QueueDeclare(queue: queueName,
				durable: false,
				exclusive: false,
				autoDelete: false,
				arguments: null);

			this.consumer = new EventingBasicConsumer(this.channel);
			consumer.Received += (model, ea) =>
			{
				var body = ea.Body;
				var message = Encoding.UTF8.GetString(body);

				messagesService.InsertMessage(message);
			};
			channel.BasicConsume(queue: this.queueName,
				noAck: true,
				consumer: consumer);
		}

Niech obiekt tej klasy będzie singletonem aby był ciągle podłączony do kolejki i mógł odczytywać wiadomości na bieżąco. Imlementacja IMessagesService to warstwa dostępu do danych. Każdą odczytaną wiadomość zapisuje do bazy (w tym przypadku użyłem Mongo). Po wejściu na „index” subskrybenta wszystkie zapisane wiadomości zostaną wyświetlone:

6

Podsumowanie

Cały kodzik obu aplikacji dostępny tutaj.

Pjona!

Advertisements

2 uwagi do wpisu “RabbitMQ, .NET Core, Nancy Fx, MongoDb – przykład kolejkowania zdarzeń.

  1. Fajne, krotkie i na temat wprowadzenie. Co prawda .NET Core malo tam widoczne, ale 6 kroczkow do odpalenia RabbitMQ to mega szybki tutorial 🙂

    Lubię to

Skomentuj

Wprowadź swoje dane lub kliknij jedną z tych ikon, aby się zalogować:

Logo WordPress.com

Komentujesz korzystając z konta WordPress.com. Log Out / Zmień )

Zdjęcie z Twittera

Komentujesz korzystając z konta Twitter. Log Out / Zmień )

Facebook photo

Komentujesz korzystając z konta Facebook. Log Out / Zmień )

Google+ photo

Komentujesz korzystając z konta Google+. Log Out / Zmień )

Connecting to %s