主页 > 软件开发  > 

深入理解Python多进程编程multiprocessing

深入理解Python多进程编程multiprocessing
深入理解Python多进程编程 multiprocessing

flyfish

Python 的 multiprocessing 模块允许创建多个进程,从而可以利用多核处理器的能力来并行执行任务。这意味着程序的不同部分可以在不同的CPU核心上同时运行,极大地提高了处理效率,特别是在执行计算密集型任务时。

与多线程相比,multiprocessing 使用的是系统级的进程而不是线程。每个进程都有独立的内存空间和系统资源,而线程则共享同一个进程的内存空间。因此,在Python中(特别是由于全局解释器锁GIL的存在),对于CPU密集型任务,使用multiprocessing比多线程能更有效地利用多核CPU的优势。

进程的概念

在计算机操作系统中,进程是操作系统进行资源分配和调度的基本单位。一个进程可以包含多个线程。当使用multiprocessing模块时,可以创建新的进程,这些新进程将与主程序并行运行,并且它们各自拥有独立的内存空间。

示例代码1:单个进程打印数字

下面是一个简单的示例,演示如何使用multiprocessing模块创建一个进程来打印从1到5的数字:

import multiprocessing import time def print_numbers(): """打印从1到5的数字""" for i in range(1, 6): print("数字:", i) time.sleep(1) # 模拟耗时操作 if __name__ == "__main__": # 创建一个新的进程 process = multiprocessing.Process(target=print_numbers) # 启动进程 process.start() # 等待进程完成 process.join() 数字: 1 数字: 2 数字: 3 数字: 4 数字: 5 multiprocessing.Process():创建一个新的进程对象。target=print_numbers:指定该进程的目标函数为print_numbers。process.start():启动进程。process.join():等待进程结束。 示例代码2:两个进程分别打印不同字符串

下面是另一个示例,演示如何同时启动两个进程,每个进程打印不同的字符串:

import multiprocessing def print_message(message): """打印传入的消息""" print(f"消息: {message}") if __name__ == "__main__": # 创建两个进程 process1 = multiprocessing.Process(target=print_message, args=("Hello from Process 1",)) process2 = multiprocessing.Process(target=print_message, args=("Hello from Process 2",)) # 启动两个进程 process1.start() process2.start() # 等待两个进程都完成 process1.join() process2.join() 消息: Hello from Process 1 消息: Hello from Process 2

在这个例子中,定义了一个print_message函数,它接受一个字符串参数并打印出来。然后,创建了两个进程,每个进程都调用这个函数,但传递了不同的字符串参数。通过args参数,可以向目标函数传递额外的参数。最后,启动这两个进程,并等待它们完成各自的执行。这样,就可以看到两个进程几乎同时开始工作,并打印出各自的消息。

示例3:使用 multiprocessing.Value 在多个进程中共享一个计数器

multiprocessing.Value Value 允许多个进程共享一个值。它适用于需要在多个进程中共享简单数据类型(如整数或浮点数)的情况。

import multiprocessing def increment(counter, lock): """增加计数器的值""" for _ in range(1000): with lock: counter.value += 1 if __name__ == "__main__": # 创建一个共享的整数值和锁 counter = multiprocessing.Value('i', 0) # 'i' 表示整数类型 lock = multiprocessing.Lock() # 创建多个进程来增加计数器 processes = [multiprocessing.Process(target=increment, args=(counter, lock)) for _ in range(10)] # 启动所有进程 for p in processes: p.start() # 等待所有进程完成 for p in processes: p.join() print("最终计数器值:", counter.value) 最终计数器值: 10000 multiprocessing.Value(typecode_or_type, *args, lock=True):创建一个新的共享值对象。typecode_or_type 指定了要共享的数据类型(例如 'i' 表示整数)。value.value:访问共享值的实际内容。lock:确保对共享资源的安全访问,防止竞态条件。

进程(Process)和线程(Thread)在Python中的区别

