Source code for stumpy.motifs

# STUMPY
# Copyright 2019 TD Ameritrade. Released under the terms of the 3-Clause BSD license.
# STUMPY is a trademark of TD Ameritrade IP Company, Inc. All rights reserved.

import logging

import numpy as np
from .aamp_motifs import aamp_motifs, aamp_match

from . import core
from .config import STUMPY_EXCL_ZONE_DENOM

logger = logging.getLogger(__name__)


def _motifs(
    T,
    P,
    M_T,
    Σ_T,
    excl_zone,
    min_neighbors,
    max_distance,
    max_matches,
    max_motifs,
):
    """
    Find the top motifs for time series `T`.

    A subsequence, `Q`, becomes a candidate motif if there are at least `min_neighbor`
    number of other subsequence matches in `T` (outside the exclusion zone) with a
    distance less or equal to `max_distance`.

    Parameters
    ----------
    T : ndarray
        The time series or sequence

    P : ndarray
        Matrix Profile of time series, `T`

    M_T : ndarray
        Sliding mean of time series, `T`

    Σ_T : ndarray
        Sliding standard deviation of time series, `T`

    excl_zone : int
        Size of the exclusion zone

    min_neighbors : int
        The minimum number of similar matches a subsequence needs to have in order
        to be considered a motif.

    max_distance : float or function
        For a candidate motif, `Q`, and a non-trivial subsequence, `S`, `max_distance`
        is the maximum distance allowed between `Q` and `S` so that `S` is considered
        a match of `Q`. If `max_distance` is a function, then it must be a function
        that accepts a single parameter, `D`, in its function signature, which is the
        distance profile between `Q` and `T`.

    max_matches : int
        The maximum number of similar matches to be returned. The resulting
        matches are sorted by distance (starting with the most similar).

    max_motifs : int
        The maximum number of motifs to return.

    normalize : bool
        When set to `True`, this z-normalizes subsequences prior to computing distances.
        Otherwise, this function gets re-routed to its complementary non-normalized
        equivalent set in the `@core.non_normalized` function decorator.

    Return
    ------
    motif_distances : ndarray
        The distances corresponding to a set of subsequence matches for each motif

    motif_indices : ndarray
        The indices corresponding to a set of subsequences matches for each motif
    """
    n = T.shape[1]
    l = P.shape[1]
    m = n - l + 1

    motif_indices = []
    motif_distances = []

    candidate_idx = np.argmin(P[-1])
    while len(motif_indices) < max_motifs:
        profile_value = P[-1, candidate_idx]
        if np.isinf(profile_value):  # pragma: no cover
            break

        # If max_distance is a constant (independent of the distance profile D of Q
        # and T), then we can stop the iteration if the matrix profile value of Q is
        # larger than the maximum distance.
        if (
            isinstance(max_distance, float) and profile_value > max_distance
        ):  # pragma: no cover
            break

        Q = T[:, candidate_idx : candidate_idx + m]

        query_matches = match(
            Q,
            T,
            M_T=M_T,
            Σ_T=Σ_T,
            excl_zone=excl_zone,
            max_matches=None,
            max_distance=max_distance,
        )

        if len(query_matches) > min_neighbors:
            motif_distances.append(query_matches[:max_matches, 0])
            motif_indices.append(query_matches[:max_matches, 1])

        for idx in query_matches[:, 1]:
            core.apply_exclusion_zone(P, int(idx), excl_zone)

        candidate_idx = np.argmin(P[-1])

    motif_distances = core._jagged_list_to_array(
        motif_distances, fill_value=np.nan, dtype="float64"
    )
    motif_indices = core._jagged_list_to_array(
        motif_indices, fill_value=-1, dtype="int64"
    )

    return motif_distances, motif_indices


