智一面提供 Python 面试测试题
使用地址:http://www.gtalent.cn/exam/interview?token=52cf92de494f4a8b6165d817a7279966
敲了130多行代码,利用协程实现漫画下载,亲测没问题,目前海贼王更新到930话,全部下载下来1小时左右,供大家参考,一起共勉。
- from gevent import monkey;monkey.patch_all()
- from gevent.pool import Pool
- from bs4 import BeautifulSoup
- from fake_useragent import UserAgent
- from requests.packages.urllib3.exceptions import InsecureRequestWarning
- import gevent
- import requests
- import time
- import os
- import shutil
def getSource(urls, headers, types):
    """Fetch *urls* and return its body.

    Args:
        urls: URL to request.
        headers: dict of HTTP headers; its 'User-Agent' entry is overwritten
            in place with a random value on every call (NOTE: this mutates
            the caller's dict — the script relies on sharing one HEADERS).
        types: 'text'  -> return the decoded body (str),
               'content' -> return the raw body (bytes),
               anything else -> return the Response object itself.

    Returns:
        str, bytes, or the Response on success; None on any failure
        (the exception is printed, not re-raised).
    """
    try:
        # Randomize the User-Agent per request to reduce the risk of blocking.
        headers['User-Agent'] = UserAgent().random
        # Silence the warning triggered by verify=False below.
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
        # Context manager guarantees the session is closed even when the
        # request raises (the original leaked it on the exception path).
        with requests.Session() as request_session:
            # Retry transient connection failures up to 5 times.
            adapter = requests.adapters.HTTPAdapter(max_retries=5)
            request_session.mount('http://', adapter)
            request_session.mount('https://', adapter)
            # timeout=(connect, read) — 10 seconds each.
            get_response = request_session.get(
                urls, headers=headers, verify=False, timeout=(10, 10))
        get_response.encoding = 'UTF-8'
        if types == 'text':
            get_response = get_response.text
        if types == 'content':
            get_response = get_response.content
    except Exception as e:
        print('getSource()函数异常:' + str(e))
        # Explicit None (the original fell through and returned None implicitly).
        return None
    else:
        return get_response
def sourceAnalysis(src, dic, typ):
    """Parse HTML source and extract chapter or image information.

    Args:
        src: HTML source string (as returned by getSource(..., 'text')).
        dic: selector dict {'label': tag_name, 'class': css_class}.
        typ: 'chapter' -> dict with parallel lists 'chapter_link' and
             'chapter_name'; 'content' -> list of image URLs; any other
             value -> None (implicit).

    NOTE: relies on the module-level constant DOMAIN (defined in the
    __main__ section) to build absolute chapter links.
    """
    soup = BeautifulSoup(src, 'html.parser')
    # Both branches use the same tag/class query — run it once.
    nodes = soup.find_all(dic['label'], class_=dic['class'])
    if typ == 'chapter':
        # Chapter links are relative in the 'data-hreflink' attribute.
        return {
            'chapter_link': [DOMAIN + node.get('data-hreflink') for node in nodes],
            'chapter_name': [node.get_text().strip() for node in nodes],
        }
    if typ == 'content':
        # Lazy-loaded images keep the real URL in 'data-src'.
        return [node.get('data-src') for node in nodes]
if __name__ == '__main__':
    # Overall start time of the whole run.
    start_time = time.time()

    def _log(message):
        """Print *message* prefixed with a local timestamp (same format as before)."""
        print(time.strftime('[%Y-%m-%d %H:%M:%S]', time.localtime()) + ' ' + message)

    # ---- constants ---------------------------------------------------
    DOMAIN = 'https://www.mkzhan.com/'
    REQUEST_URL = 'https://www.mkzhan.com/209871/'
    HEADERS = {'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
               'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
               'Connection': 'keep-alive',
               'User-Agent': ''}
    LINK_PROPERTY = {'label': 'a', 'class': 'j-chapter-link'}
    IMAG_PROPERTY = {'label': 'img', 'class': 'lazy-read'}
    POOL = Pool(100)  # cap concurrent greenlets at 100
    ROOT_PATH = "D:/OnePiece/"

    # Recreate the output directory from scratch (old contents discarded).
    if os.path.exists(ROOT_PATH):
        shutil.rmtree(ROOT_PATH)
    # makedirs also creates any missing parent directories (os.mkdir would fail).
    os.makedirs(ROOT_PATH)

    # ---- fetch the catalog page --------------------------------------
    function_run_time = time.time()
    _log("获取目录页源码开始...")
    catalog_source = getSource(REQUEST_URL, HEADERS, 'text')
    _log("获取目录页源码完成...[ %.1fs ]" % (time.time() - function_run_time))

    # ---- parse the chapter list --------------------------------------
    function_run_time = time.time()
    _log("解析章节信息开始...")
    chapter_info = sourceAnalysis(catalog_source, LINK_PROPERTY, 'chapter')
    _log("解析章节信息完成...[ %.1fs ]" % (time.time() - function_run_time))

    # ---- fetch every chapter page concurrently -----------------------
    function_run_time = time.time()
    _log("获取每章节源码开始...")
    get_source_worker = [POOL.spawn(getSource, url, HEADERS, 'text')
                         for url in chapter_info['chapter_link']]
    gevent.joinall(get_source_worker)
    chapter_source = [job.value for job in get_source_worker]
    _log("获取每章节源码完成...[ %.1fs ]" % (time.time() - function_run_time))

    # ---- extract the image links of every chapter --------------------
    function_run_time = time.time()
    _log("解析章节内图片链接开始...")
    get_imglink_worker = [POOL.spawn(sourceAnalysis, src, IMAG_PROPERTY, 'content')
                          for src in chapter_source]
    gevent.joinall(get_imglink_worker)
    image_list = [job.value for job in get_imglink_worker]
    _log("解析章节内图片链接完成...[ %.1fs ]" % (time.time() - function_run_time))

    # ---- download the images chapter by chapter ----------------------
    for index, chapter_name in enumerate(chapter_info['chapter_name']):
        function_run_time = time.time()
        _log("下载 " + chapter_name + " 开始...")
        get_images_worker = [POOL.spawn(getSource, url, HEADERS, 'content')
                             for url in image_list[index]]
        gevent.joinall(get_images_worker)
        # One sub-directory per chapter.
        save_path = ROOT_PATH + chapter_name + '/'
        os.mkdir(save_path)
        for number, job in enumerate(get_images_worker):
            # getSource returns None on failure; skip that image instead of
            # aborting the whole run with image.write(None) -> TypeError.
            if job.value is None:
                continue
            with open(save_path + str(number) + '.jpg', 'wb') as image:
                image.write(job.value)
        _log("下载 " + chapter_name + " 完成...[ %.1fs ]" % (time.time() - function_run_time))
    _log('System executing done...[ %.1fs ]' % (time.time() - start_time))