Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add flush API to StreamWriter #278

Merged
merged 2 commits into from May 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -244,6 +244,29 @@ public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
return outstandingAppend.appendResult;
}

/**
 * Flushes the rows on a BUFFERED stream, up to the specified offset. After the flush, rows will
 * be available for read. If no exception is thrown, the flush succeeded.
 *
 * <p>NOTE: Currently the implementation is void; a BUFFERED stream acts like a COMMITTED stream.
 * It exists only so the Dataflow team can mock the usage.
 *
 * @param offset Offset up to which the rows will be committed to the system. It must fall within
 *     the row counts on the stream.
 * @throws IllegalArgumentException if offset is negative
 */
public void flush(long offset) {
  if (offset < 0) {
    throw new IllegalArgumentException("Invalid offset: " + offset);
  }
  // TODO: Once we persisted stream type, we should check the call can only be issued on BUFFERED
  // stream here.
  Storage.FlushRowsRequest request =
      Storage.FlushRowsRequest.newBuilder().setWriteStream(streamName).setOffset(offset).build();
  // Synchronous RPC; any server-side failure surfaces as an exception to the caller.
  stub.flushRows(request);
  // TODO: We will verify if the returned offset is equal to requested offset.
}

/**
* Re-establishes a stream connection.
*
Expand Down
Expand Up @@ -54,6 +54,8 @@ public void addResponse(AbstractMessage response) {
serviceImpl.addResponse((AppendRowsResponse) response);
} else if (response instanceof Stream.WriteStream) {
serviceImpl.addWriteStreamResponse((Stream.WriteStream) response);
} else if (response instanceof FlushRowsResponse) {
serviceImpl.addFlushRowsResponse((FlushRowsResponse) response);
} else {
throw new IllegalStateException("Unsupported service");
}
Expand Down
Expand Up @@ -38,9 +38,11 @@ class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase {
// Requests captured from clients, one queue per RPC, so tests can inspect what was sent.
private final LinkedBlockingQueue<AppendRowsRequest> requests = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<GetWriteStreamRequest> writeRequests =
    new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<FlushRowsRequest> flushRequests = new LinkedBlockingQueue<>();
// Canned responses queued by tests; each RPC drains from its own response queue.
private final LinkedBlockingQueue<Response> responses = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<Stream.WriteStream> writeResponses =
    new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<FlushRowsResponse> flushResponses = new LinkedBlockingQueue<>();
// Monotonically increasing id attached to fake messages.
private final AtomicInteger nextMessageId = new AtomicInteger(1);
// When true, responses are sent without waiting for an explicit publish call.
private boolean autoPublishResponse;
// Optional executor used to deliver responses with a delay; null means respond inline.
private ScheduledExecutorService executor = null;
Expand Down Expand Up @@ -97,6 +99,21 @@ public void getWriteStream(
}
}

/**
 * Fake FlushRows RPC: records the request and replies with the next canned response queued via
 * {@link #addFlushRowsResponse}.
 */
@Override
public void flushRows(
    FlushRowsRequest request, StreamObserver<FlushRowsResponse> responseObserver) {
  // BUG FIX: previously read from writeResponses, which holds Stream.WriteStream objects and
  // therefore could never satisfy `instanceof FlushRowsResponse`; responses queued via
  // addFlushRowsResponse were silently ignored and every call errored out.
  Object response = flushResponses.remove();
  if (response instanceof FlushRowsResponse) {
    flushRequests.add(request);
    responseObserver.onNext((FlushRowsResponse) response);
    responseObserver.onCompleted();
  } else if (response instanceof Exception) {
    responseObserver.onError((Exception) response);
  } else {
    responseObserver.onError(new IllegalArgumentException("Unrecognized response type"));
  }
}

@Override
public StreamObserver<AppendRowsRequest> appendRows(
final StreamObserver<AppendRowsResponse> responseObserver) {
Expand Down Expand Up @@ -173,6 +190,11 @@ public FakeBigQueryWriteImpl addWriteStreamResponse(Stream.WriteStream response)
return this;
}

/**
 * Queues a canned {@code FlushRowsResponse} to be returned by the next FlushRows call.
 *
 * @param response the response to enqueue
 * @return this instance, for fluent chaining
 */
public FakeBigQueryWriteImpl addFlushRowsResponse(FlushRowsResponse response) {
  this.flushResponses.add(response);
  return this;
}

public FakeBigQueryWriteImpl addConnectionError(Throwable error) {
responses.add(new Response(error));
return this;
Expand Down
Expand Up @@ -399,4 +399,39 @@ public Long call() throws IOException, InterruptedException, ExecutionException
}
DirectWriter.clearCache();
}

// Integration test: appends one row to a BUFFERED stream, flushes it, then verifies the row is
// readable via the regular BigQuery read path.
// NOTE(review): the created table is never deleted here — confirm a class-level teardown cleans
// up DATASET, otherwise add bigquery.delete(tableInfo.getTableId()) at the end.
@Test
public void testFlushRows() throws IOException, InterruptedException, ExecutionException {
  String tableName = "BufferTable";
  // Single STRING column "foo" is all we need to round-trip one row.
  TableInfo tableInfo =
      TableInfo.newBuilder(
              TableId.of(DATASET, tableName),
              StandardTableDefinition.of(
                  Schema.of(
                      com.google.cloud.bigquery.Field.newBuilder("foo", LegacySQLTypeName.STRING)
                          .build())))
          .build();
  bigquery.create(tableInfo);
  TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
  // BUFFERED type is required: flush() is only meaningful on buffered streams.
  WriteStream writeStream =
      client.createWriteStream(
          CreateWriteStreamRequest.newBuilder()
              .setParent(parent.toString())
              .setWriteStream(WriteStream.newBuilder().setType(WriteStream.Type.BUFFERED).build())
              .build());
  // try-with-resources ensures the StreamWriter is closed even if assertions fail.
  try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()).build()) {
    ApiFuture<AppendRowsResponse> response =
        streamWriter.append(
            createAppendRequest(writeStream.getName(), new String[] {"aaa"})
                .setOffset(Int64Value.of(0L))
                .build());
    // First append on a fresh stream lands at offset 0.
    assertEquals(0L, response.get().getOffset());
    // Flush up to offset 0 makes the appended row visible to readers.
    streamWriter.flush(0);
  }
  // Read the table back and verify exactly the one flushed row is present.
  TableResult result =
      bigquery.listTableData(tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
  Iterator<FieldValueList> iter = result.getValues().iterator();
  assertEquals("aaa", iter.next().get(0).getStringValue());
  assertEquals(false, iter.hasNext());
}
}