Merge remote-tracking branch 'origin/master'

# Conflicts:
#	src/configs/api_config_example.py
This commit is contained in:
ClovertaTheTrilobita 2025-04-08 17:32:11 +08:00
commit 709da98694
10 changed files with 225 additions and 79 deletions

7
.gitignore vendored
View file

@ -18,6 +18,7 @@ bili.cookie
*.mp4
/bili.cookie
/src/configs/api_config.py
/src/configs/jm_config.yml
/src/resources/temp
/update_remote_code.py
/src/resources/image/report/*
@ -28,3 +29,9 @@ src/clover_lightnovel/wenku8.cookie
src/clover_lightnovel/output1.html
*.pyc
/src/resources/image/jm/*
Elysia
runtime
/src/configs/tts
bot.pid

View file

@ -3,6 +3,7 @@ from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
from email.mime.text import MIMEText
import aiosmtplib
from nonebot import logger
from src.configs.api_config import google_smtp_server,google_email,google_password
from src.configs.api_config import qq_smtp_server,qq_email,qq_password
@ -123,7 +124,7 @@ async def send_email_by_google(receiver_email: str, file_path: str):
try:
# 验证文件存在性
if not os.path.isfile(file_path):
print(f"文件不存在:{file_path}")
logger.error(f"文件不存在:{file_path}")
return False
# 添加单个文件附件
@ -145,11 +146,10 @@ async def send_email_by_google(receiver_email: str, file_path: str):
) as server:
await server.login(google_email, google_password)
await server.send_message(msg)
print("文件邮件发送成功!")
return True
except Exception as e:
print(f"邮件发送失败: {str(e)}")
logger.error(f"邮件发送失败:{e}")
return False
async def send_email_by_qq(receiver_email: str, file_path: str):
@ -162,7 +162,7 @@ async def send_email_by_qq(receiver_email: str, file_path: str):
try:
if not os.path.exists(file_path):
print(f"文件不存在:{file_path}")
logger.error(f"文件不存在:{file_path}")
return False
# 添加附件
@ -187,5 +187,5 @@ async def send_email_by_qq(receiver_email: str, file_path: str):
print("QQ文件邮件发送成功")
return True
except Exception as e:
print(f"QQ邮件发送失败: {str(e)}")
logger.error(f"QQ邮件发送失败{e}")
return False

View file

@ -4,7 +4,9 @@ import zipfile
from pathlib import Path
from PIL import Image
from natsort import natsorted
from nonebot import logger
__name__ = "cliver_jm | disguise_pdf"
async def webp_to_pdf(input_folder, output_pdf):
"""
@ -17,7 +19,8 @@ async def webp_to_pdf(input_folder, output_pdf):
)
if not webp_files:
print("未找到WebP图片")
logger.error("未找到WebP图片")
return False
images = []
for webp_file in webp_files:
@ -31,10 +34,10 @@ async def webp_to_pdf(input_folder, output_pdf):
else:
images.append(img.convert('RGB'))
except Exception as e:
print(f"处理失败 {webp_file}: {e}")
logger.error(f"处理失败 {webp_file}: {e}")
if not images:
print("无有效图片")
logger.error("无有效图片")
images[0].save(
output_pdf,
@ -43,6 +46,7 @@ async def webp_to_pdf(input_folder, output_pdf):
optimize=True,
quality=80
)
return True
async def batch_convert_subfolders(base_dir,output_dir):
"""

View file

@ -1,25 +1,33 @@
import yaml
import uuid
import jmcomic
from datetime import datetime
from src.configs.api_config import qrserver_url,qrserver_size,anonfile_download_url
from src.clover_jm.disguise_pdf import *
from concurrent.futures import ThreadPoolExecutor
from src.configs.path_config import jm_path,jm_config_path
from src.clover_providers.cloud_file_api import anonfile
from src.clover_image.delete_file import delete_folder,delete_file
from src.clover_email.send_email import send_email_by_google,send_email_by_qq
__name__ = "clover | jm_comic"
# 创建线程池
jm_executor = ThreadPoolExecutor(max_workers=5)
jm_executor.submit(lambda: None).result()
async def download_jm(album_id: str| None,receiver_email: str| None):
async def download_jm_Pemail(album_id: str| None,receiver_email: str| None):
# 修改配置文件的下载路径
source_path = await get_jm_config(receiver_email)
option = jmcomic.JmOption.from_file(jm_config_path)
# 还原配置文件
await recover_jm_config(source_path)
#调用JM下载api
try:
album_detail,downloader = await asyncio.get_event_loop().run_in_executor(jm_executor,jmcomic.download_album,album_id,option)
if album_detail.title is None:
return "下载失败,请检查JM ID 是否正确"
except Exception as e:
logger.error(f"下载失败 :{e}")
return "下载失败,请重试"
# 创建变量
folder_path = f"{jm_path}{receiver_email}"
zip_path = f"{jm_path}{album_detail.title}.zip"
@ -40,13 +48,53 @@ async def download_jm(album_id: str| None,receiver_email: str| None):
await delete_file(zip_path)
return "发送邮件失败,请重试!"
async def download_jm_qr(album_id: str| None):
# 修改配置文件的下载路径
file_name = f"JM-{album_id}-{datetime.now().date()}@{uuid.uuid4().hex}"
source_path = await get_jm_config(file_name)
option = jmcomic.JmOption.from_file(jm_config_path)
# 还原配置文件
await recover_jm_config(source_path)
#调用JM下载api
album_detail,downloader = await asyncio.get_event_loop().run_in_executor(jm_executor,jmcomic.download_album,album_id,option)
if album_detail.title is None:
return {
"msg":"下载失败,请检查JM ID 是否正确"
}
# 创建变量
folder_path = f"{jm_path}{album_detail.title}"
pdf_path = f"{jm_path}{file_name}.pdf"
# 转为pdf
pdf_status = await webp_to_pdf(folder_path,pdf_path)
if not pdf_status:
await delete_folder(folder_path)
return {
"msg":"PDF转换失败"
}
# 发送文件
send_status = await anonfile.upload_file(pdf_path)
if send_status["success"]== "true":
file_code=send_status["code"]
# 删除文件
await delete_folder(pdf_path)
await delete_folder(folder_path)
return {
"msg":"获取成功~!码上下载!~",
"qr_code": f"{qrserver_url}?size={qrserver_size}&data={anonfile_download_url}{file_code}"
}
else:
await delete_folder(pdf_path)
await delete_folder(folder_path)
return {
"msg":"发送失败,请重试!"
}
async def get_jm_config(receiver_email: str):
async def get_jm_config(file_name: str):
with open(jm_config_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
source_path = config['dir_rule']['base_dir']
new_base_dir = str(Path(source_path) / receiver_email)
new_base_dir = str(Path(source_path) / file_name)
config['dir_rule']['base_dir'] = new_base_dir
with open(jm_config_path, 'w', encoding='utf-8') as f:
yaml.dump(config, f, sort_keys=False, allow_unicode=True)

View file

@ -62,7 +62,7 @@ def get_music(id):
"""
save_path = os.getcwd()+'/src/clover_music/netease_music'
os.makedirs(save_path, exist_ok=True)
qrcode_path = os.getcwd()+'/src/clover_music'
qrcode_path = os.getcwd()+'/src/clover_music/'
# 判断cookie是否有效
@ -112,8 +112,6 @@ async def get_qr_key(session):
else:
return None
# 创建 QRCode 对象
qr = qrcode.QRCode( version=1, error_correction=qrcode.constants.ERROR_CORRECT_L, box_size=10, border=4, )
# 生成二维码
async def create_qr_code(unikey):
"""
@ -124,14 +122,12 @@ async def create_qr_code(unikey):
Returns:
"""
# 添加数据
png_url = f"http://music.163.com/login?codekey={unikey}"
qr.add_data(png_url)
img = qr.make_image()
a = BytesIO()
img.save(a, 'png')
img.save(os.path.join(qrcode_path, 'qrcode.png'))
return qrcode_path + '/qrcode.png'
qr = qrcode.QRCode(version=1,error_correction=qrcode.constants.ERROR_CORRECT_L,box_size=10,border=4)
qr.add_data(f"https://music.163.com/login?codekey={unikey}")
#保存二维码
img_path = os.path.join(qrcode_path, 'qrcode.png')
qr.make_image().save(img_path)
return img_path
# 检查二维码状态是否被扫描
async def check_qr_code(unikey,session):

View file

@ -0,0 +1,48 @@
import requests
from nonebot import logger
__name__ = 'Anonfile Api'
async def upload_file(file_path):
"""
### 上传文件到anonfile并返回 \n
:param file_path: 上传文件路径
:return: {\n
"success": true,\n
"code": "文件码",\n
"message": "File uploaded successfully",\n
"isProtected": fales\n
}
### GET获取文件信息:
https://anonfile.io/f/{ 文件码 }
### GET下载文件:
https://anonfile.io/api/download/{ 文件码 }
"""
try:
# 创建Data对象并添加文件
files = {'file': open(file_path, 'rb')}
upload_settings = {
'password': '', # 密码
'expiryDays': 7, # 过期天数 7、30
}
# 如果设置了密码和过期天数则添加到Data中
data = {}
if upload_settings['password']:
data['password'] = upload_settings['password']
data['expiryDays'] = str(upload_settings['expiryDays'])
# 发送上传请求
response = requests.post('https://anonfile.io/api/upload', files=files, data=data)
# 检查响应状态码
if response.status_code == 200:
return response.json()
else:
logger.error('Upload failed:', response.status_code, response.text)
except Exception as error:
# 捕获并处理上传过程中的错误
logger.error('Upload error:', error)

View file

@ -54,6 +54,17 @@ wenku8_password = "<passwd>"
"""
proxy_api = "<KEY>"
"""
二维码生成 API 参数
"""
qrserver_url = "https://api.qrserver.com/v1/create-qr-code/"
qrserver_size= "200x200"
"""
anonfile API 参数
"""
anonfile_download_url = "https://anonfile.io/api/download/"
"""
CodeForces API
"""

View file

@ -7,6 +7,6 @@ download:
cache: true # 如果要下载的文件在磁盘上已存在不用再下一遍了吧默认为true
image:
decode: true # JM的原图是混淆过的要不要还原默认为true
suffix: .jpg # 把图片都转为.jpg格式默认为null表示不转换。
suffix: null # 把图片都转为.jpg格式默认为null表示不转换。
threading:
photo: 5
photo: 1

View file

@ -8,15 +8,18 @@ from nonebot.adapters.qq import MessageSegment,MessageEvent
from src.clover_music.cloud_music.cloud_music import *
from src.clover_image.delete_file import delete_file
unikey_cache = {'unikey': None, 'expires': 0}
music = on_command("点歌", rule=to_me(), priority=10,block=False)
@music.handle()
async def handle_function(msg: MessageEvent):
qr_path = ""
keyword = msg.get_plaintext().replace("/点歌", "").strip(" ")
if keyword == "":
await music.finish("\n请输入“/点歌+歌曲名”喔🎶")
#获取登录信息 可以获取更换高音质
#获取登录信息
session = requests.session()
if not os.path.exists('cloud_music_cookies.cookie'):
with open('cloud_music_cookies.cookie', 'wb') as f:
@ -26,27 +29,30 @@ async def handle_function(msg: MessageEvent):
session, status,user_id = await netease_cloud_music_is_login(session)
if not status:
await music.send("登录失效,请联系管理员进行登录")
# 检查缓存是否有效二维码有效期5分钟)
if unikey_cache['unikey'] and time.time() < unikey_cache['expires']:
unikey = unikey_cache['unikey']
else:
# 获取新 unikey 并设置过期时间
unikey = await get_qr_key(session)
path = await create_qr_code(unikey)
unikey_cache.update({
'unikey': unikey,
'expires': time.time() + 300 # 大约是5分钟有效期 失效时间会有几秒误差
})
qr_path = await create_qr_code(unikey)
"""是否要发送到QQ上面登录 """
# await clover_music.send(MessageSegment.file_image(Path(path)))
"""是否要发送到QQ上面登录 """
while True:
for _ in range(60): # 限制最大等待时间5分钟300秒/5秒间隔
code = await check_qr_code(unikey, session)
if '801' in str(code):
print('二维码未失效,请扫码!')
elif '802' in str(code):
print('已扫码,请确认!')
elif '803' in str(code):
print('已确认,登入成功!')
if code in (803,): break # 成功状态
if code not in (801, 802):
print('二维码失效' if code == 800 else f'异常状态码:{code}')
break
else:
break
await asyncio.sleep(2)
await asyncio.sleep(5)
with open('cloud_music_cookies.cookie', 'wb') as f:
pickle.dump(session.cookies, f)
else:
#搜索歌曲
song_id,song_name,singer,song_url = await netease_music_search(keyword,session)
song_name = str(song_name).replace(".", "·").replace("/", "")
@ -63,7 +69,6 @@ async def handle_function(msg: MessageEvent):
await music.send("歌曲音频获取失败了Σヽ(゚Д ゚; )ノ,请重试。")
else:
await music.send(MessageSegment.file_audio(Path(output_silk_path)))
#删除临时文件
await delete_file(output_silk_path)
await music.finish()

View file

@ -1,23 +1,50 @@
import re
from nonebot import logger
from nonebot.rule import to_me
from nonebot.plugin import on_command
from nonebot.adapters.qq import MessageEvent
from src.clover_jm.jm_comic import download_jm
from nonebot.adapters.qq import MessageEvent, MessageSegment,Message
from src.clover_jm.jm_comic import download_jm_qr, download_jm_Pemail
from nonebot.exception import FinishedException
__name__ = "JM_Download"
jm = on_command("jm", rule=to_me(), priority=10, block=False)
async def handle_email_download(album_id: str, email: str):
"""处理邮箱发送逻辑"""
if not validate_email(email):
await jm.finish("邮箱格式不正确!")
await jm.send("正在发送中,请稍等~")
msg = await download_jm_Pemail(album_id=album_id, receiver_email=email)
await jm.finish(msg)
async def handle_qrcode_download(album_id: str):
"""处理二维码发送载逻辑"""
await jm.send("正在下载中,请稍等~")
msgs = await download_jm_qr(album_id=album_id)
if "qr_code" not in msgs:
await jm.finish(msgs["msg"])
msg = Message([
MessageSegment.text(msgs["msg"]),
MessageSegment.image(msgs["qr_code"])
])
await jm.finish(msg)
@jm.handle()
async def handle_function(message: MessageEvent):
values = message.get_plaintext().replace("/jm", "").split(" ")
if len(values) != 3:
await jm.finish("请输入正确的格式 /jm+id+邮箱号")
else:
if not validate_email(values[2]):
await jm.finish("邮箱格式不正确!")
await jm.send("正在发送中,请稍等~")
msg = await download_jm(album_id = values[1],receiver_email = values[2])
await jm.finish(msg)
values = message.get_plaintext().replace("/jm", "").split()
try:
if len(values) == 0 or not all(values[1:len(values)]):
await jm.finish("请输入正确的格式 /jm+id 或 /jm+id+邮箱号")
elif len(values) == 1:
await handle_qrcode_download(values[0])
elif len(values) == 2:
await handle_email_download(values[0], values[1])
except Exception as e:
if isinstance(e, FinishedException):
return
logger.error(f"处理请求时发生错误: {e}")
await jm.finish("处理请求时发生错误,请稍后重试")
def validate_email(email: str) -> bool:
"""验证邮箱格式是否合法"""