
Support sortedTake in beam runner #1949

Merged: 3 commits into twitter:develop, Sep 26, 2021

Conversation

@nownikhil (Contributor):

We flatten all the input PriorityQueues and construct a new PQ using the monoid provided.
TESTS: updated the failing unit test.
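A minimal sketch of that approach, assuming a PriorityQueueMonoid[V] in scope (mergeQueues is a hypothetical helper; the real change lives in BeamOp.planMapGroup, shown in the diff below). Note that pqm.build takes an Iterable and truncates to the monoid's max size:

import java.util.PriorityQueue
import scala.collection.JavaConverters._
import com.twitter.algebird.mutable.PriorityQueueMonoid

// Flatten the grouped queues into one element stream and let the monoid
// build a fresh queue, so none of the input queues is ever mutated.
def mergeQueues[V](
  queues: java.lang.Iterable[PriorityQueue[V]],
  pqm: PriorityQueueMonoid[V]
): PriorityQueue[V] =
  pqm.build(queues.iterator().asScala.flatMap(_.iterator().asScala).toStream)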
@@ -59,6 +61,25 @@ object BeamOp extends Serializable {
)(implicit ordK: Ordering[K], kryoCoder: KryoCoder): PCollection[KV[K, java.lang.Iterable[U]]] = {
reduceFn match {
case ComposedMapGroup(f, g) => planMapGroup(planMapGroup(pcoll, f), g)
case EmptyGuard(MapValueStream(SumAll(pqm: PriorityQueueMonoid[V]))) =>
@johnynek (Collaborator), Sep 22, 2021:

I don't think this should even compile. I probably wrote some bad code in my example. The SumAll type is TraversableOnce[A] => Iterator[A], where in this case A = V. So when you pattern match on PriorityQueueMonoid[T], that means V = PriorityQueue[T]; it shouldn't be PriorityQueueMonoid[V]. A little-known fact is that you can use a lower-case name to bind an unknown type, pqm: PriorityQueueMonoid[v], and then use v lower down.
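As a self-contained illustration of that lower-case type binding (describe is a hypothetical helper, not PR code):

import com.twitter.algebird.Monoid
import com.twitter.algebird.mutable.PriorityQueueMonoid

// The lower-case v in the type pattern binds the monoid's (statically
// unknown) element type, which can then be used in the case body.
def describe(m: Monoid[_]): String =
  m match {
    case pqm: PriorityQueueMonoid[v] =>
      val zero: java.util.PriorityQueue[v] = pqm.zero
      s"priority-queue monoid, zero has ${zero.size} elements"
    case _ =>
      "some other monoid"
  }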

// We are not using the plus method defined in PriorityQueueMonoid, as it mutates
// the input PriorityQueues; we create a new PQ from the individual ones.
// We didn't use the Top PTransform in Beam, as it is not needed; also,
// we cannot access `max` defined in the PQ monoid.
@johnynek (Collaborator):

Why not extract the comparator, val cmp = pqm.zero.comparator(), and then use Top.of, as @tlazaro suggested here: #1947 (comment)?

@nownikhil (author):

Top requires the max number of elements in its constructor, but we cannot access it from the monoid.

@johnynek (Collaborator):

Ahh... man, that PriorityQueueMonoid is really bad. :(

Ahh, but there is a way out!

https://github.com/twitter/algebird/blob/0c45f9395020ceecd28899fe156b7e3c92b29814/algebird-core/src/main/scala/com/twitter/algebird/mutable/PriorityQueueMonoid.scala#L27

You could subclass it:

class ScaldingPriorityQueueMonoid[K](val count: Int)(implicit val ordering: Ordering[K]) extends PriorityQueueMonoid(count)(ordering)

and then you can access the count and the ordering.

@johnynek (Collaborator):

I made this quick PR: twitter/algebird#1008

val flattenedValues = input.getValue.asScala.flatMap { value =>
@johnynek (Collaborator):

I think this is going to materialize everything in memory. I think you should do input.getValue.asScala.toStream.flatMap... to make sure this is a lazy Iterable.

@nownikhil (author):

input.getValue.asScala gives an Iterator only, and flatMap also works lazily. Which call would materialize it?

@johnynek (Collaborator):

getValue returns an Iterable[_], no? So asScala on that is going to give you an Iterable, if I'm not mistaken. You could call .iterator on it before you call .asScala, and that will certainly be lazy; that's better than toStream, since we would rather not materialize the whole set.

@johnynek (Collaborator):

Actually... build requires an Iterable, and since flattenedValues is a val it will pin things in memory.

To take this suggestion, I think you need @inline def flattenedValues = input.getValue.asScala.toStream.flatMap...
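A sketch combining both suggestions, with a hypothetical helper name: going through .iterator() keeps the traversal lazy, and a def (rather than a val) avoids pinning the flattened data in memory:

import java.util.PriorityQueue
import scala.collection.JavaConverters._

// Lazily flatten the grouped PriorityQueues: .iterator() on the Java
// Iterable gives a one-pass Iterator, and Iterator.flatMap is lazy.
def flattenLazily[T](values: java.lang.Iterable[PriorityQueue[T]]): Iterator[T] =
  values.iterator().asScala.flatMap(_.iterator().asScala)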

@tlazaro (Contributor):

I think I understand all the pieces, but I would really appreciate it if you could clarify how to make it properly lazy.

In the first part, are you referring to Iterable keeping references to all of its elements, as opposed to Iterator, which wouldn't?

Then, when you mention using @inline: is it to avoid a closure over that variable keeping the reference alive longer, or resulting in more expensive code generation? Or is it more about escape analysis or similar?

@johnynek (Collaborator):

In Scala, most Iterable methods (like flatMap) are not lazy and will materialize the entire result in many cases (basically the only exceptions are Stream, and LazyList in Scala 2.13).

An Iterator generally drops references to the items it has iterated past, although you could imagine making one that doesn't. Consider a List iterator:

class ListIterator[A](lst: List[A]) extends Iterator[A] {
  private[this] var current: List[A] = lst
  def hasNext: Boolean = current.nonEmpty
  def next(): A = {
    val result = current.head
    current = current.tail
    result
  }
}

The way the Scala compiler works, lst is only referenced by the constructor, so you won't keep a reference to it beyond construction, which just uses it to initialize current.

Finally, the @inline is just a hint, but the def means there is nothing keeping the reference around, so when the GC runs it can collect anything not pointed to by the Stream at that moment.

BUT, all of this is a bit moot because looking inside PriorityQueueMonoid we see that build traverses the Iterable twice... so if it is a large Stream you will materialize the whole thing...

I think the best idea would be to just subclass PriorityQueueMonoid so you can access the max size and the ordering, and use those to call Top.of.
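A minimal sketch of that subclassing trick (the class matches the one proposed above; maxSize is a hypothetical helper):

import com.twitter.algebird.mutable.PriorityQueueMonoid

// PriorityQueueMonoid takes its size and ordering as constructor arguments
// but exposes neither, so a subclass that re-exposes them as vals suffices.
class ScaldingPriorityQueueMonoid[K](val count: Int)(implicit val ordering: Ordering[K])
  extends PriorityQueueMonoid[K](count)(ordering)

// Pattern matching on the subclass recovers the size that Top.of needs:
def maxSize[K](pqm: PriorityQueueMonoid[K]): Option[Int] =
  pqm match {
    case s: ScaldingPriorityQueueMonoid[_] => Some(s.count)
    case _ => None
  }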

pcoll.apply(MapElements.via(
  new SimpleFunction[KV[K, java.lang.Iterable[V]], KV[K, java.lang.Iterable[U]]]() {
    override def apply(input: KV[K, lang.Iterable[V]]): KV[K, java.lang.Iterable[U]] = {
      // We are not using the plus method defined in PriorityQueueMonoid, as it mutates
@johnynek (Collaborator), Sep 22, 2021:

I think you also need to match on:

since I guess this restriction is also enforced on the map-side operations...

You'll need a slightly different approach there (basically a custom cache).

But maybe they don't do their mutation detection on the mappers?

@nownikhil (author), Sep 22, 2021:

I tried it with a unit test and it failed because of mutation. Instead of overriding the put method in the SummingCache, I was hoping to pattern match on the Semigroup and override the plus method.

case class ImmutablePQMonoid[T](
  size: Int
)(implicit ord: Ordering[T]) extends PriorityQueueMonoid[T](size) {
  override def plus(left: PriorityQueue[T], right: PriorityQueue[T]): PriorityQueue[T] = {
    super.build(left.iterator().asScala.toIterable ++ right.iterator().asScala.toIterable)
  }
}

But the same issue is there: I cannot access the size field.

@nownikhil (author):

Or maybe this?

case class ImmutablePQMonoid[T](
  pqm: PriorityQueueMonoid[T]
)(implicit ord: Ordering[T]) extends Monoid[PriorityQueue[T]] {
  override def zero: PriorityQueue[T] = pqm.zero

  override def plus(
    l: PriorityQueue[T],
    r: PriorityQueue[T]
  ): PriorityQueue[T] = ???
}

@johnynek (Collaborator):

Why not just pattern match on this monoid and not use it at all? As a short-term fix, you could make sumByLocalKeys a no-op for this particular monoid; for a better fix, you could implement a SummingCache for this particular monoid.

I think making a full copy of the priority queue each time will be a perf killer, and I doubt it will be better than just not using a map-side cache at all...

The more we talk about this, the more it seems like copying the immutable heap in from cats-collections is the right move (or send a PR to algebird and we can publish a new version and use it from there, either way...).

Seems like we keep getting paper cuts here.
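A sketch of that short-term fix, with a hypothetical helper showing where the decision would live:

import com.twitter.algebird.Semigroup
import com.twitter.algebird.mutable.PriorityQueueMonoid

// Map-side caching relies on plus being safe to call repeatedly; since
// PriorityQueueMonoid.plus mutates its arguments, skip the cache for it.
def useMapSideCache[V](sg: Semigroup[V]): Boolean =
  sg match {
    case _: PriorityQueueMonoid[_] => false
    case _ => true
  }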

.groupAll
.sortedReverseTake(3),
Seq(5, 4, 3)
test("sortedTake"){
@johnynek (Collaborator):

Note that bufferedTake also uses the problematic monoid internally. Worth testing:

implicit val mon: PriorityQueueMonoid[V1] = new PriorityQueueMonoid[V1](n)(fakeOrdering)

@johnynek (Collaborator):

Good lesson here: mutation, NOT EVEN ONCE!

- Added ScaldingPriorityQueueMonoid, which exposes count, which we later use in TopCombineFn.
- Added a unit test for bufferedTake.
- Disabled map-side aggregation when using ScaldingPriorityQueueMonoid.
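A sketch of how those commits might fit together (the comparator wrapper and the wiring are illustrative, not the exact PR code; Beam's Top.of requires a Comparator that is also Serializable):

import java.util.Comparator
import com.twitter.algebird.mutable.PriorityQueueMonoid
import org.apache.beam.sdk.transforms.Top

// The subclass from the discussion above, re-exposing count and ordering.
class ScaldingPriorityQueueMonoid[K](val count: Int)(implicit val ordering: Ordering[K])
  extends PriorityQueueMonoid[K](count)(ordering)

// Ordering is a Comparator but not necessarily Serializable, so wrap it.
class OrderingComparator[K](ord: Ordering[K]) extends Comparator[K] with Serializable {
  def compare(l: K, r: K): Int = ord.compare(l, r)
}

// With count and ordering recovered, Beam can keep the top `count` elements:
def topOf[K](pqm: ScaldingPriorityQueueMonoid[K]) =
  Top.of(pqm.count, new OrderingComparator(pqm.ordering))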
TypedPipe
  .from(1 to 50)
  .groupAll
  .bufferedTake(100)
@johnynek (Collaborator):

Just a note: this is going to be really bad in a real job without map-side aggregation. The key is Unit, so there is only one key; with map-side aggregation, each mapper would send 100 and the reducers would then pick 100 of those.

But with no map-side aggregation, all the data will be sent to the reducers, and they will throw away all but 100.

We can add an issue and come back to address this, though.

@nownikhil (author):

Opened a ticket for this: #1952.

@tlazaro (Contributor), Sep 26, 2021:

I'm fine with landing this. How do we usually do it, 'Squash and merge'?

@johnynek (Collaborator):

yes, squash and merge if you think it is ready. Looks good to me too.

@tlazaro merged commit c596db9 into twitter:develop on Sep 26, 2021.