
compression codec interface and Snappy support to reduce buffer size to improve scalability #685

Closed · wants to merge 2 commits

Conversation

@lyogavin (Contributor) commented on Jul 8, 2013

The biggest scalability issue we observed in our tests is that scaling to larger data sizes requires more reduce splits, so at massive scale we end up with far too many of them. This is similar to what is described in https://spark-project.atlassian.net/browse/SPARK-751, and we believe #669 is very important for this reason.

However, even with #669 we would still have many reduce splits; in our case, processing 10 TB of data requires 1 million reduce splits. One scalability bottleneck is that the number of open writers equals the number of splits, and each writer holds one compression stream and one buffered stream, both of which allocate internal buffers. The current LZF compression library uses a fixed 64K buffer (https://github.com/ning/compress/blob/master/src/main/java/com/ning/compress/lzf/LZFOutputStream.java), which easily causes OOM when there are too many reduce splits.

This change introduces a compression codec interface, along with a Snappy codec that supports a configurable buffer size. (We could also rewrite LZF's output stream implementation to support a configurable buffer size, but Snappy already supports this, so using Snappy is more straightforward.) LZF and Snappy can be selected via system properties, and other compression codec implementations can be defined in the driver program as well.
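To make the proposed design concrete, here is a minimal sketch of what such a codec interface could look like. The trait name, class names, and the `spark.io.compression.snappy.block.size` property are illustrative assumptions, not necessarily what this patch uses:

```scala
import java.io.{InputStream, OutputStream}

import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}

// Sketch of a pluggable codec interface (names are illustrative).
trait CompressionCodec {
  def compressedOutputStream(s: OutputStream): OutputStream
  def compressedInputStream(s: InputStream): InputStream
}

// LZF: the buffer size is fixed at 64K inside LZFOutputStream.
class LZFCompressionCodec extends CompressionCodec {
  def compressedOutputStream(s: OutputStream) = new LZFOutputStream(s)
  def compressedInputStream(s: InputStream) = new LZFInputStream(s)
}

// Snappy: the block size is configurable, here via an assumed system property.
class SnappyCompressionCodec extends CompressionCodec {
  private val blockSize =
    System.getProperty("spark.io.compression.snappy.block.size", "32768").toInt
  def compressedOutputStream(s: OutputStream) = new SnappyOutputStream(s, blockSize)
  def compressedInputStream(s: InputStream) = new SnappyInputStream(s)
}
```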

In our tests, Snappy is slightly faster and compresses slightly better than LZF at the same buffer size. We also measured Snappy's compressed output at different buffer sizes:

| Codec (buffer size) | Compressed size |
| --- | --- |
| LZF (64k) | 15496 |
| Snappy (64k) | 15434 |
| Snappy (32k) | 16594 |
| Snappy (16k) | 18028 |

So we can reduce the per-writer buffer footprint by 4x (16k vs. 64k) at the cost of a slightly worse compression rate (about 17% larger output: 18028 vs. 15434).
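For illustration, snappy-java exposes the block size as a constructor argument on SnappyOutputStream, which is the knob these numbers are varying. A minimal sketch (the file path and dummy data are made up):

```scala
import java.io.FileOutputStream

import org.xerial.snappy.SnappyOutputStream

object SnappyBufferDemo extends App {
  // One megabyte of dummy data.
  val data = Array.fill[Byte](1 << 20)(42)
  // The second constructor argument is the block size; here we shrink
  // the per-writer buffer to 16K, as in the last benchmark row above.
  val out = new SnappyOutputStream(new FileOutputStream("/tmp/example.snappy"), 16 * 1024)
  out.write(data)
  out.close()
}
```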

@AmplabJenkins

Thank you for your pull request. An admin will review this request soon.

```diff
@@ -162,7 +162,8 @@ object SparkBuild extends Build {
       "cc.spray" % "spray-json_2.9.2" % "1.1.1" excludeAll(excludeNetty),
       "org.apache.mesos" % "mesos" % "0.9.0-incubating",
       "io.netty" % "netty-all" % "4.0.0.Beta2",
-      "org.apache.derby" % "derby" % "10.4.2.0" % "test"
+      "org.apache.derby" % "derby" % "10.4.2.0" % "test",
+      "org.xerial.snappy" % "snappy-java" % "1.0.5"
```
Contributor:
Please also update the Maven build.

Contributor (Author):

Sure, I will add it.

@rxin (Member) commented on Jul 8, 2013

Hi Gavin,

Thanks for submitting this.

Is there any particular reason you initialize the codec in wrapForCompression instead of during the construction of BlockManager?

@rxin (Member) commented on Jul 8, 2013

Jenkins, test this please.

Whitelist this person.

@lyogavin (Contributor, Author) commented on Jul 8, 2013

Hi Reynold,

Yes. We want to support user-customized compression codec implementations, so we need to initialize the codec after users' jars have been added to the class loader in Executor.updateDependencies(). At the time BlockManager is constructed (in createFromSystemProperties()), that has not happened yet.
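A minimal sketch of this ordering constraint, assuming the codec is instantiated by class name via the context class loader (the property name and default are assumptions; CompressionCodec and LZFCompressionCodec refer to the earlier sketch):

```scala
// Fragment: resolving a user-supplied codec by class name. This only
// succeeds once Executor.updateDependencies() has added the user's jars
// to the context class loader.
def createCodec(): CompressionCodec = {
  val codecName = System.getProperty("spark.io.compression.codec",
    classOf[LZFCompressionCodec].getName)  // assumed property name and default
  Class.forName(codecName, true, Thread.currentThread.getContextClassLoader)
    .newInstance().asInstanceOf[CompressionCodec]
}
```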

```diff
@@ -902,8 +904,15 @@ private[spark] class BlockManager(
    * Wrap an output stream for compression if block compression is enabled for its block type
    */
   def wrapForCompression(blockId: String, s: OutputStream): OutputStream = {
+    if (compressionCodec == null) {
```
Member:

Hi Gavin,

As we discussed in person a while ago, you can move the initialization of compressionCodec to the place where it is declared by using a lazy val in Scala.
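A sketch of this suggestion, reusing the assumed names from the earlier sketches; a lazy val defers the reflective lookup to first use (after user jars are loaded), which also removes the null check in wrapForCompression:

```scala
import java.io.OutputStream

// Sketch: with a lazy val the codec is created on first access, after
// user jars are loaded, so BlockManager can be constructed earlier.
class BlockManagerSketch {
  lazy val compressionCodec: CompressionCodec = {
    val name = System.getProperty("spark.io.compression.codec",
      classOf[LZFCompressionCodec].getName)  // assumed property name and default
    Class.forName(name, true, Thread.currentThread.getContextClassLoader)
      .newInstance().asInstanceOf[CompressionCodec]
  }

  def wrapForCompression(blockId: String, s: OutputStream): OutputStream =
    compressionCodec.compressedOutputStream(s)
}
```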

@rxin mentioned this pull request on Jul 31, 2013
@rxin (Member) commented on Jul 31, 2013

Hi @lyogavin - I took over the change and submitted a new one based on yours in #754.

Thanks a lot for investigating the memory overhead and designing this extension.
