-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathpeasyshell.py
330 lines (273 loc) · 10.3 KB
/
peasyshell.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
"""
Description : Python easy shell utilities
Author : [email protected]
License : Apache v2
"""
import atexit
import logging
import os
import shlex
import signal
import subprocess
import sys
import time
logger = logging.getLogger(__name__)
class ColorCodes:
    """ANSI escape sequences for the 16-color palette.

    Supported by most terminals and terminal emulators. Every attribute is a
    plain string that can be concatenated in front of text; ``reset`` restores
    the terminal defaults afterwards.
    """

    # Restores all attributes / the default foreground color.
    reset = "\x1b[0m"
    default = "\x1b[39m"

    # Standard foreground colors.
    black = "\x1b[30m"
    red = "\x1b[31m"
    green = "\x1b[32m"
    yellow = "\x1b[33m"
    blue = "\x1b[34m"
    magenta = "\x1b[35m"
    cyan = "\x1b[36m"
    white = "\x1b[37m"

    # Same colors combined with the bold attribute (";1").
    bold_black = "\x1b[30;1m"
    bold_red = "\x1b[31;1m"
    bold_green = "\x1b[32;1m"
    bold_yellow = "\x1b[33;1m"
    bold_blue = "\x1b[34;1m"
    bold_magenta = "\x1b[35;1m"
    bold_cyan = "\x1b[36;1m"
    bold_white = "\x1b[37;1m"

    # Codes 90-97: the gray / "light" variants.
    dark_gray = "\x1b[90m"
    light_red = "\x1b[91m"
    light_green = "\x1b[92m"
    light_yellow = "\x1b[93m"
    light_blue = "\x1b[94m"
    light_magenta = "\x1b[95m"
    light_cyan = "\x1b[96m"
    light_white = "\x1b[97m"

    def __init__(self):
        # Nothing to initialize: the class is only a namespace for constants.
        pass
class ColorizedArgsFormatter(logging.Formatter):
    """
    Log formatter that prints each verbosity level in its own color.

    One inner ``logging.Formatter`` is built per standard level, wrapping the
    given format string in that level's ANSI color and a trailing reset code.
    """

    def __init__(self, fmt):
        """
        :param str fmt:
            logging format string (e.g. ``"%(message)s"``) shared by all levels
        """
        super(ColorizedArgsFormatter, self).__init__(fmt)
        level_colors = (
            (logging.DEBUG, ColorCodes.dark_gray),
            (logging.INFO, ColorCodes.green),
            (logging.WARNING, ColorCodes.yellow),
            (logging.ERROR, ColorCodes.red),
            (logging.CRITICAL, ColorCodes.bold_red),
        )
        self.level_to_formatter = {
            level: logging.Formatter(color + fmt + ColorCodes.reset)
            for level, color in level_colors
        }

    def format(self, record):
        """
        Format the record with the formatter registered for its level.

        Records with a level that has no registered color (custom levels such
        as 25, or NOTSET) are formatted without color by the base class instead
        of failing on a missing formatter.

        :param logging.LogRecord record:
            the record to format
        :return:
            The formatted log line
        """
        formatter = self.level_to_formatter.get(record.levelno)
        if formatter is None:
            return super(ColorizedArgsFormatter, self).format(record)
        return formatter.format(record)
def init_logging(min_level=logging.DEBUG):
    """
    Initialize logging to stdout with a different color per verbosity level.

    The function is safe to call more than once: the root logger level is
    updated on every call, but the colorized stdout handler is only attached
    if an equivalent one is not already installed. Without that guard every
    repeated call would add another handler and each message would be printed
    multiple times.

    :param min_level:
        Min level to print
    :return:
        Root logger
    """
    root_logger = logging.getLogger()
    root_logger.setLevel(min_level)

    already_installed = any(
        isinstance(handler, logging.StreamHandler)
        and isinstance(handler.formatter, ColorizedArgsFormatter)
        and getattr(handler, "stream", None) is sys.stdout
        for handler in root_logger.handlers
    )
    if not already_installed:
        console_handler = logging.StreamHandler(stream=sys.stdout)
        console_handler.setFormatter(ColorizedArgsFormatter("%(message)s"))
        root_logger.addHandler(console_handler)
    return root_logger
