{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Incremental Matrix Profiles for Streaming Time Series Data\n", "\n", "[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/TDAmeritrade/stumpy/main?filepath=notebooks/Tutorial_Matrix_Profiles_For_Streaming_Data.ipynb)\n", "\n", "Now that you have a basic understanding of how to compute a matrix profile, in this short tutorial, we will demonstrate how to incrementally update your matrix profile when you have streaming (on-line) data using the `stumpy.stumpi` (\"STUMP Incremental\") function. You can learn more about the details of this approach by reading Section G of the [Matrix Profile I](https://www.cs.ucr.edu/~eamonn/PID4481997_extend_Matrix%20Profile_I.pdf) paper and Section 4.6 and Table 5 of [this paper](https://www.cs.ucr.edu/~eamonn/ten_quadrillion.pdf)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Getting Started\n", "\n", "Let's import the packages that we'll need to create and analyze a randomly generated time series data set." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import stumpy\n", "import numpy.testing as npt\n", "import time" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Generating Some Random Time Series Data\n", "\n", "Imagine that we have an [IoT](https://en.wikipedia.org/wiki/Internet_of_things) sensor that has been collecting data once an hour for the last 14 days. 
That would mean that we've amassed `14 * 24 = 336` data points up until this point and our data set might look like this:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "T = np.random.rand(336)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And, perhaps, we know from experience that an interesting motif or anomaly might be detectable within a 12 hour (sliding) time window:" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "m = 12" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Typical Batch Analysis" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Computing the matrix profile with a batch process is straightforward using `stumpy.stump`:" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "mp = stumpy.stump(T, m)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "But as the length of `T` grows with each passing hour, it will take increasingly longer to compute the matrix profile since `stumpy.stump` will actually re-compute all of the pairwise distances between all subsequences within the time series. This is super time consuming! Instead, for streaming data, we want to find a way to take the new incoming (single) data point, compare the subsequence that it resides in with the rest of the time series (i.e., compute the distance profile), and update the existing matrix profile. 
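To make the idea of a distance profile concrete, here is a minimal, unoptimized NumPy sketch (the `distance_profile` function name is purely illustrative and not part of STUMPY) that compares the newest subsequence against every subsequence in the time series using the z-normalized Euclidean distance:

```python
import numpy as np

def distance_profile(Q, T):
    # Naive z-normalized Euclidean distance between the query `Q`
    # and every subsequence of `T` with the same length as `Q`.
    m = len(Q)
    Qz = (Q - Q.mean()) / Q.std()
    D = np.empty(len(T) - m + 1)
    for i in range(len(D)):
        S = T[i : i + m]
        D[i] = np.sqrt(np.sum((Qz - (S - S.mean()) / S.std()) ** 2))
    return D

T = np.random.rand(336)
m = 12
D = distance_profile(T[-m:], T)  # compare the newest subsequence with all others
# D[-1] is the trivial self-match (distance zero) and would be excluded,
# along with its immediate neighbors, before updating the matrix profile.
```

In practice, each distance profile is computed with the much faster MASS algorithm rather than this quadratic loop, but the bookkeeping idea is the same.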
Luckily, this can be easily accomplished with `stumpy.stumpi` or \"STUMP Incremental\".\n", "\n", "## Streaming (On-line) Analysis with STUMPI\n", "\n", "As we wait for the next data point, `t`, to arrive, we can take our existing data and initialize our streaming object:" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "stream = stumpy.stumpi(T, m, egress=False) # Don't egress/remove the oldest data point when streaming" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And when a new data point, `t`, arrives:" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "t = np.random.rand()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can append `t` to our `stream` and easily update the matrix profile, `P`, and matrix profile indices, `I`, behind the scenes:" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "stream.update(t)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In the background, `t` is appended to the existing time series, the new subsequence is automatically compared with all of the existing subsequences, and the historical values are updated. The nearest neighbor to the new subsequence among the existing subsequences is also identified and this information is appended to the matrix profile. And this can continue on, say, for another 1,000 iterations (or indefinitely) as additional data is streamed in:" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "for i in range(1000):\n", " t = np.random.rand()\n", " stream.update(t)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It is important to reiterate that incremental `stumpy.stumpi` is different from batch `stumpy.stump` in that it does not waste any time re-computing any of the past pairwise distances. 
`stumpy.stumpi` only spends time computing new distances and then updates the appropriate arrays where necessary and, thus, it is really fast!" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Validation\n", "\n", "### The Matrix Profile\n", "\n", "Now, this claim of \"fast updating\" with streaming (on-line) data may feel strange or seem magical so, first, let's validate that the output from incremental `stumpy.stumpi` is the same as performing batch `stumpy.stump`. Let's start with the full time series with `64` data points and compute the full matrix profile:" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "T_full = np.random.rand(64)\n", "m = 8\n", "\n", "mp = stumpy.stump(T_full, m)\n", "P_full = mp[:, 0]\n", "I_full = mp[:, 1]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, for `stumpy.stumpi`, we'll only start with the first `10` elements from the full length time series and then incrementally stream in the additional data points one at a time:" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "# Start with the first 10 data points of the full length time series and initialize inputs\n", "T_stream = T_full[:10].copy()\n", "stream = stumpy.stumpi(T_stream, m, egress=False) # Don't remove/egress the oldest data point when streaming\n", "\n", "# Incrementally add one new data point at a time and update the matrix profile\n", "for i in range(len(T_stream), len(T_full)):\n", " t = T_full[i]\n", " stream.update(t)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that we're done, let's check and validate that:\n", "\n", "1. `stream.T_ == T_full`\n", "2. `stream.P_ == P_full`\n", "3. `stream.I_ == I_full`" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "npt.assert_almost_equal(stream.T_, T_full)\n", "npt.assert_almost_equal(stream.P_, P_full)\n", "npt.assert_almost_equal(stream.I_, I_full)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "There are no errors! So, this means that `stumpy.stumpi` indeed produces the correct matrix profile results that we'd expect.\n", "\n", "### The Performance\n", "\n", "We've basically claimed that incrementally updating our matrix profile with `stumpy.stumpi` is much faster (in total computational time) than performing a full pairwise distance calculation with `stumpy.stump` as each new data point arrives. Let's actually compare the timings by taking a full time series that is 10,000 data points in length, initializing both approaches with the first 2% of the time series (i.e., the first 200 points), and appending a single new data point at each iteration before re-computing the matrix profile:" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "stumpy.stump: 257.4s\n", "stumpy.stumpi: 3.1s\n" ] } ], "source": [ "T_full = np.random.rand(10_000)\n", "T_stream = T_full[:200].copy()\n", "m = 100\n", "\n", "# `stumpy.stump` timing\n", "start = time.time()\n", "mp = stumpy.stump(T_stream, m)\n", "for i in range(200, len(T_full)):\n", " T_stream = np.append(T_stream, T_full[i])\n", " mp = stumpy.stump(T_stream, m)\n", "stump_time = time.time() - start\n", "\n", "# `stumpy.stumpi` timing\n", "T_stream = T_full[:200].copy() # Reset the stream back to the first 200 points for a fair comparison\n", "stream = stumpy.stumpi(T_stream, m, egress=False) # Don't egress/remove the oldest data point when streaming\n", "start = time.time()\n", "for i in range(200, len(T_full)):\n", " t = T_full[i]\n", " stream.update(t)\n", "stumpi_time = time.time() - start\n", "\n", "print(f\"stumpy.stump: {np.round(stump_time, 1)}s\")\n", "print(f\"stumpy.stumpi: {np.round(stumpi_time, 1)}s\")" ] }, { 
"cell_type": "markdown", "metadata": {}, "source": [ "Setting aside the fact that having more CPUs will speed up both approaches, we clearly see that incremental `stumpy.stumpi` is one to two orders of magnitude faster than batch `stumpy.stump` for processing streaming data. In fact, on the current hardware, `stumpy.stump` takes roughly 0.026 seconds (i.e., 257.4s / 9,800 updates), on average, to re-compute the matrix profile after each new data point arrives. So, if you have more than about 38 new data points arriving every second, then you wouldn't be able to keep up. In contrast, `stumpy.stumpi` performs the same 9,800 updates in only 3.1 seconds and should therefore be able to comfortably handle and process ~3,000+ new data points per second using fairly modest hardware. Additionally, batch `stumpy.stump`, which has a computational complexity of `O(n^2)`, will get even slower as more and more data points get appended to the existing time series, while `stumpy.stumpi`, which only needs to compute a single new distance profile per update, will continue to be highly performant.\n", "\n", "In fact, if you don't care about maintaining the oldest data point and its relationships with the newest data point (i.e., you only care about maintaining a fixed-size sliding window), then you can keep the memory usage and the per-update cost bounded by telling `stumpy.stumpi` to remove/egress the oldest data point (along with its corresponding matrix profile information) by setting the parameter `egress=True` when we instantiate our streaming object (note that this is actually the default behavior):" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "stream = stumpy.stumpi(T_stream, m, egress=True) # Egressing/removing the oldest data point is the default behavior!" 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And now, when we process the same data above:" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "stumpy.stumpi: 3.4s\n" ] } ], "source": [ "# `stumpy.stumpi` timing with egress\n", "stream = stumpy.stumpi(T_stream, m, egress=True)\n", "start = time.time()\n", "for i in range(200, len(T_full)):\n", " t = T_full[i]\n", " stream.update(t)\n", "stumpi_time = time.time() - start\n", "\n", "print(f\"stumpy.stumpi: {np.round(stumpi_time, 1)}s\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## A Visual Example\n", "\n", "Now that we understand how to compute and update our matrix profile with streaming data, let's explore this with a real example data set where there is a known pattern and see if `stumpy.stumpi` can correctly identify when the global pattern (motif) is encountered. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Retrieving and Loading the Data\n", "\n", "First let's import some additional Python packages and then retrieve our standard \"Steamgen Dataset\":" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
| | drum pressure | excess oxygen | water level | steam flow |
|---|---|---|---|---|
| 0 | 320.08239 | 2.506774 | 0.032701 | 9.302970 |
| 1 | 321.71099 | 2.545908 | 0.284799 | 9.662621 |
| 2 | 320.91331 | 2.360562 | 0.203652 | 10.990955 |
| 3 | 325.00252 | 0.027054 | 0.326187 | 12.430107 |
| 4 | 326.65276 | 0.285649 | 0.753776 | 13.681666 |