From 575e45b1adce6bd126523b0ce9bc8d10355cb584 Mon Sep 17 00:00:00 2001 From: ClovertaTheTrilobita Date: Sat, 18 Jan 2025 01:51:23 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=9C=BA=E5=99=A8=E4=BA=BA?= =?UTF-8?q?=E7=AE=A1=E7=90=86=E5=91=98=E9=89=B4=E6=9D=83=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=20=E7=AE=A1=E7=90=86=E5=91=98=E5=8F=AF=E9=80=89=E6=8B=A9?= =?UTF-8?q?=E6=9C=BA=E5=99=A8=E4=BA=BA=E6=98=AF=E5=90=A6=E5=90=AF=E5=8A=A8?= =?UTF-8?q?=E8=AF=AD=E8=A8=80=E5=AF=B9=E8=AF=9D=E6=A8=A1=E5=9E=8B=20?= =?UTF-8?q?=E5=88=A0=E9=99=A4=E5=86=97=E4=BD=99=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bot.py | 1 + src/my_sqlite/admin_manage_by_sqlite.py | 33 +++++++++++++++ src/my_sqlite/data_init/admin_init.py | 14 +++++++ src/my_sqlite/data_init/data_init.py | 30 +++++++++++++- src/my_sqlite/todo_by_sqlite.py | 5 +-- src/qq_plugins/check.py | 53 ++++++++++++++++++++++--- 6 files changed, 127 insertions(+), 9 deletions(-) create mode 100644 src/my_sqlite/admin_manage_by_sqlite.py create mode 100644 src/my_sqlite/data_init/admin_init.py diff --git a/bot.py b/bot.py index 77746ea..ad6cb6c 100644 --- a/bot.py +++ b/bot.py @@ -13,6 +13,7 @@ def init_all(): data_init.QrFortune_init() data_init.touch_init() data_init.todo_init() + data_init.admin_init() if __name__ == "__main__": init_all() diff --git a/src/my_sqlite/admin_manage_by_sqlite.py b/src/my_sqlite/admin_manage_by_sqlite.py new file mode 100644 index 0000000..b905deb --- /dev/null +++ b/src/my_sqlite/admin_manage_by_sqlite.py @@ -0,0 +1,33 @@ +from sqlalchemy import Column, Integer, String, Date, create_engine, text +from sqlalchemy.orm import declarative_base, sessionmaker + +class SqliteSqlalchemy(object): + def __init__(self): + # 创建Sqlite连接引擎 + engine = create_engine('sqlite:///./chat_bot.db', echo=True) + # 创建Sqlite的session连接对象 + self.session = sessionmaker(bind=engine)() + +def insert_administrator(member_openid): + session = SqliteSqlalchemy().session + session.execute(insertAdminID, {'member_openid': member_openid}) + session.commit() + session.close() + +def check_admin_access(member_openid): + session = SqliteSqlalchemy().session + result = session.execute(selectAdminID, {'member_openid': member_openid}).fetchone() + session.close() + return result + +# 插入数据 +insertAdminID = text( + "INSERT INTO admin_list (user_id) VALUES (:member_openid)") + +# 查找用户 +selectAdminID = text( + "SELECT user_id FROM admin_list WHERE user_id = :member_openid") + +if __name__ == "__main__": + insert_administrator('1234') + print(check_admin_access('1234')) \ No newline at end of file diff --git a/src/my_sqlite/data_init/admin_init.py b/src/my_sqlite/data_init/admin_init.py new file mode 100644 index 0000000..ce719db --- /dev/null +++ b/src/my_sqlite/data_init/admin_init.py @@ -0,0 +1,14 @@ +import sqlite3 + +# 连接到数据库,如果不存在则创建 +conn = sqlite3.connect('chat_bot.db') +# 创建游标 +c = conn.cursor() +# 创建表 +c.execute(""" + +CREATE TABLE IF NOT EXISTS admin_list ( + user_id VARCHAR(100) PRIMARY KEY +); + +""") \ No newline at end of file diff --git a/src/my_sqlite/data_init/data_init.py b/src/my_sqlite/data_init/data_init.py index 297fb65..e679806 100644 --- a/src/my_sqlite/data_init/data_init.py +++ b/src/my_sqlite/data_init/data_init.py @@ -90,6 +90,31 @@ def execute_init_file3(): except subprocess.CalledProcessError as e: print(f"执行初始化文件时出错: {e}") + +def admin_init(): + session = SqliteSqlalchemy().session + # 检查某个表是否存在 + table_exists = session.execute(selectAdminList).fetchone() + if table_exists: + return print("管理员表状态正常。") + else: + print("管理员表不存在,开始执行初始化文件。") + execute_init_file4() + return "" + + +def execute_init_file4(): + # 拼接文件的完整路径 + file_path = os.getcwd() + "/src/my_sqlite/data_init/admin_init.py" + init_file_path = os.path.join(os.path.dirname(__file__), file_path) + try: + # 执行初始化文件 + subprocess.run(["python", init_file_path], check=True) + print("初始化文件已成功执行。") + except subprocess.CalledProcessError as e: + print(f"执行初始化文件时出错: {e}") + + # 查询初始化表是否存在 selectQrFortune = text( "SELECT name FROM sqlite_master WHERE type='table' AND name='qr_fortune';") @@ -101,4 +126,7 @@ selectQrTouch = text( selectTodoTable = text( "SELECT name FROM sqlite_master WHERE type='table' AND name='user_todo_list';") selectUserList = text( - "SELECT name FROM sqlite_master WHERE type='table' AND name='user_list';") \ No newline at end of file + "SELECT name FROM sqlite_master WHERE type='table' AND name='user_list';") + +# 查询管理员表是否存在 +selectAdminList = text("SELECT name FROM sqlite_master WHERE type='table' AND name='admin_list';") \ No newline at end of file diff --git a/src/my_sqlite/todo_by_sqlite.py b/src/my_sqlite/todo_by_sqlite.py index fa310f3..388493b 100644 --- a/src/my_sqlite/todo_by_sqlite.py +++ b/src/my_sqlite/todo_by_sqlite.py @@ -1,6 +1,5 @@ -from sqlalchemy import Column, Integer, String, Date, create_engine, text -from sqlalchemy.orm import declarative_base, sessionmaker -import sqlite3 +from sqlalchemy import create_engine, text +from sqlalchemy.orm import sessionmaker class SqliteSqlalchemy(object): diff --git a/src/qq_plugins/check.py b/src/qq_plugins/check.py index c3689c9..1c724fb 100644 --- a/src/qq_plugins/check.py +++ b/src/qq_plugins/check.py @@ -2,13 +2,19 @@ from nonebot.plugin import on_command, on_keyword import random from nonebot.rule import Rule, to_me from nonebot import on_message -from nonebot.adapters.qq import Message +from nonebot.adapters.qq import Message, MessageEvent from nonebot.adapters import Bot, Event from src.ai_chat import ai_chat import os import yaml +from src.my_sqlite.admin_manage_by_sqlite import insert_administrator, check_admin_access -menu = ['/今日运势','/天气','/图','/点歌','/摸摸头','/群老婆','/今日老婆', '/待办', '/test', '我喜欢你', "❤", "/待办查询", "/新建待办", "/删除待办", "/openai", "/cf"] +""" +设置管理员鉴权密码 +""" +admin_passwd = "1234" + +menu = ['/今日运势','/天气','/图','/点歌','/摸摸头','/群老婆','/今日老婆', '/待办', '/test', '我喜欢你', "❤", "/待办查询", "/新建待办", "/删除待办", "/activate_ai", "/cf", "/管理员确认"] async def check_value_in_menu(event: Event) -> bool: value = event.get_plaintext().strip().split(" ") if value[0] in menu: @@ -17,13 +23,12 @@ async def check_value_in_menu(event: Event) -> bool: return True rule = Rule(check_value_in_menu) -with open(os.getcwd() +'/src/ai_chat/config/chat_ai.yaml', 'r', encoding='utf-8') as f: - is_ai = yaml.load(f.read(), Loader=yaml.FullLoader).get('chat_ai').get('active') + check = on_message(rule=to_me() & rule ,block=True) @check.handle() async def check(bot: Bot, event: Event): - if is_ai == "True": + if is_ai() == "True": msg = ai_chat.deepseek_chat(event.get_plaintext()) await bot.send(message=msg,event=event) else: @@ -47,4 +52,42 @@ test = on_command("test", rule=to_me(), priority=10, block=True) async def bot_on_ready(): await test.finish("\nBoost & Magnum, ready fight!!!") +verification = on_command("管理员确认", rule=to_me(), priority=10, block=True) +@verification.handle() +async def verify_as_administrator(message: MessageEvent): + passwd = message.get_plaintext().replace("/管理员确认", "").strip(" ") + if passwd == admin_passwd: + insert_administrator(message.get_user_id()) + await verification.finish("成功注册为管理员。") + else: + await verification.finish("管理员鉴权失败。") +ai_is_available = on_command("activate_ai", rule=to_me(), priority=10, block=True) +@ai_is_available.handle() +async def change_ai_availability(message: MessageEvent): + member_openid = message.get_user_id() + result = check_admin_access(member_openid) + if result is None: + await ai_is_available.finish(message=Message(random.choice(text_list))) + elif str(result).lstrip("('").rstrip("',)") == member_openid: + if is_ai() == "True": + change_chatai_yaml_availability_to("False") + await ai_is_available.finish("成功关闭语言模型对话功能。") + else: + change_chatai_yaml_availability_to("True") + await ai_is_available.finish("成功开启语言模型对话功能。一起来聊天吧~") + + +def change_chatai_yaml_availability_to(is_available): + with open(os.getcwd() + '/src/ai_chat/config/chat_ai.yaml', 'r', encoding='utf-8') as f1: + dic_temp = yaml.load(f1, Loader=yaml.FullLoader) + dic_temp['chat_ai']['active'] = is_available + with open(os.getcwd() + '/src/ai_chat/config/chat_ai.yaml', 'w', encoding='utf-8') as f1: + yaml.dump(dic_temp, f1) + print(dic_temp) + f1.close() + +def is_ai(): + with open(os.getcwd() + '/src/ai_chat/config/chat_ai.yaml', 'r', encoding='utf-8') as f: + state = yaml.load(f.read(), Loader=yaml.FullLoader).get('chat_ai').get('active') + return state \ No newline at end of file