文章目录
操作系统的概念进程multiprocessing模块守护进程使用多进程实现一个并发的socket的server锁生产者消费者模型数据共享线程threading模块守护线程和线程锁递归锁和死锁(科学家吃面)队列池协程gevent模块asyncio模块
操作系统的概念
操作系统 人机矛盾:cpu100%工作 I/O操作 输入输出 相对内存来讲的 多道操作系统:一个程序遇到IO就把cpu让给别人 顺序的一个个的执行的思路变成 共同存在在一台计算机中,其中以恶搞程序执行让出cpu之后,另一个程序能继续使用cpu 来提高cpu利用率 单纯的切换会占用时间,但是多道操作系统的原理整体上还是节省了时间,提高了cpu的利用率 时空复用的概念
单cpu 分时操作系统:每一个程序轮流执行一个时间片,时间片轮转,提高了用户体验 先来先服务为 FCFS 时间片
实时操作系统
分布式操作系统:把一个大任务,分解成许多个小任务,分别给不同的操作系统去执行,最后汇总。 任务可分解 celery python分布式框架
进程
进程:进行中的程序。 占用资源 需要操作系统调用 pid:能够唯一标识进程 计算机中最小的资源分配单位 并发: 多个程序同时进行:只有一个cpu,多个程序轮流在一个cpu上执行。 并行 多个程序同时执行,并且同时在多个cpu上执行。 同步 在做一件事的时候发起另一件事,必须等待上一次事件结束后才能执行 异步 在做A事件时发起B事件,不需要等待A事件结束就可以开始B事件 阻塞 cpu不工作 非阻塞 cpu工作 同步阻塞: input sleep(cpu不工作) 异步阻塞
线程 是进程中的一个单位,不能脱离进程存在 线程是计算机中能够被cpu调度的最小单位
进程的三状态图: 就绪 <-> 运行 -> 阻塞 -> 就绪 -> 运行 进程的调度算法 给所有的进程分配资源或者分配cpu使用权的一种方法 短作业优先 先来先服务 多级反馈算法
multiprocessing模块
from multiprocessing
import Process
n
= 0
def func():
global n
n
+= 1
if __name__
== '__main__':
p_lst
= []
for i
in range(10):
p
= Process
(target
=func
)
p
.start
()
p_lst
.append
(p
)
for p
in p_lst
:
p
.join
()
print(n
)
开启进程的另一种方法
import os
from multiprocessing
import Process
class MyProcess(Process
):
def __init__(self
, a
, b
, c
):
self
.a
= a
self
.b
= b
self
.c
= c
super().__init__
()
def run(self
):
print(os
.getppid
(), os
.getpid
(), self
.a
, self
.b
, self
.c
)
if __name__
== '__main__':
print(os
.getpid
())
p
= MyProcess
(1, 2, 3)
p
.start
()
守护进程
from multiprocessing
import Process
import time
def son1():
while True:
print('in son1')
time
.sleep
(1)
def son2():
for i
in range(10):
print('in son2')
time
.sleep
(1)
if __name__
== '__main__':
p1
= Process
(target
=son1
)
p1
.daemon
= True
p1
.start
()
p2
= Process
(target
=son2
)
p2
.start
()
p2
.join
()
time
.sleep
(5)
print('in main')
使用多进程实现一个并发的socket的server
server.py
import socket
from multiprocessing
import Process
def talk(conn
):
while True:
msg
= conn
.recv
(1024).decode
('utf-8')
ret
= msg
.upper
().encode
('utf-8')
conn
.send
(ret
)
conn
.close
()
if __name__
== '__main__':
sk
= socket
.socket
()
sk
.bind
(('127.0.0.1', 9000))
sk
.listen
()
while True:
conn
, addr
= sk
.accept
()
Process
(target
=talk
, args
=(conn
,)).start
()
sk
.close
()
client.py
import socket
import time
sk
= socket
.socket
()
sk
.connect
(('127.0.0.1', 9000))
while True:
sk
.send
(b
'hello')
msg
= sk
.recv
(1024).decode
('utf-8')
print(msg
)
time
.sleep
(0.5)
sk
.close
()
锁
import json
import time
from multiprocessing
import Process
, Lock
def search(i
):
with open('ticket', encoding
='utf-8') as f
:
ticket
= json
.load
(f
)
print(f
"{i}:当前的余票是{ticket['count']}")
def buy_ticket(i
):
with open('ticket', encoding
='utf-8') as f
:
ticket
= json
.load
(f
)
if ticket
['count'] > 0:
ticket
['count'] -= 1
print(f
'{i}买到票了')
time
.sleep
(0.2)
with open('ticket', mode
='w', encoding
='utf-8') as f
:
json
.dump
(ticket
, f
)
def get_ticket(i
, lock
):
search
(i
)
with lock
:
buy_ticket
(i
)
if __name__
== '__main__':
lock
= Lock
()
for i
in range(10):
Process
(target
=get_ticket
, args
=(i
, lock
)).start
()
生产者消费者模型
import requests
from multiprocessing
import Process
, Queue
def producer(i
, url
, q
):
ret
= requests
.get
(url
)
q
.put
((i
, ret
.text
))
def consumer(q
):
while True:
res
= q
.get
()
if res
is None:
break
with open(f
'{res[0]}.html', mode
='w', encoding
='utf-8') as f
:
f
.write
(res
[1])
if __name__
== '__main__':
q
= Queue
()
url_dic
= {'cn':'https://www.cnblogs.com/Eva-J/p/7277026.html',
'mu':'https://blog.csdn.net/qq_31910669',
'li':'https://blog.csdn.net/qq_31910669/article/details/109136837'}
p
= []
for key
in url_dic
:
p1
= Process
(target
=producer
, args
=(key
, url_dic
[key
], q
))
p1
.start
()
p
.append
(p1
)
Process
(target
=consumer
, args
=(q
,)).start
()
for p2
in p
:
p2
.join
()
q
.put
(None)
数据共享
from multiprocessing
import Process
, Manager
, Lock
def change_dic(dic
, lock
):
with lock
:
dic
['count'] -= 1
if __name__
== '__main__':
m
= Manager
()
dic
= m
.dict({'count':100})
lock
= Lock
()
p_lst
= []
for i
in range(50):
p
= Process
(target
=change_dic
, args
=(dic
, lock
))
p
.start
()
p_lst
.append
(p
)
for p
in p_lst
:
p
.join
()
print(dic
)
线程threading模块
import time
from threading
import Thread
, current_thread
, enumerate, active_count
def func(i
):
print(f
'start{i},{current_thread()}')
time
.sleep
(1)
print(f
'end{i}')
if __name__
== '__main__':
t1
= []
for i
in range(10):
t
= Thread
(target
=func
, args
=(i
,))
t
.start
()
t1
.append
(t
)
print(enumerate(), active_count
())
for t
in t1
:
t
.join
()
print('所有的线程都执行完了')
守护线程和线程锁
from threading
import Thread
,Lock
n
= 0
def add(lock
):
with lock
:
for i
in range(200000):
global n
n
+= 1
def sub(lock
):
with lock
:
for i
in range(200000):
global n
n
-= 1
lock
= Lock
()
Thread
(target
=add
, args
=(lock
,))
Thread
(target
=sub
, args
=(lock
,))
print(n
)
递归锁和死锁(科学家吃面)
from threading
import Lock
, Thread
, RLock
import time
noodle_lock
= fork_lock
= RLock
()
def eat(name
):
noodle_lock
.acquire
()
print(f
'{name}抢到面了')
fork_lock
.acquire
()
print(f
'{name}抢到叉子了')
print(f
'{name}吃面')
time
.sleep
(0.1)
fork_lock
.release
()
print(f
'{name}放下叉子了')
noodle_lock
.release
()
print(f
'{name}放下面了')
def eat2(name
):
fork_lock
.acquire
()
print(f
'{name}抢到叉子了')
noodle_lock
.acquire
()
print(f
'{name}抢到面了')
print(f
'{name}吃面')
time
.sleep
(0.1)
noodle_lock
.release
()
print(f
'{name}放下面了')
fork_lock
.release
()
print(f
'{name}放下叉子了')
Thread
(target
=eat
, args
=('alex',)).start
()
Thread
(target
=eat2
, args
=('taibai',)).start
()
Thread
(target
=eat
, args
=('wusie',)).start
()
Thread
(target
=eat2
, args
=('dazhuang',)).start
()
队列
from queue
import LifoQueue
, PriorityQueue
priq
= PriorityQueue
()
priq
.put
((2, 'alex'))
priq
.put
((5, 'xia'))
priq
.put
((0, 'ming'))
print(priq
.get
())
print(priq
.get
())
print(priq
.get
())
池
from concurrent
.futures
import ThreadPoolExecutor
, ProcessPoolExecutor
from threading
import current_thread
import time
import os
def func(a
, b
):
print(current_thread
().ident
, 'start', a
, b
)
time
.sleep
(1)
print(current_thread
().ident
, 'end')
return a
*b
if __name__
== '__main__':
pp
= ProcessPoolExecutor
(4)
future_l
= []
for i
in range(20):
ret
= pp
.submit
(func
, i
, b
=i
+1)
future_l
.append
(ret
)
for ret
in future_l
:
print(ret
.result
())
协程
"""
协程 非常重要 数据共享 数据安全
是操作系统不可见的
协程的本质就是一条线程 多个任务在一条线程上来回切换 来规避IO操作,就达到了我么你将一条线程操作讲到最低的目的
"""
gevent模块
from gevent
import monkey
monkey
.patch_all
()
import time
import gevent
def func():
print('start func')
time
.sleep
(1)
print('end func')
g1
= gevent
.spawn
(func
)
g2
= gevent
.spawn
(func
)
g3
= gevent
.spawn
(func
)
gevent
.joinall
([g1
, g2
, g3
])
asyncio模块
import asyncio
async def func(name
):
print(f
'start{name}')
await asyncio
.sleep
(1)
print('end')
loop
= asyncio
.get_event_loop
()
loop
.run_until_complete
(asyncio
.wait
([func
('alex'), func
('taibai')]))