queue 线程安全队列示例如下:
# -*- coding: utf-8 -*-
# Producer / consumer example built on the thread-safe queue.Queue.
from queue import Empty, Full, Queue
import threading
import random
import time


class Producer(threading.Thread):
    """Thread that periodically tries to put a random value into a queue.

    Args:
        q: The shared ``queue.Queue`` to put items into.
        name: Thread name; also used as the prefix of every queued item
            and of every printed message.
        delay_range: ``(low, high)`` bounds, in whole seconds, passed to
            ``random.randint`` to choose the pause before each attempt.
        max_rounds: Number of attempts to make before ``run`` returns.
            ``None`` (the default) loops forever.
    """

    def __init__(self, q, name, delay_range=(6, 10), max_rounds=None):
        super(Producer, self).__init__(name=name)
        self.q = q
        self.delay_range = delay_range
        self.max_rounds = max_rounds
        print(self.name + ":Producer-init 完成: ")

    def run(self):
        """Sleep, then try to enqueue ``"<name>:<value>"``; repeat."""
        rounds = 0
        while self.max_rounds is None or rounds < self.max_rounds:
            rounds += 1
            time.sleep(random.randint(*self.delay_range))
            value = random.randint(0, 10)
            # put_nowait() instead of "if full(): ... else: put()": another
            # producer may fill the queue between the check and the put,
            # which would make a plain put() block unexpectedly.
            try:
                self.q.put_nowait("{}:{}".format(self.name, value))
            except Full:
                print(self.name + ':Producer-Queue is full')
            else:
                print(self.name + ': put value:' + str(value) + '-into queue')


class Consumer(threading.Thread):
    """Thread that periodically tries to take one item from a queue.

    Args:
        q: The shared ``queue.Queue`` to take items from.
        name: Thread name; also used as the prefix of every printed message.
        delay_range: ``(low, high)`` bounds, in whole seconds, passed to
            ``random.randint`` to choose the pause before each attempt.
        max_rounds: Number of attempts to make before ``run`` returns.
            ``None`` (the default) loops forever.
    """

    def __init__(self, q, name, delay_range=(6, 10), max_rounds=None):
        super(Consumer, self).__init__(name=name)
        self.q = q
        self.delay_range = delay_range
        self.max_rounds = max_rounds
        print(self.name + ":Consumer-init 完成:")

    def run(self):
        """Sleep, then try to dequeue and print one item; repeat."""
        rounds = 0
        while self.max_rounds is None or rounds < self.max_rounds:
            rounds += 1
            time.sleep(random.randint(*self.delay_range))
            # get_nowait() instead of "if empty(): ... else: get()": with
            # several consumers the queue can be drained between the check
            # and the get, which would make a plain get() block.
            try:
                value = self.q.get_nowait()
            except Empty:
                print(self.name + ':Consumer-Queue-empty')
            else:
                print(self.name + ':Consumer:取值消费:get Queue-' + str(value))


if __name__ == "__main__":
    q = Queue(10)

    p1 = Producer(q, 'P1')
    p2 = Producer(q, 'P2')
    p1.start()
    p2.start()

    # More consumers (e.g. Consumer(q, 'C2')) can be started the same way.
    c1 = Consumer(q, 'C1')
    c1.start()

# threading: multi-threaded lock implementation
# -*- coding: utf-8 -*-
import threading
import time
from decimal import Decimal

# Shared state: one Condition guards the item counter ``num``; the box
# holds at most ``box_size`` items.
condition = threading.Condition()
num = 0
box_size = 15


class GoodsProduce(threading.Thread):
    """Producer thread that adds items to the shared box one at a time.

    Args:
        company_name: Label printed with every produced item.
        produce_speed: Divisor for the production delay; each item takes
            ``2 / produce_speed`` seconds (rounded to two decimals).
        info: Free-form description, only printed by ``show``.
        max_rounds: Number of items to produce before ``run`` returns.
            ``None`` (the default) produces forever.
    """

    def __init__(self, company_name, produce_speed, info, max_rounds=None):
        super(GoodsProduce, self).__init__()
        self.companyName = company_name
        self.produceSpeed = Decimal(2 / produce_speed).quantize(Decimal('0.00'))
        self.info = info
        self.max_rounds = max_rounds

    def run(self):
        """Produce items, waiting on the condition while the box is full."""
        global num
        # time.sleep() needs a real number, so convert the Decimal once.
        delay = float(self.produceSpeed)
        produced = 0
        while self.max_rounds is None or produced < self.max_rounds:
            # Simulate production time outside the lock so other threads
            # are not blocked while this one sleeps.
            time.sleep(delay)
            # "with" guarantees exactly one release per acquire; the old
            # acquire()/wait()/acquire() sequence kept re-entering the
            # lock and never fully released it.
            with condition:
                # Re-check after every wake-up: another producer may have
                # filled the box again.
                while num >= box_size:
                    print("NOTE: BOX is full , size -{} ,生产完成后数量: - {}".format(box_size, num))
                    condition.wait()  # suspend until notified
                num += 1
                print("GoodsProduce : {} Produce one , 现有数量 :{}".format(self.companyName, num))
                condition.notify_all()  # wake waiting consumers
            produced += 1

    def show(self):
        """Print this producer's configuration."""
        print("show companyName -- {} ,produceSpeed -- {}, info -- {}".format(
            self.companyName, self.produceSpeed, self.info))


class GoodsConsume(threading.Thread):
    """Consumer thread that removes items from the shared box one at a time.

    Args:
        cname: Label printed with every consumed item.
        area: Free-form area label, only printed by ``show``.
        info: Free-form description, only printed by ``show``.
        consume_interval: Seconds to pause after each consumed item.
        max_rounds: Number of items to consume before ``run`` returns.
            ``None`` (the default) consumes forever.
    """

    def __init__(self, cname, area, info, consume_interval=1, max_rounds=None):
        super(GoodsConsume, self).__init__()
        self.cname = cname
        self.area = area
        self.info = info
        self.consume_interval = consume_interval
        self.max_rounds = max_rounds

    def run(self):
        """Consume items, waiting on the condition while the box is empty."""
        global num
        consumed = 0
        while self.max_rounds is None or consumed < self.max_rounds:
            with condition:
                # Re-check after every wake-up: another consumer may have
                # taken the item first.
                while num < 1:
                    print("NOTE: BOX is null ,please wait ... \nsize {} ,消费完后数量: {}".format(box_size, num))
                    condition.wait()  # suspend until notified
                num -= 1
                print("GoodsConsumer {} Consume one , 现有数量:{}".format(self.cname, num))
                condition.notify_all()  # wake waiting producers
            # Pace consumption outside the lock.
            time.sleep(self.consume_interval)
            consumed += 1

    def show(self):
        """Print this consumer's configuration."""
        print("show GoodsConsume {} area -- {} ,info -- {}".format(self.cname, self.area, self.info))


if __name__ == "__main__":
    producers = [
        GoodsProduce("Prd-{}".format(0), 1, "this is {} prd company".format(0)),
        GoodsProduce("Prd-{}".format(1), 2, "this is {} prd company".format(1)),
    ]
    for producer in producers:
        producer.start()
    for producer in producers:
        producer.show()

    customers = [
        GoodsConsume("cus-{}".format(i), "area-{}".format(i), "this is {} customer".format(i))
        for i in range(4)
    ]
    for customer in customers:
        customer.start()
        customer.show()