python并发编程--进程、线程、协程、锁、池、队列

it2025-03-31  11

文章目录

操作系统的概念进程multiprocessing模块守护进程使用多进程实现一个并发的socket的server锁生产者消费者模型数据共享线程threading模块守护线程和线程锁递归锁和死锁(科学家吃面)队列池协程gevent模块asyncio模块

操作系统的概念

操作系统 人机矛盾:cpu100%工作 I/O操作 输入输出 相对内存来讲的 多道操作系统:一个程序遇到IO就把cpu让给别人 顺序的一个个的执行的思路变成 共同存在在一台计算机中,其中以恶搞程序执行让出cpu之后,另一个程序能继续使用cpu 来提高cpu利用率 单纯的切换会占用时间,但是多道操作系统的原理整体上还是节省了时间,提高了cpu的利用率 时空复用的概念

单cpu 分时操作系统:每一个程序轮流执行一个时间片,时间片轮转,提高了用户体验 先来先服务为 FCFS 时间片

实时操作系统

分布式操作系统:把一个大任务,分解成许多个小任务,分别给不同的操作系统去执行,最后汇总。 任务可分解 celery python分布式框架

进程

进程:进行中的程序。 占用资源 需要操作系统调用 pid:能够唯一标识进程 计算机中最小的资源分配单位 并发: 多个程序同时进行:只有一个cpu,多个程序轮流在一个cpu上执行。 并行 多个程序同时执行,并且同时在多个cpu上执行。 同步 在做一件事的时候发起另一件事,必须等待上一次事件结束后才能执行 异步 在做A事件时发起B事件,不需要等待A事件结束就可以开始B事件 阻塞 cpu不工作 非阻塞 cpu工作 同步阻塞: input sleep(cpu不工作) 异步阻塞

线程 是进程中的一个单位,不能脱离进程存在 线程是计算机中能够被cpu调度的最小单位

进程的三状态图: 就绪 <-> 运行 -> 阻塞 -> 就绪 -> 运行 进程的调度算法 给所有的进程分配资源或者分配cpu使用权的一种方法 短作业优先 先来先服务 多级反馈算法

multiprocessing模块

# multiple 多元化的 # processing 进程 # multiprocessing 多元的处理进程的模块 # from multiprocessing import Process # import time # import os # def func(name, age): # # print(os.getpid(), os.getppid(), name, age) # pid process id ppid parent process id # print(f'发了一封邮件给{name, age}') # time.sleep(1) # print('发送完毕') # # if __name__ == '__main__': # # 只会再主进程 # # print('main:', os.getpid(), os.getppid()) # # p = Process(target=func, args=('alex', 82)) # # p.start() # 开启了一个子进程 # arg_lst = [('大壮', 78), ('alex', 89)] # p_lst = [] # for arg in arg_lst: # p = Process(target=func, args=arg) # p.start() # p_lst.append(p) # for p in p_lst: # p.join() # print('所有的邮件都已经发送完毕') # 能不能给子进程传递参数 可以 # 能不能获取子进程的返回值 不能 # 能不能同时开启多个子进程 可以 开启子进程时没有等他结束,就进行下一个进程 异步非阻塞 # join的用法 # p.join() # 阻塞p这个进程,直到p执行完毕 同步阻塞 # 多进程之间的数据是否隔离 是的 from multiprocessing import Process n = 0 def func(): global n n += 1 if __name__ == '__main__': p_lst = [] for i in range(10): p = Process(target=func) p.start() p_lst.append(p) for p in p_lst: p.join() print(n) # 0 当前进程的n不会变 变的是子进程中的n # 使用多进程实现一个并发的socket的server

开启进程的另一种方法

import os from multiprocessing import Process class MyProcess(Process): def __init__(self, a, b, c): self.a = a self.b = b self.c = c super().__init__() def run(self): print(os.getppid(), os.getpid(), self.a, self.b, self.c) if __name__ == '__main__': print(os.getpid()) p = MyProcess(1, 2, 3) p.start() # p.terminate() # 强制结束一个子进程

守护进程

from multiprocessing import Process import time def son1(): while True: print('in son1') time.sleep(1) def son2(): for i in range(10): print('in son2') time.sleep(1) if __name__ == '__main__': p1 = Process(target=son1) p1.daemon = True # 表示设置p1为守护进程 p1.start() p2 = Process(target=son2) p2.start() p2.join() time.sleep(5) print('in main') # 会输出5个 in son1 # 主进程会等待所有子进程结束, 是为了回收子进程资源 # 守护进程会等待主进程的代码执行结束之后再结束,只守护主进程的代码时间 # 主进程的代码什么时候结束,守护进程就什么时候结束,和其他子进程无关 # 要求守护进程必须在p2结束后再结束

使用多进程实现一个并发的socket的server

server.py

import socket from multiprocessing import Process def talk(conn): while True: msg = conn.recv(1024).decode('utf-8') ret = msg.upper().encode('utf-8') conn.send(ret) conn.close() if __name__ == '__main__': sk = socket.socket() sk.bind(('127.0.0.1', 9000)) sk.listen() while True: conn, addr = sk.accept() Process(target=talk, args=(conn,)).start() sk.close()

client.py

import socket import time sk = socket.socket() sk.connect(('127.0.0.1', 9000)) while True: sk.send(b'hello') msg = sk.recv(1024).decode('utf-8') print(msg) time.sleep(0.5) sk.close()

# 互斥锁 不能载同一个进程中连续acquire多次 # 进程之间数据安全的问题 # 抢票的例子 import json import time from multiprocessing import Process, Lock def search(i): with open('ticket', encoding='utf-8') as f: ticket = json.load(f) print(f"{i}:当前的余票是{ticket['count']}") def buy_ticket(i): with open('ticket', encoding='utf-8') as f: ticket = json.load(f) if ticket['count'] > 0: ticket['count'] -= 1 print(f'{i}买到票了') time.sleep(0.2) with open('ticket', mode='w', encoding='utf-8') as f: json.dump(ticket, f) def get_ticket(i, lock): search(i) # lock.acquire() # buy_ticket(i) # lock.release() # 等同 with lock: # 代替上边,并能做一些异常处理,保证一个代码出错下一个进程也能执行 buy_ticket(i) if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=get_ticket, args=(i, lock)).start() # 这种只有一张票 所有人都能买 # import time # from multiprocessing import Lock, Process # # def func(i, lock): # lock.acquire() # 拿钥匙 # print(f'被锁住的代码{i}') # time.sleep(1) # lock.release() # 还钥匙 # # # if __name__ == '__main__': # lock = Lock() # for i in range(10): # Process(target=func, args=(i, lock)).start()

生产者消费者模型

