Skip to main content

并发

multiprocessing 模块

进程是系统独立安排和分配系统资源(CPU、内存)的基本单位,操作系统以进程为单位分配存储空间,操作系统管理所有进程的执行,为它们合理的分配资源。

一个进程就是 macOS 中的“活动监视器”、Windows 中的“任务管理器”的一个执行程序。

Python 既支持多进程又支持多线程。

多进程

进程之间是相互独立的,Python 中的进程通信一般由进程对 Queue 完成。

进程绕过了全局解释器锁。因此,多进程模块允许程序员充分利用特定机器上的多个处理器。它在 Unix 和 Windows 上都能运行。

进程的数量等于 CPU 核心的数量,这是最有效的。如果核数太多,就不能充分利用核数。如果太少,会造成进程切换,增加程序的运行时间。

multiprocessing:Multiprocessing Module Code Documentation

from multiprocessing import Pool

def f(vaule):
x = vaule[0]
y = vaule[1]
return x*y

if __name__ == '__main__':
p = Pool(16) # new 16 process pools , because i have 16 cpu
print(p.map(f, [(1,1), (2,2), (3,3)])) # take in data
p.close() # close pool

# [1, 4, 9]

我们来完成 1~100000000 求和的计算密集型任务,循环解决,暂时也不考虑列表切片操作花费的时间,只是把做运算和合并运算结果的时间统计出来。

from time import time


def main():
total = 0
number_list = [x for x in range(1, 100000001)]
start = time()
for number in number_list:
total += number
print(total)
end = time()
print('Execution time: %.3fs' % (end - start))

main()
# 5000000050000000
# Execution time: 6.798s

利用多进程“分而治之”,

当我们将这个任务分解到 8 个进程中去执行:

from multiprocessing import Process, Queue
from time import time

core_num = 8


def task_handler(curr_list, result_queue):
total = 0
for number in curr_list:
total += number
result_queue.put(total)


def main():
processes = []
number_list = [x for x in range(1, 100000001)]
result_queue = Queue()
index = 0
# 启动core_num(8)个进程将数据切片后进行运算
index_batch = int(100000000 / core_num)
for _ in range(core_num):
p = Process(target=task_handler,
args=(number_list[index:index + index_batch], result_queue))
index += index_batch
processes.append(p)
p.start()
# 开始记录所有进程执行完成花费的时间
start = time()
for p in processes:
p.join()
# 合并执行结果
total = 0
while not result_queue.empty():
total += result_queue.get()
print(total)
end = time()
print('Execution time: ', (end - start), 's', sep='')


if __name__ == '__main__':
main()

以上代码保存为 multi_process.py

!python multi_process.py
# 5000000050000000
# Execution time: 0.7936668395996094s

明显,多进程更快。

使用多进程后由于获得了更多的 CPU 执行时间以及更好的利用了 CPU 的多核特性,明显的减少了程序的执行时间,而且计算量越大效果越明显。

threading 模块

线程是系统调度资源的最小单位(CPU 通过计时器来切换线程)

在 Python 中,同个时间只有一个线程在运行

当然,如果你运行大量的 I/O 任务,多进程依然是最好的选择

线程数等于 CPU 内核数的两倍是最高效的。

GIL 是一个防止多个线程同时执行 Python 字节码的互斥锁。之所以需要这种锁,主要是因为 CPython 的内存管理不是线程安全的

在这种环境下,GIL 限制解释器本身只能有一个线程运行,而且任何 Python 解释器级别的操作都是序列化的,因此任何时候都只能有一条语句抛出异常。与异常相关的共享变量也因此受到保护。

线程间通信的目的主要是为了线程同步,因此线程没有像进程通信那样用于数据交换的通信机制。

Python 的标准库提供了两个模块:_thread 和 threading,_thread 是低级模块,threading 是高级模块,对_thread 进行了封装。绝大多数情况下,我们只需要使用 threading 这个高级模块。

threading:Threading Multiprocessing Module Code Documentation

多线程


import time
import threading


def test_thread(para='hi', sleep=3):
time.sleep(sleep)
print(para)


def main():
# create thread
thread_hi = threading.Thread(target=test_thread)
thread_hello = threading.Thread(target=test_thread, args=('hello', 1))
# run thread
thread_hi.start()
thread_hello.start()
print('Main thread has ended!')


if __name__ == '__main__':
main()

如下所示的界面中,有“下载”和“关于”两个按钮,用休眠的方式模拟点击“下载”按钮会联网下载文件需要耗费 10 秒的时间,当点击“下载”按钮后,整个任务阻塞:

import time
import tkinter
import tkinter.messagebox


