mirror of
https://github.com/ClovertaTheTrilobita/SanYeCao-Nonebot.git
synced 2026-04-01 22:04:51 +00:00
commit
b4fd7b1332
2 changed files with 39 additions and 46 deletions
|
|
@ -5,7 +5,10 @@ import qrcode
|
|||
import base64
|
||||
import codecs
|
||||
import json
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
import requests
|
||||
from nonebot import logger
|
||||
from io import BytesIO
|
||||
from Crypto.Cipher import AES
|
||||
from graiax import silkcoder
|
||||
|
|
@ -258,61 +261,50 @@ async def netease_music_download(song_id, song_name, singer, session):
|
|||
except requests.RequestException as e:
|
||||
return None
|
||||
|
||||
|
||||
async def music_search(keyword):
|
||||
"""
|
||||
第三方歌曲搜索
|
||||
Args:
|
||||
keyword:
|
||||
Returns:
|
||||
|
||||
"""
|
||||
url = "https://api.kxzjoker.cn/api/163_search"
|
||||
params = {
|
||||
"name": keyword,
|
||||
"limit": 10,
|
||||
}
|
||||
|
||||
response = await async_client.get(url,params=params)
|
||||
result = response.json()
|
||||
song_id = result["data"][0]["id"]
|
||||
song_name = result["data"][0]["name"]
|
||||
singer = result["data"][0]["artists"][0]["name"]
|
||||
return song_id,song_name,singer
|
||||
|
||||
async def music_download(song_id, song_name, singer):
|
||||
async def music_download(song_id):
|
||||
if not os.path.exists(save_path):
|
||||
os.makedirs(save_path)
|
||||
try:
|
||||
headers = {
|
||||
"Referer": "https://kxzjoker.cn/",
|
||||
"Referer": "https://www.byfuns.top/api/1/",
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
|
||||
"Origin": "https://kxzjoker.cn"
|
||||
"Origin": "https://www.byfuns.top"
|
||||
}
|
||||
|
||||
# 构造请求URL
|
||||
url = f'https://api.kxzjoker.cn/api/163_music?ids={song_id}&level=lossless&type=down'
|
||||
url = f'https://www.byfuns.top/api/1/?id={song_id}&level=lossless'
|
||||
|
||||
# 异步流式下载
|
||||
async with async_client.stream("GET",url,headers=headers,follow_redirects=True) as response:
|
||||
response.raise_for_status()
|
||||
# 获取Url内容
|
||||
song_url = requests.request("GET", url, headers=headers)
|
||||
if song_url.status_code == 200:
|
||||
# 异步流式下载
|
||||
async with async_client.stream("GET",song_url.text,headers=headers,follow_redirects=True) as response:
|
||||
response.raise_for_status()
|
||||
if response.status_code == 200:
|
||||
logger.debug(f"下载歌曲ID:{song_id}\nURL:{song_url.text}\n开始下载中...")
|
||||
file_path = os.path.join(save_path, f"{datetime.now().date()}-{uuid.uuid4().hex}-{song_id}.wav")
|
||||
file_name = os.path.basename(f"{datetime.now().date()}-{uuid.uuid4().hex}-{song_id}.wav")
|
||||
|
||||
file_path = os.path.join(save_path, f"{song_name}-{singer}.wav")
|
||||
file_name = os.path.basename(f"{song_name}-{singer}.wav")
|
||||
|
||||
with open(file_path, "wb") as f:
|
||||
async for chunk in response.aiter_bytes(chunk_size=8192):
|
||||
f.write(chunk)
|
||||
|
||||
output_silk_path = os.path.join(save_path, os.path.splitext(file_name)[0] + ".silk")
|
||||
# 使用 graiax-silkcoder 进行转换
|
||||
silkcoder.encode(file_path, output_silk_path, rate=32000, tencent=True, ios_adaptive=True)
|
||||
# 删除临时文件
|
||||
await delete_file(file_path)
|
||||
return output_silk_path
|
||||
with open(file_path, "wb") as f:
|
||||
async for chunk in response.aiter_bytes(chunk_size=8192):
|
||||
f.write(chunk)
|
||||
|
||||
output_silk_path = os.path.join(save_path, os.path.splitext(file_name)[0] + ".silk")
|
||||
# 使用 graiax-silkcoder 进行转换
|
||||
silkcoder.encode(file_path, output_silk_path, rate=32000, tencent=True, ios_adaptive=True)
|
||||
# 删除临时文件
|
||||
await delete_file(file_path)
|
||||
return output_silk_path
|
||||
else:
|
||||
logger.error(f"获取歌曲链接失败,状态码:{song_url.status_code}")
|
||||
return None
|
||||
else:
|
||||
logger.error(f"获取歌曲链接失败,状态码:{song_url.status_code}")
|
||||
return None
|
||||
except httpx.Timeout as e:
|
||||
logger.error(f"TimeoutError: {e}")
|
||||
except httpx.HTTPStatusError as e:
|
||||
print(f"❌ HTTP错误 {e.response.status_code}")
|
||||
logger.error(f"HTTPStatusError: {e}")
|
||||
except Exception as e:
|
||||
print(f"❌ 下载失败:{str(e)}")
|
||||
logger.error(e)
|
||||
return None
|
||||
|
|
|
|||
|
|
@ -16,14 +16,15 @@ async def handle_function(msg: MessageEvent):
|
|||
if keyword == "":
|
||||
await music.finish("\n请输入“/点歌+歌曲名”喔🎶")
|
||||
#搜索歌曲
|
||||
song_id,song_name,singer = await music_search(keyword)
|
||||
session = requests.session()
|
||||
song_id,song_name,singer,song_url = await netease_music_search(keyword,session)
|
||||
song_name = str(song_name).replace(".", "·").replace("/", "、")
|
||||
if song_id is None:
|
||||
await music.finish("\n没有找到歌曲,或检索到的歌曲均为付费喔qwq\n这绝对不是我的错,绝对不是!")
|
||||
else:
|
||||
await music.send(MessageSegment.text(f" 来源:网易云音乐\n歌曲:{song_name} - {singer}\n请稍等喔🎵"))
|
||||
#返回转换后的歌曲路径
|
||||
output_silk_path = await music_download(song_id, song_name, singer)
|
||||
output_silk_path = await music_download(song_id)
|
||||
|
||||
if output_silk_path is None:
|
||||
await music.send("歌曲音频获取失败了Σヽ(゚Д ゚; )ノ,请重试。")
|
||||
|
|
|
|||
Loading…
Reference in a new issue