Skip to content

Commit

Permalink
feat: Introduce active strategy within experiment pipeline (#21)
Browse files Browse the repository at this point in the history
* feat: Introduce active strategy logicwithin experiment pipeline

* feat: Introduce active strategy within experiment pipeline

* feat: Introduce active strategy within experiment pipeline

* Run experiment [upto 2021-11-01, with interval 60 min]

---------

Co-authored-by: Kajanan Selvanesan <[email protected]>
  • Loading branch information
KesaviAravinthan and kajanan1212 authored Dec 31, 2023
1 parent 2798dbb commit c15bffd
Show file tree
Hide file tree
Showing 173 changed files with 171 additions and 24 deletions.
4 changes: 2 additions & 2 deletions examples/concept_drift_detector/ddm.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ def split_train_test(split_date, df):
]
)

y_train = train["arrival_time_in_seconds"]
y_train = train[["arrival_time_in_seconds"]]

y_test = test["arrival_time_in_seconds"]
y_test = test[["arrival_time_in_seconds"]]

pipeline = Pipeline([("model", XGBRegressor(objective="reg:squarederror"))])

Expand Down
13 changes: 11 additions & 2 deletions examples/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
from datetime import datetime
from src.pipeline import run_exp
from src.concept_drift_detector.strategies import DDM


def datetime_from_string(datetime_string: str) -> datetime:
return datetime.strptime(datetime_string, '%Y-%m-%d')
return datetime.strptime(datetime_string, "%Y-%m-%d")


if __name__ == "__main__":
historical_data_starting_from = datetime_from_string("2021-10-01")
historical_data_ending_at = datetime_from_string("2021-10-10")
streaming_data_starting_from = datetime_from_string("2021-10-10")
streaming_data_ending_at = datetime_from_string("2022-11-01")
streaming_data_ending_at = datetime_from_string("2021-11-01")
time_interval = 60
active_strategy = True
cdd_strategy = DDM(
warning_level=2,
drift_level=3,
min_num_instances=5,
)
folder_path_to_save_result = "../experiments"
experiment_label = "m-xgb-s-xgb_model"

Expand All @@ -21,6 +28,8 @@ def datetime_from_string(datetime_string: str) -> datetime:
stream_start=streaming_data_starting_from,
stream_end=streaming_data_ending_at,
interval_min=time_interval,
active_strategy=active_strategy,
cdd_strategy=cdd_strategy,
output_parent_dir=folder_path_to_save_result,
label=experiment_label,
)
28 changes: 28 additions & 0 deletions experiments/ex-m-xgb-s-xgb_model-2023-12-31_18-57-15/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@

# Experiment: m-xgb-s-xgb_model

## Parameters
- Historical data starting from: 2021-10-01 00:00:00
- Historical data ending at: 2021-10-10 00:00:00
- Streaming data starting from: 2021-10-10 00:00:00
- Streaming data ending at: 2021-11-01 00:00:00
- Time interval: 60
- Concept drift detection strategy: ACTIVE
- Concept drift detection algorithm: DDM
- DDM:

| Attribute | Value |
|---|---|
| Warning Level Factor | 2.0 |
| Drift Level Factor | 3.0 |
| Minimum Numbers of Instances to Start Looking for Changes | 5 |


## Results
### Model Performance Metrics
| Model | MAE (s) | MAPE (%) | RMSE (s) |
|-------------------------------------|-----------|------------|------------|
| Base Model (XGBoost) | 11.031494904113444 | 50.88909107177862 | 20.646408296378063 |
| Base Model with Incremental Learning | 4.549701973795891 | 18.23491959340943 | 13.490315714393848 |


Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
7 changes: 7 additions & 0 deletions src/concept_drift_detector/strategies/adwin.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Any, Dict

from river.drift import ADWIN as rADWIN
from matplotlib import gridspec, pyplot as plt
from src.concept_drift_detector.strategies.istrategy import IStrategy
Expand All @@ -23,6 +25,11 @@ def is_concept_drift_detected(self, model, ni_x, ni_y) -> bool:

return is_detected

def get_attributes(self) -> Dict[str, Any]:
return {
"Delta": self._delta,
}


def plot_data(stream, drifts=None):
fig = plt.figure(figsize=(7, 3), tight_layout=True)
Expand Down
36 changes: 23 additions & 13 deletions src/concept_drift_detector/strategies/ddm.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Any, Dict

from sklearn.metrics import mean_absolute_percentage_error
from frouros.detectors.concept_drift import DDM as fDDM, DDMConfig
from frouros.metrics import PrequentialError
Expand Down Expand Up @@ -28,11 +30,12 @@ def is_concept_drift_detected(self, model, ni_x, ni_y) -> bool:
idx_drift, idx_warning = [], []
metric_error = 0

for (i, x), y in zip(ni_x.iterrows(), ni_y):
for (i, x), (j, y) in zip(ni_x.iterrows(), ni_y.iterrows()):
x = x.to_frame().transpose()
y = y.to_frame().transpose()
y_pred = model.predict(x)

mape = mean_absolute_percentage_error([y], y_pred)
mape = mean_absolute_percentage_error(y, y_pred)
metric_error = self._metric(error_value=mape)

