mirror of
https://github.com/ClovertaTheTrilobita/SanYeCao-Nonebot.git
synced 2026-04-01 22:04:51 +00:00
新增根据BV号发送视频功能(存在问题)
视频资源出现发送失败的BUG
This commit is contained in:
parent
f77fc046f5
commit
6d6edc9a8a
2 changed files with 153 additions and 8 deletions
|
|
@ -1,6 +1,8 @@
|
|||
# https://api.bilibili.com/x/web-interface/search/type?keyword=av28465342&search_type=video&page=1
|
||||
|
||||
import time
|
||||
from pathlib import Path
|
||||
import requests
|
||||
from nonebot import on_command
|
||||
from nonebot.rule import to_me
|
||||
from nonebot.adapters.qq import MessageSegment,MessageEvent, Message
|
||||
|
|
@ -10,7 +12,7 @@ bili_vid = on_command("B站搜索",rule=to_me(), priority=10, block=True)
|
|||
@bili_vid.handle()
|
||||
async def get_bili_vid_info(message: MessageEvent):
|
||||
content = message.get_plaintext().replace("/B站搜索", "").strip()
|
||||
response = biliVideos.get(content)
|
||||
response = biliVideos.get_video_info(content)
|
||||
if response['code'] != 0:
|
||||
bili_vid.finish(response['message'])
|
||||
search_result = response['data']['result']
|
||||
|
|
@ -37,3 +39,59 @@ async def get_bili_vid_info(message: MessageEvent):
|
|||
time.sleep(0.5)
|
||||
|
||||
await bili_vid.finish(f"展示{len(search_result)}条结果中的前3条。")
|
||||
|
||||
|
||||
bili_bv_search = on_command("BV搜索", rule=to_me(), priority=10, block=True)
|
||||
@bili_bv_search.handle()
|
||||
async def get_video_file(message: MessageEvent):
|
||||
keyword = message.get_plaintext().replace("/BV搜索", "").strip().split()
|
||||
P_num, pages, vid_title, vid_author, vid_pic = biliVideos.get_video_pages_info(keyword[0])
|
||||
if len(keyword) == 1:
|
||||
|
||||
if P_num > 1:
|
||||
content = "\n标题:" + vid_title + "\nup主: " + vid_author + "\n该视频为多P播放:\n"
|
||||
for page in pages:
|
||||
content = content + "P" + str(page['page']) + ": " + page['part'] + "\n时长: " + str(page['duration']) + "s\n\n"
|
||||
|
||||
content = content + "请选择您想播放的集数。\n决定好后麻烦回复 /BV搜索+BV号+序号 哦。\n"
|
||||
await bili_bv_search.finish(content)
|
||||
|
||||
elif P_num == 1:
|
||||
|
||||
content = ("\n标题: " + vid_title + "\nup主: " + vid_author + "\n视频加载中~请稍后~~~")
|
||||
msg = Message([
|
||||
MessageSegment.image(vid_pic),
|
||||
MessageSegment.text(content),
|
||||
])
|
||||
await bili_bv_search.send(msg)
|
||||
|
||||
cid = pages[0]['cid']
|
||||
video_url = biliVideos.get_video_file_url(keyword[0], cid)
|
||||
biliVideos.video_download(video_url, cid)
|
||||
# biliVideos.transcode_video(f"{cid}.mp4",f"{cid}-o.mp4")
|
||||
await bili_bv_search.send(Message(MessageSegment.file_video(Path(f"{cid}.mp4"))))
|
||||
|
||||
elif len(keyword) >= 2:
|
||||
|
||||
try:
|
||||
page_num = int(keyword[1])
|
||||
except:
|
||||
await bili_bv_search.finish("输入有误\n请确认是否为 /BV搜索+BV号+序号(数字) ")
|
||||
|
||||
if page_num > len(pages):
|
||||
page_num = len(pages)
|
||||
elif page_num < 1:
|
||||
page_num = 1
|
||||
|
||||
content = ("\n标题: " + vid_title + "\nup主: " + vid_author + "\n正在播放共" + str(P_num) + "P中的第" + str(page_num) + "P" + "\n视频加载中~请稍后~~~")
|
||||
msg = Message([
|
||||
MessageSegment.image(vid_pic),
|
||||
MessageSegment.text(content),
|
||||
])
|
||||
await bili_bv_search.send(msg)
|
||||
|
||||
cid = pages[page_num - 1]['cid']
|
||||
video_url = biliVideos.get_video_file_url(keyword[0], cid)
|
||||
biliVideos.video_download(video_url, cid)
|
||||
|
||||
await bili_bv_search.send(Message(MessageSegment.file_video(Path(f"{cid}.mp4"))))
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
import os
|
||||
import pickle
|
||||
|
||||
import ffmpeg
|
||||
import requests
|
||||
import hashlib
|
||||
import urllib.parse
|
||||
|
|
@ -31,9 +32,7 @@ headers = {
|
|||
|
||||
appkey = '1d8b6e7d45233436'
|
||||
appsec = '560c52ccd288fed045859ed18bffd973'
|
||||
params = {
|
||||
'search_type':'video'
|
||||
}
|
||||
|
||||
|
||||
def appsign(params, appkey, appsec):
|
||||
"""为请求参数进行 APP 签名"""
|
||||
|
|
@ -45,17 +44,105 @@ def appsign(params, appkey, appsec):
|
|||
return params
|
||||
|
||||
|
||||
def get(keyword):
|
||||
def get_video_info(keyword):
|
||||
params = {
|
||||
'search_type': 'video'
|
||||
}
|
||||
|
||||
signed_params = appsign(params, appkey, appsec)
|
||||
query = urllib.parse.urlencode(signed_params)
|
||||
url = f"https://api.bilibili.com/x/web-interface/search/type?keyword={keyword}&"
|
||||
|
||||
session = requests.session()
|
||||
session.get("https://www.bilibili.com/", headers=headers)
|
||||
response = session.get(url + query, headers=headers).json()
|
||||
# print(response['code'])
|
||||
return response
|
||||
# print(signed_params)
|
||||
# print(query)
|
||||
|
||||
def get_video_info_bv(keyword):
|
||||
params = {
|
||||
'bvid': keyword
|
||||
}
|
||||
|
||||
signed_params = appsign(params, appkey, appsec)
|
||||
query = urllib.parse.urlencode(signed_params)
|
||||
url = "https://api.bilibili.com/x/web-interface/view?&"
|
||||
session = requests.session()
|
||||
session.get("https://www.bilibili.com/", headers=headers)
|
||||
response = session.get(url + query, headers=headers).json()
|
||||
return response
|
||||
|
||||
def get_video_pages_info(keyword):
|
||||
"""
|
||||
通过BV号获取视频信息
|
||||
|
||||
:param keyword:
|
||||
|
||||
:return:
|
||||
len(vid_pages_info_list): 视频集数->int
|
||||
vid_pages_info_list: 视频分集信息->dict
|
||||
vid_title: 视频标题->str
|
||||
vid_author: 视频UP主->str
|
||||
vid_pic: 视频封面URL->str
|
||||
"""
|
||||
response = get_video_info_bv(keyword)
|
||||
vid_pages_info_list = response['data']['pages']
|
||||
vid_title = response['data']['title']
|
||||
vid_author = response['data']['owner']['name']
|
||||
vid_pic = response['data']['pic']
|
||||
return len(vid_pages_info_list), vid_pages_info_list, vid_title, vid_author, vid_pic
|
||||
|
||||
def get_video_pages_cid(vid_pages_info_list, num):
|
||||
cid = vid_pages_info_list[num - 1]['cid']
|
||||
return cid
|
||||
|
||||
def get_video_file_url(bvid, cid):
|
||||
params = {
|
||||
'bvid': bvid,
|
||||
'cid': cid,
|
||||
'qn': '16',
|
||||
'platform': 'html5'
|
||||
}
|
||||
|
||||
signed_params = appsign(params, appkey, appsec)
|
||||
query = urllib.parse.urlencode(signed_params)
|
||||
url = "https://api.bilibili.com/x/player/playurl?&"
|
||||
|
||||
session = requests.session()
|
||||
session.get("https://www.bilibili.com/", headers=headers)
|
||||
response = session.get(url + query, headers=headers).json()
|
||||
|
||||
file_url = response['data']['durl'][0]['url']
|
||||
|
||||
return file_url
|
||||
|
||||
def video_download(file_url, cid):
|
||||
response = requests.get(file_url, headers=headers)
|
||||
# 检查请求是否成功
|
||||
if response.status_code == 200:
|
||||
# 将视频保存到本地文件
|
||||
with open(f'{cid}.mp4', 'wb') as file:
|
||||
for chunk in response.iter_content(chunk_size=1024):
|
||||
if chunk:
|
||||
file.write(chunk)
|
||||
print("视频下载完成")
|
||||
else:
|
||||
print(f"下载失败,状态码:{response.status_code}")
|
||||
|
||||
def transcode_video(input_file, output_file):
|
||||
try:
|
||||
# 转码视频为 H.264 编码
|
||||
(
|
||||
ffmpeg.input(input_file)
|
||||
.output(output_file, c="libx264", crf=23, preset="veryfast")
|
||||
.run(capture_stdout=True, capture_stderr=True)
|
||||
)
|
||||
print(f"视频转码完成,输出文件:{output_file}")
|
||||
except ffmpeg.Error as e:
|
||||
print(f"转码失败:{e.stderr.decode()}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
print(get('海南某211台风过后现状111'))
|
||||
print(get_video_info('海南某211台风过后现状111'))
|
||||
print(get_video_file_url('BV1y7411Q7Eq', '171776208'))
|
||||
|
||||
print(get_video_pages_info('BV1y7411Q7Eq'))
|
||||
|
|
|
|||
Loading…
Reference in a new issue