CommittablePartitionedSource Spawns Multiple Streams for the Same Topic-Partition #1290

Open
andrii-kovalchuk-exa opened this issue Dec 17, 2020 · 2 comments

@andrii-kovalchuk-exa

Overview

Streams built with Consumer.committablePartitionedSource stall (producing few or no records) after multiple rebalances. After a restart on new pods the streams work fine at first, but stall again after multiple rebalances.

Details

While researching the issue with DEBUG logging enabled for Alpakka, we found that the Kafka consumer of an old pod gets assigned new partitions, but no new sub-sources are created for them, so the assigned partitions are eventually just revoked:

2020-12-17 01:02:03.307 | DEBUG |  | 34014098 ms| akka.kafka.internal.SubSourceLogic       | [6969b#1] Assigning new partitions: PEB_HIPReq_RT_NkLjaR-27, PEB_HIPReq_RT_NkLjaR-28, PEB_HIPFree_RT_NkLjaR-26, PEB_HIPReq_RT_NkLjaR-35, PEB_HIPFree_RT_NkLjaR-29, PEB_HIPFree_RT_NkLjaR-34, PEB_HIPReq_RT_NkLjaR-26, PEB_HIPReq_RT_NkLjaR-33, PEB_HIPReq_RT_NkLjaR-29, PEB_HIPFree_RT_NkLjaR-27, PEB_HIPFree_RT_NkLjaR-33, PEB_HIPFree_RT_NkLjaR-30, PEB_HIPFree_RT_NkLjaR-24, PEB_HIPReq_RT_NkLjaR-34, PEB_HIPReq_RT_NkLjaR-32, PEB_HIPReq_RT_NkLjaR-25, PEB_HIPReq_RT_NkLjaR-31, PEB_HIPFree_RT_NkLjaR-35, PEB_HIPReq_RT_NkLjaR-30, PEB_HIPFree_RT_NkLjaR-28, PEB_HIPReq_RT_NkLjaR-24, PEB_HIPFree_RT_NkLjaR-31, PEB_HIPFree_RT_NkLjaR-25, PEB_HIPFree_RT_NkLjaR-32
2020-12-17 01:02:30.937 | DEBUG |  | 34041728 ms| akka.kafka.internal.SubSourceLogic       | [6969b#1] Closing SubSources for revoked partitions: 
2020-12-17 01:02:49.347 | DEBUG |  | 34060138 ms| akka.kafka.internal.SubSourceLogic       | [6969b#1] Closing SubSources for revoked partitions: PEB_HIPReq_RT_NkLjaR-27, PEB_HIPReq_RT_NkLjaR-28, PEB_HIPFree_RT_NkLjaR-26, PEB_HIPReq_RT_NkLjaR-35, PEB_HIPFree_RT_NkLjaR-29, PEB_HIPFree_RT_NkLjaR-34, PEB_HIPReq_RT_NkLjaR-26, PEB_HIPReq_RT_NkLjaR-33, PEB_HIPReq_RT_NkLjaR-29, PEB_HIPFree_RT_NkLjaR-27, PEB_HIPFree_RT_NkLjaR-33, PEB_HIPFree_RT_NkLjaR-30, PEB_HIPFree_RT_NkLjaR-24, PEB_HIPReq_RT_NkLjaR-34, PEB_HIPReq_RT_NkLjaR-32, PEB_HIPReq_RT_NkLjaR-25, PEB_HIPReq_RT_NkLjaR-31, PEB_HIPFree_RT_NkLjaR-35, PEB_HIPReq_RT_NkLjaR-30, PEB_HIPFree_RT_NkLjaR-28, PEB_HIPReq_RT_NkLjaR-24, PEB_HIPFree_RT_NkLjaR-31, PEB_HIPFree_RT_NkLjaR-25, PEB_HIPFree_RT_NkLjaR-32

When a pod is new, streams work fine:

2020-12-17 00:32:35.771 | DEBUG |  | 11404 ms| akka.kafka.internal.SubSourceLogic       | [7f0ee#1] Assigning new partitions: PEB_HIPFree_RT_NkLjaR-4, PEB_HIPFree_RT_NkLjaR-5, PEB_HIPFree_RT_NkLjaR-3, PEB_HIPReq_RT_NkLjaR-3, PEB_HIPReq_RT_NkLjaR-4, PEB_HIPReq_RT_NkLjaR-5
2020-12-17 00:32:35.774 | INFO  |  | 11407 ms| c.e.e.operation.streaming.Streamer       | Bootstrapping PEB_HIPFree_RT_NkLjaR-4 state
2020-12-17 00:32:35.774 | INFO  |  | 11407 ms| c.e.e.operation.streaming.Streamer       | Bootstrapping PEB_HIPFree_RT_NkLjaR-4 state
2020-12-17 00:32:35.899 | INFO  |  | 11532 ms| c.e.e.operation.streaming.Streamer       | Bootstrapping PEB_HIPFree_RT_NkLjaR-5 state
2020-12-17 00:32:35.904 | INFO  |  | 11537 ms| c.e.e.operation.streaming.Streamer       | Bootstrapping PEB_HIPFree_RT_NkLjaR-3 state
2020-12-17 00:32:35.912 | INFO  |  | 11545 ms| c.e.e.operation.streaming.Streamer       | Bootstrapping PEB_HIPReq_RT_NkLjaR-3 state
2020-12-17 00:32:35.941 | INFO  |  | 11574 ms| c.e.e.operation.streaming.Streamer       | Bootstrapping PEB_HIPReq_RT_NkLjaR-4 state
2020-12-17 00:32:35.942 | INFO  |  | 11575 ms| a.k.i.CommittableSubSourceStageLogic     | [0e287#1] Starting. Partition PEB_HIPReq_RT_NkLjaR-3
2020-12-17 00:32:35.944 | INFO  |  | 11577 ms| a.k.i.CommittableSubSourceStageLogic     | [05281#1] Starting. Partition PEB_HIPReq_RT_NkLjaR-4
2020-12-17 00:32:35.945 | INFO  |  | 11578 ms| c.e.e.operation.streaming.Streamer       | Bootstrapping PEB_HIPReq_RT_NkLjaR-5 state
2020-12-17 00:32:35.948 | INFO  |  | 11581 ms| a.k.i.CommittableSubSourceStageLogic     | [b5dfc#1] Starting. Partition PEB_HIPReq_RT_NkLjaR-5
...

After digging deeper, we discovered the following logs, which seem to point at the root cause:

2020-12-15 17:48:23.603 | DEBUG |  | 72319 ms| akka.kafka.internal.SubSourceLogic       | [b51d3#1] Assigning new partitions: PEB_HIPReq_RT_NkLjaR-0, PEB_HIPFree_RT_NkLjaR-2, PEB_Mrg_RT_NkLjaR-0, PEB_HIPFree_RT_NkLjaR-0, PEB_HIPReq_RT_NkLjaR-2, PEB_HIPReq_RT_NkLjaR-1, PEB_HIPFree_RT_NkLjaR-1
...
2020-12-15 17:50:00.360 | INFO  |  | 169076 ms| c.e.e.operation.streaming.Streamer       | Bootstrapping PEB_Mrg_RT_NkLjaR-0 state
2020-12-15 17:50:00.362 | INFO  |  | 169078 ms| a.k.i.CommittableSubSourceStageLogic     | [f7792#1] Starting. Partition PEB_Mrg_RT_NkLjaR-0
...
2020-12-15 17:50:05.393 | INFO  |  | 174109 ms| c.e.e.operation.streaming.Streamer       | Bootstrapping PEB_Mrg_RT_NkLjaR-0 state
2020-12-15 17:50:05.397 | INFO  |  | 174113 ms| a.k.i.CommittableSubSourceStageLogic     | [e7f24#1] Starting. Partition PEB_Mrg_RT_NkLjaR-0
2020-12-15 17:50:10.412 | INFO  |  | 179128 ms| c.e.e.operation.streaming.Streamer       | Bootstrapping PEB_Mrg_RT_NkLjaR-0 state
2020-12-15 17:50:10.414 | INFO  |  | 179130 ms| a.k.i.CommittableSubSourceStageLogic     | [f661e#1] Starting. Partition PEB_Mrg_RT_NkLjaR-0
2020-12-15 17:50:10.466 | WARN  |  | 179182 ms| akka.kafka.internal.KafkaConsumerActor   | [3ae81] RequestMessages from topic/partition Set(PEB_Mrg_RT_NkLjaR-0) already requested by other stage Set(PEB_Mrg_RT_NkLjaR-0)
2020-12-15 17:50:15.432 | INFO  |  | 184148 ms| c.e.e.operation.streaming.Streamer       | Bootstrapping PEB_Mrg_RT_NkLjaR-0 state
2020-12-15 17:50:15.434 | INFO  |  | 184150 ms| a.k.i.CommittableSubSourceStageLogic     | [2ef31#1] Starting. Partition PEB_Mrg_RT_NkLjaR-0
2020-12-15 17:50:15.442 | WARN  |  | 184158 ms| akka.kafka.internal.KafkaConsumerActor   | [3ae81] RequestMessages from topic/partition Set(PEB_Mrg_RT_NkLjaR-0) already requested by other stage Set(PEB_Mrg_RT_NkLjaR-0)
2020-12-15 17:50:20.452 | INFO  |  | 189168 ms| c.e.e.operation.streaming.Streamer       | Bootstrapping PEB_Mrg_RT_NkLjaR-0 state
2020-12-15 17:50:20.454 | INFO  |  | 189170 ms| a.k.i.CommittableSubSourceStageLogic     | [82b22#1] Starting. Partition PEB_Mrg_RT_NkLjaR-0
2020-12-15 17:50:20.463 | WARN  |  | 189179 ms| akka.kafka.internal.KafkaConsumerActor   | [3ae81] RequestMessages from topic/partition Set(PEB_Mrg_RT_NkLjaR-0) already requested by other stage Set(PEB_Mrg_RT_NkLjaR-0)
2020-12-15 17:50:25.472 | INFO  |  | 194188 ms| c.e.e.operation.streaming.Streamer       | Bootstrapping PEB_Mrg_RT_NkLjaR-0 state
2020-12-15 17:50:25.474 | INFO  |  | 194190 ms| a.k.i.CommittableSubSourceStageLogic     | [a72e9#1] Starting. Partition PEB_Mrg_RT_NkLjaR-0

As can be seen, multiple sub-sources get spawned for the single partition PEB_Mrg_RT_NkLjaR-0.

The streaming code is as follows:

Consumer
  .committablePartitionedSource(consumerSettings, topics)
  .mapAsyncUnordered(parallelism /* 32 */) {
    case (topicPartition, source) =>
      // bootstrap state and run the streamF here
  }
We are on Alpakka Kafka 2.0.2 and Kafka 2.4.0.
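For context, a minimal self-contained sketch of such a setup (the bootstrap servers, group id, and the sub-source body are placeholders standing in for our actual code, which bootstraps state per partition instead of just committing):

import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.serialization.StringDeserializer

object PartitionedConsumerSketch extends App {
  implicit val system: ActorSystem = ActorSystem("partitioned-consumer")

  val consumerSettings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092") // placeholder
      .withGroupId("example-group")           // placeholder

  Consumer
    .committablePartitionedSource(
      consumerSettings,
      Subscriptions.topics("PEB_HIPReq_RT_NkLjaR", "PEB_HIPFree_RT_NkLjaR"))
    .mapAsyncUnordered(parallelism = 32) {
      case (topicPartition, source) =>
        // stand-in for "bootstrap state and run the streamF":
        // drain the sub-source and commit its offsets
        source
          .map(_.committableOffset)
          .runWith(Committer.sink(CommitterSettings(system)))
    }
    .runWith(Sink.ignore)
}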

@ennru (Member) commented Dec 22, 2020

Thank you for this report!
I guess it is hard to create a reproducer for this, but it would be immensely helpful when trying to fix this.

@andrii-kovalchuk-exa (Author) commented Dec 22, 2020

We seem to have gotten down to the root cause of it.

We have a custom GraphStageWithMaterializedValue that waits for the first message from the topic-partition, which is then materialized into a per-partition state. The behavior is similar to Akka's FlatMapPrefix.
When building the stream we preMaterialize the result of the custom stage into a tuple of (Promise[TPState], Source[CommittableMessage[String, Event], NotUsed]).
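A simplified stand-in for such a stage (not our actual code; the real stage builds a TPState rather than exposing the raw first element):

import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import scala.concurrent.Promise

// Passes elements through unchanged and materializes to a Promise that is
// completed with the first element seen. Note: no postStop override yet,
// which matters for the fix described below.
final class FirstElementState[T]
    extends GraphStageWithMaterializedValue[FlowShape[T, T], Promise[T]] {
  val in: Inlet[T] = Inlet("FirstElementState.in")
  val out: Outlet[T] = Outlet("FirstElementState.out")
  override val shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogicAndMaterializedValue(
      attrs: Attributes): (GraphStageLogic, Promise[T]) = {
    val statePromise = Promise[T]()
    val logic = new GraphStageLogic(shape) with InHandler with OutHandler {
      override def onPush(): Unit = {
        val elem = grab(in)
        statePromise.trySuccess(elem) // only the first element completes the promise
        push(out, elem)
      }
      override def onPull(): Unit = pull(in)
      setHandlers(in, out, this)
    }
    (logic, statePromise)
  }
}

With Source.preMaterialize this yields the (Promise, Source) tuple described above, e.g. subSource.viaMat(new FirstElementState)(Keep.right).preMaterialize().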

The reported problem went away after adding the following block to the custom graph stage:

override def postStop(): Unit = {
  // this covers the case when the nested flow was never materialized
  statePromise.tryFailure(new AbruptStageTerminationException(this))
  super.postStop()
}

So it looks like when partition rebalancing was very active (during a k8s pod scale-up), the per-partition stream did not manage to grab the first message from its topic-partition and materialize in time, which made it hang, possibly draining the parallelism of mapAsyncUnordered. This could prevent Alpakka from spawning new per-partition streams.
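If that hypothesis is right, one defensive measure (a hypothetical sketch, not something proposed in this thread; withDeadline is a name introduced here for illustration) is to put a deadline on the state future, so a stalled sub-stream releases its mapAsyncUnordered slot instead of holding it forever:

import akka.actor.ActorSystem
import akka.pattern.after
import scala.concurrent.duration._
import scala.concurrent.{Future, TimeoutException}

// Fails the given future if it does not complete within the deadline, so a
// sub-stream whose partition was revoked before its first message arrived
// does not occupy a mapAsyncUnordered slot indefinitely.
def withDeadline[A](f: Future[A], deadline: FiniteDuration)(
    implicit system: ActorSystem): Future[A] = {
  import system.dispatcher
  val timeout = after(deadline, system.scheduler) {
    Future.failed(new TimeoutException(s"no first message within $deadline"))
  }
  Future.firstCompletedOf(Seq(f, timeout))
}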

However, it is still unclear what made Alpakka spawn multiple streams for the same topic-partitions.
