Source code for stumpy.stimp

# STUMPY
# Copyright 2019 TD Ameritrade. Released under the terms of the 3-Clause BSD license.
# STUMPY is a trademark of TD Ameritrade IP Company, Inc. All rights reserved.

import numpy as np
from . import core, stump, scrump, stumped


def _bfs_indices(n):
    """
    Generate the level order indices from the implicit construction of a binary
    search tree followed by a breadth first (level order) search.

    Example:

    If `n = 10` then the corresponding (zero-based index) balanced binary tree is:

                5
               * *
              *   *
             *     *
            *       *
           *         *
          2           8
         * *         * *
        *   *       *   *
       *     *     *     *
      1       4   7       9
     * *     *
    0   3   6

    And if we traverse the nodes at each level from left to right then the breadth
    first search indices would be `[5, 2, 8, 1, 4, 7, 9, 0, 3, 6]`. In this function,
    we avoid/skip the explicit construction of the binary tree and directly output
    the desired indices efficiently.

    Parameters
    ----------
    n : int
        The number indices to generate the ordered indices for

    Returns
    -------
    level_idx : ndarray
        The breadth first search (level order) indices
    """
    if n == 1:  # pragma: no cover
        return np.array([0], dtype=np.int64)

    nlevel = np.floor(np.log2(n) + 1).astype(np.int64)
    nindices = np.power(2, np.arange(nlevel))
    cumsum_nindices = np.cumsum(nindices)
    nindices[-1] = n - cumsum_nindices[np.searchsorted(cumsum_nindices, n) - 1]

    indices = np.empty((2, nindices.max()), dtype=np.int64)
    indices[0, 0] = 0
    indices[1, 0] = n
    tmp_indices = np.empty((2, 2 * nindices.max()), dtype=np.int64)

    out = np.empty(n, dtype=np.int64)
    out_idx = 0

    for nidx in nindices:
        level_indices = (indices[0, :nidx] + indices[1, :nidx]) // 2

        if out_idx + len(level_indices) < n:
            tmp_indices[0, 0 : 2 * nidx : 2] = indices[0, :nidx]
            tmp_indices[0, 1 : 2 * nidx : 2] = level_indices + 1
            tmp_indices[1, 0 : 2 * nidx : 2] = level_indices
            tmp_indices[1, 1 : 2 * nidx : 2] = indices[1, :nidx]

            mask = tmp_indices[0, : 2 * nidx] < tmp_indices[1, : 2 * nidx]
            mask_sum = np.count_nonzero(mask)
            indices[0, :mask_sum] = tmp_indices[0, : 2 * nidx][mask]
            indices[1, :mask_sum] = tmp_indices[1, : 2 * nidx][mask]

        # for level_idx in level_indices:
        #     yield level_idx

        out[out_idx : out_idx + len(level_indices)] = level_indices
        out_idx += len(level_indices)

    return out


def _normalize_pan(pan, ms, bfs_indices, n_processed):
    """
    Normalize the pan matrix profile nearest neighbor distances (inplace) relative
    to the corresponding subsequence length from which they were computed

    Parameters
    ----------
    pan : ndarray
        The pan matrix profile

    ms : ndarray
        The breadth-first-search sorted subsequence window sizes

    bfs_indices : ndarray
        The breadth-first-search indices

    n_processed : ndarray
        The number of subsequence window sizes and breadth-first-search indices to
        normalize

    Returns
    -------
    None
    """
    idx = bfs_indices[:n_processed]
    norm = 1.0 / np.sqrt(2 * ms[:n_processed])
    pan[idx] = pan[idx] * norm[:, np.newaxis]


