Python 多进程Multiprocessing

it2025-01-17  3

文章目录

多进程 Multiprocessing添加进程 Process存储进程输出 Queue效率对比 threading & multiprocessing进程池 Pool共享内存 shared memory进程锁 Lock

多进程 Multiprocessing

添加进程 Process

import multiprocessing as mp #import threading as td # 定义一个被线程和进程调用的函数 def job(a,d): print('aaaaa') # 创建线程和进程 #t1 = td.Thread(target=job,args=(1,2)) p1 = mp.Process(target=job,args=(1,2)) #注意:Thread和Process的首字母都要大写,被调用的函数没有括号,被调用的函数的参数放在args(…)中 #启动线程和进程 #t1.start() p1.start() #别连接线程和进程 #t1.join() p1.join()

存储进程输出 Queue

import multiprocessing as mp #定义一个被多线程调用的函数,q 就像一个队列,用来保存每次函数运行的结果 ##该函数没有返回值!!! def job(q): res=0 for i in range(1000): res+=i+i**2+i**3 q.put(res) #queue if __name__=='__main__': q = mp.Queue()#定义一个多线程队列,用来存储结果 #定义两个线程函数,用来处理同一个任务, args 的参数只要一个值的时候,参数后面需要加一个逗号,表示args是可迭代的,后面可能还有别的参数,不加逗号会出错 p1 = mp.Process(target=job,args=(q,)) p2 = mp.Process(target=job,args=(q,)) p1.start() p2.start() p1.join() p2.join() res1 = q.get() res2 = q.get() print(res1+res2)

效率对比 threading & multiprocessing

import multiprocessing as mp import threading as td import time def job(q): res = 0 for i in range(1000000): res += i + i**2 + i**3 q.put(res) # queue def multicore(): q = mp.Queue() p1 = mp.Process(target=job, args=(q,)) p2 = mp.Process(target=job, args=(q,)) p1.start() p2.start() p1.join() p2.join() res1 = q.get() res2 = q.get() print('multicore:',res1 + res2) # 接下来创建多线程程序,创建多线程和多进程有很多相似的地方。首先import threading然后定义multithread()完成同样的任务 def multithread(): q = mp.Queue() # thread可放入process同样的queue中 t1 = td.Thread(target=job, args=(q,)) t2 = td.Thread(target=job, args=(q,)) t1.start() t2.start() t1.join() t2.join() res1 = q.get() res2 = q.get() print('multithread:', res1 + res2) # 最后我们定义最普通的函数。注意,在上面例子中我们建立了两个进程或线程,均对job()进行了两次运算,所以在normal()中我们也让它循环两次 def normal(): res = 0 for _ in range(2): for i in range(1000000): res += i + i**2 + i**3 print('normal:', res) # 最后,为了对比各函数运行时间,我们需要import time, 然后依次运行定义好函数: if __name__ == '__main__': st = time.time() normal() st1 = time.time() print('normal time:', st1 - st) multithread() st2 = time.time() print('multithread time:', st2 - st1) multicore() print('multicore time:', time.time() - st2) """ # range(1000000) ('normal:', 499999666667166666000000L) ('normal time:', 1.1306169033050537) ('thread:', 499999666667166666000000L) ('multithread time:', 1.3054230213165283) ('multicore:', 499999666667166666000000L) ('multicore time:', 0.646507978439331) """ # 普通/多线程/多进程的运行时间分别是1.13,1.3和0.64秒。 我们发现多核/多进程最快,说明在同时间运行了多个任务。 而多线程的运行时间居然比什么都不做的程序还要慢一点,说明多线程还是有一定的短板的。

进程池 Pool

#进程池 Pool() 和 map() import multiprocessing as mp def job(x): return x*x #有了池子之后,就可以让池子对应某一个函数,向池子里丢数据,池子就会返回函数返回的值。 Pool和之前的Process的不同点是丢向Pool的函数有返回值,而Process的没有返回值。 def multicore(): pool = mp.Pool(processes=2)## 定义CPU核数量为2 默认全部。 #接下来用map()获取结果,在map()中需要放入函数和需要迭代运算的值,然后它会自动分配给CPU核,返回结果 res = pool.map(job, range(10)) print(res) #pool.close() # 关闭进程池,不再接受新的进程 #pool.join() # 主进程阻塞等待子进程的退出 # apply_async()中只能传递一个值,它只会放入一个核进行运算,但是传入值时要注意是可迭代的,所以在传入值后需要加逗号, 同时需要用get()方法获取返回值 res = pool.apply_async(job, (2,)) print(res.get()) # 在此我们将apply_async() 放入迭代器中,定义一个新的multi_res # 迭代器,i=0时apply一次,i=1时apply一次等等 multi_res = [pool.apply_async(job, (i,)) for i in range(10)] # 从迭代器中取出 print([res.get() for res in multi_res]) if __name__ == '__main__': multicore() ''' [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # map() 4 [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # multi_res ''' # 可以看出在apply用迭代器的得到的结果和用map得到的结果是一样的

总结

Pool默认调用是CPU的核数,传入processes参数可自定义CPU核数map() 放入迭代参数,返回多个结果apply_async()只能放入一组参数,并返回一个结果,如果想得到map()的效果需要通过迭代

共享内存 shared memory

#通过使用Value数据存储在一个共享的内存表中。 import multiprocessing as mp # 其中d和i参数用来设置数据类型的,d表示一个双精浮点类型,i表示一个带符号的整型。更多的形式请查看本页最后的表. value1 = mp.Value('i', 0) value2 = mp.Value('d', 3.14) #在Python的mutiprocessing中,有还有一个Array类,可以和共享内存交互,来实现在进程之间共享数据。 array = mp.Array('i', [1, 2, 3, 4]) #这里的Array和numpy中的不同,它只能是一维的,不能是多维的。同样和Value 一样,需要定义数据形式,否则会报错。 参考数据形式 https://docs.python.org/3.5/library/array.html

进程锁 Lock

import multiprocessing as mp import time def job(v, num, l): l.acquire() # 锁住 for _ in range(5): time.sleep(0.1) v.value += num # v.value获取共享内存 print(v.value) l.release() # 释放 def multicore(): l = mp.Lock() # 定义一个进程锁 v = mp.Value('i', 0)# 定义共享变量 p1 = mp.Process(target=job, args=(v, 1, l)) # 需要将Lock传入 p2 = mp.Process(target=job, args=(v, 3, l)) p1.start() p2.start() p1.join() p2.join() if __name__ == '__main__': multicore()
最新回复(0)