RabbitMQ ile Python Uygulamaları – 2

Merhaba

RabbitMQ yazı serisinin ilk bölümününe https://www.mehmetince.net/rabbitmq-ile-python-uygulamalari-1/ adresinden erişebilirsiniz.

İlk yazıda RabbitMQ’nun yapısında bahsedilmişti. Basit bir uygulama ile rabbit kuyruğuna mesaj gönderilmiş ve bu mesaj başka bir uygulama ile alınmıştı.

Bu yazıda ise iş kuyruğu oluşturulacak, bu işleri alan birden fazla worker yapısından bahsedilecektir.

Hazırlık

RabbitMQ’nun uzun zaman işler için kullanıma uygun olduğundan bahsetmiştik. Örneklerde uzun zaman iş alan süreçleri simule etmek için time.sleep() kullanılacaktır. Bu sayede uygulama, sanki 30 saniye süren bir iş varmış gibi davranacaktır.

Mesaj Takibi

Farz edelim ki, kuyruk’a gelen bir mesajı alıp işe koyuna bir worker, iş tamamlanmadan servis dışı kalmış olsun. Yani worker uygulamasında elektriklerin kesilmesi, serviş dışı saldırıları vs gibi herhangi bir ekstra durum gelişirse gönderilen işin tamalanıp tamamlanmadığı nasıl takip edilecektir ? İlk yazıda anlatılan RabbitMQ uygulamasında, kuyruğa gelen mesaj worker’a iletildiğinde mesaj kuyruktan silinmekteydi. Bu durumda sürecin takibini yapmak neredeyse imkansız bir hal almaktaydı.

Mesaj’ların kaybolmasını engellemek için “acknowledgments” özelliği mevcuttur. Mesajı alıp işe koyulan işçi, süreci başarı ile tamamladığında RabbitMQ’ya “mesajı hafızadan silebilirsin, benim işim tamam” diyebilmektedir.

Mesajların takibini sağladıktan sonra bir diğer tehlike ise RabbitMQ servisinde veya sunucusunda yaşanacak problemlerdir. Herhangi bir servış dışı durum gelişirse bu sefer RabbitMQ hafızasında bulunan mesajları kaybedecektir. Bunun önüne geçebilmek için kuyruk tanımlaması yapılırken durable=True parametresi verilebilir. Ardından gönderilen mesajlar kalıcı olarak işaretlenmelidir.Bu işlemi ise mesaj publish edilirken delivery_mode = 2 parametresi verilmelidir.

Round-robin dispatching

RabbitMQ’da iş kuyruklarının kullanılmasının avantajlarından biriside birden fazla worker’ı pararlel çalıştırabilmektir. Şöyle ki; bir adet publisher.py saniyede 1 adet olmak üzere işleri kuyruk’a gönderdiğini farz edelim. 3 farklı sunucuda, her birinde bir adet olmak üzere worker.py olduğunu düşünelim. Kuyruk’a gelen her işin tamamlanması ortalama 10 saniye aldığını farz edersek, ortaya performans problemi çıkmaktadır. Bu noktada RabbitMQ işleri, worker’lara eş olarak dağıtacaktır. Adım adım analiz edecek olursak, kuyruk’a mesaj geldiğinde bunu worker1.py işçisine gönderir, artık worker1.py işçisi 10 saniye süren bir işe girişmiştir. Aradan geçen 1 saniye sonra kuyruk’a yeni mesaj gelir. RabbitMQ bu mesajı worker2.py işçisine gönderecektir. Bu sayede kuyrukta uzun süre beklemelerin ve adaletsiz iş dağılımının önüne geçilmiş olunur.

Publisher.py Kodları

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print " [x] Sent %r" % (message,)
connection.close()

Worker.py Kodları

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

Dikkat ederseniz, worker.py’nın callback fonksiyonunun son satırında basic_ack fonksiyonu çağırılmaktadır. Bir önceki yazıdan bildiğiniz üzere ise callback mesaj gelince, mesajı alan ve işe koyulan metod olarak anlatılmıştır. İş tamamlandığında RabbitMQ’ya iş tamamlandı mesajı bu şekilde gönderilebilir.

Her iki python kodu içinde queu_declare durable=True parametresi ile çalıştırılmıştır. Bu mesajların esktra durumlarda kaybolmasının önüne geçmek üzere bulunmaktadır.

Yazının 3. bölümü için : https://www.mehmetince.net/rabbitmq-ile-python-uygulamalari-3/

Referans : Bu yazı serisinin hazırlanmasında kullanılan en önemli kaynak rabbitmq dökümantasyonudur. Bu dökümantasyona aşağıdaki linkten ulaşabilirsiniz.

http://www.rabbitmq.com/getstarted.html