Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for table snapshot #1320

Merged
merged 8 commits into from Jun 28, 2021
Expand Up @@ -38,6 +38,7 @@ public final class CopyJobConfiguration extends JobConfiguration {

private final List<TableId> sourceTables;
private final TableId destinationTable;
private final String operationType;
private final JobInfo.CreateDisposition createDisposition;
private final JobInfo.WriteDisposition writeDisposition;
private final EncryptionConfiguration destinationEncryptionConfiguration;
Expand All @@ -49,6 +50,7 @@ public static final class Builder

private List<TableId> sourceTables;
private TableId destinationTable;
private String operationType;
private JobInfo.CreateDisposition createDisposition;
private JobInfo.WriteDisposition writeDisposition;
private EncryptionConfiguration destinationEncryptionConfiguration;
Expand All @@ -63,6 +65,7 @@ private Builder(CopyJobConfiguration jobConfiguration) {
this();
this.sourceTables = jobConfiguration.sourceTables;
this.destinationTable = jobConfiguration.destinationTable;
this.operationType = jobConfiguration.operationType;
this.createDisposition = jobConfiguration.createDisposition;
this.writeDisposition = jobConfiguration.writeDisposition;
this.destinationEncryptionConfiguration = jobConfiguration.destinationEncryptionConfiguration;
Expand All @@ -74,6 +77,9 @@ private Builder(com.google.api.services.bigquery.model.JobConfiguration configur
this();
JobConfigurationTableCopy copyConfigurationPb = configurationPb.getCopy();
this.destinationTable = TableId.fromPb(copyConfigurationPb.getDestinationTable());
if (copyConfigurationPb.getOperationType() != null) {
this.operationType = copyConfigurationPb.getOperationType();
}
if (copyConfigurationPb.getSourceTables() != null) {
this.sourceTables =
Lists.transform(copyConfigurationPb.getSourceTables(), TableId.FROM_PB_FUNCTION);
Expand Down Expand Up @@ -114,6 +120,15 @@ public Builder setDestinationTable(TableId destinationTable) {
return this;
}

/**
* Sets the supported operation types (COPY, SNAPSHOT or RESTORE) in table copy job. More info:
* https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#operationtype
*/
public Builder setOperationType(String operationType) {
this.operationType = operationType;
return this;
}

public Builder setDestinationEncryptionConfiguration(
EncryptionConfiguration encryptionConfiguration) {
this.destinationEncryptionConfiguration = encryptionConfiguration;
Expand Down Expand Up @@ -178,6 +193,7 @@ private CopyJobConfiguration(Builder builder) {
super(builder);
this.sourceTables = checkNotNull(builder.sourceTables);
this.destinationTable = checkNotNull(builder.destinationTable);
this.operationType = builder.operationType;
this.createDisposition = builder.createDisposition;
this.writeDisposition = builder.writeDisposition;
this.destinationEncryptionConfiguration = builder.destinationEncryptionConfiguration;
Expand All @@ -195,6 +211,11 @@ public TableId getDestinationTable() {
return destinationTable;
}

/** Returns the table copy job type */
public String getOperationType() {
return operationType;
}

public EncryptionConfiguration getDestinationEncryptionConfiguration() {
return destinationEncryptionConfiguration;
}
Expand Down Expand Up @@ -241,6 +262,7 @@ ToStringHelper toStringHelper() {
return super.toStringHelper()
.add("sourceTables", sourceTables)
.add("destinationTable", destinationTable)
.add("operationType", operationType)
.add("destinationEncryptionConfiguration", destinationEncryptionConfiguration)
.add("createDisposition", createDisposition)
.add("writeDisposition", writeDisposition)
Expand All @@ -260,6 +282,7 @@ public int hashCode() {
baseHashCode(),
sourceTables,
destinationTable,
operationType,
createDisposition,
writeDisposition,
labels,
Expand Down Expand Up @@ -293,11 +316,12 @@ com.google.api.services.bigquery.model.JobConfiguration toPb() {
com.google.api.services.bigquery.model.JobConfiguration jobConfiguration =
new com.google.api.services.bigquery.model.JobConfiguration();
configurationPb.setDestinationTable(destinationTable.toPb());
if (sourceTables.size() == 1) {
configurationPb.setSourceTable(sourceTables.get(0).toPb());
} else {
if (sourceTables != null) {
configurationPb.setSourceTables(Lists.transform(sourceTables, TableId.TO_PB_FUNCTION));
}
if (operationType != null) {
configurationPb.setOperationType(operationType);
}
if (createDisposition != null) {
configurationPb.setCreateDisposition(createDisposition.toString());
}
Expand Down
@@ -0,0 +1,112 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.bigquery;

import com.google.api.client.util.DateTime;
import com.google.api.core.BetaApi;
import com.google.api.services.bigquery.model.Table;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import javax.annotation.Nullable;

@AutoValue
@BetaApi
public abstract class SnapshotTableDefinition extends TableDefinition {

private static final long serialVersionUID = 2113445776046717526L;

@AutoValue.Builder
public abstract static class Builder
extends TableDefinition.Builder<SnapshotTableDefinition, Builder> {

/** Reference describing the ID of the table that was snapshot. * */
public abstract Builder setBaseTableId(TableId baseTableId);

/**
* The time at which the base table was snapshot. This value is reported in the JSON response
* using RFC3339 format. *
*/
public abstract Builder setSnapshotTime(String dateTime);

public abstract Builder setTimePartitioning(TimePartitioning timePartitioning);

public abstract Builder setRangePartitioning(RangePartitioning rangePartitioning);

public abstract Builder setClustering(Clustering clustering);

/** Creates a {@code SnapshotTableDefinition} object. */
public abstract SnapshotTableDefinition build();
}

@Nullable
public abstract TableId getBaseTableId();

@Nullable
public abstract String getSnapshotTime();

@Nullable
public abstract TimePartitioning getTimePartitioning();

@Nullable
public abstract RangePartitioning getRangePartitioning();

@Nullable
public abstract Clustering getClustering();

/** Returns a builder for a snapshot table definition. */
public static SnapshotTableDefinition.Builder newBuilder() {
return new AutoValue_SnapshotTableDefinition.Builder().setType(Type.SNAPSHOT);
}

@VisibleForTesting
public abstract SnapshotTableDefinition.Builder toBuilder();

@Override
Table toPb() {
Table tablePb = super.toPb();
com.google.api.services.bigquery.model.SnapshotDefinition snapshotDefinition =
new com.google.api.services.bigquery.model.SnapshotDefinition();
snapshotDefinition.setBaseTableReference(getBaseTableId().toPb());
snapshotDefinition.setSnapshotTime(DateTime.parseRfc3339(getSnapshotTime()));
tablePb.setSnapshotDefinition(snapshotDefinition);
if (getTimePartitioning() != null) {
tablePb.setTimePartitioning(getTimePartitioning().toPb());
}
if (getRangePartitioning() != null) {
tablePb.setRangePartitioning(getRangePartitioning().toPb());
}
if (getClustering() != null) {
tablePb.setClustering(getClustering().toPb());
}
return tablePb;
}

static SnapshotTableDefinition fromPb(Table tablePb) {
Builder builder = newBuilder().table(tablePb);
com.google.api.services.bigquery.model.SnapshotDefinition snapshotDefinition =
tablePb.getSnapshotDefinition();
if (snapshotDefinition != null) {
if (snapshotDefinition.getBaseTableReference() != null) {
builder.setBaseTableId(TableId.fromPb(snapshotDefinition.getBaseTableReference()));
}
if (snapshotDefinition.getSnapshotTime() != null) {
builder.setSnapshotTime(snapshotDefinition.getSnapshotTime().toStringRfc3339());
}
}
return builder.build();
}
}
Expand Up @@ -83,6 +83,8 @@ public Type apply(String constant) {
*/
public static final Type MODEL = type.createAndRegister("MODEL");

public static final Type SNAPSHOT = type.createAndRegister("SNAPSHOT");

private Type(String constant) {
super(constant);
}
Expand Down Expand Up @@ -165,6 +167,8 @@ static <T extends TableDefinition> T fromPb(Table tablePb) {
return (T) ExternalTableDefinition.fromPb(tablePb);
case "MODEL":
return (T) ModelTableDefinition.fromPb(tablePb);
case "SNAPSHOT":
stephaniewang526 marked this conversation as resolved.
Show resolved Hide resolved
return (T) SnapshotTableDefinition.fromPb(tablePb);
default:
// never reached
throw new IllegalArgumentException("Format " + tablePb.getType() + " is not supported");
Expand Down
Expand Up @@ -113,7 +113,8 @@ public void testToPbAndFromPb() {
assertNull(COPY_JOB_CONFIGURATION.toPb().getExtract());
assertNull(COPY_JOB_CONFIGURATION.toPb().getLoad());
assertNull(COPY_JOB_CONFIGURATION.toPb().getQuery());
assertNull(COPY_JOB_CONFIGURATION.toPb().getCopy().getSourceTables());
assertNull(COPY_JOB_CONFIGURATION.toPb().getCopy().getSourceTable());
assertNotNull(COPY_JOB_CONFIGURATION.toPb().getCopy().getSourceTables());
assertNull(COPY_JOB_CONFIGURATION_MULTIPLE_TABLES.toPb().getCopy().getSourceTable());
assertNotNull(COPY_JOB_CONFIGURATION.getLabels());
assertNotNull(COPY_JOB_CONFIGURATION_MULTIPLE_TABLES.getLabels());
Expand Down
@@ -0,0 +1,72 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.bigquery;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import org.junit.Test;

public class SnapshotTableDefinitionTest {

private static final TableId BASE_TABLE_ID = TableId.of("DATASET_NAME", "BASE_TABLE_NAME");
private static final String SNAPSHOT_TIME = "2021-05-19T11:32:26.553Z";
private static final SnapshotTableDefinition SNAPSHOTTABLE_DEFINITION =
SnapshotTableDefinition.newBuilder()
.setBaseTableId(BASE_TABLE_ID)
.setSnapshotTime(SNAPSHOT_TIME)
.build();

@Test
public void testToBuilder() {
compareSnapshotTableDefinition(
SNAPSHOTTABLE_DEFINITION, SNAPSHOTTABLE_DEFINITION.toBuilder().build());
SnapshotTableDefinition snapshotTableDefinition =
SNAPSHOTTABLE_DEFINITION.toBuilder().setSnapshotTime("2021-05-20T11:32:26.553Z").build();
assertEquals("2021-05-20T11:32:26.553Z", snapshotTableDefinition.getSnapshotTime());
}

@Test
public void testBuilder() {
assertEquals(TableDefinition.Type.SNAPSHOT, SNAPSHOTTABLE_DEFINITION.getType());
assertEquals(BASE_TABLE_ID, SNAPSHOTTABLE_DEFINITION.getBaseTableId());
assertEquals(SNAPSHOT_TIME, SNAPSHOTTABLE_DEFINITION.getSnapshotTime());
SnapshotTableDefinition snapshotTableDefinition =
SnapshotTableDefinition.newBuilder()
.setBaseTableId(BASE_TABLE_ID)
.setSnapshotTime(SNAPSHOT_TIME)
.build();
assertEquals(SNAPSHOTTABLE_DEFINITION, snapshotTableDefinition);
}

@Test
public void testToAndFromPb() {
SnapshotTableDefinition snapshotTableDefinition = SNAPSHOTTABLE_DEFINITION.toBuilder().build();
assertTrue(
TableDefinition.fromPb(snapshotTableDefinition.toPb()) instanceof SnapshotTableDefinition);
compareSnapshotTableDefinition(
snapshotTableDefinition,
TableDefinition.<SnapshotTableDefinition>fromPb(snapshotTableDefinition.toPb()));
}

private void compareSnapshotTableDefinition(
SnapshotTableDefinition expected, SnapshotTableDefinition value) {
assertEquals(expected, value);
assertEquals(expected.getBaseTableId(), value.getBaseTableId());
assertEquals(expected.getSnapshotTime(), value.getSnapshotTime());
}
}
Expand Up @@ -86,6 +86,7 @@
import com.google.cloud.bigquery.RoutineId;
import com.google.cloud.bigquery.RoutineInfo;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.SnapshotTableDefinition;
import com.google.cloud.bigquery.StandardSQLDataType;
import com.google.cloud.bigquery.StandardSQLField;
import com.google.cloud.bigquery.StandardSQLTableType;
Expand Down Expand Up @@ -2589,6 +2590,70 @@ public void testCopyJob() throws InterruptedException, TimeoutException {
assertTrue(remoteTable.delete());
}

@Test
public void testSnapshotTableCopyJob() throws InterruptedException {
String sourceTableName = "test_copy_job_base_table";
// this creates a snapshot table at specified snapshotTime
String snapshotTableName = String.format("test_snapshot_table");
// Create source table
TableId sourceTableId = TableId.of(DATASET, sourceTableName);
StandardTableDefinition tableDefinition = StandardTableDefinition.of(TABLE_SCHEMA);
TableInfo tableInfo = TableInfo.of(sourceTableId, tableDefinition);
Table createdTable = bigquery.create(tableInfo);
stephaniewang526 marked this conversation as resolved.
Show resolved Hide resolved
assertNotNull(createdTable);

// Create snapshot table using source table as the base table
TableId snapshotTableId = TableId.of(DATASET, snapshotTableName);
CopyJobConfiguration snapshotConfiguration =
CopyJobConfiguration.newBuilder(snapshotTableId, sourceTableId)
.setOperationType("SNAPSHOT")
.build();
Job createdJob = bigquery.create(JobInfo.of(snapshotConfiguration));
CopyJobConfiguration createdConfiguration = createdJob.getConfiguration();
assertNotNull(createdConfiguration.getSourceTables());
assertNotNull(createdConfiguration.getOperationType());
assertNotNull(createdConfiguration.getDestinationTable());
Job completedJob = createdJob.waitFor();
assertNull(completedJob.getStatus().getError());
Table snapshotTable = bigquery.getTable(DATASET, snapshotTableName);
assertNotNull(snapshotTable);
assertEquals(snapshotTableId.getDataset(), snapshotTable.getTableId().getDataset());
assertEquals(snapshotTableName, snapshotTable.getTableId().getTable());
assertTrue(snapshotTable.getDefinition() instanceof SnapshotTableDefinition);
assertEquals(TABLE_SCHEMA, snapshotTable.getDefinition().getSchema());
assertNotNull(((SnapshotTableDefinition) snapshotTable.getDefinition()).getSnapshotTime());
assertEquals(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason you didn't just compare tableIds? Something with relative resolution of project?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I would get this error if comparing tableIds:

java.lang.AssertionError: 
Expected :GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=gcloud_test_dataset_temp_413a3914_e293_4da3_a3cd_e691ba96cdd1, tableId=test_copy_job_base_table}}
Actual   :GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=gcloud_test_dataset_temp_413a3914_e293_4da3_a3cd_e691ba96cdd1, projectId=java-docs-samples-testing, tableId=test_copy_job_base_table}}

So I am comparing just the tableNames.

sourceTableName,
((SnapshotTableDefinition) snapshotTable.getDefinition()).getBaseTableId().getTable());

// Restore base table to a new table
String restoredTableName = "test_restore_table";
TableId restoredTableId = TableId.of(DATASET, restoredTableName);
CopyJobConfiguration restoreConfiguration =
CopyJobConfiguration.newBuilder(restoredTableId, snapshotTableId)
.setOperationType("RESTORE")
.build();
Job createdRestoreJob = bigquery.create(JobInfo.of(restoreConfiguration));
CopyJobConfiguration createdRestoreConfiguration = createdRestoreJob.getConfiguration();
assertNotNull(createdRestoreConfiguration.getSourceTables());
stephaniewang526 marked this conversation as resolved.
Show resolved Hide resolved
assertNotNull(createdRestoreConfiguration.getOperationType());
assertNotNull(createdRestoreConfiguration.getDestinationTable());
Job completedRestoreJob = createdRestoreJob.waitFor();
assertNull(completedRestoreJob.getStatus().getError());
Table restoredTable = bigquery.getTable(DATASET, restoredTableName);
assertNotNull(restoredTable);
assertEquals(restoredTableId.getDataset(), restoredTable.getTableId().getDataset());
assertEquals(restoredTableName, restoredTable.getTableId().getTable());
assertEquals(TABLE_SCHEMA, restoredTable.getDefinition().getSchema());
assertEquals(snapshotTable.getNumBytes(), restoredTable.getNumBytes());
assertEquals(snapshotTable.getNumRows(), restoredTable.getNumRows());

// Clean up
assertTrue(createdTable.delete());
stephaniewang526 marked this conversation as resolved.
Show resolved Hide resolved
assertTrue(restoredTable.delete());
assertTrue(snapshotTable.delete());
}

@Test
public void testCopyJobWithLabels() throws InterruptedException {
String sourceTableName = "test_copy_job_source_table_label";
Expand Down