Python并发编程模式:多线程、多进程与协程

张开发
2026/4/10 9:13:35 15 分钟阅读

分享文章

Python并发编程模式:多线程、多进程与协程
Python并发编程模式多线程、多进程与协程1. 背景介绍在现代软件开发中充分利用多核CPU和处理高并发请求是提升应用性能的关键。Python提供了多种并发编程模型包括多线程、多进程和协程每种模型都有其适用场景和优缺点。本文将深入探讨Python并发编程的核心概念、实现模式、性能特点以及最佳实践帮助开发者选择合适的并发策略。2. 核心概念与技术2.1 Python并发模型多线程Threading共享内存空间适合I/O密集型任务多进程Multiprocessing独立内存空间适合CPU密集型任务协程Asyncio单线程异步适合高并发I/O操作GIL全局解释器锁限制多线程的CPU并行执行2.2 并发vs并行特性并发Concurrency并行Parallelism定义同时处理多个任务同时执行多个任务资源单核/多核多核适用I/O密集型CPU密集型实现线程/协程多进程2.3 选择指南场景推荐方案原因I/O密集型网络请求协程/多线程避免阻塞高效利用等待时间CPU密集型计算多进程绕过GIL利用多核混合场景进程线程/协程各取所长高并发Web服务协程线程池最大吞吐量3. 代码实现3.1 多线程编程importthreadingimporttimeimportqueuefromconcurrent.futuresimportThreadPoolExecutor,as_completedclassThreadSafeCounter:线程安全的计数器def__init__(self):self.value0self._lockthreading.Lock()defincrement(self):withself._lock:self.value1returnself.valuedefget(self):withself._lock:returnself.valueclassWorkerThread(threading.Thread):自定义工作线程def__init__(self,task_queue,result_queue):super().__init__(daemonTrue)self.task_queuetask_queue self.result_queueresult_queue self._stop_eventthreading.Event()defrun(self):whilenotself._stop_event.is_set():try:taskself.task_queue.get(timeout1)iftaskisNone:break# 执行任务resultself.process_task(task)self.result_queue.put(result)self.task_queue.task_done()exceptqueue.Empty:continuedefprocess_task(self,task):处理单个任务time.sleep(0.1)# 模拟工作returnfProcessed:{task}defstop(self):self._stop_event.set()# 使用ThreadPoolExecutordefconcurrent_map(func,items,max_workers4):并发执行map操作withThreadPoolExecutor(max_workersmax_workers)asexecutor:futures{executor.submit(func,item):itemforiteminitems}results[]forfutureinas_completed(futures):try:resultfuture.result()results.append(result)exceptExceptionase:print(fError processing{futures[future]}:{e})returnresults# 使用示例if__name____main__:# 线程池示例deffetch_data(url):time.sleep(0.5)returnfData from{url}urls[furl_{i}foriinrange(10)]resultsconcurrent_map(fetch_data,urls,max_workers5)print(fFetched{len(results)}results)3.2 多进程编程importmultiprocessingasmpfrommultiprocessingimportPool,Process,Queue,Managerimporttimeimportosdefcpu_intensive_task(n):CPU密集型任务result0foriinrange(n):resulti**2returnresultdefworker_process(task_queue,result_queue):工作进程whileTrue:tasktask_queue.get()iftaskisNone:break# 执行任务resultcpu_intensive_task(task)result_queue.put({pid:os.getpid(),task:task,result:result})classProcessPool:进程池实现def__init__(self,num_processesNone):self.num_processesnum_processesormp.cpu_count()self.poolPool(processesself.num_processes)defmap(self,func,iterable,chunksize1):并行mapreturnself.pool.map(func,iterable,chunksizechunksize)defimap_unordered(self,func,iterable,chunksize1):无序并行mapreturnself.pool.imap_unordered(func,iterable,chunksizechunksize)defclose(self):self.pool.close()self.pool.join()# 共享内存示例defshared_memory_example():使用共享内存withManager()asmanager:shared_listmanager.list()shared_dictmanager.dict()defworker(n,shared_list,shared_dict):shared_list.append(n)shared_dict[n]n**2processes[]foriinrange(5):pProcess(targetworker,args(i,shared_list,shared_dict))processes.append(p)p.start()forpinprocesses:p.join()print(fShared list:{list(shared_list)})print(fShared dict:{dict(shared_dict)})# 使用示例if__name____main__:# 进程池示例numbers[1000000,2000000,3000000,4000000,5000000]# 串行执行starttime.time()serial_results[cpu_intensive_task(n)forninnumbers]serial_timetime.time()-start# 并行执行starttime.time()poolProcessPool()parallel_resultspool.map(cpu_intensive_task,numbers)pool.close()parallel_timetime.time()-startprint(fSerial time:{serial_time:.2f}s)print(fParallel time:{parallel_time:.2f}s)print(fSpeedup:{serial_time/parallel_time:.2f}x)3.3 协程编程importasyncioimportaiohttpimporttimeasyncdeffetch_url(session,url):异步获取URLasyncwithsession.get(url)asresponse:returnawaitresponse.text()asyncdeffetch_all_urls(urls):并发获取多个URLasyncwithaiohttp.ClientSession()assession:tasks[fetch_url(session,url)forurlinurls]returnawaitasyncio.gather(*tasks)asyncdefproducer_consumer_example():生产者-消费者模式queueasyncio.Queue(maxsize10)asyncdefproducer():foriinrange(20):awaitqueue.put(i)awaitasyncio.sleep(0.1)awaitqueue.put(None)# 结束信号asyncdefconsumer():whileTrue:itemawaitqueue.get()ifitemisNone:breakprint(fConsumed:{item})awaitasyncio.sleep(0.2)awaitasyncio.gather(producer(),consumer())# 使用示例if__name____main__:urls[https://httpbin.org/delay/1,https://httpbin.org/delay/2,https://httpbin.org/delay/1]starttime.time()resultsasyncio.run(fetch_all_urls(urls))print(fFetched{len(results)}pages in{time.time()-start:.2f}s)3.4 混合并发模式importasynciofromconcurrent.futuresimportThreadPoolExecutor,ProcessPoolExecutorimportmultiprocessingasmpclassHybridExecutor:混合执行器结合线程池和进程池def__init__(self,max_workersNone):self.max_workersmax_workersormp.cpu_count()self.thread_poolThreadPoolExecutor(max_workersself.max_workers)self.process_poolProcessPoolExecutor(max_workersself.max_workers)asyncdefrun_in_thread(self,func,*args):在线程池中运行loopasyncio.get_event_loop()returnawaitloop.run_in_executor(self.thread_pool,func,*args)asyncdefrun_in_process(self,func,*args):在进程池中运行loopasyncio.get_event_loop()returnawaitloop.run_in_executor(self.process_pool,func,*args)defshutdown(self):self.thread_pool.shutdown()self.process_pool.shutdown()# 使用示例asyncdefhybrid_example():executorHybridExecutor()# I/O密集型任务在线程池执行asyncdefio_task():awaitasyncio.sleep(0.5)returnIO result# CPU密集型任务在进程池执行defcpu_task(n):returnsum(i**2foriinrange(n))# 并发执行io_resultasyncio.create_task(io_task())cpu_resultexecutor.run_in_process(cpu_task,1000000)resultsawaitasyncio.gather(io_result,cpu_result)print(fResults:{results})executor.shutdown()if__name____main__:asyncio.run(hybrid_example())4. 性能与效率分析4.1 性能对比并发模型I/O密集型加速比CPU密集型加速比内存占用适用场景多线程3-5x1x (GIL限制)低I/O密集型多进程1x4-8x (取决于CPU核心数)高CPU密集型协程5-10x1x很低高并发I/O混合模式5-10x4-8x中混合场景4.2 开销对比操作线程进程协程创建时间~50μs~1ms~1μs切换时间~1μs~10μs~100ns内存占用~8KB~50MB~1KB通信成本低共享内存高IPC低共享内存5. 最佳实践5.1 线程安全使用锁保护共享资源避免死锁按固定顺序获取锁使用线程安全数据结构queue、concurrent collections最小化锁粒度减少临界区代码5.2 进程间通信使用Queue进程安全的消息传递使用Pipe双向通信使用共享内存Manager、SharedMemory避免频繁通信减少序列化开销5.3 协程最佳实践避免阻塞操作使用异步版本的库合理使用gather控制并发数量异常处理使用try/except捕获异常资源管理使用async with5.4 调试技巧使用日志记录线程/进程ID监控工具htop、psutil性能分析cProfile、line_profiler死锁检测faulthandler6. 应用场景6.1 Web服务器多线程处理客户端请求协程高并发连接进程池处理CPU密集型任务6.2 数据处理多进程并行数据处理多线程I/O操作协程流式处理6.3 爬虫系统协程高并发请求线程池解析HTML进程池数据存储6.4 实时系统多线程实时响应协程异步I/O进程池后台计算7. 总结与展望Python并发编程提供了多种工具和方法选择合适的并发模型对于应用性能至关重要。通过本文的介绍读者应该掌握了多线程、多进程和协程的核心概念和使用方法。未来Python并发编程的发展方向包括GIL优化子解释器、 nogil Python异步生态完善更多异步库支持结构化并发更好的并发代码组织硬件加速GPU、TPU并行计算掌握并发编程技术将帮助开发者构建高性能、高可用的Python应用。

更多文章