特性进程(Process)线程(Thread)内存空间每个进程有独立的内存空间所有线程共享同一进程的内存空间资源消耗开销较大,需要更多系统资源轻量级,开销小,资源共享通信难度进程间通信复杂(IPC),如管道、套接字等线程间通信简单,直接访问相同变量和数据结构全局解释器锁(GIL)不受GIL限制,适合计算密集型任务受GIL限制,对于计算密集型任务效率提升有限适用场景计算密集型任务,稳定性要求高的应用I/O密集型任务,快速响应用户界面的应用崩溃影响一个进程崩溃不影响其他进程一个线程出错可能导致整个进程崩溃

Python中多线程(Thread)和多进程(Process)的区别

特性多线程(Thread)多进程(Process)内存空间所有线程共享同一进程的内存空间每个进程有独立的内存空间资源消耗轻量级,开销小,资源共享开销较大,需要更多系统资源通信难度线程间通信简单,直接访问相同变量和数据结构进程间通信复杂(IPC),如管道、套接字等全局解释器锁 (GIL)受GIL限制,对于计算密集型任务效率提升有限不受GIL限制,适合计算密集型任务适用场景I/O密集型任务,快速响应用户界面的应用计算密集型任务,稳定性要求高的应用崩溃影响一个线程出错可能导致整个进程崩溃一个进程崩溃不影响其他进程创建与销毁开销创建和销毁开销较小创建和销毁开销较大并发性能对于I/O密集型任务性能较好,但对于CPU密集型任务受限对于CPU密集型任务性能较好示例用途网络请求、文件读写、GUI应用等数据分析、图像处理、科学计算等 进程间通信

在Python的multiprocessing模块中,提供了几种常用的进程间通信(IPC)方式,包括队列(Queue)、管道(Pipe)等。这些工具允许不同的进程之间安全地传递数据。

使用 multiprocessing.Queue 实现进程间通信

Queue 是一个线程和进程安全的 FIFO 队列,非常适合用于进程间的简单数据交换。

示例代码:

import multiprocessing def producer(queue): """生产者函数,向队列中添加数据""" for i in range(5): queue.put(f"数据 {i}") print(f"生产者放入: 数据 {i}") def consumer(queue): """消费者函数,从队列中取出数据""" while not queue.empty(): data = queue.get() print(f"消费者获取: {data}") if __name__ == "__main__": # 创建一个队列对象 queue = multiprocessing.Queue() # 创建生产者和消费者进程 p1 = multiprocessing.Process(target=producer, args=(queue,)) p2 = multiprocessing.Process(target=consumer, args=(queue,)) # 启动进程 p1.start() p2.start() # 等待两个进程完成 p1.join() p2.join() 生产者放入: 数据 0 生产者放入: 数据 1 生产者放入: 数据 2 生产者放入: 数据 3 生产者放入: 数据 4 消费者获取: 数据 0 消费者获取: 数据 1 消费者获取: 数据 2 消费者获取: 数据 3 消费者获取: 数据 4 队列的使用:queue.put() 用于向队列中添加数据,queue.get() 用于从队列中取出数据。数据传递原理:生产者进程通过调用 put 方法将数据放入队列,而消费者进程通过调用 get 方法从队列中取出数据。Queue 对象是进程安全的,因此多个进程可以同时访问它而不发生冲突。 使用 multiprocessing.Pipe 实现进程间通信

Pipe 提供了一个双向通道,适用于两个进程之间的直接通信。

示例代码:

import multiprocessing def sender(conn, messages): """发送者函数,通过管道发送消息""" for msg in messages: conn.send(msg) print(f"发送者发送: {msg}") conn.close() def receiver(conn): """接收者函数,通过管道接收消息""" while True: msg = conn.recv() if msg == "END": break print(f"接收者接收: {msg}") if __name__ == "__main__": # 创建一个管道对象 parent_conn, child_conn = multiprocessing.Pipe() # 准备要发送的消息 messages = ["Hello", "from", "sender", "END"] # 创建发送者和接收者进程 p1 = multiprocessing.Process(target=sender, args=(child_conn, messages)) p2 = multiprocessing.Process(target=receiver, args=(parent_conn,)) # 启动进程 p1.start() p2.start() # 等待两个进程完成 p1.join() p2.join() 发送者发送: Hello 发送者发送: from 发送者发送: sender 发送者发送: END 接收者接收: Hello 接收者接收: from 接收者接收: sender 进程池的使用

