# Source code for stumpy.motifs

# STUMPY

import logging

import numpy as np
from .aamp_motifs import aamp_motifs, aamp_match

from . import core, config

logger = logging.getLogger(__name__)

def _motifs(
T,
P,
M_T,
Σ_T,
excl_zone,
min_neighbors,
max_distance,
cutoff,
max_matches,
max_motifs,
):
"""
Find the top motifs for time series T.

A subsequence, Q, becomes a candidate motif if there are at least min_neighbor
number of other subsequence matches in T (outside the exclusion zone) with a
distance less or equal to max_distance.

Parameters
----------
T : ndarray
The time series or sequence

P : ndarray
Matrix Profile of time series, T

M_T : ndarray
Sliding mean of time series, T

Σ_T : ndarray
Sliding standard deviation of time series, T

excl_zone : int
Size of the exclusion zone

min_neighbors : int
The minimum number of similar matches a subsequence needs to have in order
to be considered a motif.

max_distance : float or function
For a candidate motif, Q, and a non-trivial subsequence, S, max_distance
is the maximum distance allowed between Q and S so that S is considered
a match of Q. If max_distance is a function, then it must be a function
that accepts a single parameter, D, in its function signature, which is the
distance profile between Q and T.

cutoff : float
The largest matrix profile value (distance) that a candidate motif is allowed
to have.

max_matches : int
The maximum number of similar matches to be returned. The resulting
matches are sorted by distance (starting with the most similar). Note that
the first match is always the self-match/trivial-match for each motif.

max_motifs : int
The maximum number of motifs to return.

normalize : bool
When set to True, this z-normalizes subsequences prior to computing distances.
Otherwise, this function gets re-routed to its complementary non-normalized
equivalent set in the @core.non_normalized function decorator.

Return
------
motif_distances : ndarray
The distances corresponding to a set of subsequence matches for each motif.
Note that the first column always corresponds to the distance for the
self-match/trivial-match for each motif.

motif_indices : ndarray
The indices corresponding to a set of subsequences matches for each motif.
Note that the first column always corresponds to the index for the
self-match/trivial-match for each motif.
"""
n = T.shape[1]
l = P.shape[1]
m = n - l + 1

motif_indices = []
motif_distances = []

candidate_idx = np.argmin(P[-1])
while len(motif_indices) < max_motifs:
profile_value = P[-1, candidate_idx]
if profile_value > cutoff:  # pragma: no cover
break

# If max_distance is a constant (independent of the distance profile D of Q
# and T), then we can stop the iteration if the matrix profile value of Q is
# larger than the maximum distance.
if (
isinstance(max_distance, float) and profile_value > max_distance
):  # pragma: no cover
break

Q = T[:, candidate_idx : candidate_idx + m]

query_matches = match(
Q,
T,
M_T=M_T,
Σ_T=Σ_T,
max_matches=None,
max_distance=max_distance,
)

if len(query_matches) > min_neighbors:
motif_distances.append(query_matches[:max_matches, 0])
motif_indices.append(query_matches[:max_matches, 1])

for idx in query_matches[:, 1]:
core.apply_exclusion_zone(P, int(idx), excl_zone)

candidate_idx = np.argmin(P[-1])

motif_distances = core._jagged_list_to_array(
motif_distances, fill_value=np.nan, dtype="float64"
)
motif_indices = core._jagged_list_to_array(
motif_indices, fill_value=-1, dtype="int64"
)

return motif_distances, motif_indices

[docs]@core.non_normalized(aamp_motifs)
def motifs(
T,
P,
min_neighbors=1,
max_distance=None,
cutoff=None,
max_matches=10,
max_motifs=1,
normalize=True,
):
"""
Discover the top motifs for time series T

A subsequence, Q, becomes a candidate motif if there are at least min_neighbor
number of other subsequence matches in T (outside the exclusion zone) with a
distance less or equal to max_distance.

Parameters
----------
T : ndarray
The time series or sequence

P : ndarray
Matrix Profile of T

min_neighbors : int, default 1
The minimum number of similar matches a subsequence needs to have in order
to be considered a motif. This defaults to 1, which means that a subsequence
must have at least one similar match in order to be considered a motif.

max_distance : float or function, default None
For a candidate motif, Q, and a non-trivial subsequence, S, max_distance
is the maximum distance allowed between Q and S so that S is considered
a match of Q. If max_distance is a function, then it must be a function
that accepts a single parameter, D, in its function signature, which is the
distance profile between Q and T. If None, this defaults to
np.nanmax([np.nanmean(D) - 2.0 * np.nanstd(D), np.nanmin(D)]).

cutoff : float, default None
The largest matrix profile value (distance) that a candidate motif is allowed
to have. If None, this defaults to
np.nanmax([np.nanmean(P) - 2.0 * np.nanstd(P), np.nanmin(P)])

max_matches : int, default 10
The maximum amount of similar matches of a motif representative to be returned.
The resulting matches are sorted by distance, so a value of 10 means that the
indices of the most similar 10 subsequences is returned.
If None, all matches within max_distance of the motif representative
will be returned. Note that the first match is always the
self-match/trivial-match for each motif.

max_motifs : int, default 1
The maximum number of motifs to return

normalize : bool, default True
When set to True, this z-normalizes subsequences prior to computing distances.
Otherwise, this function gets re-routed to its complementary non-normalized
equivalent set in the @core.non_normalized function decorator.

Return
------
motif_distances : ndarray
The distances corresponding to a set of subsequence matches for each motif.
Note that the first column always corresponds to the distance for the
self-match/trivial-match for each motif.

motif_indices : ndarray
The indices corresponding to a set of subsequences matches for each motif.
Note that the first column always corresponds to the index for the
self-match/trivial-match for each motif.
"""
if max_motifs < 1:  # pragma: no cover
logger.warn(
"The maximum number of motifs, max_motifs, "
"must be greater than or equal to 1"
)
logger.warn("max_motifs has been set to 1")
max_motifs = 1

