# hamiltonian.py
from typing import Tuple, Callable, Union
import tensorflow as tf
import numpy as np
from tensorflow.keras import Model
from tensorflow.keras.layers import Input
from tensorflow import Tensor, Variable
from utils import _isin, _mlp_models
class Hamiltonian:
integrate_methods = ['leapfrog', 'euler']
    def __init__(self, dim, kinetic: Union[Tuple[int, ...], Model] = (128, 128),
                 potential: Union[Tuple[int, ...], Model] = (128, 128)):
"""
Separable Hamiltonian (kinetic and potential energies) and associated methods.
Parameters
----------
dim: Dimension of the input.
        kinetic, potential: Models for the energies. If a tuple is passed, an MLP is built with the
            corresponding number of layers and units.
"""
self.kinetic = _mlp_models(dim, kinetic, "softplus", name="Kinetic")
self.potential = _mlp_models(dim, potential, "softplus", name="Potential")
self.dim = dim
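    # Construction sketch (kept as comments so nothing runs on import). `my_keras_model` below is a
    # hypothetical placeholder for any tf.keras Model mapping (batch, dim) -> energy:
    #
    #   h = Hamiltonian(dim=2)                              # two (128, 128)-unit MLP energies
    #   h = Hamiltonian(dim=2, kinetic=(64, 64, 64))        # a deeper MLP for the kinetic term
    #   h = Hamiltonian(dim=2, potential=my_keras_model)    # or pass a custom Model directly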
def __call__(self, q: Tensor, p: Tensor, *args, **kwargs) -> Tensor:
"""
Returns value of the Hamiltonian in a given state.
Parameters
----------
q: value of the position, for the potential energy.
p: value of the momentum, for the kinetic energy.
"""
return self.potential(q) + self.kinetic(p)
@property
def trainable_variables(self):
return self.kinetic.trainable_variables + self.potential.trainable_variables
    # The tf.function decorator is really important here! Without it, this is a plain Python callable,
    # and passing it to the leapfrog integration forces a retrace at each step.
@tf.function
def grad_potential(self, q: Variable) -> Tensor:
"""Returns the gradient of the potential. Needed for integration."""
with tf.GradientTape() as t:
            # watch is necessary in case q is a Tensor (and not a Variable): the gradient with respect
            # to an unwatched Tensor is None.
t.watch(q)
potential = self.potential(q)
return t.gradient(potential, q)
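    # Sketch of the watch behaviour mentioned above (assuming eager TF2 semantics):
    #
    #   q = tf.constant([[0.5]])
    #   with tf.GradientTape() as t:
    #       y = tf.reduce_sum(tf.square(q))
    #   t.gradient(y, q)        # -> None: q is a Tensor and was never watched
    #
    # With t.watch(q) inside the tape, the same call returns the expected gradient 2*q.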
@tf.function
def grad_kinetic(self, p: Variable) -> Tensor:
"""Returns the gradient of the kinetic term. Needed for integration."""
with tf.GradientTape() as t:
t.watch(p)
kinetic = self.kinetic(p)
return t.gradient(kinetic, p)
def integrate(self, q: Variable, p: Variable, n_steps: int, eps: float, forward, method="leapfrog"):
"""
Perform integration of a state, according to Hamiltonian dynamics.
Parameters
----------
q, p:
The current state (position, momentum).
n_steps:
            The number of integration steps to perform.
eps:
The size of each step.
forward:
            Whether to perform a *forward* or a *backward* integration, the latter being possible
            thanks to the reversibility of Hamiltonian dynamics.
method: {"leapfrog", "euler"}
            Which numerical scheme to use. Leapfrog has O(eps**3) local error and O(eps**2) global
            error; Euler has O(eps**2) local error and O(eps) global error.
Returns
-------
q, p:
Tuple of new state: position, momentum.
"""
_isin(method, Hamiltonian.integrate_methods)
        # Again, casting hyperparameters to tf.Tensor is crucial here: otherwise, the integrator would
        # retrace at each change of arguments. The explicit dtype also keeps eps compatible with the
        # models' float32 weights.
eps = tf.constant(eps, dtype='float32')
params = dict(q=q, p=p,
n_steps=tf.constant(n_steps),
eps=eps,
forward=tf.constant(forward),
grad_func_q=self.grad_potential,
grad_func_p=self.grad_kinetic)
if method == "leapfrog":
return _leapfrog_integration(**params)
elif method == "euler":
return _euler_integration(**params)
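# Usage sketch for `integrate` (comments only; a sketch assuming q and p are float32 tensors of
# shape (batch, dim)):
#
#   h = Hamiltonian(dim=1)
#   q0, p0 = tf.zeros((1, 1)), tf.ones((1, 1))
#   q1, p1 = h.integrate(q0, p0, n_steps=10, eps=0.1, forward=True, method="leapfrog")
#   q2, p2 = h.integrate(q1, p1, n_steps=10, eps=0.1, forward=False, method="leapfrog")
#   # By reversibility, (q2, p2) should be close to (q0, p0) up to numerical error.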
@tf.function
def _leapfrog_integration(q: Tensor, p: Tensor, n_steps: int, eps: float, forward: bool,
grad_func_q: Callable[[Tensor], Tensor],
grad_func_p: Callable[[Tensor], Tensor]):
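    # Leapfrog (Stormer-Verlet) scheme for a separable H(q, p) = U(q) + K(p):
    #   p_{t+1/2} = p_t       - (eps/2) * dU/dq(q_t)
    #   q_{t+1}   = q_t       + eps     * dK/dp(p_{t+1/2})
    #   p_{t+1}   = p_{t+1/2} - (eps/2) * dU/dq(q_{t+1})
    # Consecutive half-steps of p are fused into full steps inside the loop below.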
    # TODO: for now, this retraces at each change of batch size for (q, p)...
print(f"Tracing with {q}, {p}, {grad_func_q}, {grad_func_p}")
if not forward:
eps = - eps
# Make half step for the momentum
p = p - eps / 2 * grad_func_q(q)
for i in range(n_steps):
# Full step of position
q = q + eps * grad_func_p(p)
# Full step of momentum, except at the end
        if i != n_steps - 1:
p = p - eps * grad_func_q(q)
# Make half step of momentum
p = p - eps / 2 * grad_func_q(q)
return q, p
@tf.function
def _euler_integration(q: Tensor, p: Tensor, n_steps: int, eps: float, forward: bool,
grad_func_q: Callable[[Tensor], Tensor],
grad_func_p: Callable[[Tensor], Tensor]):
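    # Note on the update order: q is advanced with the old p, then p with the freshly updated q.
    # This is the semi-implicit (symplectic) Euler variant, not the fully explicit one.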
print(f"Tracing with {q}, {p}, {grad_func_q}, {grad_func_p}")
if not forward:
eps = - eps
for i in range(n_steps):
q = q + eps * grad_func_p(p)
p = p - eps * grad_func_q(q)
return q, p
def test_integration(method='leapfrog'):
"""
Test integration on a simple example:
U(q) = q^2/2
K(p) = p^2/2
    Shows a 2x3 grid of plots: phase-space trajectories on top (forward motion in blue, then backward
    motion in red, against the unit-circle baseline in black) and the energy along the trajectory below.
"""
n_steps = 20
list_eps = [0.3, 0.8, 1.5]
initial_p = tf.ones(1)
initial_q = tf.zeros(1)
simulations = np.zeros((3, 2*n_steps, 2))
energies = np.zeros((3, 2*n_steps))
def square_layer():
x = Input(shape=(1, ))
return Model(inputs=x, outputs=tf.square(x) / 2)
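    # With U(q) = q**2/2 and K(p) = p**2/2, the exact flow is the harmonic oscillator: starting from
    # (q, p) = (0, 1), trajectories are circles in phase space, which is what the unit-circle
    # baseline below visualizes.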
hamiltonian = Hamiltonian(1, kinetic=square_layer(), potential=square_layer())
for r, e, eps in zip(simulations, energies, list_eps):
q, p = initial_q, initial_p
for i in range(n_steps):
q, p = hamiltonian.integrate(q, p, n_steps=1,
eps=eps, forward=True, method=method)
r[i] = q.numpy(), p.numpy()
e[i] = hamiltonian(q, p)
for i in range(n_steps):
q, p = hamiltonian.integrate(q, p, n_steps=1, eps=eps, forward=False, method=method)
r[n_steps+i] = q.numpy(), p.numpy()
e[n_steps+i] = hamiltonian(q, p)
import matplotlib.pyplot as plt
plt.style.use('ggplot')
fig, axes = plt.subplots(2, 3)
baseline = np.linspace(0, 1, 1000)
baseline_x = np.cos(baseline * 2 * np.pi)
baseline_y = np.sin(baseline * 2 * np.pi)
ax: plt.Axes
fig: plt.Figure
for r, eps, ax in zip(simulations, list_eps, axes[0]):
ax.plot(r[:n_steps, 0], r[:n_steps, 1], 'b+-')
ax.plot(r[n_steps:, 0], r[n_steps:, 1], 'r+-')
ax.plot(baseline_x, baseline_y, 'k')
ax.set_aspect('equal')
        ax.set_title(rf'$\epsilon = {eps}$')
for e, ax in zip(energies, axes[1]):
ax.plot(e)
ax.axhline(e[0], linestyle="--", color="k")
ax.axvline(n_steps, linestyle="-.", color="k")
fig.suptitle("$n_{steps} = %d$" % n_steps)
plt.show()
return simulations
if __name__ == '__main__':
result = test_integration(method="euler")