def _contrast_pan(pan, threshold, bfs_indices, n_processed):
    """
    Center the pan matrix profile (inplace) around the desired distance threshold
    in order to increase the contrast

    Parameters
    ----------
    pan : ndarray
        The pan matrix profile

    threshold : float
        The distance threshold value in which to center the pan matrix profile around

    bfs_indices : ndarray
        The breadth-first-search indices

    n_processed : ndarray
        The number of breadth-first-search indices to apply contrast to

    Returns
    -------
    None
    """
    idx = bfs_indices[:n_processed]
    l = n_processed * pan.shape[1]
    tmp = pan[idx].argsort(kind="mergesort", axis=None)
    ranks = np.empty(l, dtype=np.int64)
    ranks[tmp] = np.arange(l)

    percentile = np.full(ranks.shape, np.nan)
    percentile[:l] = np.linspace(0, 1, l)
    percentile = percentile[ranks].reshape(pan[idx].shape)
    pan[idx] = 1.0 / (1.0 + np.exp(-10 * (percentile - threshold)))


def _binarize_pan(pan, threshold, bfs_indices, n_processed):
    """
    Binarize the pan matrix profile (inplace) such all values below the `threshold`
    are set to `0.0` and all values above the `threshold` are set to `1.0`.

    Parameters
    ----------
    pan : ndarray
        The pan matrix profile

    threshold : float
        The distance threshold value in which to center the pan matrix profile around

    bfs_indices : ndarray
        The breadth-first-search indices

    n_processed : ndarray
        The number of breadth-first-search indices to binarize

    Returns
    -------
    None
    """
    idx = bfs_indices[:n_processed]
    pan[idx] = np.where(pan[idx] <= threshold, 0.0, 1.0)


