# utils.py
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin


class CategoricalEncoder(BaseEstimator, TransformerMixin):
    """
    Encodes categorical columns using label encoding, one-hot encoding and target encoding.
    Label encoding is used for binary categorical columns,
    one-hot encoding for columns with <= 10 distinct values,
    and target encoding for columns with higher cardinality (> 10 distinct values).
    """

    def __init__(self, cols=None, lcols=None, ohecols=None, tcols=None, reduce_df=False):
        """
        Parameters
        ----------
        cols : list of str
            Columns to encode. Default is to label/one-hot/target encode all
            categorical columns in the DataFrame.
        lcols : list of str
            Columns to label encode. Default is to label encode all binary
            categorical columns (<= 2 distinct values).
        ohecols : list of str
            Columns to one-hot encode. Default is to one-hot encode all
            categorical columns with 3 to 10 distinct values.
        tcols : list of str
            Columns to target encode. Default is to target encode all
            categorical columns with > 10 distinct values.
        reduce_df : bool
            Whether to use reduced degrees of freedom for one-hot encoding
            (that is, add N-1 one-hot columns for a column with N categories).
            E.g. for a column with categories A, B, and C: when reduce_df is
            True, A=[1, 0], B=[0, 1], and C=[0, 0]; when reduce_df is False,
            A=[1, 0, 0], B=[0, 1, 0], and C=[0, 0, 1].
            Default = False
        """
        # Accept either a single column name or a list of names for each group
        if isinstance(cols, str):
            self.cols = [cols]
        else:
            self.cols = cols
        if isinstance(lcols, str):
            self.lcols = [lcols]
        else:
            self.lcols = lcols
        if isinstance(ohecols, str):
            self.ohecols = [ohecols]
        else:
            self.ohecols = ohecols
        if isinstance(tcols, str):
            self.tcols = [tcols]
        else:
            self.tcols = tcols
        self.reduce_df = reduce_df

    def fit(self, X, y):
        """Fit label/one-hot/target encoder to X and y.

        Parameters
        ----------
        X : pandas DataFrame, shape [n_samples, n_columns]
            DataFrame containing columns to encode
        y : pandas Series, shape = [n_samples]
            Target values.

        Returns
        -------
        self : encoder
            Returns self.
        """
        # Encode all categorical cols by default
        if self.cols is None:
            self.cols = [c for c in X if str(X[c].dtype) == 'object']
        # Check columns are in X
        for col in self.cols:
            if col not in X:
                raise ValueError(f"Column '{col}' not in X")
        # Separate out lcols, ohecols and tcols based on cardinality
        if self.lcols is None:
            self.lcols = [c for c in self.cols if X[c].nunique() <= 2]
        if self.ohecols is None:
            self.ohecols = [c for c in self.cols if 2 < X[c].nunique() <= 10]
        if self.tcols is None:
            self.tcols = [c for c in self.cols if X[c].nunique() > 10]
        ## Create label encoding mapping (category value -> integer code)
        self.lmaps = dict()
        for col in self.lcols:
            self.lmaps[col] = dict(zip(X[col].values, X[col].astype('category').cat.codes.values))
        ## Create one-hot encoding mapping (dict to store map for each column)
        self.ohemaps = dict()
        for col in self.ohecols:
            self.ohemaps[col] = []
            uniques = X[col].unique()
            for unique in uniques:
                self.ohemaps[col].append(unique)
            if self.reduce_df:
                # Drop the last category to use N-1 degrees of freedom
                del self.ohemaps[col][-1]
        ## Create target encoding mapping: per-category (sum of y, count)
        self.global_target_mean = y.mean().round(2)
        self.sum_count = dict()
        for col in self.tcols:
            self.sum_count[col] = dict()
            uniques = X[col].unique()
            for unique in uniques:
                ix = X[col] == unique
                self.sum_count[col][unique] = (y[ix].sum(), ix.sum())
        ## Return the fit object
        return self

    def transform(self, X, y=None):
        """Perform label/one-hot/target encoding transformation.

        Parameters
        ----------
        X : pandas DataFrame, shape [n_samples, n_columns]
            DataFrame containing columns to encode
        y : pandas Series, shape = [n_samples]
            Target values; when provided, target encoding is done
            leave-one-out (LOO) to limit target leakage on training data.

        Returns
        -------
        pandas DataFrame
            Input DataFrame with transformed columns
        """
        Xo = X.copy()
        ## Perform label encoding transformation
        for col, lmap in self.lmaps.items():
            # Map the column; categories unseen during fit become -1
            Xo[col] = Xo[col].map(lmap).fillna(-1)
        ## Perform one-hot encoding transformation
        for col, vals in self.ohemaps.items():
            for val in vals:
                new_col = col + '_' + str(val)
                Xo[new_col] = (Xo[col] == val).astype('uint8')
            del Xo[col]
        ## Perform target encoding transformation
        # Use plain (mean) target encoding if this is test data
        if y is None:
            for col in self.sum_count:
                vals = np.full(X.shape[0], np.nan)
                for cat, sum_count in self.sum_count[col].items():
                    vals[X[col] == cat] = (sum_count[0] / sum_count[1]).round(2)
                Xo[col] = vals
                # Fill categories unseen during fit with the global target mean
                Xo[col] = Xo[col].fillna(self.global_target_mean)
        # LOO target encode each column when y is given (training data)
        else:
            for col in self.sum_count:
                vals = np.full(X.shape[0], np.nan)
                for cat, sum_count in self.sum_count[col].items():
                    ix = X[col] == cat
                    if sum_count[1] > 1:
                        # Leave-one-out mean: exclude each row's own target value
                        vals[ix] = ((sum_count[0] - y[ix].values) / (sum_count[1] - 1)).round(2)
                    else:
                        # Category level occurs only once in the dataset:
                        # fall back to the mean of all other rows
                        vals[ix] = ((y.sum() - y[ix]) / (X.shape[0] - 1)).round(2)
                Xo[col] = vals
                # Fill categories unseen during fit with the global target mean
                Xo[col] = Xo[col].fillna(self.global_target_mean)
        ## Return encoded DataFrame
        return Xo

    def fit_transform(self, X, y=None):
        """Fit and transform the data via label/one-hot/target encoding.

        Parameters
        ----------
        X : pandas DataFrame, shape [n_samples, n_columns]
            DataFrame containing columns to encode
        y : pandas Series, shape = [n_samples]
            Target values (required for fitting).

        Returns
        -------
        pandas DataFrame
            Input DataFrame with transformed columns
        """
        return self.fit(X, y).transform(X, y)
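

# A minimal usage sketch of CategoricalEncoder (illustration only, not part of
# the original module): the column names, data and helper name below are
# invented. 'Gender' is binary, so it gets label encoded; 'Geography' has 3
# distinct values, so it gets one-hot encoded.
def _demo_categorical_encoder():
    import pandas as pd
    X = pd.DataFrame({
        'Gender': ['M', 'F', 'M', 'F'],          # binary -> label encoded
        'Geography': ['FR', 'DE', 'ES', 'FR'],   # 3 values -> one-hot encoded
    })
    y = pd.Series([1, 0, 0, 1])
    enc = CategoricalEncoder()
    # Gender becomes integer codes; Geography_* dummy columns replace Geography
    print(enc.fit_transform(X, y))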


class AddFeatures(BaseEstimator):
    """
    Adds new, engineered features using the original categorical and numerical
    features of the DataFrame.
    """

    def __init__(self, eps=1e-6):
        """
        Parameters
        ----------
        eps : float
            A small value to avoid divide-by-zero errors. Default value is 0.000001
        """
        self.eps = eps

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        """
        Parameters
        ----------
        X : pandas DataFrame, shape [n_samples, n_columns]
            DataFrame containing the base columns from which new
            interaction-based features are engineered. Expects the columns
            Balance, NumOfProducts, EstimatedSalary, Tenure, Age and
            Surname_enc (a pre-computed target encoding of Surname).
        """
        Xo = X.copy()
        ## Add 4 new columns - bal_per_product, bal_by_est_salary, tenure_age_ratio, age_surname_enc
        Xo['bal_per_product'] = Xo.Balance / (Xo.NumOfProducts + self.eps)
        Xo['bal_by_est_salary'] = Xo.Balance / (Xo.EstimatedSalary + self.eps)
        Xo['tenure_age_ratio'] = Xo.Tenure / (Xo.Age + self.eps)
        Xo['age_surname_enc'] = np.sqrt(Xo.Age) * Xo.Surname_enc
        ## Return the updated DataFrame
        return Xo

    def fit_transform(self, X, y=None):
        """
        Parameters
        ----------
        X : pandas DataFrame, shape [n_samples, n_columns]
            DataFrame containing the base columns from which new
            interaction-based features are engineered.
        """
        return self.fit(X, y).transform(X)
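

# A minimal usage sketch of AddFeatures (illustration only): all values below
# are invented, and Surname_enc is assumed to have been created upstream
# (e.g. by a target encoding of Surname).
def _demo_add_features():
    import pandas as pd
    X = pd.DataFrame({
        'Balance': [50000.0, 0.0],
        'NumOfProducts': [2, 1],
        'EstimatedSalary': [60000.0, 45000.0],
        'Tenure': [5, 2],
        'Age': [40, 30],
        'Surname_enc': [0.21, 0.35],
    })
    # Adds bal_per_product, bal_by_est_salary, tenure_age_ratio, age_surname_enc
    print(AddFeatures().fit_transform(X))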


class CustomScaler(BaseEstimator, TransformerMixin):
    """
    A custom standard scaler class with the ability to apply scaling on
    selected columns only.
    """

    def __init__(self, scale_cols=None):
        """
        Parameters
        ----------
        scale_cols : list of str
            Columns on which to perform scaling and normalization.
            Default is to scale all numerical columns.
        """
        self.scale_cols = scale_cols

    def fit(self, X, y=None):
        """
        Parameters
        ----------
        X : pandas DataFrame, shape [n_samples, n_columns]
            DataFrame containing columns to scale
        """
        # Scale all numerical (int/float) columns if the user doesn't provide
        # the list of columns to scale
        if self.scale_cols is None:
            self.scale_cols = [c for c in X if X[c].dtype.kind in 'iuf']
        ## Store the mean and standard deviation of each column to scale
        self.maps = dict()
        for col in self.scale_cols:
            self.maps[col] = dict()
            self.maps[col]['mean'] = np.mean(X[col].values).round(2)
            # np.std computes the population standard deviation (ddof=0)
            self.maps[col]['std_dev'] = np.std(X[col].values).round(2)
        # Return fit object
        return self

    def transform(self, X):
        """
        Parameters
        ----------
        X : pandas DataFrame, shape [n_samples, n_columns]
            DataFrame containing columns to scale
        """
        Xo = X.copy()
        ## Apply the standardization transform to the respective columns
        # (assumes a non-zero standard deviation for every scaled column)
        for col in self.scale_cols:
            Xo[col] = (Xo[col] - self.maps[col]['mean']) / self.maps[col]['std_dev']
        # Return scaled and normalized DataFrame
        return Xo

    def fit_transform(self, X, y=None):
        """
        Parameters
        ----------
        X : pandas DataFrame, shape [n_samples, n_columns]
            DataFrame containing columns to scale
        """
        # Fit and return transformed DataFrame
        return self.fit(X).transform(X)
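

if __name__ == '__main__':
    # A minimal smoke test chaining the transformers above. The churn-style
    # column names and all values are invented for illustration only.
    import pandas as pd
    X = pd.DataFrame({
        'Gender': ['M', 'F', 'F', 'M', 'F'],
        'Age': [40, 30, 25, 52, 46],
        'Balance': [50000.0, 0.0, 20000.0, 99000.0, 10000.0],
    })
    y = pd.Series([1, 0, 0, 1, 0])
    X_enc = CategoricalEncoder().fit_transform(X, y)   # label encodes Gender
    X_scaled = CustomScaler().fit_transform(X_enc)     # standardizes all numeric columns
    print(X_scaled)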