Source code for stumpy.mstumped

# STUMPY
# Copyright 2019 TD Ameritrade. Released under the terms of the 3-Clause BSD license.
# STUMPY is a trademark of TD Ameritrade IP Company, Inc. All rights reserved.

import numpy as np

from . import config, core
from .maamped import maamped
from .mstump import _get_first_mstump_profile, _get_multi_QT, _mstump


def _dask_mstumped(
    dask_client,
    T_A,
    T_B,
    m,
    excl_zone,
    M_T,
    Σ_T,
    μ_Q,
    σ_Q,
    T_subseq_isconstant,
    Q_subseq_isconstant,
    include,
    discords,
):
    """
    Dask backend for the multi-dimensional z-normalized matrix profile

    The `k = n - m + 1` subsequence start positions are split into contiguous
    chunks, one chunk per Dask worker. For each chunk, the profile at the first
    position of the chunk is computed locally with `_get_first_mstump_profile`,
    and the remaining positions of the chunk are computed on a worker by the
    Numba JIT-compiled `_mstump` function. Note that only self-joins are
    supported.

    Parameters
    ----------
    dask_client : client
        A Dask Distributed client. Setting up a distributed cluster is beyond
        the scope of this library. Please refer to the Dask Distributed
        documentation.

    T_A : numpy.ndarray
        The time series or sequence for which to compute the multi-dimensional
        matrix profile. Each row in `T_A` represents data from a different
        dimension while each column in `T_A` represents data from the same
        dimension.

    T_B : numpy.ndarray
        The time series or sequence that will be used to annotate `T_A`. Its
        shape, `(d, n)`, determines the shape of the outputs, and it is
        forwarded to `_get_first_mstump_profile`. Only `T_A` is sent to the
        workers.

    m : int
        Window size

    excl_zone : int
        The half width for the exclusion zone relative to the current
        sliding window

    M_T : numpy.ndarray
        Sliding mean of time series, `T`

    Σ_T : numpy.ndarray
        Sliding standard deviation of time series, `T`

    μ_Q : numpy.ndarray
        Mean of the query sequence, `Q`, relative to the current sliding window

    σ_Q : numpy.ndarray
        Standard deviation of the query sequence, `Q`, relative to the current
        sliding window

    T_subseq_isconstant : numpy.ndarray
        A boolean array representing the rolling isconstant for `T`

    Q_subseq_isconstant : numpy.ndarray
        A boolean array representing the rolling isconstant for `Q`

    include : numpy.ndarray
        A list of (zero-based) indices corresponding to the dimensions in `T` that
        must be included in the constrained multidimensional motif search.
        For more information, see Section IV D in:

        `DOI: 10.1109/ICDM.2017.66 \
        <https://www.cs.ucr.edu/~eamonn/Motif_Discovery_ICDM.pdf>`__

    discords : bool
        When set to `True`, this reverses the distance profile to favor discords rather
        than motifs. Note that indices in `include` are still maintained and respected.

    Returns
    -------
    P : numpy.ndarray
        The multi-dimensional matrix profile. Each row of the array corresponds
        to each matrix profile for a given dimension (i.e., the first row is
        the 1-D matrix profile and the second row is the 2-D matrix profile).

    I : numpy.ndarray
        The multi-dimensional matrix profile index where each row of the array
        corresponds to each matrix profile index for a given dimension.
    """
    n_dims, n_points = T_B.shape
    k = n_points - m + 1
    P = np.empty((n_dims, k), dtype=np.float64)
    I = np.empty((n_dims, k), dtype=np.int64)

    hosts = list(dask_client.ncores().keys())
    step = 1 + k // len(hosts)

    # Contiguous [start, stop) chunks of subsequence indices. Since
    # `step > k / len(hosts)`, there are never more chunks than hosts, so
    # pairing chunks with hosts via `zip` below assigns every chunk a host.
    chunks = [(start, min(k, start + step)) for start in range(0, k, step)]

    # The first column of every chunk is computed locally; the workers fill in
    # columns `start + 1` through `stop - 1`.
    for start, _ in chunks:
        first_P, first_I = _get_first_mstump_profile(
            start,
            T_A,
            T_B,
            m,
            excl_zone,
            M_T,
            Σ_T,
            μ_Q,
            σ_Q,
            T_subseq_isconstant,
            Q_subseq_isconstant,
            include,
            discords,
        )
        P[:, start] = first_P
        I[:, start] = first_I

    def _broadcast(data):
        # Send one copy of `data` to every worker in the cluster
        return dask_client.scatter(data, broadcast=True, hash=False)

    # Scatter data to Dask cluster
    T_A_future = _broadcast(T_A)
    M_T_future = _broadcast(M_T)
    Σ_T_future = _broadcast(Σ_T)
    μ_Q_future = _broadcast(μ_Q)
    σ_Q_future = _broadcast(σ_Q)
    T_subseq_isconstant_future = _broadcast(T_subseq_isconstant)
    Q_subseq_isconstant_future = _broadcast(Q_subseq_isconstant)

    # The chunk-specific sliding dot products are only sent to the single host
    # that is paired with that chunk.
    QT_futures = []
    QT_first_futures = []
    for host, (start, _) in zip(hosts, chunks):
        QT, QT_first = _get_multi_QT(start, T_A, m)
        QT_futures.append(dask_client.scatter(QT, workers=[host], hash=False))
        QT_first_futures.append(
            dask_client.scatter(QT_first, workers=[host], hash=False)
        )

    futures = [
        dask_client.submit(
            _mstump,
            T_A_future,
            m,
            stop,
            excl_zone,
            M_T_future,
            Σ_T_future,
            QT_future,
            QT_first_future,
            μ_Q_future,
            σ_Q_future,
            T_subseq_isconstant_future,
            Q_subseq_isconstant_future,
            k,
            start + 1,
            include,
            discords,
        )
        for (start, stop), QT_future, QT_first_future in zip(
            chunks, QT_futures, QT_first_futures
        )
    ]

    # Stitch the per-chunk results back in after each chunk's first column
    results = dask_client.gather(futures)
    for (start, stop), (P_chunk, I_chunk) in zip(chunks, results):
        P[:, start + 1 : stop] = P_chunk
        I[:, start + 1 : stop] = I_chunk

    return P, I


