forked from pylhc/Beta-Beat.src
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmadx_wrapper.py
237 lines (181 loc) · 7.84 KB
/
madx_wrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
"""
:module: madx.madx_wrapper
Runs MADX with a file or a string as an input, it processes @required macros.
If defined, writes the processed MADX script and logging output into files.
TODO: write tests
"""
from os.path import abspath, join, dirname
import os
import sys
import re
import subprocess
import optparse
import contextlib
from tempfile import mkstemp
from utils import logging_tools
LOG = logging_tools.get_logger(__name__)
LIB = abspath(join(dirname(__file__), "madx", "lib"))
_LOCAL_PATH = join(dirname(__file__), "madx", "bin")
_AFS_PATH = join("/", "afs", "cern.ch", "user", "m", "mad", "madx", "releases", "last-rel")
if "darwin" in sys.platform:
_MADX_BIN = "madx-macosx64-intel"
elif "win" in sys.platform:
_MADX_BIN = "madx-win64-gnu.exe"
else:
_MADX_BIN = "madx-linux64-gnu"
MADX_PATH = abspath(join(_LOCAL_PATH, _MADX_BIN))
MADX_AFS_PATH = abspath(join(_AFS_PATH, _MADX_BIN))
def get_madx_path():
if os.path.exists(MADX_AFS_PATH):
return MADX_AFS_PATH
return MADX_PATH
class MadxError(Exception):
pass
# Arguments ##################################################################
def _parse_args():
parser = optparse.OptionParser()
parser.add_option("-f", "--file", metavar="FILE", dest="file",
help="The file with the annotated MADX input to run.")
parser.add_option("-o", "--output", metavar="OUTPUT", dest="output",
help="If defined, it will write the processed MADX script into the file.")
parser.add_option("-l", "--log", metavar="LOG", dest="log",
help="File where to write the MADX log output.")
parser.add_option("-m", "--madx", metavar="MADX", dest="madx_path",
help="Path to the MAD-X executable to use", default=MADX_PATH)
parser.add_option("-c", "--cwd", metavar="CWD", dest="cwd",
help="Set current working directory")
(options, args) = parser.parse_args()
if len(args) > 1 or ((options.file is None) == (len(args) == 0)):
raise IOError("No input found: either pass the file as first parameter or use --file (-f)")
if len(args) == 1:
return args[0], options.output, options.log, options.madx_path, options.cwd
return options.file, options.output, options.log, options.madx_path, options.cwd
# Main Methods ###############################################################
def resolve_and_run_file(input_file, output_file=None, log_file=None,
madx_path=get_madx_path(), cwd=None):
"""Runs MADX in a subprocess.
Attributes:
input_file: MADX input file
output_file: If given writes resolved MADX script.
log_file: If given writes MADX logging output.
madx_path: Path to MADX executable
"""
input_string = _read_input_file(input_file)
resolve_and_run_string(input_string, output_file=output_file, log_file=log_file,
madx_path=madx_path, cwd=cwd)
def resolve_and_run_string(input_string, output_file=None, log_file=None,
madx_path=MADX_PATH, cwd=None):
"""Runs MADX in a subprocess.
Attributes:
input_string: MADX input string
output_file: If given writes resolved MADX script.
log_file: If given writes MADX logging output.
madx_path: Path to MADX executable
"""
_check_log_and_output_files(output_file, log_file)
full_madx_script = _resolve(input_string)
_run(full_madx_script, log_file, output_file, madx_path, cwd)
# Main Private Methods ######################################################
def _run(full_madx_script, log_file=None, output_file=None, madx_path=MADX_PATH, cwd=None):
""" Starts the madx-process """
with _madx_input_wrapper(full_madx_script, output_file) as madx_jobfile:
process = subprocess.Popen([madx_path, madx_jobfile], shell=False,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=cwd)
with _logfile_wrapper(log_file) as log_handler, process.stdout:
for line in iter(process.stdout.readline, b''):
log_handler(line)
status = process.wait()
if status:
_raise_madx_error(log=log_file, file=output_file)
# Macro Handling #############################################################
def _resolve(input_string):
"""Resolves the !@requires annotations of the input_string, and returns the resulting script."""
macro_calls = "option, -echo;\n" + _resolve_required_macros(input_string) + "option, echo;\n\n"
full_madx_script = macro_calls + input_string
return full_madx_script
def _resolve_required_macros(file_content):
"""
Recursively searches for "!@require lib" MADX annotations in the input script,
adding the required macros library calls to the script header.
"""
call_commands = ""
for line in file_content.split("\n"):
match = re.search("^!@require\s+([^\s]+).*$", line)
if match is not None:
required_macros = _add_macro_lib_ending(match.group(1))
required_macros_file = abspath(join(LIB, required_macros))
call_commands += _resolve_required_macros(_read_input_file(required_macros_file))
call_commands += "call, file = \"" + required_macros_file + "\";\n"
return call_commands
def _add_macro_lib_ending(macro_lib_name):
macro_lib_name = macro_lib_name.strip()
if macro_lib_name.endswith(".macros.madx"):
return macro_lib_name
else:
return macro_lib_name + ".macros.madx"
# Wrapper ####################################################################
def _read_input_file(input_file):
with open(input_file) as text_file:
return text_file.read()
def _check_log_and_output_files(output_file, log_file):
if output_file is not None:
open(output_file, "a").close()
if log_file is not None:
open(log_file, "a").close()
@contextlib.contextmanager
def _logfile_wrapper(file_path=None):
""" Logs into file and debug if file is given, into info otherwise """
if file_path is None:
def log_handler(line):
line = line.rstrip()
if len(line):
LOG.info(line)
yield log_handler
else:
with open(file_path, "w") as log_file:
def log_handler(line):
log_file.write(line)
line = line.rstrip()
if len(line):
LOG.debug(line)
yield log_handler
@contextlib.contextmanager
def _madx_input_wrapper(content, file_path=None):
""" Writes content into an output file and returns filepath.
If file_path is not given, the output file is temporary and will be deleted afterwards.
"""
if file_path is None:
temp_file = True
fd, file_path = mkstemp(suffix=".madx", prefix="job.", text=True)
os.close(fd) # close file descriptor
if content:
with open(file_path, "w") as f:
f.write(content)
else:
temp_file = False
with open(file_path, "w") as f:
f.write(content)
try:
yield file_path
finally:
if temp_file:
os.remove(file_path)
def _raise_madx_error(log=None, file=None):
""" Rasing Error Wrapper
Extracts extra info from log and output file if given.
"""
message = "MADX run failed."
if log is not None:
try:
with open(log, "r") as f:
content = f.readlines()
if content[-1].startswith("+="):
message += " '{:s}'.".format(content[-1].replace("+=+=+=", "").strip())
except (IOError, IndexError):
pass
if file is not None:
message += " Run on File: '{:s}'.".format(file)
raise MadxError(message)
# Script Mode ################################################################
if __name__ == "__main__":
resolve_and_run_file(*_parse_args())