GroupBy using only one group after some time #7544

Open
PiotrDuz opened this issue Mar 21, 2023 · 9 comments

@PiotrDuz

PiotrDuz commented Mar 21, 2023

Hello,
I have run into strange behaviour in my code and created unit tests that demonstrate it.
I am running RxJava 3, version 3.1.5.

The tests are written in Groovy with Spock:

void 'test groupBy'() {
    given:
    def scheduler = Schedulers.from(Executors.newFixedThreadPool(5))
    when:
    Flowable.range(0, 5)
            .flatMap(partition -> {
                return Flowable.range(0, 10_000_000)
                        .subscribeOn(Schedulers.io())
                        .filter(item -> item % 5 == partition)
                        .delay(10, TimeUnit.MILLISECONDS)
                        .map(item -> new Container(partition, item))
            })
            .doOnNext(container -> System.out.println("Next :" + container.partition))
            .groupBy(item -> item.getPartition())
            .flatMap(grouped -> {
                grouped.flatMap(container -> {
                    Flowable.fromRunnable(() -> {
                        Thread.sleep(100)
                        System.out.println("Partition " + container.partition + " " + Thread.currentThread().getName())
                    })
                            .subscribeOn(scheduler)
                }, 1)
            })
            .blockingSubscribe()

    then:
    1
}

void 'without groupBy'() {
    given:
    def scheduler = Schedulers.from(Executors.newFixedThreadPool(5))
    when:
    Flowable.range(0, 5)
            .flatMap(partition -> {
                return Flowable.range(0, 10_000_000)
                        .subscribeOn(Schedulers.io())
                        .filter(item -> item % 5 == partition)
                        .observeOn(scheduler)
                        .delay(10, TimeUnit.MILLISECONDS)
                        .map(item -> new Container(partition, item))
                        .map(container -> {
                            Thread.sleep(100)
                            System.out.println("Partition " + container.partition + " " + Thread.currentThread().getName())
                            return container
                        })
            })
            .blockingSubscribe()
    then:
    1
}

public class Container {
   private final int partition;

   private final int value;

   public Container(int partition, int value) {
       this.partition = partition;
       this.value = value;
   }

   public int getPartition() {
       return partition;
   }

   public int getValue() {
       return value;
   }
}

The first test (with groupBy) fetches values from a source in parallel (let's assume fetching is slow, so we do it in parallel for different values).
The fetched values must be processed sequentially, so after merging them into a common stream we group them by "partition" and then process the groups in parallel, while still keeping the processing within each group sequential.
Unfortunately, after a few seconds only one partition is being processed. The threads change, but data is fetched only from partition X. I would expect each group to be processed in parallel, so the output log should show many partitions interleaving.

The other test shows what the result should be. It lets us fetch data in parallel and process the streams together, each one sequentially. So I treat it as a workaround, but I still wonder what happened in the first example.

Could you help me find the cause of this behaviour?
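For reference, the processing guarantee described above (strictly sequential within a partition, parallel across partitions) can be sketched without RxJava by routing each item to one single-threaded executor per partition. This is a hypothetical plain java.util.concurrent illustration of the requirement, not RxJava's groupBy; the class and method names are made up for the example. Note that there is no backpressure here: each executor's queue is unbounded, so the backlog simply grows in memory.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: one single-threaded "lane" per partition.
class PartitionedProcessing {

    static Map<Integer, List<Integer>> process(int partitions, int itemsPerPartition) {
        Map<Integer, List<Integer>> processed = new ConcurrentHashMap<>();
        List<ExecutorService> lanes = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            // A single-threaded executor runs its tasks in submission order,
            // so items of the same partition are handled sequentially.
            lanes.add(Executors.newSingleThreadExecutor());
            processed.put(p, Collections.synchronizedList(new ArrayList<>()));
        }
        // The merged stream: items of all partitions arrive interleaved.
        for (int item = 0; item < partitions * itemsPerPartition; item++) {
            final int partition = item % partitions;
            final int value = item;
            // Different lanes run on different threads, i.e. in parallel.
            lanes.get(partition).submit(() -> {
                processed.get(partition).add(value);
            });
        }
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        try {
            for (ExecutorService lane : lanes) {
                lane.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for lanes", e);
        }
        return processed;
    }

    public static void main(String[] args) {
        process(5, 4).forEach((p, items) -> System.out.println("Partition " + p + ": " + items));
    }
}
```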

@akarnokd
Member

Because of the flatMap with maxConcurrency = 1 and flatMap's inherent preference towards the last sender, each time one group item is processed, it requests 1 more item, and that request tends to find its way back to only one of the partitions.

If you move each group onto the scheduler with observeOn, similarly to the second example, every group gets plenty of requests and thus you get a mixture of processing:

.groupBy(item -> item.getPartition())
.flatMap(grouped -> {
    return grouped.observeOn(scheduler).flatMap(container -> {
        return Flowable.fromAction(() -> {
            Thread.sleep(100);
            System.out.println("Partition " + container.getPartition() + " " + container.getValue() + " " + Thread.currentThread().getName());
        });
    }, 1);
})

@PiotrDuz
Author

There are 2 nested flatMaps. The first flatMap is over the groups, and it has no concurrency constraint. So I was thinking that all available groups would be filled in turn. Then, within each group, its elements are processed sequentially, one at a time, scheduled on a ThreadPoolExecutor to make this processing parallel.

Do I understand you correctly that the concurrency limit on the inner flatMap can affect the mapping of groups (the outer .flatMap(grouped -> ...))?

Thank you for the reply,
Regards

@akarnokd
Member

Do I understand you correctly that the concurrency limit on the inner flatMap can affect the mapping of groups (the outer .flatMap(grouped -> ...))?

The concurrency limit can result in skewed request patterns, so it can end up requesting 1 item from only one group after all. Processing is slow compared to production, so when the next single item is requested, it gets routed all the way back to the very first category, which always has something ready. No balancing (or round-robin) happens.
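The "no balancing" point can be illustrated with a deliberately simplified toy model. It is not RxJava's actual flatMap/groupBy drain logic: five sources each produce one item per tick, downstream requests only one item per tick (the slow processing), and each request is served either by scanning the sources in a fixed order or round-robin. The class name and the tick-based timing are hypothetical.

```java
import java.util.Arrays;

// Toy model (hypothetical, not RxJava internals) of routing single-item requests
// to several sources that produce faster than downstream consumes.
class DemandRoutingModel {

    // Returns how many items were delivered downstream from each source.
    static int[] simulate(int sources, int ticks, boolean roundRobin) {
        int[] ready = new int[sources];   // items waiting at each source
        int[] served = new int[sources];  // items delivered from each source
        int cursor = 0;                   // where a round-robin scan starts
        for (int t = 0; t < ticks; t++) {
            // Fast production: every source gets one new item per tick.
            for (int s = 0; s < sources; s++) {
                ready[s]++;
            }
            // Slow consumption: downstream requests exactly one item per tick.
            for (int i = 0; i < sources; i++) {
                // Fixed order always starts the scan at source 0;
                // round-robin starts just after the last source served.
                int s = roundRobin ? (cursor + i) % sources : i;
                if (ready[s] > 0) {
                    ready[s]--;
                    served[s]++;
                    cursor = (s + 1) % sources;
                    break;
                }
            }
        }
        return served;
    }

    public static void main(String[] args) {
        // prints "fixed order: [100, 0, 0, 0, 0]"
        System.out.println("fixed order: " + Arrays.toString(simulate(5, 100, false)));
        // prints "round-robin: [20, 20, 20, 20, 20]"
        System.out.println("round-robin: " + Arrays.toString(simulate(5, 100, true)));
    }
}
```

With the fixed scan, source 0 always has something ready, so it wins every request while the other sources' backlogs grow; only the round-robin policy spreads the demand evenly.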

@PiotrDuz
Author

There are 5 threads in the pool, and the processor has enough cores. How is it that while 1 item is being processed (and that group's flatMap is blocked, since only 1 concurrent processing per group can happen) the other threads are idling?
Shouldn't the other threads also process data coming from the other groups? Each group has been subscribed with a parallel consumer.

@akarnokd
Member

grouped.flatMap(..., 1): limiting backpressure this way results in a "hot path" from category 0 to the processing of its items, so you don't get mixing.

Please try out the example I suggested.

@PiotrDuz
Author

I have rearranged my code:

.flatMap(grouped -> {
    grouped.observeOn(scheduler).flatMap(container -> {
        Flowable.fromRunnable(() -> {
            Thread.sleep(100)
            System.out.println("Partition " + container.partition + " " + Thread.currentThread().getName())
        })
    }, 1)
})

But after some time the same situation occurs: only one partition is printing.

I might not have understood you correctly. Was your example supposed to be a solution?
I still have a hard time grasping what is going on.
The source producers of numbers are flatMapped into one stream. How does groupBy "know" to request more data from a specific partition?
What about the other groups running on other threads, shouldn't they be pumping values too? Please go slowly with me on this :)

@akarnokd
Copy link
Member

Given the way you generate the input to groupBy, there is no guarantee of reliable dispatching patterns. It worked for me, but apparently it doesn't last for you. A small imbalance in threading and everything is back to a hot path and single-group processing. There is no fairness in the operators involved.

The 'without groupBy' approach has independent, container-specific generation which is not merged and then re-split, so if that still represents what you were trying to achieve originally, use that pattern instead.
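In plain Java terms, the 'without groupBy' shape looks roughly like the sketch below: each partition has its own task that generates, filters and processes its items in order on a shared fixed-size pool, so nothing is merged and then re-split. This is a hypothetical java.util.concurrent analogue for illustration, not RxJava code; the class and method names are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical analogue of the 'without groupBy' pattern:
// each partition gets its own independent, sequential pipeline.
class IndependentPipelines {

    static Map<Integer, List<Integer>> run(int partitions, int rangeSize) {
        ExecutorService pool = Executors.newFixedThreadPool(partitions);
        Map<Integer, List<Integer>> processed = new ConcurrentHashMap<>();
        List<Future<?>> pending = new ArrayList<>();
        try {
            for (int p = 0; p < partitions; p++) {
                final int partition = p;
                // One task per partition: generate, filter and process in order,
                // never merging with the other partitions' items.
                pending.add(pool.submit(() -> {
                    List<Integer> out = new ArrayList<>();
                    for (int item = 0; item < rangeSize; item++) {
                        if (item % partitions == partition) {
                            out.add(item); // stand-in for the slow per-item processing
                        }
                    }
                    processed.put(partition, out);
                }));
            }
            for (Future<?> f : pending) {
                f.get(); // wait for every partition and surface any failure
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("partition failed", e.getCause());
        } finally {
            pool.shutdown();
        }
        return processed;
    }

    public static void main(String[] args) {
        run(5, 20).forEach((p, items) -> System.out.println("Partition " + p + ": " + items));
    }
}
```

Because the partitions never share a merged stream, there is no single request path that one partition can monopolize.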

@PiotrDuz
Author

It is more clear now, thank you for your replies!
Regards

@Mahammadnajaf

Mahammadnajaf commented Mar 28, 2023

The behaviour comes from the combination of the groupBy() and flatMap() operators.

In the first test with groupBy(), you are creating groups based on the partition value of each Container. However, the flatMap() operator used to process each group is not respecting the grouping, and is allowing items from different groups to be processed in parallel. This can result in some groups being blocked while others are still being processed, causing the behavior you observed where only one partition is being processed at a time.

In the second test without groupBy(), you are using the observeOn() operator to ensure that each item is processed sequentially on the scheduler you created. This approach avoids the issue with groupBy() and flatMap(), and allows items to be processed in parallel while still maintaining sequential processing within each partition.

To fix the issue in the first test, you can use the concatMap() operator instead of flatMap() to ensure that items from each group are processed sequentially. This would look something like this:
.flatMap(grouped -> {
    return grouped.concatMap(container -> {
        return Flowable.fromAction(() -> {
            Thread.sleep(100);
            System.out.println("Partition " + container.getPartition() + " " + Thread.currentThread().getName());
        })
        .subscribeOn(scheduler);
    }, 1);
})
Using concatMap() ensures that the items of each group are processed one at a time; the second argument, 1, sets the prefetch (how many items concatMap requests from the group in advance). This should result in each partition being processed in parallel while still maintaining sequential processing within each group.

Projects
None yet
Development

No branches or pull requests

3 participants