self._ddm.update(value=mape)
Expand All @@ -41,19 +44,26 @@ def is_concept_drift_detected(self, model, ni_x, ni_y) -> bool:
if status["drift"] and not is_detected:
is_detected = True
idx_drift.append(i)
print(
f"Concept drift detected at step {i}. Accuracy: {1 - metric_error:.4f}"
)
# print(
# f"Concept drift detected at step {i}. Accuracy: {1 - metric_error:.4f}"
# )
if status["warning"]:
print(
f"warning detected: {i} MAPE={mape:.4f} Accuracy : {1 - metric_error:.4f} "
)
# print(
# f"warning detected: {i} MAPE={mape:.4f} Accuracy : {1 - metric_error:.4f} "
# )
idx_warning.append(i)
if not is_detected:
print("No concept drift detected")
# if not is_detected:
# print("No concept drift detected")

print(f"Final accuracy: {1 - metric_error:.4f}\n")
print("warning index : ", idx_warning)
print("drift index : ", idx_drift)
# print(f"Final accuracy: {1 - metric_error:.4f}\n")
# print("warning index : ", idx_warning)
# print("drift index : ", idx_drift)

return is_detected

def get_attributes(self) -> Dict[str, Any]:
return {
"Warning Level Factor": float(self._warning_level),
"Drift Level Factor": float(self._drift_level),
"Minimum Numbers of Instances to Start Looking for Changes": self._min_num_instances,
}
5 changes: 5 additions & 0 deletions src/concept_drift_detector/strategies/istrategy.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
from abc import abstractmethod
from typing import Any, Dict


class IStrategy:
@abstractmethod
def is_concept_drift_detected(self, model, ni_x, ni_y) -> bool:
pass

@abstractmethod
def get_attributes(self) -> Dict[str, Any]:
pass
7 changes: 7 additions & 0 deletions src/concept_drift_detector/strategies/page_hinkley.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Any, Dict

from river.drift import PageHinkley as rPageHinkley
from matplotlib import gridspec, pyplot as plt
from src.concept_drift_detector.strategies.istrategy import IStrategy
Expand All @@ -23,6 +25,11 @@ def is_concept_drift_detected(self, model, ni_x, ni_y) -> bool:

return is_detected

def get_attributes(self) -> Dict[str, Any]:
return {
"Threshold": self._threshold,
}


def plot_data(stream, drifts=None):
fig = plt.figure(figsize=(7, 3), tight_layout=True)
Expand Down
38 changes: 37 additions & 1 deletion src/models/use_cases/arrival_time/bus/mme4bat.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,26 @@
from copy import deepcopy
from typing import Optional

from src.concept_drift_detector import CDD
from src.concept_drift_detector.strategies import IStrategy
from src.models._base_models.xgboost import XGBClassifier, XGBRegressor


class MME4BAT:
def __init__(self):
def __init__(self, cdd_strategy: Optional[IStrategy] = None) -> None:
# self.xgb_rt_regressor = XGBRegressor()

self.xgb_dt_classifier = XGBClassifier()
self.xgb_dt_regressor = XGBRegressor()

if cdd_strategy:
self._cdd_of_xgb_dt_classifier = CDD(
strategy=deepcopy(cdd_strategy),
)
self._cdd_of_xgb_dt_regressor = CDD(
strategy=deepcopy(cdd_strategy),
)

def fit(self, rt_x, rt_y, dt_x, dt_y) -> None:
# self.xgb_rt_regressor.fit(rt_x, rt_y)

Expand Down Expand Up @@ -43,3 +56,26 @@ def predict(self, rt_x, dt_x):
dt_y.loc[dt_y["prediction"] < 0, "prediction"] = 0

return dt_y

def is_concept_drift_detected(self, ni_rt_x, ni_rt_y, ni_dt_x, ni_dt_y) -> bool:
try:
is_detected_2 = self._cdd_of_xgb_dt_classifier.is_concept_drift_detected(
model=self.xgb_dt_classifier,
ni_x=ni_dt_x,
ni_y=ni_dt_y,
)

ni_dt_x_gt_0 = ni_dt_x[ni_dt_y.iloc[:, 0] > 0]
ni_dt_y_gt_0 = ni_dt_y[ni_dt_y.iloc[:, 0] > 0]

