feat: in StreamWriterV2, supports new append, which takes rows and offset #894

Merged
merged 8 commits into from Mar 1, 2021
@@ -19,10 +19,12 @@
 import com.google.api.core.SettableApiFuture;
 import com.google.api.gax.core.CredentialsProvider;
 import com.google.api.gax.rpc.TransportChannelProvider;
+import com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest.ProtoData;
 import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.DoneCallback;
 import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.RequestCallback;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Uninterruptibles;
+import com.google.protobuf.Int64Value;
 import io.grpc.Status;
 import io.grpc.Status.Code;
 import io.grpc.StatusRuntimeException;
@@ -39,8 +41,6 @@
 /**
  * A BigQuery Stream Writer that can be used to write data into BigQuery Table.
  *
- * <p>TODO: Attach schema.
- *
  * <p>TODO: Attach traceId.
  *
  * <p>TODO: Support batching.
@@ -59,6 +59,11 @@ public class StreamWriterV2 implements AutoCloseable {
    */
   private final String streamName;

+  /*
+   * The proto schema of rows to write.
+   */
+  private final ProtoSchema writerSchema;
+
   /*
    * Max allowed inflight requests in the stream. Method append is blocked at this.
    */
@@ -135,6 +140,7 @@ private StreamWriterV2(Builder builder) throws IOException {
     this.hasMessageInWaitingQueue = lock.newCondition();
     this.inflightReduced = lock.newCondition();
     this.streamName = builder.streamName;
+    this.writerSchema = builder.writerSchema;
     this.maxInflightRequests = builder.maxInflightRequest;
     this.maxInflightBytes = builder.maxInflightBytes;
     this.waitingRequestQueue = new LinkedList<AppendRequestAndResponse>();
@@ -188,10 +194,52 @@ public void run() {
    * ApiFuture<AppendRowsResponse> messageIdFuture = writer.append(message);
    * ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<AppendRowsResponse>() {
    *   public void onSuccess(AppendRowsResponse response) {
-   *     if (response.hasOffset()) {
-   *       System.out.println("written with offset: " + response.getOffset());
+   *     if (!response.hasError()) {
+   *       System.out.println("written with offset: " + response.getAppendResult().getOffset());
+   *     } else {
+   *       System.out.println("received an in stream error: " + response.getError().toString());
+   *     }
+   *   }
+   *
+   *   public void onFailure(Throwable t) {
+   *     System.out.println("failed to write: " + t);
+   *   }
+   * }, MoreExecutors.directExecutor());
+   * }</pre>
+   *
+   * @param rows the rows in serialized format to write to BigQuery.
+   * @param offset the offset of the first row.
+   * @return the append response wrapped in a future.
+   */
+  public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) {
Contributor:
Should we simply accept byte strings?

Contributor Author:
Same as above.

Contributor:
I think byte string is cleaner in this case. But since this builds with API anyway, I am fine with ProtoRows.

+    // TODO: Move this check to builder after the other append is removed.
+    if (this.writerSchema == null) {
+      throw new StatusRuntimeException(

This needs to be in the writer builder for sure; this is not the proper place to validate it.

Contributor:
+1

Contributor Author:
Sure. That's why I have a TODO here.

I need to put it here for backward compatibility; otherwise it would break existing customers. For example, the sample would stop working because the version is bumped automatically.

I just need to do it in two phases.
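
The two-phase plan described above can be sketched roughly as follows. This is only an illustration, not the code in this PR: `SketchWriter`, `buildPhaseOne`, `buildPhaseTwo`, `appendRows`, and `appendRequest` are hypothetical names, and a plain `String` stands in for `ProtoSchema`. In phase one the check sits in the newer append so that writers built without a schema still work with the older append; in phase two the same check moves to build time.

```java
// Hypothetical sketch: SketchWriter and its methods are illustrative stand-ins,
// not part of the real StreamWriterV2 API. A String stands in for ProtoSchema.
final class SketchWriter {
  private final String writerSchema; // null means no schema was supplied

  private SketchWriter(String writerSchema) {
    this.writerSchema = writerSchema;
  }

  // Phase one: building without a schema is still allowed, so callers of the
  // older append (whose requests carry their own schema) keep working.
  static SketchWriter buildPhaseOne(String writerSchema) {
    return new SketchWriter(writerSchema);
  }

  // Phase two: once the older append is gone, the check can move to build time.
  static SketchWriter buildPhaseTwo(String writerSchema) {
    if (writerSchema == null) {
      throw new IllegalArgumentException(
          "Writer schema must be provided when building this writer.");
    }
    return new SketchWriter(writerSchema);
  }

  // Newer append: during phase one the missing-schema check has to live here.
  String appendRows(String rows) {
    if (writerSchema == null) {
      throw new IllegalArgumentException(
          "Writer schema must be provided when building this writer.");
    }
    return writerSchema + "|" + rows;
  }

  // Older append: the request already embeds a schema, so nothing is checked.
  String appendRequest(String requestWithSchema) {
    return requestWithSchema;
  }
}
```

With this shape, a writer built in phase one without a schema fails only when the newer append is called, which matches the behavior the new test in this PR expects.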

+          Status.fromCode(Code.INVALID_ARGUMENT)
+              .withDescription("Writer schema must be provided when building this writer."));
Contributor:
If this needs to be moved then no worries, but otherwise maybe we should add a test for this.

Contributor Author:
Done.

+    }
+    AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
+    requestBuilder.setProtoRows(ProtoData.newBuilder().setRows(rows).build());
+    if (offset >= 0) {
+      requestBuilder.setOffset(Int64Value.of(offset));
+    }
+    return append(requestBuilder.build());
+  }
+
+  /**
+   * Schedules the writing of a message.
+   *
+   * <p>Example of writing a message.
+   *
+   * <pre>{@code
+   * AppendRowsRequest message;
+   * ApiFuture<AppendRowsResponse> messageIdFuture = writer.append(message);
+   * ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<AppendRowsResponse>() {
+   *   public void onSuccess(AppendRowsResponse response) {
+   *     if (!response.hasError()) {
+   *       System.out.println("written with offset: " + response.getAppendResult().getOffset());
    *     } else {
-   *       System.out.println("received an in stream error: " + response.error().toString());
+   *       System.out.println("received an in stream error: " + response.getError().toString());
    *     }
    *   }
    *
@@ -202,8 +250,9 @@ public void run() {
    * }</pre>
    *
    * @param message the message in serialized format to write to BigQuery.
-   * @return the message ID wrapped in a future.
+   * @return the append response wrapped in a future.
    */
+  @Deprecated
   public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
     AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message);
     if (requestWrapper.messageSize > getApiMaxRequestBytes()) {
@@ -380,6 +429,9 @@ private AppendRowsRequest prepareRequestBasedOnPosition(
       AppendRowsRequest original, boolean isFirstRequest) {
     AppendRowsRequest.Builder requestBuilder = original.toBuilder();
     if (isFirstRequest) {
+      if (this.writerSchema != null) {
+        requestBuilder.getProtoRowsBuilder().setWriterSchema(this.writerSchema);
+      }
       requestBuilder.setWriteStream(this.streamName);
     } else {
       requestBuilder.clearWriteStream();
@@ -473,6 +525,8 @@ public static final class Builder {

     private BigQueryWriteClient client;

+    private ProtoSchema writerSchema = null;
+
     private long maxInflightRequest = DEFAULT_MAX_INFLIGHT_REQUESTS;

     private long maxInflightBytes = DEFAULT_MAX_INFLIGHT_BYTES;
@@ -495,6 +549,12 @@ private Builder(String streamName, BigQueryWriteClient client) {
       this.client = Preconditions.checkNotNull(client);
     }

+    /** Sets the proto schema of the rows. */
+    public Builder setWriterSchema(ProtoSchema writerSchema) {
+      this.writerSchema = writerSchema;
+      return this;
+    }
+
     public Builder setMaxInflightRequests(long value) {
       this.maxInflightRequest = value;
       return this;
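
The `prepareRequestBasedOnPosition` change in this diff attaches the writer schema and the stream name only to the first request on a connection. A minimal sketch of that idea, under stated assumptions: `SketchRequest` and `FirstRequestSketch` are hypothetical stand-ins for `AppendRowsRequest` and the writer, plain `String` values replace the proto types, and clearing the schema on later requests is inferred from the test expectations in this PR rather than from code visible in the hunk.

```java
// Hypothetical sketch: SketchRequest and FirstRequestSketch are illustrative
// stand-ins for AppendRowsRequest and the writer; Strings replace proto types.
final class SketchRequest {
  final String writeStream; // "" means the field is cleared
  final String writerSchema; // null means no schema is attached
  final String rows;

  SketchRequest(String writeStream, String writerSchema, String rows) {
    this.writeStream = writeStream;
    this.writerSchema = writerSchema;
    this.rows = rows;
  }
}

final class FirstRequestSketch {
  static SketchRequest prepare(
      SketchRequest original, boolean isFirstRequest, String streamName, String writerSchema) {
    if (isFirstRequest) {
      // Prefer the schema configured on the writer; otherwise keep the one the
      // request already carried (the older append path).
      String schema = writerSchema != null ? writerSchema : original.writerSchema;
      return new SketchRequest(streamName, schema, original.rows);
    }
    // Assumption drawn from the test expectations: later requests carry
    // neither the stream name nor the schema.
    return new SketchRequest("", null, original.rows);
  }
}
```

Sending the schema once per connection keeps later requests small; the rows themselves are passed through unchanged in both branches.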
@@ -87,31 +87,39 @@ private StreamWriterV2 getTestStreamWriterV2() throws IOException {
     return StreamWriterV2.newBuilder(TEST_STREAM, client).build();
   }

-  private AppendRowsRequest createAppendRequest(String[] messages, long offset) {
-    AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
-    AppendRowsRequest.ProtoData.Builder dataBuilder = AppendRowsRequest.ProtoData.newBuilder();
-    dataBuilder.setWriterSchema(
-        ProtoSchema.newBuilder()
-            .setProtoDescriptor(
-                DescriptorProtos.DescriptorProto.newBuilder()
-                    .setName("Message")
-                    .addField(
-                        DescriptorProtos.FieldDescriptorProto.newBuilder()
-                            .setName("foo")
-                            .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING)
-                            .setNumber(1)
-                            .build())
-                    .build()));
-    ProtoRows.Builder rows = ProtoRows.newBuilder();
+  private ProtoSchema createProtoSchema() {
+    return ProtoSchema.newBuilder()
+        .setProtoDescriptor(
+            DescriptorProtos.DescriptorProto.newBuilder()
+                .setName("Message")
+                .addField(
+                    DescriptorProtos.FieldDescriptorProto.newBuilder()
+                        .setName("foo")
+                        .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING)
+                        .setNumber(1)
+                        .build())
+                .build())
+        .build();
+  }
+
+  private ProtoRows createProtoRows(String[] messages) {
+    ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
     for (String message : messages) {
       FooType foo = FooType.newBuilder().setFoo(message).build();
-      rows.addSerializedRows(foo.toByteString());
+      rowsBuilder.addSerializedRows(foo.toByteString());
     }
+    return rowsBuilder.build();
+  }
+
+  private AppendRowsRequest createAppendRequest(String[] messages, long offset) {
+    AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
+    AppendRowsRequest.ProtoData.Builder dataBuilder = AppendRowsRequest.ProtoData.newBuilder();
+    dataBuilder.setWriterSchema(createProtoSchema());
     if (offset > 0) {
       requestBuilder.setOffset(Int64Value.of(offset));
     }
     return requestBuilder
-        .setProtoRows(dataBuilder.setRows(rows.build()).build())
+        .setProtoRows(dataBuilder.setRows(createProtoRows(messages)).build())
         .setWriteStream(TEST_STREAM)
         .build();
   }
@@ -166,6 +174,24 @@ public void run() {
     appendThread.interrupt();
   }

+  private void verifyAppendRequests(long appendCount) {
+    assertEquals(appendCount, testBigQueryWrite.getAppendRequests().size());
+    for (int i = 0; i < appendCount; i++) {
+      AppendRowsRequest serverRequest = testBigQueryWrite.getAppendRequests().get(i);
+      assertTrue(serverRequest.getProtoRows().getRows().getSerializedRowsCount() > 0);
+      assertEquals(i, serverRequest.getOffset().getValue());
+      if (i == 0) {
+        // First request received by server should have schema and stream name.
+        assertTrue(serverRequest.getProtoRows().hasWriterSchema());
+        assertEquals(serverRequest.getWriteStream(), TEST_STREAM);
+      } else {
+        // Following request should not have schema and stream name.
+        assertFalse(serverRequest.getProtoRows().hasWriterSchema());
+        assertEquals(serverRequest.getWriteStream(), "");
+      }
+    }
+  }
+
   @Test
   public void testBuildBigQueryWriteClientInWriter() throws Exception {
     StreamWriterV2 writer =
@@ -181,40 +207,68 @@ public void testBuildBigQueryWriteClientInWriter() throws Exception {
   }

   @Test
-  public void testAppendSuccess() throws Exception {
-    StreamWriterV2 writer = getTestStreamWriterV2();
+  public void testAppendWithRowsSuccess() throws Exception {
+    StreamWriterV2 writer =
+        StreamWriterV2.newBuilder(TEST_STREAM, client).setWriterSchema(createProtoSchema()).build();

-    long appendCount = 1000;
+    long appendCount = 100;
     for (int i = 0; i < appendCount; i++) {
       testBigQueryWrite.addResponse(createAppendResponse(i));
     }

     List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
     for (int i = 0; i < appendCount; i++) {
-      futures.add(sendTestMessage(writer, new String[] {String.valueOf(i)}));
+      futures.add(writer.append(createProtoRows(new String[] {String.valueOf(i)}), i));
     }

     for (int i = 0; i < appendCount; i++) {
       assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue());
     }
-    assertEquals(appendCount, testBigQueryWrite.getAppendRequests().size());

+    verifyAppendRequests(appendCount);
+
+    writer.close();
+  }
+
+  @Test
+  public void testAppendWithMessageSuccess() throws Exception {
+    StreamWriterV2 writer = getTestStreamWriterV2();
+
+    long appendCount = 1000;
     for (int i = 0; i < appendCount; i++) {
-      AppendRowsRequest serverRequest = testBigQueryWrite.getAppendRequests().get(i);
-      if (i == 0) {
-        // First request received by server should have schema and stream name.
-        assertTrue(serverRequest.getProtoRows().hasWriterSchema());
-        assertEquals(serverRequest.getWriteStream(), TEST_STREAM);
-      } else {
-        // Following request should not have schema and stream name.
-        assertFalse(serverRequest.getProtoRows().hasWriterSchema());
-        assertEquals(serverRequest.getWriteStream(), "");
-      }
+      testBigQueryWrite.addResponse(createAppendResponse(i));
     }

+    List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
+    for (int i = 0; i < appendCount; i++) {
+      futures.add(writer.append(createAppendRequest(new String[] {String.valueOf(i)}, i)));
+    }
+
+    for (int i = 0; i < appendCount; i++) {
+      assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue());
+    }
+
+    verifyAppendRequests(appendCount);
+
     writer.close();
   }

   @Test
+  public void testAppendWithRowsNoSchema() throws Exception {
+    final StreamWriterV2 writer = getTestStreamWriterV2();
+    StatusRuntimeException ex =
+        assertThrows(
+            StatusRuntimeException.class,
+            new ThrowingRunnable() {
+              @Override
+              public void run() throws Throwable {
+                writer.append(createProtoRows(new String[] {"A"}), -1);
+              }
+            });
+    assertEquals(ex.getStatus().getCode(), Status.INVALID_ARGUMENT.getCode());
+    assertTrue(ex.getStatus().getDescription().contains("Writer schema must be provided"));
+  }
+
+  @Test
   public void testAppendSuccessAndConnectionError() throws Exception {
     StreamWriterV2 writer = getTestStreamWriterV2();