py-basic-dtw

A dynamic time warping (dtw) library that contains both classical and subsequence dtw techniques in a basic form which is readable.


License
MIT
Install
pip install py-basic-dtw==0.1.0

Documentation

py-basic-dtw

This dynamic time warping (dtw) library is in a non-optimised state, making it more readable. It performs two version of the dynamic time warping algorithm, classical and subsequence.

CURRENTLY IN DEVELOPMENT STAGE - NOT STABLE

Features:

  • Classical dtw
  • Subsequence dtw with multiple matching.
  • Multidimensional sequence inputs
  • Custom distance metrics via lambda functions (default is absolute difference)
  • Custom step weightings (the step implemented is 1 step diagonal, -vertical, and -horizontal)
  • Custom dimension weightings
  • Retrieve local and accumulated cost matrices
  • Select how subsequent subsequence matches are found. The matches begin first by matching the end point, then building an exclusion zone around them.
    • LocalMaximumExclusion: Excludes all neighbouring end points until a local maximum is reached on both sides.
    • QueryBasedLocalExclusion: Excludes neighbouring end points within a set radius, determined by a percentage of the query length.

Dependencies

  • Numpy

import

Please import the package as follows

 from src.dtw import dtw

Usage

Classical DTW

Optimal alignment between two sequences of varying lengths ( x and y in this case).

The match will start at (0,0) and end at (N,M) where N is the last point in sequence x, and M is the last point in sequence y.

from src.dtw import dtw

# initalise
basicdtw = dtw()

# create sequence
# Ensure even single dimensional input is nested in an array.
x = np.array([[0,0,0,1,1,1,2,3,4,5]])
y = np.array([[1,1,2,2,2,3,3,3,4,4,5,6]])

# assign metric if required
basicdtw.metric = lambda x,y: np.square(x-y)

# compute match
optimalPath, totalCost = basicdtw.Classical(x=x,y=y)

# retrieve cost matrix
localCostMatrix = basicdtw.localCostMatrix
accumulatedCostMatrix = basicdtw.accumulatedCostMatrix

Subsequence DTW

Find a subsequence(s) match within a longer sequence (y) that optimally fits the shorter sequence (x).

Therefore, sequence x needs to be shorter in length than sequence y.

If there are two subsequence matches to find, the first match will always be the last occurring one ( last in the sequence of y.). E.g. if there is two peaks in the y time series where the second peak occurs after the first in time,and x describes such a peak, the first match will find the second peak first then the first peak.

This is due to the nature of the subsequence matching algorithm implemented and can be modified by changing the end point selection function from InvLexiArgMin to LexiArgMin (found at line 76 inside NextSubSequenceMatch function).

from src.dtw import dtw

# initalise
basicdtw = dtw()

# create sequence
# Multidimensional input. Each sequence can vary in length but their dimensions at each time interval must be equal lengths.
x = np.array([[0,0,0,1,1,1,2,3,4,5],[0,0,0,1,1,1,2,3,4,5]])
y = np.array([[1,1,2,2,2,3,3,3,4,4],[0,0,0,1,1,1,2,3,4,5],[0,0,0,1,1,1,2,3,4,5]])

# assign metric if required
basicdtw.metric = lambda x,y: np.square(x-y)

# compute first match
optimalPath, totalCost = basicdtw.Subsequence( x=x, y=y)

# retrieve cost matrix
localCostMatrix = basicdtw.localCostMatrix
accumulatedCostMatrix = basicdtw.accumulatedCostMatrix

# compute next match
optimalPath, totalCost = basicdtw.NextSubsequenceMatch()

Step weights

The step weights define the calculation of the accumulated cost matrix. Which will affect the optimal warping path.

The step function used to create the accumulated cost matrix is as follows:

Distance, D(x,y) = Min(D(x-1,y), D(x,y-1), D(x-1,y-1)) + Local Cost(x,y))

With the step weightings added:

weights = [1,1,1] (diagonal, vertical, horizontal)

Distance, D(x,y) = Min( D(x-1,y) * weights[0] , D(x,y-1) * weights[1] , D(x-1,y-1) * weights[2] ) + Local Cost(x,y) )

from src.dtw import dtw

# initalise
basicdtw = dtw()

# create sequence
# Ensure even single dimensional inputs values is sepearted into an array as follows, of 1 dim.
x = np.array([[0],[0],[0],[1],[1],[1],[2]])
y = np.array([[1],[1],[2],[2],[2],[3],[3],[3],[4],[4],[5],[6]])

# assign metric if required
basicdtw.metric = lambda x,y: np.square(x-y)

# create step weights, giving more control to the diagonal step.
weights = np.array([0.5,1,1])

# compute match
optimalPath, totalCost = basicdtw.Classical( x=x, y=y, stepWeights=weights)

# retrieve cost matrix
localCostMatrix = basicdtw.localCostMatrix
accumulatedCostMatrix = basicdtw.accumulatedCostMatrix

Dimensional weights

Dimensional weighting defines what dimensions are most important during creation of the local cost matrix.

The local cost matrix with weighting is created using an accumulated distance function as follows:

dimWeights = [1,1,1,1,1,1]
metric = lambda x,y: abs(x-y)

for i -> 0 to N (where N is number of dimensions):
    localCostMatrix(x,y) += metric(x[i][x],y[i][y]) * dimWeights[dim]

The implementation of dimensional weighting:

from src.dtw import dtw

# initalise
basicdtw = dtw()

# create sequence
# Ensure even single dimensional input is nested in an array.
x = np.array([[0,1,2],[0,1,2],[0,1,2],[1,1,2],[1,3,3],[1,4,4],[2,5,5]])
y = np.array([[1,1,1],[1,1,1],[2,2,2],[2,2,2],[2,2,2],[3,3,3],[3,3,3],[3,3,3],[4,4,4],[4,4,4],[5,5,5],[6,6,6]])

# assign metric if required
basicdtw.metric = lambda x,y: np.square(x-y)

# dimensional weighting
weights = np.array([1,1,1]) # only three elements as there is only three dimensions.

# compute match
optimalPath, totalCost = basicdtw.Classical(x=x, y=y, dimWeights=weights)

# retrieve cost matrix
localCostMatrix = basicdtw.localCostMatrix
accumulatedCostMatrix = basicdtw.accumulatedCostMatrix