Skip to content

googleapis/java-pubsublite-kafka

Google Pub/Sub Lite Kafka Shim Client for Java

Java idiomatic client for Pub/Sub Lite Kafka Shim.

Maven Stability

Quickstart

If you are using Maven, add this to your pom.xml file:

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>pubsublite-kafka</artifactId>
  <version>1.1.2</version>
</dependency>

If you are using Gradle without BOM, add this to your dependencies:

implementation 'com.google.cloud:pubsublite-kafka:1.1.2'

If you are using SBT, add this to your dependencies:

libraryDependencies += "com.google.cloud" % "pubsublite-kafka" % "1.1.2"

Authentication

See the Authentication section in the base directory's README.

Authorization

The client application making API calls must be granted authorization scopes required for the desired Pub/Sub Lite Kafka Shim APIs, and the authenticated principal must have the IAM role(s) required to access GCP resources using the Pub/Sub Lite Kafka Shim API calls.

Getting Started

Prerequisites

You will need a Google Cloud Platform Console project with the Pub/Sub Lite Kafka Shim API enabled. You will need to enable billing to use Google Pub/Sub Lite Kafka Shim. Follow these instructions to get your project set up. You will also need to set up the local development environment by installing the Google Cloud SDK and running the following commands in command line: gcloud auth login and gcloud config set project [YOUR PROJECT ID].

Installation and setup

You'll need to obtain the pubsublite-kafka library. See the Quickstart section to add pubsublite-kafka as a dependency in your code.

About Pub/Sub Lite Kafka Shim

Because Google Cloud Pub/Sub Lite provides partitioned zonal data storage with predefined capacity, a large portion of the Kafka Producer/Consumer API can be implemented using Pub/Sub Lite as a backend. The key differences are:

  • Pub/Sub Lite does not support transactions. All transaction methods on Producer<byte[], byte[]> will raise an exception.
  • Producers operate on a single topic, and Consumers on a single subscription.
  • ProducerRecord may not specify partition explicitly.
  • Consumers may not dynamically create consumer groups (subscriptions).

Note:

  • In order to use Pub/Sub Lite seek operations, Consumers must have auto-commit enabled. Consumer seek methods are client-initiated, whereas Pub/Sub Lite seek operations are initiated out-of-band and pushed to Consumers. Both types of seeks should not be used concurrently, as they would interfere with one another.

Publishing messages

With Pub/Sub Lite, you can use a Producer<byte[], byte[]> to publish messages:

import com.google.cloud.pubsublite.kafka.ProducerSettings;
import org.apache.kafka.clients.producer.*;
import com.google.cloud.pubsublite.*;

...

private final static String ZONE = "us-central1-b";
private final static Long PROJECT_NUM = 123L;

...

TopicPath topic = TopicPath.newBuilder()
    .setLocation(CloudZone.parse(ZONE))
    .setProject(ProjectNumber.of(PROJECT_NUM))
    .setName(TopicName.of("my-topic")).build();

ProducerSettings settings = ProducerSettings.newBuilder()
    .setTopicPath(topic)
    .build();

try (Producer<byte[], byte[]> producer = settings.instantiate()) {
    Future<RecordMetadata> sent = producer.send(new ProducerRecord(
        topic.toString(),  // Required to be the same topic.
        "key".getBytes(),
        "value".getBytes()
    ));
    RecordMetadata meta = sent.get();
}

Receiving messages

With Pub/Sub Lite you can receive messages using a Consumer<byte[], byte[]>:

import com.google.cloud.pubsublite.kafka.ConsumerSettings;
import org.apache.kafka.clients.consumer.*;
import com.google.cloud.pubsublite.*;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;

...

private final static String ZONE = "us-central1-b";
private final static Long PROJECT_NUM = 123L;

...

SubscriptionPath subscription = SubscriptionPath.newBuilder()
    .setLocation(CloudZone.parse(ZONE))
    .setProject(ProjectNumber.of(PROJECT_NUM))
    .setName(SubscriptionName.of("my-sub"))
    .build();

ConsumerSettings settings = ConsumerSettings.newBuilder()
    .setSubscriptionPath(subscription)
    .setPerPartitionFlowControlSettings(FlowControlSettings.builder()
        .setBytesOutstanding(10_000_000)  // 10 MB
        .setMessagesOutstanding(Long.MAX_VALUE)
        .build())
    .setAutocommit(true);

try (Consumer<byte[], byte[]> consumer = settings.instantiate()) {
   while (true) {
     ConsumerRecords<byte[], byte[]> records = consumer.poll(Long.MAX_VALUE);
     for (ConsumerRecord<byte[], byte[]> record : records) {
       System.out.println(record.offset() + “: ” + record.value());
     }
   }
} catch (WakeupException e) {
   // ignored
}

Samples

Samples are in the samples/ directory.

Sample Source Code Try it
Consumer Example source code Open in Cloud Shell
Kafka Producer Example source code Open in Cloud Shell
Producer Example source code Open in Cloud Shell

Troubleshooting

To get help, follow the instructions in the shared Troubleshooting document.

Transport

Pub/Sub Lite Kafka Shim uses gRPC for the transport layer.

Supported Java Versions

Java 8 or above is required for using this client.

Google's Java client libraries, Google Cloud Client Libraries and Google Cloud API Libraries, follow the Oracle Java SE support roadmap (see the Oracle Java SE Product Releases section).

For new development

In general, new feature development occurs with support for the lowest Java LTS version covered by Oracle's Premier Support (which typically lasts 5 years from initial General Availability). If the minimum required JVM for a given library is changed, it is accompanied by a semver major release.

Java 11 and (in September 2021) Java 17 are the best choices for new development.

Keeping production systems current

Google tests its client libraries with all current LTS versions covered by Oracle's Extended Support (which typically lasts 8 years from initial General Availability).

Legacy support

Google's client libraries support legacy versions of Java runtimes with long term stable libraries that don't receive feature updates on a best efforts basis as it may not be possible to backport all patches.

Google provides updates on a best efforts basis to apps that continue to use Java 7, though apps might need to upgrade to current versions of the library that supports their JVM.

Where to find specific information

The latest versions and the supported Java versions are identified on the individual GitHub repository github.com/GoogleAPIs/java-SERVICENAME and on google-cloud-java.

Versioning

This library follows Semantic Versioning.

Contributing

Contributions to this library are always welcome and highly encouraged.

See CONTRIBUTING for more information how to get started.

Please note that this project is released with a Contributor Code of Conduct. By participating in this project you agree to abide by its terms. See Code of Conduct for more information.

License

Apache 2.0 - See LICENSE for more information.

CI Status

Java Version Status
Java 8 Kokoro CI
Java 8 OSX Kokoro CI
Java 8 Windows Kokoro CI
Java 11 Kokoro CI

Java is a registered trademark of Oracle and/or its affiliates.