Skip to content

Commit

Permalink
feat: Implement interfaces and utilities needed for Pub/Sub Lite Kafk…
Browse files Browse the repository at this point in the history
…a shim (#276)

* fix: Implement rethrowAsRuntime and clean up ExtractStatus

* feat: Implement interfaces and utilities needed for Pub/Sub Lite Kafka shim

* fix: Formatting changes

* deps: Fix dependencies

* chore: Update exception type for already exists

* chore: Run linter
  • Loading branch information
dpcollins-google committed Oct 5, 2020
1 parent 5c0e7cc commit 3c43ef3
Show file tree
Hide file tree
Showing 12 changed files with 656 additions and 6 deletions.
Expand Up @@ -23,17 +23,21 @@
import io.grpc.StatusRuntimeException;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

public final class ExtractStatus {
public static Optional<Status> extract(Throwable t) {
if (t instanceof StatusException) {
return Optional.of(((StatusException) t).getStatus());
try {
throw t;
} catch (StatusException e) {
return Optional.of(e.getStatus());
} catch (StatusRuntimeException e) {
return Optional.of(e.getStatus());
} catch (Throwable e) {
return Optional.empty();
}
if (t instanceof StatusRuntimeException) {
return Optional.of(((StatusRuntimeException) t).getStatus());
}
return Optional.empty();
}

public static StatusException toCanonical(Throwable t) {
Expand All @@ -56,5 +60,47 @@ public static void addFailureHandler(ApiFuture<?> future, Consumer<StatusExcepti
MoreExecutors.directExecutor());
}

public interface StatusFunction<I, O> {
O apply(I input) throws StatusException;
}

public interface StatusConsumer<I> {
void apply(I input) throws StatusException;
}

public interface StatusBiconsumer<K, V> {
void apply(K key, V value) throws StatusException;
}

public static <I, O> Function<I, O> rethrowAsRuntime(StatusFunction<I, O> function) {
return i -> {
try {
return function.apply(i);
} catch (StatusException e) {
throw e.getStatus().asRuntimeException();
}
};
}

public static <I> Consumer<I> rethrowAsRuntime(StatusConsumer<I> consumer) {
return i -> {
try {
consumer.apply(i);
} catch (StatusException e) {
throw e.getStatus().asRuntimeException();
}
};
}

public static <K, V> BiConsumer<K, V> rethrowAsRuntime(StatusBiconsumer<K, V> consumer) {
return (k, v) -> {
try {
consumer.apply(k, v);
} catch (StatusException e) {
throw e.getStatus().asRuntimeException();
}
};
}

private ExtractStatus() {}
}
53 changes: 53 additions & 0 deletions pubsublite-kafka-shim/pom.xml
Expand Up @@ -14,6 +14,59 @@
<name>Pub/Sub Lite Kafka Shim</name>
<url>https://github.com/googleapis/java-pubsublite</url>
<description>Kafka Producer and Consumer for Google Cloud Pub/Sub Lite</description>
<dependencies>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-pubsublite-v1</artifactId>
<version>0.4.2-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsublite:current} -->
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite</artifactId>
<version>0.4.2-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsublite:current} -->
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-api</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>
<dependency>
<groupId>com.google.api</groupId>
<artifactId>api-common</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<!--test dependencies-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.truth</groupId>
<artifactId>truth</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.truth.extensions</groupId>
<artifactId>truth-java8-extension</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
Expand Down
@@ -0,0 +1,26 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.kafka;

import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.internal.wire.Committer;
import io.grpc.StatusException;

/** A factory for making new PullSubscribers for a given partition of a subscription. */
interface CommitterFactory {
Committer newCommitter(Partition partition) throws StatusException;
}
@@ -0,0 +1,21 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.kafka;

interface ConsumerFactory {
SingleSubscriptionConsumer newConsumer();
}
@@ -0,0 +1,91 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.kafka;

import com.google.cloud.pubsublite.internal.ExtractStatus;
import io.grpc.StatusException;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidRequestException;

