feat(plugins): 添加绝对色感游戏并优化图片处理

- 新增绝对色感游戏插件,包含颜色生成和猜测逻辑
- 重构部分插件,使用异步函数处理网络请求和图片生成
- 优化图片下载和删除操作,提高代码复用性- 更新命令列表和菜单显示,增加新功能入口
This commit is contained in:
SlyAimer 2025-02-28 14:19:15 +08:00
parent 3b96de2a91
commit 7eb40ad582
17 changed files with 189 additions and 57 deletions

View file

@ -0,0 +1,101 @@
import random
from PIL import Image, ImageDraw
async def generate_diff_color(base_color, level):
"""生成差异颜色"""
factor = 1.1
if level == 2:
factor = 1.08
elif level == 3:
factor = 1.05
elif level == 4:
factor = 1.02
new_color = [
min(255, max(0, int(c * factor)))
for c in base_color
]
return tuple(new_color)
async def generate_base_color():
"""生成基础颜色"""
return (
random.randint(0, 255),
random.randint(0, 255),
random.randint(0, 255)
)
# 原类方法转换为独立函数
async def init_game_state(level_decision):
"""初始化"""
level = 1 if level_decision == "初级" else 2 if level_decision == "中级" else 3 if level_decision == "高级" else 4
size = 3 if level == 1 else 5 if level == 2 else 7 if level == 3 else 9
return {
"level": level,
"size": size,
"cell_size": 50,
"base_color": await generate_base_color(),
"diff_color": None,
"target_row": None,
"target_col": None
}
async def generate_new_level(state):
"""生成新关卡替代generate_new_level方法"""
state["base_color"] = await generate_base_color()
state["diff_color"] = await generate_diff_color(state["base_color"], state["level"])
state["target_row"] = random.randint(0, state["size"] - 1)
state["target_col"] = random.randint(0, state["size"] - 1)
return state
async def create_image(state, image_path):
"""创建图片"""
# 计算画布尺寸
image_width = state["size"] * state["cell_size"]
image_height = state["size"] * state["cell_size"]
image = Image.new("RGB", (image_width, image_height), "white")
draw = ImageDraw.Draw(image)
for row in range(state["size"]):
for col in range(state["size"]):
x1 = col * state["cell_size"]
y1 = row * state["cell_size"]
x2 = x1 + state["cell_size"]
y2 = y1 + state["cell_size"]
color = state["diff_color"] if (row == state["target_row"] and col == state["target_col"]) else state["base_color"]
draw.rectangle([x1, y1, x2, y2], fill=color)
image.save(image_path)
return [image_path, state["target_row"], state["target_col"]]
# 示例:在插件中组合使用这些函数
async def game_flow(level_decision: str, file_path: str):
game_state = await init_game_state(level_decision)
updated_state = await generate_new_level(game_state)
return await create_image(updated_state, file_path)
async def check_guess(guess, target_row, target_col):
"""验证答案"""
if guess == "#":
return True, "游戏结束!"
if len(guess) != 2 or not guess.isdigit():
return False, "请输入两位数字例如12输入 # 退出游戏"
guessed_row = int(guess[0]) - 1
guessed_col = int(guess[1]) - 1
return (guessed_row == target_row and guessed_col == target_col), "恭喜!回答正确!" if (
guessed_row == target_row and guessed_col == target_col) else "回答错误,请再试一次!"

View file