class _stimp:
    """
    Compute the Pan Matrix Profile

    This is based on the SKIMP algorithm.

    Parameters
    ----------
    T : ndarray
        The time series or sequence for which to compute the pan matrix profile

    m_start : int, default 3
        The starting (or minimum) subsequence window size for which a matrix profile
        may be computed

    m_stop : int, default None
        The stopping (or maximum) subsequence window size for which a matrix profile
        may be computed. When `m_stop = Non`, this is set to the maximum allowable
        subsequence window size

    m_step : int, default 1
        The step between subsequence window sizes

    percentage : float, default 0.01
        The percentage of the full matrix profile to compute for each subsequence
        window size. When `percentage < 1.0`, then the `scrump` algorithm is used.
        Otherwise, the `stump` algorithm is used when the exact matrix profile is
        requested.

    pre_scrump : bool, default True
        A flag for whether or not to perform the PreSCRIMP calculation prior to
        computing SCRIMP. If set to `True`, this is equivalent to computing
        SCRIMP++. This parameter is ignored when `percentage = 1.0`.

    dask_client : client, default None
        A Dask Distributed client that is connected to a Dask scheduler and
        Dask workers. Setting up a Dask distributed cluster is beyond the
        scope of this library. Please refer to the Dask Distributed
        documentation.

    device_id : int or list, default None
        The (GPU) device number to use. The default value is `0`. A list of
        valid device ids (int) may also be provided for parallel GPU-STUMP
        computation. A list of all valid device ids can be obtained by
        executing `[device.id for device in numba.cuda.list_devices()]`.

    mp_func : object, default stump
        The matrix profile function to use when `percentage = 1.0`

    Attributes
    ----------
    PAN_ : ndarray
        The transformed (i.e., normalized, contrasted, binarized, and repeated)
        pan matrix profile

    M_ : ndarray
        The full list of (breadth first search (level) ordered) subsequence window
        sizes

    Methods
    -------
    update():
        Compute the next matrix profile using the next available (breadth-first-search
        (level) ordered) subsequence window size and update the pan matrix profile

    Notes
    -----
    `DOI: 10.1109/ICBK.2019.00031 \
    <https://www.cs.ucr.edu/~eamonn/PAN_SKIMP%20%28Matrix%20Profile%20XX%29.pdf>`__

    See Table 2
    """

    def __init__(
        self,
        T,
        min_m=3,
        max_m=None,
        step=1,
        percentage=0.01,
        pre_scrump=True,
        dask_client=None,
        device_id=None,
        mp_func=stump,
    ):
        """
        Initialize the `stimp` object and compute the Pan Matrix Profile

        Parameters
        ----------
        T : ndarray
            The time series or sequence for which to compute the pan matrix profile

        min_m : int, default 3
            The minimum subsequence window size to consider computing a matrix profile
            for

        max_m : int, default None
            The maximum subsequence window size to consider computing a matrix profile
            for. When `max_m = None`, this is set to the maximum allowable subsequence
            window size

        step : int, default 1
            The step between subsequence window sizes

        percentage : float, default 0.01
            The percentage of the full matrix profile to compute for each subsequence
            window size. When `percentage < 1.0`, then the `scrump` algorithm is used.
            Otherwise, the `stump` algorithm is used when the exact matrix profile is
            requested.

        pre_scrump : bool, default True
            A flag for whether or not to perform the PreSCRIMP calculation prior to
            computing SCRIMP. If set to `True`, this is equivalent to computing
            SCRIMP++. This parameter is ignored when `percentage = 1.0`.

        dask_client : client, default None
            A Dask Distributed client that is connected to a Dask scheduler and
            Dask workers. Setting up a Dask distributed cluster is beyond the
            scope of this library. Please refer to the Dask Distributed
            documentation.

        device_id : int or list, default None
            The (GPU) device number to use. The default value is `0`. A list of
            valid device ids (int) may also be provided for parallel GPU-STUMP
            computation. A list of all valid device ids can be obtained by
            executing `[device.id for device in numba.cuda.list_devices()]`.

        mp_func : object, default stump
            The matrix profile function to use when `percentage = 1.0`
        """
        self._T = T
        if max_m is None:
            max_m = max(min_m + 1, core.get_max_window_size(self._T.shape[0]))
            M = np.arange(min_m, max_m + 1, step)
        else:
            min_m, max_m = sorted([min_m, max_m])
            M = np.arange(
                max(3, min_m),
                min(core.get_max_window_size(self._T.shape[0]), max_m) + 1,
                step,
            )
        self._bfs_indices = _bfs_indices(M.shape[0])
        self._M = M[self._bfs_indices]
        self._n_processed = 0
        percentage = np.clip(percentage, 0.0, 1.0)
        self._percentage = percentage
        self._pre_scrump = pre_scrump
        # self._normalize = normalize
        partial_mp_func = core._get_partial_mp_func(
            mp_func, dask_client=dask_client, device_id=device_id
        )
        self._mp_func = partial_mp_func

        self._PAN = np.full((self._M.shape[0], self._T.shape[0]), fill_value=np.inf)

    def update(self):
        """
        Update the pan matrix profile by computing a single matrix profile using the
        next available subsequence window size

        Notes
        -----
        `DOI: 10.1109/ICBK.2019.00031 \
        <https://www.cs.ucr.edu/~eamonn/PAN_SKIMP%20%28Matrix%20Profile%20XX%29.pdf>`__

        See Table 2
        """
        if self._n_processed < self._M.shape[0]:
            m = self._M[self._n_processed]
            if self._percentage < 1.0:
                approx = scrump(
                    self._T,
                    m,
                    ignore_trivial=True,
                    percentage=self._percentage,
                    pre_scrump=self._pre_scrump,
                    # normalize=self._normalize,
                )
                approx.update()
                self._PAN[
                    self._bfs_indices[self._n_processed], : approx.P_.shape[0]
                ] = approx.P_
            else:
                out = self._mp_func(
                    self._T,
                    m,
                    ignore_trivial=True,
                    # normalize=self._normalize
                )
                self._PAN[
                    self._bfs_indices[self._n_processed], : out[:, 0].shape[0]
                ] = out[:, 0]
            self._n_processed += 1

    def pan(self, threshold=0.2, normalize=True, contrast=True, binary=True, clip=True):
        """
        Generate a transformed (i.e., normalized, contrasted, binarized, and repeated)
        pan matrix profile

        Parameters
        ----------
        threshold : float, default 0.2
            The distance `threshold` in which to center the pan matrix profile around
            for best contrast and this value is also used for binarizing the pan matrix
            profile

        normalize : bool, default True
            A flag for whether or not each individual matrix profile within the pan
            matrix profile is normalized by its corresponding subsequence window size.
            If set to `True`, normalization is performed.

        contrast : bool, default True
            A flag for whether or not the pan matrix profile is centered around the
            desired `threshold` in order to provide higher contrast. If set to `True`,
            centering is performed.

        binary : bool, default True
            A flag for whether or not the pan matrix profile is binarized. If set to
            `True`, all values less than or equal to `threshold` are set to `0.0` while
            all other values are set to `1.0`.

        clip : bool, default True
            A flag for whether or not the pan matrix profile is clipped. If set to
            `True`, all values are ensured to be clipped between `0.0` and `1.0`.

        Returns
        -------
        None
        """
        PAN = self._PAN.copy()
        # Retrieve the row indices where the matrix profile was actually computed
        idx = self._bfs_indices[: self._n_processed]
        sorted_idx = np.sort(idx)
        PAN[PAN == np.inf] = np.nan

        if normalize:
            _normalize_pan(PAN, self._M, self._bfs_indices, self._n_processed)
        if contrast:
            _contrast_pan(PAN, threshold, self._bfs_indices, self._n_processed)
        if binary:
            _binarize_pan(PAN, threshold, self._bfs_indices, self._n_processed)
        if clip:
            PAN[idx] = np.clip(PAN[idx], 0.0, 1.0)

        # Below, for each matrix profile that was computed, we take that matrix profile
        # and copy/repeat it downwards to replace other rows in the `PAN` where the
        # matrix profile has yet to be computed. Instead of only having lines/values in
        # the rows where matrix profiles were computed, this gives us the "blocky" look
        nrepeat = np.diff(np.append(-1, sorted_idx))
        PAN[: np.sum(nrepeat)] = np.repeat(PAN[sorted_idx], nrepeat, axis=0)
        PAN[np.isnan(PAN)] = np.nanmax(PAN)

        return PAN

    @property
    def PAN_(self):
        """
        Get the transformed (i.e., normalized, contrasted, binarized, and repeated) pan
        matrix profile
        """
        return self.pan().astype(np.float64)

    @property
    def M_(self):
        """
        Get all of the (breadth first searched (level) ordered) subsequence window sizes
        """
        return self._M.astype(np.int64)

    # @property
    # def bfs_indices_(self):
    #     """
    #     Get the breadth first search (level order) indices
    #     """
    #     return self._bfs_indices.astype(np.int64)

    # @property
    # def n_processed_(self):  # pragma: no cover
    #     """
    #     Get the total number of windows that have been processed
    #     """
    #     return self._n_processed