def print_color(text, color):
    """Print *text* to stdout in the given ANSI color.

    The color sequence is emitted before the text and ``ColorCodes.reset``
    right after it.
    """
    colored_text = color + text
    print(colored_text + ColorCodes.reset)
def is_env_option_set(key, env=os.environ, default=False):
    """
    Convert an environment key/val into a boolean.

    The value is compared case-insensitively and with surrounding whitespace
    ignored, so values such as ``"true "`` or ``" 0\\n"`` are still recognized
    instead of silently falling back to ``default``.

    :param str key:
        Name of the variable to look up
    :param env:
        Mapping to read the variable from (the process environment by default)
    :param bool default:
        Value returned when the variable is missing, empty or not recognized
    :return:
        True for 1/on/true/yes/y/t, False for 0/off/false/no/n/f, otherwise ``default``
    """
    truthy = ("1", "on", "true", "yes", "y", "t")
    falsy = ("0", "off", "false", "no", "n", "f")

    env_val = (env.get(key) or "").strip().lower()
    if env_val in truthy:
        return True
    if env_val in falsy:
        return False
    return default
class ShellResult:
    """Holder for a command's exit code and its captured stdout / stderr."""

    def __init__(self):
        # All three fields start out unset.
        self.returncode = self.stdout = self.stderr = None
# The exit code that the script will end with when timeout occurred and
# exit_on_timeout is enabled.
timeout_exitcode = 88

# PIDs of child processes that sh() has started and is still waiting for;
# kill_child_processes() signals whatever is left in here.
child_process_ids = set()

# Module-level result object: sh() fills it in and returns it.
shres = ShellResult()
def _drain_pipe(pipe, sink):
    """Read *pipe* until EOF, append the raw data to the list *sink*, then close the pipe."""
    try:
        sink.append(pipe.read())
    finally:
        pipe.close()


def _decode_output(raw, strip_trailing_newlines):
    """Decode captured output when possible and optionally strip trailing line separators."""
    # on python3, process output needs to be decoded;
    # on python2 it is already a string.
    try:
        text = raw.decode()
    except (UnicodeDecodeError, AttributeError):
        text = raw
    if strip_trailing_newlines and text:
        newline_chars = os.linesep
        # Undecodable output stays bytes on Python 3, where rstrip() needs a
        # bytes argument. (On Python 2 bytes is str, so the branch is skipped.)
        if isinstance(text, bytes) and not isinstance(text, str):
            newline_chars = os.linesep.encode()
        text = text.rstrip(newline_chars)
    return text


