diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java index dda66bbe6f..06f753b6c4 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java @@ -243,6 +243,29 @@ public ApiFuture append(AppendRowsRequest message) { return outstandingAppend.appendResult; } + /** + * Flush the rows on a BUFFERED stream, up to the specified offset. After flush, rows will be + * available for read. If no exception is thrown, it means the flush happened. + * + *

NOTE: Currently the implementation is void, BUFFERED steam acts like COMMITTED stream. It is + * just for Dataflow team to mock the usage. + * + * @param offset Offset to which the rows will be committed to the system. It must fall within the + * row counts on the stream. + * @throws IllegalArgumentException if offset is invalid + */ + public void flush(long offset) { + if (offset < 0) { + throw new IllegalArgumentException("Invalid offset: " + offset); + } + // TODO: Once we persisted stream type, we should check the call can only be issued on BUFFERED + // stream here. + Storage.FlushRowsRequest request = + Storage.FlushRowsRequest.newBuilder().setWriteStream(streamName).setOffset(offset).build(); + stub.flushRows(request); + // TODO: We will verify if the returned offset is equal to requested offset. + } + /** * Re-establishes a stream connection. * diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWrite.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWrite.java index f1350ce7e7..c743b39af7 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWrite.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWrite.java @@ -54,6 +54,8 @@ public void addResponse(AbstractMessage response) { serviceImpl.addResponse((AppendRowsResponse) response); } else if (response instanceof Stream.WriteStream) { serviceImpl.addWriteStreamResponse((Stream.WriteStream) response); + } else if (response instanceof FlushRowsResponse) { + serviceImpl.addFlushRowsResponse((FlushRowsResponse) response); } else { throw new IllegalStateException("Unsupported service"); } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWriteImpl.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWriteImpl.java index 0a3aa2e622..39c1e4158a 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWriteImpl.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWriteImpl.java @@ -38,9 +38,11 @@ class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase { private final LinkedBlockingQueue requests = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue writeRequests = new LinkedBlockingQueue<>(); + private final LinkedBlockingQueue flushRequests = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue responses = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue writeResponses = new LinkedBlockingQueue<>(); + private final LinkedBlockingQueue flushResponses = new LinkedBlockingQueue<>(); private final AtomicInteger nextMessageId = new AtomicInteger(1); private boolean autoPublishResponse; private ScheduledExecutorService executor = null; @@ -97,6 +99,21 @@ public void getWriteStream( } } + @Override + public void flushRows( + FlushRowsRequest request, StreamObserver responseObserver) { + Object response = writeResponses.remove(); + if (response instanceof FlushRowsResponse) { + flushRequests.add(request); + responseObserver.onNext((FlushRowsResponse) response); + responseObserver.onCompleted(); + } else if (response instanceof Exception) { + responseObserver.onError((Exception) response); + } else { + responseObserver.onError(new IllegalArgumentException("Unrecognized response type")); + } + } + @Override public StreamObserver appendRows( final StreamObserver responseObserver) { @@ -173,6 +190,11 @@ public FakeBigQueryWriteImpl addWriteStreamResponse(Stream.WriteStream response) return this; } + public FakeBigQueryWriteImpl addFlushRowsResponse(FlushRowsResponse response) { + flushResponses.add(response); + return this; + } + public FakeBigQueryWriteImpl addConnectionError(Throwable error) { responses.add(new Response(error)); return this; diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java index da6e144323..c75d6859e6 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java @@ -399,4 +399,39 @@ public Long call() throws IOException, InterruptedException, ExecutionException } DirectWriter.clearCache(); } + + @Test + public void testFlushRows() throws IOException, InterruptedException, ExecutionException { + String tableName = "BufferTable"; + TableInfo tableInfo = + TableInfo.newBuilder( + TableId.of(DATASET, tableName), + StandardTableDefinition.of( + Schema.of( + com.google.cloud.bigquery.Field.newBuilder("foo", LegacySQLTypeName.STRING) + .build()))) + .build(); + bigquery.create(tableInfo); + TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName); + WriteStream writeStream = + client.createWriteStream( + CreateWriteStreamRequest.newBuilder() + .setParent(parent.toString()) + .setWriteStream(WriteStream.newBuilder().setType(WriteStream.Type.BUFFERED).build()) + .build()); + try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()).build()) { + ApiFuture response = + streamWriter.append( + createAppendRequest(writeStream.getName(), new String[] {"aaa"}) + .setOffset(Int64Value.of(0L)) + .build()); + assertEquals(0L, response.get().getOffset()); + streamWriter.flush(0); + } + TableResult result = + bigquery.listTableData(tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L)); + Iterator iter = result.getValues().iterator(); + assertEquals("aaa", iter.next().get(0).getStringValue()); + assertEquals(false, iter.hasNext()); + } }