Introduction
The queue module provides thread-safe queue implementations: Queue (FIFO), LifoQueue (LIFO), and PriorityQueue for organized message passing.
Queue (FIFO)
import queue
import threading
q = queue.Queue()
def producer():
for i in range(10):
q.put(i)
q.put(None) # Sentinel
def consumer():
while True:
item = q.get()
if item is None:
break
print(f"Got: {item}")
q.task_done()
threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()
LifoQueue (Stack)
import queue
stack = queue.LifoQueue()
stack.put(1)
stack.put(2)
stack.put(3)
print(stack.get()) # 3 (last in)
print(stack.get()) # 2
print(stack.get()) # 1
PriorityQueue
import queue
class Task:
def __init__(self, priority, description):
self.priority = priority
self.description = description
def __lt__(self, other):
return self.priority < other.priority
pq = queue.PriorityQueue()
pq.put(Task(3, "Low priority"))
pq.put(Task(1, "High priority"))
pq.put(Task(2, "Medium priority"))
while not pq.empty():
task = pq.get()
print(task.description)
Thread-Safe Operations
import queue
q = queue.Queue(maxsize=10)
# Safe operations
q.put(item, block=True, timeout=5)
q.put_nowait(item) # Non-blocking
item = q.get(block=True, timeout=5)
item = q.get_nowait() # Non-blocking
q.empty() # Check if empty
q.full() # Check if full
q.qsize() # Approximate size
Queue with Multiple Producers
import queue
import threading
import time
q = queue.Queue()
def worker(n):
while True:
item = q.get()
if item is None:
q.put(None) # Pass sentinel to next worker
break
print(f"Worker {n} processed {item}")
q.task_done()
# Start multiple workers
threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
t.start()
# Produce items
for i in range(10):
q.put(i)
q.put(None) # Signal end
for t in threads:
t.join()
Practice Problems
- Implement producer-consumer with Queue
- Use LifoQueue for undo functionality
- Implement PriorityQueue for task scheduling
- Handle Queue exceptions properly
- Build multi-threaded data pipeline