Skip to content

Commit

Permalink
fix: remove schema update capability from jsonwriter and delete relat…
Browse files Browse the repository at this point in the history
…ed tests (#1047)

* fix: remove schema update capability from json writer and delete related tests

* .

* .
  • Loading branch information
yirutang committed May 4, 2021
1 parent e75f9e7 commit 21e399b
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 700 deletions.
Expand Up @@ -36,10 +36,7 @@
/**
* A StreamWriter that can write JSON data (JSONObjects) to BigQuery tables. The JsonStreamWriter is
* built on top of a StreamWriter, and it simply converts all JSON data to protobuf messages then
* calls StreamWriter's append() method to write to BigQuery tables. It maintains all StreamWriter
* functions, but also provides an additional feature: schema update support, where if the BigQuery
* table schema is updated, users will be able to ingest data on the new schema after some time (in
* order of minutes).
* calls StreamWriter's append() method to write to BigQuery tables.
*/
public class JsonStreamWriter implements AutoCloseable {
private static String streamPatternString =
Expand Down Expand Up @@ -83,8 +80,7 @@ private JsonStreamWriter(Builder builder)

/**
* Writes a JSONArray that contains JSONObjects to the BigQuery table by first converting the JSON
* data to protobuf messages, then using StreamWriter's append() to write the data. If there is a
* schema update, the OnSchemaUpdateRunnable will be used to determine what actions to perform.
* data to protobuf messages, then using StreamWriter's append() to write the data.
*
* @param jsonArr The JSON array that contains JSONObjects to be written
* @return ApiFuture<AppendRowsResponse> returns an AppendRowsResponse message wrapped in an
Expand All @@ -96,8 +92,7 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr) {

/**
* Writes a JSONArray that contains JSONObjects to the BigQuery table by first converting the JSON
* data to protobuf messages, then using StreamWriter's append() to write the data. If there is a
* schema update, the OnSchemaUpdateRunnable will be used to determine what actions to perform.
* data to protobuf messages, then using StreamWriter's append() to write the data.
*
* @param jsonArr The JSON array that contains JSONObjects to be written
* @param offset Offset for deduplication
Expand Down Expand Up @@ -193,10 +188,6 @@ private void setStreamWriterSettings(
if (createDefaultStream) {
streamWriterBuilder.createDefaultStream();
}
JsonStreamWriterOnSchemaUpdateRunnable jsonStreamWriterOnSchemaUpdateRunnable =
new JsonStreamWriterOnSchemaUpdateRunnable();
jsonStreamWriterOnSchemaUpdateRunnable.setJsonStreamWriter(this);
streamWriterBuilder.setOnSchemaUpdateRunnable(jsonStreamWriterOnSchemaUpdateRunnable);
}

/**
Expand Down Expand Up @@ -267,39 +258,6 @@ public void close() {
this.streamWriter.close();
}

private class JsonStreamWriterOnSchemaUpdateRunnable extends OnSchemaUpdateRunnable {
private JsonStreamWriter jsonStreamWriter;
/**
* Setter for the jsonStreamWriter
*
* @param jsonStreamWriter
*/
public void setJsonStreamWriter(JsonStreamWriter jsonStreamWriter) {
this.jsonStreamWriter = jsonStreamWriter;
}

/** Getter for the jsonStreamWriter */
public JsonStreamWriter getJsonStreamWriter() {
return this.jsonStreamWriter;
}

@Override
public void run() {
this.getJsonStreamWriter().setTableSchema(this.getUpdatedSchema());
try {
this.getJsonStreamWriter().refreshConnection();
} catch (InterruptedException | IOException e) {
LOG.severe("StreamWriter failed to refresh upon schema update." + e);
return;
} catch (Descriptors.DescriptorValidationException e) {
LOG.severe(
"Schema update fail: updated schema could not be converted to a valid descriptor.");
return;
}
LOG.info("Successfully updated schema: " + this.getUpdatedSchema());
}
}

public static final class Builder {
private String streamOrTableName;
private BigQueryWriteClient client;
Expand Down
Expand Up @@ -19,6 +19,8 @@
* A abstract class that implements the Runnable interface and provides access to the current
* StreamWriter and updatedSchema. This runnable will only be called when a updated schema has been
* passed back through the AppendRowsResponse. Users should only implement the run() function.
*
* @deprecated
*/
public abstract class OnSchemaUpdateRunnable implements Runnable {
private StreamWriter streamWriter;
Expand Down

0 comments on commit 21e399b

Please sign in to comment.