from multiprocessing.dummy import Pool as ThreadPool


def worker(n):
    """Per-item work; stands in for a real task. Returns n + 2."""
    return n + 2


numbers = range(100)

# Use the pool as a context manager so its worker threads are cleaned up
# deterministically even if map() raises (the original close()/join() pair
# was skipped on any exception).  Pool.__exit__ calls terminate(), which is
# safe here because map() is synchronous and has already collected every
# result before the block exits.
with ThreadPool(processes=10) as pool:
    # map() blocks until all 100 items are processed; results keep input order.
    result = pool.map(worker, numbers)

print(result)
concurrent.futures.Executor:
import concurrent.futures


def worker(n):
    """Per-item work; stands in for a real task. Returns n + 2."""
    return n + 2


numbers = range(100)

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    # executor.map() returns a lazy one-shot iterator; materialize it inside
    # the with-block so all results are collected while the executor is alive
    # and `result` remains a reusable list afterwards (the original generator
    # was exhausted by its single print and empty from then on).
    result = list(executor.map(worker, numbers))

print(result)
queue + Threading:
from collections import deque
import queue
import threading


def do_work(n):
    """The actual per-item computation. Returns n + 2."""
    return n + 2


def worker():
    """Consume items from the shared queue until a None sentinel arrives."""
    while True:
        item = q.get()
        if item is None:
            # Account for the sentinel too: the original skipped task_done()
            # here, leaving unfinished_tasks permanently nonzero so any later
            # q.join() would block forever.
            q.task_done()
            break
        # deque.append is thread-safe; completion order across threads is
        # nondeterministic, so `result` is unordered.
        result.append(do_work(item))
        q.task_done()


q = queue.Queue()          # work items, shared by all workers
result = deque()           # results, appended concurrently
num_worker_threads = 10
threads = []
for _ in range(num_worker_threads):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

for item in range(100):
    q.put(item)

# block until all tasks are done
q.join()

# stop workers: one None sentinel per thread
for _ in range(num_worker_threads):
    q.put(None)
for t in threads:
    t.join()

print(result)
至少三种方法及相应的参考实现:
multiprocessing.dummy:
concurrent.futures.Executor:
queue + Threading:
官方文档:
17.2. multiprocessing — Process-based parallelism — Python 3.5.2 documentation
17.4. concurrent.futures — Launching parallel tasks — Python 3.5.2 documentation
17.7. queue — A synchronized queue class — Python 3.5.2 documentation
17.1. threading — Thread-based parallelism — Python 3.5.2 documentation
应该有好几种线程同步方案吧,比如线程池,互斥锁等
讲一个我以前用过的大概思路:主线程创建一个有界的同步队列(注意 Queue.Queue 是先进先出队列,并不是堆栈),并设置其容量上限——想限制 10 个并发线程,容量就设为 10,即 Queue.Queue(10)。
把这个队列作为变量传入子线程:子线程开始工作前,先往队列里放入一个值 Queue.put(1, timeout=30);完成工作后再从队列里取出一个值 Queue.get()。这样队列就相当于一个计数信号量,起到限制并发数的作用;设置了 timeout 的线程在队列已满被阻塞时,会等待 30 秒后超时,可在捕获超时异常后再次尝试插入。