"""
.. module:: sde.py
:synopsis: Numerical integration techniques
.. moduleauthor:: Jonathan Gross <jarthurgross@gmail.com>
"""
import numpy as np
[docs]def euler(drift_fn, diffusion_fn, X0, ts, Us):
r"""Integrate a system of ordinary stochastic differential equations subject
to scalar noise:
.. math::
d\vec{X}=\vec{a}(\vec{X},t)\,dt+\vec{b}(\vec{X},t)\,dW_t
Uses the Euler method:
.. math::
\vec{X}_{i+1}=\vec{X}_i+\vec{a}(\vec{X}_i,t_i)\Delta t_i+
\vec{b}(\vec{X}_i,t_i)\Delta W_i
where :math:`\Delta W_i=U_i\sqrt{\Delta t}`, :math:`U` being a normally
distributed random variable with mean 0 and variance 1.
:param drift_fn: Computes the drift coefficient
:math:`\vec{a}(\vec{X},t)`
:type drift_fn: callable(X, t)
:param diffusion_fn: Computes the diffusion coefficient
:math:`\vec{b}(\vec{X},t)`
:type diffusion_fn: callable(X, t)
:param X0: Initial condition on X
:type X0: array
:param ts: A sequence of time points for which to solve for X.
The initial value point should be the first element
of this sequence.
:type ts: array
:param Us: Normalized Weiner increments for each time step
(i.e. samples from a Gaussian distribution with mean
0 and variance 1).
:type Us: array, shape=(len(t) - 1)
:return: Array containing the value of X for each desired
time in t, with the initial value `X0` in the first
row.
:rtype: numpy.array, shape=(len(ts), len(X0))
"""
dts = [tf - ti for tf, ti in zip(ts[1:], ts[:-1])]
# Scale the Weiner increments to the time increments.
sqrtdts = np.sqrt(dts)
dWs = np.product(np.array([sqrtdts, Us]), axis=0)
X = np.array([X0])
for t, dt, dW in zip(ts[:-1], dts, dWs):
X = np.vstack((X, X[-1] + drift_fn(X[-1], t)*dt +
diffusion_fn(X[-1], t)*dW))
return X
[docs]def meas_euler(drift_fn, diffusion_fn, dW_fn, X0, ts, dMs):
r"""Integrate a system of ordinary stochastic differential equations
conditioned on an incremental measurement record:
.. math::
d\vec{X}=\vec{a}(\vec{X},t)\,dt+\vec{b}(\vec{X},t)\,dW_t
Uses the Euler method:
.. math::
\vec{X}_{i+1}=\vec{X}_i+\vec{a}(\vec{X}_i,t_i)\Delta t_i+
\vec{b}(\vec{X}_i,t_i)\Delta W_i
where :math:`\Delta W_i=f(\Delta M_i,\vec{X}, t)`, :math:`\Delta M_i` being
the incremental measurement record being used to drive the SDE.
:param drift_fn: Computes the drift coefficient
:math:`\vec{a}(\vec{X},t)`
:type drift_fn: callable(X, t)
:param diffusion_fn: Computes the diffusion coefficient
:math:`\vec{b}(\vec{X},t)`
:type diffusion_fn: callable(X, t)
:param dW_fn: The function that converts the incremental
measurement and current state to the Wiener
increment.
:type dW_fn: callable(dM, dt, X, t)
:param X0: Initial condition on X
:type X0: array
:param ts: A sequence of time points for which to solve for X.
The initial value point should be the first element
of this sequence.
:type ts: array
:param dMs: Incremental measurement outcomes used to drive the
SDE.
:type dMs: array, shape=(len(t) - 1)
:return: Array containing the value of X for each desired
time in t, with the initial value `X0` in the first
row.
:rtype: numpy.array, shape=(len(ts), len(X0))
"""
dts = [tf - ti for tf, ti in zip(ts[1:], ts[:-1])]
X = np.array([X0])
for t, dt, dM in zip(ts[:-1], dts, dMs):
dW = dW_fn(dM, dt, X[-1], t)
X = np.vstack((X, X[-1] + drift_fn(X[-1], t)*dt +
diffusion_fn(X[-1], t)*dW))
return X
[docs]def milstein(drift, diffusion, b_dx_b, X0, ts, Us):
r"""Integrate a system of ordinary stochastic differential equations subject
to scalar noise:
.. math::
d\vec{X}=\vec{a}(\vec{X},t)\,dt+\vec{b}(\vec{X},t)\,dW_t
Uses the Milstein method:
.. math::
\vec{X}_{i+1}=\vec{X}_i+\vec{a}(\vec{X}_i,t_i)\Delta t_i+
\vec{b}(\vec{X}_i,t_i)\Delta W_i+
\frac{1}{2}\left(\vec{b}(\vec{X}_i,t_i)\cdot\vec{\nabla}_{\vec{X}}\right)
\vec{b}(\vec{X}_i,t_i)\left((\Delta W_i)^2-\Delta t_i\right)
where :math:`\Delta W_i=U_i\sqrt{\Delta t}`, :math:`U` being a normally
distributed random variable with mean 0 and variance 1.
:param drift: Computes the drift coefficient
:math:`\vec{a}(\vec{X},t)`
:type drift: callable(X, t)
:param diffusion: Computes the diffusion coefficient
:math:`\vec{b}(\vec{X},t)`
:type diffusion: callable(X, t)
:param b_dx_b: Computes the correction coefficient
:math:`\left(\vec{b}(\vec{X},t)\cdot
\vec{\nabla}_{\vec{X}}\right)\vec{b}(\vec{X},t)`
:type b_dx_b: callable(X, t)
:param X0: Initial condition on X
:type X0: array
:param ts: A sequence of time points for which to solve for X.
The initial value point should be the first element
of this sequence.
:type ts: array
:param Us: Normalized Weiner increments for each time step
(i.e. samples from a Gaussian distribution with mean
0 and variance 1).
:type Us: array, shape=(len(t) - 1)
:return: Array containing the value of X for each desired
time in t, with the initial value `X0` in the first
row.
:rtype: numpy.array, shape=(len(ts), len(X0))
"""
dts = [tf - ti for tf, ti in zip(ts[1:], ts[:-1])]
# Scale the Weiner increments to the time increments.
sqrtdts = np.sqrt(dts)
dWs = np.product(np.array([sqrtdts, Us]), axis=0)
X = np.array([X0])
for t, dt, dW in zip(ts[:-1], dts, dWs):
X = np.vstack((X, X[-1] + drift(X[-1], t)*dt + diffusion(X[-1], t)*dW +
b_dx_b(X[-1], t)*(dW**2 - dt)/2))
return X
[docs]def meas_milstein(drift_fn, diffusion_fn, b_dx_b_fn, dW_fn, X0, ts, dMs):
r"""Integrate a system of ordinary stochastic differential equations
conditioned on an incremental measurement record:
.. math::
d\vec{X}=\vec{a}(\vec{X},t)\,dt+\vec{b}(\vec{X},t)\,dW_t
Uses the Milstein method:
.. math::
\vec{X}_{i+1}=\vec{X}_i+\vec{a}(\vec{X}_i,t_i)\Delta t_i+
\vec{b}(\vec{X}_i,t_i)\Delta W_i+
\frac{1}{2}\left(\vec{b}(\vec{X}_i,t_i)\cdot\vec{\nabla}_{\vec{X}}\right)
\vec{b}(\vec{X}_i,t_i)\left((\Delta W_i)^2-\Delta t_i\right)
where :math:`\Delta W_i=f(\Delta M_i,\vec{X}, t)`, :math:`\Delta M_i` being
the incremental measurement record being used to drive the SDE.
:param drift_fn: Computes the drift coefficient
:math:`\vec{a}(\vec{X},t)`
:type drift_fn: callable(X, t)
:param diffusion_fn: Computes the diffusion coefficient
:math:`\vec{b}(\vec{X},t)`
:type diffusion_fn: callable(X, t)
:param b_dx_b_fn: Computes the correction coefficient
:math:`\left(\vec{b}(\vec{X},t)\cdot
\vec{\nabla}_{\vec{X}}\right)\vec{b}(\vec{X},t)`
:type b_dx_b_fn: callable(X, t)
:param dW_fn: The function that converts the incremental
measurement and current state to the Wiener
increment.
:type dW_fn: callable(dM, dt, X, t)
:param X0: Initial condition on X
:type X0: array
:param ts: A sequence of time points for which to solve for X.
The initial value point should be the first element
of this sequence.
:type ts: array
:param dMs: Incremental measurement outcomes used to drive the
SDE.
:type dMs: array, shape=(len(t) - 1)
:return: Array containing the value of X for each desired
time in t, with the initial value `X0` in the first
row.
:rtype: numpy.array, shape=(len(ts), len(X0))
"""
dts = [tf - ti for tf, ti in zip(ts[1:], ts[:-1])]
X = np.array([X0])
for t, dt, dM in zip(ts[:-1], dts, dMs):
dW = dW_fn(dM, dt, X[-1], t)
X = np.vstack((X, X[-1] + drift_fn(X[-1], t)*dt +
diffusion_fn(X[-1], t)*dW +
b_dx_b_fn(X[-1], t)*(dW**2 - dt)/2))
return X
[docs]def time_ind_taylor_1_5(drift, diffusion, b_dx_b, b_dx_a, a_dx_b, a_dx_a,
b_dx_b_dx_b, b_b_dx_dx_b, b_b_dx_dx_a,
X0, ts, U1s, U2s):
r"""Integrate a system of ordinary stochastic differential equations with
time-independent coefficients subject to scalar noise:
.. math::
d\vec{X}=\vec{a}(\vec{X})\,dt+\vec{b}(\vec{X})\,dW_t
Uses an order 1.5 Taylor method:
.. math::
\begin{align}
\rho^\mu_{i+1}&=\rho^\mu_i+a^\mu_i\Delta t_i+
b^\mu_i\Delta W_i+\frac{1}{2}b^\nu_i\partial_\nu b^\mu_i\left(
(\Delta W_i)^2-\Delta t_i\right)+ \\
&\quad b^\nu_i\partial_\nu a^\mu_i\Delta Z_i
+\left(a^\nu_i\partial_\nu
+\frac{1}{2}b^\nu_ib^\sigma_i\partial_\nu\partial_\sigma\right)
b^\mu_i\left(\Delta W_i\Delta t_i-\Delta Z_i\right)+ \\
&\quad\frac{1}{2}\left(a^\nu_i\partial_\nu
+\frac{1}{2}b^\nu_ib^\sigma_i\partial_\nu\partial_\sigma\right)
a^\mu_i\Delta t_i^2
+\frac{1}{2}b^\nu_i\partial_\nu b^\sigma_i\partial_\sigma b^\mu_i\left(
\frac{1}{3}(\Delta W_i)^2-\Delta t_i\right)\Delta W_i
\end{align}
:param drift: Computes the drift coefficient
:math:`\vec{a}(\vec{X})`
:type drift: callable(X)
:param diffusion: Computes the diffusion coefficient
:math:`\vec{b}(\vec{X})`
:type diffusion: callable(X)
:param b_dx_b: Computes the coefficient
:math:`\left(\vec{b}(\vec{X})\cdot
\vec{\nabla}_{\vec{X}}\right)\vec{b}(\vec{X})`
:type b_dx_b: callable(X)
:param b_dx_a: Computes the coefficient
:math:`\left(\vec{b}(\vec{X})\cdot
\vec{\nabla}_{\vec{X}}\right)\vec{a}(\vec{X})`
:type b_dx_a: callable(X)
:param a_dx_b: Computes the coefficient
:math:`\left(\vec{a}(\vec{X})\cdot
\vec{\nabla}_{\vec{X}}\right)\vec{b}(\vec{X})`
:type a_dx_b: callable(X)
:param a_dx_a: Computes the coefficient
:math:`\left(\vec{a}(\vec{X})\cdot
\vec{\nabla}_{\vec{X}}\right)\vec{a}(\vec{X})`
:type a_dx_a: callable(X)
:param b_dx_b_dx_b: Computes the coefficient
:math:`\left(\vec{b}(\vec{X})\cdot
\vec{\nabla}_{\vec{X}}\right)^2\vec{b}(\vec{X})`
:type b_dx_b_dx_b: callable(X)
:param b_b_dx_dx_b: Computes
:math:`b^\nu b^\sigma\partial_\nu\partial_\sigma
b^\mu\hat{e}_\mu`.
:type b_dx_b_dx_b: callable(X)
:param b_b_dx_dx_a: Computes
:math:`b^\nu b^\sigma\partial_\nu\partial_\sigma
a^\mu\hat{e}_\mu`.
:type b_dx_b_dx_a: callable(X)
:param X0: Initial condition on X
:type X0: array
:param ts: A sequence of time points for which to solve for X.
The initial value point should be the first element
of this sequence.
:type ts: array
:param U1s: Normalized Weiner increments for each time step
(i.e. samples from a Gaussian distribution with mean
0 and variance 1).
:type U1s: array, shape=(len(t) - 1)
:param U2s: Normalized Weiner increments for each time step
(i.e. samples from a Gaussian distribution with mean
0 and variance 1).
:type U2s: array, shape=(len(t) - 1)
:return: Array containing the value of X for each desired
time in t, with the initial value `X0` in the first
row.
:rtype: numpy.array, shape=(len(t), len(X0))
"""
dts = ts[1:] - ts[:-1]
sqrtdts = np.sqrt(dts)
dWs = U1s*sqrtdts
dZs = (U1s + U2s/np.sqrt(3))*sqrtdts*dts/2
Xs = [np.array(X0)]
for t, dt, dW, dZ in zip(ts[:-1], dts, dWs, dZs):
X = Xs[-1]
Xs.append(X + drift(X)*dt + diffusion(X)*dW + b_dx_b(X)*(dW**2 - dt)/2 +
b_dx_a(X)*dZ + (a_dx_b(X)+b_b_dx_dx_b(X)/2)*(dW*dt - dZ) +
(a_dx_a(X)+b_b_dx_dx_a(X)/2)*dt**2/2 +
b_dx_b_dx_b(X)*(dW**2/3 - dt)*dW/2)
return Xs
[docs]def faulty_milstein(drift, diffusion, b_dx_b, X0, ts, Us):
r"""Integrate a system of ordinary stochastic differential equations subject
to scalar noise:
.. math::
d\vec{X}=\vec{a}(\vec{X},t)\,dt+\vec{b}(\vec{X},t)\,dW_t
Uses a faulty Milstein method (i.e. missing the factor of 1/2 in the term
added to Euler integration):
.. math::
\vec{X}_{i+1}=\vec{X}_i+\vec{a}(\vec{X}_i,t_i)\Delta t_i+
\vec{b}(\vec{X}_i,t_i)\Delta W_i+
\left(\vec{b}(\vec{X}_i,t_i)\cdot\vec{\nabla}_{\vec{X}}\right)
\vec{b}(\vec{X}_i,t_i)\left((\Delta W_i)^2-\Delta t_i\right)
where :math:`\Delta W_i=U_i\sqrt{\Delta t}`, :math:`U` being a normally
distributed random variable with mean 0 and variance 1.
:param drift: Computes the drift coefficient
:math:`\vec{a}(\vec{X},t)`
:type drift: callable(X, t)
:param diffusion: Computes the diffusion coefficient
:math:`\vec{b}(\vec{X},t)`
:type diffusion: callable(X, t)
:param b_dx_b: Computes the correction coefficient
:math:`\left(\vec{b}(\vec{X},t)\cdot
\vec{\nabla}_{\vec{X}}\right)\vec{b}(\vec{X},t)`
:type b_dx_b: callable(X, t)
:param X0: Initial condition on X
:type X0: array
:param ts: A sequence of time points for which to solve for X.
The initial value point should be the first element
of this sequence.
:type ts: array
:param Us: Normalized Weiner increments for each time step
(i.e. samples from a Gaussian distribution with mean
0 and variance 1).
:type Us: array, shape=(len(t) - 1)
:return: Array containing the value of X for each desired
time in t, with the initial value `X0` in the first
row.
:rtype: numpy.array, shape=(len(t), len(X0))
"""
dts = [tf - ti for tf, ti in zip(ts[1:], ts[:-1])]
# Scale the Weiner increments to the time increments.
sqrtdts = np.sqrt(dts)
dWs = np.product(np.array([sqrtdts, Us]), axis=0)
X = [np.array(X0)]
for t, dt, dW in zip(ts[:-1], dts, dWs):
X.append(X[-1] + drift(X[-1], t)*dt + diffusion(X[-1], t)*dW +
b_dx_b(X[-1], t)*(dW**2 - dt))
return X