diff --git a/README.md b/README.md index 03a5d963..2bb6d1bb 100644 --- a/README.md +++ b/README.md @@ -19,8 +19,9 @@ ```java import com.google.cloud.pubsublite.kafka.ProducerSettings; - import org.apache.kafka.clients.producer.*; import com.google.cloud.pubsublite.*; + + import org.apache.kafka.clients.producer.*; ... @@ -51,17 +52,22 @@ 1. Read some messages using: ```java + import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings; import com.google.cloud.pubsublite.kafka.ConsumerSettings; - import org.apache.kafka.clients.consumer.*; import com.google.cloud.pubsublite.*; - import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings; - + + import org.apache.kafka.clients.consumer.*; ... private final static String ZONE = "us-central1-b"; private final static Long PROJECT_NUM = 123L; ... + TopicPath topic = TopicPath.newBuilder() + .setLocation(CloudZone.parse(ZONE)) + .setProject(ProjectNumber.of(PROJECT_NUM)) + .setName(TopicName.of("my-topic")) + .build(); SubscriptionPath subscription = SubscriptionPath.newBuilder() .setLocation(CloudZone.parse(ZONE)) @@ -75,11 +81,13 @@ .setBytesOutstanding(10_000_000) // 10 MB .setMessagesOutstanding(Long.MAX_VALUE) .build()) - .setAutocommit(true); + .setAutocommit(true) + .build(); try (Consumer consumer = settings.instantiate()) { + consumer.subscribe(Arrays.asList(topic.toString())); while (true) { - ConsumerRecords records = consumer.poll(Long.MAX_VALUE); + ConsumerRecords records = consumer.poll(Duration.ofSeconds(30)); for (ConsumerRecord record : records) { System.out.println(record.offset() + “: ” + record.value()); }