Compression in reduce side combine #686

Open · wants to merge 1 commit into master
86 changes: 85 additions & 1 deletion core/src/main/scala/spark/Aggregator.scala
@@ -4,6 +4,15 @@ import java.util.{HashMap => JHashMap}

import scala.collection.JavaConversions._

import serializer.{SerializerInstance, DeserializationStream, SerializationStream}

import scala.collection.mutable.{LinkedList, ArrayBuffer}
import spark.Logging
import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
import org.xerial.snappy.Snappy
import java.io.ByteArrayOutputStream
import java.io.ByteArrayInputStream

/** A set of functions used to aggregate data.
*
* @param createCombiner function to create the initial value of the aggregation.
@@ -13,7 +22,8 @@ import scala.collection.JavaConversions._
case class Aggregator[K, V, C] (
  val createCombiner: V => C,
  val mergeValue: (C, V) => C,
  val mergeCombiners: (C, C) => C) {
  val mergeCombiners: (C, C) => C)
  extends Logging {

  def combineValuesByKey(iter: Iterator[(K, V)]): Iterator[(K, C)] = {
    val combiners = new JHashMap[K, C]
@@ -40,5 +50,79 @@ case class Aggregator[K, V, C] (
    }
    combiners.iterator
  }

  /** Iterator type returned by the compressed combine path below. */
  abstract class CompressionStreamsIterator[+A] extends Iterator[A]

  def combineValuesByKeyInCompression(iter: Iterator[(K, V)]): Iterator[(K, C)] = {
    val partitionCount =
      System.getProperty("spark.reduce.side.combine.compression.partition", "11").toInt
    logDebug("combine compression enabled, partition count: " + partitionCount)
    // Per hash bucket: a map from key to the positions of its values in that
    // bucket's serialized stream.
    val indexArray = Array.fill(partitionCount){new JHashMap[K, LinkedList[Int]]()}
    // Per-bucket in-memory buffers holding the Snappy-compressed serialized values.
    val byteArrayStreams = Array.fill(partitionCount){new ByteArrayOutputStream()}
    val serializeStreams = Array.tabulate(partitionCount)(i => SparkEnv.get.serializer.
      newInstance().serializeStream(new SnappyOutputStream(byteArrayStreams(i))))
    // Position of the last value written to each bucket's stream (-1 while empty).
    val counts = Array.fill(partitionCount){-1}

    for ((k, v) <- iter) {
      // Non-negative hash bucket for this key.
      var keyHash = k.hashCode % partitionCount
      if (keyHash < 0) {
        keyHash += partitionCount
      }
      // Spill the value into the bucket's compressed stream and record its position.
      serializeStreams(keyHash).writeObject(v)

      val list = indexArray(keyHash).get(k)
      counts(keyHash) += 1

      if (list == null) {
        indexArray(keyHash).put(k, LinkedList(counts(keyHash)))
      } else {
        list.append(LinkedList(counts(keyHash)))
      }
    }

    serializeStreams.foreach(_.close())

    for (i <- 0 until partitionCount) {
      logDebug("Compression partition [" + i + "], size: " +
        byteArrayStreams(i).size() + ", objects count: " + counts(i))
    }

    new CompressionStreamsIterator[(K, C)] {
      private var uncompressedData: ArrayBuffer[Any] = null
      private var cur: Iterator[(K, LinkedList[Int])] = Iterator.empty
      private var curIdx: Int = -1

      // Advance to the next non-empty bucket, decompressing and deserializing
      // all of its values into memory in one pass.
      def hasNext: Boolean =
        cur.hasNext || curIdx < partitionCount - 1 && {
          curIdx += 1
          if (counts(curIdx) == -1) {
            hasNext
          } else {
            uncompressedData = new ArrayBuffer[Any](counts(curIdx) + 1)
            SparkEnv.get.serializer.newInstance().deserializeStream(new SnappyInputStream(
              new ByteArrayInputStream(byteArrayStreams(curIdx).toByteArray()))).asIterator.
              foreach(uncompressedData += _)
            cur = indexArray(curIdx).toIterator
            hasNext
          }
        }

      def next(): (K, C) =
        if (hasNext) {
          val (curKey, curValue) = cur.next()
          // Drop the index entry so the positions list can be collected early.
          indexArray(curIdx).put(curKey, null)

          var combiner = createCombiner(uncompressedData(curValue(0)).asInstanceOf[V])
          curValue.slice(1, curValue.length).foreach { i =>
            combiner = mergeValue(combiner, uncompressedData(i).asInstanceOf[V])
            uncompressedData(i) = null
          }
          (curKey, combiner)
        } else {
          Iterator.empty.next()
        }
    }
  }
}
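Reviewer note: the new path writes each bucket's values through a SnappyOutputStream into an in-memory buffer and later reads them back through a SnappyInputStream, one bucket at a time. A minimal round-trip sketch of the same pattern, substituting plain Java serialization for Spark's pluggable serializer (illustration only, not part of this diff):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}

// Values are Snappy-compressed as they are serialized.
val buffer = new ByteArrayOutputStream()
val out = new ObjectOutputStream(new SnappyOutputStream(buffer))
Seq("a", "b", "c").foreach(out.writeObject)
out.close()  // flushes the final compressed block

// Decompressed and deserialized back in one pass, as hasNext() does per bucket.
val in = new ObjectInputStream(new SnappyInputStream(
  new ByteArrayInputStream(buffer.toByteArray())))
val restored = Seq.fill(3)(in.readObject().asInstanceOf[String])
assert(restored == Seq("a", "b", "c"))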

6 changes: 5 additions & 1 deletion core/src/main/scala/spark/PairRDDFunctions.scala
@@ -77,7 +77,11 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
      // A sanity check to make sure mergeCombiners is not defined.
      assert(mergeCombiners == null)
      val values = new ShuffledRDD[K, V](self, partitioner, serializerClass)
      values.mapPartitions(aggregator.combineValuesByKey(_), true)
      val reduceSideCompression =
        System.getProperty("spark.reduce.side.combine.compression", "false") == "true"
      values.mapPartitions(
        if (reduceSideCompression) aggregator.combineValuesByKeyInCompression(_)
        else aggregator.combineValuesByKey(_), true)
    }
  }
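The new path is off by default; with the flag left unset this code is identical to the old combineValuesByKey path. A job opts in through the two system properties this diff reads; a usage sketch (the property names come from the diff, the bucket count 23 is an arbitrary example, the default is 11):

// Enable the compressed reduce-side combine path (default: "false").
System.setProperty("spark.reduce.side.combine.compression", "true")
// Number of hash buckets values are spilled into (default: "11").
System.setProperty("spark.reduce.side.combine.compression.partition", "23")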

1 change: 1 addition & 0 deletions project/SparkBuild.scala
@@ -162,6 +162,7 @@ object SparkBuild extends Build {
"cc.spray" % "spray-json_2.9.2" % "1.1.1" excludeAll(excludeNetty),
"org.apache.mesos" % "mesos" % "0.9.0-incubating",
"io.netty" % "netty-all" % "4.0.0.Beta2",
"org.xerial.snappy" % "snappy-java" % "1.0.5",
"org.apache.derby" % "derby" % "10.4.2.0" % "test"
) ++ (
if (HADOOP_MAJOR_VERSION == "2") {