# 进程之间数据隔离 # 进程之间通信(IPC) Inter Process Communication # 基于文件:同一台机器上的多个进程之间的通信 Queue # # 基于网络:同一台机器或者多态机器上的多进程通信 # from multiprocessing import Queue, Process # # def son(q): # q.put('hello') # # if __name__ == '__main__': # q = Queue() # Process(target=son, args=(q, )).start() # print(q.get()) # # son 和 mian 之间的通信 # 生产者消费者模型 # 爬虫的时候 # 分布式 # 本质就是让生产数据和消费数据的效率达到平衡并且最大化的效率 # from multiprocessing import Process, Queue # import time # import random # # def consumer(q):# 消费者:通常取到数据之后还要进行某些操作 # for i in range(10): # print(q.get()) # # # def producer(q): # 生产者:通常再放数据前需要先通过某些代码来获取数据 # for i in range(10): # time.sleep(random.random()) # q.put(i) # # # if __name__ == '__main__': # q = Queue() # c1 = Process(target=consumer, args=(q,)) # p1 = Process(target=producer, args=(q,)) # c1.start() # p1.start() # import requests # from multiprocessing import Process, Queue # # def producer(i, url, q): # ret = requests.get(url) # q.put((i, ret.status_code)) # # print(ret.status_code) # # if __name__ == '__main__': # q = Queue() # url_lst = ['https://www.cnblogs.com/Eva-J/p/7277026.html', # 'https://blog.csdn.net/qq_31910669', # 'https://blog.csdn.net/qq_31910669/article/details/109136837'] # # for i, url in enumerate(url_lst): # # producer(i, url, '1') # for index, url in enumerate(url_lst): # Process(target=producer, args=(index, url, q)).start() # for i in range(3): # 异步阻塞, 等哪个子进程先结束我就先获取谁的结果 # print(q.get()) import requests from multiprocessing import Process, Queue def producer(i, url, q): ret = requests.get(url) q.put((i, ret.text)) # print(ret.status_code) def consumer(q): while True: res = q.get() if res is None: break with open(f'{res[0]}.html', mode='w', encoding='utf-8') as f: f.write(res[1]) if __name__ == '__main__': q = Queue() url_dic = {'cn':'https://www.cnblogs.com/Eva-J/p/7277026.html', 'mu':'https://blog.csdn.net/qq_31910669', 'li':'https://blog.csdn.net/qq_31910669/article/details/109136837'} p = [] for key in url_dic: p1 = Process(target=producer, args=(key, url_dic[key], q)) p1.start() p.append(p1) Process(target=consumer, args=(q,)).start() for p2 in p: p2.join() q.put(None)

数据共享

# 数据共享的前提下雅瑶保证数据的安全性,加上lock from multiprocessing import Process, Manager, Lock def change_dic(dic, lock): with lock: dic['count'] -= 1 if __name__ == '__main__': m = Manager() dic = m.dict({'count':100}) lock = Lock() p_lst = [] for i in range(50): p = Process(target=change_dic, args=(dic, lock)) p.start() p_lst.append(p) for p in p_lst: p.join() print(dic)

线程threading模块

# 进程:数据隔离,资源分配的最小单位,可以利用多核,操作系统调度 # 如何开启进程 multiprocessing, 如何开启 start join # 进程有数据不安全问题 Lock # 进程之间可以通信: # 队列(安全) 管道 # 第三方工具:redis # 进程之间数据共享 Manager # 生产者消费者模型 # 线程 # 什么是线程:能被操作系统调度的最小单位 # 一个进程可以有多个线程 # 同一个进程中多个线程同时被cpu执行? 就是这样的 # 可以利用多核, 数据共享,开启关闭切换时间开销小 # CPython中的多线程 # gc 垃圾回收机制 # 引用计数 + 分代回收 # 全局解释器锁是未来完成gc的回收机制 # 全局解释器锁(GIL)global interpreter lock # 导致了同一个进程中的多个线程只能有一个线程真正被cpu执行 # 节省的是IO操作时间 # threading模块 import time from threading import Thread, current_thread, enumerate, active_count # current_thread() 获取线程id current_thread().ident 获取线程id def func(i): print(f'start{i},{current_thread()}') time.sleep(1) print(f'end{i}') if __name__ == '__main__': t1 = [] for i in range(10): t = Thread(target=func, args=(i,)) t.start() t1.append(t) print(enumerate(), active_count()) # 11个 还有一个主线程 for t in t1: t.join() # print(enumerate(), active_count()) # 1个 只剩1个主线程 print('所有的线程都执行完了') # 线程是不能从外部terminate # 所有的线程必须自己执行完毕 # enumerate 列表 存储了所有活着的线程对象 # active_count 数字 存储了所有或者的线程个数 # 面向对象的方式开启一个线程 # 线程之间的数据是共享的

守护线程和线程锁

