feat: Add hybrid batch processing #29

Merged (5 commits, Apr 21, 2024)
2 changes: 2 additions & 0 deletions examples/pipeline.py
@@ -13,6 +13,7 @@ def datetime_from_string(datetime_string: str) -> datetime:
streaming_data_starting_from = datetime_from_string("2022-01-01")
streaming_data_ending_at = datetime_from_string("2022-08-01")
time_interval = 60
chunk_size = 100
active_strategy = True
cdd_strategy = DDM(
warning_level=0.1,
@@ -28,6 +29,7 @@ def datetime_from_string(datetime_string: str) -> datetime:
stream_start=streaming_data_starting_from,
stream_end=streaming_data_ending_at,
interval_min=time_interval,
chunk_size=chunk_size,
active_strategy=active_strategy,
cdd_strategy=cdd_strategy,
output_parent_dir=folder_path_to_save_result,
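Note: examples/pipeline.py now sets both time_interval and chunk_size, which selects the HYBRID technique in src/pipeline.py below. A minimal sketch of the selection rule (select_technique is an illustrative helper, not part of the PR):

def select_technique(interval_min: float, chunk_size: int) -> str:
    # Mirrors run_exp's dispatch: both knobs set -> HYBRID,
    # timer only -> SCHEDULED, instance count only -> PERIODIC.
    if interval_min == 0 and chunk_size == 0:
        raise ValueError("Both interval_min & chunk_size cannot be set to zero.")
    if interval_min != 0 and chunk_size != 0:
        return "HYBRID"
    return "SCHEDULED" if interval_min != 0 else "PERIODIC"

assert select_technique(60, 100) == "HYBRID"  # the example configuration above
assert select_technique(60, 0) == "SCHEDULED"
assert select_technique(0, 100) == "PERIODIC"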
172 changes: 111 additions & 61 deletions src/pipeline.py
@@ -25,6 +25,7 @@ def run_exp(
stream_start: datetime,
stream_end: datetime,
interval_min: float,
chunk_size: int,
active_strategy: Optional[bool] = False,
cdd_strategy: Optional[IStrategy] = None,
output_parent_dir: Optional[str] = "./",
@@ -38,7 +39,8 @@ def run_exp(
hist_end: End timestamp (exclusive) for historical data.
stream_start: Start timestamp (inclusive) for streaming data.
stream_end: End timestamp (exclusive) for streaming data.
interval_min: Minimum waiting time (in minutes) for data processing.
chunk_size: Minimum number of instances required to process a batch.
active_strategy: Flag indicating whether to use active strategy (default is False).
cdd_strategy: Strategy to be used for detecting concept drift. Required if active_strategy is True.
output_parent_dir: Parent directory's path to save experiment results.
@@ -49,13 +51,37 @@
"""
warnings.filterwarnings("ignore", category=SettingWithCopyWarning)

if interval_min == 0 and chunk_size == 0:
raise ValueError("Both interval_min & chunk_size cannot be set to zero.")
else:
count_not_enough = False
is_end_reached = False

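# Technique dispatch: both knobs set -> HYBRID; interval only -> SCHEDULED;
# chunk size only -> PERIODIC.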
hybrid_bp = interval_min != 0 and chunk_size != 0
scheduled_bp = interval_min != 0 and chunk_size == 0

if hybrid_bp:
bp_technique = "HYBRID"
elif scheduled_bp:
bp_technique = "SCHEDULED"
else:
bp_technique = "PERIODIC"

if active_strategy and cdd_strategy is None:
raise ValueError("cdd_strategy must be provided when active_strategy is True.")
else:
strategy = "ACTIVE" if active_strategy else "PASSIVE"

print(f"BATCH PROCESSING TECHNIQUE: {bp_technique} | CONCEPT DRIFT HANDLING STRATEGY: {strategy}")

rt_df: DataFrame = BUS_654_FEATURES_ADDED_RUNNING_TIMES.dataframe
dt_df: DataFrame = BUS_654_FEATURES_ADDED_DWELL_TIMES.dataframe
dt_df: DataFrame = dt_df.drop(columns=["dwell_time_in_seconds_old"])
dt_df["arrival_datetime"] = to_datetime(dt_df["date"] + " " + dt_df["arrival_time"])

dt_df["arrival_datetime"] = to_datetime(
dt_df["date"] + " " + dt_df["arrival_time"], format="%Y-%m-%d %H:%M:%S"
)
dt_df.sort_values(by="arrival_datetime", inplace=True)

base_model: Optional[MME4BAT] = None
model: Optional[MME4BAT] = None
@@ -76,6 +102,16 @@
to_date_time = hist_end

while from_date_time < stream_end:
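# Hybrid mode: if the previous window closed without enough data, stretch
# to_date_time to the arrival of the chunk_size-th pending instance (plus one
# minute, keeping the half-open window); if fewer than chunk_size instances
# remain in the stream, the next batch is the final, possibly short, one.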
if hybrid_bp and count_not_enough:
temp_df = dt_df.loc[
(dt_df["arrival_datetime"] >= from_date_time),
:,
].reset_index(drop=True)
if temp_df.shape[0] < chunk_size:
is_end_reached = True
else:
to_date_time = temp_df["arrival_datetime"].iloc[chunk_size - 1] + timedelta(minutes=1)

print(
f"DATA STREAM: [{from_date_time.strftime('%Y-%m-%d %H:%M:%S')} - {to_date_time.strftime('%Y-%m-%d %H:%M:%S')})",
end="",
@@ -87,71 +123,83 @@
)
dt_chunk = dt_df.loc[
(dt_df["arrival_datetime"] >= from_date_time)
& (dt_df["arrival_datetime"] < to_date_time),
:,
].reset_index(drop=True)
count_not_enough = dt_chunk.shape[0] < chunk_size

if (
scheduled_bp
or (hybrid_bp and not count_not_enough)
or is_end_reached
):
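# Window accepted: advance the cursor (the first processed window is the
# historical block [hist_start, hist_end), after which it jumps to stream_start).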
from_date_time = (
stream_start if from_date_time == hist_start else to_date_time
)
to_date_time = from_date_time + timedelta(minutes=interval_min)

if len(dt_chunk) == 0:
print(" | NO INSTANCES")
continue
else:
print(f" | NUMBER OF INSTANCES: {len(dt_chunk)}")

numeric_dt_chunk = dt_chunk.select_dtypes(include="number")

dt_x: DataFrame = numeric_dt_chunk.drop(columns=["dwell_time_in_seconds"])
dt_y: DataFrame = numeric_dt_chunk[["dwell_time_in_seconds"]]

if not model:
base_model = MME4BAT()
model = MME4BAT(cdd_strategy=cdd_strategy)

base_model.fit(rt_x=None, rt_y=None, dt_x=dt_x, dt_y=dt_y)
model.fit(rt_x=None, rt_y=None, dt_x=dt_x, dt_y=dt_y)

print(" | MODEL INITIATED")
else:
true_prediction = dt_y["dwell_time_in_seconds"].tolist()
base_model_prediction = base_model.predict(rt_x=None, dt_x=dt_x)[
"prediction"
].tolist()
model_prediction = model.predict(rt_x=None, dt_x=dt_x)[
"prediction"
].tolist()

true_predictions.extend(true_prediction)
base_model_predictions.extend(base_model_prediction)
model_predictions.extend(model_prediction)

dt_chunk["true_prediction"] = true_prediction
dt_chunk["base_model_prediction"] = base_model_prediction
dt_chunk["model_prediction"] = model_prediction

result_dt_df = concat([result_dt_df, dt_chunk], ignore_index=True)

if active_strategy:
if (dt_x_buffer is None) or (dt_y_buffer is None):
dt_x_buffer = dt_x
dt_y_buffer = dt_y
print(" | MODEL INITIATED")
else:
true_prediction = dt_y["dwell_time_in_seconds"].tolist()
base_model_prediction = base_model.predict(rt_x=None, dt_x=dt_x)[
"prediction"
].tolist()
model_prediction = model.predict(rt_x=None, dt_x=dt_x)[
"prediction"
].tolist()

true_predictions.extend(true_prediction)
base_model_predictions.extend(base_model_prediction)
model_predictions.extend(model_prediction)

dt_chunk["true_prediction"] = true_prediction
dt_chunk["base_model_prediction"] = base_model_prediction
dt_chunk["model_prediction"] = model_prediction

result_dt_df = concat([result_dt_df, dt_chunk], ignore_index=True)

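# ACTIVE strategy: accumulate chunks in a buffer and retrain on the whole
# buffer only when the detector flags concept drift; the PASSIVE branch below
# instead calls incremental_fit on every chunk.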
if active_strategy:
if (dt_x_buffer is None) or (dt_y_buffer is None):
dt_x_buffer = dt_x
dt_y_buffer = dt_y
else:
dt_x_buffer = concat([dt_x_buffer, dt_x], ignore_index=True)
dt_y_buffer = concat([dt_y_buffer, dt_y], ignore_index=True)

is_detected = model.is_concept_drift_detected(
ni_rt_x=None, ni_rt_y=None, ni_dt_x=dt_x, ni_dt_y=dt_y
)
if is_detected:
model.incremental_fit(
ni_rt_x=None,
ni_rt_y=None,
ni_dt_x=dt_x_buffer,
ni_dt_y=dt_y_buffer,
)
dt_x_buffer = None
dt_y_buffer = None
else:
model.incremental_fit(
ni_rt_x=None, ni_rt_y=None, ni_dt_x=dt_x, ni_dt_y=dt_y
)
print()
else:
print(f" | NUMBER OF INSTANCES: {len(dt_chunk)} | COUNT IS NOT ENOUGH. WAITING FOR MORE DATA POINTS.")

print("\rDATA STREAMING ENDED.", flush=True)
print(
@@ -176,8 +224,10 @@ def run_exp(
- Historical data ending at: {hist_end}
- Streaming data starting from: {stream_start}
- Streaming data ending at: {stream_end}
- Minimum waiting time: {interval_min} minutes
- Minimum required amount of data: {chunk_size}
- Batch processing technique: {bp_technique}
- Concept drift handling strategy: {strategy}
- Concept drift detection algorithm: {cdd_strategy.__class__.__name__ if active_strategy else None}
{cdd_strategy_content}

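Note: for the hybrid rule in isolation, a self-contained sketch of the batching semantics distilled from the loop above (hybrid_windows is illustrative, not part of the PR; with chunk_size=0 it reduces to SCHEDULED behaviour, with interval_min=0 to PERIODIC):

from datetime import datetime, timedelta
from typing import Iterator, List, Tuple

def hybrid_windows(
    arrivals: List[datetime], start: datetime, interval_min: float, chunk_size: int
) -> Iterator[Tuple[datetime, datetime, List[datetime]]]:
    # Yield (window_start, window_end, batch). A window closes once the
    # interval has elapsed AND it holds at least chunk_size instances;
    # otherwise it is stretched to the chunk_size-th pending arrival,
    # mirroring the temp_df lookahead in run_exp.
    if interval_min == 0 and chunk_size == 0:
        raise ValueError("Both interval_min & chunk_size cannot be set to zero.")
    lo, hi = start, start + timedelta(minutes=interval_min)
    while any(t >= lo for t in arrivals):
        batch = [t for t in arrivals if lo <= t < hi]
        if len(batch) >= chunk_size:
            yield lo, hi, batch
            lo, hi = hi, hi + timedelta(minutes=interval_min)
        else:
            pending = sorted(t for t in arrivals if t >= lo)
            if len(pending) < chunk_size:
                yield lo, hi, batch  # end of stream: final short batch
                return
            hi = pending[chunk_size - 1] + timedelta(minutes=1)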