
Consolidations of shuffle files from different map tasks #635

Closed
wants to merge 2 commits into from

Conversation

jason-dai

A Spark job can have many small tasks, which in turn generate many small shuffle files - one of the major shuffle performance issues. This change combines multiple such files (for the same reducer) into a single file as follows (using the shuffle manager introduced by Reynold's #587):

  1. On each slave, the shuffle manager maintains a shuffle block pool for each shuffle

  2. Each pool maintains a list of shuffle block groups; a ShuffleMapTask acquires a free group when it needs to write its results, and returns the group when it's done

  3. Each group maintains a list of writers, one for each bucket (reduce partition) - see the sketch below
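As a rough illustration of how these pieces could fit together, here is a minimal Scala sketch. The names (ShuffleWriter, ShufflePool, ShuffleWriterGroup, ShuffleBlockPoolManager) are hypothetical and do not necessarily match the actual patch; it only shows the pool-per-shuffle / group-per-task / writer-per-bucket structure described above.

// Illustrative sketch only; names and structure are assumptions, not the actual patch.
trait ShuffleWriter {
  def write(bytes: Array[Byte]): Unit
}

// One group of writers, one writer per reduce partition (bucket).
class ShuffleWriterGroup(val writers: Array[ShuffleWriter])

class ShufflePool(numBuckets: Int, newWriter: Int => ShuffleWriter) {
  private val freeGroups = new java.util.concurrent.ConcurrentLinkedQueue[ShuffleWriterGroup]()

  // A ShuffleMapTask acquires a free group before writing its results...
  def acquireGroup(): ShuffleWriterGroup = {
    val existing = freeGroups.poll()
    if (existing != null) {
      existing
    } else {
      new ShuffleWriterGroup(Array.tabulate(numBuckets)(newWriter))
    }
  }

  // ...and returns it when done, so a later task can append to the same files.
  def releaseGroup(group: ShuffleWriterGroup): Unit = {
    freeGroups.add(group)
  }
}

class ShuffleBlockPoolManager {
  // One pool per shuffle, keyed by shuffle id.
  private val pools = new scala.collection.concurrent.TrieMap[Int, ShufflePool]()

  def poolFor(shuffleId: Int, numBuckets: Int, newWriter: Int => ShuffleWriter): ShufflePool =
    pools.getOrElseUpdate(shuffleId, new ShufflePool(numBuckets, newWriter))
}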

@AmplabJenkins

Thank you for your pull request. An admin will review this request soon.

@jason-dai
Author

The patch passes tests on my side along the normal path (i.e., no failures). I also introduced some random failures (i.e., throwing random exceptions while writing a ShuffleMapTask's results), which helped catch a few bugs in the failure-handling path. I think we need to perform more extensive failure testing - not sure how this fits into the existing test framework though.
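For illustration, the kind of fault injection described above could look roughly like the following sketch. It is not the actual test code from this patch; the function name and failure rate are made up.

// Hypothetical fault-injection wrapper; not the actual test code from this patch.
object FailureInjection {
  import scala.util.Random

  def writeWithRandomFailures(buckets: Seq[Array[Byte]],
                              write: Array[Byte] => Unit,
                              failureRate: Double = 0.05): Unit = {
    buckets.foreach { bucket =>
      // Occasionally fail mid-write to exercise the revert/cleanup path.
      if (Random.nextDouble() < failureRate) {
        throw new java.io.IOException("injected failure while writing shuffle output")
      }
      write(bucket)
    }
  }
}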

@rxin
Member

rxin commented May 31, 2013

Jenkins, test this please.

@rxin
Member

rxin commented May 31, 2013

Ah it's 1:30am. I am going to bed now and will take a look at this tomorrow. Thanks for doing this.

@rxin
Member

rxin commented May 31, 2013

I haven't looked at your changes yet, but can you comment on the impact on fetch parallelism?

@jason-dai
Author

For the Netty fetcher, there are no changes: its parallelism is controlled by the number of copiers. For the basic fetcher, parallelism is controlled by maxBytesInFlight, and in this case it will be lower because the consolidated blocks are larger.
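A back-of-the-envelope illustration of that effect (the numbers here are made up, not taken from the patch): with a fixed budget of bytes in flight, fewer and larger consolidated blocks mean fewer concurrent fetches.

// Illustrative arithmetic only; sizes are assumptions.
object FetchParallelismExample {
  def main(args: Array[String]): Unit = {
    val maxBytesInFlight   = 48L * 1024 * 1024   // e.g. 48 MB of outstanding fetch requests
    val smallBlock         = 1L * 1024 * 1024    // many small shuffle files (~1 MB each)
    val consolidatedBlock  = 16L * 1024 * 1024   // fewer, consolidated files (~16 MB each)

    println(maxBytesInFlight / smallBlock)        // ~48 blocks fetched concurrently
    println(maxBytesInFlight / consolidatedBlock) // ~3 blocks fetched concurrently
  }
}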

@AmplabJenkins

Thank you for your pull request. All automated tests for this request have passed.
Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82/

@@ -69,12 +69,14 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
} else if (mapSideCombine) {
val mapSideCombined = self.mapPartitions(aggregator.combineValuesByKey(_), true)
val partitioned = new ShuffledRDD[K, C](mapSideCombined, partitioner, serializerClass)
logInfo("serializerClass=" + serializerClass)
Member

Maybe change this to logDebug

@rxin
Member

rxin commented May 31, 2013

Jason,

I made some comments. Most of them are minor style things, but there are two things that stood out:

  1. Does revertPartialWrites delete the pre-existing file?
  2. Add a comment explaining how writers are closed in ShuffleBlockManager.

Also, one other thing I didn't point out: for if statements, wrap all of them with { } unless it is a one-liner of if/else.
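For example (illustrative code only, not from the patch):

// Illustration of the style being asked for; the names here are made up.
object IfStyleExample {
  def flushIfNeeded(pending: Int, threshold: Int, writer: java.io.Writer): Int = {
    // A one-liner if/else can stay unwrapped:
    val shouldFlush = if (pending > threshold) true else false

    // Anything longer gets wrapped in { }:
    if (shouldFlush) {
      writer.flush()
      0
    } else {
      pending
    }
  }
}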

@@ -382,6 +383,8 @@ private[spark] class BlockManager(
// As an optimization for map output fetches, if the block is for a shuffle, return it
// without acquiring a lock; the disk store never deletes (recent) items so this should work
if (ShuffleBlockManager.isShuffle(blockId)) {
//close the shuffle Writers for blockId
Member

Do shuffle blocks still walk through this code path? If not, I would just throw an exception here if it is a shuffle block.

Author

The remote shuffle blocks still go here

@rxin
Copy link
Member

rxin commented Jun 17, 2013

Hi Jason - any update on the failure paths?

@jason-dai
Author

Sorry I was held up by something and this got delayed. I have not thought through all the failure scenarios, but here are some initial thoughts:

(1) Generally speaking, reverting by flushing and truncating cannot be guaranteed to be correct when the encoder (compression or serialization) is stateful - e.g., delta encoding. A simple way to address this is to treat each shuffle block as a complete encoded file, and concatenate block files into a larger shuffle file.

(2) The previous failure handling is simple - the location of the map output is registered with the MapOutputTracker if the map task is successful; if anything goes wrong, it is not registered (and consequently a duplicate or invalid shuffle file will never be fetched). Things get complicated when merging multiple shuffle blocks into a larger file. Say two speculative map tasks complete at the same time: both their results will be written into (different) shuffle files successfully, and it would be very complex to revert one of them after the map task has finished. It is much simpler if we can still rely on the MapOutputTracker as the single source of truth for the map output locations.

Therefore, I would propose that the shuffle file be in the form of: {(map1, map1 output block), (map2, map2 output block), ..., (map1 size, map2 size, ...)}
We can still revert by truncating - but this will be best effort; the MapOutputTracker will maintain the map output locations, which will be used to skip duplicate or invalid blocks during deserialization (sketched below).
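A hypothetical sketch of that layout and of how a reader might skip unregistered blocks follows. The names (BlockEntry, buildIndex, validBlocks) and the exact format are assumptions made for illustration; they are not defined by this discussion.

// Illustrative sketch only; the actual file format is not specified here.
// Proposed layout: [map1 block][map2 block]...[mapN block][size1, size2, ..., sizeN]
object ConsolidatedShuffleFileSketch {
  case class BlockEntry(mapId: Int, offset: Long, length: Long)

  // Rebuild the per-block index from the trailing size section.
  def buildIndex(mapIds: Seq[Int], sizes: Seq[Long]): Seq[BlockEntry] = {
    var offset = 0L
    mapIds.zip(sizes).map { case (mapId, size) =>
      val entry = BlockEntry(mapId, offset, size)
      offset += size
      entry
    }
  }

  // Skip blocks whose map output is not registered with the MapOutputTracker,
  // e.g. duplicates from speculative tasks or partially written (reverted) blocks.
  def validBlocks(index: Seq[BlockEntry], registeredMapIds: Set[Int]): Seq[BlockEntry] =
    index.filter(entry => registeredMapIds.contains(entry.mapId))
}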

@rxin
Member

rxin commented Jul 12, 2013

Closing this one because it is subsumed by #669

rxin closed this Jul 12, 2013