def get_call_ids_for_flow(
    flow_id,
    call_quantity,
    random_call_id_limit,
    start_date,
    end_date,
    org_ids,
    call_type,
    lang,
    min_duration,
    template_id,
    use_case,
    flow_name,
    ignore_callers,
    reported,
):
    """
    Fetch random call ids for the given flow id(s).

    Logs the requested limits and flow ids, forwards every filter to
    ``query.gen_random_call_ids`` as keyword arguments, logs how many
    ids came back and returns them unchanged.

    :param flow_id: Forwarded as ``flow_id``.
    :param call_quantity: Forwarded as ``limit``.
    :param random_call_id_limit: Forwarded as ``random_id_limit``.
    :param start_date: Forwarded as ``start_date``.
    :param end_date: Forwarded as ``end_date``.
    :param org_ids: Forwarded as ``ids_``.
    :param call_type: Forwarded as ``call_type``.
    :param lang: Forwarded as ``lang``.
    :param min_duration: Forwarded as ``min_duration``.
    :param template_id: Forwarded as ``template_id``.
    :param use_case: Forwarded as ``use_case``.
    :param flow_name: Forwarded as ``flow_name``.
    :param ignore_callers: Forwarded as ``excluded_numbers``.
    :param reported: Forwarded as ``reported``.
    :return: Whatever ``query.gen_random_call_ids`` returns; it must
        support ``len()`` because its size is logged.
    """
    logger.info(f"Random id limit {random_call_id_limit}")
    logger.info(f"Call quantity limit {call_quantity}")
    logger.info(f"Flow ids {flow_id}")

    # Map this function's argument names onto the query layer's names.
    query_filters = {
        "start_date": start_date,
        "end_date": end_date,
        "ids_": org_ids,
        "limit": call_quantity,
        "call_type": call_type,
        "lang": lang,
        "min_duration": min_duration,
        "template_id": template_id,
        "use_case": use_case,
        "flow_name": flow_name,
        "excluded_numbers": ignore_callers,
        "reported": reported,
        "flow_id": flow_id,
        "random_id_limit": random_call_id_limit,
    }
    sampled_ids = query.gen_random_call_ids(**query_filters)

    logger.info(f"Number of call Ids obtained is {len(sampled_ids)}")
    return sampled_ids
