mirror of
https://github.com/ClovertaTheTrilobita/SanYeCao-Nonebot.git
synced 2026-04-05 02:56:36 +00:00
feat(cloud_music): 改用三方API实现网易云音乐搜索下载功能
This commit is contained in:
parent
6c5945cdc7
commit
9447ee68ad
2 changed files with 162 additions and 53 deletions
|
|
@ -1,5 +1,6 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
import os
|
import os
|
||||||
|
import httpx
|
||||||
import qrcode
|
import qrcode
|
||||||
import base64
|
import base64
|
||||||
import codecs
|
import codecs
|
||||||
|
|
@ -8,6 +9,8 @@ import requests
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
from Crypto.Cipher import AES
|
from Crypto.Cipher import AES
|
||||||
from graiax import silkcoder
|
from graiax import silkcoder
|
||||||
|
from nonebot import get_driver
|
||||||
|
from typing import Optional
|
||||||
import src.clover_music.cloud_music.agent as agent
|
import src.clover_music.cloud_music.agent as agent
|
||||||
from src.clover_image.delete_file import delete_file
|
from src.clover_image.delete_file import delete_file
|
||||||
|
|
||||||
|
|
@ -15,6 +18,27 @@ from src.clover_image.delete_file import delete_file
|
||||||
requests.packages.urllib3.disable_warnings()
|
requests.packages.urllib3.disable_warnings()
|
||||||
headers = {'User-Agent': agent.get_user_agents(), 'Referer': 'https://music.163.com/'}
|
headers = {'User-Agent': agent.get_user_agents(), 'Referer': 'https://music.163.com/'}
|
||||||
|
|
||||||
|
# 在 cloud_music.py 文件顶部添加
|
||||||
|
|
||||||
|
|
||||||
|
# 全局异步客户端
|
||||||
|
async_client: Optional[httpx.AsyncClient] = None
|
||||||
|
|
||||||
|
# 机器人启动时初始化
|
||||||
|
@get_driver().on_startup
|
||||||
|
async def init_netease_client():
|
||||||
|
global async_client
|
||||||
|
async_client = httpx.AsyncClient(
|
||||||
|
verify=False,
|
||||||
|
timeout=30.0,
|
||||||
|
limits=httpx.Limits(max_connections=100)
|
||||||
|
)
|
||||||
|
|
||||||
|
# 机器人关闭时清理
|
||||||
|
@get_driver().on_shutdown
|
||||||
|
async def close_netease_client():
|
||||||
|
if async_client:
|
||||||
|
await async_client.aclose()
|
||||||
|
|
||||||
# 解密params和encSecKey值
|
# 解密params和encSecKey值
|
||||||
def keys(key):
|
def keys(key):
|
||||||
|
|
@ -233,3 +257,62 @@ async def netease_music_download(song_id, song_name, singer, session):
|
||||||
return None
|
return None
|
||||||
except requests.RequestException as e:
|
except requests.RequestException as e:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
async def music_search(keyword):
|
||||||
|
"""
|
||||||
|
第三方歌曲搜索
|
||||||
|
Args:
|
||||||
|
keyword:
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
url = "https://api.kxzjoker.cn/api/163_search"
|
||||||
|
params = {
|
||||||
|
"name": keyword,
|
||||||
|
"limit": 10,
|
||||||
|
}
|
||||||
|
|
||||||
|
response = await async_client.get(url,params=params)
|
||||||
|
result = response.json()
|
||||||
|
song_id = result["data"][0]["id"]
|
||||||
|
song_name = result["data"][0]["name"]
|
||||||
|
singer = result["data"][0]["artists"][0]["name"]
|
||||||
|
return song_id,song_name,singer
|
||||||
|
|
||||||
|
async def music_download(song_id, song_name, singer):
|
||||||
|
if not os.path.exists(save_path):
|
||||||
|
os.makedirs(save_path)
|
||||||
|
try:
|
||||||
|
headers = {
|
||||||
|
"Referer": "https://kxzjoker.cn/",
|
||||||
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
|
||||||
|
"Origin": "https://kxzjoker.cn"
|
||||||
|
}
|
||||||
|
|
||||||
|
# 构造请求URL
|
||||||
|
url = f'https://api.kxzjoker.cn/api/163_music?ids={song_id}&level=lossless&type=down'
|
||||||
|
|
||||||
|
# 异步流式下载
|
||||||
|
async with async_client.stream("GET",url,headers=headers,follow_redirects=True) as response:
|
||||||
|
response.raise_for_status()
|
||||||
|
|
||||||
|
file_path = os.path.join(save_path, f"{song_name}-{singer}.wav")
|
||||||
|
file_name = os.path.basename(f"{song_name}-{singer}.wav")
|
||||||
|
|
||||||
|
with open(file_path, "wb") as f:
|
||||||
|
async for chunk in response.aiter_bytes(chunk_size=8192):
|
||||||
|
f.write(chunk)
|
||||||
|
|
||||||
|
output_silk_path = os.path.join(save_path, os.path.splitext(file_name)[0] + ".silk")
|
||||||
|
# 使用 graiax-silkcoder 进行转换
|
||||||
|
silkcoder.encode(file_path, output_silk_path, rate=32000, tencent=True, ios_adaptive=True)
|
||||||
|
# 删除临时文件
|
||||||
|
await delete_file(file_path)
|
||||||
|
return output_silk_path
|
||||||
|
|
||||||
|
except httpx.HTTPStatusError as e:
|
||||||
|
print(f"❌ HTTP错误 {e.response.status_code}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"❌ 下载失败:{str(e)}")
|
||||||
|
return None
|
||||||
|
|
|
||||||
|
|
@ -8,64 +8,24 @@ from nonebot.adapters.qq import MessageSegment,MessageEvent
|
||||||
from src.clover_music.cloud_music.cloud_music import *
|
from src.clover_music.cloud_music.cloud_music import *
|
||||||
from src.clover_image.delete_file import delete_file
|
from src.clover_image.delete_file import delete_file
|
||||||
|
|
||||||
unikey_cache = {'unikey': None, 'expires': 0}
|
|
||||||
|
|
||||||
music = on_command("点歌", rule=to_me(), priority=10,block=False)
|
music = on_command("点歌", rule=to_me(), priority=10,block=False)
|
||||||
@music.handle()
|
@music.handle()
|
||||||
async def handle_function(msg: MessageEvent):
|
async def handle_function(msg: MessageEvent):
|
||||||
qr_path = ""
|
|
||||||
keyword = msg.get_plaintext().replace("/点歌", "").strip(" ")
|
|
||||||
|
|
||||||
|
keyword = msg.get_plaintext().replace("/点歌", "").strip(" ")
|
||||||
if keyword == "":
|
if keyword == "":
|
||||||
await music.finish("\n请输入“/点歌+歌曲名”喔🎶")
|
await music.finish("\n请输入“/点歌+歌曲名”喔🎶")
|
||||||
|
|
||||||
#获取登录信息
|
|
||||||
session = requests.session()
|
|
||||||
if not os.path.exists('cloud_music_cookies.cookie'):
|
|
||||||
with open('cloud_music_cookies.cookie', 'wb') as f:
|
|
||||||
pickle.dump(session.cookies, f)
|
|
||||||
# 读取 cookie
|
|
||||||
session.cookies = pickle.load(open('cloud_music_cookies.cookie', 'rb'))
|
|
||||||
session, status,user_id = await netease_cloud_music_is_login(session)
|
|
||||||
if not status:
|
|
||||||
await music.send("登录失效,请联系管理员进行登录")
|
|
||||||
# 检查缓存是否有效(二维码有效期5分钟)
|
|
||||||
if unikey_cache['unikey'] and time.time() < unikey_cache['expires']:
|
|
||||||
unikey = unikey_cache['unikey']
|
|
||||||
else:
|
|
||||||
# 获取新 unikey 并设置过期时间
|
|
||||||
unikey = await get_qr_key(session)
|
|
||||||
unikey_cache.update({
|
|
||||||
'unikey': unikey,
|
|
||||||
'expires': time.time() + 300 # 大约是5分钟有效期 失效时间会有几秒误差
|
|
||||||
})
|
|
||||||
qr_path = await create_qr_code(unikey)
|
|
||||||
"""是否要发送到QQ上面登录 """
|
|
||||||
# await clover_music.send(MessageSegment.file_image(Path(path)))
|
|
||||||
"""是否要发送到QQ上面登录 """
|
|
||||||
for _ in range(60): # 限制最大等待时间5分钟(300秒/5秒间隔)
|
|
||||||
code = await check_qr_code(unikey, session)
|
|
||||||
if code in (803,): break # 成功状态
|
|
||||||
if code not in (801, 802):
|
|
||||||
print('二维码失效' if code == 800 else f'异常状态码:{code}')
|
|
||||||
break
|
|
||||||
await asyncio.sleep(5)
|
|
||||||
with open('cloud_music_cookies.cookie', 'wb') as f:
|
|
||||||
pickle.dump(session.cookies, f)
|
|
||||||
else:
|
|
||||||
#搜索歌曲
|
#搜索歌曲
|
||||||
song_id,song_name,singer,song_url = await netease_music_search(keyword,session)
|
song_id,song_name,singer = await music_search(keyword)
|
||||||
song_name = str(song_name).replace(".", "·").replace("/", "、")
|
song_name = str(song_name).replace(".", "·").replace("/", "、")
|
||||||
if song_id is None:
|
if song_id is None:
|
||||||
await music.finish("\n没有找到歌曲,或检索到的歌曲均为付费喔qwq\n这绝对不是我的错,绝对不是!")
|
await music.finish("\n没有找到歌曲,或检索到的歌曲均为付费喔qwq\n这绝对不是我的错,绝对不是!")
|
||||||
else:
|
else:
|
||||||
await music.send(MessageSegment.text(f" 来源:网易云音乐\n歌曲:{song_name} - {singer}\n请稍等喔🎵"))
|
await music.send(MessageSegment.text(f" 来源:网易云音乐\n歌曲:{song_name} - {singer}\n请稍等喔🎵"))
|
||||||
#返回转换后的歌曲路径
|
#返回转换后的歌曲路径
|
||||||
output_silk_path = await netease_music_download(song_id, song_name, singer,session)
|
output_silk_path = await music_download(song_id, song_name, singer)
|
||||||
|
|
||||||
if output_silk_path == -1:
|
if output_silk_path is None:
|
||||||
await music.send("歌曲音频获取失败:登录信息失效。")
|
|
||||||
elif output_silk_path is None:
|
|
||||||
await music.send("歌曲音频获取失败了Σヽ(゚Д ゚; )ノ,请重试。")
|
await music.send("歌曲音频获取失败了Σヽ(゚Д ゚; )ノ,请重试。")
|
||||||
else:
|
else:
|
||||||
await music.send(MessageSegment.file_audio(Path(output_silk_path)))
|
await music.send(MessageSegment.file_audio(Path(output_silk_path)))
|
||||||
|
|
@ -73,6 +33,72 @@ async def handle_function(msg: MessageEvent):
|
||||||
await delete_file(output_silk_path)
|
await delete_file(output_silk_path)
|
||||||
await music.finish()
|
await music.finish()
|
||||||
|
|
||||||
|
"""
|
||||||
|
由于网易云防爬虫机制,导致验证二维码登录也被限制 现已弃用 替换为三方接口
|
||||||
|
"""
|
||||||
|
# unikey_cache = {'unikey': None, 'expires': 0}
|
||||||
|
#
|
||||||
|
# async def handle_function(msg: MessageEvent):
|
||||||
|
# qr_path = ""
|
||||||
|
# keyword = msg.get_plaintext().replace("/点歌", "").strip(" ")
|
||||||
|
#
|
||||||
|
# if keyword == "":
|
||||||
|
# await music.finish("\n请输入“/点歌+歌曲名”喔🎶")
|
||||||
|
#
|
||||||
|
# #获取登录信息
|
||||||
|
# session = requests.session()
|
||||||
|
# if not os.path.exists('cloud_music_cookies.cookie'):
|
||||||
|
# with open('cloud_music_cookies.cookie', 'wb') as f:
|
||||||
|
# pickle.dump(session.cookies, f)
|
||||||
|
# # 读取 cookie
|
||||||
|
# session.cookies = pickle.load(open('cloud_music_cookies.cookie', 'rb'))
|
||||||
|
# session, status,user_id = await netease_cloud_music_is_login(session)
|
||||||
|
# if not status:
|
||||||
|
# await music.send("登录失效,请联系管理员进行登录")
|
||||||
|
# # 检查缓存是否有效(二维码有效期5分钟)
|
||||||
|
# if unikey_cache['unikey'] and time.time() < unikey_cache['expires']:
|
||||||
|
# unikey = unikey_cache['unikey']
|
||||||
|
# else:
|
||||||
|
# # 获取新 unikey 并设置过期时间
|
||||||
|
# unikey = await get_qr_key(session)
|
||||||
|
# unikey_cache.update({
|
||||||
|
# 'unikey': unikey,
|
||||||
|
# 'expires': time.time() + 300 # 大约是5分钟有效期 失效时间会有几秒误差
|
||||||
|
# })
|
||||||
|
# qr_path = await create_qr_code(unikey)
|
||||||
|
# """是否要发送到QQ上面登录 """
|
||||||
|
# # await clover_music.send(MessageSegment.file_image(Path(path)))
|
||||||
|
# """是否要发送到QQ上面登录 """
|
||||||
|
# for _ in range(60): # 限制最大等待时间5分钟(300秒/5秒间隔)
|
||||||
|
# code = await check_qr_code(unikey, session)
|
||||||
|
# if code in (803,): break # 成功状态
|
||||||
|
# if code not in (801, 802):
|
||||||
|
# print('二维码失效' if code == 800 else f'异常状态码:{code}')
|
||||||
|
# break
|
||||||
|
# await asyncio.sleep(5)
|
||||||
|
# with open('cloud_music_cookies.cookie', 'wb') as f:
|
||||||
|
# pickle.dump(session.cookies, f)
|
||||||
|
# else:
|
||||||
|
# #搜索歌曲
|
||||||
|
# song_id,song_name,singer,song_url = await netease_music_search(keyword,session)
|
||||||
|
# song_name = str(song_name).replace(".", "·").replace("/", "、")
|
||||||
|
# if song_id is None:
|
||||||
|
# await music.finish("\n没有找到歌曲,或检索到的歌曲均为付费喔qwq\n这绝对不是我的错,绝对不是!")
|
||||||
|
# else:
|
||||||
|
# await music.send(MessageSegment.text(f" 来源:网易云音乐\n歌曲:{song_name} - {singer}\n请稍等喔🎵"))
|
||||||
|
# #返回转换后的歌曲路径
|
||||||
|
# output_silk_path = await netease_music_download(song_id, song_name, singer,session)
|
||||||
|
#
|
||||||
|
# if output_silk_path == -1:
|
||||||
|
# await music.send("歌曲音频获取失败:登录信息失效。")
|
||||||
|
# elif output_silk_path is None:
|
||||||
|
# await music.send("歌曲音频获取失败了Σヽ(゚Д ゚; )ノ,请重试。")
|
||||||
|
# else:
|
||||||
|
# await music.send(MessageSegment.file_audio(Path(output_silk_path)))
|
||||||
|
# #删除临时文件
|
||||||
|
# await delete_file(output_silk_path)
|
||||||
|
# await music.finish()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue