diff --git a/examples/pipeline.py b/examples/pipeline.py index 3a22e1a..ef61be1 100644 --- a/examples/pipeline.py +++ b/examples/pipeline.py @@ -12,9 +12,10 @@ def datetime_from_string(datetime_string: str) -> datetime: historical_data_ending_at = datetime_from_string("2022-02-01") streaming_data_starting_from = datetime_from_string("2022-02-01") streaming_data_ending_at = datetime_from_string("2022-11-01") - time_interval = 60 * 2 + time_interval = 60 * 3 chunk_size = 100 active_strategy = True + is_buffer_enabled = True cdd_strategy = DDM( warning_level=0.1, drift_level=1.5, @@ -31,6 +32,7 @@ def datetime_from_string(datetime_string: str) -> datetime: interval_min=time_interval, chunk_size=chunk_size, active_strategy=active_strategy, + is_buffer_enabled=is_buffer_enabled, cdd_strategy=cdd_strategy, output_parent_dir=folder_path_to_save_result, label=experiment_label, diff --git a/src/pipeline.py b/src/pipeline.py index 8c482f0..465aa75 100644 --- a/src/pipeline.py +++ b/src/pipeline.py @@ -29,6 +29,7 @@ def run_exp( interval_min: float, chunk_size: int, active_strategy: Optional[bool] = False, + is_buffer_enabled: Optional[bool] = False, cdd_strategy: Optional[IStrategy] = None, output_parent_dir: Optional[str] = "./", label: Optional[str] = "", @@ -44,6 +45,7 @@ def run_exp( interval_min: Minimum waiting time (in minutes) for data processing. chunk_size: Minimum required amount of data for data processing. active_strategy: Flag indicating whether to use active strategy (default is False). + is_buffer_enabled: To do (default is False). cdd_strategy: Strategy to be used for detecting concept drift. Required if active_strategy is True. output_parent_dir: Parent directory's path to save experiment results. label: Experiment label (default is an empty string). @@ -179,12 +181,18 @@ def run_exp( start_time = time() if active_strategy: - if (dt_x_buffer is None) or (dt_y_buffer is None): - dt_x_buffer = dt_x - dt_y_buffer = dt_y + if is_buffer_enabled: + if (dt_x_buffer is None) or (dt_y_buffer is None): + dt_x_buffer = dt_x + dt_y_buffer = dt_y + else: + dt_x_buffer = concat([dt_x_buffer, dt_x], ignore_index=True) + dt_y_buffer = concat([dt_y_buffer, dt_y], ignore_index=True) + ni_dt_x = dt_x_buffer + ni_dt_y = dt_y_buffer else: - dt_x_buffer = concat([dt_x_buffer, dt_x], ignore_index=True) - dt_y_buffer = concat([dt_y_buffer, dt_y], ignore_index=True) + ni_dt_x = dt_x + ni_dt_y = dt_y is_detected = model.is_concept_drift_detected( ni_rt_x=None, ni_rt_y=None, ni_dt_x=dt_x, ni_dt_y=dt_y @@ -193,8 +201,8 @@ def run_exp( model.incremental_fit( ni_rt_x=None, ni_rt_y=None, - ni_dt_x=dt_x, - ni_dt_y=dt_y, + ni_dt_x=ni_dt_x, + ni_dt_y=ni_dt_y, ) dt_x_buffer = None dt_y_buffer = None @@ -243,6 +251,7 @@ def run_exp( - Minimum required amount of data: {chunk_size} - Batch processing technique: {bp_technique} - Concept drift handling strategy: {strategy} +- Is buffer enabled: {is_buffer_enabled} - Concept drift detection algorithm: {cdd_strategy.__class__.__name__ if active_strategy else None} {cdd_strategy_content}