SanYeCao-Nonebot/src/clover_music/cloud_music/cloud_music.py
SlyAimer 72ab72071c fix(cloud_music): 修复csrf_token为None时的返回值问题
chore(requirements): 移除重复的Crypto依赖
2025-03-24 08:48:34 +08:00

239 lines
No EOL
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import os
import qrcode
import base64
import codecs
import json
import requests
from io import BytesIO
from Crypto.Cipher import AES
from graiax import silkcoder
import src.clover_music.cloud_music.agent as agent
from src.clover_image.delete_file import delete_file
# Silence urllib3 warnings emitted through requests for the whole process.
requests.packages.urllib3.disable_warnings()
# Default headers passed to the session calls in this module. The User-Agent is
# supplied by the project's agent helper (presumably a browser UA string — confirm).
headers = {'User-Agent': agent.get_user_agents(), 'Referer': 'https://music.163.com/'}
# Helpers for building the encrypted `params` and `encSecKey` values
def keys(key):
    """
    Pad a key string with NUL characters and return it as bytes.
    Args:
        key: key (or IV) text; padding is based on its character count.
    Returns:
        The UTF-8 encoding of ``key`` extended with '\\0' characters until its
        character length is a multiple of 16.
    """
    missing = -len(key) % 16  # 0 when the length is already a multiple of 16
    padded = key + '\0' * missing
    return padded.encode()
# Login encryption
def AES_aes(t, key, iv):
    """
    Encrypt text with AES in CBC mode and return it base64-encoded.
    Args:
        t: plaintext string.
        key: AES key text (NUL-padded to a 16-character multiple by ``keys``).
        iv: initialisation vector text (padded the same way).
    Returns:
        The ciphertext as a base64 string in the line-wrapped form produced by
        ``base64.encodebytes`` (newline-terminated).
    """
    # Pad to a whole number of blocks; every pad character carries the pad
    # length as its code point. The length is measured in characters.
    pad_len = AES.block_size - len(t) % AES.block_size
    padded_text = t + pad_len * chr(pad_len)
    cipher = AES.new(keys(key), AES.MODE_CBC, keys(iv))
    ciphertext = cipher.encrypt(padded_text.encode())
    return base64.encodebytes(ciphertext).decode('utf-8')
def RSA_rsa(i, e, f):
    """
    Encrypt a short string with unpadded RSA and return it as hex.
    Args:
        i: text to encrypt; it is reversed before being converted to a number.
        e: public exponent as a hexadecimal string.
        f: modulus as a hexadecimal string.
    Returns:
        The result as lowercase hexadecimal, left-padded with zeros to 256
        characters.
    Raises:
        ValueError: if ``i`` is empty or ``e``/``f`` are not valid hexadecimal.
    """
    # The UTF-8 bytes of the reversed text are read as one big-endian integer.
    message = int(i[::-1].encode('utf-8').hex(), 16)
    # Three-argument pow() does modular exponentiation directly instead of
    # building the full (multi-million-bit) power and reducing it afterwards.
    encrypted = pow(message, int(e, 16), int(f, 16))
    return format(encrypted, 'x').zfill(256)
# Parameters used to build the encrypted login payload.
# NOTE(review): the short names and the bracketed references in the comments below
# appear to mirror variables of an upstream client script — confirm.
key = agent.S() # value of i6c
# Default plaintext payload: the Python repr of the dict (single-quoted), not JSON.
d = str({'key': key, 'type': "1", 'csrf_token': ""})
e = "010001" # value of (["tears", "strong"]); used as the hex RSA exponent in RSA_rsa
# Hex RSA modulus passed to RSA_rsa.
f = "00e0b509f6259df8642dbc35662901477df22677ec152b5ff68ace615bb7b725152b3ab17a876aea8a5aa76d2e417629ec4ee341f56135fccf695280104e0312ecbda92557c93870114af6c9d05c4f7f0c3685b7a46bee255932575cce10b424d813cfe4875d3e82047b97ddef52741d546b8e289dc6935b3ece0462db0a22b8e7"
g = "0CoJUm6Qyw8W8jud" # value of (["heart", "girl", "frightened", "laughing"]); key of the first AES pass
iv = "0102030405060708" # offset (AES initialisation vector)
i = agent.a() # randomly generated string of length 16; key of the second AES pass and the text RSA_rsa encrypts
def login_params(u):
    """
    Build the encrypted ``params`` form field.
    Args:
        u: plaintext payload string, or None to use the module default ``d``.
    Returns:
        The payload encrypted twice with AES_aes: first with ``g`` as the key,
        then with ``i`` as the key, both passes using ``iv``.
    """
    payload = d if u is None else u
    first_pass = AES_aes(payload, g, iv)
    return AES_aes(first_pass, i, iv)
def login_encSecKey():
    """
    Build the ``encSecKey`` form field.
    Returns:
        The module-level string ``i`` encrypted by RSA_rsa with exponent ``e``
        and modulus ``f``.
    """
    enc_sec_key = RSA_rsa(i, e, f)
    return enc_sec_key
def get_music(id):
    """
    Fetch the encrypted request parameters for a song.
    Args:
        id: song id, forwarded unchanged to ``agent.get_params``.
    Returns:
        Whatever ``agent.get_params`` returns; the download routine in this
        file reads the 'encText' and 'encSecKey' entries from it.
    """
    encrypted = agent.get_params(id)
    return encrypted
"""
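QR-code login flow for NetEase Cloud Music: first request a QR-code key (unikey),
then render that key as a QR code, have the user scan it to log in, and finally
call the login endpoint and save the cookie it returns.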
"""
# Directory that receives downloaded and converted audio files; created at import time.
save_path = f"{os.getcwd()}/src/clover_music/netease_music"
os.makedirs(save_path, exist_ok=True)
# Directory in which the login QR code image is written.
qrcode_path = f"{os.getcwd()}/src/clover_music"
# Check whether the stored cookie is still valid
async def netease_cloud_music_is_login(session):
    """
    Verify the login state of a session.
    Args:
        session: HTTP session whose ``cookies`` object provides ``get`` (and,
            where supported, ``load``) and which provides ``post``.
    Returns:
        A ``(session, is_logged_in, account_id)`` tuple; ``account_id`` is None
        whenever ``is_logged_in`` is False.
    """
    try:
        session.cookies.load(ignore_discard=True)
    except Exception:
        # Deliberate best effort: a missing/unreadable cookie store (or a cookie
        # jar without load()) simply means we continue with the cookies in memory.
        pass
    csrf_token = session.cookies.get('__csrf')
    if csrf_token is None:
        return session, False, None
    try:
        reply = session.post(
            f'https://music.163.com/weapi/w/nuser/account/get?csrf_token={csrf_token}',
            data={'params': login_params(None), 'encSecKey': login_encSecKey()},
            headers=headers,
        ).json()
        account = reply['account']
        if account is None:
            return session, False, None
        return session, True, account['id']
    except Exception:
        # Network errors, invalid JSON or an unexpected reply shape all count as
        # "not logged in". Only Exception is caught so that KeyboardInterrupt,
        # SystemExit and task cancellation are not swallowed.
        return session, False, None
# Fetch the key for the QR code
async def get_qr_key(session):
    """
    Request a unikey for QR-code login.
    Args:
        session: HTTP session providing ``post``.
    Returns:
        The ``unikey`` value from the reply when its ``code`` is 200,
        otherwise None.
    """
    # NOTE(review): the encrypted fields go out as query parameters here, while
    # the other weapi calls in this file send them as form data — confirm intent.
    query = {"params": login_params(None), "encSecKey": login_encSecKey()}
    reply = session.post(
        "https://music.163.com/weapi/login/qrcode/unikey",
        headers=headers,
        params=query,
    )
    parsed = json.loads(reply.text)
    return parsed.get("unikey") if parsed.get("code") == 200 else None
# Create the QRCode object (a single module-level instance used by create_qr_code)
qr = qrcode.QRCode( version=1, error_correction=qrcode.constants.ERROR_CORRECT_L, box_size=10, border=4, )
# Generate the QR code
async def create_qr_code(unikey):
    """
    Render the login QR code for a unikey and save it as a PNG.
    Args:
        unikey: key embedded in the login URL as the ``codekey`` parameter.
    Returns:
        Path of the written image: ``qrcode_path + '/qrcode.png'``.
    """
    png_url = f"http://music.163.com/login?codekey={unikey}"
    # `qr` is shared at module level and add_data() appends to what it already
    # holds, so reset it first; otherwise every later code would also encode
    # the URLs of earlier calls.
    qr.clear()
    qr.add_data(png_url)
    img = qr.make_image()
    img.save(os.path.join(qrcode_path, 'qrcode.png'))
    return qrcode_path + '/qrcode.png'
# Check whether the QR code has been scanned
async def check_qr_code(unikey,session):
    """
    Poll the QR-code login endpoint.
    Args:
        unikey: key the QR code was generated from.
        session: HTTP session providing ``post``.
    Returns:
        The ``code`` field of the JSON reply (None if the field is absent).
    """
    payload_text = str({'key': unikey, 'type': "1", 'csrf_token': ""})
    form = {'params': login_params(payload_text), 'encSecKey': login_encSecKey()}
    reply = session.post(
        "https://music.163.com/weapi/login/qrcode/client/login?csrf_token=",
        data=form,
        headers=headers,
    )
    return reply.json().get('code')
async def netease_music_search(keyword,session):
    """
    Search for a song and report the first hit.
    Args:
        keyword: search text.
        session: HTTP session providing ``get``.
    Returns:
        ``(song_id, song_name, singer, song_url)`` for the first result, or
        ``(None, None, None, None)`` when the reply contains no songs.
    """
    not_found = (None, None, None, None)
    query = {
        "s": keyword,
        "type": 1,  # 1 = songs, 2 = albums, 3 = artists, etc.
        "limit": 10,  # maximum number of results
        "offset": 0,  # result offset, usable for paging
        "sub": "false",
    }
    payload = session.get(
        "http://music.163.com/api/search/get", headers=headers, params=query
    ).json()
    if "result" not in payload or "songs" not in payload["result"]:
        return not_found
    hits = payload["result"]["songs"]
    if not hits:
        return not_found
    top_hit = hits[0]  # only the first song is reported
    song_name, singer, song_id = (
        top_hit["name"],
        top_hit["artists"][0]["name"],
        top_hit["id"],
    )
    song_url = f"https://music.163.com/song?id={song_id}"
    print(f"搜索结果:{song_name} - {singer}")
    print(f"歌曲链接:{song_url}")
    return song_id, song_name, singer, song_url
#仅限于免费歌曲
# def netease_music_download(song_id,song_name,singer,session):
# if not os.path.exists(save_path):
# os.makedirs(save_path)
# download_url = f"http://music.163.com/song/media/outer/url?br=999000&id={song_id}.mp3"
# response = session.get(download_url, headers=headers)
# if response.status_code == 200:
# print(f"正在下载 {song_name} - {singer} 歌曲...")
# file_path = os.path.join(save_path, f"{song_name}-{singer}.mp3")
# file_name = os.path.basename(f"{song_name}-{singer}.mp3")
# with open(file_path, "wb") as f:
# f.write(response.content)
# output_silk_path = os.path.join(save_path, os.path.splitext(file_name)[0] + ".silk")
# # 使用 graiax-silkcoder 进行转换
# silkcoder.encode(file_path, output_silk_path)
# return output_silk_path
# else:
# return None
# Download variant meant to work for all songs (the commented-out variant above covers free songs only)

# Path separators, characters reserved in file names on common platforms, and
# control characters; each is replaced with "_" before a local file name is built.
_UNSAFE_FILENAME_TABLE = {ord(ch): '_' for ch in '<>:"/\\|?*'}
_UNSAFE_FILENAME_TABLE.update({code: '_' for code in range(32)})


def _safe_filename(name):
    """Return ``name`` with path separators and reserved characters replaced by "_"."""
    return name.translate(_UNSAFE_FILENAME_TABLE)


async def netease_music_download(song_id, song_name, singer, session):
    """
    Download a song and convert it to a SILK file.
    Args:
        song_id: id of the song to download.
        song_name: song title, used for the local file name.
        singer: artist name, used for the local file name.
        session: HTTP session providing ``post``.
    Returns:
        Path of the converted ``.silk`` file on success; -1 when the API reply
        carries no download URL; None when the download request fails or does
        not answer with HTTP 200.
    """
    os.makedirs(save_path, exist_ok=True)
    # Get the encrypted song-id parameters
    result = get_music(song_id)
    data = {'params': result['encText'], 'encSecKey': result['encSecKey']}
    download_url = 'https://music.163.com/weapi/song/enhance/player/url/v1?br=999000'
    response_data = session.post(download_url, headers=headers, data=data).json()
    url = response_data['data'][0]['url']
    if url is None:
        return -1
    # Titles and artist names come from remote data and may contain "/" or other
    # characters that would break or redirect the path, so sanitise the stem.
    stem = _safe_filename(f"{song_name}-{singer}")
    file_path = os.path.join(save_path, stem + ".wav")
    output_silk_path = os.path.join(save_path, stem + ".silk")
    # Download the song; the context manager releases the streamed connection.
    try:
        with requests.get(url, stream=True) as response:
            if response.status_code != 200:
                return None
            with open(file_path, "wb") as file:
                for chunk in response.iter_content(chunk_size=8192):
                    file.write(chunk)
    except requests.RequestException:
        return None
    # Convert with graiax-silkcoder
    silkcoder.encode(file_path, output_silk_path, rate=32000, tencent=True, ios_adaptive=True)
    # Remove the temporary download
    await delete_file(file_path)
    return output_silk_path