Skip to content

Commit

Permalink
docs: update readme samples (#155)
Browse files Browse the repository at this point in the history
  • Loading branch information
anguillanneuf committed Jul 10, 2020
1 parent d07e37e commit 28930f8
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 199 deletions.
200 changes: 102 additions & 98 deletions .readme-partials.yaml
Expand Up @@ -13,16 +13,18 @@ custom_content: |
Then, to create the topic, use the following code:
```java
CloudRegion cloudRegion = CloudRegion.of(CLOUD_REGION);
CloudZone zone = CloudZone.of(cloudRegion, ZONE_ID);
ProjectNumber projectNum = ProjectNumber.of(PROJECT_NUMBER);
TopicName topicName = TopicName.of(TOPIC_NAME);
// TODO(developer): Replace these variables with your own.
long projectNumber = 123L;
String cloudRegion = "us-central1";
char zoneId = 'b';
String topicId = "your-topic-id";
Integer partitions = 1;
TopicPath topicPath =
TopicPaths.newBuilder()
.setZone(zone)
.setProjectNumber(projectNum)
.setTopicName(topicName)
.setProjectNumber(ProjectNumber.of(projectNumber))
.setZone(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
.setTopicName(TopicName.of(topicId))
.build();
Topic topic =
Expand All @@ -33,7 +35,7 @@ custom_content: |
// throughput of 4 MiB per sec. This must be in the range [1,4]. A
// topic with `scale` of 2 and count of 10 is charged for 20 partitions.
.setScale(1)
.setCount(PARTITIONS))
.setCount(partitions))
.setRetentionConfig(
RetentionConfig.newBuilder()
// How long messages are retained.
Expand All @@ -47,12 +49,10 @@ custom_content: |
.build();
AdminClientSettings adminClientSettings =
AdminClientSettings.newBuilder().setRegion(cloudRegion).build();
AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build();
try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
Topic response = adminClient.createTopic(topic).get();
System.out.println(response.getAllFields() + "created successfully.");
}
```
Expand All @@ -72,56 +72,56 @@ custom_content: |
Then, to publish messages asynchronously, use the following code:
```java
public class PublisherExample {
private static final int MESSAGE_COUNT = 10;
// Load the project number from a commandline flag.
private static final long PROJECT_NUMBER = 123L;
// Load the zone from a commandline flag.
private static final String ZONE = "us-central1-b";
// Load the topic name from a commandline flag.
private static final String TOPIC_NAME = "my-new-topic";
public static List<ApiFuture<String>> runPublisher(Publisher publisher) throws Exception {
List<ApiFuture<String>> futures = new ArrayList<>();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "message-" + i;
// Convert the message to a byte string.
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
// Schedule a message to be published. Messages are automatically batched.
ApiFuture<String> future = publisher.publish(pubsubMessage);
futures.add(future);
}
return futures;
// TODO(developer): Replace these variables before running the sample.
long projectNumber = 123L;
String cloudRegion = "us-central1";
char zoneId = 'b';
// Choose an existing topic.
String topicId = "your-topic-id";
int messageCount = 100;
TopicPath topicPath =
TopicPaths.newBuilder()
.setProjectNumber(ProjectNumber.of(projectNumber))
.setZone(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
.setTopicName(TopicName.of(topicId))
.build();
Publisher publisher = null;
List<ApiFuture<String>> futures = new ArrayList<>();
try {
PublisherSettings publisherSettings =
PublisherSettings.newBuilder().setTopicPath(topicPath).build();
publisher = Publisher.create(publisherSettings);
// Start the publisher. Upon successful starting, its state will become RUNNING.
publisher.startAsync().awaitRunning();
for (int i = 0; i < messageCount; i++) {
String message = "message-" + i;
// Convert the message to a byte string.
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
// Publish a message. Messages are automatically batched.
ApiFuture<String> future = publisher.publish(pubsubMessage);
futures.add(future);
}
} finally {
ArrayList<PublishMetadata> metadata = new ArrayList<>();
List<String> ackIds = ApiFutures.allAsList(futures).get();
for (String id : ackIds) {
// Decoded metadata contains partition and offset.
metadata.add(PublishMetadata.decode(id));
}
System.out.println(metadata + "\nPublished " + ackIds.size() + " messages.");
// Publish messages to a topic.
public static void main(String[] args) throws Exception {
PublisherSettings settings =
PublisherSettings.newBuilder()
.setTopicPath(
TopicPaths.newBuilder()
.setProjectNumber(ProjectNumber.of(PROJECT_NUMBER))
.setZone(CloudZone.parse(ZONE))
.setTopicName(TopicName.of(TOPIC_NAME))
.build())
.build();
Publisher publisher = Publisher.create(settings);
publisher.startAsync().awaitRunning();
List<ApiFuture<String>> futureAckIds = runPublisher(publisher);
if (publisher != null) {
// Shut down the publisher.
publisher.stopAsync().awaitTerminated();
List<String> ackIds = ApiFutures.allAsList(futureAckIds).get();
ArrayList<PublishMetadata> metadata = new ArrayList<>();
for (String id : ackIds) {
metadata.add(PublishMetadata.decode(id));
}
for (PublishMetadata one : metadata) {
System.out.println(one);
}
System.out.println("Publisher is shut down.");
}
}
```
Expand All @@ -140,24 +140,26 @@ custom_content: |
Then, to create the subscription, use the following code:
```java
CloudRegion cloudRegion = CloudRegion.of(CLOUD_REGION);
CloudZone zone = CloudZone.of(cloudRegion, ZONE_ID);
ProjectNumber projectNum = ProjectNumber.of(PROJECT_NUMBER);
TopicName topicName = TopicName.of(TOPIC_NAME);
SubscriptionName subscriptionName = SubscriptionName.of(SUBSCRIPTION_NAME);
// TODO(developer): Replace these variables with your own.
long projectNumber = 123L;
String cloudRegion = "us-central1";
char zoneId = 'b';
// Choose an existing topic.
String topicId = "your-topic-id";
String subscriptionId = "your-subscription-id";
TopicPath topicPath =
TopicPaths.newBuilder()
.setZone(zone)
.setProjectNumber(projectNum)
.setTopicName(topicName)
.setProjectNumber(ProjectNumber.of(projectNumber))
.setZone(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
.setTopicName(TopicName.of(topicId))
.build();
SubscriptionPath subscriptionPath =
SubscriptionPaths.newBuilder()
.setZone(zone)
.setProjectNumber(projectNum)
.setSubscriptionName(subscriptionName)
.setZone(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
.setProjectNumber(ProjectNumber.of(projectNumber))
.setSubscriptionName(SubscriptionName.of(subscriptionId))
.build();
Subscription subscription =
Expand All @@ -175,12 +177,10 @@ custom_content: |
.build();
AdminClientSettings adminClientSettings =
AdminClientSettings.newBuilder().setRegion(cloudRegion).build();
AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build();
try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
Subscription response = adminClient.createSubscription(subscription).get();
System.out.println(response.getAllFields() + "created successfully.");
}
```
Expand All @@ -202,66 +202,70 @@ custom_content: |
Then, to pull messages asynchronously, use the following code:
```java
CloudRegion cloudRegion = CloudRegion.of(CLOUD_REGION);
CloudZone zone = CloudZone.of(cloudRegion, ZONE_ID);
ProjectNumber projectNum = ProjectNumber.of(PROJECT_NUMBER);
SubscriptionName subscriptionName = SubscriptionName.of(SUBSCRIPTION_NAME);
// TODO(developer): Replace these variables with your own.
long projectNumber = 123L;
String cloudRegion = "us-central1";
char zoneId = 'b';
// Choose an existing topic.
String topicId = "your-topic-id";
// Choose an existing subscription.
String subscriptionId = "your-subscription-id";
List<Integer> partitionNumbers = ImmutableList.of(0);
SubscriptionPath subscriptionPath =
SubscriptionPaths.newBuilder()
.setZone(zone)
.setProjectNumber(projectNum)
.setSubscriptionName(subscriptionName)
.setZone(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
.setProjectNumber(ProjectNumber.of(projectNumber))
.setSubscriptionName(SubscriptionName.of(subscriptionId))
.build();
// The message stream is paused based on the maximum size or number of messages that the
// subscriber has already received, whichever condition is met first.
FlowControlSettings flowControlSettings =
FlowControlSettings.builder()
// Set outstanding bytes to 10 MiB per partition.
// 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
.setBytesOutstanding(10 * 1024 * 1024L)
.setMessagesOutstanding(Long.MAX_VALUE)
// 1,000 outstanding messages. Must be >0.
.setMessagesOutstanding(1000L)
.build();
List<Partition> partitions = new ArrayList<>();
for (Integer num : PARTITION_NOS) {
for (Integer num : partitionNumbers) {
partitions.add(Partition.of(num));
}
MessageReceiver receiver =
new MessageReceiver() {
@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
System.out.println("Id : " + message.getMessageId());
System.out.println("Data : " + message.getData().toStringUtf8());
consumer.ack();
}
(PubsubMessage message, AckReplyConsumer consumer) -> {
System.out.println("Id : " + message.getMessageId());
System.out.println("Data : " + message.getData().toStringUtf8());
consumer.ack();
};
SubscriberSettings subscriberSettings =
SubscriberSettings.newBuilder()
.setSubscriptionPath(subscriptionPath)
.setPerPartitionFlowControlSettings(flowControlSettings)
.setPartitions(partitions)
.setReceiver(receiver)
// Flow control settings are set at the partition level.
.setPerPartitionFlowControlSettings(flowControlSettings)
.build();
Subscriber subscriber = Subscriber.create(subscriberSettings);
// Start the subscriber. Upon successful starting, its state will become RUNNING.
subscriber.startAsync().awaitRunning();
System.out.println("Listening to messages on " + subscriptionPath.value() + " ...");
System.out.println("Listening to messages on " + subscriptionPath.value() + "...");
try {
System.out.println(subscriber.state());
// Wait 30 seconds for the subscriber to reach TERMINATED state. If it encounters
// unrecoverable errors before then, its state will change to FAILED and
// an IllegalStateException will be thrown.
// unrecoverable errors before then, its state will change to FAILED and an
// IllegalStateException will be thrown.
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} catch (TimeoutException t) {
// Shut down the subscriber. This will change the state of the
// subscriber to TERMINATED.
subscriber.stopAsync();
System.out.println(subscriber.state());
// Shut down the subscriber. This will change the state of the subscriber to TERMINATED.
subscriber.stopAsync().awaitTerminated();
System.out.println("Subscriber is shut down: " + subscriber.state());
}
```
about: |
Expand Down

0 comments on commit 28930f8

Please sign in to comment.