fix: exception tests #586

Merged
merged 2 commits on Oct 6, 2020
@@ -18,7 +18,6 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.google.api.core.*;
import com.google.api.core.ApiFuture;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
@@ -32,14 +31,18 @@
import com.google.cloud.bigquery.storage.test.Test.UpdatedFooType2;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.AppendRowsResponse;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.logging.Logger;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -168,7 +171,6 @@ public void setUp() throws Exception {

@After
public void tearDown() throws Exception {
LOG.info("tearDown called");
serviceHelper.stop();
}

@@ -181,19 +183,28 @@ private JsonStreamWriter.Builder getTestJsonStreamWriterBuilder(
}

@Test
public void testTwoParamNewBuilder() throws Exception {
public void testTwoParamNewBuilder_nullStream() {
try {
getTestJsonStreamWriterBuilder(null, TABLE_SCHEMA);
Assert.fail("expected NullPointerException");
} catch (NullPointerException e) {
assertEquals(e.getMessage(), "StreamName is null.");
}
}

@Test
public void testTwoParamNewBuilder_nullSchema() {
try {
getTestJsonStreamWriterBuilder(TEST_STREAM, null);
Assert.fail("expected NullPointerException");
} catch (NullPointerException e) {
assertEquals(e.getMessage(), "TableSchema is null.");
}
}

@Test
public void testTwoParamNewBuilder()
throws DescriptorValidationException, IOException, InterruptedException {
JsonStreamWriter writer = getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build();
assertEquals(TEST_STREAM, writer.getStreamName());
}
@@ -522,7 +533,9 @@ public void testAppendMultipleSchemaUpdate() throws Exception {
}

@Test
public void testAppendAlreadyExistsException() throws Exception {
// This might be a bug but it is the current behavior. Investigate.
public void testAppendAlreadyExists_doesNotThrowException()
throws DescriptorValidationException, IOException, InterruptedException, ExecutionException {
try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) {
testBigQueryWrite.addResponse(
@@ -535,11 +548,7 @@ public void testAppendAlreadyExistsException() throws Exception {
jsonArr.put(foo);
ApiFuture<AppendRowsResponse> appendFuture =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
try {
appendFuture.get();
} catch (Throwable t) {
assertEquals(t.getCause().getMessage(), "ALREADY_EXISTS: ");
}
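// With the mocked ALREADY_EXISTS response, get() is expected to complete
// without throwing (see the note above about this possibly being a bug).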
appendFuture.get();
}
}

@@ -559,8 +568,9 @@ public void testAppendOutOfRangeException() throws Exception {
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
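// The mocked response carries an OUT_OF_RANGE status, so the returned future
// is expected to complete exceptionally when get() is called.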
try {
appendFuture.get();
} catch (Throwable t) {
assertEquals(t.getCause().getMessage(), "OUT_OF_RANGE: ");
Assert.fail("expected ExecutionException");
} catch (ExecutionException ex) {
assertEquals(ex.getCause().getMessage(), "OUT_OF_RANGE: ");
}
}
}
@@ -584,8 +594,9 @@ public void testAppendOutOfRangeAndUpdateSchema() throws Exception {
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
try {
appendFuture.get();
} catch (Throwable t) {
assertEquals(t.getCause().getMessage(), "OUT_OF_RANGE: ");
Assert.fail("expected ExecutionException");
} catch (ExecutionException ex) {
assertEquals(ex.getCause().getMessage(), "OUT_OF_RANGE: ");
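// Poll for up to 10 seconds until the writer picks up the updated schema,
// i.e. until its descriptor exposes two fields.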
int millis = 0;
while (millis <= 10000) {
if (writer.getDescriptor().getFields().size() == 2) {
@@ -781,15 +792,13 @@ public void run() {
ApiFuture<AppendRowsResponse> appendFuture =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
AppendRowsResponse response = appendFuture.get();
LOG.info("Processing complete, offset is " + response.getOffset());
offset_sets.remove(response.getOffset());
} catch (Exception e) {
LOG.severe("Thread execution failed: " + e.getMessage());
}
}
});
thread_arr[i] = t;
LOG.info("Starting thread " + i + ".");
t.start();
}

@@ -833,10 +842,10 @@ public void testMultiThreadAppendWithSchemaUpdate() throws Exception {
final JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);

final HashSet<Long> offset_sets = new HashSet<Long>();
int thread_nums = 5;
Thread[] thread_arr = new Thread[thread_nums];
for (int i = 0; i < thread_nums; i++) {
final HashSet<Long> offsetSets = new HashSet<Long>();
int numberThreads = 5;
Thread[] thread_arr = new Thread[numberThreads];
for (int i = 0; i < numberThreads; i++) {
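// Each expected offset is added to offsetSets up front and removed by the thread
// that receives it, so an empty set after join() means every append came back.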
if (i == 2) {
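// The response queued when i == 2 differs from the rest and exercises the
// schema-update path that the later assertions check.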
testBigQueryWrite.addResponse(
Storage.AppendRowsResponse.newBuilder()
@@ -848,7 +857,7 @@ public void testMultiThreadAppendWithSchemaUpdate() throws Exception {
Storage.AppendRowsResponse.newBuilder().setOffset((long) i).build());
}

offset_sets.add((long) i);
offsetSets.add((long) i);
Thread t =
new Thread(
new Runnable() {
@@ -857,23 +866,21 @@ public void run() {
ApiFuture<AppendRowsResponse> appendFuture =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
AppendRowsResponse response = appendFuture.get();
LOG.info("Processing complete, offset is " + response.getOffset());
offset_sets.remove(response.getOffset());
offsetSets.remove(response.getOffset());
} catch (Exception e) {
LOG.severe("Thread execution failed: " + e.getMessage());
}
}
});
thread_arr[i] = t;
LOG.info("Starting thread " + i + ".");
t.start();
}

for (int i = 0; i < thread_nums; i++) {
for (int i = 0; i < numberThreads; i++) {
thread_arr[i].join();
}
assertTrue(offset_sets.size() == 0);
for (int i = 0; i < thread_nums; i++) {
assertTrue(offsetSets.size() == 0);
for (int i = 0; i < numberThreads; i++) {
assertEquals(
1,
testBigQueryWrite
@@ -900,16 +907,16 @@ public void run() {
Thread.sleep(100);
millis += 100;
}
assertTrue(writer.getDescriptor().getFields().size() == 2);
assertEquals(2, writer.getDescriptor().getFields().size());

foo.put("bar", "allen2");
final JSONArray jsonArr2 = new JSONArray();
jsonArr2.put(foo);

for (int i = thread_nums; i < thread_nums + 5; i++) {
for (int i = numberThreads; i < numberThreads + 5; i++) {
testBigQueryWrite.addResponse(
Storage.AppendRowsResponse.newBuilder().setOffset((long) i).build());
offset_sets.add((long) i);
offsetSets.add((long) i);
Thread t =
new Thread(
new Runnable() {
Expand All @@ -918,23 +925,21 @@ public void run() {
ApiFuture<AppendRowsResponse> appendFuture =
writer.append(jsonArr2, -1, /* allowUnknownFields */ false);
AppendRowsResponse response = appendFuture.get();
LOG.info("Processing complete, offset is " + response.getOffset());
offset_sets.remove(response.getOffset());
offsetSets.remove(response.getOffset());
} catch (Exception e) {
LOG.severe("Thread execution failed: " + e.getMessage());
}
}
});
thread_arr[i - 5] = t;
LOG.info("Starting thread " + i + " with updated json data.");
t.start();
}

for (int i = 0; i < thread_nums; i++) {
for (int i = 0; i < numberThreads; i++) {
thread_arr[i].join();
}
assertTrue(offset_sets.size() == 0);
for (int i = 0; i < thread_nums; i++) {
assertTrue(offsetSets.size() == 0);
for (int i = 0; i < numberThreads; i++) {
assertEquals(
1,
testBigQueryWrite