Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove schema update capability from jsonwriter and delete related tests #1047

Merged
merged 3 commits into from May 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -36,10 +36,7 @@
/**
* A StreamWriter that can write JSON data (JSONObjects) to BigQuery tables. The JsonStreamWriter is
* built on top of a StreamWriter, and it simply converts all JSON data to protobuf messages then
* calls StreamWriter's append() method to write to BigQuery tables. It maintains all StreamWriter
* functions, but also provides an additional feature: schema update support, where if the BigQuery
* table schema is updated, users will be able to ingest data on the new schema after some time (in
* order of minutes).
* calls StreamWriter's append() method to write to BigQuery tables.
*/
public class JsonStreamWriter implements AutoCloseable {
private static String streamPatternString =
Expand Down Expand Up @@ -83,8 +80,7 @@ private JsonStreamWriter(Builder builder)

/**
* Writes a JSONArray that contains JSONObjects to the BigQuery table by first converting the JSON
* data to protobuf messages, then using StreamWriter's append() to write the data. If there is a
* schema update, the OnSchemaUpdateRunnable will be used to determine what actions to perform.
* data to protobuf messages, then using StreamWriter's append() to write the data.
*
* @param jsonArr The JSON array that contains JSONObjects to be written
* @return ApiFuture<AppendRowsResponse> returns an AppendRowsResponse message wrapped in an
Expand All @@ -96,8 +92,7 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr) {

/**
* Writes a JSONArray that contains JSONObjects to the BigQuery table by first converting the JSON
* data to protobuf messages, then using StreamWriter's append() to write the data. If there is a
* schema update, the OnSchemaUpdateRunnable will be used to determine what actions to perform.
* data to protobuf messages, then using StreamWriter's append() to write the data.
*
* @param jsonArr The JSON array that contains JSONObjects to be written
* @param offset Offset for deduplication
Expand Down Expand Up @@ -193,10 +188,6 @@ private void setStreamWriterSettings(
if (createDefaultStream) {
streamWriterBuilder.createDefaultStream();
}
JsonStreamWriterOnSchemaUpdateRunnable jsonStreamWriterOnSchemaUpdateRunnable =
new JsonStreamWriterOnSchemaUpdateRunnable();
jsonStreamWriterOnSchemaUpdateRunnable.setJsonStreamWriter(this);
streamWriterBuilder.setOnSchemaUpdateRunnable(jsonStreamWriterOnSchemaUpdateRunnable);
}

/**
Expand Down Expand Up @@ -267,39 +258,6 @@ public void close() {
this.streamWriter.close();
}

private class JsonStreamWriterOnSchemaUpdateRunnable extends OnSchemaUpdateRunnable {
private JsonStreamWriter jsonStreamWriter;
/**
* Setter for the jsonStreamWriter
*
* @param jsonStreamWriter
*/
public void setJsonStreamWriter(JsonStreamWriter jsonStreamWriter) {
this.jsonStreamWriter = jsonStreamWriter;
}

/** Getter for the jsonStreamWriter */
public JsonStreamWriter getJsonStreamWriter() {
return this.jsonStreamWriter;
}

@Override
public void run() {
this.getJsonStreamWriter().setTableSchema(this.getUpdatedSchema());
try {
this.getJsonStreamWriter().refreshConnection();
} catch (InterruptedException | IOException e) {
LOG.severe("StreamWriter failed to refresh upon schema update." + e);
return;
} catch (Descriptors.DescriptorValidationException e) {
LOG.severe(
"Schema update fail: updated schema could not be converted to a valid descriptor.");
return;
}
LOG.info("Successfully updated schema: " + this.getUpdatedSchema());
}
}

public static final class Builder {
private String streamOrTableName;
private BigQueryWriteClient client;
Expand Down
Expand Up @@ -19,6 +19,8 @@
* A abstract class that implements the Runnable interface and provides access to the current
* StreamWriter and updatedSchema. This runnable will only be called when a updated schema has been
* passed back through the AppendRowsResponse. Users should only implement the run() function.
*
* @deprecated
*/
public abstract class OnSchemaUpdateRunnable implements Runnable {
private StreamWriter streamWriter;
Expand Down