-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathisosolve.py
executable file
·1658 lines (1610 loc) · 73.1 KB
/
isosolve.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#! /usr/bin/env python3
"""Take symbolic measure matrix and reconstruct possible isotopomer/cumomer/emu groups (as elementary as possible, idealy each group composed of only one i/c/e) for each combination of measurement methods
"""
from time import strftime, localtime, process_time as tproc
globals()["_T0"]=tproc()
def timeme(s="", dig=2):
"if a global variable TIMEME is True and another global variable _T0 is set, print current CPU time relative to _T0. This printing is preceded by a message from 's'"
if TIMEME:
if "_T0" in globals():
print(s, ":\tCPU=", round(tproc()-_T0, dig), "s", sep="")
else:
globals()["_T0"]=tproc()
TIMEME=False
import csv
import numpy as np
tol=np.finfo(float).eps*2**7 # 2.842170943040401e-14
from scipy import stats
import re
import sys, os
from io import StringIO
import argparse
import pandas as pa
from itertools import combinations, chain
from operator import itemgetter
#import isosolve
fve=os.path.join(os.path.dirname(__file__), "isosolve", "version.txt")
with open(fve, "r") as f:
ver=f.read().rstrip()
__version__=ver
from sympy import symbols, Symbol, rcollect, Matrix, factor, pprint, Eq, solve, Poly, Add, Mul, Pow, lambdify, IndexedBase, simplify, nan
from mdutils.mdutils import MdUtils
from mdutils import Html
import markdown as md
timeme("imports")
#print(sys.argv)
TEMPLATE = """<!DOCTYPE html>
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<meta name="referrer" content="no-referrer" />
<meta name="referrer" content="unsafe-url" />
<meta name="referrer" content="origin" />
<meta name="referrer" content="no-referrer-when-downgrade" />
<meta name="referrer" content="origin-when-cross-origin" />
<title>SymNmrMs</title>
<link href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.7/css/bootstrap.min.css" rel="stylesheet">
<style>
body {
font-family: Helvetica,Arial,sans-serif;
}
code, pre {
font-family: monospace;
}
table {
border-collapse: collapse;
}
table, th, td {
border: 1px solid black;
padding: 5px;
}
th {
background-color: #e0e0e0;
text-align: center;
}
tr:nth-child(even) {background-color: #f2f2f2;}
</style>
</head>
<body>
<div class="container">
{{content}}
</div>
<hr>
<h5>Legal information</h5>
<p style="font-size: small; padding: 5px">This content is produced by <tt>IsoSolve</tt> script ({{version}}) on {{date}} in {{duration}}.<br>
The script is written by Serguei Sokol <sokol [at] insa-toulouse [dot] fr> and it is a result of a collaboration between Mathematics Cell and MetaSys team, namely Pierre Millard at TBI (Toulouse Biotechnology Institute, Bio & Chemical Engineering).<br>
The copyright notice hereafter is relative to the script IsoSolve, not to the content of this page:<br>
Copyright © 2021, INRAE/INSA/CNRS.<br>
<tt>IsoSolve</tt> is released under <a href="https://www.gnu.org/licenses/old-licenses/gpl-2.0.en.html">GPL-2 license</a>.<br>
Follow this link for <a href="https://github.com/sgsokol/isosolve/issues">issue tracking</a>.
</p>
</body>
</html>
"""
class Object(object):
"Object to which user can add fields"
pass
def isstr(s):
"return True if 's' is of type 'str'"
return type(s) == str
def useq(seq):
"return iterator of unique elements in a sequence preserving initial order"
seen=set()
return (x for x in seq if not (x in seen or seen.add(x)))
def is_intstr(s):
"check if a string 's' represents exactly an integer number"
try:
i=int(s)
except:
return False
return str(i)==s
def assign(name, value, d):
"assign a 'value' to 'name' in dictionary 'd', e.g. 'd' can be locals(). Return 'value'."
d[name] = value
return value
def revenum(x):
"reversed enumerated iterable x"
return reversed(list(enumerate(x)))
def pdiff(expr, vs):
"partial derivatives of expr by vars in vs"
return [expr.diff(v) for v in vs]
def partcomb(x, p):
"return iterator of iterator tuples ([p distinct elements of x], [the rest len(x)-p distinct elements of x])"
from itertools import combinations, compress
import numpy as np
xn=len(x)
if p > xn or p < 0:
return iter(())
elif p == xn:
return iter([(iter(x), iter(()))])
elif p == 0:
return iter([(iter(()), iter(x))])
xlog=np.zeros(xn, bool)
ix=np.array(range(xn))
for ixp in combinations(ix, p):
xlog[:]=False
xlog[list(ixp)]=True
yield (compress(x, xlog), compress(x, ~xlog))
def limi(li, negative=False):
"apply or not sign '-' to all terms of the list li"
if not negative:
return li
return [-v for v in li]
def e2li(e):
"transform expression 'e' in a nested list. Each nested level corresponds to Add, Mul and Pow arguments, Add being the highest level."
from sympy import Add, Mul, Matrix, Pow, Atom
if isinstance(e, Atom):
return [[(e,1)]]
at=[]
for v in Add.make_args(e):
mli=[]
for u in Mul.make_args(v):
if type(u) is Mul:
u=Mul.make_args(term)
else:
u=[u]
# split each factor in couples (a,b) for a^b
u=[q.args if type(q) == Pow else (q,1) for q in u]
mli.extend(u)
at.append(mli) # each add-term is a list of factors
return at
def lfact(e, esub={}, cache={}, scache={}, pr=False, nsub={}, fast=False):
"factorize terms in expression 'e'"
from sympy import Add, Mul, Matrix, Pow, Atom
#if pr:
# import pdb; pdb.set_trace()
if isinstance(e, Atom):
return e
#if str(e) in ("(a*(b + c) + d)/(a - 1)",):
# import pdb; pdb.set_trace()
cl=type(e)
if cl == Matrix:
return Matrix([[lfact(v, esub, cache, scache, pr, nsub, fast=fast) for v in li] for li in e.tolist()])
if cl != list:
res=cache.get(e, None)
if res:
try:
if nsub and np.abs(e.subs(nsub).evalf() - res.subs(nsub).evalf()) > 1.e-14:
print("num e=", e, "\nres_cache=", res)
import pdb; pdb.set_trace()
except:
print("no num e=", e, "\nres_cache=", res)
import pdb; pdb.set_trace()
if pr:
print("from cache e=", e, "\nres=", res)
return res
if cl != Pow and cl != Mul and cl != Add and cl != list:
cache[e]=e
return e
if cl == Mul:
# recursive call on cofactors
if fast:
res=Mul(*[lfact(v, esub, cache, scache, pr, nsub, fast=fast) for v in Mul.make_args(e)])
else:
res=Mul(*[lfact(v, esub, cache, scache, pr, nsub, fast=fast) for v in Mul.make_args(e.cancel())])
cache[e]=res
return res
if cl == Pow:
# call on base
res=Pow(lfact(e.args[0], esub, cache, scache, pr, nsub, fast=fast), e.args[1])
cache[e]=res
return res
if cl == list:
# already decomposed => extract the most frequent factor
# make the base unique in each factor group
ee=[]
for fli in e:
dbase={} # base dictionary: base-> power
for ft in fli:
if ft[0] in dbase:
dbase[ft[0]] += ft[1]
elif -ft[0] in dbase:
po=dbase[-ft[0]]+ft[1]
del dbase[-ft[0]]
dbase[ft[0]]=po
dbase[-1]=dbase.get(-1, 0)+1
else:
dbase[ft[0]]=ft[1]
ee.append(list(dbase.items()))
e=ee
n=len(e)
ee=Add(*[Mul(*[Pow(*v) for v in li]) for li in e])
if ee in cache:
return cache[ee]
# register factors in each add term -> cnt={fct: [indexes in at]}
cnt={}
#import pdb; pdb.set_trace()
for ia,li in enumerate(e):
for im,t in enumerate(li):
if t[0] == 1 or t[0] == -1:
#import pdb; pdb.set_trace()
continue
cnt[t[0]]=cnt.get(t[0], [])
cnt[-(t[0])]=cnt.get(-(t[0]), [])
cnt[t[0]].append((ia,im,t[1],False))
cnt[-(t[0])].append((ia,im,t[1],True))
le=[(t,len(li)) for t,li in cnt.items()]
if not le:
# only 1s
cache[ee]=ee
return ee
pw=dict((t, min(li, key=lambda i: i[2])[2]) for t,li in cnt.items()) # base-> minimal power
wh,n=max(le, key=lambda i: i[1])
cnt_wh=cnt[wh]
pw=pw[wh] # keep only minimal power
wh=Pow(wh, pw)
# indexes of added terms in fterm
iawh=set(tu[0] for tu in cnt_wh)
rest=[li for i,li in enumerate(e) if i not in iawh]
#print("wh=", wh, "; n=", n, "; cnt_wh=", cnt_wh)
if n == 1:
# nothing to factorize => check if any Add-type cofactor is in the rest of add list
ali=[Mul(*[Pow(*v) for v in ml]) for ml in e] # add list
alim=[-v for v in ali] # -ali
found=False
fminus=False
for ia,fli in enumerate(e):
if type(ali[ia]) != Mul:
continue
for ifa,(b,p) in enumerate(fli):
if type(b) != Add or p < 0:
continue
# check if b terms are in the ali (nb b cannot coincide with ali[ia] as b is at best an element of add)
ib=iainb(b.args, ali)
if ib:
found=True
break
# try -ali
ib=iainb(b.args, alim)
if ib:
found=True
fminus=True
break
if found:
break
if found:
cof=b*(Mul(*[Pow(v[0], v[1] if iv != ifa else v[1]-1) for iv,v in enumerate(fli)])+(-1 if fminus else 1))+Add(*[v for iv,v in enumerate(ali) if iv not in ib and iv != ia])
cof=rsubs(cof, esub, scache)
else:
cof=rsubs(ee, esub, scache)
#print("e=", ee, "; res=", cof)
if nsub and np.abs(ee.subs(nsub).evalf() - cof.subs(nsub).evalf()) > 1.e-14:
print("num e=", ee, "\nres_cof=", cof)
import pdb; pdb.set_trace()
if pr:
print("e=", ee, "\nres_cof=", cof)
cache[ee]=cof
return cof
# factorized term
cof=[]
for ia,im,pm,fmin in cnt_wh:
tpw=e[ia][im][1]-pw # power which rests after simplification
tpw=[(e[ia][im][0], tpw)] if tpw else [] # wh factor which rests
tmp=e[ia][:im]+tpw+e[ia][(im+1):] or [(1,1)]
tmp=Mul(*(Pow(*v) for v in tmp))
if fmin:
tmp=-tmp
cof.append(tmp)
#print("ee=", ee, "\nwh=", wh, ", cof list=", cof)
cof=rsubs(Add(*cof), esub, scache)
if type(cof) == Add:
cof=lfact(cof, esub, cache, scache, pr, nsub, fast=fast)
#print("cof factorized=", cof)
cof=cof.args if type(cof) == Pow else (cof, 1)
wh=rsubs(wh, esub, scache)
wh=wh.args if type(wh) == Pow else (wh, 1)
if wh[0] == cof[0]:
fterm=[(wh[0],wh[1]+cof[1])]
elif wh[0] == -cof[0]:
fterm=[(-1,1), (wh[0],wh[1]+cof[1])]
elif ('real' in dir(cof[0]) and cof[0] < 0) or ('could_extract_minus_sign' in dir(cof[0]) and cof[0].could_extract_minus_sign()):
if wh[0].could_extract_minus_sign():
fterm=[(-wh[0],wh[1]), (-cof[0],cof[1])]
else:
fterm=[(-1,1), wh, (-cof[0],cof[1])]
else:
if wh[0].could_extract_minus_sign():
fterm=[(-1,1), (-wh[0],wh[1]), cof]
else:
fterm=[wh, cof]
fterm=Mul(*[Pow(rsubs(v[0], esub, scache), v[1]) for v in fterm])
#print("after wh=", wh, ", cof=", cof, ", fterm=", fterm)
#print("rest=", rest, "; fterm=", fterm, "e=", e)
if rest:
#import inspect
#if len(inspect.stack()) > 100:
# print("hm, stack is too big on fterm=", fterm, "; and rest=", rest)
# if len(inspect.stack()) > 103:
# raise Exception("duh")
if fterm == 0:
# can happen after simplifications by esub
res=lfact(rest, esub, cache, scache, pr, nsub, fast=fast)
try:
if nsub and np.abs(ee.subs(nsub).evalf() - res.subs(nsub).evalf()) > 1.e-14:
print("num e=", ee, "\nres_rest0=", res)
import pdb; pdb.set_trace()
except:
print("no num e=", ee, "\nres_rest0=", res)
import pdb; pdb.set_trace()
if pr:
print("e=", ee, "\nres_rest0=", res)
cache[ee]=res
return res
else:
#import inspect
#if len(inspect.stack()) > 40:
# print("hm, stack is too big on fterm=", fterm, "\nrest=", rest, "\nee=", ee, "\nres=", Add(fterm, lfact(rest, esub, cache, scache, pr, nsub, fast=fast)))
# #raise Exception("duh")
# #import pdb; pdb.set_trace()
#if len(inspect.stack()) > 46:
# #raise Exception("duh")
# import pdb; pdb.set_trace()
res=Add(fterm, lfact(rest, esub, cache, scache, pr, nsub, fast=fast))
if res != ee and res not in lfact.estack:
res=lfact(res, esub, cache, scache, pr, nsub, fast=fast)
try:
if nsub and np.abs(ee.subs(nsub).evalf() - res.subs(nsub).evalf()) > 1.e-14:
print("num e=", ee, "\nres_rest1=", res)
import pdb; pdb.set_trace()
except:
print("no num e=", ee, "\nres_rest1=", res)
import pdb; pdb.set_trace()
if pr:
print("e=", ee, "\nres_rest1=", res)
cache[ee]=res
return res
else:
#import inspect
#if len(inspect.stack()) > 10:
# print("hm, stack is too big on fterm=", fterm, "; and e=", e)
# raise Exception("duh")
res=fterm
if nsub and np.abs(ee.subs(nsub).evalf() - res.subs(nsub).evalf()) > 1.e-14:
print("num e=", ee, "\nres_fterm=", res)
import pdb; pdb.set_trace()
if pr:
print("e=", ee, "\nres_fterm=", res)
cache[ee]=res
return res
# here e is expr => decompose in list of lists and recall lfact()
lfact.estack=getattr(lfact, "estack", {})
if e in lfact.estack:
import pdb; pdb.set_trace()
raise Exception(f"cycling call is detected for e='{e}'")
lfact.estack[e]=None
if type(esub) != type({}):
esub=dict(esub)
at=Add.make_args(e)
li=[]
if len(at) <= 1:
res=e
else:
for term in at:
li.extend(e2li(lfact(term, esub, cache, scache, pr, nsub, fast=fast)))
res=lfact(li, esub, cache, scache, pr, nsub, fast=fast)
cache[e]=res
try:
if nsub and np.abs(e.subs(nsub).evalf() - res.subs(nsub).evalf()) > 1.e-12:
print("e=", e, "\nres=", res, "\n")
print("e.num=", e.subs(nsub), "\nres.num=", res.subs(nsub), "\n")
import pdb; pdb.set_trace()
except:
print("no num e=", e, "\nres=", res, "\n")
import pdb; pdb.set_trace()
if pr:
print("e=", e, "\nres=", res, "\n")
#print("cache=", cache)
del(lfact.estack[e])
return res
def iainb(a, b):
"return a list of indexes of a in b or None if a is not strictly in b. An empty list is returned for empty a. Repeated values in a must be repeated at least the same number of times in b."
if len(a) > len(b):
return None
# dictionary of indexes in b -- {item: list_of_indexes}
dib={}
for i,v in enumerate(b):
dib[v]=dib.get(v, [])
dib[v].append(i)
ilast={}
res=[None]*len(a) # place-holder for results
for ia,x in enumerate(a):
#ib=b.index(x, ilast.get(x, 0))
if x not in dib:
return None # x is not in b at all
ilast[x]=ilast.get(x, 0)
if ilast[x] == len(dib[x]):
return None # x is more present in a than in b
res[ia]=dib[x][ilast[x]]
ilast[x] += 1
return res
def rsubs(e, d, c={}):
"restricted substitution of keys from 'd' found in 'e' by d[key]. c is an optional cache dict"
if not d:
return e
from sympy import Add, Mul, Pow, Matrix, Atom
cl=type(e)
if type(d) != type({}):
d=dict(d)
if type(e) == Matrix:
#import pdb; pdb.set_trace()
return Matrix([[rsubs(v, d, c) for v in li] for li in e.tolist()])
res=c.get(e, None)
if res:
return res
if e in d:
res=d[e]
c[e]=res
return res
elif -e in d:
res=-d[-e]
c[e]=res
return res
elif isinstance(e, Atom):
res=e
c[e]=res
return res
elif isinstance(-e, Atom):
res=e
c[e]=res
return res
elif cl == Pow:
ve=e.args
res=Pow(rsubs(ve[0], d, c), ve[1])
c[e]=res
return res
elif cl == Add or cl == Mul:
ve=list(rsubs(v, d, c) for v in cl.make_args(e))
vme=limi(ve, True)
vli=[(v,cl.make_args(k)) for k,v in d.items()]
#print("ve=", ve, ", vme=", vme, ", vli=", vli)
for v,li in vli:
ie=iainb(li, ve)
if ie:
#import pdb; pdb.set_trace()
ie=sorted(ie, reverse=True)
for i in ie:
ve.pop(i)
vme.pop(i)
ve.append(v)
vme.append(-v)
continue
ie=iainb(li, vme)
if ie:
ie=sorted(ie, reverse=True)
for i in ie:
ve.pop(i)
vme.pop(i)
#print("ie=", ie, ", ve cut=", ve)
ve.append(-v)
vme.append(v)
#print("ve new=", ve)
res=cl(*ve)
#print("e=", e, "\nres=", res, "\n")
else:
res=e
c[e]=res
return res
def setsub(l, it, v):
"in a list l set values with indexes from it to v, i.e. l[it]=v where it is iterator. v is recycled if needed"
for ii,i in enumerate(it):
l[i]=v[ii%len(v)]
def hascumo(s1, s2):
"return a cumulated string (or None if not conformant) for binary cumomers in s1 and s2. E.g. 01x+11x => x1x"
if len(s1) != len(s2) or len(s1) == 0:
return None
res=list(s1)
ndif=0
for i,c1 in enumerate(s1):
c2=s2[i]
if c1 == c2:
res[i]=c1
elif c1 in "01" and c2 in "01" and ndif == 0:
ndif += 1
res[i]="x"
else:
return None
return "".join(res)
def lico2tb(lico, mdFile=None, text_align="center"):
"transform list of columns in lico to a flat list of strings suitable for md table writing. Write md table if mdFile is set."
# check that all column have the same non zero length
if not lico:
return ""
n0=len(lico[0])
if n0 == 0:
return ""
if not all(len(li)==n0 for li in lico):
raise Exception("Not all columns have the same length in lico")
tb=list(chain(*map(list, zip(*[[str(it) for it in li] for li in lico]))))
if mdFile:
mdFile.new_table(columns=len(lico), rows=n0, text=tb, text_align=text_align)
return tb
def ech(m, d={}, c={}, sc={}, nd={}, ld={}, fast=False):
"reduce augmented matrix m to echelon form applying substitutions from d. Return echelon matrix and permutation list of col-index couples. No check is made for non zero terms in the last column in all zero rows."
if type(m) != Matrix:
raise Exception("expected Matrix type on input")
nr,nc=m.shape
mres=m.copy()
#print("mres init")
#pprint(mres)
pe=[] # column permutations
per=[] # row permutations
if nr*nc == 0 or nr*(nc-1) == 0:
return (mres, pe)
nro=min(nr, nc-1)
if not nd:
nd=dict((v, np.random.rand(1)[0]) for v in mres.free_symbols)
mn=np.array(mres[:,:-1].subs(nd)).astype(np.double)
#print("mn init=", mn)
# find pivots on numeric matrix
for i in range(nro):
#if i == 15:
# import pdb; pdb.set_trace()
# search for shortest non zero rows
nz1d=list(zip(np.count_nonzero(np.round(mn[i:,i:], 10), axis=1), np.min(np.abs(mn[i:,i:]-1), axis=1)))
io=sorted(range(len(nz1d)),key=nz1d.__getitem__)
ip=[ir for ir in io if nz1d[ir][0] != 0]
if not ip:
# non zero was not found at all => the end
break
ip=ip[0]
jp=np.nonzero(np.round(mn[i+ip,i:], 10))[0]
if jp.size > 1:
# choose the column closest to 1
jp=jp[np.argmin(np.abs(mn[i+ip,i+jp]-1))]
else:
jp=jp[0]
#print("i=", i, "ip=", ip, "jp=", jp, "mn=\n", np.round(mn, 2))
#if i == 5:
# import pdb; pdb.set_trace()
#i1=np.where(np.round(np.abs(mn[i:,i:]), 12) == 1)
#if not len(i1[0]):
# 1 was not found, search for max.abs non zero
#mi=mn[i:,i:].min()
#ma=mn[i:,i:].max()
#mima=mi if -mi > ma else ma
#if round(mima, 12) == 0:
# # non zero was not found at all => the end
# break
#i1=np.where(mn[i:,i:] == mima)
if ip != 0:
mn[[i, ip+i]]=mn[[ip+i, i]]
per.append((i, ip+i))
mres.row_swap(i, ip+i)
if jp != 0:
mn[:,[i, jp+i]]=mn[:,[jp+i, i]]
pe.append((i, jp+i))
mres.col_swap(i, jp+i)
if mn[i, i] != 1:
mn[i, i:] *= 1./mn[i, i]
for ii in range(i+1, nr):
if round(mn[ii,i], 12):
mn[ii,(i+1):] -= mn[ii,i]*mn[i,(i+1):]
#if any(np.abs(mn[i, :]) > 1.e10):
# import pdb; pdb.set_trace()
mn[i+1:,i]=0.
#print("i=", i, "mn perm=\n", np.round(mn, 2))
#print("pe=", pe, "\nper=", per)
if round(mn[i,i], 12) == 0:
i -= 1
#import pdb; pdb.set_trace()
#timeme("mn")
#import pdb; pdb.set_trace()
nr=min(i+1, nro)
mres=mres[:nr,:]
#print("mres perm=")
#pprint(mres)
for i in range(nr):
if mres[i, i] != 1:
mres[i, (i+1):] *= 1/mres[i, i]
#if any(v==nan for v in mres[i, (i+1):]):
# import pdb; pdb.set_trace()
mres[i, (i+1):]=lfact(mres[i, (i+1):], d, c, sc, fast=fast)
mres[i, i]=1
vr=mres[i, (i+1):]
#print("vr=", vr)
for ii in range(i+1, nr):
if mres[ii,i]:
mres[ii,(i+1):] -= mres[ii,i]*vr
mres[ii,(i+1):]=lfact(mres[ii,(i+1):], d, c, sc, fast=fast)
mres[i+1:, i]=Matrix.zeros(nr-i-1, 1)
#mres=lfact(mres, d, c, sc, fast=fast)
#print("mres i=", i)
#pprint(mres)
#xgr=Matrix(ld["xgr"])
#print("m*x-b=", m[:,:-1].subs(nd)*xgr-m[:,-1].subs(nd))
#print("mres*x-b=", mres[:,:-1].subs(nd)*xgr.permute(pe)-mres[:,-1].subs(nd))
return (mres, pe)
def echsol(m, *unk, pe=[], d={}, c={}, sc={}, nsub={}, fast=False):
"""solves augmented linear system in echelon form resulted from ech(...).
Unknown symbols are taken from \*unk. pe is a possible permutation list of ij-tuples from ech(...).
Return a symbolic column vector"""
if type(m) != Matrix:
raise Exception("expected Matrix type on input")
nr,nc=m.shape
sol=Matrix(range(nc-1)) # place-holder for solution
if nc == 1:
# empty matrix
return sol
m=m.copy()
#import pdb; pdb.set_trace()
if nr < nc-1:
# under-determined system
sol[nr:,0]=unk[nr:]
m[:,-1] -= m[:,nr:-1]*sol[nr:,0]
m[:,-1]=lfact(m[:,-1], d, c, sc, fast=fast)
sol[nr-1,0]=m[nr-1,-1]
for i in range(nr-2, -1, -1):
m[:(i+1),-1] -= m[:(i+1),i+1]*sol[i+1,0]
m[:(i+1),-1]=lfact(m[:(i+1),-1], d, c, sc, fast=fast)
sol[i,0]=m[i,-1]
for i,j in reversed(pe):
sol.row_swap(i,j)
return sol
def sumi2sd(covmat, i, tol=tol):
"calculate sd for a new variable defined as sum of components in 'i'"
i=np.array(i)
val=np.sum(covmat[i[:,None],i])
val=val if val >= 0. else 0. if val >= -tol else val
return np.sqrt(val)
def new_header(level=None, title="", mdFile=None):
if mdFile is None or level is None:
return
mdFile.new_line('<a name="'+title.lower().replace(" ", "-")+'"></a>')
mdFile.new_header(level=level, title=title)
return
def dhms(f, dig=2):
"Convert float number 'f' representing seconds to a 'D days Hh Mm Ss"
f=round(f, dig)
if f < 60:
return f"{f}s"
elif f < 3600:
return f"{int(f//60)}m {round(f%60, dig)}s"
elif f < 24*3600:
return f"{int(f//3600)}h {int(f%3600//60)}m {round(f%60, dig)}s"
else:
d=f//(24*3600)
return f"{int(d)} day{'s' if d > 1 else ''} {int(f%(24*3600)//3600)}h {int(f%3600//60)}m {round(f%60, dig)}s"
def main_arg():
"Wrapper for `main(li=sys.argv[1:])` call when used from executable script"
main(li=sys.argv[1:])
def main(li=None, mm=None, colsel=None, tim=None, data=None, w=None, s=None, rd=None, path=None, fast=False, vers=None, inchi=False):
"""Resolve symbolically and numerically iso-, cumo- and EMU-mers from isotope labeling measurements (NMR, MS, ...)
:param li: list of argument to be processed, typically a ``sys.argv[1:]``, cf. ``isosolve -h`` or :ref:`cli` section for parameter significance. Defaults to None.
:type li: list, optional
:param mm: mapping matrix describes symbolically the mapping of each measurements method to isotopic space. Can be a file name in tsv format or a pandas data.frame. Either this parameter must be set or the only positional argument in li. Defaults to None.
:type mm: str or pandas.FataFrame, optional
:param colsel: if str, coma separated column selection from mm. The syntax is described in ``isosolve -h``. If iterable, a collection of items describing column selection. Each item can be an integer (positive or negative for exclusion), slice or string with column name. Column numbers are 1-based (not 0-based as usual for Python). Defaults to None which is equivalent to proceeding all the columns from mm.
:type colsel: str or iterable, optional
:param tim: if True, print CPU time after some key steps. Defaults to None.
:type tim: logical, optional
:param data: if DataFrame, numeric values and sd of measurements described in mm. If str, a file name in tsv format with such data in three columns: name, value, sd. Defaults to None.
:type data: str or pandas.DataFrame, optional
:param w: if True, write formatted results in .md (plain text, MarkDown format) and .html files. The basis of file name is the same as in mm when mm is string, or ``mm`` if mm is a DataFrame. The name ``mm`` can be overwritten with path parameter (cf. hereafter). Defaults to None which is equivalent to True when used in command line and to False when used programmatically.
:type w: logical, optional
:param s: seed for pseudo random generation. When used, randomly drawn values become reproducible. Only useful for debugging or issue reporting. Defaults to None.
:type s: int, optional
:param rd: if True, use randomly generated values for checking obtained formulas. In the output HTML file, numerical values that are not coinciding are typeset in **bold**. Only useful for debugging and issue reporting. Defaults to None.
:type rd: logical, optional
:param path: directory (when ends with "/") of file name (otherwise) for formatted output. Can be useful when mm is a DataFrame. Defaults to None which is equivalent to the current working directory.
:type path: str, optional
:param fast: if True, skip calculation of elementary measurable combinations.
:type fast: logical, optional
:param vers: if True, print the version number and return without any result.
:type vers: logical, optional
:param inchi: if True, writes TSV files with International Chemical Identifiers (InChI) for isotopomers, cumomers, EMUs and elementary measurable combinations.
:type vers: logical, optional
:return: dictionary with the following entries:
:xbc: 1-column DataFrame, randomly drawn values for isotopomers;
:xv: dict, key=measurement name and value=numeric value deduced from xbc;
:am: sympy Matrix, augmented matrix of system equations. The last column is the right hand side;
:rdn_mes: list, names of redundant measurements;
:smm: DataFrame, mapping matrix after column selection;
:mmd: dict, key=measurement name, value=numpy array with isotopomer indexes mapped onto this measurements;
:uvals: dict, key=method name (i.e. column name in mm), value=list of unique measurement names involved in this method;
:sy_meas: list, symbolic equations defining measurements;
:sbc: dict, key=isotopomer name (i.e. row name in mm), value=isotopomer symbolic variable;
:sols: list, symbolic solution for isotopomers. The order is the same as in mm;
:vsubs: dict, key=full symbolic expression, value=simplified symbolic expression. Used for simplification procedures;
:metrics: dict, the fields are:
:idef: list, defined isotopomer indexes;
:idef_c: set, indexes of defined cumomers;
:idef_e: dict, key=EMU name, value=set of defined M+i mass isotopologues in this EMU;
:emusol: dict, key=EMU name, value=list of symbolic solutions for M+i;
:cusol: dict, key=cumomer name, value=symbolic solution;
:ls: dict, least square solution. The fields are:
:iso: list, isotopomer values and sd;
:cov: isotopomer covariance matrix;
:cumo: list, cumomer values and sd;
:emu: list of dicts. Each dict has a key=EMU name and value=list of values (the first dict) or sd (the second one);
:mec: list, measurable elementary combinations values and sd;
:inchi: dict, InChI information. The fields are:
:iso: DataFrame for isotopomers;
:cumo: DataFrame for cumomers;
:emu: DataFrame for EMUs;
:mcomb: DataFrame for measurable elementary combinations;
:rtype: dict
"""
#print("li=", li)
#timeme("init")
globals()["_T0"]=tproc()
if vers:
print(ver)
return
if type(li) == list:
class DefaultHelpParser(argparse.ArgumentParser):
def error(self, message):
sys.stderr.write('error: %s\n' % message)
self.print_help()
sys.exit(2)
parser = DefaultHelpParser(description='calculate isotopmer/cumomer/EMU expressions (symbolic and optionally numeric) by a combination of isotope labeling measurements (NMR, MS, ...)', formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument('mm',
help="""file name (tsv) providing measure matrix which is organized as follows:
- each row corresponds to a given binary cumomer, e.g. '010x'
- each column corresponds to a given experiment type, e.g. 'HSQC-C_beta'
- each cell contains a variable name, e.g. 'a', 'b' etc. or empty, i.e. NA
- each variable name can appear multiple times in a given
column but not in different columns. If a variable appears
in several rows, these rows are considered as mixed in
equal parts in a given measurement method. E.g. in HSQC-C_beta,
contributions of patterns '001x' and '101x' are mixed
in one variable, say 'k', and contributions of '011x'
and '111x' are mixed in another one, say 'l'. Other rows
in this column must be left empty as they don't
contribute to any measurement.""")
parser.add_argument("-c", "--colsel", help="""column selection (full set by default).
Can be a slice, comma separated list of integers or names,
regular expression, or a mix of all this. In a slice,
a negative number means counting from the end. In a list,
if given negative numbers or names starting with '-',
the corresponding columns are excluded from treatment.
Names can be given as Python regular expressions.
The order of column selection is irrelevant.
Examples:
'1:3' - first 3 columns;
':3' - the same;
':-1' - all columns but the last;
'2::2' - even columns;
'1,3,6' - first, third and sixth columns;
'HSQC.*' - columns with names started by 'HSQC';
'-HN.*' - all columns except starting with 'HN'""")
parser.add_argument("-t", "--TIMEME", help="activate or not (default) CPU time printing. Useful only for debugging or issue reporting.", action='store_true')
parser.add_argument("-d", "--data", help="file name with 3 columns: name, value and sd. Numeric values in columns 'value', and 'sd' must be non-negative and positive respectively. Fields are tab-separated, comments start with sharp-sign '#'")
parser.add_argument("-w", "--write", help="force .md and .html file writing even in non CLI mode", action='store_true')
parser.add_argument("-s", "--seed", help="integer value used as a seed for pseudo-random drawing. Useful only for debugging or issue reporting.", type=int)
parser.add_argument("-r", "--rand", help="make random draws for numerical tests of formulas. Useful only for debugging or issue reporting.", action='store_true')
parser.add_argument("-p", "--path", help="path for .md and html files. If it ends by '/' it is interpreted as a directory path which is created if nonexistent. Otherwise, it is interpreted as a base before .md and .html extensions. If not given, .md and .html are written in the same directory with the same basename (before the extension) as 'MM' file ")
parser.add_argument("-f", "--fast", help="skip calculations of measurable combinations", action='store_true')
parser.add_argument("-i", "--inchi", help="write InChI files", action='store_true')
parser.add_argument("-v", "--version", help="print version number on stdout and exit. Useful only for debugging or issue reporting.", action='version', version=f'%(prog)s {ver}')
args = parser.parse_args(li)
fmm=args.mm
fdata=args.data
jc=args.colsel
wri=args.write or __name__ == "__main__" or __name__[:8] == "isosolve"
np.random.seed(args.seed)
globals()["TIMEME"]=args.TIMEME
rdraw=args.rand
path=args.path
fast=args.fast
inchi=args.inchi
#timeme("parser")
# overwrite options with explicit parameters
alist=[]
smm=None
dmm=None
if type(mm) in (str, pa.core.frame.DataFrame) or not li:
if isstr(mm):
fmm=mm
alist.append(f"mm='{mm}'")
smm=pa.read_csv(fmm, delimiter="\t", comment="#", dtype=str)
smm.set_index(smm.columns[0], inplace=True)
elif type(mm) == pa.core.frame.DataFrame:
# mm is supposed to be data.frame
fmm="mm.par"
smm=mm
alist.append(f"mm='data.frame {'x'.join(str(v) for v in smm.shape)}'")
elif type(li) == list and len(li) == 0:
parser.print_help()
return 1
else:
raise Exception(f"unknown type for 'mm'. Expected 'str' or pandas data.frame got {type(mm)}")
if li and smm is None:
smm=pa.read_csv(fmm, delimiter="\t", comment="#", dtype=str)
smm.set_index(smm.columns[0], inplace=True)
if colsel or not li:
jc=colsel
alist.append(f"colsel='{colsel}'")
if not tim is None or not li:
globals()["TIMEME"]=tim
alist.append(f"tim={tim}")
# get numeric measurement matrix
if type(data) in (str, pa.core.frame.DataFrame) or not li:
if isstr(data):
fdata=data
alist.append(f"data='{data}'")
dmm=pa.read_csv(fdata, delimiter="\t", comment="#")
elif type(data) == pa.core.frame.DataFrame:
# data is supposed to be data.frame
fdata="mm.data"
dmm=data
alist.append(f"data='data.frame {'x'.join(str(v) for v in dmm.shape)}'")
elif data is None:
fdata=None
else:
raise Exception(f"unknown type for 'data'. Expected None, 'str' or pandas data.frame got {type(data)}")
if li and dmm is None and fdata:
dmm=pa.read_csv(fdata, delimiter="\t", comment="#")
if not w is None or not li:
wri=w
alist.append(f"w={w}")
if s or not li:
np.random.seed(s)
alist.append(f"s={s}")
if rd or not li:
rdraw=rd
alist.append(f"rd={rd}")
# md: prepare markdown output
if wri or inchi:
nm_md=".".join(fmm.split(".")[:-1])
if path:
os.makedirs(path, exist_ok=True)
if path[-1:] == "/":
nm_md=os.path.join(path, os.path.basename(nm_md))
else:
nm_md=path
if wri:
mdFile = MdUtils(file_name=nm_md, title="Symbolic Resolution of Labeled Measurements")
new_header(level=1, title='SymNmrMs', mdFile=mdFile)
# md: h2 problem setup
new_header(level=2, title='Problem setup', mdFile=mdFile)
# md: h3 arguments
if li:
new_header(level=3, title='Used arguments', mdFile=mdFile)
mdFile.insert_code(" ".join("'"+v+"'" for v in li))
if alist:
new_header(level=3, title='Explicit Parameters', mdFile=mdFile)
mdFile.insert_code(" ".join(alist))
# get symbolic measurement matrix
nr,nc=smm.shape
nm_bc=smm.index
nm_exp=smm.columns
if fdata:
if dmm.shape[1] != 3:
raise Exception(f"Expected 3 columns in data file '{fdata}' instead got {dmm.shape[1]}")
dmm.set_index(dmm.columns[0], inplace=True)
dmm.columns=["value", "sd"]
ljc=[]
allj=list(range(nc))
if isstr(jc) and jc:
# jc can be like "1:3,4,-1,:5,:-2,-MS.*"
tmp={}
ljc=[tmp["vs"] for v in jc.split(",") if assign("vs", v.strip(), tmp)]
elif jc:
ljc=[str(v) for v in jc]
if ljc:
# get litteral column names and stripe them off
jc=[nm_exp.get_loc(ljc.pop(i)) for i,v in revenum(ljc) if v in nm_exp]
# get regexp column names and strip them off
for i,v in revenum(ljc.copy()):
# list of matched columns by v
lima=[nm_exp.get_loc(vma) for vma in nm_exp if re.match(v, vma)]
jc += lima
if lima:
ljc.pop(i)
# get integers convertible and in good range
jc += [int(ljc.pop(i))-1 for i,v in revenum(ljc) if is_intstr(v) and int(v) > 0 and int(v) <= nc]
# get slices
for i,it in revenum(ljc):
if it.count(":") == 0:
continue
# get begin, end, step of the slice
bes=it.split(":")
if len(bes) > 3:
raise Exception(f"slice '{it}' is badly formed: too many ':'")
s="" if len(bes) < 3 else bes[2]
b,e=bes[:2]
try:
b=int(b) if b else 0
e=int(e) if e else nc
s=int(s) if s else 1
except:
raise Exception(f"slice '{it}' is badly formed: not integer values")
if b > 0:
b -= 1
li=allj[slice(b, e, s)]
jc += li
ljc.pop(i)
jc=sorted(set(jc))
if not jc:
jc=allj # default: all
# remove "negative" litteral, regexp and integer indexes
for i,it in revenum(ljc):
if not it.startswith("-"):
continue
#import pdb; pdb.set_trace()
it=it[1:]
# litteral
if it in nm_exp:
j=nm_exp.get_loc(it)
if j in jc:
jc.remove(j)
ljc.pop(i)
continue
# regexp
lima=[nm_exp.get_loc(vma) for vma in nm_exp if re.match(it, vma)]
for j in lima:
if j in jc:
jc.remove(j)
if lima:
ljc.pop(i)
continue
# integer
if is_intstr(it):
j=int(it)-1
if j in jc:
jc.remove(j)
ljc.pop(i)
continue
else:
jc=allj
if ljc:
sys.stderr.write(f"Warning: following identifiers were not used as not matching any column:\n\t"+"\n\t".join(ljc)+"\n")
smm=smm.iloc[:,jc]
nr,nc=smm.shape
nm_bc=smm.index
nm_exp=smm.columns
flen=len(nm_bc[0]) # total fragment length
clen=nm_bc[0].count("1")+nm_bc[0].count("0") # account for 01 atoms
if nr != 2**clen:
raise Exception(f"not sufficient row number in '{fmm}': expected {2**clen}, got {nr}")
# unique values per column -> dict uvals
uvals=dict((nm,list(v for v in useq(col) if isstr(v))) for nm,col in smm.items())
# measurement name to column translator
v2col=dict((v, nm) for nm,li in uvals.items() for v in li)
# make dict measure matrix mmd: 'a'->integer indexes of '010x',...
mmd=dict()
for nmc, col in smm.items():
for v in uvals[nmc]:
if v in mmd:
raise Exception(f"variable '{v}' appears in different columns")
mmd[v]=np.where(col==v)[0]
# MS list of bc indexes
# cumomer list of bc indexes
i01=np.where([c=="1" or c=="0" for c in nm_bc[0]])[0]