# Source code for seglearn.util

"""
This module has utilities for time series data input checking
"""
# Author: David Burns
# License: BSD

from pandas import DataFrame
import numpy as np
import warnings

from seglearn.base import TS_Data

__all__ = ['get_ts_data_parts', 'check_ts_data', 'check_ts_data_with_ts_target', 'ts_stats']


def get_ts_data_parts(X):
    """
    Separates time series data object into time series variables and contextual variables

    Parameters
    ----------
    X : array-like, shape [n_series, ...]
        Time series data and (optionally) contextual data

    Returns
    -------
    Xt : array-like, shape [n_series, ]
        Time series data
    Xs : array-like, shape [n_series, n_context_variables]
        Contextual variables, or None if X carries no contextual data
    """
    if isinstance(X, TS_Data):
        # TS_Data objects store the two parts explicitly
        return X.ts_data, X.context_data
    elif isinstance(X, DataFrame):
        # the 'ts_data' column holds the series; all other columns are context
        return X.ts_data.values, X.drop(columns=['ts_data']).values
    else:
        # plain array-like input: no contextual variables
        return X, None
def check_ts_data(X, y=None):
    """
    Checks time series data is good. If not, raises ValueError.

    Parameters
    ----------
    X : array-like, shape [n_series, ...]
        Time series data and (optionally) contextual data
    y : array-like, shape [n_series, ...], optional
        Target data. If None, no checking is performed and None is returned.

    Returns
    -------
    ts_target : bool or None
        True if the target (y) is a time series, False if targets are single
        values or fixed-length vectors, None when y is None.

    Raises
    ------
    ValueError
        If X and y contain different numbers of series, or the target
        lengths are inconsistent with the series lengths.
    """
    if y is not None:
        Nx = len(X)
        Ny = len(y)
        if Nx != Ny:
            raise ValueError("Number of time series different in X (%d) and y (%d)" % (Nx, Ny))
        Xt, _ = get_ts_data_parts(X)
        # per-series lengths of the data and of the targets
        Ntx = np.array([len(Xt[i]) for i in np.arange(Nx)])
        Nty = np.array([len(np.atleast_1d(y[i])) for i in np.arange(Nx)])
        if np.count_nonzero(Nty == 1) == Nx:
            # all targets are single values
            return False
        elif np.count_nonzero(Nty == Ntx) == Nx:
            # y is a time series
            return True
        elif np.count_nonzero(Nty == Nty[0]) == Nx:
            # target vector (eg multilabel or onehot)
            return False
        else:
            # bug fix: the original passed a tuple of args to ValueError
            # (message fragments and arrays), producing a mangled error;
            # format a single message string instead
            raise ValueError("Invalid time series lengths.\nNs: %d, Ntx: %s, Nty: %s"
                             % (Nx, Ntx, Nty))
def check_ts_data_with_ts_target(X, y=None):
    """
    Checks time series data with time series target is good. If not, raises ValueError.

    Parameters
    ----------
    X : array-like, shape [n_series, ...]
        Time series data and (optionally) contextual data
    y : array-like, shape [n_series, ...], optional
        Target data. If None, no checking is performed.

    Raises
    ------
    ValueError
        If X and y contain different numbers of series, or any target
        series length differs from its data series length.
    """
    if y is not None:
        Nx = len(X)
        Ny = len(y)
        if Nx != Ny:
            raise ValueError("Number of time series different in X (%d) and y (%d)" % (Nx, Ny))
        Xt, _ = get_ts_data_parts(X)
        # per-series lengths of the data and of the targets
        Ntx = np.array([len(Xt[i]) for i in np.arange(Nx)])
        Nty = np.array([len(np.atleast_1d(y[i])) for i in np.arange(Nx)])
        if np.count_nonzero(Nty == Ntx) == Nx:
            return
        else:
            # bug fix: the original passed a tuple of args to ValueError
            # (message fragments and arrays), producing a mangled error;
            # format a single message string instead
            raise ValueError("Invalid time series lengths.\nNs: %d, Ntx: %s, Nty: %s"
                             % (Nx, Ntx, Nty))
