如果是 CPU 密集型任务,最直接的突破方式是把多线程改为多进程,利用 multiprocessing 模块或 concurrent.futures 让每个进程拥有独立的解释器和内存空间,从而绕过 GIL 限制。
核心观点:只有在 CPU 密集型场景下才值得用多进程突破 GIL,I/O 密集型任务维持多线程或异步即可。
- 先定位:确认瓶颈是 CPU 计算还是 I/O 等待
- 先做:使用 ProcessPoolExecutor 或 multiprocessing.Pool 替代 threading.Thread
- 再验证:观察系统监控中是否启动了多个 Python 进程且 CPU 利用率上升
核心代码示例
下面是一个最小可用的多进程示例,推荐优先使用 concurrent.futures 接口,语法更简洁:
from concurrent.futures import ProcessPoolExecutor
import os
def work(n):
return n * n
if __name__ == '__main__':
# Windows 下建议显式设置启动模式,避免递归创建进程
# multiprocessing.set_start_method('spawn')
with ProcessPoolExecutor() as executor:
results = list(executor.map(work, range(10000)))
print(results[:5])性能对比验证
为了确认多进程是否真正提升性能,可以通过以下脚本对比多线程与多进程在 CPU 密集型任务下的耗时:
import time
import threading
import multiprocessing
def cpu_task():
sum_val = 0
for i in range(10000000):
sum_val += i
return sum_val
def run_thread():
start = time.time()
threads = []
for _ in range(4):
t = threading.Thread(target=cpu_task)
t.start()
threads.append(t)
for t in threads:
t.join()
print(f"Thread Time: {time.time() - start:.2f}s")
def run_process():
start = time.time()
processes = []
for _ in range(4):
p = multiprocessing.Process(target=cpu_task)
p.start()
processes.append(p)
for p in processes:
p.join()
print(f"Process Time: {time.time() - start:.2f}s")
if __name__ == '__main__':
run_thread()
run_process()运行后观察输出,CPU 密集型任务下多进程耗时通常显著低于多线程。
为什么会这样
CPython 解释器中存在全局解释器锁(GIL),同一时刻只允许一个线程在 CPU 上执行字节码。这意味着即使你有多个 CPU 核心,多线程的 Python 程序在计算密集型任务上也只能用到一个核心。
多进程方案通过启动多个独立的 Python 解释器进程,每个进程都有自己的 GIL,从而能够真正并行利用多核 CPU。
分步处理
1. 确认任务类型
如果任务主要是网络请求、文件读写,多线程或 asyncio 通常足够,不需要多进程。只有涉及大量数学计算、图像处理、数据压缩等 CPU 消耗时,才考虑多进程。
2. 修改代码结构
将线程池改为进程池。注意必须在 if __name__ == '__main__': 保护块下启动进程,尤其在 Windows 和 macOS 上,否则会导致进程无限递归创建。
3. 调整进程数量
通常进程数设置为 CPU 核心数即可。过多进程会增加上下文切换开销,反而降低性能。可以使用 os.cpu_count() 获取默认值。
进程间状态管理
进程间内存隔离,不能像线程那样直接修改变量。如果需要共享状态,需要使用 Queue 或 Manager:
from multiprocessing import Process, Manager
def worker(shared_dict, key, value):
shared_dict[key] = value
if __name__ == '__main__':
with Manager() as manager:
shared_dict = manager.dict()
p1 = Process(target=worker, args=(shared_dict, 'a', 1))
p2 = Process(target=worker, args=(shared_dict, 'b', 2))
p1.start()
p2.start()
p1.join()
p2.join()
print(shared_dict)注意:使用 Manager 会带来序列化开销,大数据量传输可能导致性能下降,仅适用于必要的状态同步。
怎么验证是否生效
运行程序后,打开系统监控工具:
- Linux/macOS:使用
htop或top -H -p <pid>,观察是否有多个 Python 进程占用 CPU,且总和超过 100%(单核为 100%)。 - Windows:任务管理器中查看 Python 进程是否有多个实例,CPU 利用率是否跨越多个逻辑核心。
如果只有一个进程占用高 CPU,其他核心空闲,说明 GIL 限制仍然存在,可能进程未成功启动或多线程未被替换。
常见坑
- 全局状态共享困难: 进程间内存隔离,不能像线程那样直接修改变量。需要使用 Queue、Pipe 或 Manager 来通信,这会增加序列化开销。
- 启动开销: 创建进程比创建线程重得多。如果任务执行时间极短,进程创建消耗可能大于计算节省的时间。
- 可 pickle 对象: 传递给子进程的函数和参数必须能被 pickle 序列化,lambda 函数、局部函数通常无法直接传递。
- Windows 启动模式: Windows 默认使用 spawn 模式,必须在 main 保护块下启动,否则报错。Linux 默认 fork 模式较快但需注意资源继承。
- 资源泄漏: 进程结束时需确保正确 join 或关闭池,否则可能留下僵尸进程。
参考来源
- Python 官方文档 - multiprocessing 模块:https://docs.python.org/3/library/multiprocessing.html
- Python 官方文档 - concurrent.futures:https://docs.python.org/3/library/concurrent.futures.html
- Python Wiki - GlobalInterpreterLock:https://wiki.python.org/moin/GlobalInterpreterLock