Skip to main content

heapq

heapq 模块实现了堆队列算法(优先队列),用最小堆维护一个始终能以 O(1) 获取最小元素的列表结构。

heapq

基本操作

import heapq

# 从列表创建最小堆(原地转换,O(n))
data = [5, 1, 8, 3, 7, 2]
heapq.heapify(data)
print(data) # [1, 3, 2, 5, 7, 8]
print(data[0]) # 1(最小元素始终在 data[0])

# 入堆
heapq.heappush(data, 0)
print(data[0]) # 0

# 出堆(弹出最小值)
smallest = heapq.heappop(data)
print(smallest) # 0
print(data[0]) # 1(新的最小值)

heapreplace 与 heappushpop

import heapq

heap = [1, 3, 5, 7, 9]
heapq.heapify(heap)

# heapreplace:先弹出最小值,再压入新值(堆大小不变)
old = heapq.heapreplace(heap, 4)
print(f"弹出: {old}, 堆顶: {heap[0]}") # 弹出: 1, 堆顶: 3

# heappushpop:先压入新值,再弹出最小值
old = heapq.heappushpop(heap, 2)
print(f"弹出: {old}, 堆顶: {heap[0]}") # 弹出: 2, 堆顶: 3
heapreplace vs heappushpop
  • heapreplace(heap, item):先 pop 后 push,返回值可能大于 item
  • heappushpop(heap, item):先 push 后 pop,返回值一定是最小的那个
  • 两者都比分开调用更高效(只做一次堆调整)

nlargest 与 nsmallest

import heapq

scores = [85, 92, 78, 96, 88, 73, 91, 82]

# 取前 N 大 / 前 N 小
print(heapq.nlargest(3, scores)) # [96, 92, 91]
print(heapq.nsmallest(3, scores)) # [73, 78, 82]

# 支持 key 参数
students = [
{"name": "Alice", "score": 92},
{"name": "Bob", "score": 85},
{"name": "Carol", "score": 96},
{"name": "Dave", "score": 78},
]
top2 = heapq.nlargest(2, students, key=lambda s: s["score"])
print([s["name"] for s in top2]) # ['Carol', 'Alice']
何时使用 nlargest/nsmallest
  • n=1 时,直接用 min() / max() 更快
  • n 接近列表长度时,sorted(data)[:n] 更快
  • n 远小于列表长度时,heapq.nlargest / nsmallest 最高效

merge 合并有序序列

import heapq

# 合并多个已排序的迭代器(不会一次性加载到内存)
a = [1, 3, 5, 7]
b = [2, 4, 6, 8]
c = [0, 9, 10]

merged = list(heapq.merge(a, b, c))
print(merged) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# 支持 key 参数和 reverse 参数
logs = [
(1, "服务器A: 启动"),
(3, "服务器A: 就绪"),
]
logs2 = [
(2, "服务器B: 启动"),
(4, "服务器B: 就绪"),
]
for ts, msg in heapq.merge(logs, logs2, key=lambda x: x[0]):
print(f"[{ts}] {msg}")

优先队列模式

import heapq
import itertools

class PriorityQueue:
"""支持更新优先级和删除任务的优先队列"""

REMOVED = "<removed>"

def __init__(self):
self._heap = []
self._entry_map = {}
self._counter = itertools.count()

def push(self, task, priority=0):
if task in self._entry_map:
self.remove(task)
count = next(self._counter)
entry = [priority, count, task]
self._entry_map[task] = entry
heapq.heappush(self._heap, entry)

def remove(self, task):
entry = self._entry_map.pop(task)
entry[-1] = self.REMOVED

def pop(self):
while self._heap:
priority, count, task = heapq.heappop(self._heap)
if task is not self.REMOVED:
del self._entry_map[task]
return priority, task
raise KeyError("队列为空")

pq = PriorityQueue()
pq.push("写文档", priority=3)
pq.push("修 bug", priority=1)
pq.push("写测试", priority=2)
pq.push("修 bug", priority=0) # 更新优先级

print(pq.pop()) # (0, '修 bug')
print(pq.pop()) # (2, '写测试')
print(pq.pop()) # (3, '写文档')

模拟最大堆

import heapq

# heapq 只提供最小堆,取负值即可模拟最大堆
nums = [3, 1, 4, 1, 5, 9, 2, 6]
max_heap = [-x for x in nums]
heapq.heapify(max_heap)

# 取最大值
largest = -heapq.heappop(max_heap)
print(largest) # 9

# 压入新值
heapq.heappush(max_heap, -10)
print(-max_heap[0]) # 10
heapq vs queue.PriorityQueue
特性heapqqueue.PriorityQueue
线程安全是(内置锁)
接口函数式面向对象
性能更快略慢(锁开销)
适用场景单线程算法多线程生产者-消费者