Distributed Training and Inference


Orca Estimator provides sklearn-style APIs for transparently distributed model training and inference.

1. Estimator

To perform distributed training and inference, the user can first create an Orca Estimator from any standard (single-node) TensorFlow, Keras or PyTorch model, and then call the Estimator.fit or Estimator.predict methods (using the data-parallel processing pipeline as input).

Under the hood, the Orca Estimator will replicate the model on each node in the cluster, feed the data partition (generated by the data-parallel processing pipeline) on each node to the local model replica, and synchronize model parameters using various backend technologies (such as Horovod, tf.distribute.MirroredStrategy, torch.distributed, or the parameter sync layer in BigDL).
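
For example, a typical end-to-end workflow looks roughly like the following (a minimal sketch; the local cluster mode, the core count, and the user-defined model_creator and train_data are illustrative assumptions):

from bigdl.orca import init_orca_context, stop_orca_context
from bigdl.orca.learn.tf2 import Estimator

# initialize the Orca context on the underlying cluster (local mode here for illustration)
sc = init_orca_context(cluster_mode="local", cores=4)

# create a distributed Estimator from a standard (single-node) Keras model
est = Estimator.from_keras(model_creator=model_creator)  # model_creator is user-defined

# fit and predict run data-parallel across the cluster
est.fit(data=train_data, batch_size=32, epochs=1)
predictions = est.predict(data=train_data)

stop_orca_context()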

2. TensorFlow/Keras Estimator

2.1 TensorFlow 1.15 and Keras 2.3

There are two ways to create an Estimator for TensorFlow 1.15: either from a low-level computation graph or from a Keras model. Examples are as follows:

TensorFlow Computation Graph:

# define inputs to the graph
images = tf.placeholder(dtype=tf.float32, shape=(None, 28, 28, 1))
labels = tf.placeholder(dtype=tf.int32, shape=(None,))

# define the network and loss
logits = lenet(images)
loss = tf.reduce_mean(tf.losses.sparse_softmax_cross_entropy(logits=logits, labels=labels))

# define a metric
acc = accuracy(logits, labels)

# create an estimator using endpoints of the graph
est = Estimator.from_graph(inputs=images,
                           outputs=logits,
                           labels=labels,
                           loss=loss,
                           optimizer=tf.train.AdamOptimizer(),
                           metrics={"acc": acc})

Keras Model:

model = create_keras_lenet_model()
model.compile(optimizer=keras.optimizers.RMSprop(),
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])
est = Estimator.from_keras(keras_model=model)

Then users can perform distributed model training and inference as follows:

dataset = tfds.load(name="mnist", split="train")
dataset = dataset.map(preprocess)
est.fit(data=dataset,
        batch_size=320,
        epochs=max_epoch)
predictions = est.predict(data=df,
                          feature_cols=['image'])

The data argument in the fit method can be a Spark DataFrame, an XShards or a tf.data.Dataset. The data argument in the predict method can be a Spark DataFrame or an XShards. See the data-parallel processing pipeline page for more details.
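
For instance, when the data is a Spark DataFrame, the feature and label columns can be specified explicitly (a minimal sketch; the column names and the label_cols argument below are assumptions):

# df is assumed to be a Spark DataFrame with an "image" column and a "label" column
est.fit(data=df,
        batch_size=320,
        epochs=max_epoch,
        feature_cols=['image'],
        label_cols=['label'])
predictions = est.predict(data=df,
                          feature_cols=['image'])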

View the related Python API doc for more details.

2.2 TensorFlow 2.x and Keras 2.4+

Using ray or Horovod backend

Users can create an Estimator for TensorFlow 2.x from a Keras model (using a Model Creator Function) when the backend is ray (currently the default for TF2) or Horovod. For example:

def model_creator(config):
    model = create_keras_lenet_model()
    model.compile(optimizer=keras.optimizers.RMSprop(),
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    return model
est = Estimator.from_keras(model_creator=model_creator)  # backend defaults to "ray"; pass backend="horovod" to use Horovod

The model_creator argument should be a function that takes a config dictionary and returns a compiled Keras model.

Then users can perform distributed model training and inference as follows:

def train_data_creator(config, batch_size):
    dataset = tfds.load(name="mnist", split="train")
    dataset = dataset.map(preprocess)
    dataset = dataset.batch(batch_size)
    return dataset
stats = est.fit(data=train_data_creator,
                epochs=max_epoch,
                batch_size=batch_size,
                steps_per_epoch=total_size // batch_size)
predictions = est.predict(data=df,
                          feature_cols=['image'])

The data argument in the fit method can be a Spark DataFrame, an XShards or a Data Creator Function (that returns a tf.data.Dataset). The data argument in the predict method can be a Spark DataFrame or an XShards. See the data-parallel processing pipeline page for more details.
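
Alternatively, an in-memory XShards built from NumPy arrays can be passed to fit directly (a minimal sketch; the "x"/"y" dictionary keys and the array shapes are illustrative assumptions):

import numpy as np
from bigdl.orca.data import XShards

# build an XShards of {"x": features, "y": labels} from NumPy arrays
x_train = np.random.random([320, 28, 28, 1]).astype(np.float32)
y_train = np.random.randint(0, 10, size=(320,))
shards = XShards.partition({"x": x_train, "y": y_train})

stats = est.fit(data=shards,
                epochs=max_epoch,
                batch_size=32)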

View the related Python API doc for more details.

Using spark backend

Users can create an Estimator for TensorFlow 2.x using the spark backend as follows:

def model_creator(config):
    model = create_keras_lenet_model()
    model.compile(**compile_args(config))
    return model

def compile_args(config):
    if "lr" in config:
        lr = config["lr"]
    else:
        lr = 1e-2
    args = {
        "optimizer": keras.optimizers.SGD(lr),
        "loss": "mean_squared_error",
        "metrics": ["mean_squared_error"]
    }
    return args

est = Estimator.from_keras(model_creator=model_creator,
                           config={"lr": 1e-2},
                           workers_per_node=2,
                           backend="spark",
                           model_dir=model_dir)

The model_creator argument should be a function that takes a config dictionary and returns a compiled Keras model. The model_dir argument is required for the spark backend; it should be a shared filesystem path that can be accessed by the executors in cluster mode.

Then users can perform distributed model training and inference as follows:

def train_data_creator(config, batch_size):
    dataset = tfds.load(name="mnist", split="train")
    dataset = dataset.map(preprocess)
    dataset = dataset.batch(batch_size)
    return dataset
stats = est.fit(data=train_data_creator,
                epochs=max_epoch,
                batch_size=batch_size,
                steps_per_epoch=total_size // batch_size)
predictions = est.predict(data=df,
                          feature_cols=['image']).collect()

The data argument in the fit method can be a Spark DataFrame, an XShards or a Data Creator Function (that returns a tf.data.Dataset). The data argument in the predict method can be a Spark DataFrame or an XShards. See the data-parallel processing pipeline page for more details.

View the related Python API doc for more details.

For more details, view the distributed TensorFlow training/inference page<TODO: link to be added>.

3. PyTorch Estimator

Using BigDL backend

Users may create a PyTorch Estimator using the BigDL backend (currently the default for PyTorch) as follows:

model = LeNet() # a torch.nn.Module
model.train()
criterion = nn.NLLLoss()

adam = torch.optim.Adam(model.parameters(), args.lr)
est = Estimator.from_torch(model=model, optimizer=adam, loss=criterion)

Then users can perform distributed model training and inference as follows:

est.fit(data=train_loader, epochs=args.epochs)
predictions = est.predict(xshards)

The input to the fit method can be a torch.utils.data.DataLoader, a Spark DataFrame, an XShards, or a Data Creator Function (that returns a torch.utils.data.DataLoader). The input to the predict method should be a Spark DataFrame or an XShards. See the data-parallel processing pipeline page for more details.
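
For example, with a Spark DataFrame as input, the feature and label columns can be specified explicitly (a minimal sketch; the column names are illustrative assumptions):

# df is assumed to be a Spark DataFrame with an "image" column and a "label" column
est.fit(data=df,
        epochs=args.epochs,
        batch_size=32,
        feature_cols=['image'],
        label_cols=['label'])
predictions = est.predict(data=df, feature_cols=['image'])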

View the related Python API doc for more details.

Using torch.distributed or Horovod backend

Alternatively, users can create a PyTorch Estimator using the torch.distributed or Horovod backend by specifying the backend argument as "ray" or "horovod". In this case, the model and optimizer should be wrapped in Creator Functions. For example:

def model_creator(config):
    model = LeNet() # a torch.nn.Module
    model.train()
    return model

def optimizer_creator(model, config):
    return torch.optim.Adam(model.parameters(), config["lr"])

est = Estimator.from_torch(model=model_creator,
                           optimizer=optimizer_creator,
                           loss=nn.NLLLoss(),
                           config={"lr": 1e-2},
                           backend="ray") # or backend="horovod"

Then users can perform distributed model training and inference as follows:

est.fit(data=train_loader_func, epochs=args.epochs)
predictions = est.predict(data=df,
                          feature_cols=['image'])

The input to the fit method can be a Spark DataFrame, an XShards, or a Data Creator Function (that returns a torch.utils.data.DataLoader). The data argument in the predict method can be a Spark DataFrame or an XShards. See the data-parallel processing pipeline page for more details.
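
For reference, the train_loader_func used in the fit call above could be defined as a Data Creator Function along the following lines (a minimal sketch; the MNIST dataset and transform are illustrative assumptions):

import torch
import torchvision

def train_loader_func(config, batch_size):
    # download MNIST and wrap it in a standard PyTorch DataLoader
    train_set = torchvision.datasets.MNIST(root="/tmp/mnist", train=True, download=True,
                                           transform=torchvision.transforms.ToTensor())
    return torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True)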

View the related Python API doc for more details.

For more details, view the distributed PyTorch training/inference page<TODO: link to be added>.

4. MXNet Estimator

The user may create an MXNet Estimator as follows:

from bigdl.orca.learn.mxnet import Estimator, create_config

def get_model(config):
    net = LeNet() # a mxnet.gluon.Block
    return net

def get_loss(config):
    return gluon.loss.SoftmaxCrossEntropyLoss()

config = create_config(log_interval=2, optimizer="adam",
                       optimizer_params={'learning_rate': 0.02})
est = Estimator.from_mxnet(config=config,
                           model_creator=get_model,
                           loss_creator=get_loss,
                           num_workers=2)

Then the user can perform distributed model training as follows:

import mxnet as mx

def get_train_data_iter(config, kv):
    # data_ndarray and label_ndarray are pre-loaded NumPy arrays of training data and labels
    train = mx.io.NDArrayIter(data_ndarray, label_ndarray,
                              batch_size=config["batch_size"], shuffle=True)
    return train

est.fit(get_train_data_iter, epochs=2)

The input to the fit method can be an XShards, or a Data Creator Function (that returns an MXNet DataIter/DataLoader). See the data-parallel processing pipeline page for more details.
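
Alternatively, an in-memory XShards can be passed to fit, for example (a minimal sketch; the "x"/"y" dictionary keys and the batch_size argument are assumptions):

from bigdl.orca.data import XShards

# data_ndarray and label_ndarray are the same pre-loaded arrays used above
shards = XShards.partition({"x": data_ndarray, "y": label_ndarray})
est.fit(shards, epochs=2, batch_size=32)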

View the related Python API doc<TODO: link to be added> for more details.

5. BigDL Estimator

The user may create a BigDL Estimator as follows:

from bigdl.dllib.nn.criterion import *
from bigdl.dllib.nn.layer import *
from bigdl.dllib.optim.optimizer import *
from bigdl.orca.learn.bigdl import Estimator

linear_model = Sequential().add(Linear(2, 2))
mse_criterion = MSECriterion()
est = Estimator.from_bigdl(model=linear_model, loss=mse_criterion, optimizer=Adam())

Then the user can perform distributed model training and inference as follows:

# read spark Dataframe
df = spark.read.parquet("data.parquet")

# distributed model training (1 epoch)
est.fit(df, 1, batch_size=4)

# distributed model inference
result_df = est.predict(df)

The input to the fit and predict methods can be a Spark DataFrame or an XShards. See the data-parallel processing pipeline page for more details.
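
An XShards input works in a similar way, for example (a minimal sketch; the "x"/"y" dictionary keys and the array shapes are illustrative assumptions):

import numpy as np
from bigdl.orca.data import XShards

# build an XShards of {"x": features, "y": labels} from NumPy arrays
shards = XShards.partition({"x": np.random.randn(100, 2).astype(np.float32),
                            "y": np.random.randn(100, 2).astype(np.float32)})
est.fit(shards, 1, batch_size=4)
result_shards = est.predict(shards)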

View the related Python API doc for more details.

6. OpenVINO Estimator

The user may create an OpenVINO Estimator as follows:

from bigdl.orca.learn.openvino import Estimator

model_path = "The/file_path/to/the/OpenVINO_IR_xml_file"
est = Estimator.from_openvino(model_path=model_path)

Then the user can perform distributed model inference as follows:

import numpy as np
from bigdl.orca.data import XShards

# ndarray input
input_data = np.random.random([20, 4, 3, 224, 224])
result = est.predict(input_data)

# XShards input
shards = XShards.partition({"x": input_data})
result_shards = est.predict(shards)

The input to the predict method can be an XShards or a NumPy ndarray. See the data-parallel processing pipeline page for more details.

View the related Python API doc for more details.