def ts_stats(Xt, y, fs=1.0, class_labels=None):
    """
    Generates some helpful statistics about the data

    Parameters
    ----------
    Xt : array-like, shape [n_series, ...]
        Time series data and (optionally) contextual data
    y : array-like, shape [n_series]
        target data
    fs : float
        sampling frequency
    class_labels : list of strings, default None
        List of target class names

    Returns
    -------
    results : dict
        | Dictionary of relevant statistics for the time series data
        | results['total'] has stats for the whole data set
        | results['by_class'] has stats segregated by target class
    """
    check_ts_data(Xt)
    Xt, Xs = get_ts_data_parts(Xt)

    # number of contextual variables (0 when there is no context data)
    n_context = 0 if Xs is None else len(np.atleast_1d(Xs[0]))

    n_classes = np.max(y) + 1  # number of classes
    if class_labels is None:
        class_labels = np.arange(n_classes)

    n_series = len(Xt)
    # number of time series variables per sample
    n_vars = Xt[0].shape[1] if Xt[0].ndim > 1 else 1

    # per-series durations in time units (sample counts divided by fs)
    durations = np.array([Xt[i].shape[0] for i in range(n_series)],
                         dtype=np.float64) / fs
    membership = np.array([y == c for c in range(n_classes)])
    class_durations = [durations[membership[c]] for c in range(n_classes)]

    total = {"n_series": n_series,
             "n_classes": n_classes,
             "n_TS_vars": n_vars,
             "n_context_vars": n_context,
             "Total_Time": np.sum(durations),
             "Series_Time_Mean": np.mean(durations),
             "Series_Time_Std": np.std(durations),
             "Series_Time_Range": (np.min(durations), np.max(durations))}

    by_class = {"Class_labels": class_labels,
                "n_series": np.array([len(class_durations[c]) for c in range(n_classes)]),
                "Total_Time": np.array([np.sum(class_durations[c]) for c in range(n_classes)]),
                "Series_Time_Mean": np.array([np.mean(class_durations[c]) for c in range(n_classes)]),
                "Series_Time_Std": np.array([np.std(class_durations[c]) for c in range(n_classes)]),
                "Series_Time_Min": np.array([np.min(class_durations[c]) for c in range(n_classes)]),
                "Series_Time_Max": np.array([np.max(class_durations[c]) for c in range(n_classes)])}

    return {'total': total, 'by_class': by_class}
def interp_sort(t, x):
    """
    Sorts time series x by timestamp t, removing duplicate timestamps
    (the first entry for each timestamp is kept).

    This is required to use the scipy interp1d methods, which return nan
    when there are duplicate values for t_min. This can be removed once
    the scipy issue is fixed.

    Parameters
    ----------
    t : array-like, shape [n]
        timestamps
    x : array-like, shape [n, ]
        data

    Returns
    -------
    t : array-like, shape [n]
        timestamps
    x : array-like, shape [n, ]
        data
    """
    if len(t) != len(x):
        raise ValueError("Interpolation time and value errors not equal")

    order = np.argsort(t)
    t, x = t[order], x[order]

    # np.unique returns sorted unique timestamps plus the index of the
    # first occurrence of each, which selects the surviving x values
    t, first = np.unique(t, return_index=True)
    if len(t) < len(x):
        warnings.warn("Interpolation time has duplicate time indices", UserWarning)
    x = x[first]

    return t, x


def segmented_prediction_to_series(yp, step, width, categorical_target=False):
    """
    Resamples a prediction on a single segmented series back to the
    original series sampling.

    Parameters
    ----------
    yp : array-like, shape [n, ]
        prediction on segmented series
    step : int
        segmentation step size (number of samples)
    width : int
        segmentation width (number of samples)
    categorical_target : boolean
        set to True for classification problems and False for regression
        problems

    Returns
    -------
    yt : array-like, shape [n, ]
        resampled prediction
    """
    if not categorical_target and step < 0.5 * width:
        # highly-overlapping regression segments: average all segment
        # predictions that cover each original sample position
        mask = segmentation_mask(len(yp), step, width)
        counts = np.bincount(mask)
        expanded = np.repeat(yp, width, axis=0)
        if expanded.ndim == 1:
            return np.nan_to_num(np.bincount(mask, weights=expanded) / counts)
        cols = [np.nan_to_num(np.bincount(mask, weights=expanded[:, j]) / counts)
                for j in range(expanded.shape[1])]
        return np.column_stack(cols)

    # otherwise tile each prediction over its step, with the final
    # prediction covering a full window width
    body = np.repeat(yp[0:-1], step, axis=0)
    tail = np.repeat(yp[-1:], width, axis=0)
    return np.append(body, tail, axis=0)


def segmentation_mask(N, step, width):
    # For N segments, builds the index of the original sample covered by
    # each position of each segment: row i is
    # [i*step, i*step + 1, ..., i*step + width - 1], returned flattened.
    offsets = np.tile(np.arange(width), (N, 1))
    starts = np.array([np.arange(start=0, stop=N * step, step=step)]).transpose()
    starts = np.tile(starts, (1, width))
    return (offsets + starts).flatten()