Support sortedTake in beam runner #1949
Conversation
We flatten all the input PriorityQueues and construct a new PQ using the monoid provided. TESTS: Updated failing unit test
@@ -59,6 +61,25 @@ object BeamOp extends Serializable {
  )(implicit ordK: Ordering[K], kryoCoder: KryoCoder): PCollection[KV[K, java.lang.Iterable[U]]] = {
    reduceFn match {
      case ComposedMapGroup(f, g) => planMapGroup(planMapGroup(pcoll, f), g)
      case EmptyGuard(MapValueStream(SumAll(pqm: PriorityQueueMonoid[V]))) =>
I don't think this should even compile. I probably wrote some bad code in my example. The `SumAll` type is `TraversableOnce[A] => Iterator[A]`, where in this case `A = V`. So when you pattern match on `PriorityQueueMonoid[T]`, that means `V = PriorityQueue[T]`; so it shouldn't be `PriorityQueueMonoid[V]`. A little-known fact is that you can use a lower-case identifier to bind an unknown type: `pqm: PriorityQueueMonoid[v]`, and then use `v` lower down.
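To illustrate the lower-case type-variable trick, here is a minimal, self-contained sketch; the `PriorityQueueMonoid` below is a simplified stand-in, not the real algebird class:

```scala
import java.util.PriorityQueue

// Simplified stand-in for algebird's PriorityQueueMonoid.
class PriorityQueueMonoid[T](max: Int)(implicit ord: Ordering[T]) {
  def zero: PriorityQueue[T] = new PriorityQueue[T](ord)
}

def describe(m: Any): String =
  m match {
    // `v` (lower case) binds the otherwise-unknown element type so we can
    // refer to it on the right-hand side; an upper-case name would be read
    // as an existing type already in scope.
    case pqm: PriorityQueueMonoid[v] =>
      val q: PriorityQueue[v] = pqm.zero
      s"PQ monoid, empty zero: ${q.isEmpty}"
    case _ => "not a PQ monoid"
  }
```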
// We are not using plus method defined in PriorityQueueMonoid as it is mutating
// input Priority Queues. We create a new PQ from the individual ones.
// We didn't use Top PTransformation in beam as it is not needed, also
// we cannot access `max` defined in PQ monoid.
why not extract the comparator, `val cmp = pqm.zero.comparator()`, and then use `Top.of` as @tlazaro suggested here:
`Top` requires the max number of elements in its constructor, but we cannot access it from the monoid.
ahh... man, that `PriorityQueueMonoid` is really bad. :(
ahh, but there is a way out! You could subclass it:

class ScaldingPriorityQueueMonoid[K](val count: Int)(implicit val ordering: Ordering[K]) extends PriorityQueueMonoid(count)(ordering)

and then you can access the count and the ordering.
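A self-contained sketch of why the subclass helps (again using a simplified stand-in for the algebird class, which hides its size bound):

```scala
import java.util.{Comparator, PriorityQueue}

// Simplified stand-in: the real algebird PriorityQueueMonoid takes a max
// size at construction but does not expose it as a field.
class PriorityQueueMonoid[K](max: Int)(implicit ord: Ordering[K]) {
  def zero: PriorityQueue[K] = new PriorityQueue[K](ord)
}

// The proposed subclass: `val` parameters re-expose the count and the
// ordering so callers (e.g. code building a Beam Top transform) can read them.
class ScaldingPriorityQueueMonoid[K](val count: Int)(implicit val ordering: Ordering[K])
    extends PriorityQueueMonoid[K](count)(ordering)

val m = new ScaldingPriorityQueueMonoid[Int](3)
val cmp: Comparator[Int] = m.ordering // scala Ordering extends java's Comparator
```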
I made this quick PR: twitter/algebird#1008
// input Priority Queues. We create a new PQ from the individual ones.
// We didn't use Top PTransformation in beam as it is not needed, also
// we cannot access `max` defined in PQ monoid.
val flattenedValues = input.getValue.asScala.flatMap { value =>
I think this is going to materialize everything in memory. I think you should do `input.getValue.asScala.toStream.flatMap ...` to make sure this is a lazy Iterable.
`input.getValue.asScala` gives only an Iterator, and `flatMap` also works lazily. Which call would materialize it?
`getValue` returns an `Iterable[_]`, no? So `.asScala` on that is going to give you an `Iterable`, if I'm not mistaken. You could call `.iterator` on it before you call `.asScala`, and that will certainly be lazy, which is better than `toStream`, since we would rather not materialize the whole set.
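A small sketch of that suggestion; the plain `java.util.List` below stands in for the real `input.getValue` (an assumption for illustration):

```scala
import java.util.Arrays
import scala.collection.JavaConverters._

val javaValues: java.lang.Iterable[Int] = Arrays.asList(1, 2, 3)

// Take the Java iterator first, then convert: the resulting scala Iterator
// is lazy, and flatMap on an Iterator stays lazy too, so already-consumed
// elements can be garbage collected.
val flattened: Iterator[Int] =
  javaValues.iterator().asScala.flatMap(x => Iterator(x, x * 10))
```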
actually... `build` requires an `Iterable`, and since `flattenedValues` is a `val` it will pin things in memory. To take this suggestion, I think you need `@inline def flattenedValues = input.getValue.asScala.toStream.flatMap...`
I think I understand all the pieces, but I would really appreciate it if you could clarify how to make it properly lazy.
In the first part, are you referring to `Iterable` keeping references to all elements, as opposed to `Iterator`, which wouldn't?
Then, when you mention using `@inline`, is it to avoid a closure over that variable keeping the reference alive longer, or resulting in more expensive code generation? Or is it more about escape analysis or similar?
In Scala, most `Iterable` methods (like `flatMap`) are not lazy and will materialize the entire result in many cases (basically the only exceptions are `Stream`, or `LazyList` in Scala 2.13).
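A quick, self-contained check of that claim, counting how often the mapped function actually runs:

```scala
// flatMap on a strict collection runs eagerly for every element...
var strictCalls = 0
List(1, 2, 3).flatMap { x => strictCalls += 1; List(x) }
// strictCalls is already 3 here.

// ...while flatMap on an Iterator defers work until elements are pulled.
var lazyCalls = 0
val it = Iterator(1, 2, 3).flatMap { x => lazyCalls += 1; Iterator(x) }
// lazyCalls is still 0; pulling one element forces only the first step.
it.next()
// lazyCalls is now 1.
```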
An Iterator generally drops references to the items it has iterated past, although you could imagine making one that doesn't. Consider a List iterator:
class ListIterator[A](lst: List[A]) extends Iterator[A] {
private[this] var current: List[A] = lst
def hasNext: Boolean = current.nonEmpty
def next(): A = {
val result = current.head
current = current.tail
result
}
}
The way the Scala compiler works, `lst` is only referenced by the constructor, so you won't keep a reference to it beyond construction, which just uses it to initialize `current`.
Finally, the `@inline` is just a hint, but the `def` means there is nothing keeping the reference around, so when the GC runs it can collect anything not pointed to by the `Stream` at that moment.
BUT, all of this is a bit moot, because looking inside `PriorityQueueMonoid` we see that `build` traverses the `Iterable` twice... so if it is a large `Stream` you will materialize the whole thing anyway...
I think the best idea would be to just subclass `PriorityQueueMonoid` so you can access the max size and the ordering, and use those to call `Top.of`.
pcoll.apply(MapElements.via(
  new SimpleFunction[KV[K, java.lang.Iterable[V]], KV[K, java.lang.Iterable[U]]]() {
    override def apply(input: KV[K, lang.Iterable[V]]): KV[K, java.lang.Iterable[U]] = {
      // We are not using plus method defined in PriorityQueueMonoid as it is mutating
I think you also need to match on:

scalding/scalding-beam/src/main/scala/com/twitter/scalding/beam_backend/BeamOp.scala
Line 271 in e198811

def mapSideAggregator(

since I guess this restriction is also enforced on the map-side operations... You'll need a slightly different approach there (basically a custom cache). But maybe they don't do their mutation detection on the mappers?
Tried with a unit test and it failed because of mutation. Instead of overriding the `put` method in the `SummingCache`, I was hoping to pattern match on the Semigroup and override the `plus` method.
// needs: import scala.collection.JavaConverters._
case class ImmutablePQMonoid[T](
  size: Int
)(implicit ord: Ordering[T]) extends PriorityQueueMonoid[T](size) {
  override def plus(left: PriorityQueue[T], right: PriorityQueue[T]): PriorityQueue[T] =
    super.build(left.iterator().asScala.toIterable ++ right.iterator().asScala.toIterable)
}
But the same issue is there: I cannot access the size field of the original monoid.
Or maybe this?
case class ImmutablePQMonoid[T](
pqm: PriorityQueueMonoid[T]
)(implicit ord: Ordering[T]) extends Monoid[PriorityQueue[T]] {
override def zero: PriorityQueue[T] = pqm.zero
override def plus(
l: PriorityQueue[T],
r: PriorityQueue[T]
): PriorityQueue[T] = ???
}
why not just pattern match on this monoid and not use it at all? As a short-term fix, you could make `sumByLocalKeys` a no-op for this particular monoid; for a better fix, you could implement a summing cache for this particular monoid.
I think making a full copy of the priority queue each time will be a perf killer, and I doubt it will be better than just not using a map-side cache at all...
The more we talk about this, the more it seems like maybe copying the immutable heap in from cats-collections is the right move (or send a PR to algebird and we can publish a new version and use it from there, either way...).
Seems like we keep getting paper cuts here.
  .groupAll
  .sortedReverseTake(3),
Seq(5, 4, 3)
test("sortedTake"){
note that `bufferedTake` also uses the problematic monoid internally. Worth testing:

implicit val mon: PriorityQueueMonoid[V1] = new PriorityQueueMonoid[V1](n)(fakeOrdering)

Good lesson here: mutation, NOT EVEN ONCE!
Added `ScaldingPriorityQueueMonoid`, which exposes `count`, which we later use in `TopCombineFn`. Added a unit test for `bufferedTake`. Disabled map-side aggregation when using `ScaldingPriorityQueueMonoid`.
TypedPipe
  .from(1 to 50)
  .groupAll
  .bufferedTake(100)
just a note: this is going to be really bad in a real job without map-side aggregation. The key is `Unit`, so there is only one key; with map-side aggregation, each mapper would send 100 and the reducers would pick 100 of those, but with no map-side aggregation, all the data will be sent to the reducers, and they will throw away all but 100. But we can add an issue and come back and address this.
Opened a ticket for this: #1952
Compare: 6279624 to 49af384
I'm fine with landing this, how do we usually do it? 'Squash and merge'?
yes, squash and merge if you think it is ready. Looks good to me too.