Skip to content

Commit

Permalink
Misc: code structure update.
Browse files Browse the repository at this point in the history
  • Loading branch information
remiliacn committed Sep 5, 2024
1 parent 1c7b417 commit 5673616
Show file tree
Hide file tree
Showing 23 changed files with 188 additions and 567 deletions.
286 changes: 1 addition & 285 deletions Services/ark_nights.py
Original file line number Diff line number Diff line change
@@ -1,287 +1,3 @@
"""
Arknights headhunt recruitment simulator.
This function is waiting for a rewrite and is currently deprecated.
"""
import random
import sqlite3
import time
from typing import Union


class ArknightsPity:
def __init__(self):
self.sanity_poll_dict = {}

def record_poll(self, group_id):
if group_id not in self.sanity_poll_dict:
self.sanity_poll_dict[group_id] = 10
else:
self.sanity_poll_dict[group_id] += 10

def get_offset_setting(self, group_id) -> int:
if group_id not in self.sanity_poll_dict:
self.record_poll(group_id)
return 0
else:
poll_count = self.sanity_poll_dict[group_id]
if poll_count <= 50:
return 0
else:
return (poll_count - 50) * 2

def reset_offset(self, group_id):
self.sanity_poll_dict[group_id] = 0


class ArkHeadhunt:
"""
Init data for arknights headhunt simulator.
"""

def __init__(self, times=1):
self.times = times
self.count = 0
self.offset = 0
self.arknights_agent_db = sqlite3.connect('data/db/stats.db')

self._init()

self.random_agent = []
self.random_class = []

def _init(self) -> dict:
self.arknights_agent_db.execute(
"""
create table if not exists arknights_op (
"ch_name" varchar(50) unique on conflict ignore,
"stars" integer,
"is_limited" boolean,
"is_secondary_up" boolean,
"is_up" boolean
)
"""
)
self.arknights_agent_db.commit()

def _get_if_limit_banner_on(self):
result = self.arknights_agent_db.execute(
"""
select ch_name from arknights_op where is_limited = true
"""
).fetchone()

return result if result and result is not None else ''

def get_randomized_results(self, offset_setting=0):
"""
Get randomized list for agent star numbers.
:param offset_setting: Offset that can set for rigging the result.
:return: void.
"""
if self.times < 0:
raise ValueError("Pulling value cannot be less than 0")

random.seed(time.time_ns())
random_class = []
self.count += 1
for _ in range(0, self.times):
rand_num = random.randint(0, 100) + offset_setting
if rand_num >= 98:
random_class.append(6)
self.count = 0
elif rand_num >= 90:
random_class.append(5)
elif rand_num >= 40:
random_class.append(4)
else:
random_class.append(3)

time.sleep(0.05)

self.random_class = random_class
self.random_agent = self._get_ops()

def _get_uped_op(self, star: int):
result = self._get_op_from_db(star, is_up=True)
if result is None or not result:
return self._get_op_from_db(star)

return result

def _get_secondary_up_op(self, star: int):
result = self._get_op_from_db(star, is_secondary_up=True)
if result is None:
return self._get_op_from_db(star)

return result

def _get_op_count_by_stars(self, star: int) -> int:
result = self.arknights_agent_db.execute(
"""
select count(ch_name) from arknights_op where stars = ?
""", (star,)
).fetchone()

return int(result[0])

def _get_op_from_db(self, star: int, is_up=False, is_secondary_up=False):
if is_up or is_secondary_up:
where_query = f'where stars = ? and (is_up = {is_up.__str__().lower()} ' \
f'or is_secondary_up = {is_secondary_up.__str__().lower()})'
else:
where_query = 'where stars = ?'
result = self.arknights_agent_db.execute(
f"""
select ch_name from arknights_op
{where_query} order by random() limit 1
""", (star,)
).fetchone()

return result if result and result is not None else ''

def _insert_op(self, star: int, name: str, is_limited=False, is_sec_up=False, is_up=False):
is_up = is_limited or is_sec_up or is_up
self.arknights_agent_db.execute(
"""
insert or replace into arknights_op (ch_name, stars, is_limited, is_secondary_up, is_up) values (
?, ?, ?, ?, ?
)
""", (name, star, is_limited, is_sec_up, is_up)
)
self._commit_change()

