#   Copyright 2024 The PyMC Developers
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
import abc
import warnings

from abc import ABCMeta
from collections.abc import Callable

import numpy as np
import pytensor
import pytensor.tensor as pt

from pytensor.graph.basic import Node, ancestors
from pytensor.graph.replace import clone_replace
from pytensor.tensor import TensorVariable
from pytensor.tensor.random.op import RandomVariable
from pytensor.tensor.random.utils import normalize_size_param

from pymc.distributions.continuous import Normal, get_tau_sigma
from pymc.distributions.distribution import (
    Distribution,
    SymbolicRandomVariable,
    _support_point,
    support_point,
)
from pymc.distributions.multivariate import MvNormal, MvStudentT
from pymc.distributions.shape_utils import (
    _change_dist_size,
    change_dist_size,
    get_support_shape,
    get_support_shape_1d,
    rv_size_is_none,
)
from pymc.exceptions import NotConstantValueError
from pymc.logprob.abstract import _logprob
from pymc.logprob.basic import logp
from pymc.pytensorf import constant_fold, intX
from pymc.util import check_dist_not_registered

__all__ = [
    "RandomWalk",
    "GaussianRandomWalk",
    "MvGaussianRandomWalk",
    "MvStudentTRandomWalk",
    "AR",
    "GARCH11",
    "EulerMaruyama",
]


class RandomWalkRV(SymbolicRandomVariable):
    """RandomWalk Variable"""

    _print_name = ("RandomWalk", "\\operatorname{RandomWalk}")

    @classmethod
    def rv_op(cls, init_dist, innovation_dist, steps, size=None):
        # We don't allow passing `rng` because we don't fully control the rng of the components!
        steps = pt.as_tensor(steps, dtype=int, ndim=0)

        dist_ndim_supp = init_dist.owner.op.ndim_supp
        init_dist_shape = tuple(init_dist.shape)
        init_dist_batch_shape = init_dist_shape[: len(init_dist_shape) - dist_ndim_supp]
        innovation_dist_shape = tuple(innovation_dist.shape)
        innovation_batch_shape = innovation_dist_shape[
            : len(innovation_dist_shape) - dist_ndim_supp
        ]
        ndim_supp = dist_ndim_supp + 1

        size = normalize_size_param(size)

        # If not explicit, size is determined by the shapes of the input distributions
        if rv_size_is_none(size):
            size = pt.broadcast_shape(
                init_dist_batch_shape, innovation_batch_shape, arrays_are_shapes=True
            )

        # Resize input distributions. We will size them to (T, B, S) in order
        # to safely take random draws. We later swap the steps dimension so
        # that the final distribution will follow (B, T, S)
        # init_dist must have shape (1, B, S)
        init_dist = change_dist_size(init_dist, (1, *size))
        # innovation_dist must have shape (T-1, B, S)
        innovation_dist = change_dist_size(innovation_dist, (steps, *size))

        # We can only infer the logp of dimshuffled variables if the dimshuffle is
        # done directly on top of a RandomVariable. Because of this we dimshuffle the
        # distributions and only then concatenate them, instead of the other way around.
        # shape = (B, 1, S)
        init_dist_dimswapped = pt.moveaxis(init_dist, 0, -ndim_supp)
        # shape = (B, T-1, S)
        innovation_dist_dimswapped = pt.moveaxis(innovation_dist, 0, -ndim_supp)
        # shape = (B, T, S)
        grw = pt.concatenate([init_dist_dimswapped, innovation_dist_dimswapped], axis=-ndim_supp)
        grw = pt.cumsum(grw, axis=-ndim_supp)
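        # Concrete shape example (an illustrative note, assuming scalar innovations so
        # that S is empty): with size=(2,) and steps=5, init_dist is resized to (1, 2)
        # and innovation_dist to (5, 2); after the moveaxis they become (2, 1) and
        # (2, 5), the concatenation has shape (2, 6), and the cumsum over the last axis
        # turns the independent increments into the walk itself.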

        innov_supp_dims = [f"d{i}" for i in range(dist_ndim_supp)]
        innov_supp_str = ",".join(innov_supp_dims)
        out_supp_str = ",".join(["t", *innov_supp_dims])
        signature = f"({innov_supp_str}),({innov_supp_str}),(s),[rng]->({out_supp_str}),[rng]"
        return RandomWalkRV(
            [init_dist, innovation_dist, steps],
            # We pass steps through just so we can keep a reference to it, even though
            # it's no longer needed at this point
            [grw],
            signature=signature,
        )(init_dist, innovation_dist, steps)


class RandomWalk(Distribution):
    r"""RandomWalk Distribution

    TODO: Expand docstrings
    """

    rv_type = RandomWalkRV
    rv_op = RandomWalkRV.rv_op

    def __new__(cls, *args, innovation_dist, steps=None, **kwargs):
        steps = cls.get_steps(
            innovation_dist=innovation_dist,
            steps=steps,
            shape=None,  # Shape will be checked in `cls.dist`
            dims=kwargs.get("dims"),
            observed=kwargs.get("observed"),
        )

        return super().__new__(cls, *args, innovation_dist=innovation_dist, steps=steps, **kwargs)

    @classmethod
    def dist(cls, init_dist, innovation_dist, steps=None, **kwargs) -> pt.TensorVariable:
        if not (
            isinstance(init_dist, pt.TensorVariable)
            and init_dist.owner is not None
            and isinstance(init_dist.owner.op, RandomVariable | SymbolicRandomVariable)
        ):
            raise TypeError("init_dist must be a distribution variable")
        check_dist_not_registered(init_dist)

        if not (
            isinstance(innovation_dist, pt.TensorVariable)
            and innovation_dist.owner is not None
            and isinstance(innovation_dist.owner.op, RandomVariable | SymbolicRandomVariable)
        ):
            raise TypeError("innovation_dist must be a distribution variable")
        check_dist_not_registered(innovation_dist)

        if init_dist.owner.op.ndim_supp != innovation_dist.owner.op.ndim_supp:
            raise TypeError(
                "init_dist and innovation_dist must have the same support dimensionality"
            )

        # We need to check this, because we clone the variables when we ignore their logprob next
        if init_dist in ancestors([innovation_dist]) or innovation_dist in ancestors([init_dist]):
            raise ValueError("init_dist and innovation_dist must be completely independent")

        steps = cls.get_steps(
            innovation_dist=innovation_dist,
            steps=steps,
            shape=kwargs.get("shape"),
            dims=None,
            observed=None,
        )
        if steps is None:
            raise ValueError("Must specify steps or shape parameter")
        steps = pt.as_tensor_variable(intX(steps))

        return super().dist([init_dist, innovation_dist, steps], **kwargs)

    @classmethod
    def get_steps(cls, innovation_dist, steps, shape, dims, observed):
        # We need to know the ndim_supp of the innovation_dist
        if not (
            isinstance(innovation_dist, pt.TensorVariable)
            and innovation_dist.owner is not None
            and isinstance(innovation_dist.owner.op, RandomVariable | SymbolicRandomVariable)
        ):
            raise TypeError("innovation_dist must be a distribution variable")

        dist_ndim_supp = innovation_dist.owner.op.ndim_supp
        dist_shape = tuple(innovation_dist.shape)
        support_shape = None
        if steps is not None:
            support_shape = (steps,) + (dist_shape[len(dist_shape) - dist_ndim_supp :])
        support_shape = get_support_shape(
            support_shape=support_shape,
            shape=shape,
            dims=dims,
            observed=observed,
            support_shape_offset=1,
            ndim_supp=dist_ndim_supp + 1,
        )
        if support_shape is not None:
            steps = support_shape[-dist_ndim_supp - 1]
        return steps
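

# A minimal usage sketch (illustrative only, not part of the library): building a
# RandomWalk from an explicit init_dist and innovation_dist. Variable names below
# are arbitrary.
#
#   import pymc as pm
#
#   with pm.Model():
#       init = pm.Normal.dist(mu=0, sigma=10)
#       innovation = pm.StudentT.dist(nu=4, mu=0, sigma=1)
#       rw = pm.RandomWalk(
#           "rw", init_dist=init, innovation_dist=innovation, steps=99
#       )  # shape (100,): 1 initial value + 99 innovations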


@_change_dist_size.register(RandomWalkRV)
def change_random_walk_size(op, dist, new_size, expand):
    init_dist, innovation_dist, steps = dist.owner.inputs
    if expand:
        old_shape = tuple(dist.shape)
        old_size = old_shape[: len(old_shape) - op.ndim_supp]
        new_size = tuple(new_size) + tuple(old_size)
    return RandomWalk.rv_op(init_dist, innovation_dist, steps, size=new_size)


@_support_point.register(RandomWalkRV)
def random_walk_support_point(op, rv, init_dist, innovation_dist, steps):
    # shape = (1, B, S)
    init_support_point = support_point(init_dist)
    # shape = (T-1, B, S)
    innovation_support_point = support_point(innovation_dist)
    # shape = (T, B, S)
    grw_support_point = pt.concatenate([init_support_point, innovation_support_point], axis=0)
    grw_support_point = pt.cumsum(grw_support_point, axis=0)
    # shape = (B, T, S)
    grw_support_point = pt.moveaxis(grw_support_point, 0, -op.ndim_supp)
    return grw_support_point


@_logprob.register(RandomWalkRV)
def random_walk_logp(op, values, *inputs, **kwargs):
    # Although we can derive the logprob of random walks automatically, it does not
    # collapse the steps dimension, which we consider a core dimension. We do it manually here.
    (value,) = values
    # Recreate RV and obtain inner graph
    rv_node = op.make_node(*inputs)
    rv = clone_replace(
        op.inner_outputs, replace={u: v for u, v in zip(op.inner_inputs, rv_node.inputs)}
    )[op.default_output]
    # Obtain logp of the inner graph and collapse steps dimension
    return logp(rv, value).sum(axis=-1)


class PredefinedRandomWalk(ABCMeta):
    """Base class for predefined RandomWalk distributions"""

    def __new__(cls, name, *args, **kwargs):
        init_dist, innovation_dist, kwargs = cls.get_dists(*args, **kwargs)
        return RandomWalk(name, init_dist=init_dist, innovation_dist=innovation_dist, **kwargs)

    @classmethod
    def dist(cls, *args, **kwargs) -> pt.TensorVariable:
        init_dist, innovation_dist, kwargs = cls.get_dists(*args, **kwargs)
        return RandomWalk.dist(init_dist=init_dist, innovation_dist=innovation_dist, **kwargs)

    @classmethod
    @abc.abstractmethod
    def get_dists(cls, *args, **kwargs):
        pass


class GaussianRandomWalk(PredefinedRandomWalk):
    r"""Random Walk with Normal innovations.

    Parameters
    ----------
    mu : tensor_like of float, default 0
        innovation drift
    sigma : tensor_like of float, default 1
        sigma > 0, innovation standard deviation.
    init_dist : unnamed_distribution
        Unnamed univariate distribution of the initial value. Unnamed refers to distributions
        created with the ``.dist()`` API.

        .. warning:: init_dist will be cloned, rendering it independent of the one passed as input.

    steps : int, optional
        Number of steps in Gaussian Random Walk (steps > 0). Only needed if shape is not
        provided.
    """

    @classmethod
    def get_dists(cls, mu=0.0, sigma=1.0, *, init_dist=None, **kwargs):
        if "init" in kwargs:
            warnings.warn(
                "init parameter is now called init_dist. Using init will raise an error "
                "in a future release.",
                FutureWarning,
            )
            init_dist = kwargs.pop("init")

        if init_dist is None:
            warnings.warn(
                "Initial distribution not specified, defaulting to `Normal.dist(0, 100)`. "
                "You can specify an init_dist manually to suppress this warning.",
                UserWarning,
            )
            init_dist = Normal.dist(0, 100)

        mu = pt.as_tensor_variable(mu)
        sigma = pt.as_tensor_variable(sigma)
        innovation_dist = Normal.dist(mu=mu, sigma=sigma)

        return init_dist, innovation_dist, kwargs
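

# A minimal usage sketch (illustrative only, not part of the library): a
# GaussianRandomWalk observed on a 1d series ``y`` (assumed to exist); steps is
# inferred from the shape of the observations.
#
#   import pymc as pm
#
#   with pm.Model():
#       sigma = pm.HalfNormal("sigma", 1.0)
#       init = pm.Normal.dist(mu=0, sigma=10)
#       pm.GaussianRandomWalk("grw", mu=0.0, sigma=sigma, init_dist=init, observed=y)
#       idata = pm.sample()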


class MvGaussianRandomWalk(PredefinedRandomWalk):
    r"""Random Walk with Multivariate Normal innovations.

    Parameters
    ----------
    mu : tensor_like of float
        innovation drift
    cov : tensor_like of float
        pos def matrix, innovation covariance matrix
    tau : tensor_like of float
        pos def matrix, inverse covariance matrix
    chol : tensor_like of float
        Cholesky decomposition of covariance matrix
    lower : bool, default=True
        Whether the Cholesky factor is given as a lower triangular matrix.
    init_dist : unnamed_distribution
        Unnamed multivariate distribution of the initial value.

        .. warning:: init_dist will be cloned, rendering it independent of the one passed as input.

    steps : int, optional
        Number of steps in Random Walk (steps > 0). Only needed if shape is not provided.

    Notes
    -----
    Only one of cov, tau or chol is required.
    """

    @classmethod
    def get_dists(cls, mu, *, cov=None, tau=None, chol=None, lower=True, init_dist=None, **kwargs):
        if "init" in kwargs:
            warnings.warn(
                "init parameter is now called init_dist. Using init will raise an error "
                "in a future release.",
                FutureWarning,
            )
            init_dist = kwargs.pop("init")

        if init_dist is None:
            warnings.warn(
                "Initial distribution not specified, defaulting to `MvNormal.dist(0, I*100)`. "
                "You can specify an init_dist manually to suppress this warning.",
                UserWarning,
            )
            init_dist = MvNormal.dist(pt.zeros_like(mu.shape[-1]), pt.eye(mu.shape[-1]) * 100)

        innovation_dist = MvNormal.dist(mu=mu, cov=cov, tau=tau, chol=chol, lower=lower)
        return init_dist, innovation_dist, kwargs
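

# A minimal usage sketch (illustrative only, not part of the library): a 3-dimensional
# multivariate Gaussian random walk with a fixed covariance. The resulting variable has
# shape (51, 3): the initial value plus 50 innovations.
#
#   import numpy as np
#   import pymc as pm
#
#   with pm.Model():
#       mvrw = pm.MvGaussianRandomWalk(
#           "mvrw", mu=np.zeros(3), cov=np.eye(3) * 0.5, steps=50
#       )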


class MvStudentTRandomWalk(PredefinedRandomWalk):
    r"""Multivariate Random Walk with StudentT innovations.

    Parameters
    ----------
    nu : int
        degrees of freedom
    mu : tensor_like of float
        innovation drift
    scale : tensor_like of float
        pos def matrix, innovation covariance matrix
    tau : tensor_like of float
        pos def matrix, inverse covariance matrix
    chol : tensor_like of float
        Cholesky decomposition of covariance matrix
    lower : bool, default=True
        Whether the Cholesky factor is given as a lower triangular matrix.
    init_dist : unnamed_distribution
        Unnamed multivariate distribution of the initial value.

        .. warning:: init_dist will be cloned, rendering it independent of the one passed as input.

    steps : int, optional
        Number of steps in Random Walk (steps > 0). Only needed if shape is not provided.

    Notes
    -----
    Only one of scale, tau or chol is required.
    """

    @classmethod
    def get_dists(
        cls, *, nu, mu, scale=None, tau=None, chol=None, lower=True, init_dist=None, **kwargs
    ):
        if "init" in kwargs:
            warnings.warn(
                "init parameter is now called init_dist. Using init will raise an error "
                "in a future release.",
                FutureWarning,
            )
            init_dist = kwargs.pop("init")

        if init_dist is None:
            warnings.warn(
                "Initial distribution not specified, defaulting to `MvNormal.dist(0, I*100)`. "
                "You can specify an init_dist manually to suppress this warning.",
                UserWarning,
            )
            init_dist = MvNormal.dist(pt.zeros_like(mu.shape[-1]), pt.eye(mu.shape[-1]) * 100)

        innovation_dist = MvStudentT.dist(
            nu=nu, mu=mu, scale=scale, tau=tau, chol=chol, lower=lower, cov=kwargs.pop("cov", None)
        )
        return init_dist, innovation_dist, kwargs
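

# A minimal usage sketch (illustrative only, not part of the library): the StudentT
# variant swaps the innovation distribution for MvStudentT and adds the ``nu`` degrees
# of freedom; the distribution parameters are keyword-only.
#
#   import numpy as np
#   import pymc as pm
#
#   with pm.Model():
#       mvt_rw = pm.MvStudentTRandomWalk(
#           "mvt_rw", nu=5, mu=np.zeros(2), scale=np.eye(2), steps=30
#       )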


class AutoRegressiveRV(SymbolicRandomVariable):
    """A placeholder used to specify a log-likelihood for an AR sub-graph."""

    signature = "(o),(),(o),(s),[rng]->[rng],(t)"
    ar_order: int
    constant_term: bool
    _print_name = ("AR", "\\operatorname{AR}")

    def __init__(self, *args, ar_order, constant_term, **kwargs):
        self.ar_order = ar_order
        self.constant_term = constant_term
        super().__init__(*args, **kwargs)

    @classmethod
    def rv_op(cls, rhos, sigma, init_dist, steps, ar_order, constant_term, size=None):
        # We don't allow passing `rng` because we don't fully control the rng of the components!
        noise_rng = pytensor.shared(np.random.default_rng())

        size = normalize_size_param(size)
        # Init dist should have shape (*size, ar_order)
        if rv_size_is_none(size):
            # In this case the size of the init_dist depends on the parameters shape
            # The last dimension of rho and init_dist does not matter
            batch_size = pt.broadcast_shape(
                tuple(sigma.shape),
                tuple(rhos.shape)[:-1],
                tuple(pt.atleast_1d(init_dist).shape)[:-1],
                arrays_are_shapes=True,
            )
        else:
            batch_size = size
        if init_dist.owner.op.ndim_supp == 0:
            init_dist_size = (*batch_size, ar_order)
        else:
            # In this case the support dimension must cover for ar_order
            init_dist_size = batch_size
        init_dist = change_dist_size(init_dist, init_dist_size)

        rhos_bcast_shape = init_dist.shape
        if constant_term:
            # In this case init shape is one unit smaller than rhos in the last dimension
            rhos_bcast_shape = (*rhos_bcast_shape[:-1], rhos_bcast_shape[-1] + 1)
        rhos_bcast = pt.broadcast_to(rhos, rhos_bcast_shape)

        def step(*args):
            *prev_xs, reversed_rhos, sigma, rng = args
            if constant_term:
                mu = reversed_rhos[-1] + pt.sum(prev_xs * reversed_rhos[:-1], axis=0)
            else:
                mu = pt.sum(prev_xs * reversed_rhos, axis=0)
            next_rng, new_x = Normal.dist(mu=mu, sigma=sigma, rng=rng).owner.outputs
            return new_x, {rng: next_rng}

        # We transpose inputs as scan iterates over first dimension
        innov, innov_updates = pytensor.scan(
            fn=step,
            outputs_info=[{"initial": init_dist.T, "taps": range(-ar_order, 0)}],
            non_sequences=[rhos_bcast.T[::-1], sigma.T, noise_rng],
            n_steps=steps,
            strict=True,
        )
        (noise_next_rng,) = tuple(innov_updates.values())
        ar = pt.concatenate([init_dist, innov.T], axis=-1)

        return AutoRegressiveRV(
            inputs=[rhos, sigma, init_dist, steps, noise_rng],
            outputs=[noise_next_rng, ar],
            ar_order=ar_order,
            constant_term=constant_term,
        )(rhos, sigma, init_dist, steps, noise_rng)

    def update(self, node: Node):
        """Return the update mapping for the noise RV."""
        return {node.inputs[-1]: node.outputs[0]}


class AR(Distribution):
    r"""Autoregressive process with p lags.

    .. math::

       x_t = \rho_0 + \rho_1 x_{t-1} + \ldots + \rho_p x_{t-p} + \epsilon_t,
       \epsilon_t \sim N(0, \sigma^2)

    The innovation can be parameterized either in terms of precision or standard
    deviation. The link between the two parametrizations is given by

    .. math::

       \tau = \dfrac{1}{\sigma^2}

    Parameters
    ----------
    rho : tensor_like of float
        Tensor of autoregressive coefficients. The n-th entry in the last dimension is
        the coefficient for the n-th lag.
    sigma : tensor_like of float, default 1
        Standard deviation of innovation (sigma > 0). Only required if
        tau is not specified.
    tau : tensor_like of float, optional
        Precision of innovation (tau > 0).
    constant : bool, default False
        Whether the first element of rho should be used as a constant term in the AR
        process.
    init_dist : unnamed_distribution, optional
        Scalar or vector distribution for initial values. Distributions should have shape
        (*shape[:-1], ar_order). If not, it will be automatically resized. Defaults to
        pm.Normal.dist(0, 100, shape=...).

        .. warning:: init_dist will be cloned, rendering it independent of the one passed as input.

    ar_order : int, optional
        Order of the AR process. Inferred from the length of the last dimension of rho, if
        possible. ar_order = rho.shape[-1] - 1 if constant else rho.shape[-1]
    steps : int, optional
        Number of steps in AR process (steps > 0). Only needed if shape is not provided.

    Notes
    -----
    The init distribution will be cloned, rendering it distinct from the one passed as
    input.

    Examples
    --------
    .. code-block:: python

        # Create an AR of order 3, with a constant term
        with pm.Model() as AR3:
            # The first coefficient will be the constant term
            coefs = pm.Normal("coefs", 0, size=4)
            # We need one init variable for each lag, hence size=3
            init = pm.Normal.dist(5, size=3)
            ar3 = pm.AR("ar3", coefs, sigma=1.0, init_dist=init, constant=True, steps=500)
    """

    rv_type = AutoRegressiveRV
    rv_op = AutoRegressiveRV.rv_op

    def __new__(cls, name, rho, *args, steps=None, constant=False, ar_order=None, **kwargs):
        rhos = pt.atleast_1d(pt.as_tensor_variable(rho))
        ar_order = cls._get_ar_order(rhos=rhos, constant=constant, ar_order=ar_order)
        steps = get_support_shape_1d(
            support_shape=steps,
            shape=None,  # Shape will be checked in `cls.dist`
            dims=kwargs.get("dims", None),
            observed=kwargs.get("observed", None),
            support_shape_offset=ar_order,
        )
        return super().__new__(
            cls, name, rhos, *args, steps=steps, constant=constant, ar_order=ar_order, **kwargs
        )

    @classmethod
    def dist(
        cls,
        rho,
        sigma=None,
        tau=None,
        *,
        init_dist=None,
        steps=None,
        constant=False,
        ar_order=None,
        **kwargs,
    ):
        _, sigma = get_tau_sigma(tau=tau, sigma=sigma)
        sigma = pt.as_tensor_variable(sigma)
        rhos = pt.atleast_1d(pt.as_tensor_variable(rho))

        if "init" in kwargs:
            warnings.warn(
                "init parameter is now called init_dist. Using init will raise an error "
                "in a future release.",
                FutureWarning,
            )
            init_dist = kwargs.pop("init")

        ar_order = cls._get_ar_order(rhos=rhos, constant=constant, ar_order=ar_order)
        steps = get_support_shape_1d(
            support_shape=steps, shape=kwargs.get("shape", None), support_shape_offset=ar_order
        )
        if steps is None:
            raise ValueError("Must specify steps or shape parameter")
        steps = pt.as_tensor_variable(intX(steps), ndim=0)

        if init_dist is not None:
            if not isinstance(init_dist, TensorVariable) or not isinstance(
                init_dist.owner.op, RandomVariable | SymbolicRandomVariable
            ):
                raise ValueError(
                    f"Init dist must be a distribution created via the `.dist()` API, "
                    f"got {type(init_dist)}"
                )
            check_dist_not_registered(init_dist)
            if init_dist.owner.op.ndim_supp > 1:
                raise ValueError(
                    "Init distribution must have a scalar or vector support dimension, "
                    f"got ndim_supp={init_dist.owner.op.ndim_supp}."
                )
        else:
            warnings.warn(
                "Initial distribution not specified, defaulting to "
                "`Normal.dist(0, 100, shape=...)`. You can specify an init_dist "
                "manually to suppress this warning.",
                UserWarning,
            )
            init_dist = Normal.dist(0, 100, shape=(*sigma.shape, ar_order))

        return super().dist([rhos, sigma, init_dist, steps, ar_order, constant], **kwargs)

    @classmethod
    def _get_ar_order(cls, rhos: TensorVariable, ar_order: int | None, constant: bool) -> int:
        """Compute ar_order given inputs.

        If ar_order is not specified we do constant folding on the shape of rhos
        to retrieve it. For example, this will detect that
        Normal(size=(5, 3)).shape[-1] == 3, which is not known by PyTensor before.

        Raises
        ------
        ValueError
            If ar_order cannot be inferred from rhos or if it is less than 1.
        """
        if ar_order is None:
            try:
                (folded_shape,) = constant_fold((rhos.shape[-1],))
            except NotConstantValueError:
                raise ValueError(
                    "Could not infer ar_order from last dimension of rho. Pass it "
                    "explicitly or make sure rho has a static shape"
                )
            ar_order = int(folded_shape) - int(constant)
            if ar_order < 1:
                raise ValueError(
                    "Inferred ar_order is smaller than 1. Increase the last dimension "
                    "of rho or remove constant_term"
                )
        return ar_order


@_change_dist_size.register(AutoRegressiveRV)
def change_ar_size(op, dist, new_size, expand=False):
    if expand:
        old_size = dist.shape[:-1]
        new_size = tuple(new_size) + tuple(old_size)

    return AR.rv_op(
        *dist.owner.inputs[:-1],
        ar_order=op.ar_order,
        constant_term=op.constant_term,
        size=new_size,
    )


@_logprob.register(AutoRegressiveRV)
def ar_logp(op, values, rhos, sigma, init_dist, steps, noise_rng, **kwargs):
    (value,) = values

    ar_order = op.ar_order
    constant_term = op.constant_term

    # Convolve rhos with values
    if constant_term:
        expectation = pt.add(
            rhos[..., 0, None],
            *(
                rhos[..., i + 1, None] * value[..., ar_order - (i + 1) : -(i + 1)]
                for i in range(ar_order)
            ),
        )
    else:
        expectation = pt.add(
            *(
                rhos[..., i, None] * value[..., ar_order - (i + 1) : -(i + 1)]
                for i in range(ar_order)
            )
        )
    # Compute and collapse logp across time dimension
    innov_logp = pt.sum(
        logp(Normal.dist(0, sigma[..., None]), value[..., ar_order:] - expectation), axis=-1
    )
    init_logp = logp(init_dist, value[..., :ar_order])
    if init_dist.owner.op.ndim_supp == 0:
        init_logp = pt.sum(init_logp, axis=-1)
    return init_logp + innov_logp


@_support_point.register(AutoRegressiveRV)
def ar_support_point(op, rv, rhos, sigma, init_dist, steps, noise_rng):
    # Use last entry of init_dist support_point as the moment for the whole AR
    return pt.full_like(rv, support_point(init_dist)[..., -1, None])


class GARCH11RV(SymbolicRandomVariable):
    """A placeholder used to specify a GARCH11 graph."""

    signature = "(),(),(),(),(),(s),[rng]->[rng],(t)"
    _print_name = ("GARCH11", "\\operatorname{GARCH11}")

    @classmethod
    def rv_op(cls, omega, alpha_1, beta_1, initial_vol, init_dist, steps, size=None):
        # We don't allow passing `rng` because we don't fully control the rng of the components!
        steps = pt.as_tensor(steps, ndim=0)
        omega = pt.as_tensor(omega)
        alpha_1 = pt.as_tensor(alpha_1)
        beta_1 = pt.as_tensor(beta_1)
        initial_vol = pt.as_tensor(initial_vol)
        noise_rng = pytensor.shared(np.random.default_rng())

        size = normalize_size_param(size)
        if rv_size_is_none(size):
            # In this case the size of the init_dist depends on the parameters shape
            batch_size = pt.broadcast_shape(omega, alpha_1, beta_1, initial_vol)
        else:
            batch_size = size
        init_dist = change_dist_size(init_dist, batch_size)

        # Create OpFromGraph representing random draws from GARCH11 process
        def step(prev_y, prev_sigma, omega, alpha_1, beta_1, rng):
            new_sigma = pt.sqrt(
                omega + alpha_1 * pt.square(prev_y) + beta_1 * pt.square(prev_sigma)
            )
            next_rng, new_y = Normal.dist(mu=0, sigma=new_sigma, rng=rng).owner.outputs
            return (new_y, new_sigma), {rng: next_rng}

        (y_t, _), innov_updates = pytensor.scan(
            fn=step,
            outputs_info=[
                init_dist,
                pt.broadcast_to(initial_vol.astype("floatX"), init_dist.shape),
            ],
            non_sequences=[omega, alpha_1, beta_1, noise_rng],
            n_steps=steps,
            strict=True,
        )
        (noise_next_rng,) = tuple(innov_updates.values())

        garch11 = pt.concatenate([init_dist[None, ...], y_t], axis=0).dimshuffle(
            (*range(1, y_t.ndim), 0)
        )

        return GARCH11RV(
            inputs=[omega, alpha_1, beta_1, initial_vol, init_dist, steps, noise_rng],
            outputs=[noise_next_rng, garch11],
        )(omega, alpha_1, beta_1, initial_vol, init_dist, steps, noise_rng)

    def update(self, node: Node):
        """Return the update mapping for the noise RV."""
        return {node.inputs[-1]: node.outputs[0]}


class GARCH11(Distribution):
    r"""
    GARCH(1,1) with Normal innovations. The model is specified by

    .. math::

        y_t \sim N(0, \sigma_t^2)

    .. math::

        \sigma_t^2 = \omega + \alpha_1 y_{t-1}^2 + \beta_1 \sigma_{t-1}^2

    where :math:`\sigma_t^2` (the error variance) follows an ARMA(1, 1) model.

    Parameters
    ----------
    omega : tensor_like of float
        omega > 0, mean variance
    alpha_1 : tensor_like of float
        alpha_1 >= 0, autoregressive term coefficient
    beta_1 : tensor_like of float
        beta_1 >= 0, alpha_1 + beta_1 < 1, moving average term coefficient
    initial_vol : tensor_like of float
        initial_vol >= 0, initial volatility, sigma_0
    """

    rv_type = GARCH11RV
    rv_op = GARCH11RV.rv_op

    def __new__(cls, *args, steps=None, **kwargs):
        steps = get_support_shape_1d(
            support_shape=steps,
            shape=None,  # Shape will be checked in `cls.dist`
            dims=kwargs.get("dims", None),
            observed=kwargs.get("observed", None),
            support_shape_offset=1,
        )
        return super().__new__(cls, *args, steps=steps, **kwargs)

    @classmethod
    def dist(cls, omega, alpha_1, beta_1, initial_vol, *, steps=None, **kwargs):
        steps = get_support_shape_1d(
            support_shape=steps, shape=kwargs.get("shape", None), support_shape_offset=1
        )
        if steps is None:
            raise ValueError("Must specify steps or shape parameter")

        init_dist = Normal.dist(0, initial_vol)
        return super().dist([omega, alpha_1, beta_1, initial_vol, init_dist, steps], **kwargs)
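

# A minimal usage sketch (illustrative only, not part of the library): fitting a
# GARCH(1,1) to an observed return series ``returns`` (assumed to exist); steps is
# inferred from the shape of the observations.
#
#   import pymc as pm
#
#   with pm.Model():
#       omega = pm.HalfNormal("omega", 1.0)
#       alpha_1 = pm.Uniform("alpha_1", 0.0, 1.0)
#       beta_1 = pm.Uniform("beta_1", 0.0, 1.0 - alpha_1)
#       vol0 = pm.HalfNormal("vol0", 1.0)
#       pm.GARCH11(
#           "y", omega=omega, alpha_1=alpha_1, beta_1=beta_1,
#           initial_vol=vol0, observed=returns,
#       )
#       idata = pm.sample()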


@_change_dist_size.register(GARCH11RV)
def change_garch11_size(op, dist, new_size, expand=False):
    if expand:
        old_size = dist.shape[:-1]
        new_size = tuple(new_size) + tuple(old_size)

    return GARCH11.rv_op(
        *dist.owner.inputs[:-1],
        size=new_size,
    )


@_logprob.register(GARCH11RV)
def garch11_logp(
    op, values, omega, alpha_1, beta_1, initial_vol, init_dist, steps, noise_rng, **kwargs
):
    (value,) = values
    # Move the time axis to the first dimension
    value_dimswapped = value.dimshuffle((value.ndim - 1, *range(0, value.ndim - 1)))
    initial_vol = initial_vol * pt.ones_like(value_dimswapped[0])

    def volatility_update(x, vol, w, a, b):
        return pt.sqrt(w + a * pt.square(x) + b * pt.square(vol))

    vol, _ = pytensor.scan(
        fn=volatility_update,
        sequences=[value_dimswapped[:-1]],
        outputs_info=[initial_vol],
        non_sequences=[omega, alpha_1, beta_1],
        strict=True,
    )
    sigma_t = pt.concatenate([[initial_vol], vol])
    # Compute and collapse logp across time dimension
    innov_logp = pt.sum(logp(Normal.dist(0, sigma_t), value_dimswapped), axis=0)
    return innov_logp


@_support_point.register(GARCH11RV)
def garch11_support_point(op, rv, omega, alpha_1, beta_1, initial_vol, init_dist, steps, noise_rng):
    # GARCH(1,1) mean is zero
    return pt.zeros_like(rv)


class EulerMaruyamaRV(SymbolicRandomVariable):
    """A placeholder used to specify a log-likelihood for an EulerMaruyama sub-graph."""

    dt: float
    sde_fn: Callable
    _print_name = ("EulerMaruyama", "\\operatorname{EulerMaruyama}")

    def __init__(self, *args, dt: float, sde_fn: Callable, **kwargs):
        self.dt = dt
        self.sde_fn = sde_fn
        super().__init__(*args, **kwargs)

    @classmethod
    def rv_op(cls, init_dist, steps, sde_pars, dt, sde_fn, size=None):
        # We don't allow passing `rng` because we don't fully control the rng of the components!
        noise_rng = pytensor.shared(np.random.default_rng())

        # Init dist should have shape (*size,)
        if size is not None:
            batch_size = size
        else:
            batch_size = pt.broadcast_shape(*sde_pars, init_dist)
        init_dist = change_dist_size(init_dist, batch_size)

        # Create OpFromGraph representing random draws from SDE process
        def step(*prev_args):
            prev_y, *prev_sde_pars, rng = prev_args
            f, g = sde_fn(prev_y, *prev_sde_pars)
            mu = prev_y + dt * f
            sigma = pt.sqrt(dt) * g
            next_rng, next_y = Normal.dist(mu=mu, sigma=sigma, rng=rng).owner.outputs
            return next_y, {rng: next_rng}

        y_t, innov_updates = pytensor.scan(
            fn=step,
            outputs_info=[init_dist],
            non_sequences=[*sde_pars, noise_rng],
            n_steps=steps,
            strict=True,
        )
        (noise_next_rng,) = tuple(innov_updates.values())

        sde_out = pt.concatenate([init_dist[None, ...], y_t], axis=0).dimshuffle(
            (*range(1, y_t.ndim), 0)
        )

        return EulerMaruyamaRV(
            inputs=[init_dist, steps, *sde_pars, noise_rng],
            outputs=[noise_next_rng, sde_out],
            dt=dt,
            sde_fn=sde_fn,
            signature=f"(),(s),{','.join('()' for _ in sde_pars)},[rng]->[rng],(t)",
        )(init_dist, steps, *sde_pars, noise_rng)

    def update(self, node: Node):
        """Return the update mapping for the noise RV."""
        return {node.inputs[-1]: node.outputs[0]}


class EulerMaruyama(Distribution):
    r"""
    Stochastic differential equation discretized with the Euler-Maruyama method.

    Parameters
    ----------
    dt : float
        time step of discretization
    sde_fn : callable
        function returning the drift and diffusion coefficients of SDE
    sde_pars : tuple
        parameters of the SDE, passed as ``*args`` to ``sde_fn``
    init_dist : unnamed_distribution, optional
        Scalar distribution for initial values. Distributions should have shape (*shape[:-1]).
        If not, it will be automatically resized. Defaults to pm.Normal.dist(0, 100, shape=...).

        .. warning:: init_dist will be cloned, rendering it independent of the one passed as input.

    """

    rv_type = EulerMaruyamaRV
    rv_op = EulerMaruyamaRV.rv_op

    def __new__(cls, name, dt, sde_fn, *args, steps=None, **kwargs):
        dt = pt.as_tensor_variable(dt)
        steps = get_support_shape_1d(
            support_shape=steps,
            shape=None,  # Shape will be checked in `cls.dist`
            dims=kwargs.get("dims", None),
            observed=kwargs.get("observed", None),
            support_shape_offset=1,
        )
        return super().__new__(cls, name, dt, sde_fn, *args, steps=steps, **kwargs)

    @classmethod
    def dist(cls, dt, sde_fn, sde_pars, *, init_dist=None, steps=None, **kwargs):
        steps = get_support_shape_1d(
            support_shape=steps, shape=kwargs.get("shape", None), support_shape_offset=1
        )
        if steps is None:
            raise ValueError("Must specify steps or shape parameter")
        steps = pt.as_tensor_variable(intX(steps), ndim=0)

        dt = pt.as_tensor_variable(dt)
        sde_pars = [pt.as_tensor_variable(x) for x in sde_pars]

        if init_dist is not None:
            if not isinstance(init_dist, TensorVariable) or not isinstance(
                init_dist.owner.op, RandomVariable | SymbolicRandomVariable
            ):
                raise ValueError(
                    f"Init dist must be a distribution created via the `.dist()` API, "
                    f"got {type(init_dist)}"
                )
            check_dist_not_registered(init_dist)
            if init_dist.owner.op.ndim_supp > 0:
                raise ValueError(
                    "Init distribution must have a scalar support dimension, "
                    f"got ndim_supp={init_dist.owner.op.ndim_supp}."
                )
        else:
            warnings.warn(
                "Initial distribution not specified, defaulting to "
                "`Normal.dist(0, 100, shape=...)`. You can specify an init_dist "
                "manually to suppress this warning.",
                UserWarning,
            )
            init_dist = Normal.dist(0, 100, shape=sde_pars[0].shape)

        return super().dist([init_dist, steps, sde_pars, dt, sde_fn], **kwargs)
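

# A minimal usage sketch (illustrative only, not part of the library): an
# Ornstein-Uhlenbeck-style SDE, dx = theta * (mu - x) dt + sigma dW, discretized with
# Euler-Maruyama. sde_fn receives the previous state and sde_pars and must return the
# drift and diffusion coefficients as a tuple. Names below are arbitrary.
#
#   import pymc as pm
#
#   def ou_sde(x, theta, mu, sigma):
#       return theta * (mu - x), sigma
#
#   with pm.Model():
#       theta = pm.HalfNormal("theta", 1.0)
#       sigma = pm.HalfNormal("sigma", 1.0)
#       pm.EulerMaruyama(
#           "x", dt=0.1, sde_fn=ou_sde, sde_pars=(theta, 0.0, sigma), steps=100
#       )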


@_change_dist_size.register(EulerMaruyamaRV)
def change_eulermaruyama_size(op, dist, new_size, expand=False):
    if expand:
        old_size = dist.shape[:-1]
        new_size = tuple(new_size) + tuple(old_size)

    init_dist, steps, *sde_pars, _ = dist.owner.inputs
    return EulerMaruyama.rv_op(
        init_dist,
        steps,
        sde_pars,
        dt=op.dt,
        sde_fn=op.sde_fn,
        size=new_size,
    )


@_logprob.register(EulerMaruyamaRV)
def eulermaruyama_logp(op, values, init_dist, steps, *sde_pars_noise_arg, **kwargs):
    (x,) = values
    # noise arg is unused, but is needed to make the logp signature match the rv_op signature
    *sde_pars, _ = sde_pars_noise_arg
    # sde_fn is user provided and likely not broadcastable to an additional time dimension.
    # Since the input x is now [..., t], we broadcast each input to [..., None] below as
    # a best-effort attempt to make it work.
    sde_pars_broadcast = [x[..., None] for x in sde_pars]
    xtm1 = x[..., :-1]
    xt = x[..., 1:]
    f, g = op.sde_fn(xtm1, *sde_pars_broadcast)
    mu = xtm1 + op.dt * f
    sigma = pt.sqrt(op.dt) * g
    # Compute and collapse logp across time dimension
    sde_logp = pt.sum(logp(Normal.dist(mu, sigma), xt), axis=-1)
    init_logp = logp(init_dist, x[..., 0])
    return init_logp + sde_logp