Parallelize your algorithm by Ray (1).md

  1. Using remote functions (tasks) [ray.remote]
  2. Fetching results via object IDs [ray.put, ray.get, ray.wait]
  3. Using remote classes (actors) [ray.remote]


Install Ray with: pip install -U ray


import ray

# Start Ray. If you're connecting to an existing cluster, you would use
# ray.init(address=<cluster-address>) instead.
ray.init()

  1. Using remote functions (tasks) [ray.remote]


# A regular Python function.
def regular_function():
    return 1

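# A Ray remote function: the decorator turns it into a task.
@ray.remote
def remote_function():
    return 1

# Calling the regular function runs it immediately and returns the value.
assert regular_function() == 1

# Calling .remote() returns an object ID immediately; the task runs in the background.
object_id = remote_function.remote()

# ray.get blocks until the task finishes. The value is the same as the one
# returned by the original `regular_function`.
assert ray.get(object_id) == 1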

Parallelism: invocations of regular_function happen serially, for example


# These happen serially.
for _ in range(4):
    regular_function()

whereas invocations of remote_function happen in parallel, for example


# These happen in parallel.
for _ in range(4):
    remote_function.remote()

Oftentimes, you may want to specify a task’s resource requirements (for example one task may require a GPU). The ray.init() command will automatically detect the available GPUs and CPUs on the machine. However, you can override this default behavior by passing in specific resources, e.g.


ray.init(num_cpus=8, num_gpus=4, resources={'Custom': 2})

Remote functions and classes can also declare their resource requirements in the decorator, for example @ray.remote(num_cpus=2, num_gpus=0.5).


If you do not specify any resources in the @ray.remote decorator, the default is 1 CPU resource and no other resources.

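Calling a remote function does not return its result directly; it immediately returns an object ID. The function runs in the background, in parallel with the caller, and once it has finished the result can be retrieved through the returned object ID.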

ray.put(value) also returns an object ID.

The put operation stores an object in the object store and returns the object ID assigned to that value.

y = 1
object_id = ray.put(y)

  2. Fetching results via object IDs [ray.put, ray.get, ray.wait]


ray.get fetches a remote object, or a list of remote objects, from the object store.

If the object is a numpy array or a collection of numpy arrays, the get call is zero-copy and returns arrays backed by shared object store memory. Otherwise, the object data is deserialized into a Python object.

Note that ray.get blocks until the object corresponding to the object ID is available in the local object store.

Remote calls are asynchronous: they return object IDs rather than results. To obtain the actual results, call ray.get().

In a statement such as the following, results is actually a list of object IDs:

results = [do_some_work.remote(x) for x in range(4)]

If it is changed as follows, ray.get() uses each object ID to fetch the actual result:

results = [ray.get(do_some_work.remote(x)) for x in range(4)]

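To keep the tasks running in parallel, submit all of them first and pass the whole list of object IDs to a single ray.get() call:

results = ray.get([do_some_work.remote(x) for x in range(4)])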


Recall that remote operations are asynchronous and return futures (i.e., object IDs) instead of the results themselves. To get the actual results we need ray.get(), and the first instinct is to call it directly on each remote invocation, i.e., to replace the line “results = [do_some_work.remote(x) for x in range(4)]” with: results = [ray.get(do_some_work.remote(x)) for x in range(4)]

This removes the parallelism. ray.get() is blocking, so calling it after each remote operation means waiting for that operation to complete before the next one is submitted. The operations therefore execute one at a time.

To enable parallelism, we need to call ray.get() after invoking all tasks. We can easily do so in our example by replacing line “results = [do_some_work.remote(x) for x in range(4)]” with:

results = ray.get([do_some_work.remote(x) for x in range(4)])

Always keep in mind that ray.get() is a blocking operation, so calling it eagerly can hurt parallelism. Instead, try to write your program so that ray.get() is called as late as possible.

Tip 1: Delay calling ray.get() as much as possible.



  3. Using remote classes (actors) [ray.remote]


Actors extend the Ray API from functions (tasks) to classes. The ray.remote decorator indicates that instances of the Counter class will be actors. An actor is essentially a stateful worker. Each actor runs in its own Python process.

@ray.remote
class Counter(object):
    def __init__(self):
        self.value = 0

    def increment(self):
        # State lives inside the actor process and persists between calls.
        self.value += 1
        return self.value

You can specify resource requirements for actors too (see the Actors section for more details).


@ray.remote(num_cpus=2, num_gpus=0.5)
class Actor(object):
    pass

We can interact with the actor by calling its methods with the .remote operator. We can then call ray.get on the object ID to retrieve the actual value.


a1 = Counter.remote()  # create an actor instance in its own process
obj_id = a1.increment.remote()
assert ray.get(obj_id) == 1

Actor handles can be passed into other tasks. To illustrate this with a simple example, consider a simple actor definition.



The @ray.remote decorator defines a service. It takes the ParameterServer class and allows it to be instantiated as a remote service or actor.

Sharding Across Multiple Parameter Servers: When your parameters are large and your cluster is large, a single parameter server may not suffice. The application can become bottlenecked by the network bandwidth into and out of the machine hosting the parameter server, especially when there are many workers.

A natural solution in this case is to shard the parameters across multiple parameter servers. This can be achieved by simply starting up multiple parameter server actors. An example of how to do this is shown in the code example at the bottom.


When the same object has to be passed to several remote tasks, store it in the object store once with ray.put() and pass its object ID instead.

Tip 2: For exploiting Ray’s parallelism, remote tasks should take at least several milliseconds.

Tip 3: When passing the same object repeatedly as an argument to a remote operation, use ray.put() to store it once in the object store and then pass its ID.

Tip 4: Use ray.wait() to process results as soon as they become available.