def sh(cmd,
       fmt_args=(),
       echo_cmd=True,
       echo_cmd_args=False,
       exit_on_fail=True,
       exit_on_timeout=False,
       capture_out=False,
       capture_err=False,
       output_remove_trailing_newlines=True,
       timeout_sec=0,
       poll_wait_sec=0.1,
       shell=False,
       shell_single_command=True,
       env=None,
       log_process_id=True,
       **fmt_kwargs
       ):
    """
    Executes a command in a child process.

    Note that the returned object is the shared module-level ``shres``
    instance, so each call overwrites the result of the previous one.

    :param str cmd:
        The command to execute as a single string
    :param fmt_args:
        When list is not empty, cmd will be formatted using the args in this list.
    :param bool echo_cmd:
        Print command before executing (imitating shell set -x (xtrace) argument)
    :param bool echo_cmd_args:
        Print command as arguments before executing (only when not in shell mode)
    :param bool exit_on_fail:
        When enabled, script will terminate with exit code of the command when exit code != 0
    :param bool exit_on_timeout:
        When enabled, script will terminate with exit code specified in timeout_exitcode if command execution
        was not finished in the specified timeout value.
    :param bool capture_out:
        When enabled, command stdout is returned instead of being printed.
    :param bool capture_err:
        When enabled, command stderr is returned instead of being printed.
    :param bool output_remove_trailing_newlines:
        When enabled, captured command output will be cleared from trailing line separators.
        This is typically desired because most of shell commands output ends with linebreak and we don't want it when
        processing the result
    :param timeout_sec:
        max time (seconds) to wait until a command execution is complete (child process exits).
        0 (or None) means no timeout.
    :param float poll_wait_sec:
        Time to wait between polls to check whether command execution already finished. Relevant only for calls
        with timeout enabled.
    :param bool shell:
        Use the shell as the program to execute. This allows using features like pipes in commands, but is
        considered a security risk due to the possibility of shell injection vulnerabilities. Use it only
        if you trust the source of input for the command.
    :param bool shell_single_command:
        In shell mode, treat multiple lines as a single command by adding backslash at
        the end of each line. when disabled, each line will be treated as separate shell command. Note that when
        not in shell mode, single-command is always enabled.
    :param env:
        A mapping that defines the environment variables for the child process.
        Will use the script environment when not specified.
    :param log_process_id:
        When enabled, ID of child process will be logged at debug level when created and terminated.
    :param fmt_kwargs:
        Dict of formatting args to format cmd with.
    :return:
        ShellResult instance with command exit code and captured output (if enabled)
    :raises SystemExit:
        when the command fails and exit_on_fail is enabled, or when it times out and
        exit_on_timeout is enabled
    """
    # Local import: only this function needs it, and it keeps the change
    # independent of the module's import block.
    import threading

    if fmt_args or fmt_kwargs:
        cmd = cmd.format(*fmt_args, **fmt_kwargs)
    cmd = cmd.strip()
    if shell and shell_single_command:
        cmd = cmd.replace("\n", " \\\n")
    if echo_cmd:
        logger.info("+ " + cmd)
    if not shell:
        cmd = shlex.split(cmd)
        if echo_cmd_args:
            logger.info("+ " + str(cmd))

    p = subprocess.Popen(cmd,
                         stdout=subprocess.PIPE if capture_out else None,
                         stderr=subprocess.PIPE if capture_err else None,
                         shell=shell,
                         universal_newlines=False,
                         env=env)
    if log_process_id:
        logger.debug("process ID {} created".format(p.pid))
    # register the process ID so that we can terminate it on exit hook
    # if the parent process terminates while still waiting for child process to complete
    # note: child will not be terminated if parent (this) process is killed
    child_process_ids.add(p.pid)

    # Drain captured pipes in background threads while we wait. Waiting first
    # and reading afterwards deadlocks (or falsely times out) as soon as the
    # child writes more than the OS pipe buffer can hold.
    captured = {}
    drain_threads = []
    for stream_name in ("stdout", "stderr"):
        pipe = getattr(p, stream_name)
        if pipe is None:
            continue
        captured[stream_name] = []
        worker = threading.Thread(target=_drain_pipe, args=(pipe, captured[stream_name]))
        worker.daemon = True
        worker.start()
        drain_threads.append(worker)

    timed_out = False
    if not timeout_sec:
        p.wait()
    else:
        start_time = time.time()
        while p.poll() is None:
            if time.time() - start_time > timeout_sec:
                logger.debug("shell call timeout ({} sec)".format(timeout_sec))
                timed_out = True
                # SIGHUP does not exist on every platform (e.g. Windows).
                hangup = getattr(signal, "SIGHUP", None)
                if hangup is not None:
                    os.kill(p.pid, hangup)
                os.kill(p.pid, signal.SIGTERM)
                p.returncode = timeout_exitcode
                break
            time.sleep(poll_wait_sec)

    # The pipes reach EOF once the child (and anything else holding them) is gone.
    for worker in drain_threads:
        worker.join()

    if log_process_id:
        logger.debug("process ID {} terminated with code {}".format(p.pid, p.returncode))
    # child process already terminated, no need to kill it at exit
    child_process_ids.discard(p.pid)

    shres.returncode = p.returncode
    # Reset both streams so output of a previous call never leaks into this result.
    shres.stdout = None
    shres.stderr = None
    for stream_name, chunks in captured.items():
        if chunks:
            setattr(shres, stream_name,
                    _decode_output(chunks[0], output_remove_trailing_newlines))

    # sys.exit() instead of the site-provided exit(), which is not guaranteed to exist.
    if exit_on_timeout and timed_out:
        logger.error("shell call timed-out - stopping execution")
        sys.exit(p.returncode)
    if not timed_out and exit_on_fail and p.returncode != 0:
        logger.error("shell call failed with code {} - stopping execution".format(p.returncode))
        sys.exit(p.returncode)
    return shres