sample( start_date: str, @@ -47,6 +83,7 @@ def sample( batch_turns: int = const.TURNS_LIMIT, delay: float = const.Q_DELAY, timezone: str = const.DEFAULT_TIMEZONE, + flow_ids: Optional[List[str]] = [], ) -> Union[str, pd.DataFrame]: """ Sample calls. @@ -101,29 +138,53 @@ def sample( :param timezone: Timezone for the sampling, defaults to "Asia/Kolkata" :type timezone: str, optional + + :param flow_ids: A list of flow ids from which to retrieve the data + :type flow_ids: Optional[str] :return: A directory path if save is set to "files" otherwise path to a file. :rtype: str """ start_time = time.time() - random_call_ids = query.gen_random_call_ids( - start_date, - end_date, - ids_=org_ids, - limit=call_quantity, - call_type=call_type, - lang=lang, - min_duration=min_duration, - template_id=template_id, - use_case=use_case, - flow_name=flow_name, - excluded_numbers=ignore_callers, - reported=reported, - ) - logger.info(f"Number of call Ids obtained is {len(random_call_ids)}") - end_time_first = time.time() - total_time = str(end_time_first - start_time) - logger.info(f"Time required to obtain call IDs {total_time} seconds") + logger.info(f"Flow ids: {flow_ids}") + flow_ids = convert_str_to_int_list(flow_ids) + random_id_limit = min(30*call_quantity, 75000) + all_call_ids = [] + org_ids = convert_str_to_int_list(org_ids) + for flow_id in flow_ids: + flow_id_list = [] + flow_id_list.append(flow_id) + random_call_ids = get_call_ids_for_flow(flow_id_list, const.MIN_ASSURED_CALL_QUANTITY, + const.MIN_RANDOM_CALL_ID_LIMIT, start_date, + end_date, org_ids, call_type, lang, + min_duration, template_id, use_case, + flow_name, ignore_callers, reported) + random_call_id_list_1= list(random_call_ids) + logger.info(f"Number of call ids for flow {flow_id}: {len(random_call_id_list_1)}") + all_call_ids += random_call_id_list_1 + + loop_end_time = time.time() + final_time = str(loop_end_time-start_time) + logger.info(f"Time to finish loop: {final_time}") + + random_call_ids = 
get_call_ids_for_flow(flow_ids, call_quantity, + random_id_limit, start_date, + end_date, org_ids, call_type, lang, + min_duration, template_id, use_case, + flow_name, ignore_callers, reported) + + end_time_1 = time.time() + final_time_1 = str(end_time_1-start_time) + + random_call_id_list_2= list(random_call_ids) + all_call_ids += random_call_id_list_2 + + final_call_ids = tuple(set(all_call_ids)) + + logger.info(f"Number of call ids: {len(final_call_ids)}") + + logger.info(f"Time to finish getting call ids: {final_time_1}") + random_call_data = query.gen_random_calls( random_call_ids, asr_provider=asr_provider, @@ -136,7 +197,7 @@ def sample( timezone=timezone, ) end_time_second = time.time() - total_time_second_query = str(end_time_second - end_time_first) + total_time_second_query = str(end_time_second - end_time_1) logger.info(f"Time required to obtain call data from queried IDs {total_time_second_query} seconds") if on_disk: return save_turns_on_disk(random_call_data) @@ -144,7 +205,6 @@ def sample( logger.info(f"Number of call with data obtained is {df.shape[0]}") return df - def select( call_ids: Optional[List[int]] = None, org_ids: Optional[Set[int]] = None, diff --git a/skit_calls/cli.py b/skit_calls/cli.py index 042c5f5..4c15652 100644 --- a/skit_calls/cli.py +++ b/skit_calls/cli.py @@ -188,6 +188,13 @@ def build_sample_command(parser: argparse.ArgumentParser) -> None: help="A comma separated list of states to keep turns from, and remove all else.", default=[], ) + parser.add_argument( + "--flow-ids", + type=str, + nargs="*", + help="A comma separated list of flow ids to keep turns from, and remove all else.", + default=[], + ) def build_select_command(parser: argparse.ArgumentParser) -> None: @@ -299,6 +306,7 @@ def random_sample_calls(args: argparse.Namespace) -> Union[str, pd.DataFrame]: batch_turns=args.batch_turns, delay=args.delay, timezone=args.timezone, + flow_ids=args.flow_ids ) logger.info(f"Finished in {time.time() - start:.2f} seconds") 
return maybe_df diff --git a/skit_calls/constants.py b/skit_calls/constants.py index 1f5eb8f..3f0f41f 100644 --- a/skit_calls/constants.py +++ b/skit_calls/constants.py @@ -115,6 +115,8 @@ CALL_IDS = "call_ids" USE_CASE = "use_case" FLOW_NAME = "flow_name" +FLOW_ID = "flow_id" +RANDOM_ID_LIMT = "random_id_limit" TEMPLATE_ID = "template_id" LANG = "lang" MIN_AUDIO_DURATION = "min_duration" @@ -140,3 +142,5 @@ UCASE_INPUT = "INPUT" UCASE_AUDIO = "AUDIO" MARGIN = 0.1 +MIN_ASSURED_CALL_QUANTITY = 25 # minimum assured number of calls per flow id +MIN_RANDOM_CALL_ID_LIMIT = 750 # An upper limit of MIN_ASSURED_CALL_QUANTITY * 30 \ No newline at end of file diff --git a/skit_calls/data/query.py b/skit_calls/data/query.py index c7a4fc7..873199d 100644 --- a/skit_calls/data/query.py +++ b/skit_calls/data/query.py @@ -35,18 +35,24 @@ def gen_random_call_ids( lang: Optional[str] = None, template_id: Optional[int] = None, flow_name: Optional[str] = None, + flow_id: Optional[Set[str]] = [], min_duration: Optional[float] = None, excluded_numbers: Optional[Set[str]] = None, retry_limit: int = 2, + random_id_limit: int = const.DEFAULT_CALL_QUANTITY, ): excluded_numbers = set(excluded_numbers) or set() ids_ = set(ids_) or set() + if not ids_ or template_id : + ids_ = None + elif ids_ and not template_id: + ids_= tuple(ids_) excluded_numbers = excluded_numbers.union(const.DEFAULT_IGNORE_CALLERS_LIST) reported_status = 0 if reported else None call_filters = { const.END_DATE: end_date, const.START_DATE: start_date, - const.ID: tuple(ids_) if not template_id else (None,), + const.ID: ids_, const.CALL_TYPE: tuple(call_type), const.RESOLVED: reported_status, const.LANG: lang, @@ -56,6 +62,8 @@ def gen_random_call_ids( const.FLOW_NAME: flow_name, const.LIMIT: limit + const.MARGIN * limit, const.TEMPLATE_ID: template_id, + const.FLOW_ID: flow_id, + const.RANDOM_ID_LIMT: random_id_limit } logger.debug(f"call_filters={pformat(call_filters)} | {limit=}") diff --git a/skit_calls/utils.py 
def convert_str_to_int_list(str_values):
    """
    Convert CLI-style id arguments into a flat list of integers.

    Every element of ``str_values`` is parsed, not only the first one, so
    both ``--flow-ids 1,2,3`` (one comma separated token) and
    ``--flow-ids 1 2 3`` (several tokens) give ``[1, 2, 3]``. Each element
    may be wrapped in square brackets (``"[1,2]"``) and may already be an
    integer. Blank pieces produced by ``"[]"``, ``""`` or a trailing comma
    are skipped instead of raising.

    :param str_values: An iterable of strings and/or integers, a single
        string, or ``None``.
    :return: The parsed integers in input order; an empty list when
        ``str_values`` is empty or ``None``.
    :rtype: list
    :raises ValueError: If a non-blank piece is not a valid integer.
    """
    if not str_values:
        return []

    # A bare string is one value, not an iterable of characters.
    if isinstance(str_values, str):
        str_values = [str_values]

    int_list = []
    for raw_value in str_values:
        # str() lets already-converted integer ids pass through unchanged.
        cleaned = str(raw_value).strip().strip("[]")
        for piece in cleaned.split(","):
            piece = piece.strip()
            if piece:
                int_list.append(int(piece))
    return int_list