-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathauxiliary.py
180 lines (153 loc) · 6.37 KB
/
auxiliary.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import contextlib
import heapq
import math
import operator
import os
def check_input(line: str, rownumber: int):
    """Check that 'line' is a valid colon-separated list of mutations.

    Every item between colons must consist of one or more digits (the
    position) followed by exactly one letter from the accepted amino-acid
    alphabet (matched case-insensitively; 'Z' is accepted as well),
    e.g. ``12A:45g``.

    Args:
        line: genotype string to validate.
        rownumber: zero-based index of the row being checked; the error
            message reports ``rownumber + 2``.

    Raises:
        NameError: if any item is malformed, including empty items produced
            by a leading, trailing or doubled ':'.
    """
    amino_acids = frozenset('GAVLIMFWPSTCYNQDEKRHZ')
    for frame in line.split(':'):
        position, letter = frame[:-1], frame[-1:]
        # 'and' short-circuits, so an empty frame is reported as a format
        # error instead of failing while inspecting its last character.
        if not (position.isdigit() and letter.upper() in amino_acids):
            raise NameError('ERROR: invalid input format at line: {0}'.format(rownumber+2))
def read_genotypes(filename: str) -> list:
    """Load genotypes from the first tab-separated column of 'filename'.

    The first line of the file is treated as a header and ignored. For each
    following line, the text before the first tab is taken as the genotype:
    an empty value or 'wt' becomes the wild-type tuple ('0Z',); anything
    else is validated with check_input and split on ':' into a tuple of
    mutations.
    """
    wild_type_labels = ('', 'wt')
    result = []
    with open(filename, 'r') as source:
        # The header carries no genotype, so consume it before enumerating.
        source.readline()
        for row_index, row in enumerate(source):
            cell = row.partition('\t')[0].replace('\n', '')
            if cell in wild_type_labels:
                result.append(('0Z',))
                continue
            check_input(cell, row_index)
            result.append(tuple(cell.split(':')))
    return result
def get_delta(genotype1: tuple, genotype2: tuple) -> tuple:
    """Return the mutations separating two genotypes, ordered by position.

    A genotype is an iterable of mutation strings of the form
    '<position><letter>' (e.g. '12A'); a genotype consisting only of '0Z'
    is the wild type and is treated as having no mutations. A position
    missing on one side is written with the letter 'Z'.

    The direction is decided by the lowest differing position: it is
    'forward' when that position is mutated only in 'genotype1', or in both
    with the letter of 'genotype1' sorting strictly first; otherwise it is
    'reverse'. In 'forward' each change reads
    '<letter in genotype1><position><letter in genotype2>'; in 'reverse' the
    two letters are swapped for every position.

    Args:
        genotype1: first genotype.
        genotype2: second genotype.

    Returns:
        A pair ``(direction, changes)`` where direction is 'forward' or
        'reverse' and changes is a tuple of strings sorted by numeric
        position. Identical genotypes give ``('forward', ())``.
    """
    wild_type = {'0Z'}
    s1 = set(genotype1)
    s2 = set(genotype2)
    # Correction for wild-type: it is denoted as '0Z',
    # 'Z' being wild-type amino acid
    if s1 == wild_type:
        s1 = set()
    if s2 == wild_type:
        s2 = set()

    # Key by the numeric position so that sorting and lookup use the same
    # value (a zero-padded position such as '01' must still be found).
    only_in_1 = {int(u[:-1]): u[-1] for u in s1 - s2}
    only_in_2 = {int(u[:-1]): u[-1] for u in s2 - s1}
    positions = sorted(set(only_in_1) | set(only_in_2))
    if not positions:
        # Nothing differs: empty delta in the default direction.
        return 'forward', ()

    # The first position defines 'reverse' or 'forward'.
    first = positions[0]
    if first in only_in_1 and first in only_in_2:
        reverse = not only_in_1[first] < only_in_2[first]
    else:
        reverse = first not in only_in_1

    delta = []
    for pos in positions:
        source_letter = only_in_1.get(pos, 'Z')
        target_letter = only_in_2.get(pos, 'Z')
        if reverse:
            source_letter, target_letter = target_letter, source_letter
        delta.append(source_letter + str(pos) + target_letter)

    return ('reverse' if reverse else 'forward'), tuple(delta)
