Source code for stumpy.stumpi

# Copyright 2019 TD Ameritrade. Released under the terms of the 3-Clause BSD license.
# STUMPY is a trademark of TD Ameritrade IP Company, Inc. All rights reserved.

import numpy as np

from . import config, core, stump
from .aampi import aampi

[docs] @core.non_normalized( aampi, exclude=[ "normalize", "T_subseq_isconstant_func", ], ) class stumpi: """ Compute an incremental z-normalized matrix profile for streaming data This is based on the on-line STOMPI and STAMPI algorithms. Parameters ---------- T : numpy.ndarray The time series or sequence for which the matrix profile and matrix profile indices will be returned m : int Window size egress : bool, default True If set to `True`, the oldest data point in the time series is removed and the time series length remains constant rather than forever increasing normalize : bool, default True When set to `True`, this z-normalizes subsequences prior to computing distances. Otherwise, this class gets re-routed to its complementary non-normalized equivalent set in the `@core.non_normalized` class decorator. p : float, default 2.0 The p-norm to apply for computing the Minkowski distance. This parameter is ignored when `normalize == True`. k : int, default 1 The number of top `k` smallest distances used to construct the matrix profile. Note that this will increase the total computational time and memory usage when k > 1. mp : numpy.ndarry, default None A pre-computed matrix profile (and corresponding matrix profile indices). This is a 2D array of shape `(len(T) - m + 1, 2 * k + 2)`, where the first `k` columns are top-k matrix profile, and the next `k` columns are their corresponding indices. The last two columns correspond to the top-1 left and top-1 right matrix profile indices. When None (default), this array is computed internally using `stumpy.stump`. T_subseq_isconstant_func : function, default None A custom, user-defined function that returns a boolean array that indicates whether a subsequence in `T` is constant (True). The function must only take two arguments, `a`, a 1-D array, and `w`, the window size, while additional arguments may be specified by currying the user-defined function using `functools.partial`. Any subsequence with at least one np.nan/np.inf will automatically have its corresponding value set to False in this boolean array. Attributes ---------- P_ : numpy.ndarray The updated (top-k) matrix profile for `T`. When `k=1` (default), the first (and only) column in this 2D array consists of the matrix profile. When `k > 1`, the output has exactly `k` columns consisting of the top-k matrix profile. I_ : numpy.ndarray The updated (top-k) matrix profile indices for `T`. When `k=1` (default), the first (and only) column in this 2D array consists of the matrix profile indices. When `k > 1`, the output has exactly `k` columns consisting of the top-k matrix profile indices. left_P_ : numpy.ndarray The updated left (top-1) matrix profile for `T` left_I_ : numpy.ndarray The updated left (top-1) matrix profile indices for `T` T_ : numpy.ndarray The updated time series or sequence for which the matrix profile and matrix profile indices are computed Methods ------- update(t) Append a single new data point, `t`, to the time series, `T`, and update the matrix profile Notes ----- `DOI: 10.1007/s10618-017-0519-9 \ <>`__ See Table V Note that line 11 is missing an important `sqrt` operation! Examples -------- >>> import stumpy >>> import numpy as np >>> stream = stumpy.stumpi( ... np.array([584., -11., 23., 79., 1001., 0.]), ... m=3) >>> stream.update(-19.0) >>> stream.left_P_ array([ inf, 3.00009263, 2.69407392, 3.05656417]) >>> stream.left_I_ array([-1, 0, 1, 2]) """ def __init__( self, T, m, egress=True, normalize=True, p=2.0, k=1, mp=None, T_subseq_isconstant_func=None, ): """ Initialize the `stumpi` object Parameters ---------- T : numpy.ndarray The time series or sequence for which the matrix profile and matrix profile indices will be returned m : int Window size egress : bool, default True If set to `True`, the oldest data point in the time series is removed and the time series length remains constant rather than forever increasing normalize : bool, default True When set to `True`, this z-normalizes subsequences prior to computing distances. Otherwise, this class gets re-routed to its complementary non-normalized equivalent set in the `@core.non_normalized` class decorator. p : float, default 2.0 The p-norm to apply for computing the Minkowski distance. Minkowski distance is typically used with `p` being 1 or 2, which correspond to the Manhattan distance and the Euclidean distance, respectively.This parameter is ignored when `normalize == True`. k : int, default 1 The number of top `k` smallest distances used to construct the matrix profile. Note that this will increase the total computational time and memory usage when `k > 1`. mp : numpy.ndarry, default None A pre-computed matrix profile (and corresponding matrix profile indices). This is a 2D array of shape `(len(T) - m + 1, 2 * k + 2)`, where the first `k` columns are top-k matrix profile, and the next `k` columns are their corresponding indices. The last two columns correspond to the top-1 left and top-1 right matrix profile indices. When None (default), this array is computed internally using `stumpy.stump`. T_subseq_isconstant_func : function, default None A custom, user-defined function that returns a boolean array that indicates whether a subsequence in `T` is constant (True). The function must only take two arguments, `a`, a 1-D array, and `w`, the window size, while additional arguments may be specified by currying the user-defined function using `functools.partial`. Any subsequence with at least one np.nan/np.inf will automatically have its corresponding value set to False in this boolean array. """ self._T = core._preprocess(T) core.check_window_size(m, max_size=self._T.shape[-1]) self._m = m self._k = k if T_subseq_isconstant_func is None: T_subseq_isconstant_func = core._rolling_isconstant if not callable(T_subseq_isconstant_func): # pragma: no cover msg = ( "`T_subseq_isconstant_func` was expected to be a callable function " + f"but {type(T_subseq_isconstant_func)} was found." ) raise ValueError(msg) self._T_subseq_isconstant_func = T_subseq_isconstant_func self._n = self._T.shape[0] self._excl_zone = int(np.ceil(self._m / config.STUMPY_EXCL_ZONE_DENOM)) self._T_isfinite = np.isfinite(self._T) self._egress = egress self._T_subseq_isconstant = core.process_isconstant( self._T, self._m, self._T_subseq_isconstant_func ) if mp is None: mp = stump( self._T, self._m, k=self._k, T_A_subseq_isconstant=self._T_subseq_isconstant, ) else: mp = mp.copy() if mp.shape != ( len(self._T) - self._m + 1, 2 * self._k + 2, ): # pragma: no cover msg = ( f"The shape of `mp` must match ({len(T)-m+1}, {2 * k + 2}) but " + f"found {mp.shape} instead." ) raise ValueError(msg) self._P = mp[:, : self._k].astype(np.float64) self._I = mp[:, self._k : 2 * self._k].astype(np.int64) self._left_I = mp[:, 2 * self._k].astype(np.int64) self._left_P = np.full_like(self._left_I, np.inf, dtype=np.float64) self._T, self._M_T, self._Σ_T, self._T_subseq_isconstant = core.preprocess( self._T, self._m, T_subseq_isconstant=self._T_subseq_isconstant ) # Retrieve the left matrix profile values # Since each (top-1) matrix profile value is the minimum between the left # and right matrix profile values, we can save time by re-computing only # the left matrix profile value when the (top-1) matrix profile index is # equal to the right matrix profile index. mask = self._left_I == self._I[:, 0] self._left_P[mask] = self._P[mask, 0] # Only re-compute the `i`-th left matrix profile value, `self._left_P[i]`, # when `self._left_I[i] != self._I[i, 0]` for i in np.flatnonzero(self._left_I >= 0 & ~mask): j = self._left_I[i] QT =[i : i + self._m], self._T[j : j + self._m]) D_square = core._calculate_squared_distance( self._m, QT, self._M_T[i], self._Σ_T[i], self._M_T[j], self._Σ_T[j], self._T_subseq_isconstant[i], self._T_subseq_isconstant[j], ) self._left_P[i] = np.sqrt(D_square) Q = self._T[-self._m :] self._QT = core.sliding_dot_product(Q, self._T) if self._egress: self._QT_new = np.empty(self._QT.shape[0], dtype=np.float64) self._n_appended = 0 def update(self, t): """ Append a single new data point, `t`, to the existing time series `T` and update the (top-k) matrix profile and matrix profile indices. Parameters ---------- t : float A single new data point to be appended to `T` Notes ----- `DOI: 10.1007/s10618-017-0519-9 \ <>`__ See Table V Note that line 11 is missing an important `sqrt` operation! """ if self._egress: self._update_egress(t) else: self._update(t) def _update_egress(self, t): """ Ingress a new data point, egress the oldest data point, and update the (top-k) matrix profile and matrix profile indices Parameters ---------- t : float A single new data point to be appended to `T` """ self._n = self._T.shape[0] l = self._n - self._m + 1 - 1 # Subtract 1 due to egress self._T[:-1] = self._T[1:] self._T[-1] = t self._n_appended += 1 self._QT[:-1] = self._QT[1:] S = self._T[l:] t_drop = self._T[l - 1] self._T_isfinite[:-1] = self._T_isfinite[1:] self._I[:-1] = self._I[1:] self._P[:-1] = self._P[1:] self._left_I[:-1] = self._left_I[1:] self._left_P[:-1] = self._left_P[1:] if np.isfinite(t): self._T_isfinite[-1] = True else: self._T_isfinite[-1] = False t = 0 self._T[-1] = 0 S[-1] = 0 if np.any(~self._T_isfinite[-self._m :]): μ_Q = np.inf σ_Q = np.nan Q_subseq_isconstant = False else: Q_subseq_isconstant = core.process_isconstant( S, self._m, self._T_subseq_isconstant_func )[0] μ_Q, σ_Q = [arr[0] for arr in core.compute_mean_std(S, self._m)] self._M_T[:-1] = self._M_T[1:] self._Σ_T[:-1] = self._Σ_T[1:] self._T_subseq_isconstant[:-1] = self._T_subseq_isconstant[1:] self._M_T[-1] = μ_Q self._Σ_T[-1] = σ_Q self._T_subseq_isconstant[-1] = Q_subseq_isconstant self._QT_new[1:] = self._QT[:l] - self._T[:l] * t_drop + self._T[self._m :] * t self._QT_new[0] = np.sum(self._T[: self._m] * S[: self._m]) D = core.calculate_distance_profile( self._m, self._QT_new, μ_Q, σ_Q, self._M_T, self._Σ_T, Q_subseq_isconstant, self._T_subseq_isconstant, ) if np.any(~self._T_isfinite[-self._m :]): D[:] = np.inf core.apply_exclusion_zone(D, D.shape[0] - 1, self._excl_zone, np.inf) update_idx = np.argwhere(D < self._P[:, -1]).flatten() for i in update_idx: idx = np.searchsorted(self._P[i], D[i], side="right") core._shift_insert_at_index(self._P[i], idx, D[i]) core._shift_insert_at_index( self._I[i], idx, D.shape[0] + self._n_appended - 1 ) # D.shape[0] is base-1 # Calculate the (top-k) matrix profile values/indices for the last subsequence # by using its corresponding distance profile `D` self._P[-1] = np.inf self._I[-1] = -1 for i, d in enumerate(D): if d < self._P[-1, -1]: idx = np.searchsorted(self._P[-1], d, side="right") core._shift_insert_at_index(self._P[-1], idx, d) core._shift_insert_at_index(self._I[-1], idx, i + self._n_appended) # All neighbors of the last subsequence are on its left. So, its (top-1) # matrix profile value/index and its left matrix profile value/index must # be equal. self._left_P[-1] = self._P[-1, 0] self._left_I[-1] = self._I[-1, 0] self._QT[:] = self._QT_new def _update(self, t): """ Ingress a new data point and update the (top-k) matrix profile and matrix profile indices without egressing the oldest data point Parameters ---------- t : float A single new data point to be appended to `T` """ n = self._T.shape[0] l = n - self._m + 1 T_new = np.append(self._T, t) QT_new = np.empty(self._QT.shape[0] + 1, dtype=np.float64) S = T_new[l:] t_drop = T_new[l - 1] if np.isfinite(t): self._T_isfinite = np.append(self._T_isfinite, True) else: self._T_isfinite = np.append(self._T_isfinite, False) t = 0 T_new[-1] = 0 S[-1] = 0 if np.any(~self._T_isfinite[-self._m :]): μ_Q = np.inf σ_Q = np.nan Q_subseq_isconstant = False else: Q_subseq_isconstant = core.process_isconstant( S, self._m, self._T_subseq_isconstant_func )[0] μ_Q, σ_Q = [arr[0] for arr in core.compute_mean_std(S, self._m)] M_T_new = np.append(self._M_T, μ_Q) Σ_T_new = np.append(self._Σ_T, σ_Q) T_subseq_isconstant_new = np.append( self._T_subseq_isconstant, Q_subseq_isconstant ) QT_new[1:] = self._QT[:l] - T_new[:l] * t_drop + T_new[self._m :] * t QT_new[0] = np.sum(T_new[: self._m] * S[: self._m]) D = core.calculate_distance_profile( self._m, QT_new, μ_Q, σ_Q, M_T_new, Σ_T_new, Q_subseq_isconstant, T_subseq_isconstant_new, ) if np.any(~self._T_isfinite[-self._m :]): D[:] = np.inf core.apply_exclusion_zone(D, D.shape[0] - 1, self._excl_zone, np.inf) update_idx = np.argwhere(D[:l] < self._P[:l, -1]).flatten() for i in update_idx: idx = np.searchsorted(self._P[i], D[i], side="right") core._shift_insert_at_index(self._P[i], idx, D[i]) core._shift_insert_at_index(self._I[i], idx, l) # Calculating top-k matrix profile and (top-1) left matrix profile (and their # corresponding indices) for new subsequence whose distance profile is `D` P_new = np.full(self._k, np.inf, dtype=np.float64) I_new = np.full(self._k, -1, dtype=np.int64) for i, d in enumerate(D): if d < P_new[-1]: # maximum value in sorted array P_new idx = np.searchsorted(P_new, d, side="right") core._shift_insert_at_index(P_new, idx, d) core._shift_insert_at_index(I_new, idx, i) left_I_new = I_new[0] left_P_new = P_new[0] self._T = T_new self._P = np.append(self._P, P_new.reshape(1, -1), axis=0) self._I = np.append(self._I, I_new.reshape(1, -1), axis=0) self._left_P = np.append(self._left_P, left_P_new) self._left_I = np.append(self._left_I, left_I_new) self._QT = QT_new self._M_T = M_T_new self._Σ_T = Σ_T_new self._T_subseq_isconstant = T_subseq_isconstant_new @property def P_(self): """ Get the (top-k) matrix profile. When `k=1` (default), the output is a 1D array consisting of the matrix profile. When `k > 1`, the output is a 2D array that has exactly `k` columns and it consists of the top-k matrix profile. Parameters ---------- None Returns ------- None """ if self._k == 1: return self._P.flatten().astype(np.float64) else: return self._P.astype(np.float64) @property def I_(self): """ Get the (top-k) matrix profile indices. When `k=1` (default), the output is a 1D array consisting of the matrix profile indices. When `k > 1`, the output is a 2D array that has exactly `k` columns and it consists of the top-k matrix profile indices. Parameters ---------- None Returns ------- None """ if self._k == 1: return self._I.flatten().astype(np.int64) else: return self._I.astype(np.int64) @property def left_P_(self): """ Get the (top-1) left matrix profile Parameters ---------- None Returns ------- None """ return self._left_P.astype(np.float64) @property def left_I_(self): """ Get the (top-1) left matrix profile indices Parameters ---------- None Returns ------- None """ return self._left_I.astype(np.int64) @property def T_(self): """ Get the time series Parameters ---------- None Returns ------- None """ return self._T