is_detected_3 = self._cdd_of_xgb_dt_regressor.is_concept_drift_detected(
model=self.xgb_dt_regressor,
ni_x=ni_dt_x_gt_0,
ni_y=ni_dt_y_gt_0,
)
print(
f" | CDD at xgb_dt_classifier: {is_detected_2} | CDD at xgb_dt_regressor: {is_detected_3}"
)
return is_detected_2 or is_detected_3
except NameError as e:
raise e
57 changes: 51 additions & 6 deletions src/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
mean_absolute_percentage_error,
mean_squared_error,
)
from src.concept_drift_detector.strategies import IStrategy
from src.datasets import (
BUS_654_FEATURES_ADDED_DWELL_TIMES,
BUS_654_FEATURES_ADDED_RUNNING_TIMES,
Expand All @@ -24,6 +25,8 @@ def run_exp(
stream_start: datetime,
stream_end: datetime,
interval_min: float,
active_strategy: Optional[bool] = False,
cdd_strategy: Optional[IStrategy] = None,
output_parent_dir: Optional[str] = "./",
label: Optional[str] = "",
) -> None:
Expand All @@ -36,15 +39,19 @@ def run_exp(
stream_start: Start timestamp (inclusive) for streaming data.
stream_end: End timestamp (exclusive) for streaming data.
interval_min: Time interval (in minutes) for data processing.
active_strategy: Flag indicating whether to use active strategy (default is False).
cdd_strategy: Strategy to be used for detecting concept drift. Required if active_strategy is True.
output_parent_dir: Parent directory's path to save experiment results.
label: Experiment label (default is an empty string).
Returns:
None
"""

warnings.filterwarnings("ignore", category=SettingWithCopyWarning)

if active_strategy and cdd_strategy is None:
raise ValueError("cdd_strategy must be provided when active_strategy is True.")

rt_df: DataFrame = BUS_654_FEATURES_ADDED_RUNNING_TIMES.dataframe
dt_df: DataFrame = BUS_654_FEATURES_ADDED_DWELL_TIMES.dataframe

Expand All @@ -62,12 +69,15 @@ def run_exp(
base_model_predictions = []
model_predictions = []

dt_x_buffer: Optional[DataFrame] = None
dt_y_buffer: Optional[DataFrame] = None

from_date_time = hist_start
to_date_time = hist_end

while from_date_time < stream_end:
print(
f"\rDATA STREAMING: [{from_date_time.strftime('%Y-%m-%d %H:%M:%S')} - {to_date_time.strftime('%Y-%m-%d %H:%M:%S')})",
f"DATA STREAM: [{from_date_time.strftime('%Y-%m-%d %H:%M:%S')} - {to_date_time.strftime('%Y-%m-%d %H:%M:%S')})",
end="",
flush=True,
)
Expand All @@ -82,6 +92,7 @@ def run_exp(
to_date_time = from_date_time + timedelta(minutes=interval_min)

if len(dt_chunk) == 0:
print(" | NO INSTANCES")
continue

numeric_dt_chunk = dt_chunk.select_dtypes(include="number")
Expand All @@ -91,10 +102,12 @@ def run_exp(

if not model:
base_model = MME4BAT()
model = MME4BAT()
model = MME4BAT(cdd_strategy=cdd_strategy)

base_model.fit(rt_x=None, rt_y=None, dt_x=dt_x, dt_y=dt_y)
model.fit(rt_x=None, rt_y=None, dt_x=dt_x, dt_y=dt_y)

print(" | MODEL INITIATED")
else:
true_prediction = dt_y["dwell_time_in_seconds"].tolist()
base_model_prediction = base_model.predict(rt_x=None, dt_x=dt_x)[
Expand All @@ -114,9 +127,30 @@ def run_exp(

result_dt_df = concat([result_dt_df, dt_chunk], ignore_index=True)

model.incremental_fit(
ni_rt_x=None, ni_rt_y=None, ni_dt_x=dt_x, ni_dt_y=dt_y
)
if active_strategy:
if (dt_x_buffer is None) or (dt_y_buffer is None):
dt_x_buffer = dt_x
dt_y_buffer = dt_y
else:
dt_x_buffer = concat([dt_x_buffer, dt_x], ignore_index=True)
dt_y_buffer = concat([dt_y_buffer, dt_y], ignore_index=True)

is_detected = model.is_concept_drift_detected(
ni_rt_x=None, ni_rt_y=None, ni_dt_x=dt_x, ni_dt_y=dt_y
)
if is_detected:
model.incremental_fit(
ni_rt_x=None,
ni_rt_y=None,
ni_dt_x=dt_x_buffer,
ni_dt_y=dt_y_buffer,
)
dt_x_buffer = None
dt_y_buffer = None
else:
model.incremental_fit(
ni_rt_x=None, ni_rt_y=None, ni_dt_x=dt_x, ni_dt_y=dt_y
)

print("\rDATA STREAMING ENDED.", flush=True)
print(
Expand All @@ -125,6 +159,14 @@ def run_exp(
flush=True,
)

cdd_strategy_content = ""
if active_strategy:
cdd_strategy_content = f"- {cdd_strategy.__class__.__name__}:\n\n"
cdd_strategy_content += f"| Attribute | Value |\n|---|---|\n"

for attr, value in cdd_strategy.get_attributes().items():
cdd_strategy_content += f"| {attr} | {value} |\n"

md_file_content = f"""
# Experiment: {label}
Expand All @@ -134,6 +176,9 @@ def run_exp(
- Streaming data starting from: {stream_start}
- Streaming data ending at: {stream_end}
- Time interval: {interval_min}
- Concept drift detection strategy: {"ACTIVE" if active_strategy else "PASSIVE"}
- Concept drift detection algorithm: {cdd_strategy.__class__.__name__ if active_strategy else None}
{cdd_strategy_content}
## Results
### Model Performance Metrics
Expand Down

0 comments on commit c15bffd

Please sign in to comment.