-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpreprocess_data.py
83 lines (65 loc) · 2.45 KB
/
preprocess_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import os
import pickle
import click
import pandas as pd
from sklearn.feature_extraction import DictVectorizer
def dump_pickle(obj, filename: str):
    """Pickle ``obj`` into the file at ``filename``.

    The file is opened in binary write mode, so any existing content at
    that path is replaced. The value returned is whatever ``pickle.dump``
    returns, i.e. ``None``.
    """
    with open(filename, "wb") as handle:
        outcome = pickle.dump(obj, handle)
    return outcome
def read_dataframe(filename: str):
    """Load a trip parquet file and keep trips lasting 1 to 60 minutes.

    Steps performed on the loaded frame:

    * add a ``duration`` column holding the difference between
      ``lpep_dropoff_datetime`` and ``lpep_pickup_datetime`` in minutes;
    * keep only rows whose duration lies in the inclusive range [1, 60];
    * cast ``PULocationID`` and ``DOLocationID`` to strings.

    Args:
        filename: Path of the parquet file to read.

    Returns:
        The filtered ``pandas.DataFrame`` (an independent copy, not a view
        of the unfiltered data).
    """
    df = pd.read_parquet(filename)

    # Vectorised timedelta -> minutes conversion. Unlike a row-wise apply,
    # this always yields a float column, even when the frame has no rows.
    elapsed = df['lpep_dropoff_datetime'] - df['lpep_pickup_datetime']
    df['duration'] = elapsed.dt.total_seconds() / 60

    # .copy() makes the filtered frame independent, so the column
    # assignment below is not a chained assignment on a slice.
    in_range = (df['duration'] >= 1) & (df['duration'] <= 60)
    df = df[in_range].copy()

    categorical = ['PULocationID', 'DOLocationID']
    df[categorical] = df[categorical].astype(str)
    return df
def preprocess(df: pd.DataFrame, dv: DictVectorizer, fit_dv: bool = False):
    """Turn a trips frame into a feature matrix using ``dv``.

    A combined ``PU_DO`` column (``PULocationID`` + '_' + ``DOLocationID``)
    is added to ``df`` in place. The ``PU_DO`` and ``trip_distance``
    columns are then converted to a list of per-row dicts and handed to
    the vectorizer.

    Args:
        df: Frame providing ``PULocationID``, ``DOLocationID`` and
            ``trip_distance``; it gains a ``PU_DO`` column.
        dv: Vectorizer exposing ``fit_transform`` and ``transform``.
        fit_dv: When true ``dv.fit_transform`` is called, otherwise
            ``dv.transform``.

    Returns:
        A ``(X, dv)`` tuple: the vectorizer output and the same ``dv``
        object that was passed in.
    """
    df['PU_DO'] = df['PULocationID'] + '_' + df['DOLocationID']

    feature_columns = ['PU_DO', 'trip_distance']
    records = df[feature_columns].to_dict(orient='records')

    # Choose the vectorizer method once, then apply it to the records.
    vectorize = dv.fit_transform if fit_dv else dv.transform
    return vectorize(records), dv
@click.command()
@click.option(
    "--raw_data_path",
    required=True,
    help="Location where the raw NYC taxi trip data was saved"
)
@click.option(
    "--dest_path",
    required=True,
    help="Location where the resulting files will be saved"
)
def run_data_prep(raw_data_path: str, dest_path: str, dataset: str = "green"):
    """Build pickled train/val/test feature sets from raw trip parquet files.

    Reads ``{dataset}_tripdata_2022-01.parquet`` (train), ``...-02`` (val)
    and ``...-03`` (test) from ``raw_data_path``. The vectorizer is fitted
    on the train split and only applied to the other two. ``dv.pkl``,
    ``train.pkl``, ``val.pkl`` and ``test.pkl`` are written to
    ``dest_path``; each split file holds an ``(X, y)`` tuple where ``y``
    is the ``tip_amount`` column.

    Args:
        raw_data_path: Directory containing the monthly parquet files.
        dest_path: Output directory; created if it does not exist.
        dataset: File-name prefix of the parquet files. No command-line
            option is declared for it, so the CLI uses the default.
    """
    target = 'tip_amount'
    # Insertion order matters: "train" comes first so the vectorizer is
    # fitted before it is used to transform the other splits.
    split_months = {"train": "2022-01", "val": "2022-02", "test": "2022-03"}

    # Load parquet files
    frames = {
        split: read_dataframe(
            os.path.join(raw_data_path, f"{dataset}_tripdata_{month}.parquet")
        )
        for split, month in split_months.items()
    }

    # Fit the DictVectorizer on train, then preprocess every split
    dv = DictVectorizer()
    prepared = {}
    for split, df in frames.items():
        y = df[target].values
        X, dv = preprocess(df, dv, fit_dv=(split == "train"))
        prepared[split] = (X, y)

    # Create dest_path folder unless it already exists
    os.makedirs(dest_path, exist_ok=True)

    # Save DictVectorizer and datasets
    dump_pickle(dv, os.path.join(dest_path, "dv.pkl"))
    for split, payload in prepared.items():
        dump_pickle(payload, os.path.join(dest_path, f"{split}.pkl"))
if __name__ == "__main__":
    # Executed as a script: let click parse the command line and run the job.
    run_data_prep()