-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathajcs.py
253 lines (195 loc) · 7.51 KB
/
ajcs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
"""
Sparse implementation of the Galerkin discretization of the AJC (augmented
jump chain) as in the preprint. Also provides finite hitting time
computations via the space-time committor approach.
"""
import scipy.sparse as sp
import numpy as np
from scipy.sparse.linalg import spsolve
from copy import deepcopy
from scipy.optimize import minimize
class AJCS():
    """Sparse implementation of the Galerkin discretization of the AJC
    (augmented jump chain) as in the preprint.

    Attributes set in __init__/compute:
        nx:  number of spatial states
        nt:  number of time intervals
        nxt: size of the flattened space-time state space (nx * nt)
        Q:   sequence of (nx, nx) sparse rate matrices, one per interval
        dt:  array of interval lengths, length nt
        qt:  list of row-normalized jump matrices (see qtilde)
        qi:  (nx, nt) outflow rates (see qtilde)
        k:   (nt, nt) object array of sparse (nx, nx) jump-kernel blocks
        H, S: intermediate Galerkin quantities (see jumpkernel)
    """
    def __init__(self, Q, dt):
        # Q: sequence of sparse rate matrices; dt: sequence of interval lengths
        self.nx = Q[0].shape[0]
        self.nt = len(dt)
        self.nxt = self.nx * self.nt  # size of the space-time state space
        self.Q = Q
        self.dt = np.array(dt)
        self.compute()

    def compute(self):
        # (Re)derive the jump-chain quantities and the Galerkin jump kernel
        # from the current rate matrices Q.
        self.qt, self.qi = self.qtilde(self.Q)
        self.k, self.H, self.S = self.jumpkernel(self.qt, self.qi, self.dt)

    @staticmethod
    def jumpkernel(qt, qi, dt):
        """ compute the galerkin discretization of the jump kernel eq[50]

        qt: list of nt row-normalized sparse jump matrices, each (nx, nx)
        qi: (nx, nt) outflow rates
        dt: (nt,) interval lengths
        Returns (J, H, S):
            J: (nt, nt) object array of sparse (nx, nx) kernel blocks
            H: (nx, nt, nt) temporal coupling weights
            S: (nt, nt, nx) scaled weights, S[k,l,i] = H[i,k,l] / (dt[k] qi[i,k])
        """
        nx, nt = np.shape(qi)
        # survival factor per cell/interval: probability of not jumping in dt
        s = np.exp(-qi * dt[None, :])  # (nx, nt)
        H = np.zeros((nx, nt, nt))
        for i in range(nx):
            for k in range(nt):
                # product of survival probabilities over intermediate intervals
                prod = np.append(1, np.cumprod(s[i, k+1:-1]))
                H[i, k, k+1:] = (1-s[i, k]) * (1-s[i, k+1:]) * prod
                H[i, k, k] = s[i, k] + dt[k] * qi[i, k] - 1
        #J = np.einsum('k, ijl, ik, ikl -> ikjl', 1/dt, qt, 1/qi, H)
        J = np.empty((nt, nt), dtype=object)
        S = np.zeros((nt, nt, nx))
        for k in range(nt):
            for l in range(nt):
                S[k, l, :] = (1/dt[k]) * H[:, k, l] / (qi[:, k])
                # scale the rows of qt[l] by S[k, l, :]
                J[k, l] = sp.diags(S[k, l, :]).dot(qt[l])
        # is csr_scale_rows faster?
        return J, H, S

    @staticmethod
    def qtilde(Qs):
        """ given an array of rate matrices compute the jump chain matrix
        qt[i,j,t] = q^tilde_ij(t) : the row normalized jump matrix [eq. 14]
        qi[i,t] = q_i(t): the outflow rate [eq. 6]
        """
        qt = [sp.dok_matrix(Q) for Q in Qs]
        nt = len(qt)
        nx = qt[0].shape[0]
        qi = np.zeros((nt, nx))
        for k in range(nt):
            # zero the diagonal so row sums give the outflow rates
            qt[k][range(nx), range(nx)] = 0
            qi[k, :] = qt[k].sum(axis=1).A[:, 0]
            # row-normalize; rows with qi == 0 become inf/nan here and are
            # overwritten by the special case right below
            qt[k] = sp.diags(1/qi[k, :]).dot(qt[k])
            z = np.where(qi[k, :] == 0)  # special case q_i = 0 => q_ij = kron_ij
            qt[k][z[0], :] = 0
            qt[k][z[0], z[0]] = 1
        return qt, qi.T

    @property
    def km(self):
        # Lazily-computed flattened (space-time) kernel matrix; cached on
        # first access since flatten_spacetime is expensive.
        if not hasattr(self, "_km"):
            self._km = flatten_spacetime(self.k)  # TODO: find a good way to deal with differing representations
        return self._km

    # TODO: compute this without falling back to the kernel matrix
    def jump(self, p):
        # Apply the jump kernel once to a space-time distribution p
        # (any shape compatible with flattening to length nxt).
        return np.reshape(p.flatten() @ self.km, p.shape)

    def geiger(self, p, n=100):
        # Accumulate p + Kp + K^2 p + ... + K^n p (truncated Neumann-type sum
        # over n jumps).
        g = np.copy(p)
        pp = p
        for i in range(n):
            pp = self.jump(pp)
            g = g + pp
        return g

    def holding_probs(self):
        # S[s, t, i]: probability of still holding (no jump) from time cell s
        # through t in state i; upper-triangular in (s, t).
        S = (1 - np.cumsum(self.S, axis=1))
        S = np.moveaxis(np.triu(np.moveaxis(S, 2, 0)), 0, 2)
        return S

    def synchronize(self, p):
        # Push a space-time density p (shape (nx, ns) -- TODO confirm) onto
        # the time grid using the holding probabilities.
        return np.einsum('sti, is -> it', self.holding_probs(), p)