@core.non_normalized(
    maamped,
    exclude=["normalize", "T_subseq_isconstant"],
)
def mstumped(
    client,
    T,
    m,
    include=None,
    discords=False,
    p=2.0,
    normalize=True,
    T_subseq_isconstant=None,
):
    """
    Compute the multi-dimensional z-normalized matrix profile with a distributed
    dask/ray cluster

    This is a highly distributed implementation around the Numba JIT-compiled
    parallelized `_mstump` function which computes the multi-dimensional matrix
    profile according to STOMP. Note that only self-joins are supported.

    Parameters
    ----------
    client : client
        A Dask Distributed client that is connected to a Dask scheduler and
        Dask workers. Setting up a Dask distributed cluster is beyond the
        scope of this library. Please refer to the Dask Distributed
        documentation. The backend-specific implementation that is actually
        executed is looked up from `client` via `core._client_to_func`.

    T : numpy.ndarray
        The time series or sequence for which to compute the multi-dimensional
        matrix profile. Each row in `T` represents data from a different
        dimension while each column in `T` represents data from the same
        dimension.

    m : int
        Window size

    include : list, numpy.ndarray, default None
        A list of (zero-based) indices corresponding to the dimensions in `T` that
        must be included in the constrained multidimensional motif search.
        For more information, see Section IV D in:

        `DOI: 10.1109/ICDM.2017.66 \
        <https://www.cs.ucr.edu/~eamonn/Motif_Discovery_ICDM.pdf>`__

    discords : bool, default False
        When set to `True`, this reverses the distance matrix which results in a
        multi-dimensional matrix profile that favors larger matrix profile values
        (i.e., discords) rather than smaller values (i.e., motifs). Note that indices
        in `include` are still maintained and respected.

    p : float, default 2.0
        The p-norm to apply for computing the Minkowski distance. Minkowski distance
        is typically used with `p` being 1 or 2, which correspond to the Manhattan
        distance and the Euclidean distance, respectively. This parameter is not
        referenced in the body of this (z-normalized) function.

    normalize : bool, default True
        When set to `True`, this z-normalizes subsequences prior to computing
        distances. Otherwise, this function gets re-routed to its complementary
        non-normalized equivalent set in the `@core.non_normalized` function
        decorator.

    T_subseq_isconstant : numpy.ndarray, function, or list, default None
        A parameter that is used to show whether a subsequence of a time series in `T`
        is constant (True) or not. T_subseq_isconstant can be a 2D boolean
        numpy.ndarray or a function that can be applied to each time series in `T`.
        Alternatively, for maximum flexibility, a list (with length equal to the total
        number of time series) may also be used. In this case, T_subseq_isconstant[i]
        corresponds to the i-th time series T[i] and each element in the list can
        either be a 1D boolean np.ndarray, a function, or None.

    Returns
    -------
    P : numpy.ndarray
        The multi-dimensional matrix profile. Each row of the array corresponds
        to each matrix profile for a given dimension (i.e., the first row is
        the 1-D matrix profile and the second row is the 2-D matrix profile).

    I : numpy.ndarray
        The multi-dimensional matrix profile index where each row of the array
        corresponds to each matrix profile index for a given dimension.

    Raises
    ------
    ValueError
        If the preprocessed `T` has fewer than two dimensions.

    See Also
    --------
    stumpy.mstump : Compute the multi-dimensional z-normalized matrix profile
    stumpy.subspace : Compute the k-dimensional matrix profile subspace for a given
        subsequence index and its nearest neighbor index
    stumpy.mdl : Compute the number of bits needed to compress one array with another
        using the minimum description length (MDL)

    Notes
    -----
    `DOI: 10.1109/ICDM.2017.66 \
    <https://www.cs.ucr.edu/~eamonn/Motif_Discovery_ICDM.pdf>`__

    See mSTAMP Algorithm

    Examples
    --------
    >>> import stumpy
    >>> import numpy as np
    >>> from dask.distributed import Client
    >>> if __name__ == "__main__":
    ...     with Client() as dask_client:
    ...         stumpy.mstumped(
    ...             dask_client,
    ...             np.array([[584., -11., 23., 79., 1001., 0., -19.],
    ...                       [  1.,   2.,  4.,  8.,   16., 0.,  32.]]),
    ...             m=3)
    (array([[0.        , 1.43947142, 0.        , 2.69407392, 0.11633857],
            [0.777905  , 2.36179922, 1.50004632, 2.92246722, 0.777905  ]]),
     array([[2, 4, 0, 1, 0],
            [4, 4, 0, 1, 0]]))
    """
    # Only self-joins are supported, so both sides of the join start from `T`
    T_A = core._preprocess(T)
    T_B = core._preprocess(T)

    T_A_subseq_isconstant = core.process_isconstant(T_A, m, T_subseq_isconstant)
    T_B_subseq_isconstant = T_A_subseq_isconstant

    T_A, M_T, Σ_T, T_subseq_isconstant = core.preprocess(
        T_A, m, T_subseq_isconstant=T_A_subseq_isconstant
    )
    T_B, μ_Q, σ_Q, Q_subseq_isconstant = core.preprocess(
        T_B, m, T_subseq_isconstant=T_B_subseq_isconstant
    )

    if T_A.ndim <= 1:  # pragma: no cover
        # The guard rejects 0-D and 1-D input, i.e., at least 2-D is required
        err = f"T is {T_A.ndim}-dimensional and must be at least 2-dimensional"
        raise ValueError(err)

    core.check_window_size(m, max_size=min(T_A.shape[1], T_B.shape[1]))

    if include is not None:
        include = core._preprocess_include(include)

    excl_zone = int(
        np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM)
    )  # See Definition 3 and Figure 3

    # Resolve the client-specific implementation (e.g., `_dask_mstumped`)
    _mstumped = core._client_to_func(client)

    P, I = _mstumped(
        client,
        T_A,
        T_B,
        m,
        excl_zone,
        M_T,
        Σ_T,
        μ_Q,
        σ_Q,
        T_subseq_isconstant,
        Q_subseq_isconstant,
        include,
        discords,
    )

    return P, I