Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add labels for writechannelconfiguration #347

Merged
merged 1 commit into from May 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -28,6 +28,7 @@
import com.google.common.primitives.Ints;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
Expand All @@ -53,6 +54,7 @@ public final class WriteChannelConfiguration implements LoadConfiguration, Seria
private final TimePartitioning timePartitioning;
private final Clustering clustering;
private final Boolean useAvroLogicalTypes;
private final Map<String, String> labels;

public static final class Builder implements LoadConfiguration.Builder {

Expand All @@ -70,6 +72,7 @@ public static final class Builder implements LoadConfiguration.Builder {
private TimePartitioning timePartitioning;
private Clustering clustering;
private Boolean useAvroLogicalTypes;
private Map<String, String> labels;

private Builder() {}

Expand All @@ -89,6 +92,7 @@ private Builder(WriteChannelConfiguration writeChannelConfiguration) {
this.timePartitioning = writeChannelConfiguration.timePartitioning;
this.clustering = writeChannelConfiguration.clustering;
this.useAvroLogicalTypes = writeChannelConfiguration.useAvroLogicalTypes;
this.labels = writeChannelConfiguration.labels;
}

private Builder(com.google.api.services.bigquery.model.JobConfiguration configurationPb) {
Expand Down Expand Up @@ -162,6 +166,9 @@ private Builder(com.google.api.services.bigquery.model.JobConfiguration configur
this.clustering = Clustering.fromPb(loadConfigurationPb.getClustering());
}
this.useAvroLogicalTypes = loadConfigurationPb.getUseAvroLogicalTypes();
if (configurationPb.getLabels() != null) {
this.labels = configurationPb.getLabels();
}
}

@Override
Expand Down Expand Up @@ -250,6 +257,11 @@ public Builder setUseAvroLogicalTypes(Boolean useAvroLogicalTypes) {
return this;
}

public Builder setLabels(Map<String, String> labels) {
this.labels = labels;
return this;
}

@Override
public WriteChannelConfiguration build() {
return new WriteChannelConfiguration(this);
Expand All @@ -271,6 +283,7 @@ protected WriteChannelConfiguration(Builder builder) {
this.timePartitioning = builder.timePartitioning;
this.clustering = builder.clustering;
this.useAvroLogicalTypes = builder.useAvroLogicalTypes;
this.labels = builder.labels;
}

@Override
Expand Down Expand Up @@ -355,6 +368,10 @@ public Boolean getUseAvroLogicalTypes() {
return useAvroLogicalTypes;
}

public Map<String, String> getLabels() {
return labels;
}

@Override
public Builder toBuilder() {
return new Builder(this);
Expand All @@ -375,7 +392,8 @@ MoreObjects.ToStringHelper toStringHelper() {
.add("autodetect", autodetect)
.add("timePartitioning", timePartitioning)
.add("clustering", clustering)
.add("useAvroLogicalTypes", useAvroLogicalTypes);
.add("useAvroLogicalTypes", useAvroLogicalTypes)
.add("labels", labels);
}

@Override
Expand Down Expand Up @@ -405,7 +423,8 @@ public int hashCode() {
autodetect,
timePartitioning,
clustering,
useAvroLogicalTypes);
useAvroLogicalTypes,
labels);
}

WriteChannelConfiguration setProjectId(String projectId) {
Expand All @@ -416,6 +435,8 @@ WriteChannelConfiguration setProjectId(String projectId) {
}

com.google.api.services.bigquery.model.JobConfiguration toPb() {
com.google.api.services.bigquery.model.JobConfiguration jobConfiguration =
new com.google.api.services.bigquery.model.JobConfiguration();
JobConfigurationLoad loadConfigurationPb = new JobConfigurationLoad();
loadConfigurationPb.setDestinationTable(destinationTable.toPb());
if (createDisposition != null) {
Expand Down Expand Up @@ -471,8 +492,11 @@ com.google.api.services.bigquery.model.JobConfiguration toPb() {
loadConfigurationPb.setClustering(clustering.toPb());
}
loadConfigurationPb.setUseAvroLogicalTypes(useAvroLogicalTypes);
return new com.google.api.services.bigquery.model.JobConfiguration()
.setLoad(loadConfigurationPb);
if (labels != null) {
jobConfiguration.setLabels(labels);
}
jobConfiguration.setLoad(loadConfigurationPb);
return jobConfiguration;
}

static WriteChannelConfiguration fromPb(
Expand Down
Expand Up @@ -23,8 +23,10 @@
import com.google.cloud.bigquery.JobInfo.WriteDisposition;
import com.google.cloud.bigquery.TimePartitioning.Type;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import org.junit.Test;

public class WriteChannelConfigurationTest {
Expand Down Expand Up @@ -55,6 +57,8 @@ public class WriteChannelConfigurationTest {
private static final TimePartitioning TIME_PARTITIONING = TimePartitioning.of(Type.DAY);
private static final Clustering CLUSTERING =
Clustering.newBuilder().setFields(ImmutableList.of("Foo", "Bar")).build();
private static final Map<String, String> LABELS =
ImmutableMap.of("test-job-name", "test-write-channel");
private static final WriteChannelConfiguration LOAD_CONFIGURATION_CSV =
WriteChannelConfiguration.newBuilder(TABLE_ID)
.setCreateDisposition(CREATE_DISPOSITION)
Expand All @@ -68,6 +72,7 @@ public class WriteChannelConfigurationTest {
.setAutodetect(AUTODETECT)
.setTimePartitioning(TIME_PARTITIONING)
.setClustering(CLUSTERING)
.setLabels(LABELS)
.build();

private static final DatastoreBackupOptions BACKUP_OPTIONS =
Expand Down Expand Up @@ -151,6 +156,7 @@ public void testBuilder() {
assertEquals(IGNORE_UNKNOWN_VALUES, LOAD_CONFIGURATION_CSV.ignoreUnknownValues());
assertEquals(MAX_BAD_RECORDS, LOAD_CONFIGURATION_CSV.getMaxBadRecords());
assertEquals(TABLE_SCHEMA, LOAD_CONFIGURATION_CSV.getSchema());
assertEquals(LABELS, LOAD_CONFIGURATION_CSV.getLabels());
assertEquals(BACKUP_OPTIONS, LOAD_CONFIGURATION_BACKUP.getDatastoreBackupOptions());
assertEquals(SCHEMA_UPDATE_OPTIONS, LOAD_CONFIGURATION_CSV.getSchemaUpdateOptions());
assertEquals(SCHEMA_UPDATE_OPTIONS, LOAD_CONFIGURATION_BACKUP.getSchemaUpdateOptions());
Expand Down Expand Up @@ -218,5 +224,6 @@ private void compareLoadConfiguration(
assertEquals(expected.getTimePartitioning(), value.getTimePartitioning());
assertEquals(expected.getClustering(), value.getClustering());
assertEquals(expected.getUseAvroLogicalTypes(), value.getUseAvroLogicalTypes());
assertEquals(expected.getLabels(), value.getLabels());
}
}
Expand Up @@ -2105,6 +2105,40 @@ public void testInsertFromFile() throws InterruptedException, IOException, Timeo
assertTrue(bigquery.delete(tableId));
}

@Test
public void testInsertFromFileWithLabels()
throws InterruptedException, IOException, TimeoutException {
String destinationTableName = "test_insert_from_file_table_with_labels";
TableId tableId = TableId.of(DATASET, destinationTableName);
WriteChannelConfiguration configuration =
WriteChannelConfiguration.newBuilder(tableId)
.setFormatOptions(FormatOptions.json())
.setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED)
.setSchema(TABLE_SCHEMA)
.setLabels(LABELS)
.build();
TableDataWriteChannel channel = bigquery.writer(configuration);
try {
// A zero byte write should not throw an exception.
assertEquals(0, channel.write(ByteBuffer.wrap("".getBytes(StandardCharsets.UTF_8))));
} finally {
// Force the channel to flush by calling `close`.
channel.close();
}
channel = bigquery.writer(configuration);
try {
channel.write(ByteBuffer.wrap(JSON_CONTENT.getBytes(StandardCharsets.UTF_8)));
} finally {
channel.close();
}
Job job = channel.getJob().waitFor();
LoadJobConfiguration jobConfiguration = job.getConfiguration();
assertEquals(TABLE_SCHEMA, jobConfiguration.getSchema());
assertEquals(LABELS, jobConfiguration.getLabels());
assertNull(job.getStatus().getError());
assertTrue(bigquery.delete(tableId));
}

@Test
public void testLocation() throws Exception {
String location = "EU";
Expand Down