multiprocessing.Pool 是一个用于管理一组工作进程的类,它可以简化并行任务的分配和结果收集。

示例代码:使用 Pool 并行计算数字的平方 import multiprocessing def square(n): """计算一个数的平方""" return n * n if __name__ == "__main__": # 定义要处理的数字列表 numbers = [1, 2, 3, 4, 5] # 创建一个包含4个进程的进程池 with multiprocessing.Pool(processes=4) as pool: # 使用map方法将square函数应用于每个数字 results = pool.map(square, numbers) print("结果:", results) 结果: [1, 4, 9, 16, 25] 进程池的概念和作用:Pool 允许你指定一定数量的工作进程,并且可以通过 map、apply 等方法轻松地将任务分配给这些进程。这样可以有效地利用多核CPU来加速计算密集型任务。设置进程池大小:通过 processes 参数指定进程池中的工作进程数量,默认情况下,它会根据系统CPU核心数自动调整。处理任务的方式:pool.map() 方法类似于内置的 map() 函数,但它会在多个进程中并行执行。在这个例子中,我们将 square 函数应用到 numbers 列表中的每个元素,并返回计算结果。 Semaphore(信号量)

信号量是一种更高级的同步机制,可以用来控制同时访问某一资源的进程数量。

示例:使用 Semaphore 控制并发访问

import multiprocessing import time def worker(semaphore, name): with semaphore: print(f"{name} 获得信号量") time.sleep(1) if __name__ == "__main__": semaphore = multiprocessing.Semaphore(3) # 最多允许3个进程同时访问 processes = [multiprocessing.Process(target=worker, args=(semaphore, f"进程 {i}")) for i in range(6)] for p in processes: p.start() for p in processes: p.join() Event(事件)

事件是一种简单的线程间通信机制,可以让一个或多个进程等待某个特定事件的发生。

示例:使用 Event 实现进程间的同步

import multiprocessing import time def wait_for_event(event): print("等待事件触发...") event.wait() # 阻塞直到事件被设置 print("事件已触发!") def set_event(event): time.sleep(3) event.set() # 触发事件 if __name__ == "__main__": event = multiprocessing.Event() p1 = multiprocessing.Process(target=wait_for_event, args=(event,)) p2 = multiprocessing.Process(target=set_event, args=(event,)) p1.start() p2.start() p1.join() p2.join() Manager(管理器)

Manager 提供了更高层次的接口,可以创建可以在不同进程之间共享的数据结构,如列表、字典等。

示例:使用 Manager 创建共享数据结构

import multiprocessing def append_to_list(shared_list, item): shared_list.append(item) print(f"添加到共享列表: {item}") if __name__ == "__main__": with multiprocessing.Manager() as manager: shared_list = manager.list() # 创建一个可共享的列表 processes = [multiprocessing.Process(target=append_to_list, args=(shared_list, i)) for i in range(5)] for p in processes: p.start() for p in processes: p.join() print("最终共享列表:", list(shared_list))

文中processes = [multiprocessing.Process(target=append_to_list, args=(shared_list, i)) for i in range(5)]这一句 等于下面的代码

processes = [] for i in range(5): p = multiprocessing.Process(target=append_to_list, args=(shared_list, i)) processes.append(p) 共享内存

multiprocessing 还支持通过共享内存的方式在进程之间共享数据,这对于大规模数据共享特别有用。

示例:使用 Array 共享数组

import multiprocessing def modify_array(shared_array, index, value): shared_array[index] = value if __name__ == "__main__": array = multiprocessing.Array('i', [1, 2, 3, 4, 5]) # 创建共享数组 processes = [ multiprocessing.Process(target=modify_array, args=(array, i, i*10)) for i in range(len(array)) ] for p in processes: p.start() for p in processes: p.join() print("修改后的数组:", list(array)) 修改后的数组: [0, 10, 20, 30, 40]
标签:

深入理解Python多进程编程multiprocessing由讯客互联软件开发栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“深入理解Python多进程编程multiprocessing