queue
queue 模块实现了多生产者、多消费者的线程安全队列,适用于在多线程间安全地传递消息。本模块提供三种按取数顺序区分的队列:FIFO、LIFO 和优先级队列,以及一个简化的无界 FIFO 队列 SimpleQueue。
Queue(FIFO)
Queue 为先进先出队列。put(item, block=True, timeout=None) 将元素放入队列;get(block=True, timeout=None) 取出并返回一个元素。block=True 时会在队列满/空时阻塞;timeout 为正数时最多等待该秒数,超时抛出 Full 或 Empty。put_nowait(item) 等价于 put(item, block=False),get_nowait() 等价于 get(False)。
import queue
q = queue.Queue(maxsize=2)
q.put(1)
q.put(2)
# q.put(3) # 无 timeout 时会一直阻塞(队列已满)
q.put(3, timeout=0.5) # 0.5 秒内无空位则 raise queue.Full
print(q.get()) # 1
print(q.get()) # 2
# 队列已空,下面会阻塞或抛异常:
# q.get() # 阻塞
# q.get_nowait() # 立即可用时返回,否则 raise queue.Empty
LifoQueue 与 PriorityQueue
LifoQueue 为后进先出(栈式)。PriorityQueue 按优先级取数,值最小的先被取出;条目常用 (priority_number, data) 形式的元组,若 data 不可比较,可用 field(compare=False) 的 dataclass 包装。
import queue
# LIFO:最近放入的先被取出
lq = queue.LifoQueue()
lq.put(1); lq.put(2); lq.put(3)
print(lq.get(), lq.get(), lq.get()) # 3 2 1
# 优先级队列:数字小的先出
pq = queue.PriorityQueue()
pq.put((2, "b"))
pq.put((1, "a"))
pq.put((3, "c"))
print(pq.get()) # (1, 'a')
print(pq.get()) # (2, 'b')
任务完成跟踪 task_done / join
task_done() 表示当前通过 get() 取出的任务已处理完毕;join() 会阻塞直到队列中每个放入的条目都对应调用了 task_done()。适用于“等待所有排队任务被消费者处理完”的场景。
import threading
import queue
q = queue.Queue()
def worker():
while True:
item = q.get()
print(f"Working on {item}")
q.task_done()
threading.Thread(target=worker, daemon=True).start()
for i in range(3):
q.put(i)
q.join()
print("All work completed")
SimpleQueue
SimpleQueue 是无界的 FIFO 队列,没有 task_done()、join() 等接口,但实现可重入:同一线程内 put/get 可被另一 put 打断而不会死锁,适合在析构或 weakref 回调中使用。
import queue
sq = queue.SimpleQueue()
sq.put(1)
sq.put(2)
print(sq.get()) # 1
print(sq.empty()) # False(仅“大致”状态,不保证后续 get 不阻塞)
Empty / Full、与 multiprocessing.Queue / deque 对比
- Empty:在空队列上调用非阻塞
get()或get_nowait()时抛出。 - Full:在已满队列上调用非阻塞
put()或put_nowait()时抛出。 - multiprocessing.Queue:用于多进程间通信;
queue.Queue用于多线程。 - collections.deque:无界双端队列,
append/popleft原子且无需锁,但不提供阻塞、task_done/join等语义;需要“线程安全队列 + 阻塞”时用queue.Queue。