diff --git a/examples/pipeline.py b/examples/pipeline.py index 35cf6dd..9fa3279 100644 --- a/examples/pipeline.py +++ b/examples/pipeline.py @@ -13,6 +13,7 @@ def datetime_from_string(datetime_string: str) -> datetime: streaming_data_starting_from = datetime_from_string("2022-01-01") streaming_data_ending_at = datetime_from_string("2022-08-01") time_interval = 60 + chunk_size = 100 active_strategy = True cdd_strategy = DDM( warning_level=0.1, @@ -28,6 +29,7 @@ def datetime_from_string(datetime_string: str) -> datetime: stream_start=streaming_data_starting_from, stream_end=streaming_data_ending_at, interval_min=time_interval, + chunk_size=chunk_size, active_strategy=active_strategy, cdd_strategy=cdd_strategy, output_parent_dir=folder_path_to_save_result, diff --git a/src/pipeline.py b/src/pipeline.py index bdad359..3089ac8 100644 --- a/src/pipeline.py +++ b/src/pipeline.py @@ -25,6 +25,7 @@ def run_exp( stream_start: datetime, stream_end: datetime, interval_min: float, + chunk_size: int, active_strategy: Optional[bool] = False, cdd_strategy: Optional[IStrategy] = None, output_parent_dir: Optional[str] = "./", @@ -38,7 +39,8 @@ def run_exp( hist_end: End timestamp (exclusive) for historical data. stream_start: Start timestamp (inclusive) for streaming data. stream_end: End timestamp (exclusive) for streaming data. - interval_min: Time interval (in minutes) for data processing. + interval_min: Minimum waiting time (in minutes) for data processing. + chunk_size: Minimum required amount of data for data processing. active_strategy: Flag indicating whether to use active strategy (default is False). cdd_strategy: Strategy to be used for detecting concept drift. Required if active_strategy is True. output_parent_dir: Parent directory's path to save experiment results. @@ -49,13 +51,37 @@ def run_exp( """ warnings.filterwarnings("ignore", category=SettingWithCopyWarning) + if interval_min == 0 and chunk_size == 0: + raise ValueError("Both interval_min & chunk_size cannot be set to zero.") + else: + count_not_enough = False + is_end_reached = False + + hybrid_bp = interval_min != 0 and chunk_size != 0 + scheduled_bp = interval_min != 0 and chunk_size == 0 + + if hybrid_bp: + bp_technique = "HYBRID" + elif scheduled_bp: + bp_technique = "SCHEDULED" + else: + bp_technique = "PERIODIC" + if active_strategy and cdd_strategy is None: raise ValueError("cdd_strategy must be provided when active_strategy is True.") + else: + strategy = "ACTIVE" if active_strategy else "PASSIVE" + + print(f"BATCH PROCESSING TECHNIQUE: {bp_technique} | CONCEPT DRIFT HANDLING STRATEGY: {strategy}") rt_df: DataFrame = BUS_654_FEATURES_ADDED_RUNNING_TIMES.dataframe dt_df: DataFrame = BUS_654_FEATURES_ADDED_DWELL_TIMES.dataframe dt_df: DataFrame = dt_df.drop(columns=["dwell_time_in_seconds_old"]) - dt_df["arrival_datetime"] = to_datetime(dt_df["date"] + " " + dt_df["arrival_time"]) + + dt_df["arrival_datetime"] = to_datetime( + dt_df["date"] + " " + dt_df["arrival_time"], format="%Y-%m-%d %H:%M:%S" + ) + dt_df.sort_values(by="arrival_datetime", inplace=True) base_model: Optional[MME4BAT] = None model: Optional[MME4BAT] = None @@ -76,6 +102,16 @@ def run_exp( to_date_time = hist_end while from_date_time < stream_end: + if hybrid_bp and count_not_enough: + temp_df = dt_df.loc[ + (dt_df["arrival_datetime"] >= from_date_time), + :, + ].reset_index(drop=True) + if temp_df.shape[0] < chunk_size: + is_end_reached = True + else: + to_date_time = temp_df["arrival_datetime"].iloc[chunk_size - 1] + timedelta(minutes=1) + print( f"DATA STREAM: [{from_date_time.strftime('%Y-%m-%d %H:%M:%S')} - {to_date_time.strftime('%Y-%m-%d %H:%M:%S')})", end="", @@ -87,71 +123,83 @@ def run_exp( & (dt_df["arrival_datetime"] < to_date_time), :, ].reset_index(drop=True) + count_not_enough = dt_chunk.shape[0] < chunk_size + + if ( + scheduled_bp + or (hybrid_bp and not count_not_enough) + or is_end_reached + ): + from_date_time = ( + stream_start if from_date_time == hist_start else to_date_time + ) + to_date_time = from_date_time + timedelta(minutes=interval_min) + + if len(dt_chunk) == 0: + print(" | NO INSTANCES") + continue + else: + print(f" | NUMBER OF INSTANCES: {len(dt_chunk)}") - from_date_time = stream_start if from_date_time == hist_start else to_date_time - to_date_time = from_date_time + timedelta(minutes=interval_min) - - if len(dt_chunk) == 0: - print(" | NO INSTANCES") - continue - - numeric_dt_chunk = dt_chunk.select_dtypes(include="number") + numeric_dt_chunk = dt_chunk.select_dtypes(include="number") - dt_x: DataFrame = numeric_dt_chunk.drop(columns=["dwell_time_in_seconds"]) - dt_y: DataFrame = numeric_dt_chunk[["dwell_time_in_seconds"]] + dt_x: DataFrame = numeric_dt_chunk.drop(columns=["dwell_time_in_seconds"]) + dt_y: DataFrame = numeric_dt_chunk[["dwell_time_in_seconds"]] - if not model: - base_model = MME4BAT() - model = MME4BAT(cdd_strategy=cdd_strategy) + if not model: + base_model = MME4BAT() + model = MME4BAT(cdd_strategy=cdd_strategy) - base_model.fit(rt_x=None, rt_y=None, dt_x=dt_x, dt_y=dt_y) - model.fit(rt_x=None, rt_y=None, dt_x=dt_x, dt_y=dt_y) + base_model.fit(rt_x=None, rt_y=None, dt_x=dt_x, dt_y=dt_y) + model.fit(rt_x=None, rt_y=None, dt_x=dt_x, dt_y=dt_y) - print(" | MODEL INITIATED") - else: - true_prediction = dt_y["dwell_time_in_seconds"].tolist() - base_model_prediction = base_model.predict(rt_x=None, dt_x=dt_x)[ - "prediction" - ].tolist() - model_prediction = model.predict(rt_x=None, dt_x=dt_x)[ - "prediction" - ].tolist() - - true_predictions.extend(true_prediction) - base_model_predictions.extend(base_model_prediction) - model_predictions.extend(model_prediction) - - dt_chunk["true_prediction"] = true_prediction - dt_chunk["base_model_prediction"] = base_model_prediction - dt_chunk["model_prediction"] = model_prediction - - result_dt_df = concat([result_dt_df, dt_chunk], ignore_index=True) - - if active_strategy: - if (dt_x_buffer is None) or (dt_y_buffer is None): - dt_x_buffer = dt_x - dt_y_buffer = dt_y + print(" | MODEL INITIATED") + else: + true_prediction = dt_y["dwell_time_in_seconds"].tolist() + base_model_prediction = base_model.predict(rt_x=None, dt_x=dt_x)[ + "prediction" + ].tolist() + model_prediction = model.predict(rt_x=None, dt_x=dt_x)[ + "prediction" + ].tolist() + + true_predictions.extend(true_prediction) + base_model_predictions.extend(base_model_prediction) + model_predictions.extend(model_prediction) + + dt_chunk["true_prediction"] = true_prediction + dt_chunk["base_model_prediction"] = base_model_prediction + dt_chunk["model_prediction"] = model_prediction + + result_dt_df = concat([result_dt_df, dt_chunk], ignore_index=True) + + if active_strategy: + if (dt_x_buffer is None) or (dt_y_buffer is None): + dt_x_buffer = dt_x + dt_y_buffer = dt_y + else: + dt_x_buffer = concat([dt_x_buffer, dt_x], ignore_index=True) + dt_y_buffer = concat([dt_y_buffer, dt_y], ignore_index=True) + + is_detected = model.is_concept_drift_detected( + ni_rt_x=None, ni_rt_y=None, ni_dt_x=dt_x, ni_dt_y=dt_y + ) + if is_detected: + model.incremental_fit( + ni_rt_x=None, + ni_rt_y=None, + ni_dt_x=dt_x_buffer, + ni_dt_y=dt_y_buffer, + ) + dt_x_buffer = None + dt_y_buffer = None else: - dt_x_buffer = concat([dt_x_buffer, dt_x], ignore_index=True) - dt_y_buffer = concat([dt_y_buffer, dt_y], ignore_index=True) - - is_detected = model.is_concept_drift_detected( - ni_rt_x=None, ni_rt_y=None, ni_dt_x=dt_x, ni_dt_y=dt_y - ) - if is_detected: model.incremental_fit( - ni_rt_x=None, - ni_rt_y=None, - ni_dt_x=dt_x_buffer, - ni_dt_y=dt_y_buffer, + ni_rt_x=None, ni_rt_y=None, ni_dt_x=dt_x, ni_dt_y=dt_y ) - dt_x_buffer = None - dt_y_buffer = None - else: - model.incremental_fit( - ni_rt_x=None, ni_rt_y=None, ni_dt_x=dt_x, ni_dt_y=dt_y - ) - print() + print() + else: + print(f" | NUMBER OF INSTANCES: {len(dt_chunk)} | COUNT IS NOT ENOUGH. WAITING FOR MORE DATA POINTS.") print("\rDATA STREAMING ENDED.", flush=True) print( @@ -176,8 +224,10 @@ def run_exp( - Historical data ending at: {hist_end} - Streaming data starting from: {stream_start} - Streaming data ending at: {stream_end} -- Time interval: {interval_min} -- Concept drift detection strategy: {"ACTIVE" if active_strategy else "PASSIVE"} +- Minimum waiting time: {interval_min} minutes +- Minimum required amount of data: {chunk_size} +- Batch processing technique: {bp_technique} +- Concept drift handling strategy: {strategy} - Concept drift detection algorithm: {cdd_strategy.__class__.__name__ if active_strategy else None} {cdd_strategy_content}