Implement pause and resume #300

Open
gabrieljones opened this issue Mar 16, 2022 · 0 comments
Labels
api: pubsublite Issues related to the googleapis/java-pubsublite-kafka API. type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design.

gabrieljones commented Mar 16, 2022

I have an algorithm for delayed topic consumption that relies on the consumer's pause and resume functions. In this API those functions are stubbed out as no-ops. Please implement pause and resume so that I can use my delayed message consumption algorithm with Pub/Sub Lite.

Additional context
https://github.com/googleapis/java-pubsublite-kafka/blob/v0.6.7/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java#L585-L600

Message Delayed Consume Algorithm

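  // Imports the snippet relies on (assumed to sit in the enclosing scope).
  import java.time.Clock
  import java.util.Collections
  import org.apache.kafka.clients.consumer.KafkaConsumer
  import org.apache.kafka.common.TopicPartition
  import scala.concurrent.duration.{Duration, FiniteDuration}

  // KafkaRecord is not defined in this snippet; it is assumed to be an alias for
  // something like ConsumerRecord[Array[Byte], Array[Byte]] in the enclosing scope.
  trait Delayer {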
    def delay(records: Seq[KafkaRecord]): Seq[KafkaRecord]
  }

  object Delayer {

    /**
     * No op delayer, effectively zero delay.
     */
    object Zero extends Delayer {
      override def delay(records: Seq[KafkaRecord]): Seq[KafkaRecord] = records
    }

    /**
     * Delayer.Some
     * non blocking delayer
     * every batch it:
     * - resumes all assigned partitions
     * - collates previously delayed records with recently polled records
     * - records separated into
     *   - records that are old enough to process
     *   - records still too young to process
     * - pauses partitions for any records that are too young
     * - returns records that are old enough
     * - keeps a buffer of records to reconsider on the next pass
     *
     * If this consumer dies before the held back too young records are processed, their offsets will not have been
     * committed, and they will be reconsidered by the next consumer to be assigned their respective partitions.
     *
     * Not thread safe. It is expected this is only accessed from a single thread.
     */
    class Some(consumer: KafkaConsumer[Array[Byte], Array[Byte]], delay: FiniteDuration, clock: Clock = Clock.systemUTC()) extends Delayer {
      var recordsLater: Seq[KafkaRecord] = Seq.empty

      /** Determine which records are too young to process
       *
       * @param recordsNew records received in the recent poll
       * @param recordsLater records received in previous polls that were too young
       * @return (records ready now, records that are still too young)
       */
      def nowAndDelayed(recordsNew: Seq[KafkaRecord], recordsLater: Seq[KafkaRecord]): (Seq[KafkaRecord], Seq[KafkaRecord]) = {
        // Previously delayed records go first so they stay ahead of newly polled ones.
        val candidates = recordsLater ++ recordsNew
        if (delay <= Duration.Zero) (candidates, Seq.empty)
        else {
          val now = clock.millis()
          // A record is ready once its timestamp plus the delay is no longer in the future;
          // partition returns (ready now, still too young) and preserves order.
          candidates.partition(record => record.timestamp() + delay.toMillis <= now)
        }
      }

      def partitionsPause(recordsLater: Seq[KafkaRecord]): Unit = {
        if (recordsLater.nonEmpty) {
          val tpsToPause: Set[TopicPartition] = recordsLater.map(r => new TopicPartition(r.topic(), r.partition())).toSet
          for {
            tpToPause <- tpsToPause
          } {
            try {
              consumer.pause(Collections.singletonList(tpToPause))
            } catch {
              case _: IllegalStateException => //skip this topic partition as it is no longer assigned to me
            }
          }
        }
      }

      def partitionsResumeAll(): Unit = consumer.resume(consumer.assignment())

      override def delay(recordsNew: Seq[KafkaRecord]): Seq[KafkaRecord] = {
        partitionsResumeAll() //resume all partitions, we will pause any that should still be paused before exiting
        val (recordsNow, recordsDelayed) = nowAndDelayed(recordsNew, recordsLater)
        recordsLater = recordsDelayed
        partitionsPause(recordsLater) //pause any partitions for records still too young to process
        recordsNow
      }

    }
  }
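
The now/later split at the heart of Delayer.Some can be tried without a Kafka consumer. The following is only a minimal sketch under stated assumptions: `Rec`, `DelaySplitSketch`, `split`, and `partitionsToPause` are hypothetical names that do not appear in the code above, and `Rec` stands in for a Kafka record by carrying just a partition number and a timestamp. A fixed `java.time.Clock` makes the outcome deterministic.

```scala
import java.time.{Clock, Instant, ZoneOffset}
import scala.concurrent.duration._

// Hypothetical stand-in for a Kafka record: only the fields the split needs.
final case class Rec(partition: Int, timestamp: Long)

object DelaySplitSketch {

  /** Splits records into (ready now, still too young) for the given delay. */
  def split(records: Seq[Rec], delay: FiniteDuration, clock: Clock): (Seq[Rec], Seq[Rec]) = {
    val now = clock.millis()
    // Ready once timestamp + delay is no longer in the future.
    records.partition(r => r.timestamp + delay.toMillis <= now)
  }

  /** Partitions that would be paused: those still holding a too-young record. */
  def partitionsToPause(tooYoung: Seq[Rec]): Set[Int] =
    tooYoung.map(_.partition).toSet

  def main(args: Array[String]): Unit = {
    // The clock is frozen at t = 10000 ms so the split does not depend on wall time.
    val clock = Clock.fixed(Instant.ofEpochMilli(10000L), ZoneOffset.UTC)
    val records = Seq(Rec(0, 4000L), Rec(1, 9000L), Rec(0, 5000L))
    val (ready, tooYoung) = split(records, 5.seconds, clock)
    println(s"ready=$ready tooYoung=$tooYoung pause=${partitionsToPause(tooYoung)}")
  }
}
```

With a 5 second delay, the two records on partition 0 are old enough, while the record on partition 1 is held back, so only partition 1 would be paused. In the real consumer that pause is exactly the call that is currently a no-op, which is why the held-back partition would keep delivering records on the next poll.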
product-auto-label bot added the "api: pubsublite" label on Mar 16, 2022
yoshi-automation added the "triage me" and "🚨 This issue needs some love." labels on Mar 17, 2022
meredithslota added the "type: feature request" label and removed the "🚨 This issue needs some love." and "triage me" labels on Mar 23, 2022