diff --git a/src/clover_music/cloud_music/cloud_music.py b/src/clover_music/cloud_music/cloud_music.py index f941d2e..32f962d 100644 --- a/src/clover_music/cloud_music/cloud_music.py +++ b/src/clover_music/cloud_music/cloud_music.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- import os +import httpx import qrcode import base64 import codecs @@ -8,6 +9,8 @@ import requests from io import BytesIO from Crypto.Cipher import AES from graiax import silkcoder +from nonebot import get_driver +from typing import Optional import src.clover_music.cloud_music.agent as agent from src.clover_image.delete_file import delete_file @@ -15,6 +18,27 @@ from src.clover_image.delete_file import delete_file requests.packages.urllib3.disable_warnings() headers = {'User-Agent': agent.get_user_agents(), 'Referer': 'https://music.163.com/'} +# 在 cloud_music.py 文件顶部添加 + + +# 全局异步客户端 +async_client: Optional[httpx.AsyncClient] = None + +# 机器人启动时初始化 +@get_driver().on_startup +async def init_netease_client(): + global async_client + async_client = httpx.AsyncClient( + verify=False, + timeout=30.0, + limits=httpx.Limits(max_connections=100) + ) + +# 机器人关闭时清理 +@get_driver().on_shutdown +async def close_netease_client(): + if async_client: + await async_client.aclose() # 解密params和encSecKey值 def keys(key): @@ -232,4 +256,63 @@ async def netease_music_download(song_id, song_name, singer, session): else: return None except requests.RequestException as e: - return None \ No newline at end of file + return None + + +async def music_search(keyword): + """ + 第三方歌曲搜索 + Args: + keyword: + Returns: + + """ + url = "https://api.kxzjoker.cn/api/163_search" + params = { + "name": keyword, + "limit": 10, + } + + response = await async_client.get(url,params=params) + result = response.json() + song_id = result["data"][0]["id"] + song_name = result["data"][0]["name"] + singer = result["data"][0]["artists"][0]["name"] + return song_id,song_name,singer + +async def music_download(song_id, song_name, singer): + if not os.path.exists(save_path): + os.makedirs(save_path) + try: + headers = { + "Referer": "https://kxzjoker.cn/", + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36", + "Origin": "https://kxzjoker.cn" + } + + # 构造请求URL + url = f'https://api.kxzjoker.cn/api/163_music?ids={song_id}&level=lossless&type=down' + + # 异步流式下载 + async with async_client.stream("GET",url,headers=headers,follow_redirects=True) as response: + response.raise_for_status() + + file_path = os.path.join(save_path, f"{song_name}-{singer}.wav") + file_name = os.path.basename(f"{song_name}-{singer}.wav") + + with open(file_path, "wb") as f: + async for chunk in response.aiter_bytes(chunk_size=8192): + f.write(chunk) + + output_silk_path = os.path.join(save_path, os.path.splitext(file_name)[0] + ".silk") + # 使用 graiax-silkcoder 进行转换 + silkcoder.encode(file_path, output_silk_path, rate=32000, tencent=True, ios_adaptive=True) + # 删除临时文件 + await delete_file(file_path) + return output_silk_path + + except httpx.HTTPStatusError as e: + print(f"❌ HTTP错误 {e.response.status_code}") + except Exception as e: + print(f"❌ 下载失败:{str(e)}") + return None diff --git a/src/plugins/cloud_music.py b/src/plugins/cloud_music.py index 385483c..8523d38 100644 --- a/src/plugins/cloud_music.py +++ b/src/plugins/cloud_music.py @@ -8,71 +8,97 @@ from nonebot.adapters.qq import MessageSegment,MessageEvent from src.clover_music.cloud_music.cloud_music import * from src.clover_image.delete_file import delete_file -unikey_cache = {'unikey': None, 'expires': 0} - music = on_command("点歌", rule=to_me(), priority=10,block=False) @music.handle() async def handle_function(msg: MessageEvent): - qr_path = "" - keyword = msg.get_plaintext().replace("/点歌", "").strip(" ") + keyword = msg.get_plaintext().replace("/点歌", "").strip(" ") if keyword == "": await music.finish("\n请输入“/点歌+歌曲名”喔🎶") - - #获取登录信息 - session = requests.session() - if not os.path.exists('cloud_music_cookies.cookie'): - with open('cloud_music_cookies.cookie', 'wb') as f: - pickle.dump(session.cookies, f) - # 读取 cookie - session.cookies = pickle.load(open('cloud_music_cookies.cookie', 'rb')) - session, status,user_id = await netease_cloud_music_is_login(session) - if not status: - await music.send("登录失效,请联系管理员进行登录") - # 检查缓存是否有效(二维码有效期5分钟) - if unikey_cache['unikey'] and time.time() < unikey_cache['expires']: - unikey = unikey_cache['unikey'] - else: - # 获取新 unikey 并设置过期时间 - unikey = await get_qr_key(session) - unikey_cache.update({ - 'unikey': unikey, - 'expires': time.time() + 300 # 大约是5分钟有效期 失效时间会有几秒误差 - }) - qr_path = await create_qr_code(unikey) - """是否要发送到QQ上面登录 """ - # await clover_music.send(MessageSegment.file_image(Path(path))) - """是否要发送到QQ上面登录 """ - for _ in range(60): # 限制最大等待时间5分钟(300秒/5秒间隔) - code = await check_qr_code(unikey, session) - if code in (803,): break # 成功状态 - if code not in (801, 802): - print('二维码失效' if code == 800 else f'异常状态码:{code}') - break - await asyncio.sleep(5) - with open('cloud_music_cookies.cookie', 'wb') as f: - pickle.dump(session.cookies, f) + #搜索歌曲 + song_id,song_name,singer = await music_search(keyword) + song_name = str(song_name).replace(".", "·").replace("/", "、") + if song_id is None: + await music.finish("\n没有找到歌曲,或检索到的歌曲均为付费喔qwq\n这绝对不是我的错,绝对不是!") else: - #搜索歌曲 - song_id,song_name,singer,song_url = await netease_music_search(keyword,session) - song_name = str(song_name).replace(".", "·").replace("/", "、") - if song_id is None: - await music.finish("\n没有找到歌曲,或检索到的歌曲均为付费喔qwq\n这绝对不是我的错,绝对不是!") - else: - await music.send(MessageSegment.text(f" 来源:网易云音乐\n歌曲:{song_name} - {singer}\n请稍等喔🎵")) - #返回转换后的歌曲路径 - output_silk_path = await netease_music_download(song_id, song_name, singer,session) + await music.send(MessageSegment.text(f" 来源:网易云音乐\n歌曲:{song_name} - {singer}\n请稍等喔🎵")) + #返回转换后的歌曲路径 + output_silk_path = await music_download(song_id, song_name, singer) - if output_silk_path == -1: - await music.send("歌曲音频获取失败:登录信息失效。") - elif output_silk_path is None: + if output_silk_path is None: await music.send("歌曲音频获取失败了Σヽ(゚Д ゚; )ノ,请重试。") - else: - await music.send(MessageSegment.file_audio(Path(output_silk_path))) + else: + await music.send(MessageSegment.file_audio(Path(output_silk_path))) #删除临时文件 await delete_file(output_silk_path) await music.finish() +""" +由于网易云防爬虫机制,导致验证二维码登录也被限制 现已弃用 替换为三方接口 +""" +# unikey_cache = {'unikey': None, 'expires': 0} +# +# async def handle_function(msg: MessageEvent): +# qr_path = "" +# keyword = msg.get_plaintext().replace("/点歌", "").strip(" ") +# +# if keyword == "": +# await music.finish("\n请输入“/点歌+歌曲名”喔🎶") +# +# #获取登录信息 +# session = requests.session() +# if not os.path.exists('cloud_music_cookies.cookie'): +# with open('cloud_music_cookies.cookie', 'wb') as f: +# pickle.dump(session.cookies, f) +# # 读取 cookie +# session.cookies = pickle.load(open('cloud_music_cookies.cookie', 'rb')) +# session, status,user_id = await netease_cloud_music_is_login(session) +# if not status: +# await music.send("登录失效,请联系管理员进行登录") +# # 检查缓存是否有效(二维码有效期5分钟) +# if unikey_cache['unikey'] and time.time() < unikey_cache['expires']: +# unikey = unikey_cache['unikey'] +# else: +# # 获取新 unikey 并设置过期时间 +# unikey = await get_qr_key(session) +# unikey_cache.update({ +# 'unikey': unikey, +# 'expires': time.time() + 300 # 大约是5分钟有效期 失效时间会有几秒误差 +# }) +# qr_path = await create_qr_code(unikey) +# """是否要发送到QQ上面登录 """ +# # await clover_music.send(MessageSegment.file_image(Path(path))) +# """是否要发送到QQ上面登录 """ +# for _ in range(60): # 限制最大等待时间5分钟(300秒/5秒间隔) +# code = await check_qr_code(unikey, session) +# if code in (803,): break # 成功状态 +# if code not in (801, 802): +# print('二维码失效' if code == 800 else f'异常状态码:{code}') +# break +# await asyncio.sleep(5) +# with open('cloud_music_cookies.cookie', 'wb') as f: +# pickle.dump(session.cookies, f) +# else: +# #搜索歌曲 +# song_id,song_name,singer,song_url = await netease_music_search(keyword,session) +# song_name = str(song_name).replace(".", "·").replace("/", "、") +# if song_id is None: +# await music.finish("\n没有找到歌曲,或检索到的歌曲均为付费喔qwq\n这绝对不是我的错,绝对不是!") +# else: +# await music.send(MessageSegment.text(f" 来源:网易云音乐\n歌曲:{song_name} - {singer}\n请稍等喔🎵")) +# #返回转换后的歌曲路径 +# output_silk_path = await netease_music_download(song_id, song_name, singer,session) +# +# if output_silk_path == -1: +# await music.send("歌曲音频获取失败:登录信息失效。") +# elif output_silk_path is None: +# await music.send("歌曲音频获取失败了Σヽ(゚Д ゚; )ノ,请重试。") +# else: +# await music.send(MessageSegment.file_audio(Path(output_silk_path))) +# #删除临时文件 +# await delete_file(output_silk_path) +# await music.finish() +