Introduction
Message queues enable asynchronous communication between services. This tutorial covers RabbitMQ and Kafka, two of the most widely used messaging systems, with Python examples.
RabbitMQ Implementation
# producer.py
import pika
import json
import time
def connect():
    """Open a blocking RabbitMQ connection to the local broker.

    Connects to ``localhost`` on port 5672, virtual host ``/``, using the
    ``guest``/``guest`` credentials.

    Returns:
        A new ``pika.BlockingConnection``. The caller owns it and is
        responsible for closing it.
    """
    return pika.BlockingConnection(
        pika.ConnectionParameters(
            host='localhost',
            port=5672,
            virtual_host='/',
            credentials=pika.PlainCredentials('guest', 'guest'),
        )
    )
def publish_message(queue: str, message: dict):
    """Publish *message* as persistent JSON to a durable queue.

    A fresh connection is opened for the publish and is always closed
    afterwards, even when declaring the queue or publishing fails.

    Args:
        queue: Name of the queue; it is declared durable and also used as
            the routing key on the default exchange.
        message: JSON-serializable payload.

    Raises:
        TypeError: If *message* cannot be serialized to JSON (raised before
            any connection is opened).
    """
    # Serialize first so a bad payload never costs a broker connection.
    body = json.dumps(message)

    connection = connect()
    try:
        channel = connection.channel()
        channel.queue_declare(queue=queue, durable=True)
        channel.basic_publish(
            exchange='',
            routing_key=queue,
            body=body,
            properties=pika.BasicProperties(
                delivery_mode=2,  # persistent
                content_type='application/json',
            ),
        )
    finally:
        # Closing an already-closed connection raises and would mask the
        # original error, so only close when it is still open.
        if connection.is_open:
            connection.close()
def publish_task(task_type: str, payload: dict):
    """Wrap *payload* in a task envelope and publish it to the 'tasks' queue.

    The envelope carries the task type, the payload, and the current
    ``time.time()`` value as its timestamp.

    Args:
        task_type: Stored under the envelope's ``'type'`` key.
        payload: Stored under the envelope's ``'payload'`` key.
    """
    envelope = dict(type=task_type, payload=payload, timestamp=time.time())
    publish_message('tasks', envelope)
# consumer.py (reuses connect() from producer.py)
import pika
import json
def callback(ch, method, properties, body):
    """Handle one delivered message: decode, process, then acknowledge.

    A body that is not valid JSON can never be processed, so it is rejected
    without requeueing instead of raising. Otherwise the broker would keep
    redelivering the same poison message. Errors raised by
    ``process_message`` still propagate and leave the message
    unacknowledged.

    Args:
        ch: Channel the message arrived on; used to ack/nack.
        method: Delivery metadata; ``method.delivery_tag`` identifies the
            message being acknowledged.
        properties: Message properties (unused).
        body: Raw message body, expected to be JSON.
    """
    try:
        message = json.loads(body)
    except ValueError:
        # Covers both JSONDecodeError and UnicodeDecodeError.
        print(f"Rejecting malformed message: {body!r}")
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        return

    print(f"Received: {message}")
    # Process message
    process_message(message)
    # Acknowledge only after processing succeeded
    ch.basic_ack(delivery_tag=method.delivery_tag)
def consume_messages(queue: str):
    """Consume messages from a durable queue until consuming stops.

    Declares *queue* as durable, limits the channel to a prefetch count of
    one, registers ``callback`` as the handler, and blocks in
    ``start_consuming()``. The connection is closed when consuming ends for
    any reason, including an exception or Ctrl-C.

    Args:
        queue: Name of the queue to consume from.
    """
    connection = connect()
    try:
        channel = connection.channel()
        channel.queue_declare(queue=queue, durable=True)
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(queue=queue, on_message_callback=callback)
        print('Waiting for messages...')
        channel.start_consuming()
    finally:
        # Skip close() if the connection already went away; calling it on
        # a closed connection raises and would hide the real error.
        if connection.is_open:
            connection.close()
def process_message(message: dict):
    """Route a decoded message to the handler for its task type.

    ``'email'`` messages go to ``send_email`` and ``'notification'``
    messages go to ``send_notification``, each receiving the message's
    ``'payload'`` value. Any other (or missing) type is ignored.

    Args:
        message: Decoded message; ``'type'`` and ``'payload'`` are read
            with ``dict.get`` so missing keys yield ``None``.
    """
    kind = message.get('type')
    data = message.get('payload')

    if kind == 'email':
        send_email(data)
        return

    if kind == 'notification':
        send_notification(data)
Kafka Implementation
# kafka_producer.py
from kafka import KafkaProducer
import json
def _encode_value(value):
    """Serialize a message value to UTF-8 encoded JSON bytes."""
    return json.dumps(value).encode('utf-8')


def _encode_key(key):
    """Encode a truthy key as UTF-8 bytes; falsy keys become None."""
    if not key:
        return None
    return key.encode('utf-8')


# Module-level producer used by the send helpers in this file.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=_encode_key,
    value_serializer=_encode_value,
)
def send_message(topic: str, key: str, message: dict):
    """Send one keyed message to *topic* and flush the producer.

    Args:
        topic: Destination topic.
        key: Message key, passed through the producer's key serializer.
        message: Message value, passed through the producer's value
            serializer.
    """
    producer.send(topic, value=message, key=key)
    # Flush right away instead of leaving the record buffered.
    producer.flush()
def send_order(order_data: dict):
    """Publish an order to the 'orders' topic, keyed by its stringified id.

    Args:
        order_data: Order payload; must contain an ``'id'`` entry.

    Raises:
        KeyError: If *order_data* has no ``'id'`` key.
    """
    order_key = str(order_data['id'])
    send_message('orders', order_key, order_data)
# kafka_consumer.py (also needs `import json` for the value deserializer)
from kafka import KafkaConsumer
def _decode_value(raw):
    """Decode UTF-8 JSON bytes into a Python object."""
    return json.loads(raw.decode('utf-8'))


# Consumer for the 'orders' topic in the 'order-processor' group, with
# offset reset set to 'earliest' and auto-commit enabled.
consumer = KafkaConsumer(
    'orders',
    group_id='order-processor',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    value_deserializer=_decode_value,
)
def process_order(order: dict):
    """Print a progress line for *order*.

    Args:
        order: Decoded order; must contain an ``'id'`` entry.

    Raises:
        KeyError: If *order* has no ``'id'`` key.
    """
    order_id = order['id']
    print(f"Processing order: {order_id}")
# Iterate over the consumer and hand each record's deserialized value to
# the order handler.
for record in consumer:
    process_order(record.value)
Advanced Patterns
# pub_sub.py - RabbitMQ Pub/Sub
def setup_fanout_exchange(exchange_name: str):
    """Declare a fanout exchange on a new connection and return its channel.

    Each call opens its own connection via ``connect()``; this function
    does not close it.

    Args:
        exchange_name: Name of the exchange to declare.

    Returns:
        The channel on which the exchange was declared.
    """
    fanout_channel = connect().channel()
    fanout_channel.exchange_declare(
        exchange_type='fanout',
        exchange=exchange_name,
    )
    return fanout_channel
def publish_to_exchange(exchange: str, message: dict):
    """Broadcast *message* as JSON through a fanout exchange.

    ``setup_fanout_exchange`` opens a dedicated connection on every call, so
    that connection is closed here once the publish has completed or
    failed. Without this, each published message leaks a connection.

    Args:
        exchange: Name of the fanout exchange (declared if necessary).
        message: JSON-serializable payload.

    Raises:
        TypeError: If *message* cannot be serialized to JSON.
    """
    channel = setup_fanout_exchange(exchange)
    try:
        channel.basic_publish(
            exchange=exchange,
            routing_key='',  # empty routing key, as in the original publish
            body=json.dumps(message),
        )
    finally:
        owning_connection = channel.connection
        # Closing an already-closed connection raises; don't mask the
        # original error with that.
        if owning_connection.is_open:
            owning_connection.close()
# consumer_groups.py - Kafka consumer groups
# Subscribe to 'orders' as a member of the 'order-processors' group.
# No value deserializer is configured on this consumer.
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['localhost:9092'],
    group_id='order-processors',
)
Practice Problems
- Implement a dead letter queue for failed messages
- Create a message schema with versioning support
- Implement message ordering within partitions
- Add retry logic with exponential backoff
- Build a message batching system for high throughput