### SOLVERS FOR THE LINEAR SYSTEM
class AugmentedSolver:
    """Methods for solving the augmented (space-time) linear system
    A x = b where A is an upper block-triangular (nt, nt) object array
    K[s, t] of sparse (nx, nx) blocks and b[s] the corresponding RHS.

    NOTE(review): several methods read ``self.k`` and call
    ``self.holding_probs()``, which this class does not define itself --
    presumably it is used as a mixin together with ``AJCS``; confirm
    before using it stand-alone.
    """
    def __init__(self, K):
        self.K = K
        self.nt = np.size(K, axis=0)
        self.nx = np.size(K[0, 0], axis=0)

    @staticmethod
    def backwardsolve(K, b):
        """Solve the upper block-triangular system K q = b by backward
        substitution.

        K: (nt, nt) object array of sparse (nx, nx) blocks; only the upper
           triangle (including the diagonal) is accessed.
        b: array of shape (nt, nx) or (nt, nx, m); not modified (copied).
        Returns q of the same shape as b.

        (Was a @classmethod whose first parameter was misnamed ``self`` and
        never used -- now a @staticmethod; all call forms keep working.)
        """
        nt = np.size(K, axis=0)
        b = b.copy()  # do not clobber the caller's right-hand side
        q = np.zeros(np.shape(b))
        for s in reversed(range(nt)):
            # eliminate the already-solved unknowns above the diagonal ...
            for t in range(nt - 1, s, -1):
                b[s] -= K[s, t] @ q[t]
            # ... then solve the diagonal block
            q[s] = spsolve(K[s, s], b[s])
        return q

    def koopman_system_one(self, i):
        """ solve the koopman system for a single target state

        i: index of the target spatial state.
        Returns q of shape (nt, nx).
        """
        nx, nt = self.nx, self.nt
        K = -deepcopy(self.k)
        b = np.zeros((nt, nx))
        # probability to hold (no jump) until the final time slice
        S = self.holding_probs()[:, -1, :]
        for s in range(nt):
            K[s, s] += sp.identity(nx)
            b[s][i] = S[s, i]
        # boundary condition at the final time
        K[-1, -1] = sp.identity(nx)
        b[-1][:] = 0
        b[-1][i] = 1
        return self.backwardsolve(K, b)

    def koopman_system_iterated(self):
        """ solve the koopman system for all target states by solving
        individually for each; returns the (nx, nx) Koopman matrix at the
        initial time slice """
        nx = self.nx
        K = np.zeros((nx, nx))
        for i in range(nx):
            K[:, i] = self.koopman_system_one(i)[0, :]
        return K

    def koopman_system(self):
        # Koopman matrix restricted to the initial time slice.
        return self.koopman_system_all()[0, :, :]

    def koopman_system_all(self):
        """ solve the koopman system for all target states together;
        returns q of shape (nt, nx, nx) """
        nx, nt = self.nx, self.nt
        K = -deepcopy(self.k)
        b = np.zeros((nt, nx, nx))
        S = self.holding_probs()[:, -1, :]
        for s in range(nt):
            K[s, s] += sp.identity(nx)
            b[s] = np.diag(S[s, :])
        K[-1, -1] = sp.identity(nx)
        return self.backwardsolve(K, b)

    """ HITTING PROBABILITIES """

    def finite_time_hitting_prob(self, state):
        """ Compute the probability to hit a given state over the
        time horizon of the jump chain from any space-time point by solving
        Kp = p in C, p = 1 in C'
        where C' is the time-fibre of the target state and C the rest """
        nx, nt = self.nx, self.nt
        K = deepcopy(self.k)
        b = np.zeros((nt, nx))
        # Kp - p = 0 in C; rows of the target state carry p = 1 instead
        for s in range(nt):
            for t in range(s, nt):
                K[s, t][state, :] = 0
            K[s, s] = K[s, s] - sp.identity(nx)
            K[s, s][state, state] = 1
            b[s, state] = 1
        return self.backwardsolve(K, b)

    def finite_time_hitting_probs(self):
        """ finite_time_hitting_probs[i,j] is the probability to hit state j starting in i in the time window of the process """
        # TODO: check if this is indeed the ordering
        return np.vstack([self.finite_time_hitting_prob(i)[0, :] for i in range(self.nx)]).T

    def optimize(self, iters=100, penalty=0, x0=None, adaptive=False):
        """ gradient free optimization of the minimal finite time hitting prob

        iters:    maximum Nelder-Mead iterations
        penalty:  weight of the squared-L1 perturbation penalty
        x0:       initial perturbation of Q's nonzero data (defaults to zeros)
        adaptive: passed through to Nelder-Mead
        Returns (scipy OptimizeResult, perturbed AJCS copy).
        """
        # TODO: assert identical sparsity structures
        j = self
        jp = deepcopy(self)
        if x0 is None:
            x0 = np.zeros_like(j.Q[0].data)

        def obj(x):
            self.lastx = x
            for t in range(j.nt):
                #jp.Q[t].data = np.maximum(j.Q[t].data + x, 0)  # TODO: ignore diagonal
                jp.Q[t].data = j.Q[t].data + x
            jp.compute()
            # BUGFIX: use np.min -- builtin min() on a 2-d array iterates
            # rows and raises "truth value of an array ... is ambiguous"
            o = -np.min(jp.finite_time_hitting_probs())
            op = o + np.sum(np.abs(x)) ** 2 * penalty
            print(op)
            return op

        # q = sqrt (exp -bU / exp -bU)
        res = minimize(obj, x0=x0, method='nelder-mead',
                       options={'maxiter': iters, 'adaptive': adaptive})
        obj(res.x)  # leave jp in the state of the optimal parameters
        return res, jp
# TODO: this should also work for vectors
# we might want something to convert between stacked K[s,t][i,j], flattened [st, ij] and tensor [s,i,t,j]
def flatten_spacetime(tensor):
    """Assemble an (ns, nt) object array of sparse blocks into one sparse
    matrix (stacked K[s,t][i,j] -> flattened [st, ij]).

    tensor: (ns, nt) object ndarray whose entries are sparse matrices.
    Returns a single sparse (COO) matrix with the blocks laid out in a grid.

    Uses scipy.sparse.bmat to stitch the grid in one pass instead of the
    previous incremental hstack/vstack loop, which copied quadratically.
    """
    ns, nt = tensor.shape
    return sp.bmat([[tensor[s, t] for t in range(nt)] for s in range(ns)])