class KafkaExceptionUtils {
private KafkaExceptionUtils() {}

static KafkaException toKafkaException(StatusException source) {
switch (source.getStatus().getCode()) {
case OK:
throw source.getStatus().asRuntimeException();
case ABORTED:
return new BrokerNotAvailableException("Aborted.", source);
case ALREADY_EXISTS:
return new KafkaException("Already exists.", source);
case CANCELLED:
return new BrokerNotAvailableException("Cancelled.", source);
case DATA_LOSS:
return new KafkaException("Data loss.", source);
case DEADLINE_EXCEEDED:
return new BrokerNotAvailableException("Deadline exceeded.", source);
case FAILED_PRECONDITION:
return new InvalidRequestException("Failed precondition.", source);
case INTERNAL:
return new BrokerNotAvailableException("Internal.", source);
case INVALID_ARGUMENT:
return new InvalidRequestException("Invalid argument.", source);
case NOT_FOUND:
return new KafkaException("Not found.", source);
case OUT_OF_RANGE:
return new KafkaException("Out of range.", source);
case PERMISSION_DENIED:
return new AuthorizationException("Permission denied.", source);
case RESOURCE_EXHAUSTED:
return new KafkaException("Resource exhausted.", source);
case UNAUTHENTICATED:
return new AuthenticationException("Unauthenticated.", source);
case UNAVAILABLE:
return new BrokerNotAvailableException("Unavailable.", source);
case UNIMPLEMENTED:
return new KafkaException("Unimplemented.", source);
case UNKNOWN:
return new KafkaException("Unknown.", source);
}
return new KafkaException("No case.", source);
}

/**
* Transform an exception into a kind that is likely to be thrown through kafka interfaces.
*
* @param t A throwable to transform.
* @return The transformed exception suitable for re-throwing.
*/
static RuntimeException toKafka(Throwable t) {
try {
throw t;
} catch (KafkaException | UnsupportedOperationException | IllegalStateException e) {
return e;
} catch (InterruptedException e) {
return new InterruptException(e);
} catch (TimeoutException e) {
return new org.apache.kafka.common.errors.TimeoutException(e);
} catch (Throwable e) {
return KafkaExceptionUtils.toKafkaException(ExtractStatus.toCanonical(t));
}
}
}
@@ -0,0 +1,98 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.kafka;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.protobuf.ByteString;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

class LiteHeaders implements Headers {
private ImmutableListMultimap<String, ByteString> attributes;

LiteHeaders(ImmutableListMultimap<String, ByteString> attributes) {
this.attributes = attributes;
}

static Header toHeader(String key, ByteString value) {
return new Header() {
@Override
public String key() {
return key;
}

@Override
public byte[] value() {
return value.toByteArray();
}
};
}

private static List<Header> toHeaders(String key, Collection<ByteString> values) {
ImmutableList.Builder<Header> headersBuilder = ImmutableList.builder();
values.forEach(value -> headersBuilder.add(toHeader(key, value)));
return headersBuilder.build();
}

@Override
public Headers add(Header header) throws IllegalStateException {
throw new IllegalStateException();
}

@Override
public Headers add(String s, byte[] bytes) throws IllegalStateException {
throw new IllegalStateException();
}

@Override
public Headers remove(String s) throws IllegalStateException {
throw new IllegalStateException();
}

@Override
public Header lastHeader(String s) {
return Iterables.getLast(this);
}

@Override
public Iterable<Header> headers(String s) {
if (attributes.containsKey(s))
return Iterables.transform(attributes.get(s), value -> toHeader(s, value));
return ImmutableList.of();
}

@Override
public Header[] toArray() {
ImmutableList.Builder<Header> arrayBuilder = ImmutableList.builder();
attributes
.entries()
.forEach(entry -> arrayBuilder.add(toHeader(entry.getKey(), entry.getValue())));
return (Header[]) arrayBuilder.build().toArray();
}

@Override
public Iterator<Header> iterator() {
return Iterators.transform(
attributes.entries().iterator(), entry -> toHeader(entry.getKey(), entry.getValue()));
}
}

0 comments on commit 3c43ef3

Please sign in to comment.