-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlogging_tqdm.py
111 lines (96 loc) · 5.27 KB
/
logging_tqdm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import logging
import sys
import types
from typing import Union, Any, Callable
from tqdm import tqdm
# Configure logging as a side effect of importing this module:
# INFO level, every record rendered as "<timestamp>: <message>".
logging.basicConfig(
    format='%(asctime)s: %(message)s',
    level=logging.INFO,
)

# No name is passed to getLogger(), so LOGGER is the root logger.
LOGGER = logging.getLogger()
class LoggingTqdm(tqdm):
    """Progress bar that additionally writes its final state to ``LOGGER``.

    Unless the caller supplies ``file``, the bar writes to ``sys.stderr``
    through a ``PreserveLastOutputStreamInterceptor``. The interceptor keeps
    the most recent bar rendering so that it can be logged once an iteration
    (or a pandas ``progress_apply``) has ended.
    """

    class PreserveLastOutputStreamInterceptor():
        """Stream wrapper that remembers the last non-trivial string written."""

        def __init__(self, output_stream: Any, last_output_initial_value: str = "(no output yet)") -> None:
            """Wrap ``output_stream``.

            Args:
                output_stream: object providing ``write`` and ``flush``.
                last_output_initial_value: value of ``last_output`` until the
                    first non-trivial write happens.
            """
            self.output_stream = output_stream
            # Mirror the wrapped stream's encoding only when it has one.
            if hasattr(output_stream, 'encoding'):
                self.encoding = output_stream.encoding
            self.last_output = last_output_initial_value

        def write(self, string: str) -> None:
            """Forward ``string`` to the wrapped stream and remember it.

            A lone newline or an empty string is neither stored nor forwarded
            (don't store or print the end of line after the last iteration).
            """
            if string in ('\n', ''):
                # get rid of the status bar line so that it is
                # effectively replaced by the log of the last output
                self.output_stream.write('\r')
                return
            self.last_output = string
            self.output_stream.write(string)

        def flush(self) -> None:
            """Flush the wrapped stream."""
            self.output_stream.flush()

    def __init__(self, *args, **kwargs) -> None:
        """Create the bar; default ``file`` to an interceptor around stderr.

        All arguments are forwarded unchanged to the parent constructor.
        """
        if 'file' not in kwargs:
            kwargs['file'] = LoggingTqdm.PreserveLastOutputStreamInterceptor(sys.stderr)
        # Zero-argument super() does not depend on what the module-level
        # name `tqdm` is bound to at call time.
        super().__init__(*args, **kwargs)

    def _log_iteration_end(self, start_n: int, handed_out: int, last_item: Any) -> None:
        """Report how one pass over the bar ended.

        Args:
            start_n: value of ``self.n`` before the first item was handed out.
            handed_out: number of items yielded to the consumer.
            last_item: the most recently yielded item (``None`` if none).
        """
        if getattr(self, 'disable', False):
            # NOTE(review): presumably a disabled bar neither counts nor
            # renders, so there is nothing reliable to report - confirm
            # against the tqdm version in use.
            return
        if self.n != start_n + handed_out:
            # The bar's counter disagrees with the number of items handed
            # out, which is treated as a failure while processing last_item.
            # NOTE(review): leaving the loop early (e.g. ``break``) is
            # presumably reported the same way - confirm.
            print('')  # kinda dirty but ensures that the next log line is not appended to the last output line
            LOGGER.info(f'failed on item: {last_item}')
            # Show only the items that were processed completely.
            self.n = start_n + max(handed_out - 1, 0)
            self.refresh()
        # A caller-supplied `file` may not be an interceptor, in which case
        # there is no remembered rendering to log.
        last_output = getattr(getattr(self, 'fp', None), 'last_output', None)
        if last_output is not None:
            # Drop the first character of the rendering
            # (presumably the leading carriage return - TODO confirm).
            LOGGER.info(last_output[1:])

    def __iter__(self) -> types.GeneratorType:
        """Iterate like the parent bar and log the final state afterwards.

        Returns:
            A generator yielding the items of the parent iterator. When the
            generator is finalized, the last bar rendering is logged, preceded
            by a 'failed on item' record if the bar's counter does not match
            the number of items handed out.
        """
        iterable = super().__iter__()

        def iter_logging_wrapper():
            start_n = self.n
            handed_out = 0
            item = None
            try:
                for item in iterable:
                    handed_out += 1
                    yield item
            finally:
                self._log_iteration_end(start_n, handed_out, item)

        return iter_logging_wrapper()

    @classmethod
    def pandas(tclass: type, *targs, **tkwargs) -> None:  # pylint: disable=C0202
        """Register ``progress_apply`` on pandas types, with logging added.

        When the caller passes ``file``, only the parent registration runs.
        Otherwise the bar output goes through an interceptor around stderr
        and every registered ``progress_apply`` is wrapped so that failures
        of the applied function and the final bar rendering are logged.

        Args:
            *targs: forwarded to the parent ``pandas`` classmethod.
            **tkwargs: forwarded to the parent ``pandas`` classmethod.
        """
        # adhering to tqdm's definition
        if 'file' in tkwargs:
            super().pandas(*targs, **tkwargs)
            return

        interceptor = LoggingTqdm.PreserveLastOutputStreamInterceptor(sys.stderr)
        tkwargs['file'] = interceptor

        from pandas.core.frame import DataFrame
        from pandas.core.series import Series
        from pandas.core import groupby as pandas_groupby

        # Panel and PanelGroupBy do not exist in every pandas release, so
        # they are patched only when present.
        patch_targets = [DataFrame, Series]
        for type_name in ('DataFrameGroupBy', 'SeriesGroupBy', 'GroupBy', 'PanelGroupBy'):
            group_type = getattr(pandas_groupby, type_name, None)
            if group_type is not None:
                patch_targets.append(group_type)
        try:
            from pandas import Panel
        except ImportError:
            pass
        else:
            patch_targets.append(Panel)

        # Let the parent class register its progress_apply implementations.
        super().pandas(*targs, **tkwargs)

        def wrap_progress_apply(original_progress_apply) -> Callable[..., Any]:
            """Return a ``progress_apply`` that logs around the original one."""
            def progress_apply(df: Any, func: Callable, *args, **kwargs) -> Any:
                # error handling in progress_apply is difficult because
                # the instance of the tqdm object only exists within
                # the tqdm progress_apply classmethod
                # we can still keep track of the invocations by wrapping
                # the apply function but we won't be able to produce an
                # updated version of the status bar in the case of
                # failure because at that point the tqdm object
                # does not exist anymore (this can only be changed
                # inside the tqdm code itself which we leave untouched)
                count = 1

                def counter_wrapped_func(item, *args, **kwargs):
                    nonlocal count
                    try:
                        res = func(item, *args, **kwargs)
                    except Exception:  # pylint: disable=W0703
                        # needs to be general
                        print('')  # kinda dirty but ensures that the next log line
                        # is not appended to the last output line
                        LOGGER.info(f'Failed on pandas apply during the {count}. invocation of the ' +
                                    f'provided apply function processing item: \n{item}')
                        # Propagate the real error instead of falling through
                        # to `return res` with `res` unbound.
                        raise
                    count += 1
                    return res

                progress_apply_res = original_progress_apply(df, counter_wrapped_func, *args, **kwargs)
                LOGGER.info(interceptor.last_output[1:])
                return progress_apply_res
            return progress_apply

        for datatype in patch_targets:
            registered = getattr(datatype, 'progress_apply', None)
            if registered is not None:
                datatype.progress_apply = wrap_progress_apply(registered)
# Rebind the module-level name `tqdm` to the logging-enabled subclass.
# From this line on, `tqdm` in this module no longer refers to the class
# imported from the tqdm package.
tqdm = LoggingTqdm # pylint: disable=C0103
# we want users to be able to use logging_tqdm.tqdm just like tqdm.tqdm