def _get_ops(self) -> list:
"""
Get a list of agent's name.
:return: A list with all operator names.
"""
random_agent = []
random.seed(time.time_ns())
for elements in self.random_class:
random_int = random.randint(0, 100)
if self._get_if_limit_banner_on() and elements == 6:
if random_int < 70:
random_agent.append(self._get_uped_op(6))
else:
# 30%中的五倍权值爆率。
second_random = random.randint(0, self._get_op_count_by_stars(6))
if second_random < 5 and self._get_if_limit_banner_on():
random_agent.append(self._get_secondary_up_op(6))
else:
random_agent.append(self._get_op_from_db(6))

else:
if random_int < 50:
random_agent.append(self._get_uped_op(elements))
else:
random_agent.append(self._get_op_from_db(elements))

return random_agent

def _get_all_secondary_up_op(self, star: int):
result = self.arknights_agent_db.execute(
"""
select ch_name, stars from arknights_op where stars = ? and is_secondary_up = true
""", (star,)
).fetchall()

result = [x[0] for x in result if x is not None and x[0] is not None]
return result

def _get_all_uped_op(self, star: int):
result = self.arknights_agent_db.execute(
"""
select ch_name, stars from arknights_op where stars = ? and is_up = true
""", (star,)
).fetchall()

result = [x[0] for x in result if x is not None and x[0] is not None]
return result

def up_op(self, agent: str, star: Union[int, str], is_limited=False, is_second_up=False, is_up=True):
if isinstance(star, str) and not star.isdigit():
return '?'

star = int(star)
self._insert_op(star, agent, is_limited, is_second_up, is_up)
return 'Done'

def _commit_change(self):
self.arknights_agent_db.commit()

def clear_ups(self):
self.arknights_agent_db.execute(
"""
delete from arknights_op where is_limited = true
"""
)
self.arknights_agent_db.execute(
"""
update arknights_op set is_secondary_up = false, is_up = false, is_limited = false
"""
)
self._commit_change()

def add_op(self, agent: str, star: Union[int, str]):
if isinstance(star, str) and not star.isdigit():
return '?'

star = int(star)
self._insert_op(star, agent)
return f'成功将{agent}加入{star}星干员组'

def get_up(self) -> str:
result = ''
four_up = self._get_all_uped_op(4)
five_up = self._get_all_uped_op(5)
six_up = self._get_all_uped_op(6)

secondary_up = self._get_all_secondary_up_op(6)

result += '四星:' + ','.join(four_up) + '\n'
result += '五星:' + ','.join(five_up) + '\n'
result += '六星:' + ','.join(six_up) + '\n'
result += ('六星小UP:' + ','.join(secondary_up) + '\n') if self._get_if_limit_banner_on() else ''

return result if result else '无\n'

def __str__(self):
"""
Generating the result of the headhunt.
:return: str, the result of the headhunt in Chinese.
"""
response = ''

response += f'本次卡池UP:\n{self.get_up()}'
six_star = 0
for idx, elements in enumerate(self.random_class):
if elements == 6 or elements == -1:
six_star += 1

if elements == -1:
element = 6
else:
element = elements
response += str(element) + '星干员: %s\n' % self.random_agent[idx]

if 5 not in self.random_class and 6 not in self.random_class:
congrats = '哈↑哈↓紫气东来'

else:
if six_star > 6:
congrats = '你丫神仙吧草w'
elif six_star > 3:
congrats = '这个爆率感觉机器人应该是坏了吧'
elif six_star >= 1:
congrats = '有点幸运了啊'
else:
congrats = '没事这结果挺正常的,会好起来的,哈哈哈哈嗝~'

response += '本次寻访获得了%d个六星干员,%s' % (six_star, congrats)
return response


# Test
if __name__ == '__main__':
api = ArkHeadhunt(times=10)
api.get_randomized_results()
print(api.__str__())
16 changes: 7 additions & 9 deletions Services/discord_service.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import sqlite3
from datetime import datetime
from json import dumps, loads
from os import getcwd, path
from os import path
from re import findall
from time import time
from typing import List
Expand All @@ -10,10 +10,12 @@
from nonebot.adapters.onebot.v11 import MessageSegment
from youtube_dl.utils import sanitize_filename

