Skip to content

RabbitMQ

Oszkár Semeráth edited this page Oct 29, 2018 · 22 revisions

Megbízható üzenetküldés RabbitMQ alapon

Készítette: Bozóki Szilárd, Honfi Dávid

Bevezető és rövid ismertető

A mérés célja az üzenetsorokhoz kapcsolódó alapvető ismeretek elsajátítása. A mérés során a hallgatók a korábban elkészített Java alapú munkafolyamatot alakítják át üzenet alapú vezérlésűvé RabbitMQ használatával, ami egy Advanced Message Queuing Protocol (AMQP) implementáció.

Üzenetsorok

Egy üzenet alapú kommunikációs alrendszer célja, hogy kézbesítse a termelő (feladó, publisher) üzenetét (message) a fogyasztónak (consumer). Alap esetben az üzenetek a termelőktől direkt a fogyasztókhoz kerülnének, de üzenetsorok (queue) használata esetén az üzenetek egy soron keresztül indirekt kerülnek a fogyasztókhoz. Az üzenetsorok közbeiktatása révén így a termelő és a fogyasztó nincsenek direkt kapcsolatban és az üzenetek kézbesítés két lépésre bontható. Az egyik lépésben eljut az üzenet az üzenetsorig, a másik lépésben megérkezik a az üzenetsorból a fogyasztóhoz. Gyakorlatilag üzenetsorok használata során a termelő alkalmazás nem a konkrét fogyasztó alkalmazást célozza (címzi) meg, hanem egy cél üzenetsort.

Egy alkalmazásnak több bemeneti és több kimeneti sora is lehet, így a termelő és a fogyasztó szerepek természetesen keveredhetnek és sorról sorra változhatnak.

Üzenetsorok használata esetén a termelőnek azzal sem kell törődnie, hogy a cél fogyasztó elfoglalt vagy egyáltalán elérhető-e állapotban van e, ugyanis az üzenetküldő keretrendszer elintézi a célállomáshoz való szállítást.

Az ábrán két, egymással kommunikáló program látható, ahol az egyik kimeneti sora a másik bemeneti sora és fordítva, így a programok mind termelőként és fogyasztóként is viselkednek. A téglalapok a sorok és programok között a kommunikációs alrendszer interfészeit (API) reprezentálják, amelyeken keresztül a kommunikációs alrendszer és a programok érintkeznek.

RabbitMQ

AMQP esetén az üzeneteket az Exchange irányítja (route) az üzenetsorok felé. Egy üzenet több sorba is továbbítható, így az Exchange működése lehet például broadcast, unicast vagy multicast is.

Egy üzenetsorhoz több fogyasztó is tartozhat, ebben az esetben az üzenetsor beállítása határozza meg a kézbesítési logikát, ami lehet például round robin, ami az üzeneteket egyenletesen osztja el a fogyasztók között.

RabbitMQ eszközök

A rabbitmqctl a RabbitMQ legalapvetőbb eszköze. Olyan alapvető parancsokat lehet vele kiadni, mint például indítás, újraindítás, leállítás és a további pluginek engedélyezése.

