diff --git a/Services/ark_nights.py b/Services/ark_nights.py index a085114..e03316a 100644 --- a/Services/ark_nights.py +++ b/Services/ark_nights.py @@ -1,287 +1,3 @@ """ -Arknights headhunt recruitment simulator. +This function is waiting for a rewrite and is currently deprecated. """ -import random -import sqlite3 -import time -from typing import Union - - -class ArknightsPity: - def __init__(self): - self.sanity_poll_dict = {} - - def record_poll(self, group_id): - if group_id not in self.sanity_poll_dict: - self.sanity_poll_dict[group_id] = 10 - else: - self.sanity_poll_dict[group_id] += 10 - - def get_offset_setting(self, group_id) -> int: - if group_id not in self.sanity_poll_dict: - self.record_poll(group_id) - return 0 - else: - poll_count = self.sanity_poll_dict[group_id] - if poll_count <= 50: - return 0 - else: - return (poll_count - 50) * 2 - - def reset_offset(self, group_id): - self.sanity_poll_dict[group_id] = 0 - - -class ArkHeadhunt: - """ - Init data for arknights headhunt simulator. - """ - - def __init__(self, times=1): - self.times = times - self.count = 0 - self.offset = 0 - self.arknights_agent_db = sqlite3.connect('data/db/stats.db') - - self._init() - - self.random_agent = [] - self.random_class = [] - - def _init(self) -> dict: - self.arknights_agent_db.execute( - """ - create table if not exists arknights_op ( - "ch_name" varchar(50) unique on conflict ignore, - "stars" integer, - "is_limited" boolean, - "is_secondary_up" boolean, - "is_up" boolean - ) - """ - ) - self.arknights_agent_db.commit() - - def _get_if_limit_banner_on(self): - result = self.arknights_agent_db.execute( - """ - select ch_name from arknights_op where is_limited = true - """ - ).fetchone() - - return result if result and result is not None else '' - - def get_randomized_results(self, offset_setting=0): - """ - Get randomized list for agent star numbers. - :param offset_setting: Offset that can set for rigging the result. - :return: void. - """ - if self.times < 0: - raise ValueError("Pulling value cannot be less than 0") - - random.seed(time.time_ns()) - random_class = [] - self.count += 1 - for _ in range(0, self.times): - rand_num = random.randint(0, 100) + offset_setting - if rand_num >= 98: - random_class.append(6) - self.count = 0 - elif rand_num >= 90: - random_class.append(5) - elif rand_num >= 40: - random_class.append(4) - else: - random_class.append(3) - - time.sleep(0.05) - - self.random_class = random_class - self.random_agent = self._get_ops() - - def _get_uped_op(self, star: int): - result = self._get_op_from_db(star, is_up=True) - if result is None or not result: - return self._get_op_from_db(star) - - return result - - def _get_secondary_up_op(self, star: int): - result = self._get_op_from_db(star, is_secondary_up=True) - if result is None: - return self._get_op_from_db(star) - - return result - - def _get_op_count_by_stars(self, star: int) -> int: - result = self.arknights_agent_db.execute( - """ - select count(ch_name) from arknights_op where stars = ? - """, (star,) - ).fetchone() - - return int(result[0]) - - def _get_op_from_db(self, star: int, is_up=False, is_secondary_up=False): - if is_up or is_secondary_up: - where_query = f'where stars = ? and (is_up = {is_up.__str__().lower()} ' \ - f'or is_secondary_up = {is_secondary_up.__str__().lower()})' - else: - where_query = 'where stars = ?' - result = self.arknights_agent_db.execute( - f""" - select ch_name from arknights_op - {where_query} order by random() limit 1 - """, (star,) - ).fetchone() - - return result if result and result is not None else '' - - def _insert_op(self, star: int, name: str, is_limited=False, is_sec_up=False, is_up=False): - is_up = is_limited or is_sec_up or is_up - self.arknights_agent_db.execute( - """ - insert or replace into arknights_op (ch_name, stars, is_limited, is_secondary_up, is_up) values ( - ?, ?, ?, ?, ? - ) - """, (name, star, is_limited, is_sec_up, is_up) - ) - self._commit_change() - - def _get_ops(self) -> list: - """ - Get a list of agent's name. - :return: A list with all operator names. - """ - random_agent = [] - random.seed(time.time_ns()) - for elements in self.random_class: - random_int = random.randint(0, 100) - if self._get_if_limit_banner_on() and elements == 6: - if random_int < 70: - random_agent.append(self._get_uped_op(6)) - else: - # 30%中的五倍权值爆率。 - second_random = random.randint(0, self._get_op_count_by_stars(6)) - if second_random < 5 and self._get_if_limit_banner_on(): - random_agent.append(self._get_secondary_up_op(6)) - else: - random_agent.append(self._get_op_from_db(6)) - - else: - if random_int < 50: - random_agent.append(self._get_uped_op(elements)) - else: - random_agent.append(self._get_op_from_db(elements)) - - return random_agent - - def _get_all_secondary_up_op(self, star: int): - result = self.arknights_agent_db.execute( - """ - select ch_name, stars from arknights_op where stars = ? and is_secondary_up = true - """, (star,) - ).fetchall() - - result = [x[0] for x in result if x is not None and x[0] is not None] - return result - - def _get_all_uped_op(self, star: int): - result = self.arknights_agent_db.execute( - """ - select ch_name, stars from arknights_op where stars = ? and is_up = true - """, (star,) - ).fetchall() - - result = [x[0] for x in result if x is not None and x[0] is not None] - return result - - def up_op(self, agent: str, star: Union[int, str], is_limited=False, is_second_up=False, is_up=True): - if isinstance(star, str) and not star.isdigit(): - return '?' - - star = int(star) - self._insert_op(star, agent, is_limited, is_second_up, is_up) - return 'Done' - - def _commit_change(self): - self.arknights_agent_db.commit() - - def clear_ups(self): - self.arknights_agent_db.execute( - """ - delete from arknights_op where is_limited = true - """ - ) - self.arknights_agent_db.execute( - """ - update arknights_op set is_secondary_up = false, is_up = false, is_limited = false - """ - ) - self._commit_change() - - def add_op(self, agent: str, star: Union[int, str]): - if isinstance(star, str) and not star.isdigit(): - return '?' - - star = int(star) - self._insert_op(star, agent) - return f'成功将{agent}加入{star}星干员组' - - def get_up(self) -> str: - result = '' - four_up = self._get_all_uped_op(4) - five_up = self._get_all_uped_op(5) - six_up = self._get_all_uped_op(6) - - secondary_up = self._get_all_secondary_up_op(6) - - result += '四星:' + ','.join(four_up) + '\n' - result += '五星:' + ','.join(five_up) + '\n' - result += '六星:' + ','.join(six_up) + '\n' - result += ('六星小UP:' + ','.join(secondary_up) + '\n') if self._get_if_limit_banner_on() else '' - - return result if result else '无\n' - - def __str__(self): - """ - Generating the result of the headhunt. - :return: str, the result of the headhunt in Chinese. - """ - response = '' - - response += f'本次卡池UP:\n{self.get_up()}' - six_star = 0 - for idx, elements in enumerate(self.random_class): - if elements == 6 or elements == -1: - six_star += 1 - - if elements == -1: - element = 6 - else: - element = elements - response += str(element) + '星干员: %s\n' % self.random_agent[idx] - - if 5 not in self.random_class and 6 not in self.random_class: - congrats = '哈↑哈↓紫气东来' - - else: - if six_star > 6: - congrats = '你丫神仙吧草w' - elif six_star > 3: - congrats = '这个爆率感觉机器人应该是坏了吧' - elif six_star >= 1: - congrats = '有点幸运了啊' - else: - congrats = '没事这结果挺正常的,会好起来的,哈哈哈哈嗝~' - - response += '本次寻访获得了%d个六星干员,%s' % (six_star, congrats) - return response - - -# Test -if __name__ == '__main__': - api = ArkHeadhunt(times=10) - api.get_randomized_results() - print(api.__str__()) diff --git a/Services/discord_service.py b/Services/discord_service.py index 8f6896b..05bfbfc 100644 --- a/Services/discord_service.py +++ b/Services/discord_service.py @@ -1,7 +1,7 @@ import sqlite3 from datetime import datetime from json import dumps, loads -from os import getcwd, path +from os import path from re import findall from time import time from typing import List @@ -10,10 +10,12 @@ from nonebot.adapters.onebot.v11 import MessageSegment from youtube_dl.utils import sanitize_filename -from Services.util.common_util import HttpxHelperClient, DiscordGroupNotification, DiscordMessageStatus, time_to_literal -from awesome.Constants.path_constants import BILIBILI_PIC_PATH +from Services.util.common_util import HttpxHelperClient, time_to_literal +from awesome.Constants.path_constants import BILIBILI_PIC_PATH, DB_PATH from awesome.Constants.vtuber_function_constants import GPT_4_MODEL_NAME from config import SUPER_USER, DISCORD_AUTH +from model.common_model import DiscordMessageStatus, DiscordGroupNotification +from util.db_utils import fetch_one_or_default from util.helper_util import construct_message_chain @@ -25,7 +27,7 @@ def __init__(self): ' AppleWebKit/537.36 (KHTML, like Gecko)' ' Chrome/126.0.0.0 Safari/537.36' } - self.database = sqlite3.connect(f'{getcwd()}/data/db/live_notification_data.db') + self.database = sqlite3.connect(path.join(DB_PATH, 'live_notification_data.db')) self._init_database() self.client = HttpxHelperClient() @@ -110,11 +112,7 @@ async def _get_the_latest_existing_discord_message(self, channel_id): """, (channel_id,) ).fetchone() - data = data if not isinstance(data, tuple) else data[0] - if data is None: - return 0 - - return data + return fetch_one_or_default(data, 0) async def check_discord_updates(self) -> List[DiscordGroupNotification]: notification_list = self._retrieve_all_record_in_db() diff --git a/Services/rate_limiter.py b/Services/rate_limiter.py index 7fdbf3c..be4eb79 100644 --- a/Services/rate_limiter.py +++ b/Services/rate_limiter.py @@ -3,6 +3,7 @@ from typing import Union from Services.util.common_util import time_to_literal +from model.common_model import RateLimitStatus class UserLimitModifier: @@ -193,15 +194,16 @@ async def _query_permission( query_user_prompt, wait_time = await self._query_user_permission(function_name, user_id, user_limit_modifier) return query_user_prompt, wait_time - async def _assemble_limit_prompt(self, prompt, wait_time) -> [None, str]: + async def _assemble_limit_prompt(self, prompt, wait_time) -> RateLimitStatus: if prompt == self.LIMIT_BY_USER: - return f'别玩啦,过{await time_to_literal(wait_time)}再回来玩好不好?' + return RateLimitStatus( + True, f'别玩啦,过{await time_to_literal(wait_time if wait_time > 0 else 1)}再回来玩好不好?') elif prompt == self.LIMIT_BY_GROUP: - return f'群使用已到达允许上限,请稍等{await time_to_literal(wait_time)}重试' + return RateLimitStatus(True, f'群使用已到达允许上限,请稍等{await time_to_literal(wait_time)}重试') elif prompt == self.TEMPRORARY_DISABLED: - return f'该功能全局禁用中,请稍等{await time_to_literal(wait_time)}再试。' + return RateLimitStatus(True, f'该功能全局禁用中,请稍等{await time_to_literal(wait_time)}再试。') - return None + return RateLimitStatus(False, None) async def user_group_limit_check( self, @@ -209,7 +211,7 @@ async def user_group_limit_check( user_id: Union[str, int], group_id: Union[str, int], user_limit_modifier: UserLimitModifier - ) -> Union[str, None]: + ) -> RateLimitStatus: user_id = str(user_id) group_id = str(group_id) @@ -221,7 +223,7 @@ async def user_limit_check( function_name: str, user_id: Union[str, int], user_limit_modifier: UserLimitModifier - ): + ) -> RateLimitStatus: user_id = str(user_id) query_result, wait_time = await self._query_user_permission(function_name, user_id, user_limit_modifier) return await self._assemble_limit_prompt(query_result, wait_time) @@ -232,7 +234,7 @@ async def group_limit_check( group_id: Union[str, int], time_period=60, function_limit=None - ): + ) -> RateLimitStatus: group_id = str(group_id) query_result, wait_time = await self._query_group_permission( function_name, diff --git a/Services/twitch_service.py b/Services/twitch_service.py index ce73188..1185510 100644 --- a/Services/twitch_service.py +++ b/Services/twitch_service.py @@ -18,8 +18,8 @@ from youtube_dl.utils import sanitize_filename from Services.live_notification import LiveNotificationData -from Services.util.common_util import OptionalDict, HttpxHelperClient, Status, TwitchDownloadStatus, \ - ValidatedTimestampStatus +from Services.util.common_util import OptionalDict, HttpxHelperClient +from model.common_model import Status, ValidatedTimestampStatus, TwitchDownloadStatus from config import SUPER_USER, PATH_TO_ONEDRIVE, SHARE_LINK, CLOUD_STORAGE_SIZE_LIMIT_GB from util.helper_util import construct_message_chain diff --git a/Services/util/common_util.py b/Services/util/common_util.py index 1891af6..51e1bb4 100644 --- a/Services/util/common_util.py +++ b/Services/util/common_util.py @@ -1,5 +1,3 @@ -import dataclasses -import time from asyncio import sleep, get_running_loop from functools import lru_cache from hashlib import sha1 @@ -9,7 +7,8 @@ from os.path import exists from random import randint from ssl import SSLContext -from typing import List, Literal, Dict, Any, Union +from time import time +from typing import List, Literal, Dict, Any, Union, TypeVar from uuid import uuid4 import markdown2 @@ -34,42 +33,10 @@ from webdriver_manager.chrome import ChromeDriverManager from Services.util.ctx_utility import get_user_id, get_group_id +from awesome.Constants.path_constants import BOT_RESPONSE_PATH TEMP_FILE_DIRECTORY = path.join(getcwd(), 'data', 'temp') - - -@dataclasses.dataclass -class Status: - is_success: bool - message: any - - -@dataclasses.dataclass -class ValidatedTimestampStatus(Status): - validated_timestamp: str = '' - - -@dataclasses.dataclass -class TwitchDownloadStatus(Status): - file_path: str = '' - - -@dataclasses.dataclass -class DiscordMessageStatus(Status): - message: List[MessageSegment] = dataclasses.field(default_factory=lambda: []) - group_to_notify: str = '' - has_update: bool = False - is_edit: bool = False - - -@dataclasses.dataclass -class DiscordGroupNotification(Status): - message: Message - has_update: bool - group_to_notify: str - channel_name: str - channel_id: str - is_edit: bool +T = TypeVar("T") def chunk_string(string, length): @@ -246,7 +213,7 @@ def markdown_to_html(string: str): html_string = markdown2.markdown( string, extras=['fenced-code-blocks', 'strike', 'tables', 'task_list', 'code-friendly']) - file_name = f'{getcwd()}/data/bot/response/{int(time.time())}.html' + file_name = f'{getcwd()}/data/bot/response/{int(time())}.html' with open(file_name, 'w+', encoding='utf-8') as file: file.write(r"""