
Made combOp of aggregate() work as a reduce instead of an implicit fold #658

Open
wants to merge 4 commits into base: master
Conversation

markhamstra
Contributor

No description provided.

@AmplabJenkins

Thank you for your pull request. An admin will review this request soon.

val cleanSeqOp = sc.clean(seqOp)
val cleanCombOp = sc.clean(combOp)
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
def optCombOp(a: Option[U], b: Option[U]): Option[U] = for (u <- b) yield a.fold(u)(cleanCombOp(_, _))
Contributor Author

I guess

for (u <- b) yield a.fold(u)(combOp(_, _))

should be just as good, maybe better.

- preserve sequential order within partitions
- reformat code
- no need to clone None for jobResult
- use combOp instead of cleanCombOp
@markhamstra
Contributor Author

A fun little demo aggregating an RDD[Char] into a String:

MASTER=local[4] ./spark-shell
.
.
.
scala> val l = sc.parallelize("foobarbazquux".toList, 4)
l: spark.RDD[Char] = ParallelCollectionRDD[0] at parallelize at <console>:12

scala> l.aggregate("_")((s, c) => s :+ c, (s1, s2) => s1 + s2)
.
.
.
res0: java.lang.String = _bar_quux_baz_foo

Run the aggregate multiple times and you'll see different permutations of foo, bar, baz, and quux, but the ordering within each partition is always maintained, and there is only one leading zeroValue ("_") where the old implementation would have two.
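The merge step of the demo above can be mimicked with plain Scala collections (a sketch; the four partition results are assumed to split as shown, while the actual split depends on parallelize):

```scala
// Plain-Scala sketch of the merge step, assuming each partition's seqOp
// fold produced one of these four strings:
val parts = Seq("foo", "bar", "baz", "quux").map("_" + _)

val asReduce = parts.reduce(_ + _)    // this PR's behavior: "_foo_bar_baz_quux"
val asFold   = parts.fold("_")(_ + _) // old behavior: "__foo_bar_baz_quux" (extra leading zeroValue)
```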

@mateiz
Member

mateiz commented Jun 23, 2013

Mark, this seems inconsistent with Scala Collections' aggregate. When you call their aggregate on an empty list, you still get the zeroValue, whereas with this, wouldn't you get a NoSuchElementException?

Alternatively, if this does return the zeroValue because each partition's aggregate will pass that, isn't it the same as the current version?

@mateiz
Member

mateiz commented Jun 23, 2013

Just to show what I mean:

scala> import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.ArrayBuffer

scala> List(1,2,3).aggregate(new ArrayBuffer[Int])(_ += _, _ ++= _)
res1: scala.collection.mutable.ArrayBuffer[Int] = ArrayBuffer(1, 2, 3)

scala> List().aggregate(new ArrayBuffer[Int])(_ += _, _ ++= _)
res2: scala.collection.mutable.ArrayBuffer[Int] = ArrayBuffer()

@markhamstra
Contributor Author

The zeroValue is not bound into combOp (which potentially needs a different zero than seqOp does), but it is still part of the seqOp fold, so there won't be any NoSuchElementException. The results from my aggregate() on your example inputs are exactly the same as for Scala collections.

The difference is that the zeroValue doesn't become the initial element of an implicit fold in combOp. Instead, the first call of optCombOp (when jobResult is still None) effectively becomes the identity function -- i.e. the same as Option(zeroElementForCombOp combOp u) without us having to know or infer what the correct zero is for combOp. (If U couldn't be an AnyVal, then you could avoid using Option and do something similar by initializing jobResult to null and putting an if (jobResult == null) return taskResult else ... at the beginning of combOp(jobResult, taskResult).)
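A minimal standalone sketch of this identity-on-first-call behavior (hypothetical names, outside Spark; the real code is in the diff above):

```scala
// Merging task results as a reduce via Option: no zero for combOp is
// ever needed, because the first result simply becomes the accumulator.
def mergeAsReduce[U](results: Seq[U])(combOp: (U, U) => U): Option[U] =
  results.foldLeft(Option.empty[U]) { (acc, r) =>
    acc match {
      case None    => Some(r)           // first result: identity, no zero injected
      case Some(a) => Some(combOp(a, r))
    }
  }

mergeAsReduce(Seq(1, 2, 3))(_ + _)   // Some(6)
mergeAsReduce(Seq.empty[Int])(_ + _) // None -- the caller decides what "empty" means
```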

For sane uses of aggregate, this shouldn't make any difference; but you can see differences in "creative" uses of aggregate. For example, sc.parallelize(List[Char](), 4).aggregate("*")(_ :+ _, _ + _) will result in **** with my aggregate, whereas the existing aggregate will produce ***** since an additional copy of the zeroValue is currently inserted into combOp. And in the (_ + _, _ * _) example that started off this whole discussion, the current implementation will wipe out the summed results by inserting 0 * _ at the beginning of the combOp fold-that-should-be-a-reduce.

@mateiz
Member

mateiz commented Jun 23, 2013

But I don't understand why we'd want to support a combOp that's incompatible with the seqOp. Why should the user have to worry about the number of partitions? If you look at Scala's aggregate, it also has inconsistent behavior if you give + as one operation and * as the other:

scala> collection.parallel.ForkJoinTasks.defaultForkJoinPool.setParallelism(1)

scala> List(0,1,2).par.aggregate(0)(_ + _, _ * _)
res20: Int = 3

scala> collection.parallel.ForkJoinTasks.defaultForkJoinPool.setParallelism(2)

scala> List(0,1,2).par.aggregate(0)(_ + _, _ * _)
res22: Int = 0

I think this just means that you're not expected to use aggregate with such incompatible operations. More precisely, the operations passed should be such that they'll give the same result regardless what the partitions are. Maybe I'm missing something, but I don't think we should complicate the implementation to change the result in this arguably incorrect use case. If a user really wants to be aware of partitions, they can always use mapPartitions anyway.
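A plain-Scala analog (no Spark; an assumed partition layout, with a Seq of Seqs standing in for an RDD's partitions) of the mapPartitions-plus-reduce alternative:

```scala
// Each inner Seq stands in for one partition of a hypothetical RDD.
val partitions = Seq(Seq(1, 2), Seq(3), Seq.empty[Int], Seq(4, 5))

val perPartition = partitions.map(_.foldLeft(0)(_ + _)) // seqOp fold within each partition
val total = perPartition.reduce(_ + _)                  // explicit combOp reduce across partitions
// total == 15; empty partitions contribute only the seqOp zeroValue
```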

@markhamstra
Contributor Author

Yes, I'm not sure that there is a legitimate use case for seqOp and combOp that don't share the same zero. If we're willing to say that combOp must be commutative, associative, and have the same zero as seqOp, then there is no harm in inserting an extra zeroValue into the combOp reduce, and this pull request is needless complication (although a documentation update would be in order). On the other hand, if we don't want to be that restrictive on combOp and can see a use for incompatible operations when users have taken control of the relevant details of partitioning (e.g. with a custom partitioner, coalesce, or one of the operations that supports a numPartitions parameter), then we really should implement combOp as a reduce, as the docs claim.

I will agree that Scala does generate some really weird results when incompatible operations are supplied to aggregate over parallel collections. I haven't looked yet at how that aggregate is implemented, but it's not obvious to me what it is doing just from looking at various inputs and results.

@MLnick
Contributor

MLnick commented Jun 24, 2013

My (intended) real point in the mailing list discussion was that its behaviour is somewhat inconsistent with its documentation. If its behaviour is desired (and that is arguably the case), then perhaps the docs should simply be updated to indicate that the operations should be consistent (as Mark pointed out, both associative and commutative) to avoid strange behaviour. If any user really wants to use two "inconsistent" operations, they do have mapPartitions and a reduce (or whatever) available.

@mateiz
Member

mateiz commented Jun 24, 2013

Yeah, good point. In that case I'd vote to just update the docs for this but leave it the way it was. One other difference I noticed with fold vs. reduce is that fold will also work on RDDs with zero partitions (which can actually happen sometimes with HDFS files if you give an empty directory), while reduce won't.
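The fold-vs-reduce difference on an empty input can be seen with plain Scala collections as well (the RDD case with zero partitions behaves analogously):

```scala
// fold tolerates an empty collection; reduce does not.
val empty = Seq.empty[Int]

val ok = empty.fold(0)(_ + _) // 0: the zeroValue comes back unchanged
// empty.reduce(_ + _)        // would throw UnsupportedOperationException
```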

@AmplabJenkins

Thank you for your pull request. An admin will review this request soon.

xiajunluan pushed a commit to xiajunluan/spark that referenced this pull request May 30, 2014