Merhaba
Bir önceki yazıda RabbitMQ’nun kuyruktaki mesajı sadece 1 adet işçiye gönderdiğini görmüştük. Bu yazıda ise bir mesajın birden fazla işçiye aynı anda gönderilmesi anlatılacaktır.
RabbitMQ yazı serisinin ikinci bölümününe https://www.mehmetince.net/rabbitmq-ile-python-uygulamalari-2/ adresinden erişebilirsiniz.
Exchanges
Bir önceki yazıda kuyruğa mesaj gönderen publisher’ların aslında direk kuyruk ile değil, mesajları kuyruğa ekleme işiyle ilgilenen exchange ile konuştuklarından bahsedilmişti.
Exchange’ler tahmin edildiği üzere, mesajların ilgili kuyruklara yerleştirilmesiyle görevlidirler. RabbitMQ yapısında direct, topic, headers ve fanout olmak üzere hazır gelen exchange tipleri bulunmaktadır.
Önceki örneklerde basic_puslish metodu exchange ismini boş olarak göndermekteydi. Bu durumda gönderilen mesajı default tanımlı exchange yapısı karşılamaktadır.
Publisher.py örneği
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print " [x] Sent %r" % (message,) connection.close()
Publisher.py örneği analiz edildiğinde, logs adından bir exchange tanımlanmıştır ve tipi fanout‘tur. Ardından bu exchange’e mesaj gönderilirken basic_publish metodunun parametresi olan exchange, logs atanmıştır.
Geçici Kuyruklar
Önceki örneklerde publisher tarafından bir kuyruk tanımlanmak ve tüm worker/consumer’lar aynı isimli kuyruk’u dinlemekteydi. Bu örnekte ise kuyruk’un RabbitMQ tarafından oluşturulması ve mesaj gönderildikten sonra silinmesi talep edilmektedir.
result = channel.queue_declare(exclusive=True)
Kuyruk tanımlama fonksiyonu çağırıldığında RabbitMQ, kuyruğa random bir isim verecektir. Ayrıca kuyruk dinleme işlemi bittiğinde ise bu kuyruğun silinmesini exclusive=True sağlamaktayız.
Consumer.py kodları
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print ' [*] Waiting for logs. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] %r" % (body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
Kuyruktan mesajı okuyacak olan consumer.py kodları incelendiğinde şimdiye kadarki örneklere nazaran biraz daha farklı bir yapı bulunmaktadır. En önemli değişiklik queue_bind metodudur. Bu metodun çağırılması ile exchange, mesajları belirtilen queue’a aktarmaya başlayacaktır.
DEMO
Soldaki ekranda 2 adet consumer dinleme modundadır. Sağ taraftaki terminalden ise kuyruğa mesaj gönderilmektedir. Bu mesajı kuyruğu dinlemekte olan 2 consumer’a da gönderilmiştir.
Referans : Bu yazı serisinin hazırlanmasında kullanılan en önemli kaynak rabbitmq dökümantasyonudur. Bu dökümantasyona aşağıdaki linkten ulaşabilirsiniz.
http://www.rabbitmq.com/getstarted.html