RabbitMQ ile Python Uygulamaları – 4 ( RPC Mesajları)

Merhaba

Şimdiye kadar anlatılanlarda üretici kuyruğa mesaj göndermekte, mesajı alan işçiler ise gerekli süreçlerini tamamlamaktaydı. Peki ya uzak bir sunucu fonksiyon çalıştırılması ve sonucunun alınması gerekiyorsa ? Bu ihtiyaçlar, Remote Procedure Call denilen RPC desteği ile karşılanmaktadır.

RabbitMQ yazı serisinin ikinci bölümününe https://www.mehmetince.net/rabbitmq-ile-python-uygulamalari-3/ adresinden erişebilirsiniz.

Şimdiye kadar anlatılan örneklerde mesajı alan ve belli işlemler gerçekleştiren consumer yazılımdan, mesajı gönderen producer yazılıma bir sonuç dönmemekteydi. Bu dönüşün sağlanabilmesi için client tarafından callback queue denilen kuyruğa mesaj gönderilmelidir.

Mesaj Özellikleri

Daha önceki yazılarda delivery_mode kullanılmıştı. Sık kullanılan mesaj özelliklerinin tanımı aşağıdadır.

delivery_mode = Mesajların kalıcı olması için kullanılır. Bknz : 2. yazı.

content_type = Dönüş yapılan verinin tipi belirlenmektedir. Genellikle application/json kullanılır.

reply_to = Mesajı alan client’ın, mesajı üreten producer’a hangi callback queue’su üzerinden geri dönüş yapacağını belirtir.

correlation_id = Oluşturulan RCP mesajları ve cevapları arasındaki bağlantıyı sağlar.

Correlation ID

Her RPC mesajı için geri dönüş kuyruğu oluşturulur. Bu kuyruklar üzerinde, mesajı gönderilen RPC mesajına ilgili cevap dönülmektedir.

Yoğun trafiklerin olduğu sistemlerde, her RPC mesajı için kuyruk oluşturulması performans problemlerine neden olabilir. Bu nedenle geri dönen sonuçların tutulacağı tek bir kuyruk oluşturulabilir. Bu durumda ise geri dönüş mesajlarının ait olduğu RPC taleplerinin tespiti problemi yaşanacaktır. Bu  problem correlation_id ile yapılarak çözülebilir.

ÖZET

rabbitmq rpc diagram

RCP request – response mantığı yukarıdaki resim ile anlatılmaktadır. Adım adım analizi aşağıdaki şekildedir.

  1. Client çalışmaya başladığında exclusive (3. yazıyı hatırlayın) modda bir callback queue’ı oluşturur.
  2. Client,  içeriğinde 2 adet özellik taşıyan bir RPC talebi oluşturur. reply_to ve correlation_id. reply_to geri dönüş kuyruğunu ifade ederken, correlation_id bu RPC talebine özgü olan bir key’i ifade etmektedir.
  3. Talep rpc_queue kuyruğuna ulaşır.
  4. RPC Server ise bu kuyruktan mesaj beklemektedir. Mesaj geldiğinde yapacağı işleri gerçekleştirir ve sonucu  reply_to bölümünde tanımlanmış kuyruğa gönderir.
  5. Bu sırada client callback kuyruğundan geri dönüş datası beklemektedir. Mesaj geldiğinde ilk yaptığı işlem correlation_id kontrolüdür. Bu kontrol ile dönen cevabın kendisine ait olup olmadığını tespit eder.
  6. Eğer 5. adımda TRUE sonuç alırsa gelen cevabı alarak işlemlerine devam eder.

rpc_server.p kodu

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
    n = int(body)

    print " [.] fib(%s)"  % (n,)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                     props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

print " [x] Awaiting RPC requests"
channel.start_consuming()

rpc_client.py kodu

#!/usr/bin/env python
import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print " [x] Requesting fib(30)"
response = fibonacci_rpc.call(30)
print " [.] Got %r" % (response,)

Referans : Bu yazı serisinin hazırlanmasında kullanılan en önemli kaynak rabbitmq dökümantasyonudur. Bu dökümantasyona aşağıdaki linkten ulaşabilirsiniz.

http://www.rabbitmq.com/getstarted.html