@ -1,6 +1,6 @@
import requests
def download_image(url,file_path):
async def download_image(url,file_path):
"""
下载图片
:param url:

View file

@ -6,7 +6,7 @@ from src.configs.path_config import image_local_path
from src.configs.api_config import smms_token,smms_image_upload_history,ju_he_token,ju_he_image_list
"""本地图片"""
def get_image_names():
async def get_image_names():
image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp'] # 定义常见的图片文件扩展名
image_names = []
for root, dirs, files in os.walk(image_local_path):
@ -18,7 +18,7 @@ def get_image_names():
return local_image_path
""" sm.ms 图床"""
def get_smms_image_url():
async def get_smms_image_url():
# 定义请求的参数
data = requests.get(smms_image_upload_history, headers={'Authorization': smms_token}, params={"page": "1"}).json().get('data')
urls = [item['url'] for item in data]
@ -26,7 +26,7 @@ def get_smms_image_url():
return random_url
"""聚合图床"""
def get_juhe_image_url():
async def get_juhe_image_url():
# 定义请求的参数
params = {"token": ju_he_token,"f": "json","categories": "猫羽雫","page": 1, "size": 400}
random_url = random.choice(requests.get(ju_he_image_list, params=params).json().get('docs', [])).get('url')

View file

@ -36,9 +36,3 @@ def download_qq_image_by_account(account):
file.write(response.content) # 将响应内容写入文件
return save_path
"""删除QQ头像"""
def qq_image_delete():
for root, dirs, files in os.walk(image_local_qq_image_path):
for file in files:
file_path = os.path.join(root, file)
os.remove(file_path)

View file

@ -6,10 +6,10 @@ import codecs
import json
import requests
from io import BytesIO
from random import Random
from Crypto.Cipher import AES
from graiax import silkcoder
import src.clover_music.cloud_music.agent as agent
from src.clover_image.delete_file import delete_file
requests.packages.urllib3.disable_warnings()
@ -66,7 +66,7 @@ qrcode_path = os.getcwd()+'/src/clover_music'
# 判断cookie是否有效
def netease_cloud_music_is_login(session):
async def netease_cloud_music_is_login(session):
try:
session.cookies.load(ignore_discard=True)
except Exception:
@ -88,7 +88,7 @@ def netease_cloud_music_is_login(session):
return session, False
# 获取二维码的key
def get_qr_key(session):
async def get_qr_key(session):
url = f"https://music.163.com/weapi/login/qrcode/unikey"
data = {"params": login_params(None),"encSecKey": login_encSecKey()}
response = session.post(url, headers=headers,params=data)
@ -102,7 +102,7 @@ def get_qr_key(session):
# 创建 QRCode 对象
qr = qrcode.QRCode( version=1, error_correction=qrcode.constants.ERROR_CORRECT_L, box_size=10, border=4, )
# 生成二维码
def create_qr_code(unikey):
async def create_qr_code(unikey):
# 添加数据
png_url = f"http://music.163.com/login?codekey={unikey}"
qr.add_data(png_url)
@ -113,13 +113,13 @@ def create_qr_code(unikey):
return qrcode_path + '/qrcode.png'
# 检查二维码状态是否被扫描
def check_qr_code(unikey,session):
async def check_qr_code(unikey,session):
token_url = f"https://music.163.com/weapi/login/qrcode/client/login?csrf_token="
u = str({'key': unikey, 'type': "1", 'csrf_token': ""})
qrcode_data = session.post( token_url,data={'params': login_params(u),'encSecKey': login_encSecKey()},headers=headers).json()
return qrcode_data.get('code')
def netease_music_search(keyword,session):
async def netease_music_search(keyword,session):
url = "http://music.163.com/api/search/get"
params = {
"s": keyword,
@ -163,7 +163,7 @@ def netease_music_search(keyword,session):
# return None
#所有歌曲都可以下载
def netease_music_download(song_id,song_name,singer,session):
async def netease_music_download(song_id,song_name,singer,session):
if not os.path.exists(save_path):
os.makedirs(save_path)
@ -189,15 +189,12 @@ def netease_music_download(song_id,song_name,singer,session):
output_silk_path = os.path.join(save_path, os.path.splitext(file_name)[0] + ".silk")
# 使用 graiax-silkcoder 进行转换
silkcoder.encode(file_path, output_silk_path,rate=32000 ,tencent=True,ios_adaptive=True)
#删除临时文件
await delete_file(file_path)
return output_silk_path
else:
return None
def netease_music_delete():
for root, dirs, files in os.walk(save_path):
for file in files:
file_path = os.path.join(root, file)
os.remove(file_path)

View file

@ -8,7 +8,7 @@ from nonebot.adapters.qq import MessageSegment,MessageEvent, Message
import src.clover_videos.billibili.biliVideos as biliVideos
from src.configs.path_config import video_path
bili_vid = on_command("B站搜索",rule=to_me(), priority=10, block=True)
bili_vid = on_command("B站搜索",rule=to_me(), priority=10)
@bili_vid.handle()
async def get_bili_vid_info(message: MessageEvent):
content = message.get_plaintext().replace("/B站搜索", "").strip()
@ -41,7 +41,7 @@ async def get_bili_vid_info(message: MessageEvent):
await bili_vid.finish(f"展示{len(search_result)}条结果中的前3条。")
bili_bv_search = on_command("BV搜索", rule=to_me(), priority=10, block=True)
bili_bv_search = on_command("BV搜索", rule=to_me(), priority=10)
@bili_bv_search.handle()
async def get_video_file(message: MessageEvent):
keyword = message.get_plaintext().replace("/BV搜索", "").strip().split()

View file

@ -10,10 +10,11 @@ from src.clover_sqlite.models.user import UserList
menu = ["/重启","/今日运势","/今日塔罗","/图","/日报","/点歌","/摸摸头","/群老婆","/今日老婆", "/开启ai","/关闭ai","/角色列表","/添加人设", "/更新人设", "/删除人设", "/切换人设", "/管理员注册",
"/待办", "/test","/天气","我喜欢你", "", "/待办查询", "/新建待办", "/删除待办" ,"/cf","/B站搜索", "/BV搜索", "/喜报", "/悲报", "/luxun","/鲁迅说",
"/奶龙", "/repo", "/info", "/menu", "/轻小说","/本季新番","/新番观察"]
"/奶龙", "/repo", "/info", "/menu", "/轻小说","/本季新番","/新番观察","/绝对色感"]
send_menu = ["/menu","/今日运势","/今日塔罗","/图","/日报","/点歌","/摸摸头","/群老婆","/待办","/天气",
"/待办查询", "/新建待办", "/删除待办" ,"/cf","/B站搜索", "/BV搜索", "/喜报", "/悲报","/鲁迅说","/轻小说","/本季新番","/新番观察"]
"/待办查询", "/新建待办", "/删除待办" ,"/cf","/B站搜索", "/BV搜索", "/喜报", "/悲报","/鲁迅说",
"/轻小说","/本季新番","/新番观察","/绝对色感"]
async def check_value_in_menu(message: MessageEvent) -> bool:
value = message.get_plaintext().strip().split(" ")
@ -23,13 +24,13 @@ async def check_value_in_menu(message: MessageEvent) -> bool:
group_id = "C2C" # 非群聊消息存为c2c
#缓存用户id
await UserList.insert_user(message.author.id,group_id)
if value[0] in menu:
if value[0] in menu or value[0].isdigit() or value[0] == "#":
return False
else:
return True
check = on_message(rule=to_me() & Rule(check_value_in_menu) ,block=True, priority=10)
check = on_message(rule=to_me() & Rule(check_value_in_menu), priority=10)
@check.handle()
async def handle_function(message: MessageEvent):
@ -41,7 +42,7 @@ async def handle_function(message: MessageEvent):
member_openid, content = message.author.id, message.get_plaintext()
status = await GroupChatRole.is_on(group_openid)
if status:
msg = await ai_chat.silicon_flow(group_openid,content)
msg = await ai_chat.deepseek_chat(group_openid,content)
await check.finish(msg)
else:
await check.finish(message=Message(random.choice(text_list)))

View file

@ -5,8 +5,9 @@ from nonebot import on_command
from nonebot.rule import to_me
from nonebot.adapters.qq import MessageSegment,MessageEvent
from src.clover_music.cloud_music.cloud_music import *
from src.clover_image.delete_file import delete_file
music = on_command("点歌", rule=to_me(), priority=10, block=True)
music = on_command("点歌", rule=to_me(), priority=10)
@music.handle()
async def handle_function(msg: MessageEvent):
keyword = msg.get_plaintext().replace("/点歌", "").strip(" ")
@ -21,18 +22,18 @@ async def handle_function(msg: MessageEvent):
pickle.dump(session.cookies, f)
# 读取 cookie
session.cookies = pickle.load(open('cloud_music_cookies.cookie', 'rb'))
session, status = netease_cloud_music_is_login(session)
session, status = await netease_cloud_music_is_login(session)
if not status:
await music.send("登录失效,请联系管理员进行登录")
unikey = get_qr_key(session)
path = create_qr_code(unikey)
unikey = await get_qr_key(session)
path = await create_qr_code(unikey)
"""是否要发送到QQ上面登录 """
# await clover_music.send(MessageSegment.file_image(Path(path)))
"""是否要发送到QQ上面登录 """
while True:
code = check_qr_code(unikey, session)
code = await check_qr_code(unikey, session)
if '801' in str(code):
print('二维码未失效,请扫码!')
elif '802' in str(code):
@ -47,14 +48,14 @@ async def handle_function(msg: MessageEvent):
pickle.dump(session.cookies, f)
#搜索歌曲
song_id,song_name,singer,song_url = netease_music_search(keyword,session)
song_id,song_name,singer,song_url = await netease_music_search(keyword,session)
song_name = str(song_name).replace(".", "·").replace("/", "")
if song_id is None:
await music.finish("\n没有找到歌曲或检索到的歌曲均为付费喔qwq\n这绝对不是我的错,绝对不是!")
else:
await music.send(MessageSegment.text(f" 来源:网易云音乐\n歌曲:{song_name} - {singer}\n请稍等喔🎵"))
#返回转换后的歌曲路径
output_silk_path = netease_music_download(song_id, song_name, singer,session)
output_silk_path = await netease_music_download(song_id, song_name, singer,session)
if output_silk_path == -1:
await music.send("歌曲音频获取失败:登录信息失效。")
@ -64,7 +65,7 @@ async def handle_function(msg: MessageEvent):
await music.send(MessageSegment.file_audio(Path(output_silk_path)))
#删除临时文件
netease_music_delete()
await delete_file(output_silk_path)
await music.finish()

View file

@ -8,7 +8,7 @@ from src.configs.path_config import daily_news_path
import os
daily_report = on_command("日报", rule=to_me(), priority=10, block=True)
daily_report = on_command("日报", rule=to_me(), priority=10)
@daily_report.handle()
async def handle_function():
now = datetime.now()

View file

@ -8,11 +8,11 @@ from src.clover_sqlite.models.fortune import QrFortune
from src.clover_sqlite.models.tarot import TarotExtractLog
import time
fortune_by_sqlite = on_command("今日运势", rule=to_me(), priority=10, block=True)
fortune_by_sqlite = on_command("今日运势", rule=to_me(), priority=10)
@fortune_by_sqlite.handle()
async def get_today_fortune(message: MessageEvent):
local_image_path = get_image_names()
local_image_path = await get_image_names()
result = await QrFortune.get_fortune(message.get_user_id())
content = ("\n" + "您的今日运势为:" + "\n" +
@ -35,7 +35,7 @@ async def get_today_fortune(message: MessageEvent):
await fortune_by_sqlite.finish("您的今日运势被外星人抢走啦,请重试。这绝对不是咱的错,绝对不是!")
tarot = on_command("今日塔罗", rule=to_me(), priority=10, block=True)
tarot = on_command("今日塔罗", rule=to_me(), priority=10)
@tarot.handle()
async def get_tarot(message: MessageEvent):
#extract_type : 1大阿尔克纳牌 2小阿尔克纳牌 3 混合牌组 4三角牌阵 5六芒星牌阵 6凯尔特十字牌阵 7恋人牌阵

View file

@ -39,7 +39,7 @@ from src.configs.path_config import font_path,good_bad,temp_path
# await good_news.finish("出错啦,请重试。")
good_news = on_command("喜报", rule=to_me(), priority=10, block=True, aliases={"悲报"})
good_news = on_command("喜报", rule=to_me(), priority=10, aliases={"悲报"})
@good_news.handle()
async def function(message: MessageEvent):
value = message.get_plaintext().split(" ")

View file

@ -8,11 +8,11 @@ from src.clover_image.download_image import download_image
from src.configs.path_config import temp_path
image = on_command("", rule=to_me(), priority=10, block=True)
image = on_command("", rule=to_me(), priority=10)
@image.handle()
async def handle_function():
local_image_path = get_image_names()
local_image_path = await get_image_names()
await image.finish(MessageSegment.file_image(Path(local_image_path)))
@ -23,5 +23,5 @@ async def handle_function(message: MessageEvent):
filename = str(message.get_user_id()) + str(random.randint(0, 10000)) + ".jpg"
image_ptah = temp_path + filename
download_image(message.attachments[0].url, image_ptah)
await download_image(message.attachments[0].url, image_ptah)
await image.finish(MessageSegment.file_image(Path(image_ptah)))

36
src/plugins/mini_game.py Normal file
View file

@ -0,0 +1,36 @@
import random
from pathlib import Path
from nonebot.params import CommandArg
from nonebot.rule import to_me
from nonebot.plugin import on_command
from nonebot.matcher import Matcher
from nonebot.internal.params import ArgPlainText
from nonebot.adapters.qq import MessageSegment, Message
from src.clover_image.color_sensitive_game import game_flow,check_guess
from src.configs.path_config import temp_path
from src.clover_image.delete_file import delete_file
color_sensitive_game = on_command("绝对色感", rule=to_me(), priority=10)
@color_sensitive_game.handle()
async def handle_function(matcher: Matcher, args: Message = CommandArg()):
level = args.extract_plain_text().replace("/绝对色感","").strip(" ")
if level not in ["初级","中级","高级","超神"]:
await color_sensitive_game.finish("请输入正确的参数, \n 如:/绝对色感 初级、中级、高级、超神")
file_path = temp_path + str(random.randint(0, 10000)) + ".jpeg"
result_list = await game_flow(level_decision = level,file_path = file_path)
await color_sensitive_game.send(MessageSegment.file_image(Path(result_list[0])))
matcher.state["result_list"] = result_list
matcher.set_arg("guess", args)
await delete_file(file_path)
@color_sensitive_game.got("guess", prompt="请输入坐标")
async def got_location(matcher: Matcher,guess: str = ArgPlainText()):
result_list = matcher.state.get("result_list")
target_row = result_list[1] # x
target_col = result_list[2] # y
boolean, msg = await check_guess(guess, target_row, target_col)
if boolean :
await color_sensitive_game.finish(msg)
await color_sensitive_game.reject(msg)

View file

@ -3,7 +3,7 @@ from nonebot.plugin import on_command
from nonebot.rule import to_me
from src.clover_sqlite.models.to_do import ToDoList
get_todo_list = on_command("待办查询", rule=to_me(), priority=10, block=True, aliases={"代办", "daiban"})
get_todo_list = on_command("待办查询", rule=to_me(), priority=10, aliases={"代办", "daiban"})
@get_todo_list.handle()
async def show_todo_list(message: MessageEvent):
"""
@ -21,7 +21,7 @@ async def show_todo_list(message: MessageEvent):
await get_todo_list.finish(f"您的待办有如下哦:⭐\n\n{todo_list}")
insert_todo = on_command("新建待办", rule=to_me(), priority=10, block=True)
insert_todo = on_command("新建待办", rule=to_me(), priority=10)
@insert_todo.handle()
async def insert_todo_list(message: MessageEvent):
member_openid = message.get_user_id()
@ -33,7 +33,7 @@ async def insert_todo_list(message: MessageEvent):
await insert_todo.finish("\n请输入 /新建待办+待办内容 哦")
delete_todo = on_command("删除待办", rule=to_me(), priority=10, block=True)
delete_todo = on_command("删除待办", rule=to_me(), priority=10)
@delete_todo.handle()
async def del_todo(message: MessageEvent):
member_openid = message.get_user_id()

