diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md new file mode 100644 index 00000000..6b2238bb --- /dev/null +++ b/CODE_OF_CONDUCT.md @@ -0,0 +1,93 @@ +# Code of Conduct + +## Our Pledge + +In the interest of fostering an open and welcoming environment, we as +contributors and maintainers pledge to making participation in our project and +our community a harassment-free experience for everyone, regardless of age, body +size, disability, ethnicity, gender identity and expression, level of +experience, education, socio-economic status, nationality, personal appearance, +race, religion, or sexual identity and orientation. + +## Our Standards + +Examples of behavior that contributes to creating a positive environment +include: + +* Using welcoming and inclusive language +* Being respectful of differing viewpoints and experiences +* Gracefully accepting constructive criticism +* Focusing on what is best for the community +* Showing empathy towards other community members + +Examples of unacceptable behavior by participants include: + +* The use of sexualized language or imagery and unwelcome sexual attention or + advances +* Trolling, insulting/derogatory comments, and personal or political attacks +* Public or private harassment +* Publishing others' private information, such as a physical or electronic + address, without explicit permission +* Other conduct which could reasonably be considered inappropriate in a + professional setting + +## Our Responsibilities + +Project maintainers are responsible for clarifying the standards of acceptable +behavior and are expected to take appropriate and fair corrective action in +response to any instances of unacceptable behavior. + +Project maintainers have the right and responsibility to remove, edit, or reject +comments, commits, code, wiki edits, issues, and other contributions that are +not aligned to this Code of Conduct, or to ban temporarily or permanently any +contributor for other behaviors that they deem inappropriate, threatening, +offensive, or harmful. + +## Scope + +This Code of Conduct applies both within project spaces and in public spaces +when an individual is representing the project or its community. Examples of +representing a project or community include using an official project e-mail +address, posting via an official social media account, or acting as an appointed +representative at an online or offline event. Representation of a project may be +further defined and clarified by project maintainers. + +This Code of Conduct also applies outside the project spaces when the Project +Steward has a reasonable belief that an individual's behavior may have a +negative impact on the project or its community. + +## Conflict Resolution + +We do not believe that all conflict is bad; healthy debate and disagreement +often yield positive results. However, it is never okay to be disrespectful or +to engage in behavior that violates the project’s code of conduct. + +If you see someone violating the code of conduct, you are encouraged to address +the behavior directly with those involved. Many issues can be resolved quickly +and easily, and this gives people more control over the outcome of their +dispute. If you are unable to resolve the matter for any reason, or if the +behavior is threatening or harassing, report it. We are dedicated to providing +an environment where participants feel welcome and safe. + +Reports should be directed to *[PROJECT STEWARD NAME(s) AND EMAIL(s)]*, the +Project Steward(s) for *[PROJECT NAME]*. 
It is the Project Steward’s duty to
+receive and address reported violations of the code of conduct. They will then
+work with a committee consisting of representatives from the Open Source
+Programs Office and the Google Open Source Strategy team. If for any reason you
+are uncomfortable reaching out to the Project Steward, please email
+opensource@google.com.
+
+We will investigate every complaint, but you may not receive a direct response.
+We will use our discretion in determining when and how to follow up on reported
+incidents, which may range from not taking action to permanent expulsion from
+the project and project-sponsored spaces. We will notify the accused of the
+report and provide them an opportunity to discuss it before any action is taken.
+The identity of the reporter will be omitted from the details of the report
+supplied to the accused. In potentially harmful situations, such as ongoing
+harassment or threats to anyone's safety, we may take action without notice.
+
+## Attribution
+
+This Code of Conduct is adapted from the Contributor Covenant, version 1.4,
+available at
+https://www.contributor-covenant.org/version/1/4/code-of-conduct.html
\ No newline at end of file
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
new file mode 100644
index 00000000..f2dbdee0
--- /dev/null
+++ b/CONTRIBUTING.md
@@ -0,0 +1,139 @@
+# How to Contribute
+
+We'd love to accept your patches and contributions to this project. There are
+just a few small guidelines you need to follow.
+
+## Contributor License Agreement
+
+Contributions to this project must be accompanied by a Contributor License
+Agreement. You (or your employer) retain the copyright to your contribution;
+this simply gives us permission to use and redistribute your contributions as
+part of the project. Head over to <https://cla.developers.google.com/> to see
+your current agreements on file or to sign a new one.
+
+You generally only need to submit a CLA once, so if you've already submitted one
+(even if it was for a different project), you probably don't need to do it
+again.
+
+## Code reviews
+
+All submissions, including submissions by project members, require review. We
+use GitHub pull requests for this purpose. Consult
+[GitHub Help](https://help.github.com/articles/about-pull-requests/) for more
+information on using pull requests.
+
+## Community Guidelines
+
+This project follows
+[Google's Open Source Community Guidelines](https://opensource.google.com/conduct/).
+
+## Building the project
+
+To build, package, and run all unit tests, run the command
+
+```
+mvn clean verify
+```
+
+### Running Integration tests
+
+To include integration tests when building the project, you need access to
+a GCP Project with a valid service account.
+
+For instructions on how to generate a service account and corresponding
+credentials JSON see: [Creating a Service Account][1].
+
+Then run the following to build, package, run all unit tests, and run all
+integration tests:
+
+```bash
+export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service/account.json
+mvn -Penable-integration-tests clean verify
+```
+
+## Code Samples
+
+Code Samples must be bundled in separate Maven modules, and guarded by a
+Maven profile with the name `enable-samples`.
+
+The samples must be separate from the primary project for a few reasons:
+1. Primary projects have a minimum Java version of Java 7 whereas samples have
+   a minimum Java version of Java 8. Due to this we need the ability to
+   selectively exclude samples from a build run.
+2. Many code samples depend on external GCP services and need
+   credentials to access the service.
+3. Code samples are not released as Maven artifacts and must be excluded from
+   release builds.
+
+### Building
+
+```bash
+mvn -Penable-samples clean verify
+```
+
+Some samples require access to GCP services and require a service account:
+
+```bash
+export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service/account.json
+mvn -Penable-samples clean verify
+```
+
+### Profile Config
+
+1. To add samples in a profile to your Maven project, add the following to your
+`pom.xml`:
+
+   ```xml
+   <profiles>
+     [...]
+     <profile>
+       <id>enable-samples</id>
+       <modules>
+         <module>sample</module>
+       </modules>
+     </profile>
+     [...]
+   </profiles>
+   ```
+
+2. [Activate](#profile-activation) the profile.
+3. Define your samples in a normal Maven project in the `samples/` directory.
+
+### Code Formatting
+
+Code in this repo is formatted with
+[google-java-format](https://github.com/google/google-java-format).
+To run formatting on your project, you can run:
+```
+mvn com.coveo:fmt-maven-plugin:format
+```
+
+### Profile Activation
+
+To include code samples when building and testing the project, enable the
+`enable-samples` Maven profile.
+
+#### Command line
+
+To activate the Maven profile on the command line, add `-Penable-samples` to your
+Maven command.
+
+#### Maven `settings.xml`
+
+To activate the Maven profile in your `~/.m2/settings.xml`, add an entry of
+`enable-samples` following the instructions in [Active Profiles][2].
+
+This method has the benefit of applying to all projects you build (and is
+respected by IntelliJ IDEA) and is recommended if you are going to be
+contributing samples to several projects.
+
+#### IntelliJ IDEA
+
+To activate the Maven Profile inside IntelliJ IDEA, follow the instructions in
+[Activate Maven profiles][3] to activate `enable-samples`.
+
+[1]: https://cloud.google.com/docs/authentication/getting-started#creating_a_service_account
+[2]: https://maven.apache.org/settings.html#Active_Profiles
+[3]: https://www.jetbrains.com/help/idea/work-with-maven-profiles.html#activate_maven_profiles
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 00000000..d6456956
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+ + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!) The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/README.md b/README.md
index 81b4755e..a2ae93e4 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,90 @@
-# java-pubsublite-kafka
+# Instructions for Pub/Sub Lite Kafka usage.
+
+1. Add the following to your POM file to download the Pub/Sub Lite Kafka shim.
+```xml
+<dependency>
+  <groupId>org.apache.kafka</groupId>
+  <artifactId>kafka-clients</artifactId>
+  <version>2.6.0</version>
+</dependency>
+<dependency>
+  <groupId>com.google.cloud</groupId>
+  <artifactId>pubsublite-kafka-shim</artifactId>
+  <version>TODO: Make a release</version>
+</dependency>
+```
+
+1. Create a topic using `gcloud pubsub lite-topics create`
+1. Write some messages using:
+
+   ```java
+   import com.google.cloud.pubsublite.kafka.ProducerSettings;
+   import org.apache.kafka.clients.producer.*;
+   import com.google.cloud.pubsublite.*;
+
+   ...
+
+   private final static String ZONE = "us-central1-b";
+   private final static Long PROJECT_NUM = 123L;
+
+   ...
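+   // ZONE and PROJECT_NUM above are illustrative placeholders: the zone must
+   // be the zone the Lite topic was created in, and PROJECT_NUM is the numeric
+   // project number (not the project ID string).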
+
+   TopicPath topic = TopicPath.newBuilder()
+       .setLocation(CloudZone.parse(ZONE))
+       .setProject(ProjectNumber.of(PROJECT_NUM))
+       .setName(TopicName.of("my-topic")).build();
+
+   ProducerSettings settings = ProducerSettings.newBuilder()
+       .setTopicPath(topic)
+       .build();
+
+   try (Producer<byte[], byte[]> producer = settings.instantiate()) {
+     Future<RecordMetadata> sent = producer.send(new ProducerRecord<>(
+         topic.toString(),  // Required to be the same topic.
+         "key".getBytes(),
+         "value".getBytes()
+     ));
+     RecordMetadata meta = sent.get();
+   }
+   ```
+1. Create a subscription using `gcloud pubsub lite-subscriptions create`
+1. Read some messages using:
+
+   ```java
+   import com.google.cloud.pubsublite.kafka.ConsumerSettings;
+   import org.apache.kafka.clients.consumer.*;
+   import com.google.cloud.pubsublite.*;
+   import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
+   import org.apache.kafka.common.errors.WakeupException;
+
+   ...
+
+   private final static String ZONE = "us-central1-b";
+   private final static Long PROJECT_NUM = 123L;
+
+   ...
+
+   SubscriptionPath subscription = SubscriptionPath.newBuilder()
+       .setLocation(CloudZone.parse(ZONE))
+       .setProject(ProjectNumber.of(PROJECT_NUM))
+       .setName(SubscriptionName.of("my-sub"))
+       .build();
+
+   ConsumerSettings settings = ConsumerSettings.newBuilder()
+       .setSubscriptionPath(subscription)
+       .setPerPartitionFlowControlSettings(FlowControlSettings.builder()
+           .setBytesOutstanding(10_000_000)  // 10 MB
+           .setMessagesOutstanding(Long.MAX_VALUE)
+           .build())
+       .setAutocommit(true)
+       .build();
+
+   try (Consumer<byte[], byte[]> consumer = settings.instantiate()) {
+     while (true) {
+       ConsumerRecords<byte[], byte[]> records = consumer.poll(Long.MAX_VALUE);
+       for (ConsumerRecord<byte[], byte[]> record : records) {
+         System.out.println(record.offset() + ": " + record.value());
+       }
+     }
+   } catch (WakeupException e) {
+     // ignored
+   }
+   ```
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 00000000..cc382f82
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <groupId>com.google.cloud</groupId>
+    <artifactId>google-cloud-pubsublite-parent</artifactId>
+    <version>0.5.1-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>com.google.cloud</groupId>
+  <artifactId>pubsublite-kafka-shim</artifactId>
+  <version>0.5.1-SNAPSHOT</version>
+  <packaging>jar</packaging>
+  <name>Pub/Sub Lite Kafka Shim</name>
+  <url>https://github.com/googleapis/java-pubsublite</url>
+  <description>Kafka Producer and Consumer for Google Cloud Pub/Sub Lite</description>
+  <dependencies>
+    <dependency>
+      <groupId>com.google.api.grpc</groupId>
+      <artifactId>proto-google-cloud-pubsublite-v1</artifactId>
+      <version>0.5.1-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.cloud</groupId>
+      <artifactId>google-cloud-pubsublite</artifactId>
+      <version>0.5.1-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>2.6.0</version>
+    </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java-util</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.api</groupId>
+      <artifactId>api-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.flogger</groupId>
+      <artifactId>google-extensions</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value-annotations</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.errorprone</groupId>
+      <artifactId>error_prone_annotations</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.truth</groupId>
+      <artifactId>truth</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.truth.extensions</groupId>
+      <artifactId>truth-java8-extension</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.sonatype.plugins</groupId>
+        <artifactId>nexus-staging-maven-plugin</artifactId>
+        <configuration>
+          <skipNexusStagingDeployMojo>true</skipNexusStagingDeployMojo>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/CommitterFactory.java b/src/main/java/com/google/cloud/pubsublite/kafka/CommitterFactory.java
new file mode 100644
index 00000000..7652aefc
--- /dev/null
+++ b/src/main/java/com/google/cloud/pubsublite/kafka/CommitterFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.kafka;
+
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.internal.wire.Committer;
+import io.grpc.StatusException;
+
+/** A factory for making new Committers for a given partition of a subscription. */
+interface CommitterFactory {
+  Committer newCommitter(Partition partition) throws StatusException;
+}
diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerFactory.java b/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerFactory.java
new file mode 100644
index 00000000..668f6d7b
--- /dev/null
+++ b/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerFactory.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.kafka;
+
+interface ConsumerFactory {
+  SingleSubscriptionConsumer newConsumer();
+}
diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java b/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java
new file mode 100644
index 00000000..ec6123f3
--- /dev/null
+++ b/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package com.google.cloud.pubsublite.kafka; + +import com.google.auto.value.AutoValue; +import com.google.cloud.pubsublite.AdminClient; +import com.google.cloud.pubsublite.AdminClientSettings; +import com.google.cloud.pubsublite.CloudZone; +import com.google.cloud.pubsublite.PartitionLookupUtils; +import com.google.cloud.pubsublite.SubscriptionPath; +import com.google.cloud.pubsublite.TopicPath; +import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings; +import com.google.cloud.pubsublite.internal.BufferingPullSubscriber; +import com.google.cloud.pubsublite.internal.CursorClient; +import com.google.cloud.pubsublite.internal.CursorClientSettings; +import com.google.cloud.pubsublite.internal.ExtractStatus; +import com.google.cloud.pubsublite.internal.wire.AssignerBuilder; +import com.google.cloud.pubsublite.internal.wire.AssignerFactory; +import com.google.cloud.pubsublite.internal.wire.CommitterBuilder; +import com.google.cloud.pubsublite.internal.wire.PubsubContext; +import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework; +import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder; +import com.google.cloud.pubsublite.proto.Subscription; +import io.grpc.StatusException; +import org.apache.kafka.clients.consumer.Consumer; + +@AutoValue +public abstract class ConsumerSettings { + private static final Framework FRAMEWORK = Framework.of("KAFKA_SHIM"); + + // Required parameters. + abstract SubscriptionPath subscriptionPath(); + + abstract FlowControlSettings perPartitionFlowControlSettings(); + + // Optional parameters. + abstract boolean autocommit(); + + public static Builder newBuilder() { + return (new AutoValue_ConsumerSettings.Builder()).setAutocommit(false); + } + + @AutoValue.Builder + public abstract static class Builder { + // Required parameters. + public abstract Builder setSubscriptionPath(SubscriptionPath path); + + public abstract Builder setPerPartitionFlowControlSettings(FlowControlSettings settings); + + // Optional parameters. 
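+    // If autocommit is enabled, offsets of records returned by poll() are
+    // committed automatically on the consumer's behalf; otherwise commit
+    // explicitly with commitSync()/commitAsync(). Defaults to false, per
+    // newBuilder() above. An illustrative builder chain:
+    //
+    //   ConsumerSettings.newBuilder()
+    //       .setSubscriptionPath(path)
+    //       .setPerPartitionFlowControlSettings(flowControl)
+    //       .setAutocommit(true)
+    //       .build()
+    //       .instantiate();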
+ public abstract Builder setAutocommit(boolean autocommit); + + public abstract ConsumerSettings build(); + } + + public Consumer instantiate() throws StatusException { + CloudZone zone = subscriptionPath().location(); + try (AdminClient adminClient = + AdminClient.create(AdminClientSettings.newBuilder().setRegion(zone.region()).build())) { + Subscription subscription = adminClient.getSubscription(subscriptionPath()).get(); + TopicPath topic = TopicPath.parse(subscription.getTopic()); + long partitionCount = PartitionLookupUtils.numPartitions(topic); + AssignerFactory assignerFactory = + receiver -> { + AssignerBuilder.Builder builder = AssignerBuilder.newBuilder(); + builder.setReceiver(receiver); + builder.setSubscriptionPath(subscriptionPath()); + return builder.build(); + }; + PullSubscriberFactory pullSubscriberFactory = + (partition, initialSeek) -> { + SubscriberBuilder.Builder builder = + SubscriberBuilder.newBuilder() + .setContext(PubsubContext.of(FRAMEWORK)) + .setPartition(partition) + .setSubscriptionPath(subscriptionPath()); + return new BufferingPullSubscriber( + consumer -> { + synchronized (builder) { + return builder.setMessageConsumer(consumer).build(); + } + }, + perPartitionFlowControlSettings(), + initialSeek); + }; + CommitterFactory committerFactory = + partition -> + CommitterBuilder.newBuilder() + .setSubscriptionPath(subscriptionPath()) + .setPartition(partition) + .build(); + ConsumerFactory consumerFactory = + () -> + new SingleSubscriptionConsumerImpl( + topic, autocommit(), pullSubscriberFactory, committerFactory); + + CursorClient cursorClient = + CursorClient.create(CursorClientSettings.newBuilder().setRegion(zone.region()).build()); + + return new PubsubLiteConsumer( + subscriptionPath(), + topic, + partitionCount, + consumerFactory, + assignerFactory, + cursorClient); + } catch (Exception e) { + throw ExtractStatus.toCanonical(e); + } + } +} diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/KafkaExceptionUtils.java b/src/main/java/com/google/cloud/pubsublite/kafka/KafkaExceptionUtils.java new file mode 100644 index 00000000..7b92d1a7 --- /dev/null +++ b/src/main/java/com/google/cloud/pubsublite/kafka/KafkaExceptionUtils.java @@ -0,0 +1,91 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.google.cloud.pubsublite.kafka; + +import com.google.cloud.pubsublite.internal.ExtractStatus; +import io.grpc.StatusException; +import java.util.concurrent.TimeoutException; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.BrokerNotAvailableException; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.InvalidRequestException; + +class KafkaExceptionUtils { + private KafkaExceptionUtils() {} + + static KafkaException toKafkaException(StatusException source) { + switch (source.getStatus().getCode()) { + case OK: + throw source.getStatus().asRuntimeException(); + case ABORTED: + return new BrokerNotAvailableException("Aborted.", source); + case ALREADY_EXISTS: + return new KafkaException("Already exists.", source); + case CANCELLED: + return new BrokerNotAvailableException("Cancelled.", source); + case DATA_LOSS: + return new KafkaException("Data loss.", source); + case DEADLINE_EXCEEDED: + return new BrokerNotAvailableException("Deadline exceeded.", source); + case FAILED_PRECONDITION: + return new InvalidRequestException("Failed precondition.", source); + case INTERNAL: + return new BrokerNotAvailableException("Internal.", source); + case INVALID_ARGUMENT: + return new InvalidRequestException("Invalid argument.", source); + case NOT_FOUND: + return new KafkaException("Not found.", source); + case OUT_OF_RANGE: + return new KafkaException("Out of range.", source); + case PERMISSION_DENIED: + return new AuthorizationException("Permission denied.", source); + case RESOURCE_EXHAUSTED: + return new KafkaException("Resource exhausted.", source); + case UNAUTHENTICATED: + return new AuthenticationException("Unauthenticated.", source); + case UNAVAILABLE: + return new BrokerNotAvailableException("Unavailable.", source); + case UNIMPLEMENTED: + return new KafkaException("Unimplemented.", source); + case UNKNOWN: + return new KafkaException("Unknown.", source); + } + return new KafkaException("No case.", source); + } + + /** + * Transform an exception into a kind that is likely to be thrown through kafka interfaces. + * + * @param t A throwable to transform. + * @return The transformed exception suitable for re-throwing. + */ + static RuntimeException toKafka(Throwable t) { + try { + throw t; + } catch (KafkaException | UnsupportedOperationException | IllegalStateException e) { + return e; + } catch (InterruptedException e) { + return new InterruptException(e); + } catch (TimeoutException e) { + return new org.apache.kafka.common.errors.TimeoutException(e); + } catch (Throwable e) { + return KafkaExceptionUtils.toKafkaException(ExtractStatus.toCanonical(t)); + } + } +} diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/LiteHeaders.java b/src/main/java/com/google/cloud/pubsublite/kafka/LiteHeaders.java new file mode 100644 index 00000000..fd097c56 --- /dev/null +++ b/src/main/java/com/google/cloud/pubsublite/kafka/LiteHeaders.java @@ -0,0 +1,98 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.kafka;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.protobuf.ByteString;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+
+class LiteHeaders implements Headers {
+  private ImmutableListMultimap<String, ByteString> attributes;
+
+  LiteHeaders(ImmutableListMultimap<String, ByteString> attributes) {
+    this.attributes = attributes;
+  }
+
+  static Header toHeader(String key, ByteString value) {
+    return new Header() {
+      @Override
+      public String key() {
+        return key;
+      }
+
+      @Override
+      public byte[] value() {
+        return value.toByteArray();
+      }
+    };
+  }
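+
+  // A Pub/Sub Lite attribute key may carry repeated ByteString values; each
+  // value is surfaced as its own Kafka header under the same key.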
+  private static List<Header> toHeaders(String key, Collection<ByteString> values) {
+    ImmutableList.Builder<Header> headersBuilder = ImmutableList.builder();
+    values.forEach(value -> headersBuilder.add(toHeader(key, value)));
+    return headersBuilder.build();
+  }
+
+  @Override
+  public Headers add(Header header) throws IllegalStateException {
+    throw new IllegalStateException();
+  }
+
+  @Override
+  public Headers add(String s, byte[] bytes) throws IllegalStateException {
+    throw new IllegalStateException();
+  }
+
+  @Override
+  public Headers remove(String s) throws IllegalStateException {
+    throw new IllegalStateException();
+  }
+
+  @Override
+  public Header lastHeader(String s) {
+    // Last header for the given key, or null when absent, per the Kafka contract.
+    return Iterables.getLast(headers(s), null);
+  }
+
+  @Override
+  public Iterable<Header> headers(String s) {
+    if (attributes.containsKey(s))
+      return Iterables.transform(attributes.get(s), value -> toHeader(s, value));
+    return ImmutableList.of();
+  }
+
+  @Override
+  public Header[] toArray() {
+    ImmutableList.Builder<Header> arrayBuilder = ImmutableList.builder();
+    attributes
+        .entries()
+        .forEach(entry -> arrayBuilder.add(toHeader(entry.getKey(), entry.getValue())));
+    return arrayBuilder.build().toArray(new Header[0]);
+  }
+
+  @Override
+  public Iterator<Header>
iterator() { + return Iterators.transform( + attributes.entries().iterator(), entry -> toHeader(entry.getKey(), entry.getValue())); + } +} diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java b/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java new file mode 100644 index 00000000..49a55b39 --- /dev/null +++ b/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java @@ -0,0 +1,58 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.kafka; + +import com.google.auto.value.AutoValue; +import com.google.cloud.pubsublite.PartitionLookupUtils; +import com.google.cloud.pubsublite.TopicPath; +import com.google.cloud.pubsublite.internal.wire.PubsubContext; +import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework; +import com.google.cloud.pubsublite.internal.wire.RoutingPublisherBuilder; +import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder; +import io.grpc.StatusException; +import org.apache.kafka.clients.producer.Producer; + +@AutoValue +public abstract class ProducerSettings { + private static final Framework FRAMEWORK = Framework.of("KAFKA_SHIM"); + + // Required parameters. + abstract TopicPath topicPath(); + + public static Builder newBuilder() { + return new AutoValue_ProducerSettings.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + // Required parameters. + public abstract Builder setTopicPath(TopicPath path); + + public abstract ProducerSettings build(); + } + + public Producer instantiate() throws StatusException { + SinglePartitionPublisherBuilder.Builder builder = + SinglePartitionPublisherBuilder.newBuilder() + .setContext(PubsubContext.of(FRAMEWORK)) + .setTopic(topicPath()); + RoutingPublisherBuilder.Builder routingBuilder = + RoutingPublisherBuilder.newBuilder().setTopic(topicPath()).setPublisherBuilder(builder); + return new PubsubLiteProducer( + routingBuilder.build(), PartitionLookupUtils.numPartitions(topicPath()), topicPath()); + } +} diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java new file mode 100644 index 00000000..19fd03b9 --- /dev/null +++ b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java @@ -0,0 +1,550 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.google.cloud.pubsublite.kafka; + +import static com.google.cloud.pubsublite.kafka.KafkaExceptionUtils.toKafka; + +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.SubscriptionPath; +import com.google.cloud.pubsublite.TopicPath; +import com.google.cloud.pubsublite.internal.CursorClient; +import com.google.cloud.pubsublite.internal.ExtractStatus; +import com.google.cloud.pubsublite.internal.wire.Assigner; +import com.google.cloud.pubsublite.internal.wire.AssignerFactory; +import com.google.cloud.pubsublite.internal.wire.PartitionAssignmentReceiver; +import com.google.cloud.pubsublite.proto.Cursor; +import com.google.cloud.pubsublite.proto.SeekRequest; +import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.flogger.GoogleLogger; +import io.grpc.StatusException; +import java.time.Duration; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.UnsupportedVersionException; + +/** + * A class that uses a SingleSubscriptionConsumer to remove the duplicate methods from the kafka + * consumer. + * + *
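+ * <p>A single PubsubLiteConsumer is bound to one subscription and its single
+ * topic: subscribe() accepts only that topic, and pattern or multi-topic
+ * subscriptions are rejected.
+ *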

This also filters methods that Pub/Sub Lite will not implement. + */ +class PubsubLiteConsumer implements Consumer { + private static final Duration INFINITE_DURATION = Duration.ofMillis(Long.MAX_VALUE); + private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); + private final SubscriptionPath subscriptionPath; + private final TopicPath topicPath; + private final long partitionCount; + private final ConsumerFactory consumerFactory; + private final AssignerFactory assignerFactory; + private final CursorClient cursorClient; + private Optional assigner = Optional.empty(); + private Optional consumer = Optional.empty(); + + PubsubLiteConsumer( + SubscriptionPath subscriptionPath, + TopicPath topicPath, + long partitionCount, + ConsumerFactory consumerFactory, + AssignerFactory assignerFactory, + CursorClient cursorClient) { + this.subscriptionPath = subscriptionPath; + this.topicPath = topicPath; + this.partitionCount = partitionCount; + this.consumerFactory = consumerFactory; + this.assignerFactory = assignerFactory; + this.cursorClient = cursorClient; + } + + private TopicPartition toTopicPartition(Partition partition) { + return new TopicPartition(topicPath.toString(), (int) partition.value()); + } + + private SingleSubscriptionConsumer requireValidConsumer() { + if (!consumer.isPresent()) { + throw new IllegalStateException("Neither subscribe nor assign has been called."); + } + return consumer.get(); + } + + @Override + public Set assignment() { + return requireValidConsumer().assignment().stream() + .map(this::toTopicPartition) + .collect(Collectors.toSet()); + } + + @Override + public Set subscription() { + if (consumer.isPresent()) { + return ImmutableSet.of(topicPath.toString()); + } + return ImmutableSet.of(); + } + + private static class NoOpRebalanceListener implements ConsumerRebalanceListener { + @Override + public void onPartitionsRevoked(Collection partitions) {} + + @Override + public void onPartitionsAssigned(Collection partitions) {} + } + + @Override + public void subscribe(Pattern pattern) { + subscribe(pattern, new NoOpRebalanceListener()); + } + + @Override + public void subscribe(Pattern pattern, ConsumerRebalanceListener consumerRebalanceListener) { + throw new UnsupportedOperationException( + "Pattern assignment is not available for Pub/Sub Lite."); + } + + private void checkTopic(String topic) { + try { + TopicPath path = TopicPath.parse(topic); + if (!path.equals(topicPath)) { + throw new UnsupportedOperationException( + "Pub/Sub Lite consumers may only interact with the one topic they are configured for."); + } + } catch (StatusException e) { + throw toKafka(e); + } + } + + private Partition checkTopicGetPartition(TopicPartition topicPartition) { + checkTopic(topicPartition.topic()); + try { + return Partition.of(topicPartition.partition()); + } catch (StatusException e) { + throw toKafka(e); + } + } + + private PartitionAssignmentReceiver newAssignmentReceiver(ConsumerRebalanceListener listener) { + AtomicReference> lastPartitions = new AtomicReference<>(ImmutableSet.of()); + return newAssignment -> { + Set previousAssignment = lastPartitions.get(); + Set removed = new HashSet<>(previousAssignment); + removed.removeAll(newAssignment); + Set added = new HashSet<>(newAssignment); + added.removeAll(previousAssignment); + if (!removed.isEmpty()) { + listener.onPartitionsLost( + removed.stream().map(this::toTopicPartition).collect(Collectors.toSet())); + } + if (!added.isEmpty()) { + listener.onPartitionsAssigned( + 
added.stream().map(this::toTopicPartition).collect(Collectors.toSet())); + } + consumer.get().setAssignment(newAssignment); + lastPartitions.set(newAssignment); + }; + } + + @Override + public void subscribe(Collection collection) { + subscribe(collection, new NoOpRebalanceListener()); + } + + @Override + public void subscribe( + Collection collection, ConsumerRebalanceListener consumerRebalanceListener) { + if (collection.size() != 1) { + throw new UnsupportedOperationException( + "Subscribing to multiple topics is not available for Pub/Sub Lite."); + } + checkTopic(collection.iterator().next()); + if (consumer.isPresent()) { + if (assigner.isPresent()) { + // No-op: we can only subscribe to one topic and we already are. + return; + } + throw new IllegalStateException("Called subscribe after calling assign."); + } + consumer = Optional.of(consumerFactory.newConsumer()); + try { + assigner = Optional.of(assignerFactory.New(newAssignmentReceiver(consumerRebalanceListener))); + } catch (StatusException e) { + throw toKafka(e); + } + } + + @Override + public void assign(Collection collection) { + if (collection.isEmpty()) { + unsubscribe(); + return; + } + if (assigner.isPresent()) { + throw new IllegalStateException("Called assign after calling subscribe."); + } + Set partitions = + collection.stream().map(this::checkTopicGetPartition).collect(Collectors.toSet()); + if (!consumer.isPresent()) { + consumer = Optional.of(consumerFactory.newConsumer()); + } + consumer.get().setAssignment(partitions); + } + + @Override + public void unsubscribe() { + assigner.ifPresent(instance -> instance.stopAsync().awaitTerminated()); + assigner = Optional.empty(); + consumer.ifPresent(instance -> instance.close(INFINITE_DURATION)); + consumer = Optional.empty(); + } + + private static Duration toDuration(long l, TimeUnit timeUnit) { + return Duration.ofMillis(TimeUnit.MILLISECONDS.convert(l, timeUnit)); + } + + @Override + public ConsumerRecords poll(long l) { + return poll(Duration.ofMillis(l)); + } + + @Override + public ConsumerRecords poll(Duration timeout) { + return requireValidConsumer().poll(timeout); + } + + Map checkAndTransformOffsets(Map map) { + ImmutableMap.Builder output = ImmutableMap.builder(); + try { + map.forEach( + ExtractStatus.rethrowAsRuntime( + (topicPartition, offsetAndMetadata) -> { + output.put( + checkTopicGetPartition(topicPartition), Offset.of(offsetAndMetadata.offset())); + })); + } catch (Throwable t) { + throw toKafka(t); + } + return output.build(); + } + + @Override + public void commitSync(Map map) { + commitSync(map, INFINITE_DURATION); + } + + @Override + public void commitSync(Map map, Duration duration) { + try { + requireValidConsumer() + .commit(checkAndTransformOffsets(map)) + .get(duration.toMillis(), TimeUnit.MILLISECONDS); + } catch (Throwable t) { + throw toKafka(t); + } + } + + @Override + public void commitAsync( + Map map, OffsetCommitCallback offsetCommitCallback) { + ApiFutures.addCallback( + requireValidConsumer().commit(checkAndTransformOffsets(map)), + new ApiFutureCallback() { + @Override + public void onFailure(Throwable throwable) { + offsetCommitCallback.onComplete(null, toKafka(throwable)); + } + + @Override + public void onSuccess(Void result) { + offsetCommitCallback.onComplete(map, null); + } + }); + } + + @Override + public void commitSync() { + commitSync(INFINITE_DURATION); + } + + @Override + public void commitSync(Duration duration) { + try { + requireValidConsumer().commitAll().get(duration.toMillis(), TimeUnit.MILLISECONDS); + } catch 
(Throwable t) { + throw toKafka(t); + } + } + + @Override + public void commitAsync(OffsetCommitCallback offsetCommitCallback) { + ApiFutures.addCallback( + requireValidConsumer().commitAll(), + new ApiFutureCallback>() { + @Override + public void onFailure(Throwable throwable) { + offsetCommitCallback.onComplete(null, toKafka(throwable)); + } + + @Override + public void onSuccess(Map map) { + ImmutableMap.Builder result = ImmutableMap.builder(); + map.forEach( + (partition, offset) -> + result.put(toTopicPartition(partition), new OffsetAndMetadata(offset.value()))); + offsetCommitCallback.onComplete(result.build(), null); + } + }); + } + + @Override + public void commitAsync() { + commitAsync( + (map, e) -> { + if (e != null) { + logger.atWarning().withCause(e).log("Failed to commit offsets."); + } + }); + } + + @Override + public void seek(TopicPartition topicPartition, long l) { + Partition partition = checkTopicGetPartition(topicPartition); + requireValidConsumer() + .doSeek( + partition, + SeekRequest.newBuilder().setCursor(Cursor.newBuilder().setOffset(l)).build()); + } + + @Override + public void seek(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) { + seek(topicPartition, offsetAndMetadata.offset()); + } + + @Override + public void seekToBeginning(Collection collection) { + collection.forEach( + topicPartition -> + requireValidConsumer() + .doSeek( + checkTopicGetPartition(topicPartition), + SeekRequest.newBuilder() + .setCursor(Cursor.newBuilder().setOffset(0).build()) + .build())); + } + + @Override + public void seekToEnd(Collection collection) { + collection.forEach( + topicPartition -> + requireValidConsumer() + .doSeek( + checkTopicGetPartition(topicPartition), + SeekRequest.newBuilder().setNamedTarget(NamedTarget.HEAD).build())); + } + + @Override + public long position(TopicPartition topicPartition) { + return position(topicPartition, INFINITE_DURATION); + } + + @Override + public long position(TopicPartition partition, Duration timeout) { + Partition litePartition = checkTopicGetPartition(partition); + Optional consumerPosition = requireValidConsumer().position(litePartition); + if (consumerPosition.isPresent()) return consumerPosition.get(); + return committed(partition, timeout).offset(); + } + + @Override + public OffsetAndMetadata committed(TopicPartition topicPartition) { + return committed(topicPartition, INFINITE_DURATION); + } + + @Override + public OffsetAndMetadata committed(TopicPartition topicPartition, Duration duration) { + return committed(ImmutableSet.of(topicPartition), duration).get(topicPartition); + } + + @Override + public Map committed(Set set) { + return committed(set, INFINITE_DURATION); + } + + @Override + public Map committed( + Set partitions, Duration timeout) { + Set targets = + partitions.stream().map(this::checkTopicGetPartition).collect(Collectors.toSet()); + try { + Map full_map = + cursorClient + .listPartitionCursors(subscriptionPath) + .get(timeout.toMillis(), TimeUnit.MILLISECONDS); + ImmutableMap.Builder output = ImmutableMap.builder(); + targets.forEach( + ExtractStatus.rethrowAsRuntime( + partition -> { + output.put( + toTopicPartition(partition), + new OffsetAndMetadata(full_map.getOrDefault(partition, Offset.of(0)).value())); + })); + return output.build(); + } catch (Throwable t) { + throw toKafka(t); + } + } + + @Override + public Map metrics() { + return ImmutableMap.of(); + } + + @Override + public List partitionsFor(String s) { + return partitionsFor(s, INFINITE_DURATION); + } + + @Override + public List 
partitionsFor(String topic, Duration timeout) { + checkTopic(topic); + return SharedBehavior.partitionsFor(partitionCount, topicPath); + } + + @Override + public Map> listTopics() { + return listTopics(INFINITE_DURATION); + } + + @Override + public Map> listTopics(Duration timeout) { + return ImmutableMap.of(topicPath.toString(), partitionsFor(topicPath.toString(), timeout)); + } + + @Override + public Map offsetsForTimes(Map map) { + return offsetsForTimes(map, INFINITE_DURATION); + } + + @Override + public Map offsetsForTimes( + Map map, Duration duration) { + throw new UnsupportedVersionException( + "Pub/Sub Lite does not support Consumer backlog introspection."); + } + + @Override + public Map beginningOffsets(Collection collection) { + return beginningOffsets(collection, INFINITE_DURATION); + } + + @Override + public Map beginningOffsets( + Collection collection, Duration duration) { + ImmutableMap.Builder results = ImmutableMap.builder(); + collection.forEach( + topicPartition -> { + checkTopic(topicPartition.topic()); + results.put(topicPartition, 0L); + }); + return results.build(); + } + + @Override + public Map endOffsets(Collection collection) { + return endOffsets(collection, INFINITE_DURATION); + } + + @Override + public Map endOffsets( + Collection collection, Duration duration) { + throw new UnsupportedVersionException( + "Pub/Sub Lite does not support Consumer backlog introspection."); + } + + @Override + public void close() { + close(INFINITE_DURATION); + } + + @Override + public void close(long l, TimeUnit timeUnit) { + close(toDuration(l, timeUnit)); + } + + @Override + public void close(Duration timeout) { + try { + cursorClient.close(); + } catch (Exception e) { + logger.atSevere().withCause(e).log("Error closing cursor client during Consumer shutdown."); + } + unsubscribe(); + } + + @Override + public ConsumerGroupMetadata groupMetadata() { + return new ConsumerGroupMetadata(subscriptionPath.toString()); + } + + @Override + public Set paused() { + return ImmutableSet.of(); + } + + @Override + public void pause(Collection collection) { + logger.atWarning().log( + "Calling pause on a Pub/Sub Lite Consumer is a no-op. Configure the amount of outstanding bytes and messages instead."); + } + + @Override + public void resume(Collection collection) { + logger.atWarning().log( + "Calling resume on a Pub/Sub Lite Consumer is a no-op. Configure the amount of outstanding bytes and messages instead."); + } + + @Override + public void enforceRebalance() { + logger.atWarning().log("Calling enforceRebalance on a Pub/Sub Lite Consumer is a no-op."); + } + + @Override + public void wakeup() { + requireValidConsumer().wakeup(); + } +} diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteNode.java b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteNode.java new file mode 100644 index 00000000..7a63b922 --- /dev/null +++ b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteNode.java @@ -0,0 +1,26 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.kafka; + +import org.apache.kafka.common.Node; + +final class PubsubLiteNode { + private PubsubLiteNode() {} + + public static final Node NODE = new Node(0, "pubsublite.googleapis.com", 443); + public static final Node[] NODES = {NODE}; +} diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducer.java b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducer.java new file mode 100644 index 00000000..dbb7090b --- /dev/null +++ b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducer.java @@ -0,0 +1,199 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.kafka; + +import static com.google.cloud.pubsublite.kafka.KafkaExceptionUtils.toKafka; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.core.ApiService.Listener; +import com.google.api.core.ApiService.State; +import com.google.cloud.pubsublite.PublishMetadata; +import com.google.cloud.pubsublite.TopicPath; +import com.google.cloud.pubsublite.internal.ExtractStatus; +import com.google.cloud.pubsublite.internal.Publisher; +import com.google.common.collect.ImmutableMap; +import com.google.common.flogger.GoogleLogger; +import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.StatusException; +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.UnsupportedVersionException; + +class PubsubLiteProducer implements Producer { + private static final UnsupportedVersionException NO_TRANSACTIONS_EXCEPTION = + new UnsupportedVersionException( + "Pub/Sub Lite is a non-transactional system and does not support producer transactions."); + private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); + + private final Publisher publisher; + private final TopicPath topicPath; + private final long partitionCount; + + PubsubLiteProducer( + Publisher publisher, long partitionCount, TopicPath topicPath) { + this.publisher = publisher; + this.topicPath = topicPath; + this.partitionCount = partitionCount; + 
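+    // Log a warning if the underlying publisher fails; the publisher is started
+    // synchronously below, so construction blocks until it is running.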
this.publisher.addListener( + new Listener() { + @Override + public void failed(State from, Throwable failure) { + logger.atWarning().withCause(failure).log("Pub/Sub Lite Publisher failed."); + } + }, + MoreExecutors.directExecutor()); + this.publisher.startAsync().awaitRunning(); + } + + @Override + public void initTransactions() { + throw NO_TRANSACTIONS_EXCEPTION; + } + + @Override + public void beginTransaction() { + throw NO_TRANSACTIONS_EXCEPTION; + } + + @Override + public void sendOffsetsToTransaction(Map map, String s) { + throw NO_TRANSACTIONS_EXCEPTION; + } + + @Override + public void sendOffsetsToTransaction( + Map map, ConsumerGroupMetadata consumerGroupMetadata) { + throw NO_TRANSACTIONS_EXCEPTION; + } + + @Override + public void commitTransaction() { + throw NO_TRANSACTIONS_EXCEPTION; + } + + @Override + public void abortTransaction() { + throw NO_TRANSACTIONS_EXCEPTION; + } + + private void checkTopic(String topic) { + try { + TopicPath path = TopicPath.parse(topic); + if (!path.equals(topicPath)) { + throw new UnsupportedOperationException( + "Pub/Sub Lite producers may only interact with the one topic they are configured for."); + } + } catch (StatusException e) { + throw toKafka(e); + } + } + + @Override + public ApiFuture send(ProducerRecord producerRecord) { + checkTopic(producerRecord.topic()); + if (producerRecord.partition() != null) { + throw new UnsupportedOperationException( + "Pub/Sub Lite producers may not specify a partition in their records."); + } + ApiFuture future = + publisher.publish(RecordTransforms.toMessage(producerRecord)); + return ApiFutures.transform( + future, + meta -> + new RecordMetadata( + new TopicPartition(topicPath.toString(), (int) meta.partition().value()), + meta.offset().value(), + 0, + -1, + 0L, + producerRecord.key().length, + producerRecord.value().length), + MoreExecutors.directExecutor()); + } + + @Override + public Future send( + ProducerRecord producerRecord, Callback callback) { + ApiFuture future = send(producerRecord); + ApiFutures.addCallback( + future, + new ApiFutureCallback() { + @Override + public void onFailure(Throwable throwable) { + callback.onCompletion(null, ExtractStatus.toCanonical(throwable)); + } + + @Override + public void onSuccess(RecordMetadata recordMetadata) { + callback.onCompletion(recordMetadata, null); + } + }, + MoreExecutors.directExecutor()); + return future; + } + + @Override + public void flush() { + try { + publisher.flush(); + } catch (IOException e) { + throw ExtractStatus.toCanonical(e).getStatus().asRuntimeException(); + } + } + + @Override + public List partitionsFor(String s) { + checkTopic(s); + return SharedBehavior.partitionsFor(partitionCount, topicPath); + } + + @Override + public Map metrics() { + return ImmutableMap.of(); + } + + @Override + public void close() { + close(Duration.ofMillis(Long.MAX_VALUE)); + } + + @Override + public void close(Duration duration) { + try { + publisher.stopAsync().awaitTerminated(duration.toMillis(), MILLISECONDS); + } catch (TimeoutException e) { + logger.atWarning().withCause(e).log("Failed to close publisher."); + } + } +} diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/PullSubscriberFactory.java b/src/main/java/com/google/cloud/pubsublite/kafka/PullSubscriberFactory.java new file mode 100644 index 00000000..ff45ed13 --- /dev/null +++ b/src/main/java/com/google/cloud/pubsublite/kafka/PullSubscriberFactory.java @@ -0,0 +1,29 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * 
you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.kafka;
+
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.SequencedMessage;
+import com.google.cloud.pubsublite.internal.PullSubscriber;
+import com.google.cloud.pubsublite.proto.SeekRequest;
+import io.grpc.StatusException;
+
+/** A factory for making new PullSubscribers for a given partition of a subscription. */
+interface PullSubscriberFactory {
+  PullSubscriber<SequencedMessage> newPullSubscriber(Partition partition, SeekRequest initial)
+      throws StatusException;
+}
diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/RecordTransforms.java b/src/main/java/com/google/cloud/pubsublite/kafka/RecordTransforms.java
new file mode 100644
index 00000000..dddbcd22
--- /dev/null
+++ b/src/main/java/com/google/cloud/pubsublite/kafka/RecordTransforms.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package com.google.cloud.pubsublite.kafka; + +import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.SequencedMessage; +import com.google.cloud.pubsublite.TopicPath; +import com.google.common.collect.ImmutableListMultimap; +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import com.google.protobuf.util.Timestamps; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.record.TimestampType; + +class RecordTransforms { + private RecordTransforms() {} + + static Message toMessage(ProducerRecord record) { + Message.Builder builder = + Message.builder() + .setKey(ByteString.copyFrom(record.key())) + .setData(ByteString.copyFrom(record.value())); + if (record.timestamp() != null) { + builder.setEventTime(Timestamps.fromMillis(record.timestamp())); + } + ImmutableListMultimap.Builder attributes = ImmutableListMultimap.builder(); + record + .headers() + .forEach(header -> attributes.put(header.key(), ByteString.copyFrom(header.value()))); + return builder.setAttributes(attributes.build()).build(); + } + + static ConsumerRecord fromMessage( + SequencedMessage message, TopicPath topic, Partition partition) { + Headers headers = new LiteHeaders(message.message().attributes()); + TimestampType type; + Timestamp timestamp; + if (message.message().eventTime().isPresent()) { + type = TimestampType.CREATE_TIME; + timestamp = message.message().eventTime().get(); + } else { + type = TimestampType.LOG_APPEND_TIME; + timestamp = message.publishTime(); + } + return new ConsumerRecord<>( + topic.toString(), + (int) partition.value(), + message.offset().value(), + Timestamps.toMillis(timestamp), + type, + 0L, + message.message().key().size(), + message.message().data().size(), + message.message().key().toByteArray(), + message.message().data().toByteArray(), + headers); + } +} diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/SharedBehavior.java b/src/main/java/com/google/cloud/pubsublite/kafka/SharedBehavior.java new file mode 100644 index 00000000..80d83fab --- /dev/null +++ b/src/main/java/com/google/cloud/pubsublite/kafka/SharedBehavior.java @@ -0,0 +1,51 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.kafka; + +import static com.google.cloud.pubsublite.kafka.KafkaExceptionUtils.toKafka; + +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.TopicPath; +import com.google.common.collect.ImmutableList; +import java.util.List; +import org.apache.kafka.common.PartitionInfo; + +/** Shared behavior for producer and consumer. 
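+ * <p>Partition metadata is synthesized locally: every partition reports the static Pub/Sub
+ * Lite endpoint node as its leader and sole replica, since there are no brokers to describe.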
*/ +final class SharedBehavior { + private SharedBehavior() {} + + static PartitionInfo toPartitionInfo(TopicPath topic, Partition partition) { + return new PartitionInfo( + topic.toString(), + (int) partition.value(), + PubsubLiteNode.NODE, + PubsubLiteNode.NODES, + PubsubLiteNode.NODES); + } + + static List partitionsFor(long partitionCount, TopicPath topic) { + try { + ImmutableList.Builder result = ImmutableList.builder(); + for (int i = 0; i < partitionCount; ++i) { + result.add(toPartitionInfo(topic, Partition.of(i))); + } + return result.build(); + } catch (Throwable t) { + throw toKafka(t); + } + } +} diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumer.java b/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumer.java new file mode 100644 index 00000000..c44420b7 --- /dev/null +++ b/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumer.java @@ -0,0 +1,53 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.kafka; + +import com.google.api.core.ApiFuture; +import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.proto.SeekRequest; +import java.time.Duration; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.KafkaException; + +/** A stripped down KafkaConsumer interface that operates on a single subscription. */ +interface SingleSubscriptionConsumer { + void setAssignment(Set partitions); + + Set assignment(); + + ConsumerRecords poll(Duration duration); + + ApiFuture> commitAll(); + + ApiFuture commit(Map commitOffsets); + + void doSeek(Partition partition, SeekRequest request) throws KafkaException; + + Optional position(Partition partition); + + void close(Duration duration); + + /** + * Cause the outstanding or next call to poll to throw a WakeupException. The consumer is left in + * an unspecified state. + */ + void wakeup(); +} diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java b/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java new file mode 100644 index 00000000..277bf51d --- /dev/null +++ b/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java @@ -0,0 +1,293 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.kafka; + +import static com.google.cloud.pubsublite.kafka.KafkaExceptionUtils.toKafka; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.SequencedMessage; +import com.google.cloud.pubsublite.TopicPath; +import com.google.cloud.pubsublite.internal.CloseableMonitor; +import com.google.cloud.pubsublite.internal.ExtractStatus; +import com.google.cloud.pubsublite.internal.PullSubscriber; +import com.google.cloud.pubsublite.internal.wire.Committer; +import com.google.cloud.pubsublite.proto.SeekRequest; +import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.flogger.GoogleLogger; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import java.time.Duration; +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.kafka.clients.consumer.CommitFailedException; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; + +class SingleSubscriptionConsumerImpl implements SingleSubscriptionConsumer { + private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); + + private final TopicPath topic; + private final boolean autocommit; + + private final PullSubscriberFactory subscriberFactory; + private final CommitterFactory committerFactory; + + private final CloseableMonitor monitor = new CloseableMonitor(); + + static class SubscriberState { + PullSubscriber subscriber; + Committer committer; + Optional lastUncommitted = Optional.empty(); + } + + @GuardedBy("monitor.monitor") + private Map partitions = new HashMap<>(); + + // Set to true when wakeup() has been called once. 
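+  // Checked at the start of every poll attempt; once set, poll() throws WakeupException.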
+ @GuardedBy("monitor.monitor") + private boolean wakeupTriggered = false; + + SingleSubscriptionConsumerImpl( + TopicPath topic, + boolean autocommit, + PullSubscriberFactory subscriberFactory, + CommitterFactory committerFactory) { + this.topic = topic; + this.autocommit = autocommit; + this.subscriberFactory = subscriberFactory; + this.committerFactory = committerFactory; + } + + @Override + public void setAssignment(Set assignment) { + try (CloseableMonitor.Hold h = monitor.enter()) { + List unassigned = + ImmutableSet.copyOf(partitions.keySet()).stream() + .filter(p -> !assignment.contains(p)) + .map(p -> partitions.remove(p)) + .collect(Collectors.toList()); + for (SubscriberState state : unassigned) { + state.subscriber.close(); + state.committer.stopAsync().awaitTerminated(); + } + assignment.stream() + .filter(p -> !partitions.containsKey(p)) + .forEach( + ExtractStatus.rethrowAsRuntime( + partition -> { + SubscriberState s = new SubscriberState(); + s.subscriber = + subscriberFactory.newPullSubscriber( + partition, + SeekRequest.newBuilder() + .setNamedTarget(NamedTarget.COMMITTED_CURSOR) + .build()); + s.committer = committerFactory.newCommitter(partition); + s.committer.startAsync().awaitRunning(); + partitions.put(partition, s); + })); + } catch (Throwable t) { + throw ExtractStatus.toCanonical(t).getStatus().asRuntimeException(); + } + } + + @Override + public Set assignment() { + try (CloseableMonitor.Hold h = monitor.enter()) { + return partitions.keySet(); + } + } + + @GuardedBy("monitor.monitor") + private Map> fetchAll() { + Map> partitionQueues = new HashMap<>(); + partitions.forEach( + ExtractStatus.rethrowAsRuntime( + (partition, state) -> { + List messages = state.subscriber.pull(); + if (messages.isEmpty()) return; + partitionQueues.computeIfAbsent(partition, x -> new ArrayDeque<>()).addAll(messages); + })); + return partitionQueues; + } + + private Map> doPoll(Duration duration) { + try { + while (!duration.isZero()) { + try (CloseableMonitor.Hold h = monitor.enter()) { + if (wakeupTriggered) throw new WakeupException(); + Map> partitionQueues = fetchAll(); + if (!partitionQueues.isEmpty()) return partitionQueues; + } + Duration sleepFor = Collections.min(ImmutableList.of(duration, Duration.ofMillis(10))); + Thread.sleep(sleepFor.toMillis()); + duration = duration.minus(sleepFor); + } + // Last fetch to handle duration originally being 0 and last time window sleep. 
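+      // (Messages may also have arrived during the final sleep of the loop above.)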
+ try (CloseableMonitor.Hold h = monitor.enter()) { + return fetchAll(); + } + } catch (Throwable t) { + throw toKafka(t); + } + } + + @Override + public ConsumerRecords poll(Duration duration) { + Map> partitionQueues = doPoll(duration); + Map>> records = new HashMap<>(); + if (autocommit) { + ApiFuture future = commitAll(); + ApiFutures.addCallback( + future, + new ApiFutureCallback() { + @Override + public void onFailure(Throwable throwable) { + logger.atWarning().withCause(throwable).log("Failed to commit offsets."); + } + + @Override + public void onSuccess(Object result) {} + }, + MoreExecutors.directExecutor()); + } + partitionQueues.forEach( + (partition, queue) -> { + if (queue.isEmpty()) return; + try (CloseableMonitor.Hold h = monitor.enter()) { + SubscriberState state = partitions.getOrDefault(partition, null); + if (state != null) { + state.lastUncommitted = Optional.of(Iterables.getLast(queue).offset()); + } + } + List> partitionRecords = + queue.stream() + .map(message -> RecordTransforms.fromMessage(message, topic, partition)) + .collect(Collectors.toList()); + records.put( + new TopicPartition(topic.toString(), (int) partition.value()), partitionRecords); + }); + return new ConsumerRecords<>(records); + } + + @Override + public ApiFuture> commitAll() { + try (CloseableMonitor.Hold h = monitor.enter()) { + ImmutableMap.Builder builder = ImmutableMap.builder(); + ImmutableList.Builder> commitFutures = ImmutableList.builder(); + partitions.entrySet().stream() + .filter(entry -> entry.getValue().lastUncommitted.isPresent()) + .forEach( + entry -> { + // The Pub/Sub Lite commit offset is the next to be received. + Offset lastUncommitted = entry.getValue().lastUncommitted.get(); + entry.getValue().lastUncommitted = Optional.empty(); + Offset toCommit = Offset.of(lastUncommitted.value() + 1); + builder.put(entry.getKey(), toCommit); + commitFutures.add(entry.getValue().committer.commitOffset(toCommit)); + }); + Map map = builder.build(); + return ApiFutures.transform( + ApiFutures.allAsList(commitFutures.build()), + ignored -> map, + MoreExecutors.directExecutor()); + } + } + + @Override + public ApiFuture commit(Map commitOffsets) { + try (CloseableMonitor.Hold h = monitor.enter()) { + ImmutableList.Builder> commitFutures = ImmutableList.builder(); + commitOffsets.forEach( + (partition, offset) -> { + if (!partitions.containsKey(partition)) { + throw new CommitFailedException( + "Tried to commit to partition " + + partition.value() + + " which is not assigned to this consumer."); + } + commitFutures.add(partitions.get(partition).committer.commitOffset(offset)); + }); + return ApiFutures.transform( + ApiFutures.allAsList(commitFutures.build()), + ignored -> null, + MoreExecutors.directExecutor()); + } + } + + @Override + public void doSeek(Partition partition, SeekRequest request) throws KafkaException { + try (CloseableMonitor.Hold h = monitor.enter()) { + if (!partitions.containsKey(partition)) { + throw new IllegalStateException( + "Received seek for partition " + + partition.value() + + " which is not assigned to this consumer."); + } + SubscriberState state = partitions.get(partition); + state.subscriber.close(); + state.subscriber = subscriberFactory.newPullSubscriber(partition, request); + } catch (IllegalStateException e) { + throw e; + } catch (Throwable t) { + throw toKafka(t); + } + } + + @Override + public Optional position(Partition partition) { + if (!partitions.containsKey(partition)) return Optional.empty(); + return 
partitions.get(partition).subscriber.nextOffset().map(Offset::value); + } + + @Override + public void close(Duration duration) { + try (CloseableMonitor.Hold h = monitor.enter()) { + for (SubscriberState state : partitions.values()) { + state.subscriber.close(); + state.committer.stopAsync().awaitTerminated(); + } + } catch (Throwable t) { + throw toKafka(t); + } + } + + @Override + public void wakeup() { + try (CloseableMonitor.Hold h = monitor.enter()) { + wakeupTriggered = true; + } + } +} diff --git a/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java b/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java new file mode 100644 index 00000000..f152a62e --- /dev/null +++ b/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java @@ -0,0 +1,457 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.kafka; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +import com.google.api.core.ApiFutures; +import com.google.api.core.SettableApiFuture; +import com.google.cloud.pubsublite.CloudZone; +import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.ProjectNumber; +import com.google.cloud.pubsublite.SubscriptionPath; +import com.google.cloud.pubsublite.TopicName; +import com.google.cloud.pubsublite.TopicPath; +import com.google.cloud.pubsublite.internal.CursorClient; +import com.google.cloud.pubsublite.internal.testing.UnitTestExamples; +import com.google.cloud.pubsublite.internal.wire.Assigner; +import com.google.cloud.pubsublite.internal.wire.AssignerFactory; +import com.google.cloud.pubsublite.internal.wire.PartitionAssignmentReceiver; +import com.google.cloud.pubsublite.proto.Cursor; +import com.google.cloud.pubsublite.proto.SeekRequest; +import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableListMultimap; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Multimaps; +import com.google.common.reflect.ImmutableTypeToInstanceMap; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; 
+import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; + +@RunWith(JUnit4.class) +public class PubsubLiteConsumerTest { + private static Duration INFINITE_DURATION = Duration.ofMillis(Long.MAX_VALUE); + + private static TopicPartition exampleTopicPartition() { + return new TopicPartition( + UnitTestExamples.example(TopicPath.class).toString(), + (int) UnitTestExamples.example(Partition.class).value()); + } + + private static OffsetAndMetadata exampleOffsetAndMetadata() { + return new OffsetAndMetadata(UnitTestExamples.example(Offset.class).value()); + } + + private static T example(Class klass) { + ImmutableTypeToInstanceMap map = + ImmutableTypeToInstanceMap.builder() + .put(TopicPartition.class, exampleTopicPartition()) + .put(OffsetAndMetadata.class, exampleOffsetAndMetadata()) + .build(); + T instance = (T) map.getInstance(klass); + if (instance != null) return instance; + return UnitTestExamples.example(klass); + } + + @Mock ConsumerFactory consumerFactory; + @Mock AssignerFactory assignerFactory; + @Mock CursorClient cursorClient; + + @Mock Assigner assigner; + @Mock SingleSubscriptionConsumer underlying; + + Consumer consumer; + + @Before + public void setUp() { + initMocks(this); + consumer = + new PubsubLiteConsumer( + example(SubscriptionPath.class), + example(TopicPath.class), + 3, + consumerFactory, + assignerFactory, + cursorClient); + when(consumerFactory.newConsumer()).thenReturn(underlying); + } + + @Test + public void unsupportedOperations() { + assertThrows( + UnsupportedOperationException.class, () -> consumer.subscribe(Pattern.compile(".*"))); + assertThrows( + UnsupportedOperationException.class, + () -> consumer.subscribe(Pattern.compile(".*"), mock(ConsumerRebalanceListener.class))); + assertThrows( + UnsupportedOperationException.class, () -> consumer.subscribe(ImmutableList.of("a", "b"))); + assertThrows( + UnsupportedOperationException.class, + () -> + consumer.subscribe(ImmutableList.of("a", "b"), mock(ConsumerRebalanceListener.class))); + assertThrows( + UnsupportedVersionException.class, () -> consumer.offsetsForTimes(ImmutableMap.of())); + assertThrows( + UnsupportedVersionException.class, + () -> consumer.offsetsForTimes(ImmutableMap.of(), Duration.ZERO)); + assertThrows(UnsupportedVersionException.class, () -> consumer.endOffsets(ImmutableList.of())); + assertThrows( + UnsupportedVersionException.class, + () -> consumer.endOffsets(ImmutableList.of(), Duration.ZERO)); + } + + @Test + public void staticOperations() { + // Pre-subscribe: returns empty set. + assertThat(consumer.subscription()).isEmpty(); + + assertThat(consumer.metrics()).isEmpty(); + assertThat(consumer.groupMetadata().groupId()) + .isEqualTo(example(SubscriptionPath.class).toString()); + TopicPartition other = new TopicPartition(example(TopicPath.class).toString(), 2); + assertThat(consumer.beginningOffsets(ImmutableList.of(example(TopicPartition.class), other))) + .containsExactly(example(TopicPartition.class), 0L, other, 0L); + // No-op operations. 
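+    // paused() is always empty, and pause()/resume() only log a warning for Pub/Sub Lite.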
+ assertThat(consumer.paused()).isEmpty(); + consumer.pause(ImmutableList.of()); + consumer.resume(ImmutableList.of()); + } + + @Test + public void badTopicOperations() throws Exception { + TopicPath badTopic = + TopicPath.newBuilder() + .setLocation(example(CloudZone.class)) + .setProject(example(ProjectNumber.class)) + .setName(TopicName.of("abc")) + .build(); + assertThrows( + UnsupportedOperationException.class, + () -> consumer.subscribe(ImmutableList.of(badTopic.toString()))); + assertThrows( + UnsupportedOperationException.class, + () -> + consumer.subscribe( + ImmutableList.of(badTopic.toString()), mock(ConsumerRebalanceListener.class))); + + TopicPartition bad = new TopicPartition(badTopic.toString(), 4); + assertThrows(UnsupportedOperationException.class, () -> consumer.assign(ImmutableList.of(bad))); + assertThrows(UnsupportedOperationException.class, () -> consumer.seek(bad, 3)); + assertThrows( + UnsupportedOperationException.class, () -> consumer.seek(bad, new OffsetAndMetadata(3))); + assertThrows(UnsupportedOperationException.class, () -> consumer.position(bad)); + assertThrows(UnsupportedOperationException.class, () -> consumer.position(bad, Duration.ZERO)); + assertThrows(UnsupportedOperationException.class, () -> consumer.committed(bad)); + assertThrows(UnsupportedOperationException.class, () -> consumer.committed(bad, Duration.ZERO)); + assertThrows( + UnsupportedOperationException.class, + () -> consumer.committed(ImmutableSet.of(example(TopicPartition.class), bad))); + assertThrows( + UnsupportedOperationException.class, + () -> + consumer.committed(ImmutableSet.of(example(TopicPartition.class), bad), Duration.ZERO)); + assertThrows( + UnsupportedOperationException.class, () -> consumer.partitionsFor(badTopic.toString())); + assertThrows( + UnsupportedOperationException.class, + () -> consumer.partitionsFor(badTopic.toString(), Duration.ZERO)); + assertThrows( + UnsupportedOperationException.class, + () -> consumer.beginningOffsets(ImmutableList.of(example(TopicPartition.class), bad))); + assertThrows( + UnsupportedOperationException.class, + () -> + consumer.beginningOffsets( + ImmutableList.of(example(TopicPartition.class), bad), Duration.ZERO)); + + // Only valid if subscribed. 
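+    // Assign a valid partition first so the calls below fail on the bad topic rather than on
+    // the missing assignment.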
+ consumer.assign(ImmutableList.of(example(TopicPartition.class))); + assertThrows( + UnsupportedOperationException.class, + () -> consumer.commitSync(ImmutableMap.of(bad, new OffsetAndMetadata(3)))); + assertThrows( + UnsupportedOperationException.class, + () -> consumer.commitSync(ImmutableMap.of(bad, new OffsetAndMetadata(3)), Duration.ZERO)); + assertThrows( + UnsupportedOperationException.class, + () -> consumer.commitAsync(ImmutableMap.of(bad, new OffsetAndMetadata(3)), null)); + assertThrows( + UnsupportedOperationException.class, () -> consumer.seekToBeginning(ImmutableList.of(bad))); + assertThrows( + UnsupportedOperationException.class, () -> consumer.seekToEnd(ImmutableList.of(bad))); + } + + @Test + public void invalidBeforeSubscribeOperations() { + assertThrows(IllegalStateException.class, () -> consumer.assignment()); + assertThrows(IllegalStateException.class, () -> consumer.poll(3)); + assertThrows(IllegalStateException.class, () -> consumer.poll(Duration.ZERO)); + assertThrows(IllegalStateException.class, () -> consumer.commitSync(ImmutableMap.of())); + assertThrows( + IllegalStateException.class, () -> consumer.commitSync(ImmutableMap.of(), Duration.ZERO)); + assertThrows( + IllegalStateException.class, + () -> consumer.commitAsync(ImmutableMap.of(), mock(OffsetCommitCallback.class))); + assertThrows(IllegalStateException.class, () -> consumer.commitSync()); + assertThrows(IllegalStateException.class, () -> consumer.commitSync(Duration.ZERO)); + assertThrows(IllegalStateException.class, () -> consumer.commitAsync()); + assertThrows( + IllegalStateException.class, () -> consumer.commitAsync(mock(OffsetCommitCallback.class))); + assertThrows( + IllegalStateException.class, () -> consumer.seek(example(TopicPartition.class), 3)); + assertThrows( + IllegalStateException.class, + () -> consumer.seek(example(TopicPartition.class), new OffsetAndMetadata(3))); + assertThrows( + IllegalStateException.class, + () -> consumer.seekToBeginning(ImmutableList.of(example(TopicPartition.class)))); + assertThrows( + IllegalStateException.class, + () -> consumer.seekToEnd(ImmutableList.of(example(TopicPartition.class)))); + assertThrows( + IllegalStateException.class, () -> consumer.position(example(TopicPartition.class))); + assertThrows( + IllegalStateException.class, + () -> consumer.position(example(TopicPartition.class), Duration.ZERO)); + assertThrows(IllegalStateException.class, () -> consumer.wakeup()); + } + + @Test + public void validAssign() throws Exception { + consumer.assign(ImmutableList.of(example(TopicPartition.class))); + verify(consumerFactory, times(1)).newConsumer(); + verify(underlying, times(1)).setAssignment(ImmutableSet.of(example(Partition.class))); + consumer.assign(ImmutableList.of(example(TopicPartition.class))); + verify(consumerFactory, times(1)).newConsumer(); + verify(underlying, times(2)).setAssignment(ImmutableSet.of(example(Partition.class))); + verify(assignerFactory, times(0)).New(any()); + consumer.unsubscribe(); + verify(underlying).close(INFINITE_DURATION); + } + + @Test + public void simpleConsumerMethods() { + consumer.assign(ImmutableList.of(example(TopicPartition.class))); + when(underlying.assignment()).thenReturn(ImmutableSet.of(example(Partition.class))); + assertThat(consumer.assignment()).containsExactly(example(TopicPartition.class)); + assertThat(consumer.subscription()).containsExactly(example(TopicPath.class).toString()); + consumer.wakeup(); + verify(underlying).wakeup(); + + // Assign empty calls unsubscribe. 
+ consumer.assign(ImmutableSet.of()); + verify(underlying).close(INFINITE_DURATION); + } + + @Test + public void validSubscribe() throws Exception { + ConsumerRebalanceListener listener = mock(ConsumerRebalanceListener.class); + AtomicReference receiver = new AtomicReference<>(null); + doAnswer( + args -> { + receiver.set(args.getArgument(0)); + return assigner; + }) + .when(assignerFactory) + .New(any()); + consumer.subscribe(ImmutableList.of(example(TopicPath.class).toString()), listener); + verify(consumerFactory).newConsumer(); + verify(assignerFactory).New(any()); + receiver.get().handleAssignment(ImmutableSet.of(Partition.of(5))); + verify(listener) + .onPartitionsAssigned( + ImmutableSet.of(new TopicPartition(example(TopicPath.class).toString(), 5))); + verify(underlying).setAssignment(ImmutableSet.of(Partition.of(5))); + // Duplicate subscribe does nothing. + receiver.get().handleAssignment(ImmutableSet.of(Partition.of(5))); + verifyNoMoreInteractions(listener); + // Add and remove. + receiver.get().handleAssignment(ImmutableSet.of(Partition.of(7))); + verify(listener) + .onPartitionsLost( + ImmutableSet.of(new TopicPartition(example(TopicPath.class).toString(), 5))); + verify(listener) + .onPartitionsAssigned( + ImmutableSet.of(new TopicPartition(example(TopicPath.class).toString(), 7))); + verify(underlying).setAssignment(ImmutableSet.of(Partition.of(7))); + } + + @Test + public void polling() { + consumer.assign(ImmutableList.of(example(TopicPartition.class))); + + when(underlying.poll(Duration.ofMillis(1))) + .thenReturn( + new ConsumerRecords<>( + Multimaps.asMap( + ImmutableListMultimap.of( + example(TopicPartition.class), + new ConsumerRecord<>( + example(TopicPath.class).toString(), + 0, + 0, + new byte[0], + new byte[0]))))); + ConsumerRecords records = consumer.poll(1); + assertThat(records.count()).isEqualTo(1); + ConsumerRecord record = records.iterator().next(); + assertThat(record.topic()).isEqualTo(example(TopicPath.class).toString()); + assertThat(record.partition()).isEqualTo(0); + + when(underlying.poll(Duration.ofMillis(2))) + .thenReturn( + new ConsumerRecords<>( + Multimaps.asMap( + ImmutableListMultimap.of( + example(TopicPartition.class), + new ConsumerRecord<>( + example(TopicPath.class).toString(), + 1, + 0, + new byte[0], + new byte[0]))))); + records = consumer.poll(Duration.ofMillis(2)); + assertThat(records.count()).isEqualTo(1); + record = records.iterator().next(); + assertThat(record.topic()).isEqualTo(example(TopicPath.class).toString()); + assertThat(record.partition()).isEqualTo(1); + + when(underlying.poll(Duration.ofMillis(3))) + .thenAnswer( + args -> { + throw new KafkaException(); + }); + assertThrows(KafkaException.class, () -> consumer.poll(Duration.ofMillis(3))); + } + + @Test + public void commitSync() { + consumer.assign(ImmutableList.of(example(TopicPartition.class))); + when(underlying.commit(ImmutableMap.of(example(Partition.class), example(Offset.class)))) + .thenReturn(ApiFutures.immediateFuture(null)); + consumer.commitSync( + ImmutableMap.of(example(TopicPartition.class), example(OffsetAndMetadata.class))); + verify(underlying).commit(ImmutableMap.of(example(Partition.class), example(Offset.class))); + when(underlying.commit(ImmutableMap.of(example(Partition.class), example(Offset.class)))) + .thenReturn(SettableApiFuture.create()); + assertThrows( + TimeoutException.class, + () -> + consumer.commitSync( + ImmutableMap.of(example(TopicPartition.class), example(OffsetAndMetadata.class)), + Duration.ZERO)); + + 
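+    // The no-argument commitSync() delegates to commitAll() on the underlying consumer.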
when(underlying.commitAll()).thenReturn(ApiFutures.immediateFuture(null)); + consumer.commitSync(); + verify(underlying).commitAll(); + when(underlying.commitAll()).thenReturn(SettableApiFuture.create()); + assertThrows(TimeoutException.class, () -> consumer.commitSync(Duration.ZERO)); + } + + @Test + public void commitAsync() { + consumer.assign(ImmutableList.of(example(TopicPartition.class))); + OffsetCommitCallback callback = mock(OffsetCommitCallback.class); + when(underlying.commit(ImmutableMap.of(example(Partition.class), example(Offset.class)))) + .thenReturn(ApiFutures.immediateFuture(null)); + consumer.commitAsync( + ImmutableMap.of(example(TopicPartition.class), example(OffsetAndMetadata.class)), callback); + verify(callback) + .onComplete( + ImmutableMap.of(example(TopicPartition.class), example(OffsetAndMetadata.class)), null); + + callback = mock(OffsetCommitCallback.class); + when(underlying.commit(ImmutableMap.of(example(Partition.class), example(Offset.class)))) + .thenReturn(ApiFutures.immediateFailedFuture(new KafkaException())); + consumer.commitAsync( + ImmutableMap.of(example(TopicPartition.class), example(OffsetAndMetadata.class)), callback); + verify(callback).onComplete(isNull(), any(KafkaException.class)); + + when(underlying.commitAll()) + .thenReturn( + ApiFutures.immediateFuture( + ImmutableMap.of(example(Partition.class), example(Offset.class)))); + consumer.commitAsync(); + verify(underlying, times(1)).commitAll(); + + callback = mock(OffsetCommitCallback.class); + consumer.commitAsync(callback); + verify(underlying, times(2)).commitAll(); + verify(callback) + .onComplete( + ImmutableMap.of(example(TopicPartition.class), example(OffsetAndMetadata.class)), null); + + callback = mock(OffsetCommitCallback.class); + when(underlying.commitAll()).thenReturn(ApiFutures.immediateFailedFuture(new KafkaException())); + consumer.commitAsync(callback); + verify(underlying, times(3)).commitAll(); + verify(callback).onComplete(isNull(), any(KafkaException.class)); + } + + @Test + public void seek() { + consumer.assign(ImmutableList.of(example(TopicPartition.class))); + consumer.seek(example(TopicPartition.class), 1); + verify(underlying) + .doSeek( + example(Partition.class), + SeekRequest.newBuilder().setCursor(Cursor.newBuilder().setOffset(1)).build()); + consumer.seek(example(TopicPartition.class), new OffsetAndMetadata(2)); + verify(underlying) + .doSeek( + example(Partition.class), + SeekRequest.newBuilder().setCursor(Cursor.newBuilder().setOffset(2)).build()); + consumer.seekToBeginning(ImmutableList.of(example(TopicPartition.class))); + verify(underlying) + .doSeek( + example(Partition.class), + SeekRequest.newBuilder().setCursor(Cursor.newBuilder().setOffset(0)).build()); + consumer.seekToEnd(ImmutableList.of(example(TopicPartition.class))); + verify(underlying) + .doSeek( + example(Partition.class), + SeekRequest.newBuilder().setNamedTarget(NamedTarget.HEAD).build()); + } +} diff --git a/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java b/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java new file mode 100644 index 00000000..7a05fcea --- /dev/null +++ b/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java @@ -0,0 +1,214 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.kafka; + +import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.example; +import static com.google.cloud.pubsublite.kafka.StatusTestHelpers.assertFutureThrowsCode; +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.api.core.SettableApiFuture; +import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.PublishMetadata; +import com.google.cloud.pubsublite.TopicName; +import com.google.cloud.pubsublite.TopicPath; +import com.google.cloud.pubsublite.internal.Publisher; +import com.google.cloud.pubsublite.internal.testing.FakeApiService; +import com.google.common.collect.ImmutableMap; +import io.grpc.Status; +import io.grpc.Status.Code; +import io.grpc.StatusException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.MockitoAnnotations; +import org.mockito.Spy; + +@RunWith(JUnit4.class) +public class PubsubLiteProducerTest { + abstract static class FakePublisher extends FakeApiService + implements Publisher {} + + private static final ProducerRecord RECORD = + new ProducerRecord<>( + example(TopicPath.class).toString(), "abc".getBytes(), "defg".getBytes()); + private static final Message MESSAGE = RecordTransforms.toMessage(RECORD); + private static final TopicPartition TOPIC_PARTITION = + new TopicPartition( + example(TopicPath.class).toString(), (int) example(Partition.class).value()); + + @Spy FakePublisher underlying; + + Producer producer; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + producer = new PubsubLiteProducer(underlying, 3, example(TopicPath.class)); + verify(underlying).startAsync(); + verify(underlying).awaitRunning(); + } + + @Test + public void noTransactions() { + assertThrows(UnsupportedVersionException.class, () -> producer.initTransactions()); + assertThrows(UnsupportedVersionException.class, () -> producer.beginTransaction()); + assertThrows( + UnsupportedVersionException.class, + () -> producer.sendOffsetsToTransaction(ImmutableMap.of(), "")); + assertThrows( + UnsupportedVersionException.class, + () -> producer.sendOffsetsToTransaction(ImmutableMap.of(), new ConsumerGroupMetadata(""))); + assertThrows(UnsupportedVersionException.class, () -> producer.commitTransaction()); + assertThrows(UnsupportedVersionException.class, () -> producer.abortTransaction()); + } + + @Test + public void 
badRecordThrows() throws StatusException {
+    TopicPath other =
+        example(TopicPath.class).toBuilder().setName(TopicName.of("not-example")).build();
+    ProducerRecord<byte[], byte[]> badTopicRecord =
+        new ProducerRecord<>(other.toString(), "abc".getBytes());
+    assertThrows(UnsupportedOperationException.class, () -> producer.send(badTopicRecord));
+    assertThrows(
+        UnsupportedOperationException.class,
+        () -> producer.send(badTopicRecord, (metadata, ex) -> {}));
+
+    ProducerRecord<byte[], byte[]> withPartitionRecord =
+        new ProducerRecord<>(other.toString(), 999, "abc".getBytes(), "def".getBytes());
+    assertThrows(UnsupportedOperationException.class, () -> producer.send(withPartitionRecord));
+    assertThrows(
+        UnsupportedOperationException.class,
+        () -> producer.send(withPartitionRecord, (metadata, ex) -> {}));
+  }
+
+  @Test
+  public void badTopicThrows() throws StatusException {
+    TopicPath other =
+        example(TopicPath.class).toBuilder().setName(TopicName.of("not-example")).build();
+    ProducerRecord<byte[], byte[]> record =
+        new ProducerRecord<>(other.toString(), "abc".getBytes());
+    assertThrows(UnsupportedOperationException.class, () -> producer.send(record));
+    assertThrows(
+        UnsupportedOperationException.class, () -> producer.send(record, (metadata, ex) -> {}));
+  }
+
+  @Test
+  public void sendSuccess() throws Exception {
+    SettableApiFuture<PublishMetadata> response = SettableApiFuture.create();
+    when(underlying.publish(MESSAGE)).thenReturn(response);
+    Future<RecordMetadata> future = producer.send(RECORD);
+    verify(underlying).publish(MESSAGE);
+    response.set(PublishMetadata.of(example(Partition.class), example(Offset.class)));
+    // RecordMetadata doesn't define an equals implementation.
+    RecordMetadata metadata = future.get();
+    assertThat(metadata.topic()).isEqualTo(example(TopicPath.class).toString());
+    assertThat(metadata.partition()).isEqualTo(example(Partition.class).value());
+    assertThat(metadata.offset()).isEqualTo(example(Offset.class).value());
+    assertThat(metadata.serializedKeySize()).isEqualTo(3);
+    assertThat(metadata.serializedValueSize()).isEqualTo(4);
+  }
+
+  @Test
+  public void sendSuccessWithCallback() throws Exception {
+    SettableApiFuture<PublishMetadata> response = SettableApiFuture.create();
+    SettableApiFuture<RecordMetadata> leaked = SettableApiFuture.create();
+    when(underlying.publish(MESSAGE)).thenReturn(response);
+    Future<RecordMetadata> future =
+        producer.send(
+            RECORD,
+            (metadata, ex) -> {
+              if (metadata != null) {
+                leaked.set(metadata);
+              } else {
+                leaked.setException(ex);
+              }
+            });
+    verify(underlying).publish(MESSAGE);
+    response.set(PublishMetadata.of(example(Partition.class), example(Offset.class)));
+    // RecordMetadata doesn't define an equals implementation.
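+    // Both the callback result and the returned future are therefore checked field by field.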
+ RecordMetadata metadata = leaked.get(); + assertThat(metadata.topic()).isEqualTo(example(TopicPath.class).toString()); + assertThat(metadata.partition()).isEqualTo(example(Partition.class).value()); + assertThat(metadata.offset()).isEqualTo(example(Offset.class).value()); + assertThat(metadata.serializedKeySize()).isEqualTo(3); + assertThat(metadata.serializedValueSize()).isEqualTo(4); + RecordMetadata resultMetadata = future.get(); + assertThat(resultMetadata.topic()).isEqualTo(example(TopicPath.class).toString()); + assertThat(resultMetadata.partition()).isEqualTo(example(Partition.class).value()); + assertThat(resultMetadata.offset()).isEqualTo(example(Offset.class).value()); + assertThat(resultMetadata.serializedKeySize()).isEqualTo(3); + assertThat(resultMetadata.serializedValueSize()).isEqualTo(4); + } + + @Test + public void sendError() { + SettableApiFuture response = SettableApiFuture.create(); + when(underlying.publish(MESSAGE)).thenReturn(response); + Future future = producer.send(RECORD); + verify(underlying).publish(MESSAGE); + response.setException(Status.FAILED_PRECONDITION.asException()); + assertFutureThrowsCode(future, Code.FAILED_PRECONDITION); + } + + @Test + public void sendErrorWithCallback() { + SettableApiFuture response = SettableApiFuture.create(); + SettableApiFuture leaked = SettableApiFuture.create(); + when(underlying.publish(MESSAGE)).thenReturn(response); + Future future = + producer.send( + RECORD, + (metadata, ex) -> { + if (metadata != null) { + leaked.set(metadata); + } else { + leaked.setException(ex); + } + }); + verify(underlying).publish(MESSAGE); + response.setException(Status.FAILED_PRECONDITION.asException()); + assertFutureThrowsCode(future, Code.FAILED_PRECONDITION); + assertFutureThrowsCode(leaked, Code.FAILED_PRECONDITION); + } + + @Test + public void flush() throws Exception { + producer.flush(); + verify(underlying).flush(); + } + + @Test + public void close() throws Exception { + producer.close(); + verify(underlying).stopAsync(); + verify(underlying).awaitTerminated(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + } +} diff --git a/src/test/java/com/google/cloud/pubsublite/kafka/RecordTransformsTest.java b/src/test/java/com/google/cloud/pubsublite/kafka/RecordTransformsTest.java new file mode 100644 index 00000000..20c2a8e9 --- /dev/null +++ b/src/test/java/com/google/cloud/pubsublite/kafka/RecordTransformsTest.java @@ -0,0 +1,93 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.google.cloud.pubsublite.kafka; + +import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.example; +import static com.google.common.truth.Truth.assertThat; + +import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.SequencedMessage; +import com.google.cloud.pubsublite.TopicPath; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableListMultimap; +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.record.TimestampType; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class RecordTransformsTest { + private static final Message MESSAGE = + Message.builder() + .setKey(ByteString.copyFromUtf8("abc")) + .setData(ByteString.copyFromUtf8("def")) + .setEventTime(Timestamp.newBuilder().setSeconds(1).setNanos(1000000).build()) + .setAttributes( + ImmutableListMultimap.of( + "xxx", + ByteString.copyFromUtf8("yyy"), + "zzz", + ByteString.copyFromUtf8("zzz"), + "zzz", + ByteString.copyFromUtf8("zzz"))) + .build(); + + @Test + public void publishTransform() { + ProducerRecord record = + new ProducerRecord<>( + example(TopicPath.class).toString(), + null, + 1001L, + "abc".getBytes(), + "def".getBytes(), + ImmutableList.of( + LiteHeaders.toHeader("xxx", ByteString.copyFromUtf8("yyy")), + LiteHeaders.toHeader("zzz", ByteString.copyFromUtf8("zzz")), + LiteHeaders.toHeader("zzz", ByteString.copyFromUtf8("zzz")))); + Message message = RecordTransforms.toMessage(record); + assertThat(message).isEqualTo(MESSAGE); + } + + @Test + public void subscribeTransform() { + SequencedMessage sequencedMessage = + SequencedMessage.of( + MESSAGE, Timestamp.newBuilder().setNanos(12345).build(), example(Offset.class), 123L); + ConsumerRecord record = + RecordTransforms.fromMessage( + sequencedMessage, example(TopicPath.class), example(Partition.class)); + assertThat(record.key()).isEqualTo("abc".getBytes()); + assertThat(record.value()).isEqualTo("def".getBytes()); + assertThat(record.timestampType()).isEqualTo(TimestampType.CREATE_TIME); + assertThat(record.timestamp()).isEqualTo(1001L); + ImmutableListMultimap.Builder headers = ImmutableListMultimap.builder(); + record + .headers() + .forEach(header -> headers.put(header.key(), ByteString.copyFrom(header.value()))); + assertThat(headers.build()).isEqualTo(MESSAGE.attributes()); + assertThat(record.offset()).isEqualTo(example(Offset.class).value()); + assertThat(record.topic()).isEqualTo(example(TopicPath.class).toString()); + assertThat(record.partition()).isEqualTo(example(Partition.class).value()); + } +} diff --git a/src/test/java/com/google/cloud/pubsublite/kafka/SharedBehaviorTest.java b/src/test/java/com/google/cloud/pubsublite/kafka/SharedBehaviorTest.java new file mode 100644 index 00000000..abcb416e --- /dev/null +++ b/src/test/java/com/google/cloud/pubsublite/kafka/SharedBehaviorTest.java @@ -0,0 +1,42 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.kafka;
+
+import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.example;
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.cloud.pubsublite.TopicPath;
+import java.util.List;
+import org.apache.kafka.common.PartitionInfo;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class SharedBehaviorTest {
+  @Test
+  public void partitionsForSuccess() {
+    List<PartitionInfo> result = SharedBehavior.partitionsFor(2, example(TopicPath.class));
+    assertThat(result.size()).isEqualTo(2);
+    assertThat(result.get(0).topic()).isEqualTo(example(TopicPath.class).toString());
+    assertThat(result.get(0).partition()).isEqualTo(0);
+    assertThat(result.get(0).leader()).isEqualTo(PubsubLiteNode.NODE);
+    assertThat(result.get(1).topic()).isEqualTo(example(TopicPath.class).toString());
+    assertThat(result.get(1).partition()).isEqualTo(1);
+    assertThat(result.get(1).leader()).isEqualTo(PubsubLiteNode.NODE);
+  }
+}
diff --git a/src/test/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImplTest.java b/src/test/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImplTest.java
new file mode 100644
index 00000000..c6a69c55
--- /dev/null
+++ b/src/test/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImplTest.java
@@ -0,0 +1,322 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.kafka;
+
+import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.example;
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.common.truth.Truth8.assertThat;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.SettableApiFuture;
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.SequencedMessage;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.internal.PullSubscriber;
+import com.google.cloud.pubsublite.internal.testing.FakeApiService;
+import com.google.cloud.pubsublite.internal.wire.Committer;
+import com.google.cloud.pubsublite.proto.Cursor;
+import com.google.cloud.pubsublite.proto.SeekRequest;
+import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ListMultimap;
+import com.google.protobuf.Timestamp;
+import io.grpc.StatusException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.errors.WakeupException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Spy;
+
+@RunWith(JUnit4.class)
+public class SingleSubscriptionConsumerImplTest {
+  private static final SeekRequest DEFAULT_SEEK =
+      SeekRequest.newBuilder().setNamedTarget(NamedTarget.COMMITTED_CURSOR).build();
+  private static final SeekRequest OFFSET_SEEK =
+      SeekRequest.newBuilder()
+          .setCursor(Cursor.newBuilder().setOffset(example(Offset.class).value()))
+          .build();
+
+  @Mock PullSubscriberFactory subscriberFactory;
+  @Mock CommitterFactory committerFactory;
+
+  @Mock PullSubscriber<SequencedMessage> subscriber5;
+  @Mock PullSubscriber<SequencedMessage> subscriber8;
+
+  abstract static class FakeCommitter extends FakeApiService implements Committer {}
+
+  @Spy FakeCommitter committer5;
+  @Spy FakeCommitter committer8;
+
+  private SingleSubscriptionConsumer consumer;
+
+  @Before
+  public void setUp() throws StatusException {
+    initMocks(this);
+    consumer =
+        new SingleSubscriptionConsumerImpl(
+            example(TopicPath.class), false, subscriberFactory, committerFactory);
+    verifyNoInteractions(subscriberFactory, committerFactory);
+    when(subscriberFactory.newPullSubscriber(eq(Partition.of(5)), any())).thenReturn(subscriber5);
+    when(subscriberFactory.newPullSubscriber(eq(Partition.of(8)), any())).thenReturn(subscriber8);
+    when(committerFactory.newCommitter(Partition.of(5))).thenReturn(committer5);
+    when(committerFactory.newCommitter(Partition.of(8))).thenReturn(committer8);
+  }
+
+  private static SequencedMessage message(long offset) {
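+    // An empty message: these tests only inspect the partition and offset of each record.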
+    return SequencedMessage.of(
+        Message.builder().build(), Timestamp.getDefaultInstance(), Offset.of(offset), 0L);
+  }
+
+  // Asserts that the polled records contain exactly the given (partition, offset) pairs.
+  private static void assertConsumerRecordsEqual(
+      ConsumerRecords<byte[], byte[]> records, ListMultimap<Partition, Offset> target)
+      throws StatusException {
+    ImmutableListMultimap.Builder<Partition, Offset> builder = ImmutableListMultimap.builder();
+    for (ConsumerRecord<byte[], byte[]> record : records) {
+      builder.put(Partition.of(record.partition()), Offset.of(record.offset()));
+    }
+    assertThat(builder.build()).isEqualTo(target);
+  }
+
+  @Test
+  public void assignAndPoll() throws Exception {
+    consumer.setAssignment(ImmutableSet.of(Partition.of(5), Partition.of(8)));
+    verify(subscriberFactory).newPullSubscriber(Partition.of(5), DEFAULT_SEEK);
+    verify(subscriberFactory).newPullSubscriber(Partition.of(8), DEFAULT_SEEK);
+    verify(committerFactory).newCommitter(Partition.of(5));
+    verify(committerFactory).newCommitter(Partition.of(8));
+    when(subscriber5.pull()).thenReturn(ImmutableList.of());
+    when(subscriber8.pull()).thenReturn(ImmutableList.of());
+    // -----------------------------
+    // Pulls ceil(15ms / 10ms) + 1 = 3 times when no messages are returned.
+    assertConsumerRecordsEqual(consumer.poll(Duration.ofMillis(15)), ImmutableListMultimap.of());
+    verify(subscriber5, times(3)).pull();
+    verify(subscriber8, times(3)).pull();
+    verify(committer5, times(0)).commitOffset(any());
+    verify(committer8, times(0)).commitOffset(any());
+    // -----------------------------
+    // Pulls once when messages are available.
+    when(subscriber5.pull()).thenReturn(ImmutableList.of(message(1), message(2), message(3)));
+    when(subscriber8.pull()).thenReturn(ImmutableList.of(message(1), message(2), message(4)));
+    assertConsumerRecordsEqual(
+        consumer.poll(Duration.ofMillis(15)),
+        ImmutableListMultimap.<Partition, Offset>builder()
+            .putAll(Partition.of(5), ImmutableList.of(Offset.of(1), Offset.of(2), Offset.of(3)))
+            .putAll(Partition.of(8), ImmutableList.of(Offset.of(1), Offset.of(2), Offset.of(4)))
+            .build());
+    verify(subscriber5, times(4)).pull();
+    verify(subscriber8, times(4)).pull();
+    verify(committer5, times(0)).commitOffset(any());
+    verify(committer8, times(0)).commitOffset(any());
+
+    // --------------------------
+    // Zero duration poll pulls once.
+    when(subscriber5.pull()).thenReturn(ImmutableList.of());
+    when(subscriber8.pull()).thenReturn(ImmutableList.of());
+    assertConsumerRecordsEqual(consumer.poll(Duration.ZERO), ImmutableListMultimap.of());
+    verify(subscriber5, times(5)).pull();
+    verify(subscriber8, times(5)).pull();
+    verify(committer5, times(0)).commitOffset(any());
+    verify(committer8, times(0)).commitOffset(any());
+    // --------------------------
+    // commitAll sends commits.
+    SettableApiFuture<Void> commit5 = SettableApiFuture.create();
+    SettableApiFuture<Void> commit8 = SettableApiFuture.create();
+    // Commits are the last received offset + 1.
+    when(committer5.commitOffset(Offset.of(4))).thenReturn(commit5);
+    when(committer8.commitOffset(Offset.of(5))).thenReturn(commit8);
+    ApiFuture<Map<Partition, Offset>> committed = consumer.commitAll();
+    assertThat(committed.isDone()).isFalse();
+    commit5.set(null);
+    assertThat(committed.isDone()).isFalse();
+    commit8.set(null);
+    assertThat(committed.get())
+        .containsExactlyEntriesIn(
+            ImmutableMap.of(Partition.of(5), Offset.of(4), Partition.of(8), Offset.of(5)));
+    // Close closes.
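+    // Even with a zero timeout, close must shut down the per-partition subscribers and committers.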
+    consumer.close(Duration.ZERO);
+    verify(subscriber5).close();
+    verify(subscriber8).close();
+    verify(committer5).stopAsync();
+    verify(committer5).awaitTerminated();
+    verify(committer8).stopAsync();
+    verify(committer8).awaitTerminated();
+  }
+
+  @Test
+  public void assignAndPollAutocommit() throws Exception {
+    consumer =
+        new SingleSubscriptionConsumerImpl(
+            example(TopicPath.class), true, subscriberFactory, committerFactory);
+    consumer.setAssignment(ImmutableSet.of(Partition.of(5), Partition.of(8)));
+    verify(subscriberFactory).newPullSubscriber(Partition.of(5), DEFAULT_SEEK);
+    verify(subscriberFactory).newPullSubscriber(Partition.of(8), DEFAULT_SEEK);
+    verify(committerFactory).newCommitter(Partition.of(5));
+    verify(committerFactory).newCommitter(Partition.of(8));
+    when(subscriber5.pull()).thenReturn(ImmutableList.of());
+    when(subscriber8.pull()).thenReturn(ImmutableList.of());
+    // -----------------------------
+    // Pulls ceil(15ms / 10ms) + 1 = 3 times when no messages are returned.
+    assertConsumerRecordsEqual(consumer.poll(Duration.ofMillis(15)), ImmutableListMultimap.of());
+    verify(subscriber5, times(3)).pull();
+    verify(subscriber8, times(3)).pull();
+    verify(committer5, times(0)).commitOffset(any());
+    verify(committer8, times(0)).commitOffset(any());
+    // -----------------------------
+    // Pulls once when messages are available.
+    when(subscriber5.pull()).thenReturn(ImmutableList.of(message(1), message(2), message(3)));
+    when(subscriber8.pull()).thenReturn(ImmutableList.of(message(1), message(2), message(4)));
+    assertConsumerRecordsEqual(
+        consumer.poll(Duration.ofMillis(15)),
+        ImmutableListMultimap.<Partition, Offset>builder()
+            .putAll(Partition.of(5), ImmutableList.of(Offset.of(1), Offset.of(2), Offset.of(3)))
+            .putAll(Partition.of(8), ImmutableList.of(Offset.of(1), Offset.of(2), Offset.of(4)))
+            .build());
+    verify(subscriber5, times(4)).pull();
+    verify(subscriber8, times(4)).pull();
+    verify(committer5, times(0)).commitOffset(any());
+    verify(committer8, times(0)).commitOffset(any());
+
+    // --------------------------
+    // Zero duration poll pulls once and commits the previous offsets.
+    SettableApiFuture<Void> commit5 = SettableApiFuture.create();
+    SettableApiFuture<Void> commit8 = SettableApiFuture.create();
+    // Commits are the last received offset + 1.
+    when(committer5.commitOffset(Offset.of(4))).thenReturn(commit5);
+    when(committer8.commitOffset(Offset.of(5))).thenReturn(commit8);
+
+    when(subscriber5.pull()).thenReturn(ImmutableList.of());
+    when(subscriber8.pull()).thenReturn(ImmutableList.of());
+    assertConsumerRecordsEqual(consumer.poll(Duration.ZERO), ImmutableListMultimap.of());
+    verify(subscriber5, times(5)).pull();
+    verify(subscriber8, times(5)).pull();
+    verify(committer5).commitOffset(Offset.of(4));
+    verify(committer8).commitOffset(Offset.of(5));
+    commit5.set(null);
+    commit8.set(null);
+    // Close closes.
+    consumer.close(Duration.ZERO);
+    verify(subscriber5).close();
+    verify(subscriber8).close();
+    verify(committer5).stopAsync();
+    verify(committer5).awaitTerminated();
+    verify(committer8).stopAsync();
+    verify(committer8).awaitTerminated();
+  }
+
+  @Test
+  public void wakeupBeforePoll() throws Exception {
+    consumer.setAssignment(ImmutableSet.of(Partition.of(5)));
+    when(subscriber5.pull()).thenReturn(ImmutableList.of());
+    consumer.wakeup();
+    assertThrows(WakeupException.class, () -> consumer.poll(Duration.ofMillis(15)));
+  }
+
+  @Test
+  public void wakeupDuringPoll() throws Exception {
+    consumer.setAssignment(ImmutableSet.of(Partition.of(5)));
+    when(subscriber5.pull())
+        .thenAnswer(
+            args -> {
+              consumer.wakeup();
+              return ImmutableList.of();
+            });
+    assertThrows(WakeupException.class, () -> consumer.poll(Duration.ofDays(1)));
+  }
+
+  @Test
+  public void assignmentChange() throws Exception {
+    consumer.setAssignment(ImmutableSet.of(Partition.of(5)));
+    assertThat(consumer.assignment()).isEqualTo(ImmutableSet.of(Partition.of(5)));
+    verify(subscriberFactory).newPullSubscriber(Partition.of(5), DEFAULT_SEEK);
+    verify(committerFactory).newCommitter(Partition.of(5));
+    verify(committer5).startAsync();
+    consumer.setAssignment(ImmutableSet.of(Partition.of(8)));
+    assertThat(consumer.assignment()).isEqualTo(ImmutableSet.of(Partition.of(8)));
+    verify(subscriberFactory).newPullSubscriber(Partition.of(8), DEFAULT_SEEK);
+    verify(committerFactory).newCommitter(Partition.of(8));
+    verify(committer8).startAsync();
+    verify(subscriber5).close();
+    verify(committer5).stopAsync();
+  }
+
+  @Test
+  public void commitNotAssigned() throws Exception {
+    consumer.setAssignment(ImmutableSet.of(Partition.of(5)));
+    assertThrows(
+        CommitFailedException.class,
+        () -> consumer.commit(ImmutableMap.of(Partition.of(8), Offset.of(1))));
+  }
+
+  @Test
+  public void commitAssigned() throws Exception {
+    consumer.setAssignment(ImmutableSet.of(Partition.of(5)));
+    SettableApiFuture<Void> commit5 = SettableApiFuture.create();
+    when(committer5.commitOffset(Offset.of(1))).thenReturn(commit5);
+    ApiFuture<Void> commitFuture = consumer.commit(ImmutableMap.of(Partition.of(5), Offset.of(1)));
+    assertThat(commitFuture.isDone()).isFalse();
+    commit5.set(null);
+    assertThat(commitFuture.isDone()).isTrue();
+  }
+
+  @Test
+  public void seekNotAssigned() throws Exception {
+    consumer.setAssignment(ImmutableSet.of(Partition.of(5)));
+    assertThrows(IllegalStateException.class, () -> consumer.doSeek(Partition.of(8), OFFSET_SEEK));
+  }
+
+  @Test
+  public void seekAssigned() throws Exception {
+    consumer.setAssignment(ImmutableSet.of(Partition.of(5)));
+    verify(subscriberFactory).newPullSubscriber(Partition.of(5), DEFAULT_SEEK);
+    verify(committerFactory).newCommitter(Partition.of(5));
+    // Reuse the subscriber8 mock as the replacement subscriber created by the seek.
+    when(subscriberFactory.newPullSubscriber(Partition.of(5), OFFSET_SEEK)).thenReturn(subscriber8);
+    consumer.doSeek(Partition.of(5), OFFSET_SEEK);
+    verify(subscriber5).close();
+    verify(subscriberFactory).newPullSubscriber(Partition.of(5), OFFSET_SEEK);
+  }
+
+  @Test
+  public void position() throws Exception {
+    consumer.setAssignment(ImmutableSet.of(Partition.of(5)));
+    assertThat(consumer.position(Partition.of(8))).isEmpty();
+    when(subscriber5.nextOffset()).thenReturn(Optional.empty());
+    assertThat(consumer.position(Partition.of(5))).isEmpty();
+    when(subscriber5.nextOffset()).thenReturn(Optional.of(example(Offset.class)));
+    assertThat(consumer.position(Partition.of(5))).hasValue(example(Offset.class).value());
+  }
+}
diff --git a/src/test/java/com/google/cloud/pubsublite/kafka/StatusTestHelpers.java b/src/test/java/com/google/cloud/pubsublite/kafka/StatusTestHelpers.java
new file mode 100755
index 00000000..34f1b4aa
--- /dev/null
+++ b/src/test/java/com/google/cloud/pubsublite/kafka/StatusTestHelpers.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.kafka;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.common.truth.Truth8.assertThat;
+import static org.junit.Assert.assertThrows;
+
+import com.google.cloud.pubsublite.internal.ExtractStatus;
+import io.grpc.Status;
+import io.grpc.Status.Code;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+public class StatusTestHelpers {
+  private StatusTestHelpers() {}
+
+  // Asserts that the future fails with an ExecutionException whose cause carries the given
+  // gRPC status code.
+  public static void assertFutureThrowsCode(Future<?> f, Code code) {
+    ExecutionException exception = assertThrows(ExecutionException.class, f::get);
+    Optional<Status> statusOr = ExtractStatus.extract(exception.getCause());
+    assertThat(statusOr).isPresent();
+    assertThat(statusOr.get().getCode()).isEqualTo(code);
+  }
+}