[docs]class stimp(_stimp): """ Compute the Pan Matrix Profile This is based on the SKIMP algorithm. Parameters ---------- T : ndarray The time series or sequence for which to compute the pan matrix profile m_start : int, default 3 The starting (or minimum) subsequence window size for which a matrix profile may be computed m_stop : int, default None The stopping (or maximum) subsequence window size for which a matrix profile may be computed. When `m_stop = Non`, this is set to the maximum allowable subsequence window size m_step : int, default 1 The step between subsequence window sizes percentage : float, default 0.01 The percentage of the full matrix profile to compute for each subsequence window size. When `percentage < 1.0`, then the `scrump` algorithm is used. Otherwise, the `stump` algorithm is used when the exact matrix profile is requested. pre_scrump : bool, default True A flag for whether or not to perform the PreSCRIMP calculation prior to computing SCRIMP. If set to `True`, this is equivalent to computing SCRIMP++. This parameter is ignored when `percentage = 1.0`. Attributes ---------- PAN_ : ndarray The transformed (i.e., normalized, contrasted, binarized, and repeated) pan matrix profile M_ : ndarray The full list of (breadth first search (level) ordered) subsequence window sizes Methods ------- update(): Compute the next matrix profile using the next available (breadth-first-search (level) ordered) subsequence window size and update the pan matrix profile Notes ----- `DOI: 10.1109/ICBK.2019.00031 \ <https://www.cs.ucr.edu/~eamonn/PAN_SKIMP%20%28Matrix%20Profile%20XX%29.pdf>`__ See Table 2 """ def __init__( self, T, min_m=3, max_m=None, step=1, percentage=0.01, pre_scrump=True, # normalize=True, ): """ Initialize the `stimp` object and compute the Pan Matrix Profile Parameters ---------- T : ndarray The time series or sequence for which to compute the pan matrix profile min_m : int, default 3 The minimum subsequence window size to consider computing a matrix profile for max_m : int, default None The maximum subsequence window size to consider computing a matrix profile for. When `max_m = None`, this is set to the maximum allowable subsequence window size step : int, default 1 The step between subsequence window sizes percentage : float, default 0.01 The percentage of the full matrix profile to compute for each subsequence window size. When `percentage < 1.0`, then the `scrump` algorithm is used. Otherwise, the `stump` algorithm is used when the exact matrix profile is requested. pre_scrump : bool, default True A flag for whether or not to perform the PreSCRIMP calculation prior to computing SCRIMP. If set to `True`, this is equivalent to computing SCRIMP++. This parameter is ignored when `percentage = 1.0`. """ super().__init__( T, min_m=min_m, max_m=max_m, step=step, percentage=percentage, pre_scrump=pre_scrump, mp_func=stump, )
[docs]class stimped(_stimp): """ Compute the Pan Matrix Profile with a distributed dask cluster This is based on the SKIMP algorithm. Parameters ---------- dask_client : client A Dask Distributed client that is connected to a Dask scheduler and Dask workers. Setting up a Dask distributed cluster is beyond the scope of this library. Please refer to the Dask Distributed documentation. T : ndarray The time series or sequence for which to compute the pan matrix profile m_start : int, default 3 The starting (or minimum) subsequence window size for which a matrix profile may be computed m_stop : int, default None The stopping (or maximum) subsequence window size for which a matrix profile may be computed. When `m_stop = Non`, this is set to the maximum allowable subsequence window size m_step : int, default 1 The step between subsequence window sizes Attributes ---------- PAN_ : ndarray The transformed (i.e., normalized, contrasted, binarized, and repeated) pan matrix profile M_ : ndarray The full list of (breadth first search (level) ordered) subsequence window sizes Methods ------- update(): Compute the next matrix profile using the next available (breadth-first-search (level) ordered) subsequence window size and update the pan matrix profile Notes ----- `DOI: 10.1109/ICBK.2019.00031 \ <https://www.cs.ucr.edu/~eamonn/PAN_SKIMP%20%28Matrix%20Profile%20XX%29.pdf>`__ See Table 2 """ def __init__( self, dask_client, T, min_m=3, max_m=None, step=1, # normalize=True, ): """ Initialize the `stimp` object and compute the Pan Matrix Profile Parameters ---------- dask_client : client A Dask Distributed client that is connected to a Dask scheduler and Dask workers. Setting up a Dask distributed cluster is beyond the scope of this library. Please refer to the Dask Distributed documentation. T : ndarray The time series or sequence for which to compute the pan matrix profile min_m : int, default 3 The minimum subsequence window size to consider computing a matrix profile for max_m : int, default None The maximum subsequence window size to consider computing a matrix profile for. When `max_m = None`, this is set to the maximum allowable subsequence window size step : int, default 1 The step between subsequence window sizes """ super().__init__( T, min_m=min_m, max_m=max_m, step=step, percentage=1.0, pre_scrump=False, dask_client=dask_client, mp_func=stumped, )