mirror of
https://github.com/ClovertaTheTrilobita/SanYeCao-Nonebot.git
synced 2026-04-01 22:04:51 +00:00
添加机器人管理员鉴权功能
管理员可选择机器人是否启动语言对话模型 删除冗余代码
This commit is contained in:
parent
c6814caf80
commit
575e45b1ad
6 changed files with 127 additions and 9 deletions
1
bot.py
1
bot.py
|
|
@ -13,6 +13,7 @@ def init_all():
|
|||
data_init.QrFortune_init()
|
||||
data_init.touch_init()
|
||||
data_init.todo_init()
|
||||
data_init.admin_init()
|
||||
|
||||
if __name__ == "__main__":
|
||||
init_all()
|
||||
|
|
|
|||
33
src/my_sqlite/admin_manage_by_sqlite.py
Normal file
33
src/my_sqlite/admin_manage_by_sqlite.py
Normal file
|
|
@ -0,0 +1,33 @@
|
|||
from sqlalchemy import Column, Integer, String, Date, create_engine, text
|
||||
from sqlalchemy.orm import declarative_base, sessionmaker
|
||||
|
||||
class SqliteSqlalchemy(object):
|
||||
def __init__(self):
|
||||
# 创建Sqlite连接引擎
|
||||
engine = create_engine('sqlite:///./chat_bot.db', echo=True)
|
||||
# 创建Sqlite的session连接对象
|
||||
self.session = sessionmaker(bind=engine)()
|
||||
|
||||
def insert_administrator(member_openid):
|
||||
session = SqliteSqlalchemy().session
|
||||
session.execute(insertAdminID, {'member_openid': member_openid})
|
||||
session.commit()
|
||||
session.close()
|
||||
|
||||
def check_admin_access(member_openid):
|
||||
session = SqliteSqlalchemy().session
|
||||
result = session.execute(selectAdminID, {'member_openid': member_openid}).fetchone()
|
||||
session.close()
|
||||
return result
|
||||
|
||||
# 插入数据
|
||||
insertAdminID = text(
|
||||
"INSERT INTO admin_list (user_id) VALUES (:member_openid)")
|
||||
|
||||
# 查找用户
|
||||
selectAdminID = text(
|
||||
"SELECT user_id FROM admin_list WHERE user_id = :member_openid")
|
||||
|
||||
if __name__ == "__main__":
|
||||
insert_administrator('1234')
|
||||
print(check_admin_access('1234'))
|
||||
14
src/my_sqlite/data_init/admin_init.py
Normal file
14
src/my_sqlite/data_init/admin_init.py
Normal file
|
|
@ -0,0 +1,14 @@
|
|||
import sqlite3
|
||||
|
||||
# 连接到数据库,如果不存在则创建
|
||||
conn = sqlite3.connect('chat_bot.db')
|
||||
# 创建游标
|
||||
c = conn.cursor()
|
||||
# 创建表
|
||||
c.execute("""
|
||||
|
||||
CREATE TABLE IF NOT EXISTS admin_list (
|
||||
user_id VARCHAR(100) PRIMARY KEY
|
||||
);
|
||||
|
||||
""")
|
||||
|
|
@ -90,6 +90,31 @@ def execute_init_file3():
|
|||
except subprocess.CalledProcessError as e:
|
||||
print(f"执行初始化文件时出错: {e}")
|
||||
|
||||
|
||||
def admin_init():
|
||||
session = SqliteSqlalchemy().session
|
||||
# 检查某个表是否存在
|
||||
table_exists = session.execute(selectAdminList).fetchone()
|
||||
if table_exists:
|
||||
return print("管理员表状态正常。")
|
||||
else:
|
||||
print("管理员表不存在,开始执行初始化文件。")
|
||||
execute_init_file4()
|
||||
return ""
|
||||
|
||||
|
||||
def execute_init_file4():
|
||||
# 拼接文件的完整路径
|
||||
file_path = os.getcwd() + "/src/my_sqlite/data_init/admin_init.py"
|
||||
init_file_path = os.path.join(os.path.dirname(__file__), file_path)
|
||||
try:
|
||||
# 执行初始化文件
|
||||
subprocess.run(["python", init_file_path], check=True)
|
||||
print("初始化文件已成功执行。")
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"执行初始化文件时出错: {e}")
|
||||
|
||||
|
||||
# 查询初始化表是否存在
|
||||
selectQrFortune = text( "SELECT name FROM sqlite_master WHERE type='table' AND name='qr_fortune';")
|
||||
|
||||
|
|
@ -102,3 +127,6 @@ selectTodoTable = text(
|
|||
"SELECT name FROM sqlite_master WHERE type='table' AND name='user_todo_list';")
|
||||
selectUserList = text(
|
||||
"SELECT name FROM sqlite_master WHERE type='table' AND name='user_list';")
|
||||
|
||||
# 查询管理员表是否存在
|
||||
selectAdminList = text("SELECT name FROM sqlite_master WHERE type='table' AND name='admin_list';")
|
||||
|
|
@ -1,6 +1,5 @@
|
|||
from sqlalchemy import Column, Integer, String, Date, create_engine, text
|
||||
from sqlalchemy.orm import declarative_base, sessionmaker
|
||||
import sqlite3
|
||||
from sqlalchemy import create_engine, text
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
|
||||
class SqliteSqlalchemy(object):
|
||||
|
|
|
|||
|
|
@ -2,13 +2,19 @@ from nonebot.plugin import on_command, on_keyword
|
|||
import random
|
||||
from nonebot.rule import Rule, to_me
|
||||
from nonebot import on_message
|
||||
from nonebot.adapters.qq import Message
|
||||
from nonebot.adapters.qq import Message, MessageEvent
|
||||
from nonebot.adapters import Bot, Event
|
||||
from src.ai_chat import ai_chat
|
||||
import os
|
||||
import yaml
|
||||
from src.my_sqlite.admin_manage_by_sqlite import insert_administrator, check_admin_access
|
||||
|
||||
menu = ['/今日运势','/天气','/图','/点歌','/摸摸头','/群老婆','/今日老婆', '/待办', '/test', '我喜欢你', "❤", "/待办查询", "/新建待办", "/删除待办", "/openai", "/cf"]
|
||||
"""
|
||||
设置管理员鉴权密码
|
||||
"""
|
||||
admin_passwd = "1234"
|
||||
|
||||
menu = ['/今日运势','/天气','/图','/点歌','/摸摸头','/群老婆','/今日老婆', '/待办', '/test', '我喜欢你', "❤", "/待办查询", "/新建待办", "/删除待办", "/activate_ai", "/cf", "/管理员确认"]
|
||||
async def check_value_in_menu(event: Event) -> bool:
|
||||
value = event.get_plaintext().strip().split(" ")
|
||||
if value[0] in menu:
|
||||
|
|
@ -17,13 +23,12 @@ async def check_value_in_menu(event: Event) -> bool:
|
|||
return True
|
||||
|
||||
rule = Rule(check_value_in_menu)
|
||||
with open(os.getcwd() +'/src/ai_chat/config/chat_ai.yaml', 'r', encoding='utf-8') as f:
|
||||
is_ai = yaml.load(f.read(), Loader=yaml.FullLoader).get('chat_ai').get('active')
|
||||
|
||||
|
||||
check = on_message(rule=to_me() & rule ,block=True)
|
||||
@check.handle()
|
||||
async def check(bot: Bot, event: Event):
|
||||
if is_ai == "True":
|
||||
if is_ai() == "True":
|
||||
msg = ai_chat.deepseek_chat(event.get_plaintext())
|
||||
await bot.send(message=msg,event=event)
|
||||
else:
|
||||
|
|
@ -47,4 +52,42 @@ test = on_command("test", rule=to_me(), priority=10, block=True)
|
|||
async def bot_on_ready():
|
||||
await test.finish("\nBoost & Magnum, ready fight!!!")
|
||||
|
||||
verification = on_command("管理员确认", rule=to_me(), priority=10, block=True)
|
||||
@verification.handle()
|
||||
async def verify_as_administrator(message: MessageEvent):
|
||||
passwd = message.get_plaintext().replace("/管理员确认", "").strip(" ")
|
||||
if passwd == admin_passwd:
|
||||
insert_administrator(message.get_user_id())
|
||||
await verification.finish("成功注册为管理员。")
|
||||
else:
|
||||
await verification.finish("管理员鉴权失败。")
|
||||
|
||||
ai_is_available = on_command("activate_ai", rule=to_me(), priority=10, block=True)
|
||||
@ai_is_available.handle()
|
||||
async def change_ai_availability(message: MessageEvent):
|
||||
member_openid = message.get_user_id()
|
||||
result = check_admin_access(member_openid)
|
||||
if result is None:
|
||||
await ai_is_available.finish(message=Message(random.choice(text_list)))
|
||||
elif str(result).lstrip("('").rstrip("',)") == member_openid:
|
||||
if is_ai() == "True":
|
||||
change_chatai_yaml_availability_to("False")
|
||||
await ai_is_available.finish("成功关闭语言模型对话功能。")
|
||||
else:
|
||||
change_chatai_yaml_availability_to("True")
|
||||
await ai_is_available.finish("成功开启语言模型对话功能。一起来聊天吧~")
|
||||
|
||||
|
||||
def change_chatai_yaml_availability_to(is_available):
|
||||
with open(os.getcwd() + '/src/ai_chat/config/chat_ai.yaml', 'r', encoding='utf-8') as f1:
|
||||
dic_temp = yaml.load(f1, Loader=yaml.FullLoader)
|
||||
dic_temp['chat_ai']['active'] = is_available
|
||||
with open(os.getcwd() + '/src/ai_chat/config/chat_ai.yaml', 'w', encoding='utf-8') as f1:
|
||||
yaml.dump(dic_temp, f1)
|
||||
print(dic_temp)
|
||||
f1.close()
|
||||
|
||||
def is_ai():
|
||||
with open(os.getcwd() + '/src/ai_chat/config/chat_ai.yaml', 'r', encoding='utf-8') as f:
|
||||
state = yaml.load(f.read(), Loader=yaml.FullLoader).get('chat_ai').get('active')
|
||||
return state
|
||||
Loading…
Reference in a new issue