# Source code for seglearn.split

"""
This module provides a class and a function for splitting time series data temporally - where
train/test or fold splits are created within each of the time series in the data set. This
splitting approach is for evaluating how well the algorithm performs on segments drawn from the
same time series but excluded from the training set. The performance from this splitting approach
should be similar to performance on the training data so long as the data in each series is
relatively uniform.

Note that splitting along the temporal axis violates the assumption of independence between train
and test samples, as successive samples in a sequence or series are not iid. However, temporal
splitting is still useful in certain cases such as for the analysis of a single sequence / series.
"""

# Author: David Burns
# License: BSD

import numpy as np
from sklearn.model_selection._split import _build_repr

from .util import check_ts_data, get_ts_data_parts
from .base import TS_Data


class TemporalKFold(object):
    """
    K-fold iterator variant for temporal splitting of time series data

    Every time series is cut along the time axis into ``n_splits`` contiguous pieces of equal
    length with no overlap, so the folds are balanced. Samples left over when a series length is
    not divisible by ``n_splits`` are dropped from the end of that series. Splitting the series
    changes the number of samples in the data set, so ``split`` returns new data and target
    arrays in addition to the fold indices.

    Parameters
    ----------
    n_splits : int > 1
        number of folds

    Raises
    ------
    ValueError
        if ``n_splits`` is less than 2

    Examples
    --------

    >>> from seglearn.split import TemporalKFold
    >>> from seglearn.datasets import load_watch
    >>> data = load_watch()
    >>> splitter = TemporalKFold(n_splits=4)
    >>> X, y, cv = splitter.split(data['X'], data['y'])

    """

    def __init__(self, n_splits=3):
        if n_splits < 2:
            raise ValueError("TemporalKFold: n_splits must be >= 2 (set to %d)" % n_splits)
        self.n_splits = n_splits

    def __repr__(self):
        return _build_repr(self)

    def split(self, X, y):
        """
        Splits time series data and target arrays, and generates splitting indices

        Parameters
        ----------
        X : array-like, shape [n_series, ...]
           Time series data and (optionally) contextual data
        y : array-like shape [n_series, ]
            target vector

        Returns
        -------
        X : array-like, shape [n_series * n_splits, ]
            Split time series data and contextual data
        y : array-like, shape [n_series * n_splits]
            Split target data
        cv : list of length n_splits
            One ``(train_indices, test_indices)`` tuple per fold, indexing into the returned
            X and y
        """
        check_ts_data(X, y)
        Xt, Xc = get_ts_data_parts(X)
        Ns = len(Xt)
        Xt_new, y_new = self._ts_slice(Xt, y)

        if Xc is not None:
            # contextual data is per series, so it is repeated once for every fold
            Xc_new = np.concatenate([Xc] * self.n_splits)
            X_new = TS_Data(Xt_new, Xc_new)
        else:
            X_new = Xt_new

        cv = self._make_indices(Ns)

        return X_new, y_new, cv

    @staticmethod
    def _as_array(parts):
        """ stacks a list of pieces, falling back to a 1D object array when they are ragged """
        try:
            return np.array(parts)
        except ValueError:
            # Recent NumPy refuses to build an array from pieces of differing shape, which is
            # what series of unequal length produce - keep each piece as one object element.
            ragged = np.empty(len(parts), dtype=object)
            for k, part in enumerate(parts):
                ragged[k] = part
            return ragged

    def _fold_pieces(self, series, Ns):
        """ cuts the first Ns series into n_splits equal pieces, ordered fold by fold """
        pieces = []
        for i in range(self.n_splits):
            for j in range(Ns):
                Njs = len(series[j]) // self.n_splits  # samples per fold for series j
                pieces.append(series[j][(Njs * i):(Njs * (i + 1))])
        return self._as_array(pieces)

    def _ts_slice(self, Xt, y):
        """ takes time series data, and splits each series into temporal folds """
        Ns = len(Xt)
        Xt_new = self._fold_pieces(Xt, Ns)

        if len(np.atleast_1d(y[0])) == len(Xt[0]):
            # y is a time series: slice it exactly like the data
            y_new = self._fold_pieces(y, Ns)
        else:
            # y is contextual to each series: repeat it once per fold
            y_new = np.concatenate([y] * self.n_splits)

        return Xt_new, y_new

    def _make_indices(self, Ns):
        """ makes indices for cross validation """
        N_new = int(Ns * self.n_splits)
        cv = []
        for i in range(self.n_splits):
            # fold i occupies the i-th consecutive run of Ns samples in the split data
            lo, hi = Ns * i, Ns * (i + 1)
            test = np.arange(lo, hi)
            train = np.concatenate((np.arange(0, lo), np.arange(hi, N_new)))
            cv.append((train, test))
        return cv
def temporal_split(X, y, test_size=0.25):
    """
    Split time series or sequence data along the time axis.
    Test data is drawn from the end of each series / sequence

    Parameters
    ----------
    X : array-like, shape [n_series, ...]
       Time series data and (optionally) contextual data
    y : array-like shape [n_series, ]
        target vector
    test_size : float
        strictly between 0 and 1, amount to allocate to test

    Returns
    -------
    X_train : array-like, shape [n_series, ]
    X_test : array-like, shape [n_series, ]
    y_train : array-like, shape [n_series, ]
    y_test : array-like, shape [n_series, ]

    Raises
    ------
    ValueError
        if ``test_size`` is not strictly between 0 and 1

    """
    # written as a chained comparison so that NaN is rejected as well
    if not 0. < test_size < 1.:
        raise ValueError("temporal_split: test_size must be > 0.0 and < 1.0"
                         " (was %s)" % test_size)

    Ns = len(y)  # number of series
    check_ts_data(X, y)
    Xt, Xc = get_ts_data_parts(X)

    train_size = 1. - test_size

    # the leading part of every series goes to train, the remainder to test
    train_ind = [np.arange(0, int(train_size * len(Xt[i]))) for i in range(Ns)]
    test_ind = [np.arange(len(train_ind[i]), len(Xt[i])) for i in range(Ns)]

    X_train = [Xt[i][train_ind[i]] for i in range(Ns)]
    X_test = [Xt[i][test_ind[i]] for i in range(Ns)]

    if Xc is not None:
        # contextual data is not split in time: both parts carry the same Xc
        X_train = TS_Data(X_train, Xc)
        X_test = TS_Data(X_test, Xc)

    if len(np.atleast_1d(y[0])) == len(Xt[0]):
        # y is a time series
        y_train = [y[i][train_ind[i]] for i in range(Ns)]
        y_test = [y[i][test_ind[i]] for i in range(Ns)]
    else:
        # y is contextual
        y_train = y
        y_test = y

    return X_train, X_test, y_train, y_test