if T.ndim != 1:  # pragma: no cover
raise ValueError(
f"T is {T.ndim}-dimensional and must be 1-dimensional. "
"Multidimensional motif discovery is not yet supported."
)

if P.ndim != 1:  # pragma: no cover
raise ValueError(
f"T is {P.ndim}-dimensional and must be 1-dimensional. "
"Multidimensional motif discovery is not yet supported."
)

m = T.shape[-1] - P.shape[-1] + 1
excl_zone = int(np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM))
if max_matches is None:  # pragma: no cover
max_matches = np.inf
if cutoff is None:  # pragma: no cover
P_copy = P.copy().astype(np.float64)
P_copy[np.isinf(P_copy)] = np.nan
cutoff = np.nanmax(
[np.nanmean(P_copy) - 2.0 * np.nanstd(P_copy), np.nanmin(P_copy)]
)

T, M_T, Σ_T = core.preprocess(T[np.newaxis, :], m)
P = P[np.newaxis, :].astype(np.float64)

motif_distances, motif_indices = _motifs(
T,
P,
M_T,
Σ_T,
excl_zone,
min_neighbors,
max_distance,
cutoff,
max_matches,
max_motifs,
)

return motif_distances, motif_indices

[docs]@core.non_normalized(
aamp_match,
exclude=["normalize", "M_T", "Σ_T", "T_subseq_isfinite", "T_squared"],
replace={"M_T": "T_subseq_isfinite", "Σ_T": "T_squared"},
)
def match(
Q,
T,
M_T=None,
Σ_T=None,
max_distance=None,
max_matches=None,
normalize=True,
):
"""
Find all matches of a query Q in a time series T

The indices of subsequences whose distances to Q are less than or equal to
max_distance, sorted by distance (lowest to highest). Around each occurrence an
exclusion zone is applied before searching for the next.

Parameters
----------
Q : ndarray
The query sequence. It doesn't have to be a subsequence of T

T : ndarray
The time series of interest

M_T : ndarray, default None
Sliding mean of time series, T

Σ_T : ndarray, default None
Sliding standard deviation of time series, T

max_distance : float or function, default None
Maximum distance between Q and a subsequence S for S to be considered a
match.
If a function, then it has to be a function of one argument D, which will be
the distance profile of Q with T (a 1D numpy array of size n-m+1).
If None, this defaults to
np.nanmax([np.nanmean(D) - 2 * np.nanstd(D), np.nanmin(D)]) (i.e. at
least the closest match will be returned).

max_matches : int, default None
The maximum amount of similar occurrences to be returned. The resulting
occurrences are sorted by distance, so a value of 10 means that the
indices of the most similar 10 subsequences is returned. If None, then all
occurrences are returned.

normalize : bool, default True
When set to True, this z-normalizes subsequences prior to computing distances.
Otherwise, this function gets re-routed to its complementary non-normalized
equivalent set in the @core.non_normalized function decorator.

Returns
-------
out : ndarray
The first column consists of distances of subsequences of T whose distances
to Q are smaller than max_distance, sorted by distance (lowest to highest).
The second column consists of the corresponding indices in T.
"""
Q = np.asarray(Q)
T = np.asarray(T)

if len(Q.shape) == 1:
Q = Q[np.newaxis, :]
if len(T.shape) == 1:
T = T[np.newaxis, :]

d, n = T.shape
m = Q.shape[1]

excl_zone = int(np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM))
if max_matches is None:  # pragma: no cover
max_matches = np.inf

if np.any(np.isnan(Q)) or np.any(np.isinf(Q)):  # pragma: no cover
raise ValueError("Q contains illegal values (NaN or inf)")

if max_distance is None:  # pragma: no cover

def max_distance(D):
D_copy = D.copy().astype(np.float64)
D_copy[np.isinf(D_copy)] = np.nan
return np.nanmax(
[np.nanmean(D_copy) - 2.0 * np.nanstd(D_copy), np.nanmin(D_copy)]
)

if M_T is None or Σ_T is None:  # pragma: no cover
T, M_T, Σ_T = core.preprocess(T, m)

D = [core.mass(Q[i], T[i], M_T[i], Σ_T[i]) for i in range(d)]

D = np.sum(D, axis=0) / d
if not isinstance(max_distance, float):
max_distance = max_distance(D)

matches = []

candidate_idx = np.argmin(D)
while D[candidate_idx] <= max_distance and len(matches) < max_matches:
matches.append([D[candidate_idx], candidate_idx])
core.apply_exclusion_zone(D, candidate_idx, excl_zone)
candidate_idx = np.argmin(D)

out = np.array(matches, dtype=object)

return out