diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslDataSourceOptions.java b/src/main/java/com/google/cloud/pubsublite/spark/PslDataSourceOptions.java index ca02995d..380e022a 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/PslDataSourceOptions.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/PslDataSourceOptions.java @@ -18,6 +18,7 @@ import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings; +import com.google.api.gax.rpc.ApiException; import com.google.auto.value.AutoValue; import com.google.cloud.pubsublite.AdminClient; import com.google.cloud.pubsublite.AdminClientSettings; @@ -79,9 +80,16 @@ public static PslDataSourceOptions fromSparkDataSourceOptions(DataSourceOptions options .get(Constants.MAX_MESSAGE_PER_BATCH_CONFIG_KEY) .ifPresent(mmpb -> builder.setMaxMessagesPerBatch(Long.parseLong(mmpb))); + String subscriptionPathVal = options.get(Constants.SUBSCRIPTION_CONFIG_KEY).get(); + SubscriptionPath subscriptionPath; + try { + subscriptionPath = SubscriptionPath.parse(subscriptionPathVal); + } catch (ApiException e) { + throw new IllegalArgumentException( + "Unable to parse subscription path " + subscriptionPathVal); + } return builder - .setSubscriptionPath( - SubscriptionPath.parse(options.get(Constants.SUBSCRIPTION_CONFIG_KEY).get())) + .setSubscriptionPath(subscriptionPath) .setFlowControlSettings( FlowControlSettings.builder() .setMessagesOutstanding( diff --git a/src/test/java/com/google/cloud/pubsublite/spark/PslDataSourceOptionsTest.java b/src/test/java/com/google/cloud/pubsublite/spark/PslDataSourceOptionsTest.java new file mode 100644 index 00000000..bc794ead --- /dev/null +++ b/src/test/java/com/google/cloud/pubsublite/spark/PslDataSourceOptionsTest.java @@ -0,0 +1,35 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.spark; + +import static org.junit.Assert.assertThrows; + +import com.google.common.collect.ImmutableMap; +import org.apache.spark.sql.sources.v2.DataSourceOptions; +import org.junit.Test; + +public class PslDataSourceOptionsTest { + + @Test + public void testInvalidSubPath() { + DataSourceOptions options = + new DataSourceOptions(ImmutableMap.of(Constants.SUBSCRIPTION_CONFIG_KEY, "invalid/path")); + assertThrows( + IllegalArgumentException.class, + () -> PslDataSourceOptions.fromSparkDataSourceOptions(options)); + } +}