diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index 95696a4574..5d39c04d08 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -18,6 +18,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.SettableApiFuture; import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest.ProtoData; import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.DoneCallback; @@ -160,6 +161,10 @@ private StreamWriterV2(Builder builder) throws IOException { .setCredentialsProvider(builder.credentialsProvider) .setTransportChannelProvider(builder.channelProvider) .setEndpoint(builder.endpoint) + // (b/185842996): Temporily fix this by explicitly providing the header. + .setHeaderProvider( + FixedHeaderProvider.create( + "x-goog-request-params", "write_stream=" + this.streamName)) .build(); this.client = BigQueryWriteClient.create(stubSettings); this.ownsBigQueryWriteClient = true; diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java index 8d60dd2e96..cfb5570b73 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java @@ -49,6 +49,7 @@ public class ITBigQueryWriteManualClientTest { private static final Logger LOG = Logger.getLogger(ITBigQueryWriteManualClientTest.class.getName()); private static final String DATASET = RemoteBigQueryHelper.generateDatasetName(); + private static final String DATASET_EU = RemoteBigQueryHelper.generateDatasetName(); private static final String TABLE = "testtable"; private static final String TABLE2 = "complicatedtable"; private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset"; @@ -56,8 +57,10 @@ public class ITBigQueryWriteManualClientTest { private static BigQueryWriteClient client; private static TableInfo tableInfo; private static TableInfo tableInfo2; + private static TableInfo tableInfoEU; private static String tableId; private static String tableId2; + private static String tableIdEU; private static BigQuery bigquery; @BeforeClass @@ -110,6 +113,25 @@ public static void beforeClass() throws IOException { String.format( "projects/%s/datasets/%s/tables/%s", ServiceOptions.getDefaultProjectId(), DATASET, TABLE2); + DatasetInfo datasetInfoEU = + DatasetInfo.newBuilder(/* datasetId = */ DATASET_EU) + .setLocation("EU") + .setDescription(DESCRIPTION) + .build(); + bigquery.create(datasetInfoEU); + tableInfoEU = + TableInfo.newBuilder( + TableId.of(DATASET_EU, TABLE), + StandardTableDefinition.of( + Schema.of( + com.google.cloud.bigquery.Field.newBuilder("foo", LegacySQLTypeName.STRING) + .build()))) + .build(); + tableIdEU = + String.format( + "projects/%s/datasets/%s/tables/%s", + ServiceOptions.getDefaultProjectId(), DATASET_EU, TABLE); + bigquery.create(tableInfoEU); } @AfterClass @@ -206,6 +228,54 @@ public void testBatchWriteWithCommittedStream() } } + ProtoRows CreateProtoRows(String[] messages) { + ProtoRows.Builder rows = ProtoRows.newBuilder(); + for (String message : messages) { + FooType foo = FooType.newBuilder().setFoo(message).build(); + rows.addSerializedRows(foo.toByteString()); + } + return rows.build(); + } + + @Test + public void testBatchWriteWithCommittedStreamEU() + throws IOException, InterruptedException, ExecutionException { + WriteStream writeStream = + client.createWriteStream( + CreateWriteStreamRequest.newBuilder() + .setParent(tableIdEU) + .setWriteStream( + WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) + .build()); + StreamWriterV2 streamWriter = + StreamWriterV2.newBuilder(writeStream.getName()) + .setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor())) + .build(); + LOG.info("Sending one message"); + + ApiFuture response = + streamWriter.append(CreateProtoRows(new String[] {"aaa"}), 0); + assertEquals(0, response.get().getAppendResult().getOffset().getValue()); + + LOG.info("Sending two more messages"); + ApiFuture response1 = + streamWriter.append(CreateProtoRows(new String[] {"bbb", "ccc"}), 1); + ApiFuture response2 = + streamWriter.append(CreateProtoRows(new String[] {"ddd"}), 3); + assertEquals(1, response1.get().getAppendResult().getOffset().getValue()); + assertEquals(3, response2.get().getAppendResult().getOffset().getValue()); + + TableResult result = + bigquery.listTableData( + tableInfoEU.getTableId(), BigQuery.TableDataListOption.startIndex(0L)); + Iterator iter = result.getValues().iterator(); + assertEquals("aaa", iter.next().get(0).getStringValue()); + assertEquals("bbb", iter.next().get(0).getStringValue()); + assertEquals("ccc", iter.next().get(0).getStringValue()); + assertEquals("ddd", iter.next().get(0).getStringValue()); + assertEquals(false, iter.hasNext()); + } + @Test public void testJsonStreamWriterCommittedStream() throws IOException, InterruptedException, ExecutionException, diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java index abfca7b61c..f3471d910f 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java @@ -25,9 +25,12 @@ import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadClient; import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadGrpc.BigQueryReadImplBase; import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadSettings; +import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteSettings; import com.google.cloud.bigquery.storage.v1beta2.ReadRowsRequest; import com.google.cloud.bigquery.storage.v1beta2.ReadSession; import com.google.cloud.bigquery.storage.v1beta2.SplitReadStreamRequest; +import com.google.cloud.bigquery.storage.v1beta2.WriteStream; import java.util.regex.Pattern; import org.junit.After; import org.junit.AfterClass; @@ -43,6 +46,9 @@ public class ResourceHeaderTest { private static final String TEST_TABLE_REFERENCE = "projects/project/datasets/dataset/tables/table"; + private static final String WRITE_STREAM_NAME = + "projects/project/datasets/dataset/tables/table/streams/stream"; + private static final String TEST_STREAM_NAME = "streamName"; private static final String NAME = "resource-header-test:123"; @@ -52,6 +58,20 @@ public class ResourceHeaderTest { private static final Pattern READ_SESSION_NAME_PATTERN = Pattern.compile( ".*" + "read_session\\.table=projects/project/datasets/dataset/tables/table" + ".*"); + + private static final Pattern PARENT_PATTERN = + Pattern.compile(".*" + "parent=projects/project/datasets/dataset/tables/table" + ".*"); + + private static final Pattern NAME_PATTERN = + Pattern.compile( + ".*" + "name=projects/project/datasets/dataset/tables/table/streams/stream" + ".*"); + + private static final Pattern WRITE_STREAM_PATTERN = + Pattern.compile( + ".*" + + "write_stream=projects/project/datasets/dataset/tables/table/streams/stream" + + ".*"); + private static final Pattern READ_STREAM_PATTERN = Pattern.compile(".*" + "read_stream=streamName" + ".*"); private static final Pattern STREAM_NAME_PATTERN = @@ -64,7 +84,9 @@ public class ResourceHeaderTest { private static InProcessServer server; private LocalChannelProvider channelProvider; + private LocalChannelProvider channelProvider2; private BigQueryReadClient client; + private BigQueryWriteClient writeClient; @BeforeClass public static void setUpClass() throws Exception { @@ -81,6 +103,12 @@ public void setUp() throws Exception { .setHeaderProvider(FixedHeaderProvider.create(TEST_HEADER_NAME, TEST_HEADER_VALUE)) .setTransportChannelProvider(channelProvider); client = BigQueryReadClient.create(settingsBuilder.build()); + channelProvider2 = LocalChannelProvider.create(NAME); + BigQueryWriteSettings.Builder writeSettingsBuilder = + BigQueryWriteSettings.newBuilder() + .setCredentialsProvider(NoCredentialsProvider.create()) + .setTransportChannelProvider(channelProvider2); + writeClient = BigQueryWriteClient.create(writeSettingsBuilder.build()); } @After @@ -129,6 +157,63 @@ public void splitReadStreamTest() { verifyHeaderSent(STREAM_NAME_PATTERN); } + @Test + public void createWriteStreamTest() { + try { + writeClient.createWriteStream( + "projects/project/datasets/dataset/tables/table", + WriteStream.newBuilder().setType(WriteStream.Type.BUFFERED).build()); + } catch (UnimplementedException e) { + // Ignore the error: none of the methods are actually implemented. + } + boolean headerSent = channelProvider2.isHeaderSent(HEADER_NAME, PARENT_PATTERN); + assertWithMessage("Generated header was sent").that(headerSent).isTrue(); + } + + @Test + public void getWriteStreamTest() { + try { + writeClient.getWriteStream(WRITE_STREAM_NAME); + } catch (UnimplementedException e) { + // Ignore the error: none of the methods are actually implemented. + } + boolean headerSent = channelProvider2.isHeaderSent(HEADER_NAME, NAME_PATTERN); + assertWithMessage("Generated header was sent").that(headerSent).isTrue(); + } + + // Following tests will work after b/185842996 is fixed. + // @Test + // public void appendRowsTest() { + // try { + // AppendRowsRequest req = + // AppendRowsRequest.newBuilder().setWriteStream(WRITE_STREAM_NAME).build(); + // BidiStream bidiStream = + // writeClient.appendRowsCallable().call(); + // bidiStream.send(req); + // } catch (UnimplementedException e) { + // // Ignore the error: none of the methods are actually implemented. + // } + // boolean headerSent = channelProvider2.isHeaderSent(HEADER_NAME, WRITE_STREAM_PATTERN); + // assertWithMessage("Generated header was sent").that(headerSent).isTrue(); + // } + // + // @Test + // public void appendRowsManualTest() { + // try { + // StreamWriterV2 streamWriter = + // StreamWriterV2.newBuilder(WRITE_STREAM_NAME, writeClient) + // .setWriterSchema(ProtoSchema.newBuilder().build()) + // .build(); + // streamWriter.append(ProtoRows.newBuilder().build(), 1); + // } catch (UnimplementedException e) { + // // Ignore the error: none of the methods are actually implemented. + // } catch (IOException e) { + // // Ignore the error: none of the methods are actually implemented. + // } + // boolean headerSent = channelProvider2.isHeaderSent(HEADER_NAME, WRITE_STREAM_PATTERN); + // assertWithMessage("Generated header was sent").that(headerSent).isTrue(); + // } + private void verifyHeaderSent(Pattern... patterns) { for (Pattern pattern : patterns) { boolean headerSent = channelProvider.isHeaderSent(HEADER_NAME, pattern);