进程(多进程、队列、进程池)、线程(多线程)、协程

it2023-07-08  67

进程

from multiprocessing import Process m = 1 def func1(n): global m m += 1 print(m) # 2 def func2(n): global m m += 1 print(m) # 2 # 主进程 if __name__ == '__main__': p1 = Process(target=func1,name='进程1',args=(1,)) p1.start() p2 = Process(target=func2,name='进程2',args=(2,)) p2.start() m += 1 print(m) # 2 # Process.start() 启动进程并执行任务 # Process.run() 只是执行了任务但是没有启动进程 # terminate() 终止进程 # os.getpid() 获取当前进程的主进程 # 多进程访问全局变量,每个进程各自拥有此变量,进程之间不相互干扰

自定义进程类

from multiprocessing import Process class MyProcess(Process): def __init__(self, name): super(MyProcess, self).__init__() self.name = name # 重写run方法 def run(self): print(self.name) if __name__ == '__main__': p1 = MyProcess('进程1') p1.start() p2 = MyProcess('进程2') p2.start()

进程池

# 非阻塞式进程 # 一个进程开始后,另一个进程开始,进程池满之后,其他进程等待,有进程结束其它进程开始,复用内存空间 from multiprocessing import Process, Pool import time def func1(name): print(f'{name}开始') time.sleep(1.5) print(f'{name}完成') if __name__ == '__main__': pool = Pool(3) tasks = ['进程1', '进程2', '进程3', '进程4', '进程5'] for task in tasks: pool.apply_async(func1, args=(task,), callback=None) # 异步 pool.close() # 添加任务结束 pool.join() # 等待子进程结束后,继续向下执行,主进程结束进程池结束,进程池为空时等待结束 print('over!!!') # 阻塞式进程 # 一个进程结束后,另一个进程开始,分配新的内存空间,进程池最大容量用完后,其它进程复用内存空间 from multiprocessing import Pool import time def func1(name): print(f'{name}开始') time.sleep(1) print(f'{name}完成') if __name__ == '__main__': pool = Pool(3) tasks = ['进程1', '进程2', '进程3', '进程4', '进程5'] for task in tasks: pool.apply(func1, args=(task,)) pool.close() pool.join() print('over!!!')

进程间通信

from multiprocessing import Process, Queue import time def func1(q1): list1 = [1, 2, 3, 4, 5] for i in list1: print(f'{i}开始') time.sleep(0.5) q1.put(i) def func2(q2): while True: try: j = q2.get(timeout=3) print(f'{j}完成') except: break if __name__ == '__main__': q = Queue(3) p1 = Process(target=func1, args=(q,)) p2 = Process(target=func2, args=(q,)) p1.start() p2.start() p2.join() # q.qsize() 队列长度 # q.empty()/q.full() 队列是否为空/满

线程

import threading n = 0 def func1(): global n for i in range(1000000): n += 1 def func2(): global n for i in range(1000000): n += 1 if __name__ == '__main__': t1 = threading.Thread(target=func1, name='t1') t2 = threading.Thread(target=func2, name='t2') t1.start() t2.start() t2.join() print(n) # 线程可以共享全局变量 # GIL 全局解释器锁,当计算量小时,python底层自动加锁,当一个任务结束后执行另一个任务,当计算量大到一定值时,锁释放,一个任务未结束便执行另一个任务,如n进行运算后尚未赋值完成,又被另一线程调用,这时的n为尚未进行运算之前的n # 线程:用于耗时操作,爬虫,IO # 进程:大量计算

多线程同步

import threading import time lock = threading.Lock() n = 0 def func1(): global n lock.acquire() # 获取锁 for i in range(1000000): n += 1 lock.release() #释放锁 def func2(): global n lock.acquire() for i in range(1000000): n += 1 lock.release() if __name__ == '__main__': t1 = threading.Thread(target=func1, name='t1') t2 = threading.Thread(target=func2, name='t2') t1.start() t2.start() t2.join() print(n)

自定义线程与死锁

from threading import Thread import time lockA = Thread.Lock() lockB = Thread.Lock() class MyThread1(Thread): # def __init__(self, name): # pass def run(self): # start() if lockA.acquire(): # 如果可以获取到锁则返回True print(self.name+'获取了A锁') time.sleep(0.1) if lockB.acquire(timeout=3): print(self.name+'又获取了B锁') lockB.release() lockA.release() class MyThread2(Thread): def run(self): # start() if lockB.acquire(): # 如果可以获取到锁则返回True print(self.name+'获取了B锁') time.sleep(0.1) if lockA.acquire(timeout=3): print(self.name+'又获取了A锁') lockA.release() lockB.release() if __name__ == '__main__': t1 = MyThread1() t2 = MyThread2() t1.start() t2.start()

生成者与消费者模式(线程间通信)

import threading import queue import random import time def produce(q): i = 0 while i < 10: num = random.randint(1, 100) q.put(num) print('生产了:%d' % num) time.sleep(1) i += 1 q.put(None) # 完成任务 q.task_done() def consume(q): while True: item = q.get() if item is None: break print('消费了:%s' % item) time.sleep(3) # 完成任务 q.task_done() if __name__ == '__main__': q.queue.Queue(10) arr[] # 创建生产者 tp = threading.Thread(target=produce, args=(q,)) tp = start() # 创建消费者 tc = threading.Thread(tatget=consume, args=(q,)) tp.join() tc.join() print('end')

协程

def task1(): for i in range(3): print('A' + str(i) yield time.sleep(0.2) def task2(): for i in range(3): print('B' + str(i) yield time.sleep(0.2) if __name__ = '__main__': g1 = task1() g2 = task2() while True: try: next(g1) next(g2) except: break # greenlet 完成协程任务 import time from greenlet import greenlet def a(): # 任务A for i in range(5): print('A' + str(i)) gb.switch() time.sleep(0.1) def b(): # 任务B for i in range(5): print('B' + str(i)) gc.switch() time.sleep(0.1) def c(): # 任务C for i in range(5): print('C' + str(i)) ga.switch() time.sleep(0.1) if __name__ == '__main__': ga = greenlet(a) gb = greenlet(b) gc = greenlet(c) ga.switch() # 保证总有greenlet在运行,而不是等待IO import time import gevent from gevent import monkey monkey.patch_all() # 猴子补丁将time.sleep()替换为gevent.sleep() def a(): # 任务A for i in range(5): print('A' + str(i)) time.sleep(0.1) def b(): # 任务B for i in range(5): print('B' + str(i)) time.sleep(0.1) def c(): # 任务C for i in range(5): print('C' + str(i)) time.sleep(0.1) if __name__ == '__main__': g1 = gevent.spawn(a) g2 = gevent.spawn(b) g3 = gevent.spawn(c) g1.join() g2.join() g3.join() # 案例 import requests import gevent from gevent import monkey import urllib.request monkey.patch_all() def download(url): response = urllib.request.urlopen(url) content = response.read() print(f'下载了{url}的数据,长度:{len(content)}') if __name__ == '__main__': urls = ['http://www.163.com', 'http://www.qq.com', 'http://www.baidu.com'] g1 = gevent.spawn(download, urls[0]) g2 = gevent.spawn(download, urls[1]) g3 = gevent.spawn(download, urls[2]) # gevent.joinall(g1, g2, g3) # 类似g1.join() g1.join() g2.join() g3.join()
最新回复(0)