# import time # from threading import Thread # # def son(): # while True: # print('in son') # time.sleep(1) # def son2(): # for i in range(10): # print('in son2') # time.sleep(1) # t = Thread(target=son) # t.daemon = True # 守护线程,子线程随着主线程结束而结束 # Thread(target=son2).start() # 守护线程会再主线程的代码结束之后继续守护其他子线程 # t.start() # 主线程会等待子线程结束后才结束 # 为什么? # 主线程结束进程就会结束 # 守护线程会再主线程的代码结束之后继续守护其他子线程 # 为什么? # 守护线程随着进程的结束才结束 # 线程锁 from threading import Thread,Lock n = 0 def add(lock): with lock: for i in range(200000): global n n += 1 def sub(lock): with lock: for i in range(200000): global n n -= 1 lock = Lock() Thread(target=add, args=(lock,)) Thread(target=sub, args=(lock,)) print(n) # += -= 数据不安全 因为 + 和 赋值是分开的操作 # append pop 就安全 # 解决线程数据不安全的问题加锁

递归锁和死锁(科学家吃面)

# 递归锁 RLock (Recursion) # 在同一个线程中可以acquire多次 # 互斥锁是一个人进去,其他人都要等待哪个人出来之后才能进去,效率高 # 递归锁,拿一把万能钥匙,其他人在等,里边有多个门,都打开.出来后,其他人才能进去 # from threading import Lock, RLock # rl = RLock() # rl.acquire() # rl.acquire() # rl.acquire() # rl.acquire() # print('锁住的代码') # rl.acquire() # rl.acquire() # rl.acquire() # rl.acquire() # l = Lock() # l.acquire() # print('锁住的代码') # l.release() # from threading import RLock, Thread # # def func(i, lock): # lock.acquire() # lock.acquire() # lock.acquire() # print(f'{i}:start') # lock.release() # # lock.release() # # lock.release() # print(f'{i}:end') # lock = RLock() # for i in range(5): # Thread(target=func, args=(i, lock)).start() # 死锁现象 # 科学家吃面 一个拿到叉子 一个拿到面 都吃不到 死锁 from threading import Lock, Thread, RLock import time # noodle_lock = Lock() # fork_lock = Lock() noodle_lock = fork_lock = RLock() # 这样就不会死锁了 def eat(name): noodle_lock.acquire() # 抢到面了 print(f'{name}抢到面了') fork_lock.acquire() print(f'{name}抢到叉子了') print(f'{name}吃面') time.sleep(0.1) fork_lock.release() print(f'{name}放下叉子了') noodle_lock.release() print(f'{name}放下面了') def eat2(name): fork_lock.acquire() print(f'{name}抢到叉子了') noodle_lock.acquire() # 抢到面了 print(f'{name}抢到面了') print(f'{name}吃面') time.sleep(0.1) noodle_lock.release() print(f'{name}放下面了') fork_lock.release() print(f'{name}放下叉子了') Thread(target=eat, args=('alex',)).start() Thread(target=eat2, args=('taibai',)).start() Thread(target=eat, args=('wusie',)).start() Thread(target=eat2, args=('dazhuang',)).start() # 死锁现象产生的原因:多把锁 并且在多个线程中交替使用 # 如果是互斥锁出现了死锁现象,最快速的解决方法就是把所有的互斥锁变成一把递归锁 # 如果非要用互斥锁,就改为一把锁,锁住面条和叉子

队列

# import queue # 线程之间数据安全的容器队列 # # q = queue.Queue(4) # FIFO 先进先出 # # q.put(1) # q.put(2) # # q.get() # q.put(3) # q.put(4) # print('4 done') # q.put(5) # print('5 done') # q.get_nowait() # q.put_nowait() from queue import LifoQueue, PriorityQueue # LifoQueue last in first out 后进先出 # PriorityQueue 优先级队列 priq = PriorityQueue() priq.put((2, 'alex')) priq.put((5, 'xia')) priq.put((0, 'ming')) print(priq.get()) print(priq.get()) print(priq.get()) # (0, 'ming') # (2, 'alex') # (5, 'xia') # 线程队列 主要记住用法 # 先进先出 Queue # 后进先出 LifoQueue # 优先级 PriorityQueue