[docs]@core.non_normalized(aamp_motifs) def motifs( T, P, excl_zone=None, min_neighbors=1, max_distance=None, cutoff=None, max_matches=10, max_motifs=1, normalize=True, ): """ Discover the top motifs for time series `T` A subsequence, `Q`, becomes a candidate motif if there are at least `min_neighbor` number of other subsequence matches in `T` (outside the exclusion zone) with a distance less or equal to `max_distance`. Parameters ---------- T : ndarray The time series or sequence P : ndarray Matrix Profile of `T` excl_zone : int, default None Size of the exclusion zone. If `None`, this defaults to `m/4`, where `m` is the window size (inferred from the time series and matrix profile lengths). min_neighbors : int, default 1 The minimum number of similar matches a subsequence needs to have in order to be considered a motif. This defaults to `1`, which means that a subsequence must have at least one similar match in order to be considered a motif. max_distance : float or function, default None For a candidate motif, `Q`, and a non-trivial subsequence, `S`, `max_distance` is the maximum distance allowed between `Q` and `S` so that `S` is considered a match of `Q`. If `max_distance` is a function, then it must be a function that accepts a single parameter, `D`, in its function signature, which is the distance profile between `Q` and `T`. If None, this defaults to `max(np.mean(D) - 2 * np.std(D), np.min(D))`. cutoff : float, default None The largest matrix profile value (distance) that a candidate motif is allowed to have. If `None`, this defaults to `max(np.mean(P) - 2 * np.std(P), np.min(P))` max_matches : int, default 10 The maximum amount of similar matches of a motif representative to be returned. The resulting matches are sorted by distance, so a value of `10` means that the indices of the most similar `10` subsequences is returned. If `None`, all matches within `max_distance` of the motif representative will be returned. max_motifs : int, default 1 The maximum number of similar matches to be returned. The resulting matches are sorted by distance (starting with the most similar). normalize : bool, default True When set to `True`, this z-normalizes subsequences prior to computing distances. Otherwise, this function gets re-routed to its complementary non-normalized equivalent set in the `@core.non_normalized` function decorator. Return ------ motif_distances : ndarray The distances corresponding to a set of subsequence matches for each motif motif_indices : ndarray The indices corresponding to a set of subsequences matches for each motif """ if max_motifs < 1: # pragma: no cover logger.warn( "The maximum number of motifs, `max_motifs`, " "must be greater than or equal to 1" ) logger.warn("`max_motifs` has been set to `1`") max_motifs = 1 if T.ndim != 1: # pragma: no cover raise ValueError( f"T is {T.ndim}-dimensional and must be 1-dimensional. " "Multidimensional motif discovery is not yet supported." ) if P.ndim != 1: # pragma: no cover raise ValueError( f"T is {P.ndim}-dimensional and must be 1-dimensional. " "Multidimensional motif discovery is not yet supported." ) m = T.shape[-1] - P.shape[-1] + 1 if excl_zone is None: # pragma: no cover excl_zone = int(np.ceil(m / STUMPY_EXCL_ZONE_DENOM)) if max_matches is None: # pragma: no cover max_matches = np.inf if cutoff is None: # pragma: no cover cutoff = max(np.mean(P) - 2 * np.std(P), np.min(P)) T, M_T, Σ_T = core.preprocess(T[np.newaxis, :], m) P = P[np.newaxis, :].astype(np.float64) motif_distances, motif_indices = _motifs( T, P, M_T, Σ_T, excl_zone, min_neighbors, max_distance, max_matches, max_motifs, ) return motif_distances, motif_indices
[docs]@core.non_normalized( aamp_match, exclude=["normalize", "M_T", "Σ_T", "T_subseq_isfinite", "T_squared"], replace={"M_T": "T_subseq_isfinite", "Σ_T": "T_squared"}, ) def match( Q, T, M_T=None, Σ_T=None, excl_zone=None, max_distance=None, max_matches=None, normalize=True, ): """ Find all matches of a query `Q` in a time series `T` The indices of subsequences whose distances to `Q` are less than or equal to `max_distance`, sorted by distance (lowest to highest). Around each occurrence an exclusion zone is applied before searching for the next. Parameters ---------- Q : ndarray The query sequence. It doesn't have to be a subsequence of `T` T : ndarray The time series of interest M_T : ndarray, default None Sliding mean of time series, `T` Σ_T : ndarray, default None Sliding standard deviation of time series, `T` excl_zone : int, default None Size of the exclusion zone. If `None`, defaults to `m/4`, where `m` is the length of `Q`. max_distance : float or function, default None Maximum distance between `Q` and a subsequence `S` for `S` to be considered a match. If a function, then it has to be a function of one argument `D`, which will be the distance profile of `Q` with `T` (a 1D numpy array of size `n-m+1`). If None, defaults to `max(np.mean(D) - 2 * np.std(D), np.min(D))`, i.e. at least the closest match will be returned. max_matches : int, default None The maximum amount of similar occurrences to be returned. The resulting occurrences are sorted by distance, so a value of `10` means that the indices of the most similar `10` subsequences is returned. If `None`, then all occurrences are returned. normalize : bool, default True When set to `True`, this z-normalizes subsequences prior to computing distances. Otherwise, this function gets re-routed to its complementary non-normalized equivalent set in the `@core.non_normalized` function decorator. Returns ------- out : ndarray The first column consists of distances of subsequences of `T` whose distances to `Q` are smaller than `max_distance`, sorted by distance (lowest to highest). The second column consist of the corresponding indices in `T`. """ if len(Q.shape) == 1: Q = Q[np.newaxis, :] if len(T.shape) == 1: T = T[np.newaxis, :] d, n = T.shape m = Q.shape[1] if excl_zone is None: # pragma: no cover excl_zone = int(np.ceil(m / STUMPY_EXCL_ZONE_DENOM)) if max_matches is None: # pragma: no cover max_matches = np.inf if np.any(np.isnan(Q)) or np.any(np.isinf(Q)): # pragma: no cover raise ValueError("Q contains illegal values (NaN or inf)") if max_distance is None: # pragma: no cover def max_distance(D): return max(np.mean(D) - 2 * np.std(D), np.min(D)) if M_T is None or Σ_T is None: # pragma: no cover T, M_T, Σ_T = core.preprocess(T, m) D = [core.mass(Q[i], T[i], M_T[i], Σ_T[i]) for i in range(d)] D = np.sum(D, axis=0) / d if not isinstance(max_distance, float): max_distance = max_distance(D) matches = [] candidate_idx = np.argmin(D) while D[candidate_idx] <= max_distance and len(matches) < max_matches: matches.append([D[candidate_idx], candidate_idx]) core.apply_exclusion_zone(D, candidate_idx, excl_zone) candidate_idx = np.argmin(D) out = np.array(matches, dtype=object) return out