def download():
# 模拟下载任务需要花费5秒钟时间
time.sleep(5)
tkinter.messagebox.showinfo('提示', '下载完成!')


def show_about():
tkinter.messagebox.showinfo('关于', '作者: 123(v1.0)')


def main():
top = tkinter.Tk()
top.title('单线程')
top.geometry('400x400')
top.wm_attributes('-topmost', True)

panel = tkinter.Frame(top)
button1 = tkinter.Button(panel, text='下载', command=download)
button1.pack(side='left')
button2 = tkinter.Button(panel, text='关于', command=show_about)
button2.pack(side='right')
panel.pack(side='bottom')

tkinter.mainloop()


if __name__ == '__main__':
main()

使用多线程后,不会阻塞了主线程:

import time
import tkinter
import tkinter.messagebox
from threading import Thread


def main():

class DownloadTaskHandler(Thread):

def run(self):
time.sleep(5)
tkinter.messagebox.showinfo('提示', '下载完成!')
# 启用下载按钮
button1.config(state=tkinter.NORMAL)

def download():
# 禁用下载按钮
button1.config(state=tkinter.DISABLED)
# 通过daemon参数将线程设置为守护线程(主程序退出就不再保留执行)
# 在线程中处理耗时间的下载任务
DownloadTaskHandler(daemon=True).start()

def show_about():
tkinter.messagebox.showinfo('关于', '作者: 123(v1.0)')

top = tkinter.Tk()
top.title('多线程')
top.geometry('400x400')
top.wm_attributes('-topmost', 1)

panel = tkinter.Frame(top)
button1 = tkinter.Button(panel, text='下载', command=download)
button1.pack(side='left')
button2 = tkinter.Button(panel, text='关于', command=show_about)
button2.pack(side='right')
panel.pack(side='bottom')

tkinter.mainloop()


if __name__ == '__main__':
main()

会看到弹出的窗口是多模态的,点击下载按钮不影响其他按钮操作。

Python 的多线程并不能发挥 CPU 的多核特性,这一点只要启动几个执行死循环的线程就可以得到证实了。之所以如此,是因为 Python 的解释器有一个“全局解释器锁”(GIL)的东西,任何线程执行前必须先获得 GIL 锁,然后每执行 100 条字节码,解释器就自动释放 GIL 锁,让别的线程有机会执行,这是一个历史遗留问题。

Python 解释器由于设计时有 GIL 全局锁,导致了多线程无法利用多核。多线程的并发在 Python 中就是一个美丽的梦。

多进程是有效的。

asyncio 模块

协程是编写并发代码的库,是构建 IO 密集型和高级结构化网络代码的最佳选择。

例程的运行方式是通过代码主动切换状态并等待处理,因此效率更高,语法也更详细。循环对象需要处于活动状态:创建、设置、提交、等待运行和停止。

例行程序的最佳数量取决于内存使用情况。

asyncio 模块包含了一些工具,用于编写异步代码。

协程的工作原理是事件循环,事件循环是一个无限循环,它等待事件并执行它们。

每次任务会被挂起至事件循环队列中,然后按顺序执行。

await 关键字用于挂起协程,直到它被调用。

async 关键字用于定义协程。

asyncio 模块用于实现异步编程。

asyncio:asyncio Multiprocessing Module Code Documentation

import asyncio

class TestA:
def __init__(self,loop) -> None:
self.loop = loop
asyncio.set_event_loop(loop=self.loop) # step 3.1

async def run_page(self,tid): # step 7
print(tid)
# 此处编写爬虫代码
return tid

async def close(self,):
for i in asyncio.all_tasks(): # step 9.1
i.cancel()
self.loop.stop() # step 9.2


def test():
get_async_loop = asyncio.new_event_loop() # step 1
asyncio.set_event_loop(get_async_loop) # step 2

async def spider(task_obj):
async_task = [asyncio.ensure_future(task_obj.run_page(1)),
asyncio.ensure_future(task_obj.run_page(2)),] # step 6
await asyncio.wait(async_task) # step 8

await task_obj.close() # step 9

task_obj = TestA(get_async_loop) #step 3
asyncio.run_coroutine_threadsafe(spider(task_obj), loop=get_async_loop) #step 4
get_async_loop.run_forever() # step 5

test()

生成器函数与协程(注:函数)非常相似,它们 yield 多次,它们具有多个入口点,并且它们的执行可以被挂起。唯一的区别是生成器函数不能控制在它在 yield 后交给哪里继续执行,控制权总是转移到生成器的调用者

在 Python 创建协程时,task 是 future 的子类,所以 task 继承了 future 的属性和方法。几乎没有不同。

🚧queue 模块

🚧concurrent 模块