A Management Plugin a RabbitMQ server kezelésére és monitorozására szolgál. Három felülettel is rendelkezik, amiket az adott szerveren különböző címeken lehet elérni.

  1. Grafikus webes (pl. http://server-name:15672/)
  2. HTTP API (pl. http://server-name:15672/api/)
  3. CLI (Management Command Line Tool) (pl. http://server-name:15672/cli/)

Hello World mintapélda

Termelő

A Hello World termelő oldali mintapéldája során először kapcsolódunk a RabbitMQ szerverhez és létrehozunk egy kommunikációs csatornát. Ezek után a csatornán deklarálunk egy üzenetsort egy választott névvel. Utána publikálunk egy rövid üzenetet és végezetül lezárjuk a csatornát és a kapcsolatot.

Kapcsolódás a RabbitMQ-hoz és a csatorna felépítése
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

Kapcsolódás során először létrehozzuk a kapcsolat felépítő objektumot (ConnectionFactory). Ezek után beállítjuk rajta a RabbitMQ szerver címét, ami a jelen példa esetén „localhost”. Majd létrehozzuk a kapcsolatot (newConnection). Végezetül a kapcsolaton felépítünk egy csatornát.

Üzenetsor létrehozása
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

Üzenetsort a csatorna queueDeclare metódusával tudunk létrehozni. A metódus első paramétere a létrehozandó üzenetsor neve, ami majd azonosítani fogja az üzenetsort. Az üzenetsor neve jelen esetben a QUEUE_NAME változó értéke lesz.

Üzenet publikálása
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

Üzenetküldéshez a basicPublish metódust használhatjuk. Címzéshez meg kell adni a csatorna nevét, ami jelen esetben a QUEUE_NAME változó értéke. Az üzenet tartalmát bájt tömbként (byte[]) lehet átadni, így a szerializálásról gondoskodnunk kell (getBytes()).

Csatorna és kapcsolat lezárása
channel.close();
connection.close();

Fogyasztó

A Hello World fogyasztó oldali mintapéldája során a termelőhöz hasonlóan először kapcsolódunk a RabbitMQ szerverhez és létrehozunk egy kommunikációs csatornát, majd után a csatornán deklarálunk egy üzenetsort a termelővel azonos névvel. Ezek után a csatornához hozzárendelünk egy puffer objektumot, amibe a RabbitMQ aszinkron (callback) kézbesíti az üzeneteket. A puffertől szinkron blokkolva várakozással kapjuk meg az üzeneteket.

Kapcsolódás a RabbitMQ-hoz és a csatorna felépítése

Megegyezik a termelőnél leírtakkal.

Üzenetsor létrehozása

Megegyezik a termelőnél leírtakkal. Fontos, hogy az üzenetsor neve azonos legyen a termelőnél használttal (QUEUE_NAME).

Puffer objektum létrehozása
Consumer consumer = new DefaultConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);

Először létrehozzuk a puffer objektumot az adott csatornán. (Consumer) Után hozzárendeljük a puffer objektumot mint üzenetsor fogyasztót aszinkron üzenetküldési célpontként az üzenetsorunkhoz. (basicConsume) Fontos, hogy a csatorna név megfelelő legyen (QUEUE_NAME).

Üzenet fogadása
Consumer.Delivery delivery = consumer.handleDelivery();
String message = new String(delivery.getBody());

Az üzenetek szinkron blokkolva fogadásához a puffer objektum nextDelivery metódusa használható. Miután kinyertük az üzenet bájt tömb tartalmát a getBody() metódussal, ezt deszerializálni szükséges a megfelelő objektummá.

Csatorna és kapcsolat lezárása

Megegyezik a termelőnél leírtakkal.

Felkészülés a mérésre

Az elméleti felkészüléshez az AMQP leírása használandó, míg a gyakorlati felkészüléshez a RabbitMQ gyorstalpaló.

Segítség a laborhoz

RabbitMQ Management felület engedélyezése

sudo rabbitmq-plugins enable rabbitmq_management

Management felület elérése

127.0.0.1:15672 - belépés: guest, guest

Jar képzése libekkel Maven segítségével

https://goo.gl/ftKnMy

Elosztott működéshez távoli csatlakozás engedélyezése

/etc/rabbitmq/rabbitmq.config -> [{ rabbit , [{ loopback_users , [ ] } ] } ] . (figyelj a pontra a végén!)

Feladatok

  • Készítse el a DocumentSimilarityEstimator RabbitMQ változatát! Alkalmazásában a feldolgozó csomópontok legyenek a megszokottak, csatornákként viszont minden esetben alkalmazzon RabbitMQ csatornákat!
  • Az alkalmazás kezdetben építse fel a csatornákat, és channel.basicPublish illetve Consumer-ekkel kommunikáljon! Az adatok sorosításáról gondoskodjon!
  • A könnyebb fejlesztés és hibakeresés miatt elégséges, ha a külöböző csomópontok több szálon futnak (mint a korábbi Többszálú alkalmazások fejlesztése Java nyelven laboron), nem szükséges külön alkalmazást indítani minden csomópontnak.
  • Alkalmazásában feltételezheti, hogy egy DocumentSimilarityEstimator fut, és nem kell a csatornákon másik géppel osztoznia.
  • Az alkalmazás futását szemléltesse a FutureTest teszttel!

Iránymutató az ellenőrző kérdésekhez

  • Mi egy üzenet alapú kommunikációs alrendszer célja?
  • Mik egy üzenet alapú kommunikációs alrendszer feladatai?
  • Miben különbözik az üzenet alapú kommunikáció egy közvetlen hívástól? Milyen következményekkel lehet számolni?
  • Jellemezd az alapfogalmakat (exchange, binding, queue, publisher, consumer, binding key, routing key) az alábbi kérdések megválaszolásával!
  • Hol helyezkedik el az üzenet alapú kommunikációban?
  • Milyen részfeladatot lát el?
  • Milyen más fogalmakkal van kapcsolatban és miért?
  • Hogyan működik?
  • Hogyan működnek és mire lehet használni a különböző exchange módokat (direct, fanout, topic, headers)?
  • Topic exchange esetén hányszor lesz kézbesítve az az üzenet, ami több mintára is illeszkedik?
  • Topic exchangeből hogyan lehet direct/fanout exchanget csinálni?
  • Hogy lehet direct exchangeből fanouthoz hasonlót csinálni? Mi lesz a különbség?
  • RabbitMQ esetén mi az alapértelmezett exchange típus és hogyan kapcsolódnak hozzá a csatornák?
  • RabbitMQ esetén mi az alapértelmezett csatorna kézbesítési mód, ha több fogyasztó is tartozik egy csatornához? Milyen problémát okozhat ez a mód? Hogyan lehet változtatni a módon?
  • RabbitMQ esetén lehet-e alapértelmezett beállításoknál a RabbitMQ-hoz bárhonnan csatlakozni?
  • Mi ellen véd az ACK üzenet?
  • Mikor történik az üzenetek újraküldés? Mi jelzi a feldolgozó meghibásodását?
  • Mi az auto ACK, mire használható és mire nem?
  • Milyen jelenség várható akkor, ha lemarad a manuális visszajelzés?
  • Mit állít vissza (mit biztosít) a queue durability és a message persistence?
  • Mikor és miért van szükség correlationId-ra? Milyen értékeket célszerű használni?
  • Milyen versenyhelyzet alakulhat ki, ami miatt az ismeretlen correlationID-val rendelkező üzeneteket célszerű eldobni?
  • Mi a Channel lényege? (Mivel takarékoskodik?)