View file

@ -3,10 +3,11 @@ from nonebot.adapters.qq import Message, MessageEvent
from nonebot.adapters.qq import MessageSegment
from nonebot.plugin import on_command
from nonebot.rule import to_me
from src.clover_image.qq_image import download_qq_image,qq_image_delete
from src.clover_image.qq_image import download_qq_image
from src.clover_image.delete_file import delete_file
from src.clover_sqlite.models.user import UserList
today_group_wife = on_command("群老婆", rule=to_me(), priority=10, block=True)
today_group_wife = on_command("群老婆", rule=to_me(), priority=10)
@today_group_wife.handle()
async def handle_function(message: MessageEvent):
member_openid = message.get_user_id()
@ -19,11 +20,11 @@ async def handle_function(message: MessageEvent):
MessageSegment.text("您的今日群老婆"),
MessageSegment.file_image(Path(local_image_path)),
])
qq_image_delete()
await delete_file(local_image_path)
await today_group_wife.finish(msg)
today_wife = on_command("今日老婆", rule=to_me(), priority=10, block=True)
today_wife = on_command("今日老婆", rule=to_me(), priority=10)
@today_wife.handle()
async def handle_function(message: MessageEvent):
member_openid = message.get_user_id()
@ -33,7 +34,7 @@ async def handle_function(message: MessageEvent):
MessageSegment.file_image(Path(local_image_path)),
])
qq_image_delete()
await delete_file(local_image_path)
await today_wife.finish(msg)