def mergeiter(*iterables, **kwargs):
    """Given a set of sorted iterables, yield the next value in merged order.

    Takes an optional `key` callable to compare values by; other keyword
    arguments are ignored. When several inputs offer equal values, the one
    passed earlier is yielded first. Empty inputs, and a call without any
    inputs, simply contribute nothing to the output.

    Each input must already be sorted by the same ordering; the inputs are
    consumed lazily as the result is iterated.
    """
    # heapq.merge performs the same ordered merge and copes with inputs
    # that are exhausted from the start.
    yield from heapq.merge(*iterables, key=kwargs.get('key'))
def sort_file(input_file_name: str, output_file_name: str):
    """Sort the lines of 'input_file_name' and write them to 'output_file_name'.

    Lines are compared as plain strings, so the order is by character code
    rather than locale-aware. The whole input is read into memory, and the
    output file is overwritten.

    Args:
        input_file_name: path of the text file to read.
        output_file_name: path of the text file to write.
    """
    with open(input_file_name, 'r') as fh:
        lines = fh.readlines()
    # A final line without a newline would be glued to the line that follows
    # it once sorting moves it away from the end of the file.
    if lines and not lines[-1].endswith('\n'):
        lines[-1] += '\n'
    lines.sort()
    with open(output_file_name, 'w') as fh:
        fh.writelines(lines)
def merge_sorted_files(sorted_file_names: list, max_open_files: int, output_file_name: str) -> bool:
    """Merge the files in 'sorted_file_names' into 'output_file_name'.

    Each input file must already be sorted line by line. The merged lines
    are appended to 'output_file_name' (it is opened in append mode, so any
    existing content is kept), and the input files are deleted once they
    have been merged.

    When there are more inputs than 'max_open_files', they are first merged
    in batches into intermediate files named
    '<sorted_file_names[i]>.<i>' (i being the batch index), and those
    intermediate files are then merged recursively in the same way.

    Args:
        sorted_file_names: paths of the sorted input files.
        max_open_files: maximum number of input files opened at once.
        output_file_name: path of the file receiving the merged lines.

    Returns:
        True if something was merged, False if there were no input files.

    Raises:
        ValueError: if batching is required but 'max_open_files' is below 2,
            in which case batching could never reduce the number of files.
    """
    # Return False if no hypercubes are produced
    if not sorted_file_names:
        return False

    file_count = len(sorted_file_names)
    if file_count > max_open_files:
        if max_open_files < 2:
            raise ValueError(
                'max_open_files must be at least 2 to merge {0} files, got {1}'.format(
                    file_count, max_open_files))
        num_parts = math.ceil(file_count / max_open_files)
        num_files_in_part = math.ceil(file_count / num_parts)
        intermediate_names: list = list()
        # Stepping through the list yields only non-empty batches.
        for i, start_index in enumerate(range(0, file_count, num_files_in_part)):
            batch = sorted_file_names[start_index:start_index + num_files_in_part]
            output_file_name_new = sorted_file_names[i] + "." + str(i)
            # Queue an intermediate file only if it was actually produced.
            if merge_sorted_files(batch, max_open_files, output_file_name_new):
                intermediate_names.append(output_file_name_new)
        if not intermediate_names:
            return False
        return merge_sorted_files(intermediate_names, max_open_files,
                                  output_file_name)

    # Open *.sorted files into filehandle list to enable merging; the stack
    # closes every opened handle even if opening or merging fails part-way.
    with contextlib.ExitStack() as stack:
        filehandles = [stack.enter_context(open(sorted_file_name, 'r'))
                       for sorted_file_name in sorted_file_names]
        # Merge sorted hypercubes into file output_file_name
        with open(output_file_name, 'a') as fh:
            fh.writelines(mergeiter(*filehandles))

    # Inputs are removed only after a successful merge and after closing them.
    for sorted_file_name in sorted_file_names:
        os.remove(sorted_file_name)
    return True