写在前面,全部代码已开源,放到github上。点击查看欢迎star.
想几分钟了解目前社会最新动态和最热门事件,那么最好的选择就是搭建一个热点聚合网站。下面就记录下我使用python 搭建热点聚合的网站。
搭建一个这样的网站,我们首先需要的就是数据源。这里我们可以使用python中的requests库。
安装和和详细查看官方文档。https://2.python-requests.org//zh_CN/latest/user/quickstart.html界面解析使用的是xpath。一般的网站都会有反爬虫,这里使用简单的反爬虫措施。配置随机UA,控制爬取时间等这些都是基本操作。因为需要多处使用heads,这里创建一个settings.py 配置请求头。
# settings.py
# 请求头
import random
# 随机头信息 还有很多,这不显示全了
USER_AGENTS = [
"Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; AcooBrowser; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; Acoo Browser; SLCC1; .NET CLR 2.0.50727; Media Center PC 5.0; .NET CLR 3.0.04506)",
"Mozilla/4.0 (compatible; MSIE 7.0; AOL 9.5; AOLBuild 4337.35; Windows NT 5.1; .NET CLR 1.1.4322; .NET CLR 2.0.50727
]
BASE_HEADERS = {
'User-Agent': random.choice(USER_AGENTS),
'Accept-Encoding': 'gzip, deflate, sdch',
'Accept-Language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1'
}
这里我们根据自己的需求定义一下requests请求。我在请求过程中发现访问github会偶尔出现请求超时的现象,我这里写 了个装饰器,进行失败尝试,默认最大失败次数为3次。helper.py 文件内容如下
# helper.py
import time
import requests
from requests.exceptions import ConnectionError
from settings import BASE_HEADERS
def retry(max_tries=3, wait=5):
"""
获取失败,进行再次爬取
:param max_tries: 失败次数
:param wait: 每次失败时等待时间
:return:
"""
def deco(fun):
def wrapper(*args, **kwargs):
for i in range(max_tries):
result = fun(*args, **kwargs)
if result is None:
print('retry%s' %i)
time.sleep(wait)
continue
else:
return result
return wrapper
return deco
@retry()
def get_text(url, options={}):
"""
:param method: 请求方法
:param url: 请求的目标url
:param options:添加的请求头
:return:
"""
headers = dict(BASE_HEADERS, **options)
print('正在抓取', url)
try:
res = requests.get(url, headers=headers, timeout=5)
# print(res.status_code)
if res.status_code == 200:
print('抓取成功', url, res.status_code)
return res
except ConnectionError:
print('抓取失败', url)
return None
下面展示一下爬取热榜的脚本,这里展示了爬取微博和知乎的方法,都是xpath简单使用。
# crawler.py
# 导入需要的包
from lxml import etree
# 刚我们定义的获取资源的方法
from helper import get_text
def crawler_wei_bo():
"""
爬取微博热榜
:return:
"""
url = 'https://s.weibo.com/top/summary?cate=realtimehot'
response_html = get_text(url)
content_list = []
if response_html:
tree = etree.HTML(response_html.text)
tr_list = tree.xpath('//table/tbody/tr')[1:]
for tr in tr_list:
# index = tr.xpath('./td[1]/text()')[0]
title = tr.xpath('./td[2]/a/text()')[0]
href = 'https://s.weibo.com%s' % tr.xpath('./td[2]/a/@href')[0]
content_list.append({'title': title, 'href': href})
return {'hot_name': '新浪微博', 'content': content_list}
def crawler_zhi_hu():
"""
获取知乎热榜
:return:
"""
url = 'https://www.zhihu.com/api/v3/feed/topstory/hot-lists/total?limit=50&desktop=true'
headers = {
'path': '/api/v3/feed/topstory/hot-lists/total?limit=50&desktop=true',
'x-api-version': '3.0.76',
'x-requested-with': 'fetch',
}
content_list = []
response_html = get_text(url, options=headers)
if response_html:
data_list = response_html.json().get('data', '')
# print(data_list)
if data_list:
for data in data_list:
title = data.get('target').get('title_area').get('text', '')
href = data.get('target').get('link').get('url', '')
content_list.append({'title': title, 'href': href})
return {'hot_name': '知乎热榜', 'content': content_list}
数据请求到了,我们就需要把数据存起来,可以保存到数据库,也可以保存的本地。这里因为没有其他额外的用途,就保存为json文件存到本地,保存json文件是便于和前端交互展示。下面我们写一个run.py 文件,把所有爬取结果保存起来。下面就是具体的函数了,这里使用了多线程和定时任务。热榜信息更新不是很快,这里定时20分钟爬取一次。
# run.py
import json
import os
from apscheduler.schedulers.blocking import BlockingScheduler
from concurrent.futures import ThreadPoolExecutor
from crawler import *
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
def run_crawler():
"""
多线程爬取
:return:
"""
crawler_list = [crawler_zhi_hu, crawler_v2ex, crawler_github, crawler_wei_bo, crawler_tie_ba, crawler_dou_ban,
crawler_tian_ya, crawler_wang_yi]
result = []
with ThreadPoolExecutor(max_workers=4) as pool:
def get_result(future):
"""
这个是 add_done_callback()方法来添加回调函数,
future.result()为函数运行的结果
:param future:
:return:
"""
crawler_result = future.result()
result.append(crawler_result)
# 这里是分别保存为txt文件
# hot_name = crawler_result.get('hot_name', '')
# file_path = r'%s/%s.txt' % (os.path.join(BASE_DIR, 'result'), hot_name)
# with open(file_path, 'a+', encoding='utf8') as f:
# f.write(str(crawler_result) + '\n')
for future in crawler_list:
pool.submit(future).add_done_callback(get_result)
file_path = r'{}/result.json'.format(os.path.join(BASE_DIR, 'result'))
with open(file_path, 'w', encoding='utf8') as f:
f.write(json.dumps(result, ensure_ascii=False, indent=4))
print('done')
def run():
"""
定时爬取
:return:
"""
print('开启定时任务')
scheduler = BlockingScheduler()
scheduler.add_job(func=run_crawler, trigger='interval', minutes=20) # 每20分钟抓取一次
scheduler.start()
if __name__ == '__main__':
run_crawler() # 单次爬虫运行
run() # 定时爬虫运行
后端数据已经获取完毕,接下来搭建后端服务。目前python的web服务器框架有很多,Django,Flask等。但我们这里功能简单,用Django和Flask都太重了。这里使用了轻量级web服务器wep.py。只需要十来行代码就ok了。
# server.py
import os
import json
import web
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# 路由
urls = (
'/hot', 'hot'
)
app = web.application(urls, globals())
application = app.wsgifunc()
# 视图函数
class hot:
def GET(self):
web.header('content-type','text/json')
web.header("Access-Control-Allow-Origin", "*")
file_path = r'{}/result.json'.format(os.path.join(BASE_DIR, 'result'))
with open(file_path, 'r', encoding='utf8') as f:
json_data = f.read()
return json.dumps(json_data)
if __name__ == "__main__":
app.run()
只需要执行python server.py 就能在本地起一个服务。
前端页面展示这里使用的ui框架是layui。具体代码在html/hot.html 文件下。
服务器部署和运行可参考我的github。https://github.com/pangxiaobin/CrawlerHott
效果图。