# STUMPY
# Copyright 2019 TD Ameritrade. Released under the terms of the 3-Clause BSD license.
# STUMPY is a trademark of TD Ameritrade IP Company, Inc. All rights reserved.
import numba
import numpy as np
from numba import njit, prange
from . import config, core
from .scraamp import prescraamp, scraamp
from .stump import _stump
def _preprocess_prescrump(
T_A, m, T_B=None, s=None, T_A_subseq_isconstant=None, T_B_subseq_isconstant=None
):
"""
Performs several preprocessings and returns outputs that are needed for the
prescrump algorithm.
Parameters
----------
T_A : numpy.ndarray
The time series or sequence for which to compute the matrix profile
m : int
Window size
T_B : numpy.ndarray, default None
The time series or sequence that will be used to annotate T_A. For every
subsequence in T_A, its nearest neighbor in T_B will be recorded.
s : int, default None
The sampling interval that defaults to
`int(np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM))`
T_A_subseq_isconstant : numpy.ndarray or function, default None
A boolean array that indicates whether a subsequence in `T_A` is constant
(True). Alternatively, a custom, user-defined function that returns a
boolean array that indicates whether a subsequence in `T_A` is constant
(True). The function must only take two arguments, `a`, a 1-D array,
and `w`, the window size, while additional arguments may be specified
by currying the user-defined function using `functools.partial`. Any
subsequence with at least one np.nan/np.inf will automatically have its
corresponding value set to False in this boolean array.
T_B_subseq_isconstant : numpy.ndarray or function, default None
A boolean array that indicates whether a subsequence in `T_B` is constant
(True). Alternatively, a custom, user-defined function that returns a
boolean array that indicates whether a subsequence in `T_B` is constant
(True). The function must only take two arguments, `a`, a 1-D array,
and `w`, the window size, while additional arguments may be specified
by currying the user-defined function using `functools.partial`. Any
subsequence with at least one np.nan/np.inf will automatically have its
corresponding value set to False in this boolean array.
Returns
-------
T_A : numpy.ndarray
A copy of the time series input `T_A`, where all NaN and inf values
are replaced with zero.
T_B : numpy.ndarray
A copy of the time series input `T_B`, where all NaN and inf values
are replaced with zero. If the input `T_B` is not provided (default),
this array is just a copy of `T_A`.
μ_Q : numpy.ndarray
Sliding window mean for `T_A`
σ_Q : numpy.ndarray
Sliding window standard deviation for `T_A`
M_T : numpy.ndarray
Sliding window mean for `T_B`
Σ_T : numpy.ndarray
Sliding window standard deviation for `T_B`
Q_subseq_isconstant : numpy.ndarray
A boolean array that indicates whether a subsequence in `Q` is constant (True)
T_subseq_isconstant : numpy.ndarray
A boolean array that indicates whether a subsequence in `T` is constant (True)
indices : numpy.ndarray
The subsequence indices to compute `prescrump` for
s : int
The sampling interval that defaults to
`int(np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM))`
excl_zone : int
The half width for the exclusion zone
"""
if T_B is None:
T_B = T_A
T_B_subseq_isconstant = T_A_subseq_isconstant
excl_zone = int(np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM))
else:
excl_zone = None
T_A, μ_Q, σ_Q, Q_subseq_isconstant = core.preprocess(
T_A, m, T_subseq_isconstant=T_A_subseq_isconstant
)
T_B, M_T, Σ_T, T_subseq_isconstant = core.preprocess(
T_B, m, T_subseq_isconstant=T_B_subseq_isconstant
)
n_A = T_A.shape[0]
l = n_A - m + 1
if s is None: # pragma: no cover
if excl_zone is not None: # self-join
s = excl_zone
else: # AB-join
s = int(np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM))
indices = np.random.permutation(range(0, l, s)).astype(np.int64)
return (
T_A,
T_B,
μ_Q,
σ_Q,
M_T,
Σ_T,
Q_subseq_isconstant,
T_subseq_isconstant,
indices,
s,
excl_zone,
)
@njit(fastmath=True)
def _compute_PI(
T_A,
T_B,
m,
μ_Q,
σ_Q,
M_T,
Σ_T,
Q_subseq_isconstant,
T_subseq_isconstant,
indices,
start,
stop,
thread_idx,
s,
P_squared,
I,
excl_zone=None,
k=1,
):
"""
Compute (Numba JIT-compiled) and update the squared (top-k) matrix profile
distance and matrix profile indces according to the preSCRIMP algorithm.
Parameters
----------
T_A : numpy.ndarray
The time series or sequence for which to compute the matrix profile
T_B : numpy.ndarray
The time series or sequence that will be used to annotate T_A. For every
subsequence in T_A, its nearest neighbor in T_B will be recorded.
m : int
Window size
μ_Q : numpy.ndarray
Sliding window mean for `T_A`
σ_Q : numpy.ndarray
Sliding window standard deviation for `T_A`
M_T : numpy.ndarray
Sliding window mean for `T_B`
Σ_T : numpy.ndarray
Sliding window standard deviation for `T_B`
Q_subseq_isconstant : numpy.ndarray
A boolean array that indicates whether a subsequence in `T_A` is constant (True)
T_subseq_isconstant : numpy.ndarray
A boolean array that indicates whether a subsequence in `T_B` is constant (True)
indices : numpy.ndarray
The subsequence indices to compute `prescrump` for
start : int
The (inclusive) start index for `indices`
stop : int
The (exclusive) stop index for `indices`
thread_idx : int
The thread index
s : int
The sampling interval that defaults to
`int(np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM))`
P_squared : numpy.ndarray
The squared (top-k) matrix profile
I : numpy.ndarray
The (top-k) matrix profile indices
excl_zone : int
The half width for the exclusion zone relative to the `i`.
k : int, default 1
The number of top `k` smallest distances used to construct the matrix profile.
Note that this will increase the total computational time and memory usage
when k > 1.
Returns
-------
None
Notes
-----
`DOI: 10.1109/ICDM.2018.00099 \
<https://www.cs.ucr.edu/~eamonn/SCRIMP_ICDM_camera_ready_updated.pdf>`__
See Algorithm 2
"""
l = T_A.shape[0] - m + 1 # length of matrix profile
w = T_B.shape[0] - m + 1 # length of distance profile
squared_distance_profile = np.empty(w)
QT = np.empty(w, dtype=np.float64)
for i in indices[start:stop]:
Q = T_A[i : i + m]
QT[:] = core._sliding_dot_product(Q, T_B)
squared_distance_profile[:] = core._calculate_squared_distance_profile(
m,
QT,
μ_Q[i],
σ_Q[i],
M_T,
Σ_T,
Q_subseq_isconstant[i],
T_subseq_isconstant,
)
if excl_zone is not None:
core._apply_exclusion_zone(squared_distance_profile, i, excl_zone, np.inf)
nn_i = np.argmin(squared_distance_profile)
if (
squared_distance_profile[nn_i] < P_squared[thread_idx, i, -1]
and nn_i not in I[thread_idx, i]
):
idx = np.searchsorted(
P_squared[thread_idx, i],
squared_distance_profile[nn_i],
side="right",
)
core._shift_insert_at_index(
P_squared[thread_idx, i], idx, squared_distance_profile[nn_i]
)
core._shift_insert_at_index(I[thread_idx, i], idx, nn_i)
if P_squared[thread_idx, i, 0] == np.inf: # pragma: no cover
I[thread_idx, i, 0] = -1
continue
j = nn_i
QT_j = QT[j]
QT_j_prime = QT_j
# Update top-k for both subsequences `S[i+g] = T[i+g:i+g+m]`` and
# `S[j+g] = T[j+g:j+g+m]` (i.e., the right neighbors of `T[i : i+m]` and
# `T[j:j+m]`) by using the distance between `S[i+g]` and `S[j+g]`
for g in range(1, min(s, l - i, w - j)):
QT_j = (
QT_j
- T_B[j + g - 1] * T_A[i + g - 1]
+ T_B[j + g + m - 1] * T_A[i + g + m - 1]
)
D_squared = core._calculate_squared_distance(
m,
QT_j,
M_T[j + g],
Σ_T[j + g],
μ_Q[i + g],
σ_Q[i + g],
T_subseq_isconstant[j + g],
Q_subseq_isconstant[i + g],
)
if (
D_squared < P_squared[thread_idx, i + g, -1]
and (j + g) not in I[thread_idx, i + g]
):
idx = np.searchsorted(
P_squared[thread_idx, i + g], D_squared, side="right"
)
core._shift_insert_at_index(
P_squared[thread_idx, i + g], idx, D_squared
)
core._shift_insert_at_index(I[thread_idx, i + g], idx, j + g)
if (
excl_zone is not None
and D_squared < P_squared[thread_idx, j + g, -1]
and (i + g) not in I[thread_idx, j + g]
):
idx = np.searchsorted(
P_squared[thread_idx, j + g], D_squared, side="right"
)
core._shift_insert_at_index(
P_squared[thread_idx, j + g], idx, D_squared
)
core._shift_insert_at_index(I[thread_idx, j + g], idx, i + g)
QT_j = QT_j_prime
# Update top-k for both subsequences `S[i-g] = T[i-g:i-g+m]` and
# `S[j-g] = T[j-g:j-g+m]` (i.e., the left neighbors of `T[i : i+m]` and
# `T[j:j+m]`) by using the distance between `S[i-g]` and `S[j-g]`
for g in range(1, min(s, i + 1, j + 1)):
QT_j = QT_j - T_B[j - g + m] * T_A[i - g + m] + T_B[j - g] * T_A[i - g]
D_squared = core._calculate_squared_distance(
m,
QT_j,
M_T[j - g],
Σ_T[j - g],
μ_Q[i - g],
σ_Q[i - g],
T_subseq_isconstant[j - g],
Q_subseq_isconstant[i - g],
)
if (
D_squared < P_squared[thread_idx, i - g, -1]
and (j - g) not in I[thread_idx, i - g]
):
idx = np.searchsorted(
P_squared[thread_idx, i - g], D_squared, side="right"
)
core._shift_insert_at_index(
P_squared[thread_idx, i - g], idx, D_squared
)
core._shift_insert_at_index(I[thread_idx, i - g], idx, j - g)
if (
excl_zone is not None
and D_squared < P_squared[thread_idx, j - g, -1]
and (i - g) not in I[thread_idx, j - g]
):
idx = np.searchsorted(
P_squared[thread_idx, j - g], D_squared, side="right"
)
core._shift_insert_at_index(
P_squared[thread_idx, j - g], idx, D_squared
)
core._shift_insert_at_index(I[thread_idx, j - g], idx, i - g)
# In the case of a self-join, the calculated distance profile can also be
# used to refine the top-k for all non-trivial subsequences
if excl_zone is not None:
# Note that the squared distance, `squared_distance_profile[j]`,
# between subsequences `S_i = T[i : i + m]` and `S_j = T[j : j + m]`
# can be used to update the top-k for BOTH subsequence `i` and
# subsequence `j`. We update the latter here.
indices = np.flatnonzero(
squared_distance_profile < P_squared[thread_idx, :, -1]
)
for j in indices:
if i not in I[thread_idx, j]:
idx = np.searchsorted(
P_squared[thread_idx, j],
squared_distance_profile[j],
side="right",
)
core._shift_insert_at_index(
P_squared[thread_idx, j], idx, squared_distance_profile[j]
)
core._shift_insert_at_index(I[thread_idx, j], idx, i)
@njit(
# "(f8[:], f8[:], i8, f8[:], f8[:], f8[:], f8[:], f8[:], i8, i8, f8[:], f8[:],"
# "i8[:], optional(i8))",
parallel=True,
fastmath=True,
)
def _prescrump(
T_A,
T_B,
m,
μ_Q,
σ_Q,
M_T,
Σ_T,
Q_subseq_isconstant,
T_subseq_isconstant,
indices,
s,
excl_zone=None,
k=1,
):
"""
A Numba JIT-compiled implementation of the preSCRIMP algorithm.
Parameters
----------
T_A : numpy.ndarray
The time series or sequence for which to compute the matrix profile
T_B : numpy.ndarray
The time series or sequence that will be used to annotate T_A. For every
subsequence in T_A, its nearest neighbor in T_B will be recorded.
m : int
Window size
μ_Q : numpy.ndarray
Sliding window mean for `T_A`
σ_Q : numpy.ndarray
Sliding window standard deviation for `T_A`
M_T : numpy.ndarray
Sliding window mean for `T_B`
Σ_T : numpy.ndarray
Sliding window standard deviation for `T_B`
Q_subseq_isconstant : numpy.ndarray
A boolean array that indicates whether a subsequence in `T_A` is constant (True)
T_subseq_isconstant : numpy.ndarray
A boolean array that indicates whether a subsequence in `T_B` is constant (True)
indices : numpy.ndarray
The subsequence indices to compute `prescrump` for
s : int
The sampling interval that defaults to
`int(np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM))`
excl_zone : int
The half width for the exclusion zone relative to the `i`.
k : int, default 1
The number of top `k` smallest distances used to construct the matrix profile.
Note that this will increase the total computational time and memory usage
when k > 1.
Returns
-------
out1 : numpy.ndarray
The (top-k) matrix profile. When k=1 (default), the first (and only) column
in this 2D array consists of the matrix profile. When k > 1, the output
has exactly `k` columns consisting of the top-k matrix profile.
out2 : numpy.ndarray
The (top-k) matrix profile indices. When k=1 (default), the first (and only)
column in this 2D array consists of the matrix profile indices. When k > 1,
the output has exactly `k` columns consisting of the top-k matrix profile
indices.
Notes
-----
`DOI: 10.1109/ICDM.2018.00099 \
<https://www.cs.ucr.edu/~eamonn/SCRIMP_ICDM_camera_ready_updated.pdf>`__
See Algorithm 2
"""
n_threads = numba.config.NUMBA_NUM_THREADS
l = T_A.shape[0] - m + 1
P_squared = np.full((n_threads, l, k), np.inf, dtype=np.float64)
I = np.full((n_threads, l, k), -1, dtype=np.int64)
idx_ranges = core._get_ranges(len(indices), n_threads, truncate=False)
for thread_idx in prange(n_threads):
_compute_PI(
T_A,
T_B,
m,
μ_Q,
σ_Q,
M_T,
Σ_T,
Q_subseq_isconstant,
T_subseq_isconstant,
indices,
idx_ranges[thread_idx, 0],
idx_ranges[thread_idx, 1],
thread_idx,
s,
P_squared,
I,
excl_zone,
k,
)
for thread_idx in range(1, n_threads):
core._merge_topk_PI(P_squared[0], P_squared[thread_idx], I[0], I[thread_idx])
return np.sqrt(P_squared[0]), I[0]
@core.non_normalized(prescraamp)
def prescrump(
T_A,
m,
T_B=None,
s=None,
normalize=True,
p=2.0,
k=1,
T_A_subseq_isconstant=None,
T_B_subseq_isconstant=None,
):
"""
A convenience wrapper around the Numba JIT-compiled parallelized
`_prescrump` function which computes the approximate (top-k) matrix
profile according to the preSCRIMP algorithm.
Parameters
----------
T_A : numpy.ndarray
The time series or sequence for which to compute the matrix profile
m : int
Window size
T_B : numpy.ndarray, default None
The time series or sequence that will be used to annotate T_A. For every
subsequence in T_A, its nearest neighbor in T_B will be recorded.
s : int, default None
The sampling interval that defaults to
`int(np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM))`
normalize : bool, default True
When set to `True`, this z-normalizes subsequences prior to computing distances.
Otherwise, this function gets re-routed to its complementary non-normalized
equivalent set in the `@core.non_normalized` function decorator.
p : float, default 2.0
The p-norm to apply for computing the Minkowski distance. Minkowski distance is
typically used with `p` being 1 or 2, which correspond to the Manhattan distance
and the Euclidean distance, respectively. This parameter is ignored when
`normalize == True`.
k : int, default 1
The number of top `k` smallest distances used to construct the matrix profile.
Note that this will increase the total computational time and memory usage
when k > 1.
T_A_subseq_isconstant : numpy.ndarray or function, default None
A boolean array that indicates whether a subsequence in `T_A` is constant
(True). Alternatively, a custom, user-defined function that returns a
boolean array that indicates whether a subsequence in `T_A` is constant
(True). The function must only take two arguments, `a`, a 1-D array,
and `w`, the window size, while additional arguments may be specified
by currying the user-defined function using `functools.partial`. Any
subsequence with at least one np.nan/np.inf will automatically have its
corresponding value set to False in this boolean array.
T_B_subseq_isconstant : numpy.ndarray or function, default None
A boolean array that indicates whether a subsequence in `T_B` is constant
(True). Alternatively, a custom, user-defined function that returns a
boolean array that indicates whether a subsequence in `T_B` is constant
(True). The function must only take two arguments, `a`, a 1-D array,
and `w`, the window size, while additional arguments may be specified
by currying the user-defined function using `functools.partial`. Any
subsequence with at least one np.nan/np.inf will automatically have its
corresponding value set to False in this boolean array.
Returns
-------
P : numpy.ndarray
The (top-k) matrix profile. When k = 1 (default), this is a 1D array
consisting of the matrix profile. When k > 1, the output is a 2D array that
has exactly `k` columns consisting of the top-k matrix profile.
I : numpy.ndarray
The (top-k) matrix profile indices. When k = 1 (default), this is a 1D array
consisting of the matrix profile indices. When k > 1, the output is a 2D
array that has exactly `k` columns consisting of the top-k matrix profile
indices.
Notes
-----
`DOI: 10.1109/ICDM.2018.00099 \
<https://www.cs.ucr.edu/~eamonn/SCRIMP_ICDM_camera_ready_updated.pdf>`__
See Algorithm 2
"""
(
T_A,
T_B,
μ_Q,
σ_Q,
M_T,
Σ_T,
Q_subseq_isconstant,
T_subseq_isconstant,
indices,
s,
excl_zone,
) = _preprocess_prescrump(
T_A,
m,
T_B=T_B,
s=s,
T_A_subseq_isconstant=T_A_subseq_isconstant,
T_B_subseq_isconstant=T_B_subseq_isconstant,
)
P, I = _prescrump(
T_A,
T_B,
m,
μ_Q,
σ_Q,
M_T,
Σ_T,
Q_subseq_isconstant,
T_subseq_isconstant,
indices,
s,
excl_zone,
k,
)
if k == 1:
return P.flatten().astype(np.float64), I.flatten().astype(np.int64)
else:
return P, I
[docs]@core.non_normalized(
scraamp,
exclude=[
"normalize",
"pre_scrump",
"pre_scraamp",
"p",
"T_A_subseq_isconstant",
"T_B_subseq_isconstant",
],
replace={"pre_scrump": "pre_scraamp"},
)
class scrump:
"""
Compute an approximate z-normalized matrix profile
This is a convenience wrapper around the Numba JIT-compiled parallelized
`_stump` function which computes the matrix profile according to SCRIMP.
Parameters
----------
T_A : numpy.ndarray
The time series or sequence for which to compute the matrix profile
T_B : numpy.ndarray
The time series or sequence that will be used to annotate T_A. For every
subsequence in T_A, its nearest neighbor in T_B will be recorded.
m : int
Window size
ignore_trivial : bool
Set to `True` if this is a self-join. Otherwise, for AB-join, set this to
`False`. Default is `True`.
percentage : float
Approximate percentage completed. The value is between 0.0 and 1.0.
pre_scrump : bool
A flag for whether or not to perform the PreSCRIMP calculation prior to
computing SCRIMP. If set to `True`, this is equivalent to computing
SCRIMP++ and may lead to faster convergence
s : int
The size of the PreSCRIMP fixed interval. If `pre_scrump=True` and `s=None`,
then `s` will automatically be set to
`s=int(np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM))`, the size of the exclusion
zone.
normalize : bool, default True
When set to `True`, this z-normalizes subsequences prior to computing distances.
Otherwise, this class gets re-routed to its complementary non-normalized
equivalent set in the `@core.non_normalized` class decorator.
p : float, default 2.0
The p-norm to apply for computing the Minkowski distance. Minkowski distance is
typically used with `p` being 1 or 2, which correspond to the Manhattan distance
and the Euclidean distance, respectively. This parameter is ignored when
`normalize == True`.
k : int, default 1
The number of top `k` smallest distances used to construct the matrix profile.
Note that this will increase the total computational time and memory usage
when k > 1.
T_A_subseq_isconstant : numpy.ndarray or function, default None
A boolean array that indicates whether a subsequence in `T_A` is constant
(True). Alternatively, a custom, user-defined function that returns a
boolean array that indicates whether a subsequence in `T_A` is constant
(True). The function must only take two arguments, `a`, a 1-D array,
and `w`, the window size, while additional arguments may be specified
by currying the user-defined function using `functools.partial`. Any
subsequence with at least one np.nan/np.inf will automatically have its
corresponding value set to False in this boolean array.
T_B_subseq_isconstant : numpy.ndarray or function, default None
A boolean array that indicates whether a subsequence in `T_B` is constant
(True). Alternatively, a custom, user-defined function that returns a
boolean array that indicates whether a subsequence in `T_B` is constant
(True). The function must only take two arguments, `a`, a 1-D array,
and `w`, the window size, while additional arguments may be specified
by currying the user-defined function using `functools.partial`. Any
subsequence with at least one np.nan/np.inf will automatically have its
corresponding value set to False in this boolean array.
Attributes
----------
P_ : numpy.ndarray
The updated (top-k) matrix profile. When `k=1` (default), this output is
a 1D array consisting of the matrix profile. When `k > 1`, the output
is a 2D array that has exactly `k` columns consisting of the top-k matrix
profile.
I_ : numpy.ndarray
The updated (top-k) matrix profile indices. When `k=1` (default), this output is
a 1D array consisting of the matrix profile indices. When `k > 1`, the output
is a 2D array that has exactly `k` columns consisting of the top-k matrix
profile indiecs.
left_I_ : numpy.ndarray
The updated left (top-1) matrix profile indices
right_I_ : numpy.ndarray
The updated right (top-1) matrix profile indices
Methods
-------
update()
Update the matrix profile and the matrix profile indices by computing
additional new distances (limited by `percentage`) that make up the full
distance matrix. It updates the (top-k) matrix profile, (top-1) left
matrix profile, (top-1) right matrix profile, (top-k) matrix profile indices,
(top-1) left matrix profile indices, and (top-1) right matrix profile indices.
See Also
--------
stumpy.stump : Compute the z-normalized matrix profile
stumpy.stumped : Compute the z-normalized matrix profile with a distributed dask
cluster
stumpy.gpu_stump : Compute the z-normalized matrix profile with one or more GPU
devices
Notes
-----
`DOI: 10.1109/ICDM.2018.00099 \
<https://www.cs.ucr.edu/~eamonn/SCRIMP_ICDM_camera_ready_updated.pdf>`__
See Algorithm 1 and Algorithm 2
Examples
--------
>>> import stumpy
>>> import numpy as np
>>> approx_mp = stumpy.scrump(
... np.array([584., -11., 23., 79., 1001., 0., -19.]),
... m=3)
>>> approx_mp.update()
>>> approx_mp.P_
array([2.982409 , 3.28412702, inf, 2.982409 , 3.28412702])
>>> approx_mp.I_
array([ 3, 4, -1, 0, 1])
"""
def __init__(
self,
T_A,
m,
T_B=None,
ignore_trivial=True,
percentage=0.01,
pre_scrump=False,
s=None,
normalize=True,
p=2.0,
k=1,
T_A_subseq_isconstant=None,
T_B_subseq_isconstant=None,
):
"""
Initialize the `scrump` object
Parameters
----------
T_A : numpy.ndarray
The time series or sequence for which to compute the matrix profile
m : int
Window size
T_B : numpy.ndarray, default None
The time series or sequence that will be used to annotate T_A. For every
subsequence in T_A, its nearest neighbor in T_B will be recorded.
ignore_trivial : bool, default True
Set to `True` if this is a self-join. Otherwise, for AB-join, set this to
`False`. Default is `True`.
percentage : float, default 0.01
Approximate percentage completed. The value is between 0.0 and 1.0.
pre_scrump : bool, default False
A flag for whether or not to perform the PreSCRIMP calculation prior to
computing SCRIMP. If set to `True`, this is equivalent to computing
SCRIMP++
s : int, default None
The size of the PreSCRIMP fixed interval. If `pre_scrump=True` and `s=None`,
then `s` will automatically be set to
`s=int(np.ceil(m / config.STUMPY_EXCL_ZONE_DENOM))`, the size of the
exclusion zone.
normalize : bool, default True
When set to `True`, this z-normalizes subsequences prior to computing
distances. Otherwise, this class gets re-routed to its complementary
non-normalized equivalent set in the `@core.non_normalized` class decorator.
p : float, default 2.0
The p-norm to apply for computing the Minkowski distance. Minkowski distance
is typically used with `p` being 1 or 2, which correspond to the Manhattan
distance and the Euclidean distance, respectively.This parameter is ignored
when `normalize == True`.
k : int, default 1
The number of top `k` smallest distances used to construct the matrix
profile. Note that this will increase the total computational time and
memory usage when k > 1.
T_A_subseq_isconstant : numpy.ndarray or function, default None
A boolean array that indicates whether a subsequence in `T_A` is constant
(True). Alternatively, a custom, user-defined function that returns a
boolean array that indicates whether a subsequence in `T_A` is constant
(True). The function must only take two arguments, `a`, a 1-D array,
and `w`, the window size, while additional arguments may be specified
by currying the user-defined function using `functools.partial`. Any
subsequence with at least one np.nan/np.inf will automatically have its
corresponding value set to False in this boolean array.
T_B_subseq_isconstant : numpy.ndarray or function, default None
A boolean array that indicates whether a subsequence in `T_B` is constant
(True). Alternatively, a custom, user-defined function that returns a
boolean array that indicates whether a subsequence in `T_B` is constant
(True). The function must only take two arguments, `a`, a 1-D array,
and `w`, the window size, while additional arguments may be specified
by currying the user-defined function using `functools.partial`. Any
subsequence with at least one np.nan/np.inf will automatically have its
corresponding value set to False in this boolean array.
"""
self._ignore_trivial = ignore_trivial
if T_B is None:
T_B = T_A
self._ignore_trivial = True
T_B_subseq_isconstant = T_A_subseq_isconstant
self._m = m
(
self._T_A,
self._μ_Q,
self._σ_Q_inverse,
self._μ_Q_m_1,
self._Q_subseq_isfinite,
self._Q_subseq_isconstant,
) = core.preprocess_diagonal(
T_A, self._m, T_subseq_isconstant=T_A_subseq_isconstant
)
(
self._T_B,
self._M_T,
self._Σ_T_inverse,
self._M_T_m_1,
self._T_subseq_isfinite,
self._T_subseq_isconstant,
) = core.preprocess_diagonal(
T_B, self._m, T_subseq_isconstant=T_B_subseq_isconstant
)
if self._T_A.ndim != 1: # pragma: no cover
raise ValueError(
f"T_A is {self._T_A.ndim}-dimensional and must be 1-dimensional. "
"For multidimensional STUMP use `stumpy.mstump` or `stumpy.mstumped`"
)
if self._T_B.ndim != 1: # pragma: no cover
raise ValueError(
f"T_B is {self._T_B.ndim}-dimensional and must be 1-dimensional. "
"For multidimensional STUMP use `stumpy.mstump` or `stumpy.mstumped`"
)
core.check_window_size(m, max_size=min(T_A.shape[0], T_B.shape[0]))
self._ignore_trivial = core.check_ignore_trivial(
self._T_A, self._T_B, self._ignore_trivial
)
self._n_A = self._T_A.shape[0]
self._n_B = self._T_B.shape[0]
self._l = self._n_A - self._m + 1
self._k = k
self._P = np.full((self._l, self._k), np.inf, dtype=np.float64)
self._PL = np.full(self._l, np.inf, dtype=np.float64)
self._PR = np.full(self._l, np.inf, dtype=np.float64)
self._I = np.full((self._l, self._k), -1, dtype=np.int64)
self._IL = np.full(self._l, -1, dtype=np.int64)
self._IR = np.full(self._l, -1, dtype=np.int64)
self._excl_zone = int(np.ceil(self._m / config.STUMPY_EXCL_ZONE_DENOM))
if s is None:
s = self._excl_zone
if pre_scrump:
if self._ignore_trivial:
(
T_A,
T_B,
μ_Q,
σ_Q,
M_T,
Σ_T,
Q_subseq_isconstant,
T_subseq_isconstant,
indices,
s,
excl_zone,
) = _preprocess_prescrump(
T_A, m, s=s, T_A_subseq_isconstant=T_A_subseq_isconstant
)
else:
(
T_A,
T_B,
μ_Q,
σ_Q,
M_T,
Σ_T,
Q_subseq_isconstant,
T_subseq_isconstant,
indices,
s,
excl_zone,
) = _preprocess_prescrump(
T_A,
m,
T_B=T_B,
s=s,
T_A_subseq_isconstant=T_A_subseq_isconstant,
T_B_subseq_isconstant=T_B_subseq_isconstant,
)
P, I = _prescrump(
T_A,
T_B,
m,
μ_Q,
σ_Q,
M_T,
Σ_T,
Q_subseq_isconstant,
T_subseq_isconstant,
indices,
s,
excl_zone,
k,
)
core._merge_topk_PI(self._P, P, self._I, I)
if self._ignore_trivial:
self._diags = np.random.permutation(
range(self._excl_zone + 1, self._n_A - self._m + 1)
).astype(np.int64)
if self._diags.shape[0] == 0: # pragma: no cover
max_m = core.get_max_window_size(self._T_A.shape[0])
raise ValueError(
f"The window size, `m = {self._m}`, is too long for a self join. "
f"Please try a value of `m <= {max_m}`"
)
else:
self._diags = np.random.permutation(
range(-(self._n_A - self._m + 1) + 1, self._n_B - self._m + 1)
).astype(np.int64)
self._n_threads = numba.config.NUMBA_NUM_THREADS
self._percentage = np.clip(percentage, 0.0, 1.0)
self._n_chunks = int(np.ceil(1.0 / percentage))
self._ndist_counts = core._count_diagonal_ndist(
self._diags, self._m, self._n_A, self._n_B
)
self._chunk_diags_ranges = core._get_array_ranges(
self._ndist_counts, self._n_chunks, True
)
self._n_chunks = self._chunk_diags_ranges.shape[0]
self._chunk_idx = 0
def update(self):
"""
Update the (top-k) matrix profile and the (top-k) matrix profile indices by
computing additional new distances (limited by `percentage`) that make up
the full distance matrix.
Parameters
----------
None
Returns
-------
None
"""
if self._chunk_idx < self._n_chunks:
start_idx, stop_idx = self._chunk_diags_ranges[self._chunk_idx]
P, PL, PR, I, IL, IR = _stump(
self._T_A,
self._T_B,
self._m,
self._M_T,
self._μ_Q,
self._Σ_T_inverse,
self._σ_Q_inverse,
self._M_T_m_1,
self._μ_Q_m_1,
self._Q_subseq_isfinite,
self._T_subseq_isfinite,
self._Q_subseq_isconstant,
self._T_subseq_isconstant,
self._diags[start_idx:stop_idx],
self._ignore_trivial,
self._k,
)
# Update (top-k) matrix profile and indices
core._merge_topk_PI(self._P, P, self._I, I)
# update left matrix profile and indices
mask = PL < self._PL
self._PL[mask] = PL[mask]
self._IL[mask] = IL[mask]
# update right matrix profile and indices
mask = PR < self._PR
self._PR[mask] = PR[mask]
self._IR[mask] = IR[mask]
self._chunk_idx += 1
@property
def P_(self):
"""
Get the updated (top-k) matrix profile. When `k=1` (default), this output
is a 1D array consisting of the updated matrix profile. When `k > 1`, the
output is a 2D array that has exactly `k` columns consisting of the updated
top-k matrix profile.
Parameters
----------
None
Returns
-------
None
"""
if self._k == 1:
return self._P.flatten().astype(np.float64)
else:
return self._P.astype(np.float64)
@property
def I_(self):
"""
Get the updated (top-k) matrix profile indices. When `k=1` (default), this
output is a 1D array consisting of the updated matrix profile indices. When
`k > 1`, the output is a 2D array that has exactly `k` columns consisting
of the updated top-k matrix profile indices.
Parameters
----------
None
Returns
-------
None
"""
if self._k == 1:
return self._I.flatten().astype(np.int64)
else:
return self._I.astype(np.int64)
@property
def left_I_(self):
"""
Get the updated left (top-1) matrix profile indices
Parameters
----------
None
Returns
-------
None
"""
return self._IL.astype(np.int64)
@property
def right_I_(self):
"""
Get the updated right (top-1) matrix profile indices
Parameters
----------
None
Returns
-------
None
"""
return self._IR.astype(np.int64)