# Source code for stumpy.floss

```
# STUMPY
# Copyright 2019 TD Ameritrade. Released under the terms of the 3-Clause BSD license. # noqa: E501
# STUMPY is a trademark of TD Ameritrade IP Company, Inc. All rights reserved.
import copy
import numpy as np
import scipy.stats
from . import config, core
def _nnmark(I):
"""
Count the number of nearest neighbor overhead crossings or arcs.
Parameters
----------
I : numpy.ndarray
Matrix profile indices
Returns
-------
nnmark : numpy.ndarray
Counts of nearest neighbor overheard crossings or arcs.
Notes
-----
DOI: 10.1109/ICDM.2017.21 <https://www.cs.ucr.edu/~eamonn/Segmentation_ICDM.pdf>`__
See Table I
This is a fast and vectorized implementation of the nnmark algorithm.
"""
I = I.astype(np.int64)
# Replace index values that are less than zero with its own positional index
idx = np.argwhere(I < 0).flatten()
I[idx] = idx
k = I.shape[0]
i = np.arange(k, dtype=np.int64)
nnmark = np.bincount(np.minimum(i, I), minlength=k)
nnmark -= np.bincount(np.maximum(i, I), minlength=k)
return nnmark.cumsum()
def _iac(
width, bidirectional=True, n_iter=1000, n_samples=1000, seed=0
): # pragma: no cover
"""
Compute the bidirectional idealized arc curve (IAC). This is based
on a beta distribution that is scaled with a width that is identical
to the length of the matrix profile index. The height of the idealized
parabolic curve is assumed to be exactly half the width.
If `bidirectional=False` then the 1-dimensional IAC is computed instead.
Parameters
----------
width : int
The width of the bidirectional idealized arc curve. This is equal
to the length of the matrix profile index.
bidirectional : bool, default True
Flag for computing a bidirectional (`True`) or 1-dimensional (`False`)
idealized arc curve
n_iter : int, default 1000
Number of iterations to average over when determining the parameters for
beta distribution
n_samples : int, default 1000
Number of distribution samples to draw during each iteration
seed : int, default 0
NumPy random seed used in sampling the beta distribution. Set this to your
desired value for reproducibility purposes. The default value is set to `0`.
Returns
-------
IAC : numpy.ndarray
Idealized arc curve (IAC)
"""
np.random.seed(seed)
I = np.random.randint(0, width, size=width, dtype=np.int64)
if bidirectional is False: # Idealized 1-dimensional matrix profile index
I[:-1] = width
for i in range(width - 1):
I[i] = np.random.randint(i + 1, width, dtype=np.int64)
target_AC = _nnmark(I)
params = np.empty((n_iter, 2), dtype=np.float64)
for i in range(n_iter):
hist_dist = scipy.stats.rv_histogram(
(target_AC, np.append(np.arange(width), width))
)
data = hist_dist.rvs(size=n_samples)
a, b, c, d = scipy.stats.beta.fit(data, floc=0, fscale=width)
params[i, 0] = a
params[i, 1] = b
a_mean = np.round(np.mean(params[:, 0]), 2)
b_mean = np.round(np.mean(params[:, 1]), 2)
IAC = scipy.stats.beta.pdf(np.arange(width), a_mean, b_mean, loc=0, scale=width)
slope, _, _, _ = np.linalg.lstsq(np.expand_dims(IAC, axis=1), target_AC, rcond=None)
IAC *= slope
return IAC
def _cac(I, L, bidirectional=True, excl_factor=5, custom_iac=None, seed=0):
"""
Compute the corrected arc curve (CAC)
Parameters
----------
I : numpy.ndarray
The matrix profile indices for the time series of interest
L : int
The subsequence length that is set roughly to be one period length.
This is likely to be the same value as the window size, `m`, used
to compute the matrix profile and matrix profile index but it can
be different since this is only used to manage edge effects
and has no bearing on any of the IAC or CAC core calculations.
bidirectional : bool, default True
Flag for normalizing the arc curve with a bidirectional (`True`) or
1-dimensional (`False`) idealized arc curve. If a `custom_iac` is
specified then this flag is ignored.
excl_factor : int, default 5
The multiplying factor for the first and last regime exclusion zones
custom_iac : numpy.ndarray, default None
A custom idealized arc curve (IAC) that will used for correcting the
arc curve
seed : int, default 0
NumPy random seed used in sampling the `iac` beta distribution. Set this
to your desired value for reproducibility purposes. The default value is
set to `0`.
Returns
-------
output : numpy.ndarray
A corrected arc curve (CAC)
Notes
-----
DOI: 10.1109/ICDM.2017.21 <https://www.cs.ucr.edu/~eamonn/Segmentation_ICDM.pdf>`__
See Table I
This is the implementation for the corrected arc curve (CAC).
"""
k = I.shape[0]
AC = _nnmark(I)
CAC = np.zeros(k, dtype=np.float64)
if custom_iac is None:
IAC = _iac(k, bidirectional, seed=seed)
else:
IAC = custom_iac
IAC[IAC == 0.0] = 10**-10 # Avoid divide by zero
CAC[:] = AC / IAC
CAC[CAC > 1.0] = 1.0 # Equivalent to min
if excl_factor > 0:
CAC[: L * excl_factor] = 1.0
CAC[-L * excl_factor :] = 1.0
return CAC
def _rea(cac, n_regimes, L, excl_factor=5):
"""
Find the location of the regimes using the regime extracting
algorithm (REA)
Parameters
----------
cac : numpy.ndarray
The corrected arc curve
n_regimes : int
The number of regimes to search for. This is one more than the
number of regime changes as denoted in the original paper.
L : int
The subsequence length that is set roughly to be one period length.
This is likely to be the same value as the window size, `m`, used
to compute the matrix profile and matrix profile index but it can
be different since this is only used to manage edge effects
and has no bearing on any of the IAC or CAC core calculations.
excl_factor : int, default 5
The multiplying factor for the regime exclusion zone
Returns
-------
regime_locs : numpy.ndarray
The locations of the regimes
Notes
-----
DOI: 10.1109/ICDM.2017.21 <https://www.cs.ucr.edu/~eamonn/Segmentation_ICDM.pdf>`__
See Table II
This is the implementation for the regime extracting algorithm (REA).
"""
regime_locs = np.empty(n_regimes - 1, dtype=np.int64)
tmp_cac = copy.deepcopy(cac)
for i in range(n_regimes - 1):
regime_locs[i] = np.argmin(tmp_cac)
excl_start = max(regime_locs[i] - excl_factor * L, 0)
excl_stop = min(regime_locs[i] + excl_factor * L, cac.shape[0])
tmp_cac[excl_start:excl_stop] = 1.0
return regime_locs
[docs]
def fluss(I, L, n_regimes, excl_factor=5, custom_iac=None):
"""
Compute the Fast Low-cost Unipotent Semantic Segmentation (FLUSS)
for static data (i.e., batch processing)
Essentially, this is a wrapper to compute the corrected arc curve and
regime locations. Note that since the matrix profile indices, `I`, are pre-computed,
this function is agnostic to subsequence normalization.
Parameters
----------
I : numpy.ndarray
The matrix profile indices for the time series of interest
L : int
The subsequence length that is set roughly to be one period length.
This is likely to be the same value as the window size, `m`, used
to compute the matrix profile and matrix profile index but it can
be different since this is only used to manage edge effects
and has no bearing on any of the IAC or CAC core calculations.
n_regimes : int
The number of regimes to search for. This is one more than the
number of regime changes as denoted in the original paper.
excl_factor : int, default 5
The multiplying factor for the regime exclusion zone
custom_iac : numpy.ndarray, default None
A custom idealized arc curve (IAC) that will used for correcting the
arc curve
Returns
-------
cac : numpy.ndarray
A corrected arc curve (CAC)
regime_locs : numpy.ndarray
The locations of the regimes
See Also
--------
stumpy.floss : Compute the Fast Low-Cost Online Semantic Segmentation (FLOSS) for
streaming data
Notes
-----
`DOI: 10.1109/ICDM.2017.21 <https://www.cs.ucr.edu/~eamonn/Segmentation_ICDM.pdf>`__
See Section A
This is the implementation for Fast Low-cost Unipotent Semantic
Segmentation (FLUSS).
Examples
--------
>>> import stumpy
>>> import numpy as np
>>> mp = stumpy.stump(np.array([584., -11., 23., 79., 1001., 0., -19.]), m=3)
>>> stumpy.fluss(mp[:, 0], 3, 2)
(array([1., 1., 1., 1., 1.]), array([0]))
"""
cac = _cac(I, L, bidirectional=True, excl_factor=excl_factor, custom_iac=custom_iac)
regime_locs = _rea(cac, n_regimes, L, excl_factor=excl_factor)
return cac, regime_locs
[docs]
class floss:
"""
Compute the Fast Low-cost Online Semantic Segmentation (FLOSS) for
streaming data
Parameters
----------
mp : numpy.ndarray
The first column consists of the matrix profile, the second column
consists of the matrix profile indices, the third column consists of
the left matrix profile indices, and the fourth column consists of
the right matrix profile indices.
T : numpy.ndarray
A 1-D time series data used to generate the matrix profile and matrix profile
indices found in `mp`. Note that the the right matrix profile index is used and
the right matrix profile is intelligently recomputed on the fly from `T` instead
of using the bidirectional matrix profile.
m : int
The window size for computing sliding window mass. This is identical
to the window size used in the matrix profile calculation. For managing
edge effects, see the `L` parameter.
L : int
The subsequence length that is set roughly to be one period length.
This is likely to be the same value as the window size, `m`, used
to compute the matrix profile and matrix profile index but it can
be different since this is only used to manage edge effects
and has no bearing on any of the IAC or CAC core calculations.
excl_factor : int, default 5
The multiplying factor for the regime exclusion zone. Note that this
is unrelated to the `excl_zone` used in to compute the matrix profile.
n_iter : int, default 1000
Number of iterations to average over when determining the parameters for
the IAC beta distribution
n_samples : int, default 1000
Number of distribution samples to draw during each iteration when
computing the IAC
custom_iac : numpy.ndarray, default None
A custom idealized arc curve (IAC) that will used for correcting the
arc curve
normalize : bool, default True
When set to `True`, this z-normalizes subsequences prior to computing distances
p : float, default 2.0
The p-norm to apply for computing the Minkowski distance. Minkowski distance is
typically used with `p` being 1 or 2, which correspond to the Manhattan distance
and the Euclidean distance, respectively. This parameter is ignored when
`normalize == True`.
T_subseq_isconstant_func : function, default None
A custom, user-defined function that returns a boolean array that indicates
whether a subsequence in `T` is constant (True). The function must only take
two arguments, `a`, a 1-D array, and `w`, the window size, while additional
arguments may be specified by currying the user-defined function using
`functools.partial`. Any subsequence with at least one np.nan/np.inf will
automatically have its corresponding value set to False in this boolean array.
Attributes
----------
cac_1d_ : numpy.ndarray
A 1-dimensional corrected arc curve (CAC) updated as a result of ingressing a
single new data point and egressing a single old data point.
P_ : numpy.ndarray
The matrix profile updated as a result of ingressing a single new data
point and egressing a single old data point.
I_ : numpy.ndarray
The (right) matrix profile indices updated as a result of ingressing a single
new data point and egressing a single old data point.
T_ : numpy.ndarray
The updated time series, `T`
Methods
-------
update(t)
Ingress a new data point, `t`, onto the time series, `T`, followed by egressing
the oldest single data point from `T`. Then, update the 1-dimensional corrected
arc curve (CAC_1D) and the matrix profile.
See Also
--------
stumpy.fluss : Compute the Fast Low-cost Unipotent Semantic Segmentation (FLUSS)
for static data (i.e., batch processing)
Notes
-----
`DOI: 10.1109/ICDM.2017.21 <https://www.cs.ucr.edu/~eamonn/Segmentation_ICDM.pdf>`__
See Section C
This is the implementation for Fast Low-cost Online Semantic
Segmentation (FLOSS).
Examples
--------
>>> import stumpy
>>> import numpy as np
>>> mp = stumpy.stump(np.array([584., -11., 23., 79., 1001., 0.]), m=3)
>>> stream = stumpy.floss(
... mp,
... np.array([584., -11., 23., 79., 1001., 0.]),
... m=3,
... L=3)
>>> stream.update(19.)
>>> stream.cac_1d_
array([1., 1., 1., 1.])
"""
def __init__(
self,
mp,
T,
m,
L,
excl_factor=5,
n_iter=1000,
n_samples=1000,
custom_iac=None,
normalize=True,
p=2.0,
T_subseq_isconstant_func=None,
):
"""
Initialize the FLOSS object
Parameters
----------
mp : numpy.ndarray
The first column consists of the matrix profile, the second column
consists of the matrix profile indices, the third column consists of
the left matrix profile indices, and the fourth column consists of
the right matrix profile indices.
T : numpy.ndarray
A 1-D time series data used to generate the matrix profile and matrix
profile indices found in `mp`. Note that the the right matrix profile index
is used and the right matrix profile is intelligently recomputed on-the-fly
from `T` instead of using the bidirectional matrix profile.
m : int
The window size for computing sliding window mass. This is identical
to the window size used in the matrix profile calculation. For managing
edge effects, see the `L` parameter.
L : int
The subsequence length that is set roughly to be one period length.
This is likely to be the same value as the window size, `m`, used
to compute the matrix profile and matrix profile index but it can
be different since this is only used to manage edge effects
and has no bearing on any of the IAC or CAC core calculations.
excl_factor : int, default 5
The multiplying factor for the regime exclusion zone. Note that this
is unrelated to the `excl_zone` used in to compute the matrix profile.
n_iter : int, default 1000
Number of iterations to average over when determining the parameters for
the IAC beta distribution
n_samples : int, default 1000
Number of distribution samples to draw during each iteration when
computing the IAC
custom_iac : numpy.ndarray, default None
A custom idealized arc curve (IAC) that will used for correcting the
arc curve
normalize : bool, default True
When set to `True`, this z-normalizes subsequences prior to computing
distances
p : float, default 2.0
The p-norm to apply for computing the Minkowski distance. Minkowski distance
is typically used with `p` being 1 or 2, which correspond to the Manhattan
distance and the Euclidean distance, respectively.This parameter is ignored
when `normalize == True`.
T_subseq_isconstant_func : function, default None
A custom, user-defined function that returns a boolean array that indicates
whether a subsequence in `T` is constant (True). The function must only take
two arguments, `a`, a 1-D array, and `w`, the window size, while additional
arguments may be specified by currying the user-defined function using
`functools.partial`. Any subsequence with at least one np.nan/np.inf will
automatically have its corresponding value set to False in this boolean
array.
"""
self._mp = copy.deepcopy(np.asarray(mp))
self._T = copy.deepcopy(np.asarray(T))
self._m = m
self._L = L
self._excl_factor = excl_factor
self._n_iter = n_iter
self._n_samples = n_samples
self._custom_iac = custom_iac
self._normalize = normalize
self._p = p
self._T_subseq_isconstant = None
self._M_T = None
self._Σ_T = None
if T_subseq_isconstant_func is None:
T_subseq_isconstant_func = core._rolling_isconstant
if not callable(T_subseq_isconstant_func): # pragma: no cover
msg = (
"`T_subseq_isconstant_func` was expected to be a callable function "
+ f"but {type(T_subseq_isconstant_func)} was found."
)
raise ValueError(msg)
self._T_subseq_isconstant_func = T_subseq_isconstant_func
self._k = self._mp.shape[0]
self._n = self._T.shape[0]
self._last_idx = self._n - self._m + 1 # Depends on the changing length of `T`
self._n_appended = 0
self._T_isfinite = np.isfinite(self._T)
self._finite_T = self._T.copy()
self._finite_T[~np.isfinite(self._finite_T)] = 0.0
self._finite_Q = self._finite_T[-self._m :].copy()
if self._normalize:
self._T_subseq_isconstant = core.process_isconstant(
self._T, self._m, self._T_subseq_isconstant_func
)
self._M_T, self._Σ_T = core.compute_mean_std(self._T, self._m)
if self._custom_iac is None: # pragma: no cover
self._custom_iac = _iac(
self._k,
bidirectional=False,
n_iter=self._n_iter,
n_samples=self._n_samples,
)
right_nn = np.zeros((self._k, self._m), dtype=np.float64)
# Disable the bidirectional matrix profile indices and left indices
self._mp[:, 1] = -1
self._mp[:, 2] = -1
# Update matrix profile distance to be right mp distance and not bidirectional.
# Use right indices to perform direct distance calculations
# Note that any -1 indices must have a np.inf matrix profile value
right_indices = [
np.arange(IR, IR + self._m, dtype=np.int64)
for IR in self._mp[:, 3].tolist()
]
right_nn[:] = self._T[np.array(right_indices)]
if self._normalize:
self._mp[:, 0] = np.linalg.norm(
core.z_norm(core.rolling_window(self._T, self._m), 1)
- core.z_norm(right_nn, 1),
axis=1,
)
nn_subseq_isconstant = self._T_subseq_isconstant[
self._mp[:, 3].astype(np.int64)
]
# subseq and its nn are both constant
mask = self._T_subseq_isconstant & nn_subseq_isconstant
self._mp[mask, 0] = 0
# Only the subseq OR its nn is constant but not both
mask = np.logical_xor(self._T_subseq_isconstant, nn_subseq_isconstant)
self._mp[mask, 0] = np.sqrt(m)
else:
self._mp[:, 0] = np.linalg.norm(
core.rolling_window(self._T, self._m) - right_nn,
axis=1,
ord=self._p,
)
# Note that a negative matrix profile index (e.g. -1) represents a null index.
# However, in numpy, negative indices are interpreted as counting from the end
# of the array. To resolve this, we temporarily replace a negative index with
# a self index. So, if subsequence S_i has no valid right nearest neighbor, then
# self._mp[i, 3] will be set to i and its corresponding matrix profile distance
# is set to 0.0. This temporary resolution is later resolved (by setting them
# back to -1 and np.inf, respectively) in a post-processing step.
inf_indices = np.argwhere(self._mp[:, 3] < 0).flatten()
self._mp[inf_indices, 0] = np.inf
self._mp[inf_indices, 3] = inf_indices
self._cac = np.ones(self._k, dtype=np.float64) * -1
def update(self, t):
"""
Ingress a new data point, `t`, onto the time series, `T`, followed by egressing
the oldest single data point from `T`. Then, update the 1-dimensional corrected
arc curve (CAC_1D) and the matrix profile.
Parameters
----------
t : float
A single new data point to be appended to `T`
Notes
-----
DOI: 10.1109/ICDM.2017.21 \
<https://www.cs.ucr.edu/~eamonn/Segmentation_ICDM.pdf>`__
See Section C
This is the implementation for Fast Low-cost Online Semantic
Segmentation (FLOSS).
"""
self._T[:-1] = self._T[1:]
self._T_isfinite[:-1] = self._T_isfinite[1:]
self._finite_T[:-1] = self._finite_T[1:]
self._finite_Q[:-1] = self._finite_Q[1:]
self._T[-1] = t
self._T_isfinite[-1] = np.isfinite(t)
self._finite_T[-1] = t
if not np.isfinite(t):
self._finite_T[-1] = 0.0
self._finite_Q[-1] = self._finite_T[-1]
excl_zone = int(np.ceil(self._m / config.STUMPY_EXCL_ZONE_DENOM))
# Note that the start of the exclusion zone is relative to
# the unchanging length of the matrix profile index
zone_start = max(0, self._k - excl_zone)
# Egress
# Remove the first element in the matrix profile index
# Shift mp up by one and replace the last row with new values
self._mp[:-1, :] = self._mp[1:, :]
self._mp[-1, 0] = np.inf
self._mp[-1, 3] = self._last_idx
# Ingress
if self._normalize:
self._T_subseq_isconstant[:-1] = self._T_subseq_isconstant[1:]
self._Q_subseq_isconstant = core.process_isconstant(
self._T[-self._m :], self._m, self._T_subseq_isconstant_func
)
self._T_subseq_isconstant[-1] = self._Q_subseq_isconstant
self._M_T[:-1] = self._M_T[1:]
self._Σ_T[:-1] = self._Σ_T[1:]
self._M_T[-1], self._Σ_T[-1] = core.compute_mean_std(
self._T[-self._m :], self._m
)
D = core.mass(
self._finite_Q,
self._finite_T,
self._M_T,
self._Σ_T,
T_subseq_isconstant=self._T_subseq_isconstant,
Q_subseq_isconstant=self._Q_subseq_isconstant,
)
else:
D = core.mass_absolute(self._T[-self._m :], self._T, p=self._p)
D[zone_start:] = np.inf
T_subseq_isfinite = core.rolling_isfinite(self._T_isfinite, self._m)
D[~T_subseq_isfinite] = np.inf
if not T_subseq_isfinite[-1]:
D[:] = np.inf
# Update nearest neighbor for old data if any old subsequences
# are closer to the newly arrived subsequence
update_idx = np.argwhere(D < self._mp[:, 0]).flatten()
self._mp[update_idx, 0] = D[update_idx]
self._mp[update_idx, 3] = self._last_idx
self._cac[:] = _cac(
self._mp[:, 3] - self._n_appended - 1,
self._L,
bidirectional=False,
excl_factor=self._excl_factor,
custom_iac=self._custom_iac,
)
self._last_idx += 1
self._n_appended += 1
@property
def cac_1d_(self):
"""
Get the updated 1-dimensional corrected arc curve (CAC_1D)
Parameters
----------
None
Returns
-------
None
"""
return self._cac.astype(np.float64)
@property
def P_(self):
"""
Get the updated matrix profile
Parameters
----------
None
Returns
-------
None
"""
return self._mp[:, 0].astype(np.float64)
@property
def I_(self):
"""
Get the updated (right) matrix profile indices
The indices stored in `self.I_` reflect the starting index of
subsequneces with respect to the full time series (i.e., including
all egressed data points).
Parameters
----------
None
Returns
-------
None
"""
# Comparing the right matrix profile index value with the self index
# position (i.e., self._mp[:, 3] == np.arange(len(self._mp)) is avoided
# because things are constantly being shifted (ingress+egress) to the left
# and so the aforementioned check is all `False` as soon as we start using
# `.update()`.
I = self._mp[:, 3].astype(np.int64).copy()
I[self._mp[:, 0] == np.inf] = -1
return I
@property
def T_(self):
"""
Get the updated time series, `T`
Parameters
----------
None
Returns
-------
None
"""
return self._T.astype(np.float64)
```