#
# Copyright 2016 The BigDL Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import math
import os.path
from pyspark.sql import DataFrame
from pyspark.sql import Row
from bigdl.orca.data import SparkXShards
from bigdl.orca.learn.spark_estimator import Estimator as SparkEstimator
from bigdl.dllib.utils.common import get_node_and_core_number
from bigdl.dllib.utils import nest
from bigdl.dllib.nncontext import init_nncontext
from bigdl.orca.data.utils import spark_df_to_pd_sparkxshards
from bigdl.orca.learn.utils import process_xshards_of_pandas_dataframe,\
add_predict_to_pd_xshards
from openvino.inference_engine import IECore
import numpy as np
from bigdl.orca.learn.utils import openvino_output_to_sdf
from bigdl.dllib.utils.log4Error import invalidInputError
from typing import (Dict, List, Optional, Union)


class Estimator(object):
    @staticmethod
def from_openvino(*, model_path: str) -> "OpenvinoEstimator":
"""
Load an openVINO Estimator.
:param model_path: String. The file path to the OpenVINO IR xml file.
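
        Example (a minimal sketch; "model.xml" is a hypothetical IR path with its
        "model.bin" weights alongside, and the import path assumes this module is
        exposed as bigdl.orca.learn.openvino):

        >>> from bigdl.orca.learn.openvino import Estimator
        >>> est = Estimator.from_openvino(model_path="model.xml")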
"""
return OpenvinoEstimator(model_path=model_path)


class OpenvinoEstimator(SparkEstimator):
def __init__(self,
*,
model_path: str) -> None:
self.load(model_path)

    def fit(self, data, epochs, batch_size=32, feature_cols=None, label_cols=None,
            validation_data=None, checkpoint_trigger=None):
        """
        fit is not supported in OpenvinoEstimator.
        """
        invalidInputError(False, "not implemented")

    def predict(self,  # type: ignore[override]
                data: Union["SparkXShards", "DataFrame", "np.ndarray", List["np.ndarray"]],
                feature_cols: Optional[List[str]] = None,
                batch_size: Optional[int] = 4,
                input_cols: Optional[Union[str, List[str]]] = None,
                config: Optional[Dict] = None,
                ) -> Optional[Union["SparkXShards", "DataFrame",
                                    "np.ndarray", List["np.ndarray"]]]:
"""
Predict input data
:param batch_size: Int. Set batch Size, default is 4.
:param data: data to be predicted. XShards, Spark DataFrame, numpy array and list of numpy
arrays are supported. If data is XShards, each partition is a dictionary of {'x':
feature}, where feature(label) is a numpy array or a list of numpy arrays.
:param feature_cols: Feature column name(s) of data. Only used when data is a Spark
DataFrame. Default: None.
:param input_cols: Str or List of str. The model input list(order related). Users can
specify the input order using the `inputs` parameter. If inputs=None, The default
OpenVINO model input list will be used. Default: None.
:return: predicted result.
If the input data is XShards, the predict result is a XShards, each partition
of the XShards is a dictionary of {'prediction': result}, where the result is a
numpy array or a list of numpy arrays.
If the input data is numpy arrays or list of numpy arrays, the predict result is
a numpy array or a list of numpy arrays.
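
        Example (a hedged sketch; `est` is this estimator and `df` is a hypothetical
        Spark DataFrame whose feature columns match the model's input names):

        >>> result_df = est.predict(df, feature_cols=["features"], batch_size=4)
        >>> result_df.show()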
"""
sc = init_nncontext()
spark_version = sc.version
        arrow_optimization = spark_version.startswith("3")
model_bytes_broadcast = sc.broadcast(self.model_bytes)
weight_bytes_broadcast = sc.broadcast(self.weight_bytes)
if input_cols:
if not isinstance(input_cols, list):
input_cols = [input_cols]
            invalidInputError(set(input_cols) == set(self.inputs),
                              "The input names need to match the model inputs; the model"
                              " inputs are: " + ", ".join(self.inputs))
else:
input_cols = self.inputs
outputs = list(self.output_dict.keys())
invalidInputError(len(outputs) != 0, "The number of model outputs should not be 0.")
is_df = False
schema = None
if config is None:
config = {
'CPU_THREADS_NUM': str(self.core_num),
"CPU_BIND_THREAD": "HYBRID_AWARE"
}
print(config)
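
        # Runs on each partition of the input RDD: rebuilds the OpenVINO network
        # from the broadcast model/weight bytes and performs batched inference
        # locally, so the model is deserialized once per partition, not per record.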
def partition_inference(partition):
model_bytes = model_bytes_broadcast.value
weight_bytes = weight_bytes_broadcast.value
ie = IECore()
ie.set_config(config, 'CPU')
net = ie.read_network(model=model_bytes,
weights=weight_bytes, init_from_buffer=True)
net.batch_size = batch_size
local_model = ie.load_network(network=net, device_name="CPU",
num_requests=1)
infer_request = local_model.requests[0]
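
            # Pad a batch smaller than net.batch_size by repeating its last
            # element; return the padded data together with the original length
            # so the padded tail can be sliced off the model output.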
def add_elem(d):
d_len = len(d)
if d_len < batch_size:
rep_time = [1] * (d_len - 1)
rep_time.append(batch_size - d_len + 1)
return np.repeat(d, rep_time, axis=0), d_len
else:
return d, d_len
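
            # Convert one batch of DataFrame rows (dict of column -> list of values)
            # into typed numpy inputs keyed by model input name, run inference, and
            # build per-row results, trimming any padding added by add_elem.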
def generate_output_row(batch_input_dict):
input_dict = dict()
for col, input in zip(feature_cols, input_cols):
value = batch_input_dict[col]
feature_type = schema_dict[col]
if isinstance(feature_type, df_types.FloatType):
input_dict[input], elem_num = add_elem(np.array(value).astype(np.float32))
elif isinstance(feature_type, df_types.IntegerType):
input_dict[input], elem_num = add_elem(np.array(value).astype(np.int32))
elif isinstance(feature_type, df_types.StringType):
input_dict[input], elem_num = add_elem(np.array(value).astype(str))
elif isinstance(feature_type, df_types.ArrayType):
if isinstance(feature_type.elementType, df_types.StringType):
input_dict[input], elem_num = add_elem(np.array(value).astype(str))
else:
input_dict[input], elem_num = add_elem(
np.array(value).astype(np.float32))
                    elif isinstance(value[0], DenseVector):
                        input_dict[input], elem_num = add_elem(
                            np.stack([v.values for v in value]).astype(np.float32))
infer_request.infer(input_dict)
if len(outputs) == 1:
batch_pred = infer_request.output_blobs[outputs[0]].buffer[:elem_num]
if arrow_optimization:
temp_result_list = []
for p in batch_pred:
temp_result_list.append(
get_arrow_hex_str([[p.flatten()]], names=outputs))
else:
pred = [[np.expand_dims(output, axis=0).tolist()] for output in batch_pred]
else:
batch_pred = list(map(lambda output:
infer_request.output_blobs[output].buffer[:elem_num],
outputs))
if arrow_optimization:
temp_result_list = []
for i in range(elem_num):
single_r = []
for p in batch_pred:
single_r.append([p[i].flatten()])
temp_result_list.append(get_arrow_hex_str(single_r, names=outputs))
else:
pred = [list(np.expand_dims(output, axis=0).tolist()
for output in single_result)
for single_result in zip(*batch_pred)]
del batch_pred
if arrow_optimization:
return temp_result_list
else:
return pred
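
            # Non-DataFrame path: each partition element is already a numpy batch
            # (a single ndarray or a list of ndarrays, one per model input).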
if not is_df:
for batch_data in partition:
input_dict = dict()
elem_num = 0
if isinstance(batch_data, list):
for i, input in enumerate(input_cols):
input_dict[input], elem_num = add_elem(batch_data[i])
else:
input_dict[input_cols[0]], elem_num = add_elem(batch_data)
infer_request.infer(input_dict)
if len(outputs) == 1:
pred = infer_request.output_blobs[outputs[0]].buffer[:elem_num]
else:
pred = list(map(lambda output:
infer_request.output_blobs[output].buffer[:elem_num],
outputs))
yield pred
else:
batch_dict = {col: [] for col in feature_cols}
batch_row = []
cnt = 0
schema_dict = {col: schema[col].dataType for col in feature_cols}
import pyspark.sql.types as df_types
from pyspark.ml.linalg import DenseVector
from bigdl.orca.learn.utils import get_arrow_hex_str
for row in partition:
cnt += 1
batch_row.append(row)
for col in feature_cols:
batch_dict[col].append(row[col])
if cnt >= batch_size:
pred = generate_output_row(batch_dict)
if arrow_optimization:
for p in pred:
yield p
else:
for r, p in zip(batch_row, pred):
row = Row(*([r[col] for col in r.__fields__] + p))
yield row
del pred
batch_dict = {col: [] for col in feature_cols}
batch_row = []
cnt = 0
if cnt > 0:
pred = generate_output_row(batch_dict)
if arrow_optimization:
for p in pred:
yield p
else:
for r, p in zip(batch_row, pred):
row = Row(*([r[col] for col in r.__fields__] + p))
yield row
del pred
del local_model
del net
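
        # Validate an XShards partition dict and unwrap its 'x' feature data
        # before inference.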
def predict_transform(dict_data, batch_size):
            invalidInputError(isinstance(dict_data, dict), "each shard should be a dict")
            invalidInputError("x" in dict_data, "key 'x' should be in each shard")
feature_data = dict_data["x"]
if isinstance(feature_data, np.ndarray):
                invalidInputError(feature_data.shape[0] <= batch_size,
                                  "The batch size of the input data (the first dim) should"
                                  " be no larger than the model batch size, otherwise some"
                                  " inputs will be ignored.")
            elif isinstance(feature_data, list):
                for elem in feature_data:
                    invalidInputError(isinstance(elem, np.ndarray),
                                      "Each element in the x list should be a ndarray,"
                                      " but got " + elem.__class__.__name__)
                    invalidInputError(elem.shape[0] <= batch_size,
                                      "The batch size of each input data (the first dim)"
                                      " should be no larger than the model batch size,"
                                      " otherwise some inputs will be ignored.")
else:
invalidInputError(False,
"x in each shard should be a ndarray or a list of ndarray.")
return feature_data
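
        # Attach a prediction result back onto its originating shard dict.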
def update_result_shard(data):
shard, y = data
shard["prediction"] = y
return shard
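
        # Dispatch on the input type: Spark DataFrame, SparkXShards, or local
        # numpy data (a single ndarray or a list of ndarrays).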
if isinstance(data, DataFrame):
is_df = True
schema = data.schema
result = data.rdd.mapPartitions(lambda iter: partition_inference(iter))
if arrow_optimization:
result_df = openvino_output_to_sdf(data, result, outputs,
list(self.output_dict.values()))
else:
from pyspark.sql.types import StructType, StructField, FloatType, ArrayType
print("Inference without arrow optimization. You can use pyspark=3.1.3 and "
"bigdl-orca-spark3 to activate the arrow optimization")
# Deal with types
result_struct = []
for key, shape in self.output_dict.items():
struct_type = FloatType()
for _ in range(len(shape)):
struct_type = ArrayType(struct_type) # type:ignore
result_struct.append(StructField(key, struct_type))
schema = StructType(schema.fields + result_struct)
result_df = result.toDF(schema)
return result_df
elif isinstance(data, SparkXShards):
transformed_data = data.transform_shard(predict_transform, batch_size)
result_rdd = transformed_data.rdd.mapPartitions(lambda iter: partition_inference(iter))
return SparkXShards(result_rdd)
elif isinstance(data, (np.ndarray, list)):
if isinstance(data, np.ndarray):
split_num = math.ceil(len(data)/batch_size)
arrays = np.array_split(data, split_num)
num_slices = min(split_num, self.node_num)
data_rdd = sc.parallelize(arrays, numSlices=num_slices)
elif isinstance(data, list):
flattened = nest.flatten(data)
data_length = len(flattened[0])
data_to_be_rdd = [] # type: ignore
split_num = math.ceil(flattened[0].shape[0]/batch_size)
num_slices = min(split_num, self.node_num)
for i in range(split_num):
data_to_be_rdd.append([])
for x in flattened:
                invalidInputError(isinstance(x, np.ndarray),
                                  "the data in the data list should be ndarrays,"
                                  " but got " + x.__class__.__name__)
                invalidInputError(len(x) == data_length,
                                  "the ndarrays in data must all have the same size in the"
                                  " first dimension, got one ndarray of size {} and"
                                  " another of size {}".format(data_length, len(x)))
x_parts = np.array_split(x, split_num)
for idx, x_part in enumerate(x_parts):
data_to_be_rdd[idx].append(x_part)
data_to_be_rdd = [nest.pack_sequence_as(data, shard) for shard in data_to_be_rdd]
data_rdd = sc.parallelize(data_to_be_rdd, numSlices=num_slices)
print("Partition number: ", data_rdd.getNumPartitions())
result_rdd = data_rdd.mapPartitions(lambda iter: partition_inference(iter))
result_arr_list = result_rdd.collect()
result_arr = None
if isinstance(result_arr_list[0], list):
result_arr = [np.concatenate([r[i] for r in result_arr_list], axis=0)
for i in range(len(result_arr_list[0]))]
elif isinstance(result_arr_list[0], np.ndarray):
result_arr = np.concatenate(result_arr_list, axis=0)
return result_arr
else:
            invalidInputError(False,
                              "Only XShards, Spark DataFrame, a numpy array and a list of"
                              " numpy arrays are supported as input data, but"
                              " got " + data.__class__.__name__)
return None

    def evaluate(self, data, batch_size=32, feature_cols=None, label_cols=None):
        """
        evaluate is not supported in OpenvinoEstimator.
        """
        invalidInputError(False, "not implemented")

    def get_model(self):
        """
        get_model is not supported in OpenvinoEstimator.
        """
        invalidInputError(False, "not implemented")

    def save(self, model_path: str):
        """
        save is not supported in OpenvinoEstimator.
        """
        invalidInputError(False, "not implemented")

    def load(self, model_path: str) -> None:
        """
        Load an OpenVINO model.

        :param model_path: String. The file path to the OpenVINO IR xml file; the
               matching .bin weights file is expected in the same directory.
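
        Example (a minimal sketch; "model.xml" is a hypothetical IR path whose
        "model.bin" sits next to it):

        >>> estimator.load("model.xml")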
"""
self.node_num, self.core_num = get_node_and_core_number()
        invalidInputError(isinstance(model_path, str), "The model_path should be a string.")
        invalidInputError(os.path.exists(model_path), "The model_path does not exist.")
with open(model_path, 'rb') as file:
self.model_bytes = file.read()
with open(model_path[:model_path.rindex(".")] + ".bin", 'rb') as file:
self.weight_bytes = file.read()
ie = IECore()
config = {'CPU_THREADS_NUM': str(self.core_num)}
ie.set_config(config, 'CPU')
net = ie.read_network(model=self.model_bytes,
weights=self.weight_bytes, init_from_buffer=True)
self.inputs = list(net.input_info.keys())
self.output_dict = {k: v.shape for k, v in net.outputs.items()}
del net
del ie

    def set_tensorboard(self, log_dir: str, app_name: str):
        """
        set_tensorboard is not supported in OpenvinoEstimator.
        """
        invalidInputError(False, "not implemented")

    def clear_gradient_clipping(self):
        """
        clear_gradient_clipping is not supported in OpenvinoEstimator.
        """
        invalidInputError(False, "not implemented")

    def set_constant_gradient_clipping(self, min, max):
        """
        set_constant_gradient_clipping is not supported in OpenvinoEstimator.
        """
        invalidInputError(False, "not implemented")

    def set_l2_norm_gradient_clipping(self, clip_norm):
        """
        set_l2_norm_gradient_clipping is not supported in OpenvinoEstimator.
        """
        invalidInputError(False, "not implemented")

    def get_train_summary(self, tag=None):
        """
        get_train_summary is not supported in OpenvinoEstimator.
        """
        invalidInputError(False, "not implemented")

    def get_validation_summary(self, tag=None):
        """
        get_validation_summary is not supported in OpenvinoEstimator.
        """
        invalidInputError(False, "not implemented")

    def load_orca_checkpoint(self, path, version):
        """
        load_orca_checkpoint is not supported in OpenvinoEstimator.
        """
        invalidInputError(False, "not implemented")