from Services.util.common_util import HttpxHelperClient, DiscordGroupNotification, DiscordMessageStatus, time_to_literal
from awesome.Constants.path_constants import BILIBILI_PIC_PATH
from Services.util.common_util import HttpxHelperClient, time_to_literal
from awesome.Constants.path_constants import BILIBILI_PIC_PATH, DB_PATH
from awesome.Constants.vtuber_function_constants import GPT_4_MODEL_NAME
from config import SUPER_USER, DISCORD_AUTH
from model.common_model import DiscordMessageStatus, DiscordGroupNotification
from util.db_utils import fetch_one_or_default
from util.helper_util import construct_message_chain


Expand All @@ -25,7 +27,7 @@ def __init__(self):
' AppleWebKit/537.36 (KHTML, like Gecko)'
' Chrome/126.0.0.0 Safari/537.36'
}
self.database = sqlite3.connect(f'{getcwd()}/data/db/live_notification_data.db')
self.database = sqlite3.connect(path.join(DB_PATH, 'live_notification_data.db'))
self._init_database()
self.client = HttpxHelperClient()

Expand Down Expand Up @@ -110,11 +112,7 @@ async def _get_the_latest_existing_discord_message(self, channel_id):
""", (channel_id,)
).fetchone()

data = data if not isinstance(data, tuple) else data[0]
if data is None:
return 0

return data
return fetch_one_or_default(data, 0)

async def check_discord_updates(self) -> List[DiscordGroupNotification]:
notification_list = self._retrieve_all_record_in_db()
Expand Down
18 changes: 10 additions & 8 deletions Services/rate_limiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Union

from Services.util.common_util import time_to_literal
from model.common_model import RateLimitStatus


class UserLimitModifier:
Expand Down Expand Up @@ -193,23 +194,24 @@ async def _query_permission(
query_user_prompt, wait_time = await self._query_user_permission(function_name, user_id, user_limit_modifier)
return query_user_prompt, wait_time

async def _assemble_limit_prompt(self, prompt, wait_time) -> [None, str]:
async def _assemble_limit_prompt(self, prompt, wait_time) -> RateLimitStatus:
if prompt == self.LIMIT_BY_USER:
return f'别玩啦,过{await time_to_literal(wait_time)}再回来玩好不好?'
return RateLimitStatus(
True, f'别玩啦,过{await time_to_literal(wait_time if wait_time > 0 else 1)}再回来玩好不好?')
elif prompt == self.LIMIT_BY_GROUP:
return f'群使用已到达允许上限,请稍等{await time_to_literal(wait_time)}重试'
return RateLimitStatus(True, f'群使用已到达允许上限,请稍等{await time_to_literal(wait_time)}重试')
elif prompt == self.TEMPRORARY_DISABLED:
return f'该功能全局禁用中,请稍等{await time_to_literal(wait_time)}再试。'
return RateLimitStatus(True, f'该功能全局禁用中,请稍等{await time_to_literal(wait_time)}再试。')

return None
return RateLimitStatus(False, None)

async def user_group_limit_check(
self,
function_name: str,
user_id: Union[str, int],
group_id: Union[str, int],
user_limit_modifier: UserLimitModifier
) -> Union[str, None]:
) -> RateLimitStatus:
user_id = str(user_id)
group_id = str(group_id)

Expand All @@ -221,7 +223,7 @@ async def user_limit_check(
function_name: str,
user_id: Union[str, int],
user_limit_modifier: UserLimitModifier
):
) -> RateLimitStatus:
user_id = str(user_id)
query_result, wait_time = await self._query_user_permission(function_name, user_id, user_limit_modifier)
return await self._assemble_limit_prompt(query_result, wait_time)
Expand All @@ -232,7 +234,7 @@ async def group_limit_check(
group_id: Union[str, int],
time_period=60,
function_limit=None
):
) -> RateLimitStatus:
group_id = str(group_id)
query_result, wait_time = await self._query_group_permission(
function_name,
Expand Down
Loading

0 comments on commit 5673616

Please sign in to comment.