← Back to Python

All Topics

Advertisement

Learn/Python/Message Queues

Message Queues - RabbitMQ, Kafka, Producer-Consumer

Topic: Message Queue Systems

Advertisement

Introduction

Message queues enable asynchronous communication between services. This tutorial covers RabbitMQ and Kafka, the most popular message queue systems, with Python examples.

RabbitMQ Implementation

# producer.py
import pika
import json
import time

def connect():
    credentials = pika.PlainCredentials('guest', 'guest')
    parameters = pika.ConnectionParameters(
        'localhost', 
        5672,
        '/',
        credentials
    )
    return pika.BlockingConnection(parameters)

def publish_message(queue: str, message: dict):
    connection = connect()
    channel = connection.channel()
    
    channel.queue_declare(queue=queue, durable=True)
    
    channel.basic_publish(
        exchange='',
        routing_key=queue,
        body=json.dumps(message),
        properties=pika.BasicProperties(
            delivery_mode=2,  # persistent
            content_type='application/json'
        )
    )
    connection.close()

def publish_task(task_type: str, payload: dict):
    message = {
        'type': task_type,
        'payload': payload,
        'timestamp': time.time()
    }
    publish_message('tasks', message)

# consumer.py
import pika
import json

def callback(ch, method, properties, body):
    message = json.loads(body)
    print(f"Received: {message}")
    
    # Process message
    process_message(message)
    
    # Acknowledge
    ch.basic_ack(delivery_tag=method.delivery_tag)

def consume_messages(queue: str):
    connection = connect()
    channel = connection.channel()
    
    channel.queue_declare(queue=queue, durable=True)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue=queue, on_message_callback=callback)
    
    print('Waiting for messages...')
    channel.start_consuming()

def process_message(message: dict):
    task_type = message.get('type')
    payload = message.get('payload')
    
    if task_type == 'email':
        send_email(payload)
    elif task_type == 'notification':
        send_notification(payload)

Kafka Implementation

# kafka_producer.py
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None
)

def send_message(topic: str, key: str, message: dict):
    producer.send(topic, key=key, value=message)
    producer.flush()

def send_order(order_data: dict):
    send_message('orders', str(order_data['id']), order_data)

# kafka_consumer.py
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['localhost:9092'],
    group_id='order-processor',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    enable_auto_commit=True
)

def process_order(order: dict):
    print(f"Processing order: {order['id']}")

for message in consumer:
    process_order(message.value)

Advanced Patterns

# pub_sub.py - RabbitMQ Pub/Sub
def setup_fanout_exchange(exchange_name: str):
    connection = connect()
    channel = connection.channel()
    
    channel.exchange_declare(
        exchange=exchange_name,
        exchange_type='fanout'
    )
    
    return channel

def publish_to_exchange(exchange: str, message: dict):
    channel = setup_fanout_exchange(exchange)
    channel.basic_publish(
        exchange=exchange,
        routing_key='',
        body=json.dumps(message)
    )

# consumer_groups.py - Kafka consumer groups
consumer = KafkaConsumer(
    'orders',
    group_id='order-processors',
    bootstrap_servers=['localhost:9092']
)

Practice Problems

  1. Implement a dead letter queue for failed messages
  2. Create a message schema with versioning support
  3. Implement message ordering within partitions
  4. Add retry logic with exponential backoff
  5. Build a message batching system for high throughput

Advertisement

Advertisement

Need More Practice?

Get personalized Python help from ChatWhole's AI-powered platform.

Get Expert Help →