Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
fix: Accept null json values in JsonToProtoMessage converter (#1288)
* fix: accept null json values in JsonToProtoMessage converter

* remove a unit test since we no longe cares about StreamWriter

* 🦉 Updates from OwlBot

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
yirutang and gcf-owl-bot[bot] committed Sep 3, 2021
1 parent b920cd5 commit fb515ab
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 101 deletions.
Expand Up @@ -133,6 +133,9 @@ private static void fillField(
throws IllegalArgumentException {

java.lang.Object val = json.get(exactJsonKeyName);
if (val == JSONObject.NULL) {
return;
}
switch (fieldDescriptor.getType()) {
case BOOL:
if (val instanceof Boolean) {
Expand Down
Expand Up @@ -743,15 +743,12 @@ public void testTopLevelMatchSecondLevelMismatch() throws Exception {

@Test
public void testJsonNullValue() throws Exception {
TestInt64 expectedProto = TestInt64.newBuilder().setInt(1).build();
JSONObject json = new JSONObject();
json.put("long", JSONObject.NULL);
json.put("int", 1);
try {
DynamicMessage protoMsg =
JsonToProtoMessage.convertJsonToProtoMessage(TestInt64.getDescriptor(), json);
Assert.fail("Should fail");
} catch (IllegalArgumentException e) {
assertEquals(e.getMessage(), "JSONObject does not have a int64 field at root.long.");
}
DynamicMessage protoMsg =
JsonToProtoMessage.convertJsonToProtoMessage(TestInt64.getDescriptor(), json);
assertEquals(expectedProto, protoMsg);
}
}
Expand Up @@ -901,100 +901,6 @@ public Throwable call() {
executor.shutdown();
}

@Test
public void testFlowControlBehaviorBlockAbortOnShutdown() throws Exception {
StreamWriter writer =
getTestStreamWriterBuilder()
.setBatchingSettings(
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setElementCountThreshold(1L)
.setFlowControlSettings(
StreamWriter.Builder.DEFAULT_FLOW_CONTROL_SETTINGS
.toBuilder()
.setMaxOutstandingElementCount(2L)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
.build())
.build())
.build();

testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build())
.build());
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build())
.build());
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(4)).build())
.build());
// Response will have a 10 second delay before server sends them back.
testBigQueryWrite.setResponseDelay(Duration.ofSeconds(10));

ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"}, 2);
final StreamWriter writer1 = writer;
ExecutorService executor = Executors.newFixedThreadPool(2);
Callable<Throwable> callable =
new Callable<Throwable>() {
@Override
public Throwable call() {
try {
ApiFuture<AppendRowsResponse> appendFuture2 =
sendTestMessage(writer1, new String[] {"B"}, 3);
ApiFuture<AppendRowsResponse> appendFuture3 =
sendTestMessage(writer1, new String[] {"C"}, 4);

// This request will be send out immediately because there is space in inflight queue.
if (3L != appendFuture2.get().getAppendResult().getOffset().getValue()) {
return new Exception(
"Expect offset to be 3 but got "
+ appendFuture2.get().getAppendResult().getOffset().getValue());
}
testBigQueryWrite.waitForResponseScheduled();
// This triggers the last response to come back.
fakeExecutor.advanceTime(Duration.ofSeconds(10));
// This request will be waiting for previous response to come back.
if (4L != appendFuture3.get().getAppendResult().getOffset().getValue()) {
return new Exception(
"Expect offset to be 4 but got "
+ appendFuture2.get().getAppendResult().getOffset().getValue());
}
} catch (InterruptedException e) {
return e;
} catch (ExecutionException e) {
return e;
} catch (IllegalStateException e) {
// In very rare cases, the stream is shutdown before the request is send, ignore this
// error.
}
return null;
}
};
Future<Throwable> future = executor.submit(callable);
assertEquals(false, appendFuture1.isDone());
// Wait is necessary for response to be scheduled before timer is advanced.
testBigQueryWrite.waitForResponseScheduled();
testBigQueryWrite.waitForResponseScheduled();
// This will trigger the previous two responses to come back.
fakeExecutor.advanceTime(Duration.ofSeconds(10));
// The first requests gets back while the second one is blocked.
assertEquals(2L, appendFuture1.get().getAppendResult().getOffset().getValue());
// When close is called, there should be one inflight request waiting.
Thread.sleep(500);
writer.close();
if (future.get() != null) {
future.get().printStackTrace();
fail("Callback got exception" + future.get().toString());
}
// Everything should come back.
executor.shutdown();
}

@Test
public void testFlowControlBehaviorException() throws Exception {
try (StreamWriter writer =
Expand Down

0 comments on commit fb515ab

Please sign in to comment.