多线程
import requests from lxml import etree from urllib import request import threading from queue import Queue import os import re import time class Page_url_loader(threading.Thread): def __init__(self,urlqueue,page,*args,**kwargs): super(Page_url_loader,self).__init__(*args,**kwargs) self.page_queue = page self.url_queue = urlqueue def run(self): while not self.url_queue.empty(): url = self.url_queue.get() for page in self.parse_url(url): self.page_queue.put(page) def parse_url(self,url): """ 提取并返回img_url列表 :param url: :return: res -> list """ headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1' } response = requests.get(url, headers=headers) text = response.text selector = etree.HTML(text) href = selector.xpath("//div[@class='article']/h2/a/@href") return href class Producer(threading.Thread): """ 负责将每个page_url中的img_url提取 """ def __init__(self,urlqueue,page,img,*args,**kwargs): super(Producer,self).__init__(*args,**kwargs) self.page_queue = page self.img_queue = img self.url_queue = urlqueue def run(self): print("run producer") while not self.page_queue.empty(): # if self.page_queue.empty(): # break u = self.page_queue.get() for tag in self.parse_page(u): self.img_queue.put(tag) def parse_page(self,url): """ 提取并返回img_url列表 :param url: :return: res -> list """ headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1' } response = requests.get(url, headers=headers) text = response.text selector = etree.HTML(text) # res = selector.xpath("//div[@class='article']/h2/a/@href") tag = selector.xpath("//div[@id='post_content']/p/img") res = [] for i in range(len(tag)): href = tag[i].xpath("./@src")[0] title = tag[i].xpath("./@title")[0] title = re.sub(r"[.?!,|<>*\\/?。,、]","",title) filename = title + "_%s" % i + os.path.splitext(href)[1] res.append((href,filename)) return res class Consumer(threading.Thread): def __init__(self,page,img,*args, **kwargs): super(Consumer, self).__init__(*args, **kwargs) self.page_queue = page self.img_queue = img def run(self): while not self.page_queue.empty() or not self.img_queue.empty(): img_url, filename = self.img_queue.get() folder = re.findall(r"(.*)_[0-9]+\..*",filename)[0] try: request.urlretrieve(img_url,"./image/"+folder+"/"+filename) except FileNotFoundError as e: print("cannot find folder:"+folder) print("creating new folder:"+folder+"and retry") os.mkdir("./image/"+folder) request.urlretrieve(img_url, "./image/" + folder + "/" + filename) print("retry success") finally: print("finish download:"+filename) def parse_page(url): """ 提取并返回img_url列表 :param url: :return: res -> list """ headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1' } response = requests.get(url, headers=headers) text = response.text selector = etree.HTML(text) tag = selector.xpath("//div[@id='post_content']/p/img") for i in range(len(tag)): href = tag[i].xpath("./@src") title = tag[i].xpath("./@title") filename = title[0] + "_%s"%i +os.path.splitext(href[0])[1] yield (href[0],filename) # res = selector.xpath("//div[@class='article']/h2/a/@href") # res = selector.xpath("//div[@id='post_content']/p/img/@src") if __name__ == "__main__": url_queue = Queue(10) page_queue = Queue(100) img_queue = Queue(1000) for i in range(1,2): url = "http://www.bbsnet.com/page/%s"%i url_queue.put(url) print("url", url_queue.empty()) print("page", page_queue.empty()) print("img", img_queue.empty()) start = time.clock() for _ in range(6): p = Page_url_loader(url_queue,page_queue) p.start() p.join() print("url", url_queue.empty()) print("page", page_queue.empty()) print("img", img_queue.empty()) print("-"*30) for _ in range(6): pro = Producer(url_queue,page_queue,img_queue) pro.start() pro.join() print("url", url_queue.empty()) print("page", page_queue.empty()) print("img", img_queue.empty()) for _ in range(6): c = Consumer(page_queue,img_queue) c.start() c.join() end = time.clock() print(end-start) # 58s 2pagesAsyncio 异步IO
import asyncio from lxml import etree import os import re import time import aiohttp async def url_to_page(url_queue,page_queue,client): """ :param url_queue: 每个page的队列 :param page_queue: 每个article的队列 :param client: aiohttp session对象 :return: None """ headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1' } while not url_queue.empty(): url = await url_queue.get() async with client.get(url,headers=headers) as resp: text = await resp.text() selector = etree.HTML(text) href = selector.xpath("//div[@class='article']/h2/a/@href") for h in href: await page_queue.put(h) print("put "+h+" in page queue") url_queue.task_done() # return href async def page_to_img(page_queue,img_queue,client): headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1' } while True: url = await page_queue.get() async with client.get(url, headers=headers) as resp: text = await resp.text() selector = etree.HTML(text) tag = selector.xpath("//div[@id='post_content']/p/img") for i in range(len(tag)): href = tag[i].xpath("./@src")[0] title = tag[i].xpath("./@title")[0] title = re.sub(r"[.?!,|<>*\\/?。,、 ]", "", title) filename = title + "_%s" % i + os.path.splitext(href)[1] await img_queue.put((href, filename)) print("put" + filename + "in img queue") page_queue.task_done() if page_queue.empty(): break async def write_file(response,folder,filename): con = await response.content.read() with open("./image/" + folder + "/" + filename, "wb") as f: f.write(con) print("finish downloading:"+filename) async def download_img(img_queue,client): headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1' } while True: url, filename = await img_queue.get() folder = re.findall(r"(.*)_[0-9]+\..*", filename)[0] async with client.get(url, headers=headers) as r: if os.path.exists("./image/"+folder): await write_file(r,folder,filename) else: os.mkdir("./image/" + folder) await write_file(r, folder, filename) img_queue.task_done() if img_queue.empty(): await asyncio.sleep(3) if img_queue.empty(): break async def main(): url_queue = asyncio.Queue() page_queue = asyncio.Queue() img_queue = asyncio.Queue() start = time.clock() for i in range(1,3): url = "http://www.bbsnet.com/page/%s"%i await url_queue.put(url) async with aiohttp.ClientSession() as client: t1 = asyncio.create_task(url_to_page(url_queue,page_queue,client)) t2 = asyncio.create_task(page_to_img(page_queue,img_queue,client)) t3 = asyncio.create_task(download_img(img_queue,client)) await t1 await t2 await t3 end = time.clock() print(url_queue.empty(),page_queue.empty(),img_queue.empty(),end-start) if __name__ == "__main__": asyncio.run(main()) # 43s 2pages总结:由于GIL锁的存在,单线程异步IO会比多线程略快。