View file

@ -3,10 +3,11 @@ from nonebot.rule import to_me
from nonebot.plugin import on_command
from nonebot.adapters.qq import Message, MessageEvent, MessageSegment
from src.clover_sqlite.models.touch import QrTouch, QrTouchLog
from src.clover_image.qq_image import download_qq_image_by_account, qq_image_delete
from src.clover_image.qq_image import download_qq_image_by_account
from src.clover_image.delete_file import delete_file
from src.clover_image.rua import rua
to = on_command("摸摸头", rule=to_me(), priority=10, block=True)
to = on_command("摸摸头", rule=to_me(), priority=10)
@to.handle()
async def handle_touch(message: MessageEvent):
member_openid = message.get_user_id()
@ -24,5 +25,5 @@ async def handle_touch(message: MessageEvent):
local_gif = rua(download_qq_image_by_account(None)).add_gif()
msg = Message([MessageSegment.file_image(Path(local_gif)),
MessageSegment.text(result.reply_touch_content),])
qq_image_delete()
await delete_file(local_gif)
await to.finish(msg)

View file

@ -3,7 +3,7 @@ from nonebot.plugin import on_command
from nonebot.adapters import Message
from nonebot.params import CommandArg
weather = on_command("天气", rule=to_me(), aliases={"weather", "查天气"}, priority=10, block=True)
weather = on_command("天气", rule=to_me(), aliases={"weather", "查天气"}, priority=10)
@weather.handle()
async def handle_function(args: Message = CommandArg()):