Source code for bigdl.chronos.data.experimental.xshards_tsdataset

#
# Copyright 2016 The BigDL Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


from bigdl.orca.data.shard import SparkXShards
from bigdl.orca.learn.utils import dataframe_to_xshards_of_pandas_df
from bigdl.chronos.data.utils.utils import _to_list, _check_type
from bigdl.chronos.data.utils.roll import roll_timeseries_dataframe
from bigdl.chronos.data.utils.impute import impute_timeseries_dataframe
from bigdl.chronos.data.utils.split import split_timeseries_dataframe
from bigdl.chronos.data.experimental.utils import add_row, transform_to_dict
from bigdl.chronos.data.utils.scale import unscale_timeseries_numpy
from bigdl.chronos.data.utils.feature import generate_dt_features
import pandas as pd


_DEFAULT_ID_COL_NAME = "id"
_DEFAULT_ID_PLACEHOLDER = "0"


class XShardsTSDataset:

    def __init__(self, shards, **schema):
        '''
        XShardsTSDataset is an abstraction of a time series dataset stored in a
        distributed fashion. Cascade calls are supported for most of the
        transform methods. XShardsTSDataset will partition the dataset by
        id_col, which is experimental.
        '''
        self.shards = shards
        self.id_col = schema["id_col"]
        self.dt_col = schema["dt_col"]
        self.feature_col = schema["feature_col"].copy()
        self.target_col = schema["target_col"].copy()

        self.scaler_index = [i for i in range(len(self.target_col))]

        self.numpy_shards = None

        self._id_list = list(shards[self.id_col].unique())
    @staticmethod
    def from_xshards(shards,
                     dt_col,
                     target_col,
                     id_col=None,
                     extra_feature_col=None,
                     with_split=False,
                     val_ratio=0,
                     test_ratio=0.1):
        '''
        Initialize an XShardsTSDataset from an XShards of pandas dataframe.

        :param shards: an XShards of pandas dataframe holding your raw time series data.
        :param dt_col: a str indicates the col name of the datetime
               column in the input data frame.
        :param target_col: a str or list indicates the col name(s) of the target column(s)
               in the input data frame.
        :param id_col: (optional) a str indicates the col name of the dataframe id. If
               it is not explicitly stated, then the data is interpreted as only
               containing a single id.
        :param extra_feature_col: (optional) a str or list indicates the col name(s) of
               extra feature columns that are used to predict the target column.
        :param with_split: (optional) bool, states if we need to split the dataframe
               into train, validation and test sets. The value defaults to False.
        :param val_ratio: (optional) float, validation ratio. Only effective when
               with_split is set to True. The value defaults to 0.
        :param test_ratio: (optional) float, test ratio. Only effective when
               with_split is set to True. The value defaults to 0.1.

        :return: an XShardsTSDataset instance when with_split is set to False,
                 three XShardsTSDataset instances when with_split is set to True.

        Create an XShardsTSDataset instance by:

        >>> # Here is a df example:
        >>> # id        datetime      value   "extra feature 1"   "extra feature 2"
        >>> # 00        2019-01-01    1.9     1                   2
        >>> # 01        2019-01-01    2.3     0                   9
        >>> # 00        2019-01-02    2.4     3                   4
        >>> # 01        2019-01-02    2.6     0                   2
        >>> from bigdl.orca.data.pandas import read_csv
        >>> shards = read_csv(csv_path)
        >>> tsdataset = XShardsTSDataset.from_xshards(shards, dt_col="datetime",
        >>>                                           target_col="value", id_col="id",
        >>>                                           extra_feature_col=["extra feature 1",
        >>>                                                              "extra feature 2"])
        '''
        _check_type(shards, "shards", SparkXShards)
        target_col = _to_list(target_col, name="target_col")
        feature_col = _to_list(extra_feature_col, name="extra_feature_col")

        if id_col is None:
            shards = shards.transform_shard(add_row,
                                            _DEFAULT_ID_COL_NAME,
                                            _DEFAULT_ID_PLACEHOLDER)
            id_col = _DEFAULT_ID_COL_NAME

        # repartition by id
        shards = shards.partition_by(cols=id_col,
                                     num_partitions=len(shards[id_col].unique()))

        if with_split:
            tsdataset_shards\
                = shards.transform_shard(split_timeseries_dataframe,
                                         id_col, val_ratio, test_ratio).split()
            return [XShardsTSDataset(shards=tsdataset_shards[i],
                                     id_col=id_col,
                                     dt_col=dt_col,
                                     target_col=target_col,
                                     feature_col=feature_col) for i in range(3)]

        return XShardsTSDataset(shards=shards,
                                id_col=id_col,
                                dt_col=dt_col,
                                target_col=target_col,
                                feature_col=feature_col)
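    # A minimal sketch of the with_split path (hypothetical ratios; assumes the
    # `shards` from the example above):
    #
    # >>> train, val, test = XShardsTSDataset.from_xshards(
    # >>>     shards, dt_col="datetime", target_col="value", id_col="id",
    # >>>     with_split=True, val_ratio=0.1, test_ratio=0.1)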
    @staticmethod
    def from_sparkdf(df,
                     dt_col,
                     target_col,
                     id_col=None,
                     extra_feature_col=None,
                     with_split=False,
                     val_ratio=0,
                     test_ratio=0.1):
        '''
        Initialize an XShardsTSDataset from a Spark DataFrame.

        :param df: a Spark DataFrame holding your raw time series data.
        :param dt_col: a str indicates the col name of the datetime
               column in the input data frame.
        :param target_col: a str or list indicates the col name(s) of the target column(s)
               in the input data frame.
        :param id_col: (optional) a str indicates the col name of the dataframe id. If
               it is not explicitly stated, then the data is interpreted as only
               containing a single id.
        :param extra_feature_col: (optional) a str or list indicates the col name(s) of
               extra feature columns that are used to predict the target column.
        :param with_split: (optional) bool, states if we need to split the dataframe
               into train, validation and test sets. The value defaults to False.
        :param val_ratio: (optional) float, validation ratio. Only effective when
               with_split is set to True. The value defaults to 0.
        :param test_ratio: (optional) float, test ratio. Only effective when
               with_split is set to True. The value defaults to 0.1.

        :return: an XShardsTSDataset instance when with_split is set to False,
                 three XShardsTSDataset instances when with_split is set to True.

        Create an XShardsTSDataset instance by:

        >>> # Here is a df example:
        >>> # id        datetime      value   "extra feature 1"   "extra feature 2"
        >>> # 00        2019-01-01    1.9     1                   2
        >>> # 01        2019-01-01    2.3     0                   9
        >>> # 00        2019-01-02    2.4     3                   4
        >>> # 01        2019-01-02    2.6     0                   2
        >>> df = <pyspark.sql.dataframe.DataFrame>
        >>> tsdataset = XShardsTSDataset.from_sparkdf(df, dt_col="datetime",
        >>>                                           target_col="value", id_col="id",
        >>>                                           extra_feature_col=["extra feature 1",
        >>>                                                              "extra feature 2"])
        '''
        from pyspark.sql.dataframe import DataFrame
        _check_type(df, "df", DataFrame)
        target_col = _to_list(target_col, name="target_col")
        feature_col = _to_list(extra_feature_col, name="extra_feature_col")
        all_col = target_col + feature_col + _to_list(id_col, name="id_col") + [dt_col]
        shards = dataframe_to_xshards_of_pandas_df(df,
                                                   feature_cols=all_col,
                                                   label_cols=None,
                                                   accept_str_col=False)

        if id_col is None:
            shards = shards.transform_shard(add_row,
                                            _DEFAULT_ID_COL_NAME,
                                            _DEFAULT_ID_PLACEHOLDER)
            id_col = _DEFAULT_ID_COL_NAME

        # repartition by id
        shards = shards.partition_by(cols=id_col,
                                     num_partitions=len(shards[id_col].unique()))

        if with_split:
            tsdataset_shards\
                = shards.transform_shard(split_timeseries_dataframe,
                                         id_col, val_ratio, test_ratio).split()
            return [XShardsTSDataset(shards=tsdataset_shards[i],
                                     id_col=id_col,
                                     dt_col=dt_col,
                                     target_col=target_col,
                                     feature_col=feature_col) for i in range(3)]

        return XShardsTSDataset(shards=shards,
                                id_col=id_col,
                                dt_col=dt_col,
                                target_col=target_col,
                                feature_col=feature_col)
    def roll(self,
             lookback,
             horizon,
             feature_col=None,
             target_col=None,
             id_sensitive=False):
        '''
        Sampling by rolling for machine learning/deep learning models.

        :param lookback: int, lookback value.
        :param horizon: int or list,
               if `horizon` is an int, we will sample `horizon` steps
               continuously after the forecasting point.
               if `horizon` is a list, we will sample discretely according
               to the input list.
               specially, when `horizon` is set to 0, ground truth will be
               generated as None.
        :param feature_col: str or list, indicates the feature col name. Default to None,
               where we will take all available features in rolling.
        :param target_col: str or list, indicates the target col name. Default to None,
               where we will take all targets in rolling. It should be a subset of
               the target_col you used to initialize the XShardsTSDataset.
        :param id_sensitive: bool,
               |if `id_sensitive` is False, we will roll on each id's sub dataframe
               |and fuse the samplings.
               |The shape of the rolling result will be
               |x: (num_sample, lookback, num_feature_col + num_target_col)
               |y: (num_sample, horizon, num_target_col)
               |where num_sample is the summation of the sample number of each dataframe.
               |if `id_sensitive` is True, an error is raised since this has not
               |been implemented yet.

        :return: the XShardsTSDataset instance.
        '''
        from bigdl.nano.utils.common import invalidInputError
        if id_sensitive:
            invalidInputError(False,
                              "id_sensitive option has not been implemented.")
        feature_col = _to_list(feature_col, "feature_col") if feature_col is not None \
            else self.feature_col
        target_col = _to_list(target_col, "target_col") if target_col is not None \
            else self.target_col

        self.numpy_shards = self.shards.transform_shard(roll_timeseries_dataframe,
                                                        None, lookback, horizon,
                                                        feature_col, target_col,
                                                        self.id_col, 0, True)
        self.scaler_index = [self.target_col.index(target_col[i])
                             for i in range(len(target_col))]

        return self
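    # A minimal usage sketch (hypothetical numbers; assumes a constructed
    # XShardsTSDataset `tsdata` with target column "value"): roll with a 24-step
    # lookback to predict the next 2 steps, then export the ndarrays.
    #
    # >>> tsdata.roll(lookback=24, horizon=2, target_col="value")
    # >>> data = tsdata.to_xshards()  # x: (num_sample, 24, num_col), y: (num_sample, 2, 1)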
    def scale(self, scaler, fit=True):
        '''
        Scale the time series dataset's feature column and target column.

        :param scaler: a dictionary of scaler instances, where keys are id names
               and values are the corresponding scaler instances. e.g. if you have
               2 ids called "id1" and "id2", a legal scaler input can be
               {"id1": StandardScaler(), "id2": StandardScaler()}
        :param fit: whether to fit the scaler. Typically, the value should be set
               to True for the training set, and False for the validation and test
               sets. The value defaults to True.

        :return: the XShardsTSDataset instance.

        Assume there is a training set tsdata and a test set tsdata_test.
        scale() should be called first on the training set with the default
        fit=True, then on the test set with the same scaler and fit=False.

        >>> from sklearn.preprocessing import StandardScaler
        >>> scaler = {"id1": StandardScaler(), "id2": StandardScaler()}
        >>> tsdata.scale(scaler, fit=True)
        >>> tsdata_test.scale(scaler, fit=False)
        '''
        def _fit(df, id_col, scaler, feature_col, target_col):
            '''
            This function is used to fit the scaler dictionary on each shard.
            It returns a dictionary of id-scaler pairs for each shard.
            Note: this function will not transform the shard.
            '''
            scaler_for_this_id = scaler[df[id_col].iloc[0]]
            # fit only; as noted above, the shard itself is left untouched
            scaler_for_this_id.fit(df[target_col + feature_col])
            return {id_col: df[id_col].iloc[0], "scaler": scaler_for_this_id}

        def _transform(df, id_col, scaler, feature_col, target_col):
            '''
            This function is used to transform the shard by the fitted scaler.
            Note: this function will not fit the scaler.
            '''
            from sklearn.utils.validation import check_is_fitted
            from sklearn.exceptions import NotFittedError
            from bigdl.nano.utils.common import invalidInputError
            scaler_for_this_id = scaler[df[id_col].iloc[0]]
            try:
                check_is_fitted(scaler_for_this_id)
            except NotFittedError:
                invalidInputError(False,
                                  "scaler is not fitted. When calling scale for the "
                                  "first time, you need to set fit=True.")
            df[target_col + feature_col] =\
                scaler_for_this_id.transform(df[target_col + feature_col])
            return df

        if fit:
            self.shards_scaler = self.shards.transform_shard(_fit, self.id_col, scaler,
                                                             self.feature_col,
                                                             self.target_col)
            self.scaler_dict = self.shards_scaler.collect()
            self.scaler_dict = {sc[self.id_col]: sc["scaler"] for sc in self.scaler_dict}
            scaler.update(self.scaler_dict)  # make the change up-to-date outside the tsdata
            self.shards = self.shards.transform_shard(_transform, self.id_col,
                                                      self.scaler_dict,
                                                      self.feature_col, self.target_col)
        else:
            self.scaler_dict = scaler
            self.shards = self.shards.transform_shard(_transform, self.id_col,
                                                      self.scaler_dict,
                                                      self.feature_col, self.target_col)
        return self
    def unscale(self):
        '''
        Unscale the time series dataset's feature column and target column.

        :return: the XShardsTSDataset instance.
        '''
        def _inverse_transform(df, id_col, scaler, feature_col, target_col):
            from sklearn.utils.validation import check_is_fitted
            from sklearn.exceptions import NotFittedError
            from bigdl.nano.utils.common import invalidInputError
            scaler_for_this_id = scaler[df[id_col].iloc[0]]
            try:
                check_is_fitted(scaler_for_this_id)
            except NotFittedError:
                invalidInputError(False,
                                  "scaler is not fitted. When calling scale for the "
                                  "first time, you need to set fit=True.")
            df[target_col + feature_col] =\
                scaler_for_this_id.inverse_transform(df[target_col + feature_col])
            return df

        self.shards = self.shards.transform_shard(_inverse_transform, self.id_col,
                                                  self.scaler_dict,
                                                  self.feature_col, self.target_col)
        return self
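    # A minimal sketch: undo a previously applied scale() on the same tsdata
    # (assumes scale(scaler, fit=True) has been called, as in the example above):
    #
    # >>> tsdata.unscale()  # feature/target columns return to their original magnitudes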
    def gen_dt_feature(self, features):
        '''
        Generate datetime feature(s) for each record.

        :param features: list, states which feature(s) will be generated.
               The list should contain the features you want to generate.
               A table of all datetime features and their descriptions is listed below.

        | "MINUTE": The minute of the time stamp.
        | "DAY": The day of the time stamp.
        | "DAYOFYEAR": The ordinal day of the year of the time stamp.
        | "HOUR": The hour of the time stamp.
        | "WEEKDAY": The day of the week of the time stamp, Monday=0, Sunday=6.
        | "WEEKOFYEAR": The ordinal week of the year of the time stamp.
        | "MONTH": The month of the time stamp.
        | "YEAR": The year of the time stamp.
        | "IS_AWAKE": Bool value indicating whether it belongs to awake hours for the time stamp,
        | True for hours between 6A.M. and 1A.M.
        | "IS_BUSY_HOURS": Bool value indicating whether it belongs to busy hours for the time
        | stamp, True for hours between 7A.M. and 10A.M. and hours between 4P.M. and 8P.M.
        | "IS_WEEKEND": Bool value indicating whether it belongs to weekends for the time stamp,
        | True for Saturdays and Sundays.

        :return: the XShardsTSDataset instance.
        '''
        features_generated = []
        self.shards = self.shards.transform_shard(generate_dt_features,
                                                  self.dt_col,
                                                  features,
                                                  None,
                                                  None,
                                                  features_generated)
        # transform_shard runs remotely, so recompute the generated feature names locally
        features_generated = [fe for fe in features
                              if fe not in self.target_col + [self.dt_col, self.id_col]]
        self.feature_col += features_generated
        return self
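    # A minimal sketch (assuming an hourly dataset): generate "HOUR" and
    # "WEEKDAY" features; both names are appended to self.feature_col.
    #
    # >>> tsdata.gen_dt_feature(features=["HOUR", "WEEKDAY"])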
    def unscale_xshards(self, data, key=None):
        '''
        Unscale the time series forecaster's numpy prediction result/ground truth.

        :param data: an xshards with the same structure as self.numpy_shards.
        :param key: str, 'y' or 'prediction'. If key is None, it will be set to
               'prediction'. Any other value raises an error asking for a correct key.

        :return: the unscaled xshards.
        '''
        from bigdl.nano.utils.common import invalidInputError

        def _inverse_transform(data, scaler, scaler_index, key):
            from sklearn.utils.validation import check_is_fitted
            from sklearn.exceptions import NotFittedError
            from bigdl.nano.utils.common import invalidInputError

            id_name = data['id'][0, 0]
            scaler_for_this_id = scaler[id_name]
            try:
                check_is_fitted(scaler_for_this_id)
            except NotFittedError:
                invalidInputError(False,
                                  "scaler is not fitted. When calling scale for the "
                                  "first time, you need to set fit=True.")
            return unscale_timeseries_numpy(data[key], scaler_for_this_id, scaler_index)

        if key is None:
            key = 'prediction'
        invalidInputError(key in {'y', 'prediction'},
                          "key is not in {'y', 'prediction'}, "
                          "please input the correct key.")

        return data.transform_shard(_inverse_transform, self.scaler_dict,
                                    self.scaler_index, key)
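    # A minimal sketch (assuming `pred_shards` is a forecaster's prediction output
    # shaped like self.numpy_shards):
    #
    # >>> unscaled_pred = tsdata.unscale_xshards(pred_shards)  # key defaults to 'prediction'
    # >>> unscaled_truth = tsdata.unscale_xshards(tsdata.to_xshards(), key='y')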
    def impute(self, mode="last", const_num=0):
        '''
        Impute the tsdataset by imputing each univariate time series
        distinguished by id_col and feature_col.

        :param mode: imputation mode, select from "last", "const" or "linear".

               "last": impute by propagating the last non N/A number to its
               following N/A. If there is no non N/A number ahead, 0 is filled instead.

               "const": impute by a const value input by user.

               "linear": impute by linear interpolation.
        :param const_num: indicates the const number to fill, which is only
               effective when mode is set to "const".

        :return: the XShardsTSDataset instance.
        '''
        def df_reset_index(df):
            df.reset_index(drop=True, inplace=True)
            return df

        self.shards = self.shards.transform_shard(impute_timeseries_dataframe,
                                                  self.dt_col, mode, const_num)
        self.shards = self.shards.transform_shard(df_reset_index)
        return self
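    # A minimal sketch: fill N/A values with a user-given constant.
    #
    # >>> tsdata.impute(mode="const", const_num=0)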
    def to_xshards(self, partition_num=None):
        '''
        Export the rolling result in the form of a dict of numpy ndarrays
        {'x': ..., 'y': ..., 'id': ...}, where the values for 'x' and 'y' are
        3-dim numpy ndarrays and the value for 'id' is a 2-dim ndarray with
        shape (batch_size, 1).

        :param partition_num: how many partitions you would like to split your
               data into. Default to None, which partitions the data according to id.

        :return: a 3-element dict xshards. The ndarrays for 'x' and 'y' are
                 cast to float32.
        '''
        from bigdl.nano.utils.common import invalidInputError
        if self.numpy_shards is None:
            invalidInputError(False,
                              "Please call the 'roll' method "
                              "before transforming an XShardsTSDataset to numpy ndarrays!")
        if partition_num is None:
            return self.numpy_shards.transform_shard(transform_to_dict)
        else:
            return self.numpy_shards.transform_shard(transform_to_dict)\
                       .repartition(partition_num)
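    # An end-to-end sketch chaining the methods above in the cascade-call style
    # this class supports (hypothetical ids "00"/"01" and the column names from
    # the from_xshards example):
    #
    # >>> from sklearn.preprocessing import StandardScaler
    # >>> scaler = {"00": StandardScaler(), "01": StandardScaler()}
    # >>> train, _, test = XShardsTSDataset.from_xshards(
    # >>>     shards, dt_col="datetime", target_col="value", id_col="id",
    # >>>     with_split=True, test_ratio=0.1)
    # >>> for tsdata, is_train in [(train, True), (test, False)]:
    # >>>     tsdata.impute(mode="last")\
    # >>>           .scale(scaler, fit=is_train)\
    # >>>           .roll(lookback=24, horizon=1)
    # >>> train_data = train.to_xshards()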