相关信息
本代码功能:抓取某电影免费下载网站资料,然后用flask搭建服务,用API方式执行任务,发布到Z-Blog和订阅号。 实现在线操作,不需要本地运行代码。
python# -*- coding: utf-8 -*-
# @Time : 2023/5/19 23:38
# @Author : Kyln.Wu
# @Email : kylnwu@qq.com
# @FileName : app.py
# @IDE : PyCharm
import subprocess
from flask import Flask, request, jsonify
import asyncio
import os
import re
import time
import json
from PIL import Image
import aiofiles
import aiohttp
import httpx
import requests
from lxml import html
from lxml.html import tostring
app = Flask(__name__)
# 这是使用浏览器向服务器发送GET请求,让Flask去调用异步协程do_job()下载图片的API
@app.route('/api', methods=['GET', 'POST'])
# @verify_token # 使用verify_token验证
def api():
print(f"当前时间:{time.strftime('%Y-%m-%d')}")
start_id = request.args.get('s')
end_id = request.args.get('n')
# 为请求创建新事件循环
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# 运行任务
task = loop.create_task(do_job(start_id, end_id))
loop.run_until_complete(task)
return jsonify({'msg': "successed!"})
@app.route('/')
def index():
return "hello world!"
# 这是调用上传图片和资料信息到微信公从号的API
@app.route('/to_wx')
def to_wx():
global access_token
APPID = '你的公众号APPID'
APPSECRET = '你的公众号APPSECRET'
access_token_resp = requests.get(
f'https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={APPID}&secret={APPSECRET}')
json_obj = json.loads(access_token_resp.text)
access_token = json_obj.get('access_token')
if not access_token:
print(f"access_token: {access_token}")
start_id = request.args.get('s')
# print(start_id, type(start_id))
end_id = request.args.get('n')
# 开始上传到公众号
m_details_lst = get_movie_details(start_id, end_id)
to_wx_draft(m_details_lst)
# 移动图片到UploadFile中
mv_result_lst = mv_pics()
return jsonify({'msg': mv_result_lst})
# 合并do_smthing和main任务
async def do_job(start_id, end_id):
url_list = await get_pic_list(start_id, end_id)
await download_pics(url_list)
etree = html.etree
# 构建请求头,根据实现情况修改
headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,zh;q=0.9",
"Cache-Control": "max-age=0",
"Connection": "keep-alive",
"Cookie": "isfirstvisited=false; PHPSESSID=gdep6db1598vdl8lg99uh6jfn1",
"Host": "www.bd51.net",
"Referer": "http://www.bd51.net/",
"Upgrade-Insecure-Requests": "1",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.87 Safari/537.36 SE 2.X MetaSr 1.0"
}
# 异步方法遍历图片总张数,获取真实的图片地址,返回列表
async def get_pic_list(start, end):
print('正在解析图片地址...')
pic_url_list = []
tasks = []
for i in range(int(start), int(end) + 1):
each_url = f'http://www.bd51.net/index/mdetail/index.html?id={i}'
# print(each_url)
task = asyncio.ensure_future(parse_html(each_url, pic_url_list))
task.add_done_callback(get_urls)
tasks.append(task)
# 异步执行解析图片地址,等待任务完成
await asyncio.wait(tasks)
return pic_url_list
# 异步方法解析网页,返回图片地址列表
async def parse_html(url, pic_url_list):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
res_html = etree.HTML(await resp.text())
try:
part_pic_url = res_html.xpath('/html/body/div[3]/div[1]/div[2]/div[1]/div[2]/img/@src')[0]
pic_url = f"http://www.bd51.net{part_pic_url}"
print(pic_url)
s_name = part_pic_url.rsplit('/', 1)[1]
s_pic_url = f"http://www.bd51.net/UploadFile/{s_name.split('.')[0]}_s.{s_name.split('.')[1]}"
print(s_pic_url)
pic_url_list.append(pic_url)
pic_url_list.append(s_pic_url)
except Exception as e:
print(e)
return pic_url_list
# 异步方法下载图片,并存入文件中
async def aio_download(url, path):
name = url.rsplit('/', 1)[1]
try:
# 异步发送请求http1.x
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as resp:
# print(f'aiohttp异步请求:{resp.status}')
if resp.status == 200:
# 异步写入文件
async with aiofiles.open(f'{path}/{name}', 'wb') as f:
await f.write(await resp.content.read())
pic_resize(path, name)
else:
await httpx_download(url, path)
ch_pics_own_mod(path, name)
print(f"{name}--->>搞定!")
except Exception as e:
print(e)
# 异步模式下载http2.0的链接
async def httpx_download(url, path):
name = url.rsplit('/', 1)[1]
# 也可以写成client = httpx.AsyncClient()
async with httpx.AsyncClient() as client:
# 这里有个坑,httpx.get默认不追踪网页重定向,如果碰到返回代码301,需要加上参数follow_redirects=True
resp = await client.get(url, headers=headers, follow_redirects=True)
# print(f'httpx异步请求:{resp.status_code}')
if resp.status_code == 200:
# 异步写入文件
async with aiofiles.open(f'{path}/{name}', 'wb') as f:
await f.write(resp.content)
pic_resize(path, name)
# 判断下载的图片大小和分辨率,进行压缩处理
def pic_resize(path, name):
# 获取图片文件大小
size = os.path.getsize(f'{path}/{name}') / 1024
print(f'图片大小: {size}KB')
# 获取图片分辨率
img = Image.open(f'{path}/{name}')
print('图片宽度和高度:', img.size)
# 如果大于 1024KB,或宽度大小1080px,进行压缩
if size > 1024 or img.size[0] > 1080:
width = 1080
height_scale = width / img.size[0]
height = int(img.size[1] * height_scale)
img = img.resize((width, height))
print('图片缩小后宽度和高度:', img.size)
# 图像压缩
img.save(f'{path}/{name}', quality=70, optimize=True)
print('压缩后图片大小:', os.path.getsize(f'{path}/{name}') / 1024, 'KB')
else:
print('无需压缩')
def ch_pics_own_mod(path, name):
src_file = f'{path}/{name}'
target_path = '你服务器上存放图片的路径' # 如果是linux服务器,最好写绝对路径,否则可能报错
chown_cmd = ['chown', 'www', src_file]
chmod_cmd = ['chmod', '755', src_file]
# mv_cmd = ['mv', src_file, target_path]
subprocess.run(chown_cmd)
subprocess.run(chmod_cmd)
# subprocess.run(mv_cmd)
print(f'{src_file} chown_mod succeeded')
def mv_pics():
path = create_save_path()
result_lst = []
for root, dirs, files in os.walk(path):
for file in files:
# print(file)
# 使用join函数将文件名称和文件所在根目录连接起来
if any((file.endswith('jpg'), file.endswith('png'))):
src_file = os.path.join(path, file)
# print(src_file)
# 如果是linux服务器,最好写绝对路径,否则可能报错
target_path = '你服务器上存放图片的路径'
mv_cmd = ['mv', src_file, target_path]
mv_res = subprocess.run(mv_cmd)
if mv_res.returncode == 0:
result_lst.append(f'{src_file} mv succeeded')
print(f'{src_file} mv succeeded')
else:
result_lst.append(f'{src_file} mv failed')
print(f'{src_file} mv failed')
print(result_lst)
return result_lst
# 这个函数是配合task.add_done_callback方法用的
# 定义一个任务对象的回调函数,回调函数必须有参数,参数为回调函数的绑定者(任务对象)
def get_urls(task):
result = task.result()
return result
# 按日期自定义保存目录
def create_save_path():
time_folder = time.strftime('%Y-%m-%d')
path = os.path.join(os.getcwd(), time_folder)
if not os.path.exists(path):
os.makedirs(path)
return path
# 异步协程主程序
async def download_pics(urls):
save_path = create_save_path()
tasks = [asyncio.create_task(aio_download(url, save_path)) for url in urls]
await asyncio.wait(tasks)
# ======================================
# 微信公众号发布消息代码开始
# ======================================
def get_movie_details(start_id, end_id):
"""
这个函数是抓取影片资料的,同时过滤一些不要的信息,并重新组合成一个content,
返回一个多条影片资料的列表
"""
movie_infos_lst = []
for k in range(int(start_id), int(end_id) + 1):
movie_infos_dict = {}
url = f"http://www.bd51.net/index/mdetail/index.html?id={k}"
resp = requests.get(url)
code = resp.apparent_encoding # 获取url对应的编码格式
resp.encoding = code
html_code = resp.text
movie_infos_html = etree.HTML(html_code)
movie_cn_name = movie_infos_html.xpath("/html/body/div[3]/div[1]/div[2]/div[1]/div[2]/div/h3/text()")[0]
movie_year = re.findall('\((\d+)\)', movie_cn_name)[0]
part_pic_url = movie_infos_html.xpath('/html/body/div[3]/div[1]/div[2]/div[1]/div[2]/img/@src')[0]
pic_url = f"http://www.bd51.net{part_pic_url}"
imdb_rate = get_imdb_rate(k)
div_tag1 = movie_infos_html.xpath("/html/body/div[3]/div[1]/div[2]/div[1]/div[2]/div")[0]
div_tag2 = movie_infos_html.xpath("/html/body/div[3]/div[1]/div[2]/div[2]")[0]
digest = movie_infos_html.xpath('/html/body/div[3]/div[1]/div[2]/div[2]/div[2]/p//text()')[0]
html1 = tostring(div_tag1, encoding=code).decode(code).replace('\n', '')
html1 = html1.replace('<h3>', '<p><strong>').replace('</h3>', '</strong></p>')
html1 = html1.replace('首映日期:</strong></span>', f'上映年份:</strong>{movie_year}</span>')
html1 = html1.replace('<p class="imob">0</p>', f'<p><img src="{pic_url}"></p>')
html1 = html1.replace(' 00:00:00', '')
html1 = html1.replace('</span><span>', '</span><br><span>')
imdb_partern = re.compile('(人气:</strong>\d+</span>)', re.S)
html1 = re.sub(imdb_partern, f'IMDB:</strong>{imdb_rate}</span>', html1)
html2 = tostring(div_tag2, encoding=code).decode(code).replace('\n', '')
html2_partern = re.compile('(<img.*?g">)', re.S)
html2 = re.sub(html2_partern, '', html2)
html2 = html2.replace('剧情介绍', '</strong>剧情介绍</span>')
# print(html1)
# print(html2)
wx_content = html1 + html2
movie_infos_dict['中文名'] = movie_cn_name
movie_infos_dict['海报'] = pic_url
movie_infos_dict['影片简介'] = wx_content
movie_infos_dict['摘要'] = digest
movie_infos_lst.append(movie_infos_dict)
# print(movie_detail)
time.sleep(1)
return movie_infos_lst
def get_imdb_rate(id):
"""
:param id: 影片ID,接收影片id,到影片列表页进行搜索,匹配豆瓣评分
:return: 如果匹配到了,就结束搜索,返回评分值
"""
for j in range(1, 10):
# 遍历9页搜索影片
url = f"http://www.bd51.net/index/mlist/index.html?page={j}"
res = requests.get(url).text
time.sleep(3)
index_html = etree.HTML(res)
try:
search_xpath = index_html.xpath(f'//*[@id="{id}"]')
if search_xpath:
imdb_rate = index_html.xpath(f'//*[@id="{id}"]/td[4]/p/text()')[0]
# print(imdb_rate)
return imdb_rate
except:
print(f"ID:{id}没找到。")
# 把文章分组并上传到草稿箱
def to_wx_draft(movie_datas_list):
# ==========微信公众号同步发布===========
# 如果爬取资料少于5篇
if len(movie_datas_list) < 5:
# 构造上传多图文消息的data
data = {
"articles": [pack_articles_list(movie_details) for movie_details in movie_datas_list]
}
# print(data)
# 上传到草稿箱
upd_post2cgx(data)
print("上传公众号完成!")
else:
# 如果爬取资料大于5篇,则每5篇合成一篇发送到草稿箱
for i in range(0, len(movie_datas_list), 5):
# 构造上传多图文消息的data
data = {
"articles": [pack_articles_list(movie_details) for movie_details in movie_datas_list[i:i + 5]]
}
# print(data)
# 上传到草稿箱
upd_post2cgx(data)
print("上传公众号完成!")
return
# post到公众号草稿箱
def upd_post2cgx(data):
try:
url = 'https://api.weixin.qq.com/cgi-bin/draft/add?access_token=' + access_token
vx_res = requests.post(url=url, data=json.dumps(data, ensure_ascii=False).encode("utf-8"))
obj = json.loads(vx_res.content)
# print(obj)
return obj['media_id']
except Exception as e:
print(e)
# 获取公众号文章必须的title, digest, content, img_url,构造并返回多图文消息articles的结构
def pack_articles_list(movie_details):
# 获取本次发布的所有文章的title, img_url, content, tags, intro, digest
wx_title = movie_details.get('中文名') + ' 更新上架'
digest = movie_details.get('摘要')[:100]
base_folder = '/你自己定义的存放路径/api/' + time.strftime('%Y-%m-%d')
file_name = movie_details.get('海报').rsplit('/', 1)[1]
print(file_name)
file_path = os.path.join(base_folder, file_name)
if not os.path.exists(base_folder):
os.makedirs(base_folder)
# 2 上传封面获取封面id
wx_fm_img_id, file_path = upd_fm_pic(file_name, file_path)
# 3 上传正文图片到微信公众号,并获取返回的url地址
wx_all_imgs_url = upd_imgs(file_path, file_name)
# 4 用微信公众号返回的图片地址替换content里面的图片地址,获取最后要发布的正文内容
img_url_pattern = re.compile('(https?://[a-zA-Z0-9.?/%-_]*.jpg)', re.S)
content = movie_details.get('影片简介')
wx_content = re.sub(img_url_pattern, wx_all_imgs_url, content)
# print(wx_content)
# 构造多图文消息articles结构
articles_dict = create_post_dict(wx_title, digest, wx_content, wx_fm_img_id)
# print(articles_lst)
return articles_dict
# 2 上传封面图片到微信公众号,并返回封面图片id和本地图片绝对路径
def upd_fm_pic(file_name, file_path):
try:
url = f'https://api.weixin.qq.com/cgi-bin/material/add_material?access_token={access_token}&type=image'
request_file = {
'media': (file_name, open(file_path, 'rb'), 'image/jpeg')}
vx_res = requests.post(url=url, files=request_file)
obj = json.loads(vx_res.content)
print(obj)
return obj['media_id'], file_path
except Exception as e:
print(e)
# 3 上传正文图片到微信公众号,并返回正文图片网址
def upd_imgs(file_path, file_name):
try:
vx_img_url = 'https://api.weixin.qq.com/cgi-bin/media/uploadimg'
request_file = {
'media': (file_name, open(file_path, 'rb'), 'image/jpeg')}
data = {
'access_token': access_token
}
vx_res = requests.post(url=vx_img_url, files=request_file, data=data)
obj = json.loads(vx_res.content)
print(obj)
return obj['url']
except Exception as e:
print(e)
# 4 构造多图文消息体的articles部分
def create_post_dict(wx_title, digest, wx_content, wx_fm_img_id):
articles_dict = {
"title": wx_title,
"author": '',
"digest": digest,
"content": wx_content,
"show_cover_pic": 1,
"need_open_comment": 0,
"only_fans_can_comment": 1,
"thumb_media_id": wx_fm_img_id
}
return articles_dict
# ======================================
# 微信公众号发布消息代码结束
# ======================================
async def do_smthing(start_id, end_id):
# 创建任务对象
# get_pic_list()是一个协程函数对象,需要用asyncio.ensure_future()创建成任务,并去执行才会有结果
url_list_task = asyncio.ensure_future(get_pic_list(start_id, end_id))
# 想要得到任务对象的返回值,需要用task.add_done_callback()方法回调返回值
# 想要回调返回值,绑定回调函数。
url_list_task.add_done_callback(get_urls)
# print(url_list)
return url_list_task
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)
pythonbind = '0.0.0.0:5000'
user = 'root'
workers = 2
threads = 4
backlog = 512
timeout = 180
chdir = '这里是你的flask代码存放路径' # 记得要写绝对路径
access_log_format = '%(t)s %(p)s %(h)s "%(r)s" %(s)s %(L)s %(b)s %(f)s" "%(a)s"'
loglevel = 'info'
worker_class = 'sync'
errorlog = chdir + '/logs/error.log'
accesslog = chdir + '/logs/access.log'
pidfile = chdir + '/logs/电影更新api.pid'
本文作者:kyln.wu
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!