Ray is an open source distributed framework for emerging AI applications. With the RayOnSpark support packaged in BigDL Orca, Users can seamlessly integrate Ray applications into the big data processing pipeline on the underlying Big Data cluster (such as Hadoop/YARN or K8s).

Note: BigDL has been tested on Ray 1.9.2 and you are highly recommended to use this tested version.

1. Install

We recommend using conda to prepare the Python environment. When installing bigdl-orca with pip, you can specify the extras key [ray] to install the additional dependencies for running Ray (i.e. ray==1.9.2, psutil, aiohttp==3.7.0, aioredis==1.1.0, setproctitle, hiredis==1.1.0, async-timeout==3.0.1):

conda create -n py37 python=3.7  # "py37" is conda environment name, you can use any name you like.
conda activate py37

pip install bigdl-orca[ray]

View Python User Guide and Orca User Guide for more installation instructions.

2. Initialize

We recommend using init_orca_context to initiate and run RayOnSpark on the underlying cluster. The Ray cluster would be launched by specifying init_ray_on_spark=True. For example, to launch Spark and Ray on standard Hadoop/YARN clusters in YARN client mode:

from bigdl.orca import init_orca_context

sc = init_orca_context(cluster_mode="yarn-client", cores=4, memory="10g", num_nodes=2, init_ray_on_spark=True)

By default, the Ray cluster would be launched using Spark barrier execution mode, you can turn it off via the configurations of OrcaContext:

from bigdl.orca import OrcaContext

OrcaContext.barrier_mode = False

View Orca Context for more details.

3. Run

  • After the initialization, you can directly run Ray applications on the underlying cluster. Ray tasks or actors would be launched across the cluster. The following code shows a simple example:

    import ray
    class Counter(object):
          def __init__(self):
              self.n = 0
          def increment(self):
              self.n += 1
              return self.n
    counters = [Counter.remote() for i in range(5)]
    print(ray.get([c.increment.remote() for c in counters]))
  • You can retrieve the information of the Ray cluster via OrcaContext:

    from bigdl.orca import OrcaContext
    ray_ctx = OrcaContext.get_ray_context()
    address_info = ray_ctx.address_info  # The dictionary information of the ray cluster, including node_ip_address, object_store_address, webui_url, etc.
    redis_address = ray_ctx.redis_address  # The redis address of the ray cluster.
  • You should call stop_orca_context() when your program finishes:

    from bigdl.orca import stop_orca_context

4. Known Issue

If you encounter the following error when launching Ray on the underlying cluster, especially when you are using a Spark standalone cluster:

This system supports the C.UTF-8 locale which is recommended. You might be able to resolve your issue by exporting the following environment variables:

    export LC_ALL=C.UTF-8
    export LANG=C.UTF-8

Add the environment variables when calling init_orca_context would resolve the issue:

sc = init_orca_context(cluster_mode, init_ray_on_spark=True, env={"LANG": "C.UTF-8", "LC_ALL": "C.UTF-8"})