Skip to content

Commit

Permalink
feat: Introduce a parameter is_buffer_enabled (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
kajanan1212 authored Apr 22, 2024
1 parent 14d83d0 commit 463d7d0
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 8 deletions.
4 changes: 3 additions & 1 deletion examples/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ def datetime_from_string(datetime_string: str) -> datetime:
historical_data_ending_at = datetime_from_string("2022-02-01")
streaming_data_starting_from = datetime_from_string("2022-02-01")
streaming_data_ending_at = datetime_from_string("2022-11-01")
time_interval = 60 * 2
time_interval = 60 * 3
chunk_size = 100
active_strategy = True
is_buffer_enabled = True
cdd_strategy = DDM(
warning_level=0.1,
drift_level=1.5,
Expand All @@ -31,6 +32,7 @@ def datetime_from_string(datetime_string: str) -> datetime:
interval_min=time_interval,
chunk_size=chunk_size,
active_strategy=active_strategy,
is_buffer_enabled=is_buffer_enabled,
cdd_strategy=cdd_strategy,
output_parent_dir=folder_path_to_save_result,
label=experiment_label,
Expand Down
23 changes: 16 additions & 7 deletions src/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def run_exp(
interval_min: float,
chunk_size: int,
active_strategy: Optional[bool] = False,
is_buffer_enabled: Optional[bool] = False,
cdd_strategy: Optional[IStrategy] = None,
output_parent_dir: Optional[str] = "./",
label: Optional[str] = "",
Expand All @@ -44,6 +45,7 @@ def run_exp(
interval_min: Minimum waiting time (in minutes) for data processing.
chunk_size: Minimum required amount of data for data processing.
active_strategy: Flag indicating whether to use active strategy (default is False).
is_buffer_enabled: To do (default is False).
cdd_strategy: Strategy to be used for detecting concept drift. Required if active_strategy is True.
output_parent_dir: Parent directory's path to save experiment results.
label: Experiment label (default is an empty string).
Expand Down Expand Up @@ -179,12 +181,18 @@ def run_exp(
start_time = time()

if active_strategy:
if (dt_x_buffer is None) or (dt_y_buffer is None):
dt_x_buffer = dt_x
dt_y_buffer = dt_y
if is_buffer_enabled:
if (dt_x_buffer is None) or (dt_y_buffer is None):
dt_x_buffer = dt_x
dt_y_buffer = dt_y
else:
dt_x_buffer = concat([dt_x_buffer, dt_x], ignore_index=True)
dt_y_buffer = concat([dt_y_buffer, dt_y], ignore_index=True)
ni_dt_x = dt_x_buffer
ni_dt_y = dt_y_buffer
else:
dt_x_buffer = concat([dt_x_buffer, dt_x], ignore_index=True)
dt_y_buffer = concat([dt_y_buffer, dt_y], ignore_index=True)
ni_dt_x = dt_x
ni_dt_y = dt_y

is_detected = model.is_concept_drift_detected(
ni_rt_x=None, ni_rt_y=None, ni_dt_x=dt_x, ni_dt_y=dt_y
Expand All @@ -193,8 +201,8 @@ def run_exp(
model.incremental_fit(
ni_rt_x=None,
ni_rt_y=None,
ni_dt_x=dt_x,
ni_dt_y=dt_y,
ni_dt_x=ni_dt_x,
ni_dt_y=ni_dt_y,
)
dt_x_buffer = None
dt_y_buffer = None
Expand Down Expand Up @@ -243,6 +251,7 @@ def run_exp(
- Minimum required amount of data: {chunk_size}
- Batch processing technique: {bp_technique}
- Concept drift handling strategy: {strategy}
- Is buffer enabled: {is_buffer_enabled}
- Concept drift detection algorithm: {cdd_strategy.__class__.__name__ if active_strategy else None}
{cdd_strategy_content}
Expand Down

0 comments on commit 463d7d0

Please sign in to comment.