Spark ML Pipeline Support

1. NNFrames Overview

NNFrames in DLlib provides Spark DataFrame and ML Pipeline support for distributed deep learning on Apache Spark. It includes both Python and Scala interfaces, and is compatible with both Spark 2.x and Spark 3.x.

Examples

The examples are included in the DLlib source code.

  • image classification: model inference using a pre-trained Inception v1 model (see the Python version).

  • image classification: transfer learning from a pre-trained Inception v1 model (see the Python version).

2. Primary APIs

  • NNEstimator and NNModel

    BigDL DLlib provides NNEstimator for model training with Spark DataFrame. It offers a high-level API for training a BigDL model following the Apache Spark Estimator/Transformer pattern, so users can conveniently fit BigDL DLlib into an ML pipeline. The fit result of NNEstimator is an NNModel, which is a Spark ML Transformer.

  • NNClassifier and NNClassifierModel

    NNClassifier and NNClassifierModel extend NNEstimator and NNModel respectively and focus on classification tasks, where both the label column and the prediction column are of Double type.

  • NNImageReader

    NNImageReader loads images into a Spark DataFrame.


2.1 NNEstimator

Scala:

val estimator = NNEstimator(model, criterion)

Python:

estimator = NNEstimator(model, criterion)

NNEstimator extends org.apache.spark.ml.Estimator and supports training a BigDL model with Spark DataFrame data. It can be integrated into a standard Spark ML Pipeline to allow users to combine the components of BigDL and Spark MLlib.

NNEstimator supports different feature and label data types through Preprocessing. During fit (training), NNEstimator extracts the feature and label data from the input DataFrame and uses the Preprocessing to convert the data for the model, typically converting the feature and label to Tensors, or converting the (feature, Option[Label]) tuple to a BigDL Sample.

Each Preprocessing performs one data conversion step in the preprocessing phase, and multiple Preprocessing can be combined into a ChainedPreprocessing. Pre-defined Preprocessing for popular data types like Image, Array or Vector are provided in package com.intel.analytics.bigdl.dllib.feature, and users can also develop customized Preprocessing.
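For instance, a feature Preprocessing that first converts a numeric sequence to a Tensor and then wraps the Tensor into a Sample can be chained as in this minimal Python sketch (it assumes SeqToTensor and TensorToSample are exposed by bigdl.dllib.feature.common in your DLlib version):

from bigdl.dllib.feature.common import ChainedPreprocessing, SeqToTensor, TensorToSample

# Chain two steps: numeric sequence -> 2-element Tensor, then Tensor -> BigDL Sample.
# Class availability in bigdl.dllib.feature.common is assumed; check your version.
chained = ChainedPreprocessing([SeqToTensor([2]), TensorToSample()])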

NNEstimator and NNClassifier also support setting the caching level for the training data. Options are "DRAM", "PMEM" or "DISK_AND_DRAM". With DISK_AND_DRAM(numSlice), only 1/numSlice of the data is held in memory at a time during training. By default, DRAM mode is used and all data are cached in memory.
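For example, to keep only a quarter of the training data in DRAM at a time (a minimal sketch; the setDataCacheLevel setter name comes from the Analytics Zoo lineage of this API and should be verified for your DLlib version):

# Assumption: the estimator exposes setDataCacheLevel as in the Analytics Zoo API.
estimator.setDataCacheLevel("DISK_AND_DRAM", 4)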

By default, SeqToTensor is used to convert an array or Vector to a 1-dimensional Tensor. Using Preprocessing allows NNEstimator to cache only the raw data, which decreases memory consumption during feature conversion and training; it also enables the model to digest extra data types that DataFrame does not currently support.

More concrete examples are available in package com.intel.analytics.bigdl.dllib.examples.nnframes.

NNEstimator can be created with various parameters for different scenarios.

  • NNEstimator(model, criterion)

    Takes only model and criterion and uses SeqToTensor as both the feature and label Preprocessing. NNEstimator extracts the data from the feature and label columns (only Scalar, Array[_] or Vector data types are supported) and converts each feature/label to a 1-dimensional Tensor. The tensors are combined into a BigDL Sample and sent to the model for training.

  • NNEstimator(model, criterion, featureSize: Array[Int], labelSize: Array[Int])

    Takes model, criterion, featureSize (Array of Int) and labelSize (Array of Int). NNEstimator extracts the data from the feature and label columns (only Scalar, Array[_] or Vector data types are supported) and converts each feature/label to a Tensor of the specified size.

  • NNEstimator(model, criterion, featureSize: Array[Array[Int]], labelSize: Array[Int])

    This is the interface for multi-input models. It takes model, criterion, featureSize (Array of Int Array) and labelSize (Array of Int). NNEstimator extracts the data from the feature and label columns (only Scalar, Array[_] or Vector data types are supported) and converts each feature/label to Tensors of the specified sizes.

  • NNEstimator(model, criterion, featurePreprocessing: Preprocessing[F, Tensor[T]], labelPreprocessing: Preprocessing[F, Tensor[T]])

    Takes model, criterion, featurePreprocessing and labelPreprocessing. NNEstimator extracts the data from the feature and label columns and converts each feature/label to a Tensor with the featurePreprocessing and labelPreprocessing. This constructor provides more flexibility in supporting extra data types.

Meanwhile, for advanced use cases (e.g. a model with multiple input tensors), NNEstimator supports setSamplePreprocessing(value: Preprocessing[(Any, Option[Any]), Sample[T]]) to directly compose the Sample according to a user-specified Preprocessing.
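A minimal Python sketch of this advanced path, assuming FeatureLabelPreprocessing, SeqToTensor and ScalarToTensor are available from bigdl.dllib.feature.common in your version:

from bigdl.dllib.feature.common import FeatureLabelPreprocessing, ScalarToTensor, SeqToTensor

# Compose the Sample directly from (feature, label) with explicit steps:
# features become 2-element Tensors, labels become scalar Tensors.
estimator.setSamplePreprocessing(
    FeatureLabelPreprocessing(SeqToTensor([2]), ScalarToTensor()))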

Scala Example:

import com.intel.analytics.bigdl.dllib.nn._
import com.intel.analytics.bigdl.dllib.nnframes.NNEstimator
import com.intel.analytics.bigdl.dllib.tensor.TensorNumericMath.TensorNumeric.NumericFloat

val model = Sequential().add(Linear(2, 2))
val criterion = MSECriterion()
val estimator = NNEstimator(model, criterion)
  .setLearningRate(0.2)
  .setMaxEpoch(40)
val data = sc.parallelize(Seq(
  (Array(2.0, 1.0), Array(1.0, 2.0)),
  (Array(1.0, 2.0), Array(2.0, 1.0)),
  (Array(2.0, 1.0), Array(1.0, 2.0)),
  (Array(1.0, 2.0), Array(2.0, 1.0))))
val df = sqlContext.createDataFrame(data).toDF("features", "label")
val nnModel = estimator.fit(df)
nnModel.transform(df).show(false)

Python Example:

from bigdl.dllib.nn.layer import *
from bigdl.dllib.nn.criterion import *
from bigdl.dllib.utils.common import *
from bigdl.dllib.nnframes.nn_classifier import *
from bigdl.dllib.feature.common import *
from pyspark.sql.types import *

data = sc.parallelize([
    ((2.0, 1.0), (1.0, 2.0)),
    ((1.0, 2.0), (2.0, 1.0)),
    ((2.0, 1.0), (1.0, 2.0)),
    ((1.0, 2.0), (2.0, 1.0))])

schema = StructType([
    StructField("features", ArrayType(DoubleType(), False), False),
    StructField("label", ArrayType(DoubleType(), False), False)])
df = sqlContext.createDataFrame(data, schema)
model = Sequential().add(Linear(2, 2))
criterion = MSECriterion()
estimator = NNEstimator(model, criterion, SeqToTensor([2]), ArrayToTensor([2]))\
    .setBatchSize(4).setLearningRate(0.2).setMaxEpoch(40)
nnModel = estimator.fit(df)
res = nnModel.transform(df)

The following example trains a model with 3 inputs, using VectorAssembler from Spark MLlib to combine different fields into the feature column. With the sizes specified for each model input, NNEstimator and NNClassifier split the assembled feature data and send a tensor to each corresponding input.

# Imports below assume the BigDL DLlib 2.x package layout; adjust them for your version.
# ZLayer/ZModel denote DLlib's Keras-style layers and Model; their import paths and the
# location of ZooClassNLLCriterion vary across versions and are omitted here.
from bigdl.dllib.nncontext import init_nncontext, init_spark_conf
from bigdl.dllib.nnframes.nn_classifier import *
from bigdl.dllib.optim.optimizer import Adam
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

sparkConf = init_spark_conf().setAppName("testNNClassifer").setMaster('local[1]')
sc = init_nncontext(sparkConf)
spark = SparkSession\
    .builder\
    .getOrCreate()

df = spark.createDataFrame(
    [(1, 35, 109.0, Vectors.dense([2.0, 5.0, 0.5, 0.5]), 1.0),
     (2, 58, 2998.0, Vectors.dense([4.0, 10.0, 0.5, 0.5]), 2.0),
     (3, 18, 123.0, Vectors.dense([3.0, 15.0, 0.5, 0.5]), 1.0)],
    ["user", "age", "income", "history", "label"])

assembler = VectorAssembler(
    inputCols=["user", "age", "income", "history"],
    outputCol="features")

df = assembler.transform(df)

x1 = ZLayer.Input(shape=(1,))
x2 = ZLayer.Input(shape=(2,))
x3 = ZLayer.Input(shape=(2, 2,))

user_embedding = ZLayer.Embedding(5, 10)(x1)
flatten = ZLayer.Flatten()(user_embedding)
dense1 = ZLayer.Dense(2)(x2)
lstm = ZLayer.LSTM(4, input_shape=(2, 2))(x3)

merged = ZLayer.merge([flatten, dense1, lstm], mode="concat")
zy = ZLayer.Dense(2)(merged)

zmodel = ZModel([x1, x2, x3], zy)
criterion = ZooClassNLLCriterion()
classifier = NNClassifier(zmodel, criterion, [[1], [2], [2, 2]]) \
    .setOptimMethod(Adam()) \
    .setLearningRate(0.1)\
    .setBatchSize(2) \
    .setMaxEpoch(10)

nnClassifierModel = classifier.fit(df)
print(nnClassifierModel.getBatchSize())
res = nnClassifierModel.transform(df).collect()

2.2 NNModel

Scala:

val nnModel = NNModel(bigDLModel)

Python:

nn_model = NNModel(bigDLModel)

NNModel extends Spark's ML Transformer. Users can invoke fit on an NNEstimator to obtain an NNModel, or directly compose an NNModel from a BigDL model. This makes it possible to wrap a pre-trained BigDL model into an NNModel and use it as a transformer in a Spark ML pipeline to predict results for a DataFrame (Dataset).
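A minimal Python sketch, assuming model is a trained BigDL model and df has a "features" column of 2-element arrays as in the examples above:

# Wrap the model; [2] is the feature size, per the NNModel(model, featureSize) variant below.
nn_model = NNModel(model, [2]).setBatchSize(4)
predictions = nn_model.transform(df)
predictions.show(truncate=False)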

NNModel can be created with various parameters for different scenarios.

  • NNModel(model)

    Takes only model and uses SeqToTensor as the feature Preprocessing. NNModel extracts the data from the feature column (only Scalar, Array[_] or Vector data types are supported) and converts each feature to a 1-dimensional Tensor. The tensors are sent to the model for inference.

  • NNModel(model, featureSize: Array[Int])

    Takes model and featureSize (Array of Int). NNModel extracts the data from the feature column (only Scalar, Array[_] or Vector data types are supported) and converts each feature to a Tensor of the specified size. Users can also set featureSize as Array[Array[Int]] for multi-input models.

  • NNModel(model, featurePreprocessing: Preprocessing[F, Tensor[T]])

    Takes model and featurePreprocessing. NNModel extracts the data from the feature column and converts each feature to a Tensor with the featurePreprocessing. This constructor provides more flexibility in supporting extra data types.

Meanwhile, for advanced use cases (e.g. a model with multiple input tensors), NNModel supports setSamplePreprocessing(value: Preprocessing[Any, Sample[T]]) to directly compose the Sample according to a user-specified Preprocessing.

The underlying BigDL model can be retrieved from an NNModel:

Scala:

val model = nnModel.getModel()

Python:

model = nn_model.getModel()

2.3 NNClassifier

Scala:

val classifier = NNClassifier(model, criterion)

Python:

classifier = NNClassifier(model, criterion)

NNClassifier is a specialized NNEstimator that simplifies the data format for classification tasks where the label space is discrete. It only supports a label column of DoubleType, and the fitted NNClassifierModel will have a prediction column of DoubleType.

  • model: the BigDL module to be optimized in the fit() method

  • criterion: the criterion used to compute the loss and the gradient

NNClassifier can be created with various parameters for different scenarios.

  • NNClassifier(model, criterion)

    Takes only model and criterion and uses SeqToTensor as both the feature and label Preprocessing. NNClassifier extracts the data from the feature and label columns (only Scalar, Array[_] or Vector data types are supported) and converts each feature/label to a 1-dimensional Tensor. The tensors are combined into BigDL Samples and sent to the model for training.

  • NNClassifier(model, criterion, featureSize: Array[Int])

    Takes model, criterion and featureSize (Array of Int). NNClassifier extracts the data from the feature and label columns and converts each feature to a Tensor of the specified size. ScalarToTensor is used to convert the label column. Users can also set featureSize as Array[Array[Int]] for multi-input models.

  • NNClassifier(model, criterion, featurePreprocessing: Preprocessing[F, Tensor[T]])

    Takes model, criterion and featurePreprocessing. NNClassifier extracts the data from the feature and label columns and converts each feature to a Tensor with the featurePreprocessing. This constructor provides more flexibility in supporting extra data types.

Meanwhile, for advanced use cases (e.g. model with multiple input tensor), NNClassifier supports setSamplePreprocessing(value: Preprocessing[(Any, Option[Any]), Sample[T]]) to directly compose Sample with user-specified Preprocessing.

Scala example:

import com.intel.analytics.bigdl.dllib.nn._
import com.intel.analytics.bigdl.dllib.nnframes.NNClassifier
import com.intel.analytics.bigdl.dllib.tensor.TensorNumericMath.TensorNumeric.NumericFloat

val model = Sequential().add(Linear(2, 2)).add(LogSoftMax())
val criterion = ClassNLLCriterion()
val estimator = NNClassifier(model, criterion)
  .setLearningRate(0.2)
  .setMaxEpoch(40)
val data = sc.parallelize(Seq(
  (Array(0.0, 1.0), 1.0),
  (Array(1.0, 0.0), 2.0),
  (Array(0.0, 1.0), 1.0),
  (Array(1.0, 0.0), 2.0)))
val df = sqlContext.createDataFrame(data).toDF("features", "label")
val dlModel = estimator.fit(df)
dlModel.transform(df).show(false)

Python Example:

from bigdl.dllib.nn.layer import *
from bigdl.dllib.nn.criterion import *
from bigdl.dllib.utils.common import *
from bigdl.dllib.nnframes.nn_classifier import *
from pyspark.sql.types import *

# Logistic regression with BigDL layers and NNClassifier
model = Sequential().add(Linear(2, 2)).add(LogSoftMax())
criterion = ClassNLLCriterion()
estimator = NNClassifier(model, criterion, [2]).setBatchSize(4).setMaxEpoch(10)
data = sc.parallelize([
    ((0.0, 1.0), [1.0]),
    ((1.0, 0.0), [2.0]),
    ((0.0, 1.0), [1.0]),
    ((1.0, 0.0), [2.0])])

schema = StructType([
    StructField("features", ArrayType(DoubleType(), False), False),
    StructField("label", ArrayType(DoubleType(), False), False)])
df = sqlContext.createDataFrame(data, schema)
dlModel = estimator.fit(df)
dlModel.transform(df).show(truncate=False)

2.4 NNClassifierModel

Scala:

val nnClassifierModel = NNClassifierModel(model, featureSize)

Python:

nn_classifier_model = NNClassifierModel(model)

NNClassifierModel is a specialized NNModel for classification tasks. Both the label and the prediction column are of Double type.

NNClassifierModel can be created with various parameters for different scenarios.

  • NNClassifierModel(model)

    Takes only model and uses SeqToTensor as the feature Preprocessing. NNClassifierModel extracts the data from the feature column (only Scalar, Array[_] or Vector data types are supported) and converts each feature to a 1-dimensional Tensor. The tensors are sent to the model for inference.

  • NNClassifierModel(model, featureSize: Array[Int])

    Takes model and featureSize (Array of Int). NNClassifierModel extracts the data from the feature column (only Scalar, Array[_] or Vector data types are supported) and converts each feature to a Tensor of the specified size. Users can also set featureSize as Array[Array[Int]] for multi-input models.

  • NNClassifierModel(model, featurePreprocessing: Preprocessing[F, Tensor[T]])

    Takes model and featurePreprocessing. NNClassifierModel extracts the data from the feature column and converts each feature to a Tensor with the featurePreprocessing. This constructor provides more flexibility in supporting extra data types.

Meanwhile, for advanced use cases (e.g. a model with multiple input tensors), NNClassifierModel supports setSamplePreprocessing(value: Preprocessing[Any, Sample[T]]) to directly compose the Sample according to a user-specified Preprocessing.


2.5 Hyperparameter Setting

Before training starts, you can set the optimization algorithm, batch size, number of epochs and learning rate to meet your goals; otherwise NNEstimator/NNClassifier will use the default values.

Continuing the code above, NNEstimator and NNClassifier can be configured in the same way.

Scala:

// for estimator
estimator.setBatchSize(4).setMaxEpoch(10).setLearningRate(0.01).setOptimMethod(new Adam())
// for classifier
classifier.setBatchSize(4).setMaxEpoch(10).setLearningRate(0.01).setOptimMethod(new Adam())

Python:

# for estimator
estimator.setBatchSize(4).setMaxEpoch(10).setLearningRate(0.01).setOptimMethod(Adam())
# for classifier
classifier.setBatchSize(4).setMaxEpoch(10).setLearningRate(0.01).setOptimMethod(Adam())

2.6 Training

NNEstimator/NNClassifier supports training with Spark DataFrames/Datasets.

Suppose df is the training data; simply call the fit method and let BigDL DLlib train the model for you.

Scala:

//get a NNClassifierModel
val nnClassifierModel = classifier.fit(df)

Python:

# get a NNClassifierModel
nnClassifierModel = classifier.fit(df)

Users may also set a validation DataFrame and the validation frequency through the setValidation method. A train summary and a validation summary can also be configured to log the training process for visualization in TensorBoard, as sketched below.
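A minimal Python sketch, continuing the classifier above; it assumes a validation DataFrame val_df exists and that the trigger, metric and summary classes are importable from bigdl.dllib.optim.optimizer in your version:

from bigdl.dllib.optim.optimizer import EveryEpoch, Top1Accuracy, TrainSummary, ValidationSummary

# Evaluate Top-1 accuracy on val_df at the end of every epoch (batch size 4).
classifier.setValidation(EveryEpoch(), val_df, [Top1Accuracy()], 4)
# Log training and validation curves for TensorBoard (log directory is hypothetical).
classifier.setTrainSummary(TrainSummary(log_dir="/tmp/bigdl_logs", app_name="demo"))
classifier.setValidationSummary(ValidationSummary(log_dir="/tmp/bigdl_logs", app_name="demo"))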

2.7 Prediction

Since NNModel/NNClassifierModel inherits from Spark's Transformer abstract class, simply call the transform method on NNModel/NNClassifierModel to make predictions.

Scala:

nnModel.transform(df).show(false)

Python:

nnModel.transform(df).show(truncate=False)

For the complete examples of NNFrames, please refer to the Scala and Python examples in the DLlib source code.

2.8 NNImageReader

NNImageReader is the primary DataFrame-based image loading interface, defining API to read images into DataFrame.

Scala:

val imageDF = NNImageReader.readImages(imageDirectory, sc)

Python:

image_frame = NNImageReader.readImages(image_path, sc)

The output DataFrame contains a single column named "image". The schema of the "image" column can be accessed from com.intel.analytics.bigdl.dllib.nnframes.DLImageSchema.byteSchema. Each record in the "image" column represents one image, in the format Row(origin, height, width, nChannels, mode, data), where origin contains the URI of the image file, data holds the original file bytes, and mode represents the OpenCV-compatible type (CV_8UC3 or CV_8UC1 in most cases).

val byteSchema = StructType(
  StructField("origin", StringType, true) ::
    StructField("height", IntegerType, false) ::
    StructField("width", IntegerType, false) ::
    StructField("nChannels", IntegerType, false) ::
    // OpenCV-compatible type: CV_8UC3, CV_32FC3 in most cases
    StructField("mode", IntegerType, false) ::
    // Bytes in OpenCV-compatible order: row-wise BGR in most cases
    StructField("data", BinaryType, false) :: Nil)

After loading the images, users can compose preprocessing steps with the Preprocessing classes defined in com.intel.analytics.bigdl.dllib.feature.image.
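A typical chain for an ImageNet-style model might look like the following Python sketch; the transformer class names follow the Analytics Zoo lineage of bigdl.dllib.feature.image and should be verified against your version:

from bigdl.dllib.feature.common import ChainedPreprocessing
from bigdl.dllib.feature.image import *

# Row -> ImageFeature -> resized/cropped/normalized image -> Tensor
transformer = ChainedPreprocessing([
    RowToImageFeature(),                          # convert a DataFrame Row to an ImageFeature
    ImageResize(256, 256),                        # resize to 256 x 256
    ImageCenterCrop(224, 224),                    # crop the central 224 x 224 patch
    ImageChannelNormalize(123.0, 117.0, 104.0),   # subtract per-channel means
    ImageMatToTensor(),                           # OpenCV Mat -> Tensor
    ImageFeatureToTensor()])                      # extract the Tensor from the ImageFeature

# The chained transformer can then serve as the featurePreprocessing of an
# NNEstimator/NNClassifier, e.g. NNClassifier(model, criterion, transformer).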