# STUMPY
# Copyright 2019 TD Ameritrade. Released under the terms of the 3-Clause BSD license. # noqa: E501
# STUMPY is a trademark of TD Ameritrade IP Company, Inc. All rights reserved.
import functools
import inspect
import math
import tempfile
import warnings
import numpy as np
from numba import cuda, njit, prange
from scipy import linalg
from scipy.ndimage import maximum_filter1d, minimum_filter1d
from scipy.signal import convolve
from scipy.spatial.distance import cdist
from . import config
try:
from numba.cuda.cudadrv.driver import _raise_driver_not_found
except ImportError:
pass
def _compare_parameters(norm, non_norm, exclude=None):
"""
Compare if the parameters in `norm` and `non_norm` are the same
Parameters
----------
norm : function
The normalized function (or class) that is complementary to the
non-normalized function (or class)
non_norm : function
The non-normalized function (or class) that is complementary to the
z-normalized function (or class)
exclude : list
A list of parameters to exclude for the comparison
Returns
-------
is_same_params : bool
        `True` if the parameters from both `norm` and `non_norm` are the same.
        `False` otherwise.
"""
norm_params = list(inspect.signature(norm).parameters.keys())
non_norm_params = list(inspect.signature(non_norm).parameters.keys())
if exclude is not None:
for param in exclude:
if param in norm_params:
norm_params.remove(param)
if param in non_norm_params:
non_norm_params.remove(param)
is_same_params = set(norm_params) == set(non_norm_params)
if not is_same_params:
msg = ""
        if exclude is not None and len(exclude) > 0:
            msg += f"Excluding `{exclude}` parameters, "
        msg += f"function `{norm.__name__}({norm_params})` and "
        msg += f"function `{non_norm.__name__}({non_norm_params})` "
        msg += "have different arguments/parameters."
warnings.warn(msg)
return is_same_params
def non_normalized(non_norm, exclude=None, replace=None):
"""
Decorator for swapping a z-normalized function (or class) for its complementary
non-normalized function (or class) as defined by `non_norm`. This requires that
the z-normalized function (or class) has a `normalize` parameter.
    With the exception of the `normalize` parameter, the `non_norm` function (or
    class) must have the same signature as the `norm` function (or class) in order
    to be compatible. Please use a combination of the `exclude` and/or `replace`
    parameters when necessary.
```
def non_norm_func(Q, T, A_non_norm):
...
return
@non_normalized(
non_norm_func,
exclude=["normalize", "p", "A_norm", "A_non_norm"],
replace={"A_norm": "A_non_norm", "other_norm": None},
)
def norm_func(Q, T, A_norm=None, other_norm=None, normalize=True, p=2.0):
...
return
```
Parameters
----------
non_norm : function
The non-normalized function (or class) that is complementary to the
z-normalized function (or class)
exclude : list, default None
A list of function (or class) parameter names to exclude when comparing the
        function (or class) signatures. When `exclude is None`, this parameter is
        automatically set to `exclude = ["normalize", "p", "T_A_subseq_isconstant",
        "T_B_subseq_isconstant"]` by default.
replace : dict, default None
A dictionary of function (or class) parameter key-value pairs. Each key that
is found as a parameter name in the `norm` function (or class) will be replaced
by its corresponding or complementary parameter name in the `non_norm` function
(or class) (e.g., {"norm_param": "non_norm_param"}). To remove any parameter in
the `norm` function (or class) that does not exist in the `non_norm` function,
simply set the value to `None` (i.e., {"norm_param": None}).
Returns
-------
outer_wrapper : function
The desired z-normalized/non-normalized function (or class)
"""
if exclude is None:
exclude = [
"normalize",
"p",
"T_A_subseq_isconstant",
"T_B_subseq_isconstant",
]
@functools.wraps(non_norm)
def outer_wrapper(norm):
@functools.wraps(norm)
def inner_wrapper(*args, **kwargs):
is_same_params = _compare_parameters(norm, non_norm, exclude=exclude)
if not is_same_params or kwargs.get("normalize", True):
return norm(*args, **kwargs)
else:
kwargs = {k: v for k, v in kwargs.items() if k != "normalize"}
if replace is not None:
for k, v in replace.items():
if k in kwargs.keys():
if v is None: # pragma: no cover
_ = kwargs.pop(k)
else:
kwargs[v] = kwargs.pop(k)
return non_norm(*args, **kwargs)
return inner_wrapper
return outer_wrapper
def driver_not_found(*args, **kwargs): # pragma: no cover
"""
Helper function to raise CudaSupportError driver not found error.
Parameters
----------
None
Returns
-------
None
"""
_raise_driver_not_found()
def _gpu_stump_driver_not_found(*args, **kwargs): # pragma: no cover
"""
Dummy function to raise CudaSupportError driver not found error.
Parameters
----------
None
Returns
-------
None
"""
driver_not_found()
def _gpu_aamp_driver_not_found(*args, **kwargs): # pragma: no cover
"""
Dummy function to raise CudaSupportError driver not found error.
Parameters
----------
None
Returns
-------
None
"""
driver_not_found()
def _gpu_ostinato_driver_not_found(*args, **kwargs): # pragma: no cover
"""
Dummy function to raise CudaSupportError driver not found error.
Parameters
----------
None
Returns
-------
None
"""
driver_not_found()
def _gpu_aamp_ostinato_driver_not_found(*args, **kwargs): # pragma: no cover
"""
Dummy function to raise CudaSupportError driver not found error.
Parameters
----------
None
Returns
-------
None
"""
driver_not_found()
def _gpu_mpdist_driver_not_found(*args, **kwargs): # pragma: no cover
"""
Dummy function to raise CudaSupportError driver not found error.
Parameters
----------
None
Returns
-------
None
"""
driver_not_found()
def _gpu_aampdist_driver_not_found(*args, **kwargs): # pragma: no cover
"""
Dummy function to raise CudaSupportError driver not found error.
Parameters
----------
None
Returns
-------
None
"""
driver_not_found()
def _gpu_stimp_driver_not_found(*args, **kwargs): # pragma: no cover
"""
Dummy function to raise CudaSupportError driver not found error.
Parameters
----------
None
Returns
-------
None
"""
driver_not_found()
def _gpu_aamp_stimp_driver_not_found(*args, **kwargs): # pragma: no cover
"""
Dummy function to raise CudaSupportError driver not found error.
Parameters
----------
None
Returns
-------
None
"""
driver_not_found()
def _gpu_searchsorted_left_driver_not_found(*args, **kwargs): # pragma: no cover
"""
Dummy function to raise CudaSupportError driver not found error.
Parameters
----------
None
Returns
-------
None
"""
driver_not_found()
def _gpu_searchsorted_right_driver_not_found(*args, **kwargs): # pragma: no cover
"""
Dummy function to raise CudaSupportError driver not found error.
Parameters
----------
None
Returns
-------
None
"""
driver_not_found()
def get_pkg_name(): # pragma: no cover
"""
Return package name.
Parameters
----------
None
Returns
-------
    output : str
        The package name
"""
return __name__.split(".")[0]
def rolling_window(a, window):
"""
Use strides to generate rolling/sliding windows for a numpy array.
Parameters
----------
a : numpy.ndarray
numpy array
window : int
Size of the rolling window
Returns
-------
output : numpy.ndarray
This will be a new view of the original input array.
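
    Examples
    --------
    A small, illustrative example of the windowed view:

    >>> import numpy as np
    >>> from stumpy import core
    >>> core.rolling_window(np.array([1., 2., 3., 4.]), 2)
    array([[1., 2.],
           [2., 3.],
           [3., 4.]])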
"""
a = np.asarray(a)
shape = a.shape[:-1] + (a.shape[-1] - window + 1, window)
strides = a.strides + (a.strides[-1],)
return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)
def z_norm(a, axis=0, threshold=config.STUMPY_STDDEV_THRESHOLD):
"""
Calculate the z-normalized input array `a` by subtracting the mean and
dividing by the standard deviation along a given axis.
Parameters
----------
a : numpy.ndarray
NumPy array
axis : int, default 0
NumPy array axis
    threshold : float, default config.STUMPY_STDDEV_THRESHOLD
        Any non-NaN std value less than `threshold` will be replaced with 1.0
Returns
-------
output : numpy.ndarray
An array with z-normalized values computed along a specified axis.
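
    Examples
    --------
    A small, illustrative example:

    >>> import numpy as np
    >>> from stumpy import core
    >>> core.z_norm(np.array([1., 2., 3.]))
    array([-1.22474487,  0.        ,  1.22474487])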
"""
std = np.std(a, axis, keepdims=True)
std[np.less(std, threshold, where=~np.isnan(std))] = 1.0
return (a - np.mean(a, axis, keepdims=True)) / std
def check_nan(a): # pragma: no cover
"""
Check if the array contains NaNs.
Parameters
----------
a : numpy.ndarray
NumPy array
Returns
-------
None
Raises
------
ValueError
If the array contains a NaN
"""
if np.any(np.isnan(a)):
msg = "Input array contains one or more NaNs"
raise ValueError(msg)
return
def check_dtype(a, dtype=np.float64): # pragma: no cover
"""
    Check if the array type of `a` matches the type specified by the `dtype`
    parameter.
Parameters
----------
a : numpy.ndarray
NumPy array
dtype : dtype, default np.float64
NumPy `dtype`
Returns
-------
    output : bool
        `True` if the array type of `a` matches `dtype`
Raises
------
TypeError
If the array type does not match `dtype`
"""
if dtype == int:
dtype = np.int64
if dtype == float:
dtype = np.float64
if dtype == bool:
dtype = np.bool_
if not np.issubdtype(a.dtype, dtype):
msg = f"{dtype} dtype expected but found {a.dtype} in input array\n"
msg += "Please change your input `dtype` with `.astype(dtype)`"
raise TypeError(msg)
return True
def transpose_dataframe(df): # pragma: no cover
"""
    Check if the input is a column-wise Pandas `DataFrame`. If `True`, return a
    transposed dataframe since stumpy assumes that each row represents data from a
    different dimension while each column represents data from the same dimension.
    If `False`, return `df` unchanged. Pandas `Series` do not need to be transposed.
Note that this function has zero dependency on Pandas (not even a soft dependency).
Parameters
----------
    df : DataFrame
Pandas dataframe
Returns
-------
output : df
If `df` is a Pandas `DataFrame` then return `df.T`. Otherwise, return `df`
"""
if type(df).__name__ == "DataFrame":
return df.T
return df
def are_arrays_equal(a, b): # pragma: no cover
"""
    Check if two arrays are equal; first by comparing their memory addresses,
    and second by their values.
Parameters
----------
a : numpy.ndarray
NumPy array
b : numpy.ndarray
NumPy array
Returns
-------
output : bool
This is `True` if the arrays are equal and `False` otherwise.
"""
if id(a) == id(b):
return True
# For numpy >= 1.19
# return np.array_equal(a, b, equal_nan=True)
if a.shape != b.shape:
return False
return bool(((a == b) | (np.isnan(a) & np.isnan(b))).all())
def are_distances_too_small(a, threshold=10e-6): # pragma: no cover
"""
Check the distance values from a matrix profile.
If the values are smaller than the threshold (i.e., less than 10e-6) then
it could suggest that this is a self-join.
Parameters
----------
a : numpy.ndarray
NumPy array
    threshold : float, default 10e-6
        The threshold value against which the matrix profile distances are compared
Returns
-------
    output : bool
        `True` if the mean of the matrix profile distances is below the threshold
        or if all of the distances are below the threshold, and `False` otherwise.
"""
if a.mean() < threshold or np.all(a < threshold):
return True
return False
def get_max_window_size(n):
"""
Get the maximum window size for a self-join
Parameters
----------
n : int
The length of the time series
Returns
-------
max_m : int
The maximum window size allowed given `config.STUMPY_EXCL_ZONE_DENOM`
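
    Examples
    --------
    A small, illustrative example, assuming the default
    `config.STUMPY_EXCL_ZONE_DENOM` value of 4:

    >>> from stumpy import core
    >>> core.get_max_window_size(10)
    7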
"""
max_m = (
int(
n
- np.floor(
(n + (config.STUMPY_EXCL_ZONE_DENOM - 1))
// (config.STUMPY_EXCL_ZONE_DENOM + 1)
)
)
- 1
)
return max_m
def check_window_size(m, max_size=None):
"""
Check the window size and ensure that it is greater than or equal to 3 and, if
`max_size` is provided, ensure that the window size is less than or equal to the
`max_size`
Parameters
----------
m : int
Window size
max_size : int, default None
The maximum window size allowed
Returns
-------
None
"""
if m <= 2:
raise ValueError(
"All window sizes must be greater than or equal to three",
"""A window size that is less than or equal to two is meaningless when
it comes to computing the z-normalized Euclidean distance. In the case of
`m=1` produces a standard deviation of zero. In the case of `m=2`, both
the mean and standard deviation for any given subsequence are identical
and so the z-normalization for any sequence will either be [-1., 1.] or
[1., -1.]. Thus, the z-normalized Euclidean distance will be (very likely)
zero between any subsequence and its nearest neighbor (assuming that the
time series is large enough to contain both scenarios).
""",
)
if max_size is not None and m > max_size:
raise ValueError(f"The window size must be less than or equal to {max_size}")
@njit(fastmath=True)
def _sliding_dot_product(Q, T):
"""
A Numba JIT-compiled implementation of the sliding window dot product.
Parameters
----------
Q : numpy.ndarray
Query array or subsequence
T : numpy.ndarray
Time series or sequence
Returns
-------
out : numpy.ndarray
Sliding dot product between `Q` and `T`.
"""
m = Q.shape[0]
k = T.shape[0] - m + 1
out = np.empty(k)
for i in range(k):
out[i] = np.dot(Q, T[i : i + m])
return out
def sliding_dot_product(Q, T):
"""
Use FFT convolution to calculate the sliding window dot product.
Parameters
----------
Q : numpy.ndarray
Query array or subsequence
T : numpy.ndarray
Time series or sequence
Returns
-------
output : numpy.ndarray
Sliding dot product between `Q` and `T`.
Notes
-----
Calculate the sliding dot product
`DOI: 10.1109/ICDM.2016.0179 \
<https://www.cs.ucr.edu/~eamonn/PID4481997_extend_Matrix%20Profile_I.pdf>`__
See Table I, Figure 4
Following the inverse FFT, Fig. 4 states that only cells [m-1:n]
contain valid dot products
Padding is done automatically in fftconvolve step
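
    Examples
    --------
    A small, illustrative example:

    >>> import numpy as np
    >>> from stumpy import core
    >>> core.sliding_dot_product(np.array([1., 2.]), np.array([1., 2., 3., 4.]))
    array([ 5.,  8., 11.])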
"""
n = T.shape[0]
m = Q.shape[0]
Qr = np.flipud(Q) # Reverse/flip Q
QT = convolve(Qr, T)
return QT.real[m - 1 : n]
@njit(
# "f8[:](f8[:], i8, b1[:])",
fastmath={"nsz", "arcp", "contract", "afn", "reassoc"}
)
def _welford_nanvar(a, w, a_subseq_isfinite):
"""
    Compute the rolling variance for a 1-D array while ignoring NaNs using a
    modified version of Welford's algorithm. This is much faster than using
    `np.nanstd` with stride tricks.
Parameters
----------
a : numpy.ndarray
The input array
w : int
The rolling window size
a_subseq_isfinite : numpy.ndarray
        A boolean array that describes whether each subsequence of length `w`
        within `a` is finite.
Returns
-------
all_variances : numpy.ndarray
Rolling window nanvar
"""
all_variances = np.empty(a.shape[0] - w + 1, dtype=np.float64)
prev_mean = 0.0
prev_var = 0.0
for start_idx in range(a.shape[0] - w + 1):
prev_start_idx = start_idx - 1
stop_idx = start_idx + w # Exclusive index value
last_idx = start_idx + w - 1 # Last inclusive index value
if (
start_idx == 0
or not a_subseq_isfinite[prev_start_idx]
or not a_subseq_isfinite[start_idx]
):
curr_mean = np.nanmean(a[start_idx:stop_idx])
curr_var = np.nanvar(a[start_idx:stop_idx])
else:
curr_mean = prev_mean + (a[last_idx] - a[prev_start_idx]) / w
curr_var = (
prev_var
+ (a[last_idx] - a[prev_start_idx])
* (a[last_idx] - curr_mean + a[prev_start_idx] - prev_mean)
/ w
)
all_variances[start_idx] = curr_var
prev_mean = curr_mean
prev_var = curr_var
return all_variances
def welford_nanvar(a, w=None):
"""
    Compute the rolling variance for a 1-D array while ignoring NaNs using a
    modified version of Welford's algorithm. This is much faster than using
    `np.nanstd` with stride tricks.
This is a convenience wrapper around the `_welford_nanvar` function.
Parameters
----------
a : numpy.ndarray
The input array
    w : int, default None
The rolling window size
Returns
-------
output : numpy.ndarray
Rolling window nanvar.
"""
if w is None:
w = a.shape[0]
a_subseq_isfinite = rolling_isfinite(a, w)
return _welford_nanvar(a, w, a_subseq_isfinite)
def welford_nanstd(a, w=None):
"""
    Compute the rolling standard deviation for a 1-D array while ignoring NaNs
    using a modified version of Welford's algorithm. This is much faster than
    using `np.nanstd` with stride tricks.
    This is a convenience wrapper around `welford_nanvar`.
Parameters
----------
a : numpy.ndarray
The input array
    w : int, default None
The rolling window size
Returns
-------
output : numpy.ndarray
Rolling window nanstd.
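
    Examples
    --------
    A small, illustrative example; the Welford-based result should agree with a
    direct rolling `np.nanstd` to within floating-point precision:

    >>> import numpy as np
    >>> from stumpy import core
    >>> core.welford_nanstd(np.array([1., 2., 3., 4.]), 2)
    array([0.5, 0.5, 0.5])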
"""
if w is None:
w = a.shape[0]
return np.sqrt(np.clip(welford_nanvar(a, w), a_min=0, a_max=None))
@njit(parallel=True, fastmath={"nsz", "arcp", "contract", "afn", "reassoc"})
def _rolling_nanstd_1d(a, w):
"""
    A Numba JIT-compiled and parallelized function for computing the rolling
    standard deviation for a 1-D array while ignoring NaNs.
Parameters
----------
a : numpy.ndarray
The input array
w : int
The rolling window size
Returns
-------
out : numpy.ndarray
This 1D array has the length of `a.shape[0]-w+1`. `out[i]`
contains the stddev value of `a[i : i + w]`
"""
n = a.shape[0] - w + 1
out = np.empty(n, dtype=np.float64)
for i in prange(n):
out[i] = np.nanstd(a[i : i + w])
return out
def rolling_nanstd(a, w, welford=False):
"""
Compute the rolling standard deviation over the last axis of `a` while ignoring
NaNs.
This essentially replaces:
`np.nanstd(rolling_window(a[..., start:stop], w), axis=a.ndim)`
Parameters
----------
a : numpy.ndarray
The input array
    w : int
The rolling window size
welford : bool, default False
When False (default), the computation is parallelized and the stddev of
each subsequence is calculated on its own. When `welford==True`, the
welford method is used to reduce the computing time at the cost of slightly
reduced precision.
Returns
-------
out : numpy.ndarray
Rolling window nanstd
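
    Examples
    --------
    A small, illustrative example with a missing value:

    >>> import numpy as np
    >>> from stumpy import core
    >>> core.rolling_nanstd(np.array([1., 2., np.nan, 4.]), 3)
    array([0.5, 1. ])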
"""
axis = a.ndim - 1 # Account for rolling
if welford:
return np.apply_along_axis(
lambda a_row, w: welford_nanstd(a_row, w), axis=axis, arr=a, w=w
)
else:
return np.apply_along_axis(
lambda a_row, w: _rolling_nanstd_1d(a_row, w), axis=axis, arr=a, w=w
)
def _rolling_nanmin_1d(a, w=None):
"""
    Compute the rolling min for a 1-D array while ignoring NaNs.
This essentially replaces:
`np.nanmin(rolling_window(a[..., start:stop], w), axis=a.ndim)`
Parameters
----------
a : numpy.ndarray
The input array
    w : int, default None
The rolling window size
Returns
-------
output : numpy.ndarray
Rolling window nanmin.
"""
if w is None:
w = a.shape[0]
half_window_size = int(math.ceil((w - 1) / 2))
return minimum_filter1d(a, size=w)[
half_window_size : half_window_size + a.shape[0] - w + 1
]
def _rolling_nanmax_1d(a, w=None):
"""
    Compute the rolling max for a 1-D array while ignoring NaNs.
This essentially replaces:
`np.nanmax(rolling_window(a[..., start:stop], w), axis=a.ndim)`
Parameters
----------
a : numpy.ndarray
The input array
    w : int, default None
The rolling window size
Returns
-------
output : numpy.ndarray
Rolling window nanmax.
"""
if w is None:
w = a.shape[0]
half_window_size = int(math.ceil((w - 1) / 2))
return maximum_filter1d(a, size=w)[
half_window_size : half_window_size + a.shape[0] - w + 1
]
def rolling_nanmin(a, w):
"""
Compute the rolling min for 1-D and 2-D arrays while ignoring NaNs.
    This is a convenience wrapper around `_rolling_nanmin_1d`.
This essentially replaces:
`np.nanmin(rolling_window(a[..., start:stop], w), axis=a.ndim)`
Parameters
----------
a : numpy.ndarray
The input array
    w : int
The rolling window size
Returns
-------
output : numpy.ndarray
Rolling window nanmin.
"""
axis = a.ndim - 1 # Account for rolling
return np.apply_along_axis(
lambda a_row, w: _rolling_nanmin_1d(a_row, w), axis=axis, arr=a, w=w
)
def rolling_nanmax(a, w):
"""
Compute the rolling max for 1-D and 2-D arrays while ignoring NaNs.
    This is a convenience wrapper around `_rolling_nanmax_1d`.
This essentially replaces:
`np.nanmax(rolling_window(a[..., start:stop], w), axis=a.ndim)`
Parameters
----------
a : numpy.ndarray
The input array
    w : int
The rolling window size
Returns
-------
output : numpy.ndarray
Rolling window nanmax.
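
    Examples
    --------
    A small, illustrative example:

    >>> import numpy as np
    >>> from stumpy import core
    >>> core.rolling_nanmax(np.array([1., 3., 2., 5.]), 2)
    array([3., 3., 5.])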
"""
axis = a.ndim - 1 # Account for rolling
return np.apply_along_axis(
lambda a_row, w: _rolling_nanmax_1d(a_row, w), axis=axis, arr=a, w=w
)
def compute_mean_std(T, m):
"""
Compute the sliding mean and standard deviation for the array `T` with
a window size of `m`
Parameters
----------
T : numpy.ndarray
Time series or sequence
m : int
Window size
Returns
-------
M_T : numpy.ndarray
Sliding mean. All nan values are replaced with np.inf
Σ_T : numpy.ndarray
Sliding standard deviation
Notes
-----
`DOI: 10.1109/ICDM.2016.0179 \
<https://www.cs.ucr.edu/~eamonn/PID4481997_extend_Matrix%20Profile_I.pdf>`__
See Table II
DOI: 10.1145/2020408.2020587
See Page 2 and Equations 1, 2
DOI: 10.1145/2339530.2339576
See Page 4
http://www.cs.unm.edu/~mueen/FastestSimilaritySearch.html
Note that Mueen's algorithm has an off-by-one bug where the
sum for the first subsequence is omitted and we fixed that!
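
    Examples
    --------
    A small, illustrative example:

    >>> import numpy as np
    >>> from stumpy import core
    >>> M_T, Σ_T = core.compute_mean_std(np.array([-1., 2., 4., 5.]), 3)
    >>> M_T
    array([1.66666667, 3.66666667])
    >>> Σ_T
    array([2.05480467, 1.24721913])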
"""
num_chunks = config.STUMPY_MEAN_STD_NUM_CHUNKS
max_iter = config.STUMPY_MEAN_STD_MAX_ITER
    if T.ndim > 2:  # pragma: no cover
raise ValueError("T has to be one or two dimensional!")
for iteration in range(max_iter):
try:
chunk_size = math.ceil((T.shape[-1] + 1) / num_chunks)
if chunk_size < m:
chunk_size = m
mean_chunks = []
std_chunks = []
for chunk in range(num_chunks):
start = chunk * chunk_size
stop = min(start + chunk_size + m - 1, T.shape[-1])
if stop - start < m:
break
tmp_mean = np.mean(rolling_window(T[..., start:stop], m), axis=T.ndim)
mean_chunks.append(tmp_mean)
tmp_std = rolling_nanstd(T[..., start:stop], m)
std_chunks.append(tmp_std)
M_T = np.hstack(mean_chunks)
Σ_T = np.hstack(std_chunks)
break
        except MemoryError:  # pragma: no cover
num_chunks *= 2
if iteration < max_iter - 1:
M_T[np.isnan(M_T)] = np.inf
Σ_T[np.isnan(Σ_T)] = 0
return M_T, Σ_T
    else:  # pragma: no cover
raise MemoryError(
"Could not calculate mean and standard deviation. "
"Increase the number of chunks or maximal iterations."
)
@njit(
# "f8(i8, f8, f8, f8, f8, f8)",
fastmath=True
)
def _calculate_squared_distance(
m, QT, μ_Q, σ_Q, M_T, Σ_T, Q_subseq_isconstant, T_subseq_isconstant
):
"""
Compute a single squared distance given all scalar inputs.
Parameters
----------
m : int
Window size
QT : float
Pre-computed dot product between `Q` and the ith subsequence in `T`, each with
length `m`
μ_Q : float
Mean of `Q`
σ_Q : float
Standard deviation of `Q`
M_T : float
Mean of the ith subsequence in `T`
Σ_T : float
Standard deviation of the ith subsequence in `T`
Q_subseq_isconstant : bool
A boolean value that indicates whether the subsequence `Q` is constant (True)
T_subseq_isconstant : bool
A boolean value that indicates whether the ith subsequence in `T` is
constant (True)
Returns
-------
D_squared : float
Squared distance
Notes
-----
`DOI: 10.1109/ICDM.2016.0179 \
<https://www.cs.ucr.edu/~eamonn/PID4481997_extend_Matrix%20Profile_I.pdf>`__
See Equation on Page 4
"""
if np.isinf(M_T) or np.isinf(μ_Q):
D_squared = np.inf
elif Q_subseq_isconstant and T_subseq_isconstant:
D_squared = 0
elif Q_subseq_isconstant or T_subseq_isconstant:
D_squared = m
else:
denom = m * σ_Q * Σ_T
denom = max(denom, config.STUMPY_DENOM_THRESHOLD)
ρ = (QT - m * μ_Q * M_T) / denom
ρ = min(ρ, 1.0)
D_squared = np.abs(2 * m * (1.0 - ρ))
return D_squared
@njit(
# "f8[:](i8, f8[:], f8, f8, f8[:], f8[:])",
fastmath=True,
)
def _calculate_squared_distance_profile(
m, QT, μ_Q, σ_Q, M_T, Σ_T, Q_subseq_isconstant, T_subseq_isconstant
):
"""
Compute the squared distance profile
Parameters
----------
m : int
Window size
QT : numpy.ndarray
Dot product between `Q` and `T`
μ_Q : float
Mean of `Q`
σ_Q : float
Standard deviation of `Q`
M_T : numpy.ndarray
Sliding mean of `T`
Σ_T : numpy.ndarray
Sliding standard deviation of `T`
Q_subseq_isconstant : bool
A boolean value that indicates whether the subsequence `Q` is constant (True)
T_subseq_isconstant : numpy.ndarray
A boolean array that indicates whether a subsequence in `T` is constant (True)
Returns
-------
D_squared : numpy.ndarray
Squared distance profile
Notes
-----
`DOI: 10.1109/ICDM.2016.0179 \
<https://www.cs.ucr.edu/~eamonn/PID4481997_extend_Matrix%20Profile_I.pdf>`__
See Equation on Page 4
"""
k = M_T.shape[0]
D_squared = np.empty(k, dtype=np.float64)
for i in range(k):
D_squared[i] = _calculate_squared_distance(
m,
QT[i],
μ_Q,
σ_Q,
M_T[i],
Σ_T[i],
Q_subseq_isconstant,
T_subseq_isconstant[i],
)
return D_squared
@njit(
# "f8[:](i8, f8[:], f8, f8, f8[:], f8[:])",
fastmath=True,
)
def calculate_distance_profile(
m, QT, μ_Q, σ_Q, M_T, Σ_T, Q_subseq_isconstant, T_subseq_isconstant
):
"""
Compute the distance profile
Parameters
----------
m : int
Window size
QT : numpy.ndarray
Dot product between `Q` and `T`
μ_Q : float
Mean of `Q`
σ_Q : float
Standard deviation of `Q`
M_T : numpy.ndarray
Sliding mean of `T`
Σ_T : numpy.ndarray
Sliding standard deviation of `T`
Q_subseq_isconstant : bool
A boolean value that indicates whether the subsequence `Q` is constant (True)
T_subseq_isconstant : numpy.ndarray
A boolean array that indicates whether a subsequence in `T` is constant (True)
Returns
-------
output : numpy.ndarray
Distance profile
Notes
-----
`DOI: 10.1109/ICDM.2016.0179 \
<https://www.cs.ucr.edu/~eamonn/PID4481997_extend_Matrix%20Profile_I.pdf>`__
See Equation on Page 4
"""
D_squared = _calculate_squared_distance_profile(
m, QT, μ_Q, σ_Q, M_T, Σ_T, Q_subseq_isconstant, T_subseq_isconstant
)
return np.sqrt(D_squared)
@njit(fastmath=True)
def _p_norm_distance_profile(Q, T, p=2.0):
"""
    A Numba JIT-compiled function for computing the p-normalized distance
    profile
Parameters
----------
Q : numpy.ndarray
Query array or subsequence
T : numpy.ndarray
Time series or sequence
p : float, default 2.0
The p-norm to apply for computing the Minkowski distance.
Returns
-------
output : numpy.ndarray
p-normalized distance profile between `Q` and `T`
"""
m = Q.shape[0]
k = T.shape[0] - m + 1
p_norm_profile = np.empty(k, dtype=np.float64)
if p == 2.0:
Q_squared = np.sum(Q * Q)
T_squared = np.empty(k, dtype=np.float64)
T_squared[0] = np.sum(T[:m] * T[:m])
for i in range(1, k):
T_squared[i] = (
T_squared[i - 1] - T[i - 1] * T[i - 1] + T[i + m - 1] * T[i + m - 1]
)
QT = _sliding_dot_product(Q, T)
for i in range(k):
p_norm_profile[i] = Q_squared + T_squared[i] - 2.0 * QT[i]
else:
for i in range(k):
p_norm_profile[i] = np.sum(np.power(np.abs(Q - T[i : i + m]), p))
return p_norm_profile
def _mass_absolute(Q, T, p=2.0):
"""
A wrapper around `cdist` for computing the non-normalized distance profile
Parameters
----------
Q : numpy.ndarray
Query array or subsequence
T : numpy.ndarray
Time series or sequence
p : float, default 2.0
The p-norm to apply for computing the Minkowski distance. Minkowski distance is
typically used with `p` being 1 or 2, which correspond to the Manhattan distance
and the Euclidean distance, respectively.
Returns
-------
output : numpy.ndarray
Non-normalized distance profile
"""
m = Q.shape[0]
return cdist(
rolling_window(Q, m), rolling_window(T, m), metric="minkowski", p=p
).flatten()
def mass_absolute(Q, T, T_subseq_isfinite=None, p=2.0, query_idx=None):
"""
Compute the non-normalized distance profile (i.e., without z-normalization) using
the "MASS absolute" algorithm. This is a convenience wrapper around the Numba JIT
compiled `_mass_absolute` function.
Parameters
----------
Q : numpy.ndarray
Query array or subsequence
T : numpy.ndarray
Time series or sequence
T_subseq_isfinite : numpy.ndarray, default None
A boolean array that indicates whether a subsequence in `T` contains a
`np.nan`/`np.inf` value (False)
p : float, default 2.0
The p-norm to apply for computing the Minkowski distance. Minkowski distance is
typically used with `p` being 1 or 2, which correspond to the Manhattan distance
and the Euclidean distance, respectively.
query_idx : int, default None
This is the index position along the time series, `T`, where the query
subsequence, `Q`, is located. `query_idx` should be set to None if `Q`
        is not a subsequence of `T`. If `Q` is a subsequence of `T`, providing
        this argument is optional. If `query_idx` is provided, the distance between
        `Q` and `T[query_idx : query_idx + m]` will automatically be set to zero.
Returns
-------
output : numpy.ndarray
        Non-normalized distance profile
Notes
-----
`See Mueen's Absolute Algorithm for Similarity Search \
<https://www.cs.unm.edu/~mueen/MASS_absolute.m>`__
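
    Examples
    --------
    A small, illustrative example:

    >>> import numpy as np
    >>> from stumpy import core
    >>> core.mass_absolute(np.array([2., 3.]), np.array([1., 2., 3., 4.]))
    array([1.41421356, 0.        , 1.41421356])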
"""
Q = _preprocess(Q)
m = Q.shape[0]
if Q.ndim == 2 and Q.shape[1] == 1: # pragma: no cover
warnings.warn("`Q` must be 1-dimensional and was automatically flattened")
Q = Q.flatten()
if Q.ndim != 1: # pragma: no cover
raise ValueError(f"`Q` is {Q.ndim}-dimensional and must be 1-dimensional. ")
Q_isfinite = np.isfinite(Q)
check_window_size(m, max_size=Q.shape[-1])
if query_idx is not None: # pragma: no cover
query_idx = int(query_idx)
T_isfinite_idx = np.isfinite(T[query_idx : query_idx + m])
if not np.all(Q_isfinite == T_isfinite_idx) or not np.allclose(
Q[Q_isfinite], T[query_idx : query_idx + m][T_isfinite_idx]
):
msg = (
"Subsequences `Q` and `T[query_idx:query_idx+m]` are "
+ "different but were expected to be identical. Please "
+ "verify that `query_idx` is correct."
)
warnings.warn(msg)
T = _preprocess(T)
n = T.shape[0]
if T.ndim == 2 and T.shape[1] == 1: # pragma: no cover
T = T.flatten()
if T.ndim != 1: # pragma: no cover
raise ValueError(f"T is {T.ndim}-dimensional and must be 1-dimensional. ")
if m > n: # pragma: no cover
raise ValueError(
f"The length of `Q` ({len(Q)}) must be less than or equal to "
f"the length of `T` ({len(T)}). "
)
distance_profile = np.empty(n - m + 1, dtype=np.float64)
if np.any(~Q_isfinite):
distance_profile[:] = np.inf
else:
if T_subseq_isfinite is None:
T, T_subseq_isfinite = preprocess_non_normalized(T, m)
distance_profile[:] = _mass_absolute(Q, T, p)
if query_idx is not None: # pragma: no cover
distance_profile[query_idx] = 0.0
distance_profile[~T_subseq_isfinite] = np.inf
return distance_profile
def _mass_absolute_distance_matrix(Q, T, m, distance_matrix, p=2.0):
"""
Compute the full non-normalized (i.e., without z-normalization) distance matrix
between all of the subsequences of `Q` and `T` using the MASS absolute algorithm
Parameters
----------
Q : numpy.ndarray
Query array
T : numpy.ndarray
Time series or sequence
m : int
Window size
distance_matrix : numpy.ndarray
The full output distance matrix. This is mandatory since it may be reused.
p : float, default 2.0
The p-norm to apply for computing the Minkowski distance. Minkowski distance is
typically used with `p` being 1 or 2, which correspond to the Manhattan distance
and the Euclidean distance, respectively.
Returns
-------
None
"""
cdist(
rolling_window(Q, m),
rolling_window(T, m),
out=distance_matrix,
metric="minkowski",
p=p,
)
def mueen_calculate_distance_profile(Q, T):
"""
    Compute the distance profile using Mueen's algorithm
Parameters
----------
Q : numpy.ndarray
Query array or subsequence
T : numpy.ndarray
Time series or sequence
Returns
-------
output : numpy.ndarray
Distance profile
Notes
-----
`DOI: 10.1109/ICDM.2016.0179 \
<https://www.cs.ucr.edu/~eamonn/PID4481997_extend_Matrix%20Profile_I.pdf>`__
See Table II
DOI: 10.1145/2020408.2020587
See Page 2 and Equations 1, 2
DOI: 10.1145/2339530.2339576
See Page 4
http://www.cs.unm.edu/~mueen/FastestSimilaritySearch.html
Note that Mueen's algorithm has an off-by-one bug where the
sum for the first subsequence is omitted and we fixed that!
"""
n = T.shape[0]
m = Q.shape[0]
μ_Q = np.mean(Q, keepdims=True)
σ_Q = np.std(Q, keepdims=True)
Q_norm = (Q - μ_Q) / σ_Q
QT = sliding_dot_product(Q_norm, T)
cumsum_T = np.empty(len(T) + 1, dtype=np.float64) # Add one element, fix off-by-one
np.cumsum(T, out=cumsum_T[1:]) # store output in cumsum_T[1:]
cumsum_T[0] = 0
cumsum_T_squared = np.empty(len(T) + 1, dtype=np.float64)
np.cumsum(np.square(T), out=cumsum_T_squared[1:])
cumsum_T_squared[0] = 0
subseq_sum_T = cumsum_T[m:] - cumsum_T[: n - m + 1]
subseq_sum_T_squared = cumsum_T_squared[m:] - cumsum_T_squared[: n - m + 1]
M_T = subseq_sum_T / m
Σ_T_squared = np.abs(subseq_sum_T_squared / m - np.square(M_T))
Σ_T = np.sqrt(Σ_T_squared)
D = np.abs(
(subseq_sum_T_squared - 2 * subseq_sum_T * M_T + m * np.square(M_T))
/ Σ_T_squared
- 2 * QT / Σ_T
+ m
)
return np.sqrt(D)
@njit(
# "f8[:](f8[:], f8[:], f8[:], f8, f8, f8[:], f8[:])",
fastmath=True
)
def _mass(Q, T, QT, μ_Q, σ_Q, M_T, Σ_T, Q_subseq_isconstant, T_subseq_isconstant):
"""
A Numba JIT compiled algorithm for computing the distance profile using the MASS
algorithm.
This private function assumes only finite numbers in your inputs and it is the
responsibility of the user to pre-process and post-process their results if the
original time series contains `np.nan`/`np.inf` values. Failure to do so will
result in incorrect outputs. See `core.mass` for common pre-processing and
post-processing procedures.
Parameters
----------
Q : numpy.ndarray
Query array or subsequence
T : numpy.ndarray
Time series or sequence
QT : numpy.ndarray
The sliding dot product of Q and T
μ_Q : float
The scalar mean of Q
σ_Q : float
The scalar standard deviation of Q
M_T : numpy.ndarray
Sliding mean of `T`
Σ_T : numpy.ndarray
Sliding standard deviation of `T`
Q_subseq_isconstant : bool
A boolean value that indicates whether the subsequence `Q` is constant (True)
T_subseq_isconstant : numpy.ndarray
A boolean array that indicates whether a subsequence in `T` is constant (True)
Returns
-------
output : numpy.ndarray
Distance profile
Notes
-----
`DOI: 10.1109/ICDM.2016.0179 \
<https://www.cs.ucr.edu/~eamonn/PID4481997_extend_Matrix%20Profile_I.pdf>`__
See Table II
Note that Q, T are not directly required to calculate D
Note: Unlike the Matrix Profile I paper, here, M_T, Σ_T can be calculated
once for all subsequences of T and passed in so the redundancy is removed
"""
m = Q.shape[0]
return calculate_distance_profile(
m, QT, μ_Q, σ_Q, M_T, Σ_T, Q_subseq_isconstant, T_subseq_isconstant
)
@non_normalized(
mass_absolute,
exclude=[
"normalize",
"M_T",
"Σ_T",
"T_subseq_isfinite",
"p",
"T_subseq_isconstant",
"Q_subseq_isconstant",
],
replace={"M_T": "T_subseq_isfinite", "Σ_T": None},
)
def mass(
Q,
T,
M_T=None,
Σ_T=None,
normalize=True,
p=2.0,
T_subseq_isfinite=None,
T_subseq_isconstant=None,
Q_subseq_isconstant=None,
query_idx=None,
):
"""
Compute the distance profile using the MASS algorithm
This is a convenience wrapper around the Numba JIT compiled `_mass` function.
Parameters
----------
Q : numpy.ndarray
Query array or subsequence
T : numpy.ndarray
Time series or sequence
M_T : numpy.ndarray, default None
Sliding mean of `T`
Σ_T : numpy.ndarray, default None
Sliding standard deviation of `T`
normalize : bool, default True
When set to `True`, this z-normalizes subsequences prior to computing distances.
Otherwise, this function gets re-routed to its complementary non-normalized
equivalent set in the `@core.non_normalized` function decorator.
p : float, default 2.0
The p-norm to apply for computing the Minkowski distance. This parameter is
ignored when `normalize == True`.
T_subseq_isfinite : numpy.ndarray, default None
A boolean array that indicates whether a subsequence in `T` contains a
`np.nan`/`np.inf` value (False). This parameter is ignored when
`normalize=True`.
T_subseq_isconstant : numpy.ndarray or function, default None
A boolean array that indicates whether a subsequence in `T` is constant
(True). Alternatively, a custom, user-defined function that returns a
boolean array that indicates whether a subsequence in `T` is constant
(True). The function must only take two arguments, `a`, a 1-D array,
and `w`, the window size, while additional arguments may be specified
by currying the user-defined function using `functools.partial`. Any
subsequence with at least one np.nan/np.inf will automatically have its
corresponding value set to False in this boolean array.
Q_subseq_isconstant : numpy.ndarray or function, default None
A boolean array that indicates whether a subsequence in `Q` is constant
(True). Alternatively, a custom, user-defined function that returns a
boolean array that indicates whether a subsequence in `Q` is constant
(True). The function must only take two arguments, `a`, a 1-D array,
and `w`, the window size, while additional arguments may be specified
by currying the user-defined function using `functools.partial`. Any
subsequence with at least one np.nan/np.inf will automatically have its
corresponding value set to False in this boolean array.
query_idx : int, default None
This is the index position along the time series, `T`, where the query
subsequence, `Q`, is located. `query_idx` should be set to None if `Q`
        is not a subsequence of `T`. If `Q` is a subsequence of `T`, providing
        this argument is optional. If `query_idx` is provided, the distance
        between `Q` and `T[query_idx : query_idx + m]` will automatically be set
        to zero.
Returns
-------
distance_profile : numpy.ndarray
Distance profile
See Also
--------
stumpy.motifs : Discover the top motifs for time series `T`
    stumpy.match : Find all matches of a query `Q` in a time series `T`
Notes
-----
`DOI: 10.1109/ICDM.2016.0179 \
<https://www.cs.ucr.edu/~eamonn/PID4481997_extend_Matrix%20Profile_I.pdf>`__
See Table II
Note that Q, T are not directly required to calculate D
Note: Unlike the Matrix Profile I paper, here, M_T, Σ_T can be calculated
once for all subsequences of T and passed in so the redundancy is removed
Examples
--------
>>> import stumpy
>>> import numpy as np
>>> stumpy.mass(
... np.array([-11.1, 23.4, 79.5, 1001.0]),
... np.array([584., -11., 23., 79., 1001., 0., -19.]))
array([3.18792463e+00, 1.11297393e-03, 3.23874018e+00, 3.34470195e+00])
"""
Q = _preprocess(Q)
m = Q.shape[0]
if Q.ndim == 2 and Q.shape[1] == 1: # pragma: no cover
warnings.warn("`Q` must be 1-dimensional and was automatically flattened")
Q = Q.flatten()
if Q.ndim != 1: # pragma: no cover
raise ValueError(f"Q is {Q.ndim}-dimensional and must be 1-dimensional. ")
Q_isfinite = np.isfinite(Q)
check_window_size(m, max_size=Q.shape[-1])
if query_idx is not None:
query_idx = int(query_idx)
T_isfinite_idx = np.isfinite(T[query_idx : query_idx + m])
if not np.all(Q_isfinite == T_isfinite_idx) or not np.allclose(
Q[Q_isfinite], T[query_idx : query_idx + m][T_isfinite_idx]
): # pragma: no cover
msg = (
"Subsequences `Q` and `T[query_idx:query_idx+m]` are "
+ "different but were expected to be identical. Please "
+ "verify that `query_idx` is correct."
)
warnings.warn(msg)
T = _preprocess(T)
n = T.shape[0]
if T.ndim == 2 and T.shape[1] == 1: # pragma: no cover
T = T.flatten()
if T.ndim != 1: # pragma: no cover
raise ValueError(f"T is {T.ndim}-dimensional and must be 1-dimensional. ")
if m > n: # pragma: no cover
raise ValueError(
f"The length of `Q` ({len(Q)}) must be less than or equal to "
f"the length of `T` ({len(T)}). "
)
distance_profile = np.empty(n - m + 1, dtype=np.float64)
if np.any(~Q_isfinite):
distance_profile[:] = np.inf
else:
T, M_T, Σ_T, T_subseq_isconstant = preprocess(
T,
m,
copy=False,
M_T=M_T,
Σ_T=Σ_T,
T_subseq_isconstant=T_subseq_isconstant,
)
QT = sliding_dot_product(Q, T)
Q, μ_Q, σ_Q, Q_subseq_isconstant = preprocess(
Q,
m,
copy=False,
T_subseq_isconstant=Q_subseq_isconstant,
)
distance_profile[:] = _mass(
Q,
T,
QT,
μ_Q[0],
σ_Q[0],
M_T,
Σ_T,
Q_subseq_isconstant[0],
T_subseq_isconstant,
)
if query_idx is not None:
distance_profile[query_idx] = 0
return distance_profile
def _mass_distance_matrix(
Q,
T,
m,
distance_matrix,
μ_Q,
σ_Q,
M_T,
Σ_T,
Q_subseq_isconstant,
T_subseq_isconstant,
query_idx=None,
):
"""
Compute the full distance matrix between all of the subsequences of `Q` and `T`
using the MASS algorithm
Parameters
----------
Q : numpy.ndarray
Query array
T : numpy.ndarray
Time series or sequence
m : int
Window size
distance_matrix : numpy.ndarray
The full output distance matrix. This is mandatory since it may be reused.
μ_Q : numpy.ndarray
Sliding mean of `Q`
σ_Q : numpy.ndarray
Sliding standard deviation of `Q`
M_T : numpy.ndarray
Sliding mean of `T`
Σ_T : numpy.ndarray
Sliding standard deviation of `T`
Q_subseq_isconstant : numpy.ndarray
A boolean array that indicates whether a subsequence in `Q` is constant (True)
T_subseq_isconstant : numpy.ndarray
A boolean array that indicates whether a subsequence in `T` is constant (True)
query_idx : int, default None
This is the index position along the time series, `T`, where the query
subsequence, `Q`, is located. `query_idx` should be set to None if `Q`
        is not a subsequence of `T`. If `Q` is a subsequence of `T`, providing
        this argument is optional. If provided, the precision of the computation
        can be slightly improved.
Returns
-------
None
"""
if query_idx is not None:
query_idx = int(query_idx)
for i in range(distance_matrix.shape[0]):
if np.any(~np.isfinite(Q[i : i + m])): # pragma: no cover
distance_matrix[i, :] = np.inf
else:
QT = _sliding_dot_product(Q[i : i + m], T)
distance_matrix[i, :] = _mass(
Q[i : i + m],
T,
QT,
μ_Q[i],
σ_Q[i],
M_T,
Σ_T,
Q_subseq_isconstant[i],
T_subseq_isconstant,
)
# this is to fix slight loss-of-precision
if query_idx is not None:
distance_matrix[i, query_idx + i] = 0.0
def mass_distance_matrix(
Q,
T,
m,
distance_matrix,
M_T=None,
Σ_T=None,
T_subseq_isconstant=None,
Q_subseq_isconstant=None,
):
"""
Compute the full distance matrix between all of the subsequences of `Q` and `T`
using the MASS algorithm
Parameters
----------
Q : numpy.ndarray
Query array
T : numpy.ndarray
Time series or sequence
m : int
Window size
distance_matrix : numpy.ndarray
The full output distance matrix. This is mandatory since it may be reused.
M_T : numpy.ndarray, default None
Sliding mean of `T`
Σ_T : numpy.ndarray, default None
Sliding standard deviation of `T`
T_subseq_isconstant : numpy.ndarray, function, default None
A boolean array that indicates whether a subsequence in `T` is constant
(True). Alternatively, a custom, user-defined function that returns a
boolean array that indicates whether a subsequence in `T` is constant
(True). The function must only take two arguments, `a`, a 1-D array,
and `w`, the window size, while additional arguments may be specified
by currying the user-defined function using `functools.partial`. Any
subsequence with at least one np.nan/np.inf will automatically have its
corresponding value set to False in this boolean array.
Q_subseq_isconstant : numpy.ndarray, function, default None
A boolean array that indicates whether a subsequence in `Q` is constant
(True). Alternatively, a custom, user-defined function that returns a
boolean array that indicates whether a subsequence in `Q` is constant
(True). The function must only take two arguments, `a`, a 1-D array,
and `w`, the window size, while additional arguments may be specified
by currying the user-defined function using `functools.partial`. Any
subsequence with at least one np.nan/np.inf will automatically have its
corresponding value set to False in this boolean array.
Returns
-------
None
"""
Q, μ_Q, σ_Q, Q_subseq_isconstant = preprocess(
T=Q, m=m, copy=True, T_subseq_isconstant=Q_subseq_isconstant
)
T, M_T, Σ_T, T_subseq_isconstant = preprocess(
T,
m,
copy=True,
M_T=M_T,
Σ_T=Σ_T,
T_subseq_isconstant=T_subseq_isconstant,
)
check_window_size(m, max_size=min(Q.shape[-1], T.shape[-1]))
return _mass_distance_matrix(
Q,
T,
m,
distance_matrix,
μ_Q,
σ_Q,
M_T,
Σ_T,
Q_subseq_isconstant,
T_subseq_isconstant,
)
def _get_QT(start, T_A, T_B, m):
"""
Compute the sliding dot product between the query, `T_B`, (from
[start:start+m]) and the time series, `T_A`. Additionally, compute
QT for the first window `T_A[:m]` and `T_B`.
Parameters
----------
start : int
The window index for T_B from which to calculate the QT dot product
T_A : numpy.ndarray
The time series or sequence for which to compute the dot product
T_B : numpy.ndarray
The time series or sequence that will be used to annotate T_A. For every
subsequence in T_A, its nearest neighbor in T_B will be recorded.
m : int
Window size
Returns
-------
QT : numpy.ndarray
Given `start`, return the corresponding QT
QT_first : numpy.ndarray
QT for the first window
"""
QT = sliding_dot_product(T_B[start : start + m], T_A)
QT_first = sliding_dot_product(T_A[:m], T_B)
return QT, QT_first
@njit(
# ["(f8[:], i8, i8)", "(f8[:, :], i8, i8)"],
fastmath=True
)
def _apply_exclusion_zone(a, idx, excl_zone, val):
"""
Apply an exclusion zone to an array (inplace), i.e. set all values
to `val` in a window around a given index.
All values in a in [idx - excl_zone, idx + excl_zone] (endpoints included)
will be set to `val`.
Parameters
----------
a : numpy.ndarray
The array you want to apply the exclusion zone to
idx : int
The index around which the window should be centered
excl_zone : int
Size of the exclusion zone.
val : float or bool
The elements within the exclusion zone will be set to this value
Returns
-------
None
"""
zone_start = max(0, idx - excl_zone)
zone_stop = min(a.shape[-1], idx + excl_zone)
a[..., zone_start : zone_stop + 1] = val
def apply_exclusion_zone(a, idx, excl_zone, val):
"""
Apply an exclusion zone to an array (inplace), i.e. set all values
to `val` in a window around a given index.
All values in a in [idx - excl_zone, idx + excl_zone] (endpoints included)
will be set to `val`. This is a convenience wrapper around the Numba JIT-compiled
`_apply_exclusion_zone` function.
Parameters
----------
a : numpy.ndarray
The array you want to apply the exclusion zone to
idx : int
The index around which the window should be centered
excl_zone : int
Size of the exclusion zone.
val : float or bool
The elements within the exclusion zone will be set to this value
Returns
-------
None
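
    Examples
    --------
    A small, illustrative example; all values within `excl_zone` of index 3
    (endpoints included) are overwritten in place:

    >>> import numpy as np
    >>> from stumpy import core
    >>> a = np.arange(8, dtype=np.float64)
    >>> core.apply_exclusion_zone(a, 3, 2, np.inf)
    >>> a
    array([ 0., inf, inf, inf, inf, inf,  6.,  7.])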
"""
check_dtype(a, dtype=type(val))
_apply_exclusion_zone(a, idx, excl_zone, val)
def _preprocess(T, copy=True):
"""
Creates a copy of the time series when `copy` is True, transposes all dataframes,
converts to `numpy.ndarray`, and checks the `dtype`
Parameters
----------
T : numpy.ndarray
Time series or sequence
copy : bool, default True
A boolean value that indicates whether the process should be done on
input `T` (False) or its copy (True).
Returns
-------
T : numpy.ndarray
Modified time series
"""
if copy:
T = T.copy()
T = transpose_dataframe(T)
T = np.asarray(T)
check_dtype(T)
return T
def preprocess(
T,
m,
copy=True,
M_T=None,
Σ_T=None,
T_subseq_isconstant=None,
):
"""
    Creates a copy of the time series where all NaN and inf values
    are replaced with zero. Also computes the mean and standard deviation
    for every subsequence. Every subsequence that contains at least
    one NaN or inf value will have a mean of np.inf. For the standard
    deviation these values are ignored. If all values are illegal, the
    standard deviation will be 0 (see `core.compute_mean_std`). Also,
    computes the rolling isconstant, a boolean array that indicates
    whether a subsequence is constant (True) or not (False). A subsequence
    is constant if it contains finite values that are identical.
Parameters
----------
T : numpy.ndarray
Time series or sequence
m : int
Window size
copy : bool, default True
A boolean value that indicates whether the process should be done on
input `T` (False) or its copy (True).
M_T : numpy.ndarray, default None
Rolling mean
Σ_T : numpy.ndarray, default None
Rolling standard deviation
T_subseq_isconstant : numpy.ndarray or function, default None
A boolean array that indicates whether a subsequence in `T` is constant
(True). Alternatively, a custom, user-defined function that returns a
boolean array that indicates whether a subsequence in `T` is constant
(True). The function must only take two arguments, `a`, a 1-D array,
and `w`, the window size, while additional arguments may be specified
by currying the user-defined function using `functools.partial`. Any
subsequence with at least one np.nan/np.inf will automatically have its
corresponding value set to False in this boolean array.
Returns
-------
T : numpy.ndarray
Modified time series
M_T : numpy.ndarray
Rolling mean
Σ_T : numpy.ndarray
Rolling standard deviation
T_subseq_isconstant : numpy.ndarray
A boolean array that indicates whether a subsequence in `T`
is constant (True)
"""
T = _preprocess(T, copy)
check_window_size(m, max_size=T.shape[-1])
T[np.isinf(T)] = np.nan
T_subseq_isconstant = rolling_isconstant(T, m, T_subseq_isconstant)
T_subseq_isconstant = fix_isconstant_isfinite_conflicts(T, m, T_subseq_isconstant)
if M_T is None or Σ_T is None:
M_T, Σ_T = compute_mean_std(T, m)
T[np.isnan(T)] = 0
return T, M_T, Σ_T, T_subseq_isconstant
def preprocess_non_normalized(T, m):
"""
Preprocess a time series that is to be used when computing a non-normalized (i.e.,
without z-normalization) distance matrix.
Creates a copy of the time series where all NaN and inf values
are replaced with zero. Every subsequence that contains at least
one NaN or inf value will have a `False` value in its `T_subseq_isfinite` `bool`
array.
Parameters
----------
T : numpy.ndarray
Time series or sequence
m : int
Window size
Returns
-------
T : numpy.ndarray
Modified time series
T_subseq_isfinite : numpy.ndarray
A boolean array that indicates whether a subsequence in `T` contains a
`np.nan`/`np.inf` value (False)
"""
T = _preprocess(T)
check_window_size(m, max_size=T.shape[-1])
T_subseq_isfinite = rolling_isfinite(T, m)
T[~np.isfinite(T)] = np.nan
T[np.isnan(T)] = 0
return T, T_subseq_isfinite
def preprocess_diagonal(T, m, T_subseq_isconstant=None):
"""
Preprocess a time series that is to be used when traversing the diagonals of a
distance matrix.
    Creates a copy of the time series where all NaN and inf values are replaced
    with zero. Also computes the means, `M_T` and `M_T_m_1`, for every subsequence
    using a window size of `m` and `m-1`, respectively, as well as the inverse
    standard deviation, `Σ_T_inverse`. Every subsequence that contains at least
    one NaN or inf value will have a `False` value in its `T_subseq_isfinite`
    `bool` array. Finally, constant subsequences (i.e., subsequences with a
    standard deviation of zero) will have a corresponding `True` value in their
    `T_subseq_isconstant` array.
Parameters
----------
T : numpy.ndarray
Time series or sequence
m : int
Window size
T_subseq_isconstant : numpy.ndarray or function, default None
A boolean array that indicates whether a subsequence in `T` is constant
(True). Alternatively, a custom, user-defined function that returns a
boolean array that indicates whether a subsequence in `T` is constant
(True). The function must only take two arguments, `a`, a 1-D array,
and `w`, the window size, while additional arguments may be specified
by currying the user-defined function using `functools.partial`. Any
subsequence with at least one np.nan/np.inf will automatically have its
corresponding value set to False in this boolean array.
Returns
-------
T : numpy.ndarray
Modified time series
M_T : numpy.ndarray
Rolling mean with a subsequence length of `m`
Σ_T_inverse : numpy.ndarray
Inverted rolling standard deviation
M_T_m_1 : numpy.ndarray
Rolling mean with a subsequence length of `m-1`
T_subseq_isfinite : numpy.ndarray
A boolean array that indicates whether a subsequence in `T` contains a
`np.nan`/`np.inf` value (False)
T_subseq_isconstant : numpy.ndarray
A boolean array that indicates whether a subsequence in `T` is constant (True)
"""
T = _preprocess(T)
check_window_size(m, max_size=T.shape[-1])
T_subseq_isfinite = rolling_isfinite(T, m)
T[~np.isfinite(T)] = np.nan
T_subseq_isconstant = rolling_isconstant(T, m, T_subseq_isconstant)
T_subseq_isconstant = fix_isconstant_isfinite_conflicts(
T, m, T_subseq_isconstant, T_subseq_isfinite
)
T[np.isnan(T)] = 0
M_T, Σ_T = compute_mean_std(T, m)
Σ_T[T_subseq_isconstant] = 1.0 # Avoid divide by zero in next inversion step
Σ_T_inverse = 1.0 / Σ_T
M_T_m_1, _ = compute_mean_std(T, m - 1)
return T, M_T, Σ_T_inverse, M_T_m_1, T_subseq_isfinite, T_subseq_isconstant
def replace_distance(D, search_val, replace_val, epsilon=0.0):
"""
Replace values in distance array inplace
Parameters
----------
D : numpy.ndarray
Distance array
search_val : float
Value to search for
replace_val : float
Value to replace with
epsilon : float, default 0.0
Threshold below `search_val` in which to still allow for a replacement
Returns
-------
None
"""
D[D == search_val - epsilon] = replace_val
def array_to_temp_file(a):
"""
Write an array to a file
Parameters
----------
a : numpy.ndarray
A NumPy array to be written to a file
Returns
-------
fname : str
The output file name
"""
fname = tempfile.NamedTemporaryFile(delete=False)
fname = fname.name + ".npy"
np.save(fname, a, allow_pickle=False)
return fname
@njit(
# "i8[:](i8[:], i8, i8, i8)",
fastmath=True,
)
def _count_diagonal_ndist(diags, m, n_A, n_B):
"""
Count the number of distances that would be computed for each diagonal index
referenced in `diags`
Parameters
----------
diags : numpy.ndarray
The diagonal indices of interest
m : int
Window size
n_A : int
The length of time series `T_A`
n_B : int
The length of time series `T_B`
Returns
-------
diag_ndist_counts : numpy.ndarray
Counts of distances computed along each diagonal of interest
"""
diag_ndist_counts = np.zeros(diags.shape[0], dtype=np.int64)
for diag_idx in range(diags.shape[0]):
k = diags[diag_idx]
if k >= 0:
diag_ndist_counts[diag_idx] = min(n_B - m + 1 - k, n_A - m + 1)
else:
diag_ndist_counts[diag_idx] = min(n_B - m + 1, n_A - m + 1 + k)
return diag_ndist_counts
@njit(
# "i8[:, :](i8[:], i8, b1)"
)
def _get_array_ranges(a, n_chunks, truncate):
"""
Given an input array, split it into `n_chunks`.
Parameters
----------
a : numpy.ndarray
An array to be split
n_chunks : int
Number of chunks to split the array into
truncate : bool
If `truncate=True`, truncate the rows of `array_ranges` if there are not enough
elements in `a` to be chunked up into `n_chunks`. Otherwise, if
`truncate=False`, all extra chunks will have their start and stop indices set
to `a.shape[0]`.
Returns
-------
array_ranges : numpy.ndarray
A two column array where each row consists of a start and (exclusive) stop index
pair. The first column contains the start indices and the second column
contains the stop indices.
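
    Examples
    --------
    A small, illustrative example; the split points are chosen so that each chunk
    covers a similar share of the total sum of `a`:

    >>> import numpy as np
    >>> from stumpy import core
    >>> core._get_array_ranges(np.array([3, 3, 4, 2, 2], dtype=np.int64), 2, False)
    array([[0, 3],
           [3, 5]])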
"""
array_ranges = np.zeros((n_chunks, 2), dtype=np.int64)
if a.shape[0] > 0 and n_chunks > 0:
cumsum = a.cumsum() / a.sum()
insert = np.linspace(0, 1, n_chunks + 1)[1:-1]
idx = 1 + np.searchsorted(cumsum, insert)
array_ranges[1:, 0] = idx # Fill the first column with start indices
array_ranges[:-1, 1] = idx # Fill the second column with exclusive stop indices
array_ranges[-1, 1] = a.shape[0] # Handle the stop index for the final chunk
diff_idx = np.diff(idx)
if np.any(diff_idx == 0):
row_truncation_idx = np.argmin(diff_idx) + 2
array_ranges[row_truncation_idx:, 0] = a.shape[0]
array_ranges[row_truncation_idx - 1 :, 1] = a.shape[0]
if truncate:
array_ranges = array_ranges[:row_truncation_idx]
return array_ranges
@njit(
# "i8[:, :](i8, i8, b1)"
)
def _get_ranges(size, n_chunks, truncate):
"""
    Given a single input integer value, split an array of length `size` evenly into
    `n_chunks`.
This function is different from `_get_array_ranges` in that it does not take into
account the contents of the array and, instead, assumes that we are chunking up
`np.ones(size, dtype=np.int64)`. Additionally, the non-truncated sections may not
    all appear at the end of the returned array (i.e., they may be scattered throughout
different rows of the array) but may be identified as having the same start and
stop indices.
Parameters
----------
size : int
The size or length of an array to chunk
n_chunks : int
Number of chunks to split the array into
truncate : bool
        If `truncate=True`, truncate the rows of `array_ranges` if there are not
        enough elements to be chunked up into `n_chunks`. Otherwise, if
        `truncate=False`, all extra chunks will have their start and stop indices
        set to `size`.
Returns
-------
array_ranges : numpy.ndarray
A two column array where each row consists of a start and (exclusive) stop index
pair. The first column contains the start indices and the second column
contains the stop indices.
"""
a = np.ones(size, dtype=np.int64)
array_ranges = np.zeros((n_chunks, 2), dtype=np.int64)
if a.shape[0] > 0 and n_chunks > 0:
cumsum = a.cumsum()
insert = np.linspace(0, a.sum(), n_chunks + 1)[1:-1]
idx = 1 + np.searchsorted(cumsum, insert)
array_ranges[1:, 0] = idx # Fill the first column with start indices
array_ranges[:-1, 1] = idx # Fill the second column with exclusive stop indices
array_ranges[-1, 1] = a.shape[0] # Handle the stop index for the final chunk
if truncate:
array_ranges = array_ranges[array_ranges[:, 0] != array_ranges[:, 1]]
return array_ranges
def _rolling_isfinite_1d(a, w):
"""
Determine if all elements in each rolling window `isfinite`
Parameters
----------
a : numpy.ndarray
The input array
w : int
The length of the rolling window
Returns
-------
output : numpy.ndarray
        A boolean array of length `a.shape[0] - w + 1` that records whether each
        rolling window subsequence contains all finite values
"""
if a.dtype == np.dtype("bool"):
a_isfinite = a.copy()
else:
a_isfinite = np.isfinite(a)
a_subseq_isfinite = rolling_window(a_isfinite, w)
# Process first subsequence
a_first_subseq = ~a_isfinite[:w]
if a_first_subseq.any():
a_isfinite[: np.flatnonzero(a_first_subseq).max()] = False
# Shift `a_isfinite` and fill forward by `w`
a_subseq_isfinite[~a_isfinite[w - 1 :]] = False
return a_isfinite[: a_isfinite.shape[0] - w + 1]
def rolling_isfinite(a, w):
"""
Compute the rolling `isfinite` for 1-D and 2-D arrays.
    This is a convenience wrapper around `_rolling_isfinite_1d`.
Parameters
----------
a : numpy.ndarray
The input array
w : int
The rolling window size
Returns
-------
output : numpy.ndarray
A boolean array that records whether each rolling window subsequence
contains all finite values
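As a rough illustration, any window containing a `np.nan` is flagged as not
finite (the expected output below was worked out by hand):
```
>>> rolling_isfinite(np.array([1.0, np.nan, 3.0, 4.0, 5.0]), 2)
array([False, False,  True,  True])
```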
"""
axis = a.ndim - 1 # Account for rolling
return np.apply_along_axis(
lambda a_row, w: _rolling_isfinite_1d(a_row, w), axis=axis, arr=a, w=w
)
@njit(parallel=True, fastmath={"nsz", "arcp", "contract", "afn", "reassoc"})
def _rolling_isconstant(a, w):
"""
Compute the rolling isconstant for a 1-D array.
This is accomplished by comparing the min and max within each window and
assigning `True` when the min and max are equal and `False` otherwise. If
a subsequence contains at least one NaN, then the subsequence is not constant.
Parameters
----------
a : numpy.ndarray
The input array
w : int
The rolling window size
Returns
-------
output : numpy.ndarray
Rolling window isconstant.
"""
l = a.shape[0] - w + 1
out = np.empty(l)
for i in prange(l):
out[i] = np.ptp(a[i : i + w])
return out == 0
def rolling_isconstant(a, w, a_subseq_isconstant=None):
"""
Compute the rolling isconstant for 1-D and 2-D arrays.
This is accomplished by comparing the min and max within each window and
assigning `True` when the min and max are equal and `False` otherwise. If
a subsequence contains at least one NaN, then the subsequence is not constant.
Parameters
----------
a : numpy.ndarray
The input array
w : int
The rolling window size
a_subseq_isconstant : np.ndarray or function, default None
A boolean array that indicates whether a subsequence in `a` is constant
(True). Alternatively, a custom, user-defined function that returns a
boolean array that indicates whether a subsequence in `a` is constant
(True). The function must only take two arguments, `a`, a 1-D array,
and `w`, the window size, while additional arguments may be specified
by currying the user-defined function using `functools.partial`. When
None, this defaults to `_rolling_isconstant`.
Returns
-------
a_subseq_isconstant : numpy.ndarray
Rolling window isconstant
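As a rough illustration (the expected output below was worked out by hand):
```
>>> rolling_isconstant(np.array([1.0, 1.0, 1.0, 2.0, 3.0]), 3)
array([ True, False, False])
```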
"""
if a_subseq_isconstant is None:
a_subseq_isconstant = _rolling_isconstant
isconstant_func = None
if callable(a_subseq_isconstant):
incomp_args = _find_incompatible_args(a_subseq_isconstant, ["a", "w"])
if len(incomp_args) > 0: # pragma: no cover
msg = (
f"Incompatible arguments {incomp_args} found in `T_subseq_isconstant`. "
+ "Please provide the custom function `T_subseq_isconstant` with "
+ "arguments `a`, a 1-D array, and `w`, the window size."
)
raise ValueError(msg)
isconstant_func = a_subseq_isconstant
elif isinstance(a_subseq_isconstant, np.ndarray):
isconstant_func = None
if a.ndim != a_subseq_isconstant.ndim: # pragma: no cover
msg = (
"The arrays `a` and `a_subseq_isconstant` must have the same "
+ f"number of dimensions, {a.ndim} != {a_subseq_isconstant.ndim}"
)
raise ValueError(msg)
else: # pragma: no cover
msg = (
"`T_subseq_isconstant` must be of type `np.ndarray` or a callable "
+ f"function. Found {type(a_subseq_isconstant)} instead."
)
raise ValueError(msg)
if isconstant_func is not None:
axis = a.ndim - 1
a_subseq_isconstant = np.apply_along_axis(
lambda a_row, w: isconstant_func(a_row, w), axis=axis, arr=a, w=w
)
if not issubclass(a_subseq_isconstant.dtype.type, np.bool_): # pragma: no cover
msg = (
f"The output dtype of `T_subseq_isconstant` is {a_subseq_isconstant.dtype} "
+ "but dtype `np.bool` was expected"
)
raise ValueError(msg)
return a_subseq_isconstant
def fix_isconstant_isfinite_conflicts(
T, m, T_subseq_isconstant, T_subseq_isfinite=None
):
"""
Fix `T_subseq_isconstant` by setting its elements to False if their
corresponding values in `T_subseq_isfinite` are False.
Parameters
----------
T : numpy.ndarray
Time series
m : int
Subsequence window size
T_subseq_isconstant : numpy.ndarray
A numpy array `dtype` of boolean that indicates whether a subsequence
is constant (True) or not (False).
T_subseq_isfinite : numpy.ndarray, default None
A boolean array that indicates whether a subsequence in `T` contains a
`np.nan`/`np.inf` value (False)
Returns
-------
fixed : numpy.ndarray
The same as input `T_subseq_isconstant` but with indices set to False
if their corresponding subsequences are not finite.
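As a rough illustration, if the second subsequence below is (incorrectly)
marked as constant even though it contains a `np.nan`, then it is switched
to False and a warning is issued (worked out by hand):
```
>>> T = np.array([1.0, 1.0, np.nan, 2.0, 2.0])
>>> isconstant = np.array([True, True, False, True])
>>> fix_isconstant_isfinite_conflicts(T, 2, isconstant)
array([ True, False, False,  True])
```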
"""
if T_subseq_isfinite is None:
T_subseq_isfinite = rolling_isfinite(T, m)
fixed = np.logical_and(T_subseq_isconstant, T_subseq_isfinite)
conflicts = fixed != T_subseq_isconstant
if np.any(conflicts): # pragma: no cover
msg = (
f"Subsequences located at indices {np.nonzero(conflicts)} contain one "
+ "or more np.nan/np.inf and so their corresponding values in "
+ "`T_subseq_isconstant` have been automatically switched from True "
+ " to False."
)
warnings.warn(msg)
return fixed
def _get_partial_mp_func(mp_func, client=None, device_id=None):
"""
A convenience function for creating a `functools.partial` matrix profile function
for single server (parallel CPU), multi-server with Dask distributed (parallel CPU),
and multi-GPU implementations.
Parameters
----------
mp_func : function
The matrix profile function to be used for computing a matrix profile
client : client, default None
A Dask or Ray Distributed client. Setting up a distributed cluster is beyond
the scope of this library. Please refer to the Dask or Ray Distributed
documentation.
device_id : int or list, default None
The (GPU) device number to use. The default value is `0`. A list of
valid device ids (int) may also be provided for parallel GPU-STUMP
computation. A list of all valid device ids can be obtained by
executing `[device.id for device in numba.cuda.list_devices()]`.
Returns
-------
partial_mp_func : functools.partial
A generic matrix profile function that wraps the distributed `client` or GPU
`device_id` into `functools.partial` function
"""
if client is not None:
partial_mp_func = functools.partial(mp_func, client)
elif device_id is not None:
partial_mp_func = functools.partial(mp_func, device_id=device_id)
elif not isinstance(mp_func, functools.partial):
partial_mp_func = functools.partial(mp_func)
else:
partial_mp_func = mp_func
return partial_mp_func
def _jagged_list_to_array(a, fill_value, dtype):
"""
Fits a 2d jagged list into a 2d numpy array of the specified dtype.
The resulting array will have a shape of (len(a), l), where l is the length
of the longest list in a. All other lists will be padded with `fill_value`.
Example:
[[2, 1, 1], [0]] with a fill value of -1 will become
np.array([[2, 1, 1], [0, -1, -1]])
Parameters
----------
a : list
Jagged list (list-of-lists) to be converted into an array.
fill_value : int or float
Missing entries will be filled with this value.
dtype : dtype
The desired data-type for the array.
Returns
-------
out : numpy.ndarray
The resulting array of dtype `dtype`.
"""
if not a:
return np.array([[]])
max_length = max([len(row) for row in a])
out = np.full((len(a), max_length), fill_value, dtype=dtype)
for i, row in enumerate(a):
out[i, : len(row)] = row
return out
def _get_mask_slices(mask):
"""
For a boolean vector mask, return the (inclusive) start and (exclusive) stop
indices where the mask is `True`.
Parameters
----------
mask : numpy.ndarray
A boolean 1D array
Returns
-------
slices : numpy.ndarray
slices of indices where the mask is True. Each slice has a size of two:
The first number is the start index (inclusive)
The second number is the end index (exclusive)
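As a rough illustration (the expected output below was worked out by hand):
```
>>> _get_mask_slices(np.array([False, True, True, False, True]))
array([[1, 3],
       [4, 5]])
```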
"""
m1 = np.r_[0, mask]
m2 = np.r_[mask, 0]
(starts,) = np.where(~m1 & m2)
(ends,) = np.where(m1 & ~m2)
slices = np.c_[starts, ends]
return slices
def _idx_to_mp(
I, T, m, normalize=True, p=2.0, T_subseq_isconstant=None, check_neg=True
):
"""
Convert a set of matrix profile indices (including left and right indices) to its
corresponding matrix profile distances
Parameters
----------
I : numpy.ndarray
A 1D array of matrix profile indices
T : numpy.ndarray
Time series
m : int
Subsequence window size
normalize : bool, default True
When set to `True`, this z-normalizes subsequences prior to computing distances
p : float, default 2.0
The p-norm to apply for computing the Minkowski distance. This parameter is
ignored when `normalize == True`.
T_subseq_isconstant : numpy.ndarray, default None
A boolean array that indicates whether the ith subsequence in `T` is
constant (True). When `None`, it is computed by `rolling_isconstant`
check_neg : bool, default True
Check for the existence of negative indices
Returns
-------
P : numpy.ndarray
Matrix profile distances
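As a rough illustration with `normalize=False`, each entry is the p-norm
between a subsequence and its assigned neighbor (expected values worked
out by hand):
```
>>> T = np.array([0.0, 1.0, 2.0, 3.0])
>>> I = np.array([1, 2, 0])
>>> _idx_to_mp(I, T, 2, normalize=False)
array([1.41421356, 1.41421356, 2.82842712])
```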
"""
I = I.astype(np.int64)
T = T.copy()
if check_neg:
neg_idx = np.where(I < 0)[0]
if neg_idx.size > 0: # pragma: no cover
msg = f"A negative index value ({I[neg_idx[0]]}) was found "
msg += f"at I[{neg_idx[0]}] where a positive index value was "
msg += "expected (i.e., a negative index is considered null)."
warnings.warn(msg)
if normalize:
T_subseq_isconstant = rolling_isconstant(T, m, T_subseq_isconstant)
T_isfinite = np.isfinite(T)
T_subseq_isfinite = np.all(rolling_window(T_isfinite, m), axis=1)
T[~T_isfinite] = 0.0
T_subseqs = rolling_window(T, m)
nn_subseqs = T_subseqs[I]
if normalize:
P = linalg.norm(z_norm(T_subseqs, axis=1) - z_norm(nn_subseqs, axis=1), axis=1)
nn_subseq_isconstant = T_subseq_isconstant[I]
P[
T_subseq_isconstant & nn_subseq_isconstant
] = 0 # both subsequences are constant
P[np.logical_xor(T_subseq_isconstant, nn_subseq_isconstant)] = np.sqrt(
m
) # only one subsequence is constant
else:
P = linalg.norm(T_subseqs - nn_subseqs, axis=1, ord=p)
P[~T_subseq_isfinite] = np.inf
P[I < 0] = np.inf
return P
@njit(fastmath=True)
def _total_diagonal_ndists(tile_lower_diag, tile_upper_diag, tile_height, tile_width):
"""
Count the total number of distances covered by a range of diagonals
Parameters
----------
tile_lower_diag : int
The (inclusive) lower diagonal index
tile_upper_diag : int
The (exclusive) upper diagonal index
tile_height : int
The height of the tile
tile_width : int
The width of the tile
Returns
-------
out : int
The total number of distances
Notes
-----
This function essentially uses the "shoelace formula" to determine the area
of a simple polygon whose vertices are described by their Cartesian coordinates
in a plane.
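As a rough sanity check (worked out by hand): for a 3-by-4 tile, diagonals
`0` and `1` each cover three distances, so the diagonal range `[0, 2)`
covers six in total:
```
>>> _total_diagonal_ndists(0, 2, 3, 4)
6
```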
"""
if tile_width < tile_height:
# Transpose inputs, adjust for inclusive/exclusive diags
tile_width, tile_height = tile_height, tile_width
tile_lower_diag, tile_upper_diag = 1 - tile_upper_diag, 1 - tile_lower_diag
if tile_lower_diag > tile_upper_diag: # pragma: no cover
# Swap diags
tile_lower_diag, tile_upper_diag = tile_upper_diag, tile_lower_diag
min_tile_diag = 1 - tile_height
max_tile_diag = tile_width # Exclusive
if (
tile_lower_diag < min_tile_diag
or tile_upper_diag < min_tile_diag
or tile_lower_diag > max_tile_diag
or tile_upper_diag > max_tile_diag
):
return 0
if tile_lower_diag == min_tile_diag and tile_upper_diag == max_tile_diag:
return tile_height * tile_width
# Determine polygon shape and establish vertices
if tile_lower_diag <= 0 and tile_upper_diag <= 0:
# lower trapezoid/triangle
lower_ndists = tile_height + tile_lower_diag
upper_ndists = tile_height + tile_upper_diag
vertices = np.array(
[
[tile_lower_diag, 0],
[tile_upper_diag, 0],
[-tile_height, upper_ndists],
[-tile_height, lower_ndists],
]
)
elif tile_lower_diag <= 0 and 0 < tile_upper_diag <= tile_width - tile_height:
# irregular hexagon/diamond
lower_ndists = tile_height + tile_lower_diag
upper_ndists = min(tile_height, tile_width - tile_upper_diag)
vertices = np.array(
[
[tile_lower_diag, 0],
[0, 0],
[0, tile_upper_diag],
[-upper_ndists, tile_upper_diag + upper_ndists],
[-tile_height, lower_ndists],
]
)
elif tile_lower_diag <= 0 and tile_upper_diag >= tile_width - tile_height:
# irregular hexagon/diamond
lower_ndists = tile_height + tile_lower_diag
upper_ndists = min(tile_height, tile_width - tile_upper_diag)
vertices = np.array(
[
[tile_lower_diag, 0],
[0, 0],
[0, tile_upper_diag],
[-upper_ndists, tile_upper_diag + upper_ndists],
[-tile_height, tile_width],
[-tile_height, lower_ndists],
]
)
elif (
0 < tile_lower_diag <= tile_width - tile_height
and 0 < tile_upper_diag <= tile_width - tile_height
):
# parallelogram
lower_ndists = min(tile_height, tile_width - tile_lower_diag)
upper_ndists = min(tile_height, tile_width - tile_upper_diag)
vertices = np.array(
[
[0, tile_lower_diag],
[0, tile_upper_diag],
[-upper_ndists, tile_upper_diag + upper_ndists],
[-tile_height, tile_lower_diag + lower_ndists],
]
)
elif (
0 < tile_lower_diag <= tile_width - tile_height
and tile_upper_diag > tile_width - tile_height
):
# upper diamond
lower_ndists = min(tile_height, tile_width - tile_lower_diag)
upper_ndists = tile_width - tile_upper_diag
vertices = np.array(
[
[0, tile_lower_diag],
[0, tile_upper_diag],
[-upper_ndists, tile_upper_diag + upper_ndists],
[-tile_height, tile_width],
[-tile_height, tile_lower_diag + lower_ndists],
]
)
else:
# tile_lower_diag > tile_width - tile_height and
# tile_upper_diag > tile_width - tile_height
# upper trapezoid
lower_ndists = tile_width - tile_lower_diag
upper_ndists = tile_width - tile_upper_diag
vertices = np.array(
[
[0, tile_lower_diag],
[0, tile_upper_diag],
[-upper_ndists, tile_upper_diag + upper_ndists],
[-lower_ndists, tile_width],
]
)
# Shoelace formula
i = np.arange(vertices.shape[0])
total_diagonal_ndists = np.abs(
np.sum(
vertices[i, 0] * vertices[i - 1, 1] - vertices[i, 1] * vertices[i - 1, 0]
)
/ 2
)
# Account for over/under-counting due to jagged upper/lower diagonal shapes
total_diagonal_ndists += lower_ndists / 2 - upper_ndists / 2
return int(total_diagonal_ndists)
def _bfs_indices(n, fill_value=None):
"""
Generate the level order indices from the implicit construction of a binary
search tree followed by a breadth first (level order) search.
Example:
If `n = 10` then the corresponding (zero-based index) balanced binary tree is:
5
* *
* *
* *
* *
* *
2 8
* * * *
* * * *
* * * *
1 4 7 9
* * *
0 3 6
And if we traverse the nodes at each level from left to right then the breadth
first search indices would be `[5, 2, 8, 1, 4, 7, 9, 0, 3, 6]`. In this function,
we avoid/skip the explicit construction of the binary tree and directly output
the desired indices efficiently. Note that it is not always possible to reconstruct
the position of the leaf nodes from the indices alone (i.e., a leaf node could be
attached to any parent node from a previous layer). For example, if `n = 9` then
the corresponding (zero-based index) balanced binary tree is:
4
* *
* *
* *
* *
* *
* *
* *
* *
2 7
* * * *
* * * *
* * * *
1 3 6 8
* *
0 5
And if we traverse the nodes at each level from left to right then the breadth
first search indices would be `[4, 2, 7, 1, 3, 6, 8, 0, 5]`. Notice that the parent
of index `5` is not index `1` or index `3`. Instead, it is index `6`! So, given only
the indices, it is not possible to easily reconstruct the full tree (i.e., whether a
given leaf index is a left or right child or which parent node that it should be
attached to). Therefore, to reduce the ambiguity, you can choose to fill in the
missing leaf nodes by specifying a `fill_value = -1`, which will produce a modified
set of indices, `[ 4, 2, 7, 1, 3, 6, 8, 0, -1, -1, -1, 5, -1, -1, -1]` and
represents the following fully populated tree:
4
* *
* *
* *
* *
* *
* *
* *
* *
2 7
* * * *
* * * *
* * * *
1 3 6 8
* * * * * * * *
0 -1 -1 -1 5 -1 -1 -1
Parameters
----------
n : int
The number of indices for which to generate the (level) ordered indices
fill_value : int, default None
The integer value to use to fill in any missing leaf nodes. A negative integer
is recommended as the only allowable indices being returned will be positive
whole numbers.
Returns
-------
level_idx : numpy.ndarray
The breadth first search (level order) indices
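The two traversals described above can be reproduced directly (the outputs
below simply restate the examples given earlier in this docstring):
```
>>> _bfs_indices(10)
array([5, 2, 8, 1, 4, 7, 9, 0, 3, 6])
>>> _bfs_indices(9, fill_value=-1)
array([ 4,  2,  7,  1,  3,  6,  8,  0, -1, -1, -1,  5, -1, -1, -1])
```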
"""
if n == 1: # pragma: no cover
return np.array([0], dtype=np.int64)
nlevel = np.floor(np.log2(n) + 1).astype(np.int64)
nindices = np.power(2, np.arange(nlevel))
cumsum_nindices = np.cumsum(nindices)
nindices[-1] = n - cumsum_nindices[np.searchsorted(cumsum_nindices, n) - 1]
indices = np.empty((2, nindices.max()), dtype=np.int64)
indices[0, 0] = 0
indices[1, 0] = n
tmp_indices = np.empty((2, 2 * nindices.max()), dtype=np.int64)
out = np.empty(n, dtype=np.int64)
out_idx = 0
for nidx in nindices:
level_indices = (indices[0, :nidx] + indices[1, :nidx]) // 2
if out_idx + len(level_indices) < n:
tmp_indices[0, 0 : 2 * nidx : 2] = indices[0, :nidx]
tmp_indices[0, 1 : 2 * nidx : 2] = level_indices + 1
tmp_indices[1, 0 : 2 * nidx : 2] = level_indices
tmp_indices[1, 1 : 2 * nidx : 2] = indices[1, :nidx]
mask = tmp_indices[0, : 2 * nidx] < tmp_indices[1, : 2 * nidx]
mask_sum = np.count_nonzero(mask)
indices[0, :mask_sum] = tmp_indices[0, : 2 * nidx][mask]
indices[1, :mask_sum] = tmp_indices[1, : 2 * nidx][mask]
# for level_idx in level_indices:
# yield level_idx
out[out_idx : out_idx + len(level_indices)] = level_indices
out_idx += len(level_indices)
if fill_value is not None and nindices[-1] < np.power(2, nlevel):
fill_value = int(fill_value)
out = out[: -nindices[-1]]
last_row = np.empty(np.power(2, nlevel - 1), dtype=np.int64)
last_row[0::2] = out[-nindices[-2] :] - 1
last_row[1::2] = out[-nindices[-2] :] + 1
mask = np.isin(last_row, out[: nindices[-2]])
last_row[mask] = fill_value
last_row[last_row >= n] = fill_value
out = np.concatenate([out, last_row])
return out
def _contrast_pan(pan, threshold, bfs_indices, n_processed):
"""
Center the pan matrix profile (in place) around the desired distance threshold
in order to increase the contrast
Parameters
----------
pan : numpy.ndarray
The pan matrix profile
threshold : float
The distance threshold value around which to center the pan matrix profile
bfs_indices : numpy.ndarray
The breadth-first-search indices
n_processed : int
The number of breadth-first-search indices to apply contrast to
Returns
-------
None
"""
idx = bfs_indices[:n_processed]
l = n_processed * pan.shape[1]
tmp = pan[idx].argsort(kind="mergesort", axis=None)
ranks = np.empty(l, dtype=np.int64)
ranks[tmp] = np.arange(l).astype(np.int64)
percentile = np.full(ranks.shape, np.nan)
percentile[:l] = np.linspace(0, 1, l)
percentile = percentile[ranks].reshape(pan[idx].shape)
pan[idx] = 1.0 / (1.0 + np.exp(-10 * (percentile - threshold)))
def _binarize_pan(pan, threshold, bfs_indices, n_processed):
"""
Binarize the pan matrix profile (in place) such that all values at or below
the `threshold` are set to `0.0` and all values above the `threshold` are
set to `1.0`.
Parameters
----------
pan : numpy.ndarray
The pan matrix profile
threshold : float
The distance threshold value used for binarizing the pan matrix profile
bfs_indices : numpy.ndarray
The breadth-first-search indices
n_processed : int
The number of breadth-first-search indices to binarize
Returns
-------
None
"""
idx = bfs_indices[:n_processed]
pan[idx] = np.where(pan[idx] <= threshold, 0.0, 1.0)
def _select_P_ABBA_value(P_ABBA, k, custom_func=None):
"""
A convenience function for returning the `k`th smallest value from the `P_ABBA`
array or use a custom function to specify what `P_ABBA` value to return.
The MPdist distance measure considers two time series to be similar if they share
many subsequences, regardless of the order of matching subsequences. MPdist
concatenates the output of an AB-join and a BA-join and returns the `k`th smallest
value as the reported distance. Note that MPdist is a measure and not a metric.
Therefore, it does not obey the triangular inequality but the method is highly
scalable.
Parameters
----------
P_ABBA : numpy.ndarray
An unsorted array resulting from the concatenation of the outputs from an
AB-join and BA-join for two time series, `T_A` and `T_B`
k : int
Specify the `k`th value in the concatenated matrix profiles to return. This
parameter is ignored when `custom_func` is not None.
custom_func : function, default None
A custom user defined function for selecting the desired value from the
unsorted `P_ABBA` array. This function may need to leverage `functools.partial`
and should take `P_ABBA` as its only input parameter and return a single
`MPdist` value. The `k` parameter is ignored when `custom_func` is not None.
Returns
-------
MPdist : float
The matrix profile distance
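As a rough illustration, the `k = 2` (zero-based) smallest value is returned
(worked out by hand):
```
>>> _select_P_ABBA_value(np.array([0.2, 0.5, 0.1, 0.9]), 2)
0.5
```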
"""
k = min(int(k), P_ABBA.shape[0] - 1)
if custom_func is not None:
MPdist = custom_func(P_ABBA)
else:
partition = np.partition(P_ABBA, k)
MPdist = partition[k]
if ~np.isfinite(MPdist):
partition[:k].sort()
k = max(0, np.count_nonzero(np.isfinite(partition[:k])) - 1)
MPdist = partition[k]
return MPdist
@njit()
def _merge_topk_PI(PA, PB, IA, IB):
"""
Merge two top-k matrix profiles `PA` and `PB`, and update `PA` (in place).
When the inputs are 1D arrays, PA[i] is updated if it is greater than PB[i] and
IA[i] != IB[i]. In such case, PA[i] and IA[i] are replaced with PB[i] and IB[i],
respectively. (Note that it might happen that IA[i]==IB[i] but PA[i] != PB[i].
This situation can occur if there is slight imprecision in numerical calculations.
In that case, we do not update PA[i] and IA[i]. While updating PA[i] and IA[i]
is harmless in this case, we avoid doing that so to be consistent with the merging
process when the inputs are 2D arrays)
When the inputs are 2D arrays, we always prioritize the values of `PA` over the
values of `PB` in case of ties. (i.e., values from `PB` are always inserted to
the right of values from `PA`). Also, update `IA` accordingly. In case of
overlapping values between two arrays IA[i] and IB[i], the ones in IB[i] (and
their corresponding values in PB[i]) are ignored throughout the updating process
of IA[i] (and PA[i]).
Unlike `_merge_topk_ρI`, where `top-k` largest values are kept, this function
keeps `top-k` smallest values.
Parameters
----------
PA : numpy.ndarray
A (top-k) matrix profile where values in each row are sorted in ascending
order. `PA` must be 1- or 2-dimensional.
PB : numpy.ndarray
A (top-k) matrix profile where values in each row are sorted in ascending
order. `PB` must have the same shape as `PA`.
IA : numpy.ndarray
A (top-k) matrix profile indices corresponding to `PA`
IB : numpy.ndarray
A (top-k) matrix profile indices corresponding to `PB`
Returns
-------
None
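As a rough 1D illustration (worked out by hand), `PA[1]` is replaced since
`PB[1] < PA[1]` and the corresponding indices differ:
```
>>> PA, IA = np.array([1.0, 5.0]), np.array([0, 1])
>>> PB, IB = np.array([2.0, 3.0]), np.array([7, 8])
>>> _merge_topk_PI(PA, PB, IA, IB)
>>> PA, IA
(array([1., 3.]), array([0, 8]))
```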
"""
if PA.ndim == 1:
mask = (PB < PA) & (IB != IA)
PA[mask] = PB[mask]
IA[mask] = IB[mask]
else:
k = PA.shape[1]
tmp_P = np.empty(k, dtype=np.float64)
tmp_I = np.empty(k, dtype=np.int64)
for i in range(PA.shape[0]):
overlap = set(IB[i]).intersection(set(IA[i]))
aj, bj = 0, 0
idx = 0
# 2 * k iterations are required to traverse both A and B if needed.
for _ in range(2 * k):
if idx >= k:
break
if bj < k and PB[i, bj] < PA[i, aj]:
if IB[i, bj] not in overlap:
tmp_P[idx] = PB[i, bj]
tmp_I[idx] = IB[i, bj]
idx += 1
bj += 1
else:
tmp_P[idx] = PA[i, aj]
tmp_I[idx] = IA[i, aj]
idx += 1
aj += 1
PA[i] = tmp_P
IA[i] = tmp_I
@njit()
def _merge_topk_ρI(ρA, ρB, IA, IB):
"""
Merge two top-k Pearson profiles `ρA` and `ρB`, and update `ρA` (in place).
When the inputs are 1D arrays, ρA[i] is updated if it is less than ρB[i] and
IA[i] != IB[i]. In such case, ρA[i] and IA[i] are replaced with ρB[i] and IB[i],
respectively. (Note that it might happen that IA[i]==IB[i] but ρA[i] != ρB[i].
This situation can occur if there is slight imprecision in numerical calculations.
In that case, we do not update ρA[i] and IA[i]. While updating ρA[i] and IA[i]
is harmless in this case, we avoid doing that so to be consistent with the merging
process when the inputs are 2D arrays)
When the inputs are 2D arrays, we always prioritize the values of `ρA` over
the values of `ρB` in case of ties. (i.e., values from `ρB` are always inserted
to the left of values from `ρA`). Also, update `IA` accordingly. In case of
overlapping values between two arrays IA[i] and IB[i], the ones in IB[i] (and
their corresponding values in ρB[i]) are ignored throughout the updating process
of IA[i] (and ρA[i]).
Unlike `_merge_topk_PI`, where `top-k` smallest values are kept, this function
keeps `top-k` largest values.
Parameters
----------
ρA : numpy.ndarray
A (top-k) Pearson profile where values in each row are sorted in ascending
order. `ρA` must be 1- or 2-dimensional.
ρB : numpy.ndarray
A (top-k) Pearson profile, where values in each row are sorted in ascending
order. `ρB` must have the same shape as `ρA`.
IA : numpy.ndarray
A (top-k) matrix profile indices corresponding to `ρA`
IB : numpy.ndarray
A (top-k) matrix profile indices corresponding to `ρB`
Returns
-------
None
"""
if ρA.ndim == 1:
mask = (ρB > ρA) & (IB != IA)
ρA[mask] = ρB[mask]
IA[mask] = IB[mask]
else:
k = ρA.shape[1]
tmp_ρ = np.empty(k, dtype=np.float64)
tmp_I = np.empty(k, dtype=np.int64)
last_idx = k - 1
for i in range(len(ρA)):
overlap = set(IB[i]).intersection(set(IA[i]))
aj, bj = last_idx, last_idx
idx = last_idx
# 2 * k iterations are required to traverse both A and B if needed.
for _ in range(2 * k):
if idx < 0:
break
if bj >= 0 and ρB[i, bj] > ρA[i, aj]:
if IB[i, bj] not in overlap:
tmp_ρ[idx] = ρB[i, bj]
tmp_I[idx] = IB[i, bj]
idx -= 1
bj -= 1
else:
tmp_ρ[idx] = ρA[i, aj]
tmp_I[idx] = IA[i, aj]
idx -= 1
aj -= 1
ρA[i] = tmp_ρ
IA[i] = tmp_I
@njit()
def _shift_insert_at_index(a, idx, v, shift="right"):
"""
If `shift="right"` (default), all elements in `a[idx:]` are shifted to the right by
one element, `v` is inserted at index `idx`, and the last element is discarded.
If `shift="left"`, all elements in `a[:idx]` are shifted to the left by one element,
`v` is inserted at index `idx-1`, and the first element is discarded. In both cases,
`a` is updated in place and its length remains unchanged.
Note that all unrecognized `shift` inputs will default to `shift="right"`.
Parameters
----------
a : numpy.ndarray
A 1d array
idx : int
The index at which the value `v` should be inserted. This can be any
integer number from `0` to `len(a)`. When `idx=len(a)` and `shift="right"`,
OR when `idx=0` and `shift="left"`, then no change will occur on
the input array `a`.
v : float
The value that should be inserted into array `a` at index `idx`
shift : str, default "right"
The value that indicates whether the shifting of elements should be towards
the right or left. If `shift="right"` (default), all elements in `a[idx:]`
are shifted to the right by one element. If `shift="left"`, all elements
in `a[:idx]` are shifted to the left by one element.
Returns
-------
None
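As a rough illustration (worked out by hand):
```
>>> a = np.array([1.0, 2.0, 3.0, 4.0])
>>> _shift_insert_at_index(a, 1, 9.0)  # default `shift="right"`
>>> a
array([1., 9., 2., 3.])
```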
"""
if shift == "left":
if 0 < idx <= len(a):
a[: idx - 1] = a[1:idx]
# elements were shifted to the left, thus the insertion index becomes
# `idx-1`
a[idx - 1] = v
else:
if 0 <= idx < len(a):
a[idx + 1 :] = a[idx:-1]
a[idx] = v
def _check_P(P, threshold=1e-6):
"""
Check if the 1-dimensional matrix profile values are too small and
log a warning if they are.
Parameters
----------
P : numpy.ndarray
A 1-dimensional matrix profile
threshold : float, default 1e-6
A distance threshold
Returns
-------
None
"""
if P.ndim != 1:
raise ValueError("`P` was {P.ndim}-dimensional and must be 1-dimensional")
if are_distances_too_small(P, threshold=threshold): # pragma: no cover
msg = f"A large number of values in `P` are smaller than {threshold}.\n"
msg += "For a self-join, try setting `ignore_trivial=True`."
warnings.warn(msg)
def _find_matches(
D, excl_zone, max_distance=None, max_matches=None, query_idx=None, atol=1e-8
):
"""
Find all matches of a query `Q` whose distance profile with `T` is `D`.
Parameters
----------
D : numpy.ndarray
The distance profile of `Q` with `T`. It is a 1D numpy array of size
`len(T)-len(Q)+1`, where `D[i]` is the distance between query `Q` and
`T[i : i + len(Q)]`.
excl_zone : int
Size of the exclusion zone. That is, after finding the next-best-match
located at index `idx`, we ignore subsequences with start index in range
(idx - excl_zone, idx + excl_zone + 1).
max_distance : float or function, default None
Maximum distance between `Q` and a subsequence `S` for `S` to be considered a
match.
If a function, then it has to be a function of one argument `D`, which will be
the distance profile of `Q` with `T` (a 1D numpy array of size `n-m+1`).
If None, this defaults to
`np.nanmax([np.nanmean(D) - 2 * np.nanstd(D), np.nanmin(D)])` (i.e. at
least the closest match will be returned).
max_matches : int, default None
The maximum number of similar occurrences to be returned. The resulting
occurrences are sorted by distance, so a value of `10` means that the
indices of the most similar `10` subsequences are returned. If `None`, then all
occurrences are returned.
query_idx : int, default None
This is the index position along the time series, `T`, where the query
subsequence, `Q`, is located.
`query_idx` should only be used when the matrix profile is a self-join and
should be set to `None` for matrix profiles computed from AB-joins.
If `query_idx` is set to a specific integer value, then this will help ensure
that the self-match will be returned first.
atol : float, default 1e-8
The absolute tolerance parameter. This value will be added to `max_distance`
when comparing distances between subsequences.
Returns
-------
out : numpy.ndarray
The first column consists of values selected from `D`. These are the distances
of subsequences of `T` whose distances to `Q` are less than or equal to
`max_distance`, sorted by distance (lowest to highest). The second column
consists of the corresponding indices in `D`. These are, in fact, the start
indices of the subsequences in `T` selected as matches of `Q`.
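As a rough illustration (worked out by hand, with `apply_exclusion_zone`
masking each match's neighborhood before the next candidate is selected):
```
>>> D = np.array([0.5, 3.0, 0.2, 4.0, 0.1])
>>> _find_matches(D, excl_zone=1, max_distance=1.0)
array([[0.1, 4],
       [0.2, 2],
       [0.5, 0]], dtype=object)
```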
"""
D = D.copy()
if max_distance is None:
def max_distance(D):
D_copy = D.copy().astype(np.float64)
D_copy[np.isinf(D_copy)] = np.nan
return np.nanmax(
[np.nanmean(D_copy) - 2.0 * np.nanstd(D_copy), np.nanmin(D_copy)]
)
if not isinstance(max_distance, float):
max_distance = max_distance(D)
if max_matches is None:
max_matches = np.inf
if query_idx is not None:
candidate_idx = query_idx
else:
candidate_idx = np.argmin(D)
matches = []
for _ in range(len(D)):
if (
D[candidate_idx] > atol + max_distance
or ~np.isfinite(D[candidate_idx])
or len(matches) >= max_matches
):
break
matches.append([D[candidate_idx], candidate_idx])
apply_exclusion_zone(D, candidate_idx, excl_zone, np.inf)
candidate_idx = np.argmin(D)
return np.array(matches, dtype=object)
@cuda.jit(device=True)
def _gpu_searchsorted_left(a, v, bfs, nlevel):
"""
A device function, equivalent to numpy.searchsorted(a, v, side='left')
Parameters
----------
a : numpy.ndarray
1-dim array sorted in ascending order.
v : float
Value to insert into array `a`
bfs : numpy.ndarray
The breadth-first-search indices where the missing leaves of its corresponding
binary search tree are filled with -1.
nlevel : int
The number of levels in the binary search tree from which the array
`bfs` is obtained.
Returns
-------
idx : int
The index of the insertion point
"""
n = a.shape[0]
idx = 0
for level in range(nlevel):
if v <= a[bfs[idx]]:
next_idx = 2 * idx + 1
else:
next_idx = 2 * idx + 2
if level == nlevel - 1 or bfs[next_idx] < 0:
if v <= a[bfs[idx]]:
idx = max(bfs[idx], 0)
else:
idx = min(bfs[idx] + 1, n)
break
idx = next_idx
return idx
@cuda.jit(device=True)
def _gpu_searchsorted_right(a, v, bfs, nlevel):
"""
A device function, equivalent to numpy.searchsorted(a, v, side='right')
Parameters
----------
a : numpy.ndarray
1-dim array sorted in ascending order.
v : float
Value to insert into array `a`
bfs : numpy.ndarray
The breadth-first-search indices where the missing leaves of its corresponding
binary search tree are filled with -1.
nlevel : int
The number of levels in the binary search tree from which the array
`bfs` is obtained.
Returns
-------
idx : int
The index of the insertion point
"""
n = a.shape[0]
idx = 0
for level in range(nlevel):
if v < a[bfs[idx]]:
next_idx = 2 * idx + 1
else:
next_idx = 2 * idx + 2
if level == nlevel - 1 or bfs[next_idx] < 0:
if v < a[bfs[idx]]:
idx = max(bfs[idx], 0)
else:
idx = min(bfs[idx] + 1, n)
break
idx = next_idx
return idx
def check_ignore_trivial(T_A, T_B, ignore_trivial):
"""
Check inputs, verify the appropriateness for self-joins vs AB-joins, and
provide relevant warnings.
Note that the warnings will output the first occurrence of matching warnings
for each location (module + line number) where the warning is issued
Parameters
----------
T_A : numpy.ndarray
The time series or sequence for which to compute the matrix profile
T_B : numpy.ndarray
The time series or sequence that will be used to annotate T_A. For every
subsequence in T_A, its nearest neighbor in T_B will be recorded. Default is
`None` which corresponds to a self-join.
ignore_trivial : bool
Set to `True` if this is a self-join. Otherwise, for AB-join, set this
to `False`.
Returns
-------
ignore_trivial : bool
The (corrected) ignore_trivial value
Notes
-----
These warnings may be suppressed by using a context manager
```
import stumpy
import numpy as np
import warnings
T = np.random.rand(10_000)
m = 50
with warnings.catch_warnings():
warnings.filterwarnings("ignore", message="Arrays T_A, T_B are equal")
for _ in range(5):
stumpy.stump(T, m, T, ignore_trivial=False)
```
"""
if ignore_trivial is False and are_arrays_equal(T_A, T_B): # pragma: no cover
msg = "Arrays T_A, T_B are equal, which implies a self-join. "
msg += "Try setting `ignore_trivial = True`."
warnings.warn(msg)
if ignore_trivial and are_arrays_equal(T_A, T_B) is False: # pragma: no cover
msg = "Arrays T_A, T_B are not equal, which implies an AB-join. "
msg += "`ignore_trivial` has been automatically set to `False`."
warnings.warn(msg)
ignore_trivial = False
return ignore_trivial
def _client_to_func(client):
"""
Based on the client information and the parent function calling this
function, infer the name of the client function to return
For example, if the parent function calling `_client_to_func` is called
`stumped` and the `client` is a Dask client, then `_dask_` will be
prepended to the string `calling_func` and the resulting function
called `_dask_stumped` will be returned. For a Ray client, the function
caled `_ray_stumped` will be returned. Note that it is the responsibility
of the caller to ensure that the resulting derived function exists. Otherwise,
this will likely result in a `ModuleNotFoundError`.
Parameters
----------
client : client
A Dask or Ray Distributed client. Setting up a distributed cluster is beyond
the scope of this library. Please refer to the Dask or Ray Distributed
documentation.
Returns
-------
func : function
The correct function for a client
"""
if client.__class__.__name__.startswith("Client"):
prefix = "_dask_"
# elif inspect.ismodule(client) and str(client).startswith(
# "<module 'ray'"
# ): # pragma: no cover
# prefix = "_ray_"
else:
msg = f"Distributed client `{client}` is unrecognized or "
msg += "has yet to be implemented"
raise NotImplementedError(msg)
calling_func = inspect.stack()[1].function
module = __import__(
calling_func,
globals(),
locals(),
level=1,
fromlist=[prefix + calling_func],
)
func = getattr(module, prefix + calling_func)
return func
def _find_incompatible_args(func, required_args):
"""
For a given `func` and `required_args`, return non-default
arguments in `func` that are not in `required_args`
Parameters
----------
func : function
A function
required_args : list
A list containing the name of required arguments.
Returns
-------
out : set
A set of non-default arguments in `func` which are not in
required_args
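As a rough illustration (worked out by hand), `b` is a non-default argument
that is not among the required ones:
```
>>> def f(a, w, b, c=1):
...     pass
>>> _find_incompatible_args(f, ["a", "w"])
{'b'}
```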
"""
if not isinstance(required_args, list): # pragma: no cover
required_args = list(required_args)
non_default_args = []
for arg_name, arg in inspect.signature(func).parameters.items():
# inspect.signature(functools.partial(func)) returns all arguments
# including the ones with default values. the following if block
# is to find non-default arguments.
if arg.default is inspect.Parameter.empty:
non_default_args.append(arg_name)
return set(non_default_args).difference(set(required_args))
def _preprocess_include(include):
"""
A utility function for processing the `include` input
Parameters
----------
include : numpy.ndarray
A list of (zero-based) indices corresponding to the dimensions in `T` that
must be included in the constrained multidimensional motif search.
For more information, see Section IV D in:
`DOI: 10.1109/ICDM.2017.66 \
<https://www.cs.ucr.edu/~eamonn/Motif_Discovery_ICDM.pdf>`__
Returns
-------
include : numpy.ndarray
The processed `include` with any redundant index values removed
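As a rough illustration (worked out by hand), repeated indices are dropped
while the original ordering is preserved:
```
>>> _preprocess_include([1, 2, 2, 0])  # also emits a warning
array([1, 2, 0])
```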
"""
include = np.asarray(include)
_, idx = np.unique(include, return_index=True)
if include.shape[0] != idx.shape[0]: # pragma: no cover
warnings.warn("Removed repeating indices in `include`")
include = include[np.sort(idx)]
return include
def _apply_include(
D,
include,
restricted_indices=None,
unrestricted_indices=None,
mask=None,
tmp_swap=None,
):
"""
Apply a transformation to the multi-dimensional distance profile so that specific
dimensions are always included. Essentially, it is swapping rows within the distance
profile.
Parameters
----------
D : numpy.ndarray
The multi-dimensional distance profile
include : numpy.ndarray
A list of (zero-based) indices corresponding to the dimensions in `T` that
must be included in the constrained multidimensional motif search.
For more information, see Section IV D in:
`DOI: 10.1109/ICDM.2017.66 \
<https://www.cs.ucr.edu/~eamonn/Motif_Discovery_ICDM.pdf>`__
restricted_indices : numpy.ndarray, default None
A list of indices specified in `include` that reside in the first
`include.shape[0]` rows
unrestricted_indices : numpy.ndarray, default None
A list of indices specified in `include` that do not reside in the first
`include.shape[0]` rows
mask : numpy.ndarray, default None
A boolean mask to select for unrestricted indices
tmp_swap : numpy.ndarray, default None
A reusable array to aid in array element swapping
Returns
-------
None
"""
include = _preprocess_include(include)
if restricted_indices is None:
restricted_indices = include[include < include.shape[0]]
if unrestricted_indices is None:
unrestricted_indices = include[include >= include.shape[0]]
if mask is None:
mask = np.ones(include.shape[0], dtype=bool)
mask[restricted_indices] = False
if tmp_swap is None:
tmp_swap = D[: include.shape[0]].copy()
else:
tmp_swap[:] = D[: include.shape[0]]
D[: include.shape[0]] = D[include]
D[unrestricted_indices] = tmp_swap[mask]
def _subspace(D, k, include=None, discords=False):
"""
Compute the k-dimensional matrix profile subspace for a given subsequence index and
its nearest neighbor index
Parameters
----------
D : numpy.ndarray
The multi-dimensional distance profile
k : int
The subset number of dimensions out of `D = T.shape[0]`-dimensions to return
the subspace for. Note that zero-based indexing is used.
include : numpy.ndarray, default None
A list of (zero-based) indices corresponding to the dimensions in `T` that
must be included in the constrained multidimensional motif search.
For more information, see Section IV D in:
`DOI: 10.1109/ICDM.2017.66 \
<https://www.cs.ucr.edu/~eamonn/Motif_Discovery_ICDM.pdf>`__
discords : bool, default False
When set to `True`, this reverses the distance profile to favor discords rather
than motifs. Note that indices in `include` are still maintained and respected.
Returns
-------
S : numpy.ndarray
An array that contains the `k`th-dimensional subspace for the subsequence. Note
that `k+1` rows will be returned.
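As a rough illustration (worked out by hand), the `k + 1 = 2` dimensions
with the smallest distances are returned:
```
>>> _subspace(np.array([0.5, 0.1, 0.9]), 1)
array([1, 0])
```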
"""
if discords:
sorted_idx = D[::-1].argsort(axis=0, kind="mergesort")
else:
sorted_idx = D.argsort(axis=0, kind="mergesort")
# `include` processing occurs here since we are dealing with indices, not distances
if include is not None:
include = _preprocess_include(include)
mask = np.in1d(sorted_idx, include)
include_idx = mask.nonzero()[0]
exclude_idx = (~mask).nonzero()[0]
sorted_idx[: include_idx.shape[0]], sorted_idx[include_idx.shape[0] :] = (
sorted_idx[include_idx],
sorted_idx[exclude_idx],
)
S = sorted_idx[: k + 1]
return S
def _mdl(disc_subseqs, disc_neighbors, S, n_bit=8):
"""
Compute the number of bits needed to compress one array with another
using the minimum description length (MDL)
Parameters
----------
disc_subseqs : numpy.ndarray
The discretized array to be compressed
disc_neighbors : numpy.ndarray
The discretized array that will be used as a hypothesis for compression
S : numpy.ndarray
An array that contains the `k`th-dimensional subspace to be used
n_bit : int, default 8
The number of bits to use for computing the bit size
Returns
-------
bit_size : float
The total number of bits computed from MDL for representing both input arrays
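As a rough worked example (by hand): with `ndim = 2`, `sub_dims = 2`, `m = 3`,
and two unique values in the compressed difference (`n_val = 2`), the bit size
is `8 * (2 * 2 * 3 - 2 * 3) + 2 * 3 * log2(2) + 2 * 8 = 70.0`:
```
>>> disc_subseqs = np.array([[0, 1, 2], [3, 4, 5]])
>>> disc_neighbors = np.array([[0, 1, 2], [3, 3, 5]])
>>> _mdl(disc_subseqs, disc_neighbors, np.array([0, 1]))
70.0
```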
"""
ndim = disc_subseqs.shape[0]
sub_dims, m = disc_subseqs[S].shape
n_val = len(np.unique(disc_subseqs[S] - disc_neighbors[S]))
bit_size = n_bit * (2 * ndim * m - sub_dims * m)
bit_size = bit_size + sub_dims * m * np.log2(n_val) + n_val * n_bit
return bit_size
@njit(
# "(i8, i8, f8[:, :], f8[:], i8, f8[:, :], i8[:, :], f8)",
parallel=True,
fastmath={"nsz", "arcp", "contract", "afn", "reassoc"},
)
def _compute_multi_PI(d, idx, D, D_prime, range_start, P, I, p=2.0):
"""
A Numba JIT-compiled version of mSTOMP for updating the matrix profile and matrix
profile indices
Parameters
----------
d : int
The total number of dimensions in `T`
idx : int
The subsequence index for the i-th time series, `T[i]`
D : numpy.ndarray
The distance profile
D_prime : numpy.ndarray
A reusable array for storing the column-wise cumulative sum of `D`
range_start : int
The starting index value along `T` for which to start the matrix
profile calculation
P : numpy.ndarray
The matrix profile
I : numpy.ndarray
The matrix profile indices
p : float, default 2.0
The p-norm to apply for computing the Minkowski distance. Minkowski distance is
typically used with `p` being 1 or 2, which correspond to the Manhattan distance
and the Euclidean distance, respectively.
Returns
-------
None
"""
D_prime[:] = 0.0
for i in range(d):
D_prime = D_prime + np.power(D[i], 1.0 / p)
min_index = np.argmin(D_prime)
pos = idx - range_start
I[i, pos] = min_index
P[i, pos] = D_prime[min_index] / (i + 1)
if np.isinf(P[i, pos]):  # pragma: no cover
I[i, pos] = -1
def _compute_P_ABBA(
T_A,
T_B,
m,
P_ABBA,
partial_mp_func,
client=None,
device_id=None,
):
"""
A convenience function for computing the (unsorted) concatenated matrix profiles
from an AB-join and BA-join for the two time series, `T_A` and `T_B`. This result
can then be used to compute the matrix profile distance (MPdist) measure.
The MPdist distance measure considers two time series to be similar if they share
many subsequences, regardless of the order of matching subsequences. MPdist
concatenates the output of an AB-join and a BA-join and returns the `k`th smallest
value as the reported distance. Note that MPdist is a measure and not a metric.
Therefore, it does not obey the triangular inequality but the method is highly
scalable.
Parameters
----------
T_A : numpy.ndarray
The first time series or sequence for which to compute the matrix profile
T_B : numpy.ndarray
The second time series or sequence for which to compute the matrix profile
m : int
Window size
P_ABBA : numpy.ndarray
The output array to write the concatenated AB-join and BA-join results to
partial_mp_func : functools.partial
A generic matrix profile function that wraps extra parameters into
`functools.partial` function
client : client, default None
A Dask or Ray Distributed client. Setting up a distributed cluster is beyond
the scope of this library. Please refer to the Dask or Ray Distributed
documentation.
device_id : int or list, default None
The (GPU) device number to use. The default value is `0`. A list of
valid device ids (int) may also be provided for parallel GPU-STUMP
computation. A list of all valid device ids can be obtained by
executing `[device.id for device in numba.cuda.list_devices()]`.
Returns
-------
None
Notes
-----
`DOI: 10.1109/ICDM.2018.00119 \
<https://www.cs.ucr.edu/~eamonn/MPdist_Expanded.pdf>`__
See Section III
"""
partial_mp_func = _get_partial_mp_func(
partial_mp_func, client=client, device_id=device_id
)
n_A = T_A.shape[0]
if inspect.signature(partial_mp_func).parameters.get("normalize") is not None:
# Normalized (stump-like)
# Only normalized mp funcs can have a "normalize" parameter in its function
# signature
params = partial_mp_func.keywords
T_A_subseq_isconstant = params.get("T_A_subseq_isconstant")
T_B_subseq_isconstant = params.get("T_B_subseq_isconstant")
P_ABBA[: n_A - m + 1] = partial_mp_func(
T_A,
m,
T_B,
ignore_trivial=False,
T_A_subseq_isconstant=T_A_subseq_isconstant,
T_B_subseq_isconstant=T_B_subseq_isconstant,
)[:, 0]
P_ABBA[n_A - m + 1 :] = partial_mp_func(
T_B,
m,
T_A,
ignore_trivial=False,
T_A_subseq_isconstant=T_B_subseq_isconstant,
T_B_subseq_isconstant=T_A_subseq_isconstant,
)[:, 0]
else:
# Non-normalized (aamp-like)
# Ignore/omit `T_A_subseq_isconstant` and `T_B_subseq_isconstant` parameters
# for all non-normalized mp funcs
P_ABBA[: n_A - m + 1] = partial_mp_func(T_A, m, T_B, ignore_trivial=False)[:, 0]
P_ABBA[n_A - m + 1 :] = partial_mp_func(T_B, m, T_A, ignore_trivial=False)[:, 0]
def _mpdist(
T_A,
T_B,
m,
partial_mp_func,
percentage=0.05,
k=None,
client=None,
device_id=None,
custom_func=None,
):
"""
A convenience function for computing the matrix profile distance (MPdist) measure
between any two time series.
The MPdist distance measure considers two time series to be similar if they share
many subsequences, regardless of the order of matching subsequences. MPdist
concatenates the output of an AB-join and a BA-join and returns the `k`th smallest
value as the reported distance. Note that MPdist is a measure and not a metric.
Therefore, it does not obey the triangular inequality but the method is highly
scalable.
Parameters
----------
T_A : numpy.ndarray
The first time series or sequence for which to compute the matrix profile
T_B : numpy.ndarray
The second time series or sequence for which to compute the matrix profile
m : int
Window size
partial_mp_func : functools.partial
A generic matrix profile function that wraps extra parameters into
`functools.partial` function.
percentage : float, default 0.05
The percentage of distances that will be used to report `mpdist`. The value
is between 0.0 and 1.0. This parameter is ignored when `k` is not `None` or
when `custom_func` is not None.
k : int, default None
Specify the `k`th value in the concatenated matrix profiles to return. When `k`
is not `None`, then the `percentage` parameter is ignored. This parameter is
ignored when `custom_func` is not None.
client : client, default None
A Dask or Ray Distributed client. Setting up a distributed cluster is beyond
the scope of this library. Please refer to the Dask or Ray Distributed
documentation.
device_id : int or list, default None
The (GPU) device number to use. The default value is `0`. A list of
valid device ids (int) may also be provided for parallel GPU-STUMP
computation. A list of all valid device ids can be obtained by
executing `[device.id for device in numba.cuda.list_devices()]`.
custom_func : function, default None
A custom user defined function for selecting the desired value from the
unsorted `P_ABBA` array. This function may need to leverage `functools.partial`
and should take `P_ABBA` as its only input parameter and return a single
`MPdist` value. The `percentage` and `k` parameters are ignored when
`custom_func` is not None.
Returns
-------
MPdist : float
The matrix profile distance
Notes
-----
`DOI: 10.1109/ICDM.2018.00119 \
<https://www.cs.ucr.edu/~eamonn/MPdist_Expanded.pdf>`__
See Section III
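As a rough worked example of the `k` derivation below (by hand): with
`n_A = n_B = 100`, `m = 10`, and `percentage = 0.05`, we get
`k = min(math.ceil(0.05 * 200), 91 + 91 - 1) = 10`.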
"""
n_A = T_A.shape[0]
n_B = T_B.shape[0]
P_ABBA = np.empty(n_A - m + 1 + n_B - m + 1, dtype=np.float64)
_compute_P_ABBA(
T_A,
T_B,
m,
P_ABBA,
partial_mp_func,
client,
device_id,
)
if k is not None:
k = min(int(k), P_ABBA.shape[0] - 1)
else:
percentage = np.clip(percentage, 0.0, 1.0)
k = min(math.ceil(percentage * (n_A + n_B)), n_A - m + 1 + n_B - m + 1 - 1)
MPdist = _select_P_ABBA_value(P_ABBA, k, custom_func)
return MPdist