# 什么是池 # 要在程序开始的时候,还没提交任务先创建几个线程或则进程 # 放在一个池子里,这就是池 # 为什么要用池 # 如果先开好进程/线程,那么有任务之后就可以直接使用这个池中的数据了 # 并且开好的进程或者线程会一直存在池中,可以被多个任务反复使用 # 这样极大的减少了开启/关闭调度线程/进程的时间开销 # 池中的线程/进程个数控制了操作系统需要调度的任务个数,控制池中的单位 # 有利于提高操作系统的效率,减轻操作系统的负担 from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor from threading import current_thread import time import os # threading 模块 没有提供池 # multiprocessing 模块 仿照threading写的Pool # concurrent.futures 模块 线程池进程池 都能够使用享是的方式开启和调用/使用 # def func(a, b): # print(current_thread().ident, 'start', a, b) # time.sleep(1) # 一开始四个全进去,然后出来一个就进去一个 # print(current_thread().ident, 'end') # # # tp = ThreadPoolExecutor(4) # for i in range(20): # tp.submit(func, i, b=i+1) # 把任务提交给池, 传参 # 实例化 创建池 # 向池中提交任务, submit 传参数 # 进程池 def func(a, b): print(current_thread().ident, 'start', a, b) time.sleep(1) # 一开始四个全进去,然后出来一个就进去一个 print(current_thread().ident, 'end') return a*b if __name__ == '__main__': pp = ProcessPoolExecutor(4) future_l = [] for i in range(20): ret = pp.submit(func, i, b=i+1) # 把任务提交给池, 传参 # print(ret) # 未来对象 先返回一个对象,其结果可能还未执行,等到用result时才会拿到结果 # print(ret.result()) future_l.append(ret) # 防止发生阻塞 for ret in future_l: print(ret.result()) # map # ret = pp.map(func, range(20)) # for key in ret: # print(key) # 回调函数????????? 再学

协程

""" 协程 非常重要 数据共享 数据安全 是操作系统不可见的 协程的本质就是一条线程 多个任务在一条线程上来回切换 来规避IO操作,就达到了我么你将一条线程操作讲到最低的目的 """ # 切换并规避IO的两个模块 # gevent 利用了 greenlet 底层模块完成的切换 + 自动国币IO的功能 # asyncio yeild

gevent模块

# import gevent # # def func(): # 带有IO操作的内容下载函数里,然后日胶func给gevent # print('start func') # gevent.sleep(1) # print('end func') # # g1 = gevent.spawn(func) # g2 = gevent.spawn(func) # g3 = gevent.spawn(func) # gevent.joinall([g1, g2, g3]) # # g1.join() # 阻塞 直到协程g1任务执行结束 # # g1.join() # 阻塞 直到协程g1任务执行结束 # # g1.join() # 阻塞 直到协程g1任务执行结束 from gevent import monkey monkey.patch_all() import time import gevent def func(): # 带有IO操作的内容下载函数里,然后日胶func给gevent print('start func') time.sleep(1) print('end func') g1 = gevent.spawn(func) g2 = gevent.spawn(func) g3 = gevent.spawn(func) gevent.joinall([g1, g2, g3]) # g1.join() # 阻塞 直到协程g1任务执行结束 # g1.join() # 阻塞 直到协程g1任务执行结束 # g1.join() # 阻塞 直到协程g1任务执行结束

asyncio模块

import asyncio async def func(name): print(f'start{name}') # await 可能会发生阻塞的方法 # await 关键字必须写在一个async函数里 , py3.8的不需要 await asyncio.sleep(1) print('end') loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait([func('alex'), func('taibai')])) # 异步 # 原理
最新回复(0)