
[Proposal] Apply operand closure cleanup #3193

Open
chaokunyang opened this issue Jul 20, 2022 · 5 comments

chaokunyang commented Jul 20, 2022

For calls such as DataFrame.apply, DataFrameGroupby.apply, Series.apply, etc., the user passes in a custom function (UDF). Mars serializes this function multiple times during graph building, scheduling and execution. If the UDF captures a large amount of data, such as a pandas DataFrame, this becomes a bottleneck in scheduling and execution (a minimal example follows the list below):

  • In the tile phase, the UDF is serialized when the key is generated for each operand
  • In the tile phase, the UDF is serialized when the key is generated for each chunk
  • When the SubtaskGraph is generated, the UDF is serialized when the logic key is generated for each chunk
  • When each Subtask is submitted to the Worker main pool, the UDF is serialized
  • When the Worker main pool receives a Subtask, the UDF is deserialized
  • When the Worker main pool submits each Subtask to the Worker sub pool, the UDF is serialized
  • When the Worker sub pool receives a Subtask, the UDF is deserialized
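
For concreteness, here is a minimal, hypothetical example of the kind of UDF that triggers the repeated serialization above: the lookup DataFrame rides along inside udf.__closure__ every time the function is serialized.

import numpy as np
import pandas as pd

def build_udf():
    # roughly 80 MB of captured state (illustrative size)
    lookup = pd.DataFrame(np.random.rand(1_000_000, 10))
    def udf(row):
        return row.sum() + lookup.iat[0, 0]
    return udf

udf = build_udf()
print(udf.__closure__)  # (<cell: DataFrame ...>,) -- shipped on every serialization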

Although UDF serialization can be cached by custom Mars serialization (in our internal codebase it is already cached when computing the chunk key / logic key), a UDF that captures a large amount of data can make the serialized form of a single Subtask reach hundreds of megabytes. Repeatedly sending this data to the Worker main pool and sub pool during Subtask scheduling causes Supervisor bottlenecks and scheduling delays.
On the other hand, an oversized Subtask object also inflates lineage storage overhead, and tasks can fail when lineage records are evicted.
Based on this, we propose a function closure cleaner at the Mars operand level: clean up the function closure in the tile phase, serialize the closure into storage, and let the operand hold only a reference to the stored closure. Before the operand executes, the closure object is fetched from storage, the UDF is restored, and the computation proceeds.

Proposal

Function closure cleanup

A function's closure lives in its __closure__ attribute. We can extract the closure, put it into storage, and later fetch it back from storage to restore the function at execution time. The pseudo code is as follows:

@classmethod
def tile(cls, op):
    closure = getattr(op.func, "__closure__", None)
    if closure is not None:
        op.closure_ref = storage_api.put(closure)
        # Detach the captured cells so the operand serializes small.
        # (Pseudo: __closure__ is read-only in CPython; see the
        # runnable sketch below.)
        op.func.__closure__ = None

@classmethod
def execute(cls, ctx, op):
    if op.closure_ref is not None:
        closure = storage_api.get(op.closure_ref)
        # Reattach the captured cells before running the UDF.
        op.func.__closure__ = closure
    # execute operand

Note: if the function closure is relatively small, it can simply stay inline in the operand instead of going through storage.
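
Since __closure__ is read-only in CPython, a real implementation has to rebuild the function rather than assign to the attribute. A minimal runnable sketch in plain Python (the helper names strip_closure/restore_closure are hypothetical; the storage put/get is elided):

import types

def strip_closure(func):
    # Return a copy of ``func`` whose captured cells are replaced with
    # empty placeholders, plus the original cells for external storage.
    closure = func.__closure__
    if closure is None:
        return func, None
    placeholders = tuple(types.CellType() for _ in closure)  # empty cells
    stripped = types.FunctionType(
        func.__code__, func.__globals__, func.__name__,
        func.__defaults__, placeholders)
    stripped.__kwdefaults__ = func.__kwdefaults__
    return stripped, closure

def restore_closure(func, closure):
    # Rebuild ``func`` with its original captured cells before execution.
    if closure is None:
        return func
    restored = types.FunctionType(
        func.__code__, func.__globals__, func.__name__,
        func.__defaults__, closure)
    restored.__kwdefaults__ = func.__kwdefaults__
    return restored

In the proposal above, strip_closure would run at tile time (with the cells going to storage) and restore_closure at execute time.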

Callable cleanup

The UDF passed in by the user may not be a function but an instance of a class that implements __call__. In that case the captured objects cannot be obtained through the __closure__ attribute, so the callable object requires special stripping. If the object implements Python's __reduce__ or __getstate__, further handling is required. A sketch follows.
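
For illustration, a possible detach/reattach for the plain __dict__ case (hypothetical helper names; objects overriding __reduce__ or __getstate__ would need a dedicated path):

import numpy as np
import pandas as pd

class ScoreUDF:
    # Captured state lives in instance attributes, not in __closure__.
    def __init__(self, lookup):
        self.lookup = lookup  # potentially a large DataFrame

    def __call__(self, row):
        return row.sum() + self.lookup.iat[0, 0]

def detach_state(obj):
    # Move instance state out so the bare callable serializes small;
    # in the real design the state would go to storage.
    state = obj.__dict__.copy()
    obj.__dict__.clear()
    return state

def reattach_state(obj, state):
    # Restore the state fetched from storage before execution.
    obj.__dict__.update(state)

udf = ScoreUDF(pd.DataFrame(np.random.rand(1000, 4)))
state = detach_state(udf)   # -> storage_api.put(state)
reattach_state(udf, state)  # <- storage_api.get(ref)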

chaokunyang changed the title from [Operand] Apply operand closure cleanup to [Proposal] Apply operand closure cleanup on Jul 20, 2022

fyrestone commented Aug 4, 2022

The function can't be executed correctly if its closure is missing, so why not put the entire UDF into storage?

My suggestions for this proposal:

  1. Put the entire UDF (not only the closure) into storage.
  2. Only do this when the UDF is very large.
  3. Provide an option for users to disable this feature.
  4. Abstract an API instead of using StorageAPI directly, because some execution backends may not be able to access the StorageAPI (a possible interface is sketched after this list).
  5. Manage the stored UDF object's lifecycle.
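
For point 4, one possible shape of such an abstraction (a hypothetical interface, not an existing Mars API):

from abc import ABC, abstractmethod
from typing import Any

class UDFStore(ABC):
    # Backend-agnostic home for cleaned-up UDFs: a regular deployment
    # could back it with StorageAPI, while other execution backends
    # plug in whatever object store they can reach.

    @abstractmethod
    async def put(self, udf: Any) -> str:
        """Store the UDF (or its closure/state) and return a reference."""

    @abstractmethod
    async def get(self, ref: str) -> Any:
        """Fetch the stored object back before execution."""

    @abstractmethod
    async def delete(self, ref: str) -> None:
        """Release the stored object when its lifecycle ends (point 5)."""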

vcfgv commented Aug 23, 2022

This comment is used to track related developments.

vcfgv commented Sep 13, 2022

After discussion, the storage service on the supervisor will remain empty. As a result, some refinements to #3205 will come up soon.

vcfgv commented Oct 18, 2022

This comment is used to track further developments.

  • Under the Ray backend, the UDF could be cleaned up when generating the TileableGraph, avoiding the cost of serialization and deserialization between client and supervisor.
  • The size of a UDF can be measured by the size of its serialized form (a possible probe is sketched below).
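
A possible size probe for the second bullet, assuming cloudpickle as the serializer (Mars' own serializer could be substituted; the threshold is an illustrative value, not a decided one):

import cloudpickle

CLEANUP_THRESHOLD = 1 << 20  # 1 MiB, illustrative

def should_cleanup(udf) -> bool:
    # Measure the UDF by the length of its serialized byte stream.
    return len(cloudpickle.dumps(udf)) > CLEANUP_THRESHOLD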

vcfgv commented Oct 26, 2022

This comment is used to track further directions.

  • An operand may take unexpectedly large arguments in op.args; these need cleanup as well.