def input_compatible(prompt=None):
    """
    Get input from stdin, compatible with both python 2.7 and Python 3+

    :param prompt:
        Optional text to show before reading. When omitted (None) no prompt is
        written at all; passing None through to the builtin would print the
        literal text "None".
    :return:
        The line that was read, without the trailing newline
    """
    try:
        # Python 2: raw_input returns the line as typed, without evaluating it.
        read_line = raw_input  # noqa: F821
    except NameError:
        # Python 3: raw_input was renamed to input.
        read_line = input
    if prompt is None:
        return read_line()
    return read_line(prompt)
def yes_or_no(question, yes="y", no="n"):
    """
    Gets y/n input, asking again until one of the two accepted replies is given.

    :param str question:
        Text shown to the user
    :param str yes:
        Reply that counts as "yes" (compared case-insensitively)
    :param str no:
        Reply that counts as "no" (compared case-insensitively)
    :return:
        True when the user answered ``yes``, False when the user answered ``no``
    """
    prompt = '{} ({}/{}): '.format(question, yes, no)
    # The reply is lowercased before comparing, so normalize the accepted
    # answers the same way; otherwise e.g. yes="Y" could never match.
    yes_reply = str(yes).lower().strip()
    no_reply = str(no).lower().strip()
    # Loop rather than recurse: the custom yes/no values stay in effect on
    # every retry and repeated invalid input cannot exhaust the stack.
    while True:
        answer = input_compatible(prompt)
        print(answer)
        reply = str(answer).lower().strip()
        if reply == yes_reply:
            return True
        if reply == no_reply:
            return False
def kill_child_processes():
    """
    Kills all child processes that were created with sh() and were not terminated yet.

    Sends SIGHUP (where the platform has it) followed by SIGTERM to every
    registered pid. This is best-effort cleanup: a pid that no longer exists is
    ignored, and any other failure is logged so that the remaining pids are
    still handled.
    """
    # Local import: only needed here to recognize "no such process".
    import errno

    hangup = getattr(signal, "SIGHUP", None)  # not defined on every platform
    # Iterate over a snapshot so the loop is unaffected if the set changes meanwhile.
    for pid in tuple(child_process_ids):
        logger.warning("at_exit: killing pid {}".format(pid))
        try:
            if hangup is not None:
                os.kill(pid, hangup)
            os.kill(pid, signal.SIGTERM)
        except OSError as err:
            # OSError covers Python 2 as well as Python 3, where
            # ProcessLookupError is a subclass of it.
            # ESRCH: child process may already have terminated, so ignore it.
            if err.errno != errno.ESRCH:
                logger.warning("at_exit: failed to kill pid {}: {}".format(pid, err))
# Run the cleanup hook at interpreter exit so that children started by sh()
# which are still registered in child_process_ids get signalled.
atexit.register(kill_child_processes)