feat: Add support for renaming tables in schema translator (#3185)
* feat: Add support for renaming tables in schema translator (#3154)

Remove any column family transformations and reduce the transformer to support table names only.

We don't want to encourage column family renames, as they carry many implications. Mainly:
1. Read filters won't work, because they involve regexes.
2. Migration validation jobs won't work, because the column families will be different and the hashes won't match.

There are complex ways to solve these issues, but we don't see use cases for column family renames that justify complicating this migration workflow around creation, querying, and validation.
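As context for the supported table renames: the mapping the translator consumes is a flat JSON object of source table name to destination table name, matching the format documented in the class javadoc and usage text below (the table names here are illustrative only):

    {
      "source-table": "destination-table",
      "namespace:source-table2": "namespace-destination-table2"
    }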

(cherry picked from commit 7d49bba)

* Fix javax.annotation build issues.

* Fix version number in the comments

* Fix the bad merge.

* Incorporating PR feedback
vermas2012 committed Aug 19, 2021
1 parent 24436be commit e294c1e
Showing 6 changed files with 518 additions and 34 deletions.
4 changes: 4 additions & 0 deletions bigtable-hbase-1.x-parent/bigtable-hbase-1.x-shaded/pom.xml
@@ -304,6 +304,10 @@ limitations under the License.
com.google.bigtable.repackaged.org.codehaus
</shadedPattern>
</relocation>
<relocation>
<pattern>org.checkerframework</pattern>
<shadedPattern>com.google.bigtable.repackaged.org.checkerframework</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
7 changes: 6 additions & 1 deletion bigtable-hbase-1.x-parent/bigtable-hbase-1.x-tools/pom.xml
@@ -51,7 +51,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.0-jre</version>
<version>30.1.1-android</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
@@ -80,6 +80,11 @@
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version>
</dependency>
</dependencies>

<build>
@@ -16,32 +16,37 @@
package com.google.cloud.bigtable.hbase.tools;

import com.google.bigtable.repackaged.com.google.api.core.InternalApi;
import com.google.bigtable.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.bigtable.repackaged.com.google.common.base.Preconditions;
import com.google.bigtable.repackaged.com.google.gson.Gson;
import com.google.bigtable.repackaged.com.google.gson.reflect.TypeToken;
import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import com.google.cloud.bigtable.hbase.tools.ClusterSchemaDefinition.TableSchemaDefinition;
import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.lang.reflect.Type;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.log4j.BasicConfigurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,8 +57,7 @@
* <p>Execute the following command to copy the schema from HBase to Cloud Bigtable:
*
* <pre>
* mvn exec:java \
* -Dexec.mainClass=com.google.cloud.bigtable.hbase.tools.HBaseSchemaTranslator \
* java -jar bigtable-hbase-1.x-tools-<your-version>-jar-with-dependencies.jar com.google.cloud.bigtable.hbase.tools.HBaseSchemaTranslator \
* -Dhbase.zookeeper.quorum=$ZOOKEEPER_QUORUM \
* -Dhbase.zookeeper.property.clientPort=$ZOOKEEPER_PORT \
* -Dgoogle.bigtable.table.filter=$TABLE_NAME_REGEX \
@@ -69,8 +73,7 @@
* <p>Run the tool from a host that can connect to HBase. Store HBase schema in a file:
*
* <pre>
* mvn exec:java \
* -Dexec.mainClass=com.google.cloud.bigtable.hbase.tools.HBaseSchemaTranslator \
* java -jar bigtable-hbase-1.x-tools-<your-version>-jar-with-dependencies.jar com.google.cloud.bigtable.hbase.tools.HBaseSchemaTranslator \
* -Dhbase.zookeeper.quorum=$ZOOKEEPER_QUORUM \
* -Dhbase.zookeeper.property.clientPort=$ZOOKEEPER_PORT \
* -Dgoogle.bigtable.table.filter=$TABLE_NAME_REGEX \
@@ -81,8 +84,7 @@
* Bigtable using the schema file:
*
* <pre>
* mvn exec:java \
* -Dexec.mainClass=com.google.cloud.bigtable.hbase.tools.HBaseSchemaTranslator \
* java -jar bigtable-hbase-1.x-tools-<your-version>-jar-with-dependencies.jar com.google.cloud.bigtable.hbase.tools.HBaseSchemaTranslator \
* -Dgoogle.bigtable.input.filepath=$SCHEMA_FILE_PATH \
* -Dgoogle.bigtable.project.id=$PROJECT_ID \
* -Dgoogle.bigtable.instance.id=$INSTANCE_ID
@@ -98,23 +100,25 @@ public class HBaseSchemaTranslator {
public static final String INPUT_FILE_KEY = "google.bigtable.input.filepath";
public static final String OUTPUT_FILE_KEY = "google.bigtable.output.filepath";
public static final String TABLE_NAME_FILTER_KEY = "google.bigtable.table.filter";
public static final String SCHEMA_MAPPING_FILEPATH = "google.bigtable.schema.mapping.filepath";

private static final Logger LOG = LoggerFactory.getLogger(HBaseSchemaTranslator.class);

private final SchemaReader schemaReader;
private final SchemaTransformer schemaTransformer;
private final SchemaWriter schemaWriter;
// TODO Add a schemaOverrider

@VisibleForTesting
static class SchemaTranslationOptions {

String projectId;
String instanceId;
String zookeeperQuorum;
Integer zookeeperPort;
String inputFilePath;
String outputFilePath;
String tableNameFilter;
@Nullable String projectId;
@Nullable String instanceId;
@Nullable String zookeeperQuorum;
@Nullable Integer zookeeperPort;
@Nullable String inputFilePath;
@Nullable String outputFilePath;
@Nullable String tableNameFilter;
@Nullable String schemaMappingFilePath;

@VisibleForTesting
SchemaTranslationOptions() {}
@@ -160,6 +164,7 @@ public static SchemaTranslationOptions loadOptionsFromSystemProperties() {
}

options.tableNameFilter = System.getProperty(TABLE_NAME_FILTER_KEY);
options.schemaMappingFilePath = System.getProperty(SCHEMA_MAPPING_FILEPATH);

// Ensure that the options are set properly
// TODO It is possible to validate the options without creating the object, but its less
@@ -175,15 +180,15 @@ public static SchemaTranslationOptions loadOptionsFromSystemProperties() {
}

/** Interface for reading HBase schema. */
interface SchemaReader {

private interface SchemaReader {
ClusterSchemaDefinition readSchema() throws IOException;
}

/**
* Reads HBase schema from a JSON file. JSON file should be representation of a {@link
* ClusterSchemaDefinition} object.
*/
@VisibleForTesting
static class FileBasedSchemaReader implements SchemaReader {

private final String schemaFilePath;
@@ -200,6 +205,7 @@ public ClusterSchemaDefinition readSchema() throws IOException {
}

/** Reads the HBase schema by connecting to an HBase cluster. */
@VisibleForTesting
static class HBaseSchemaReader implements SchemaReader {

private final String tableFilterPattern;
@@ -245,14 +251,17 @@ private byte[][] getSplits(TableName table) throws IOException {
return new byte[0][];
}

byte[][] splits = new byte[regions.size()][];
int i = 0;
List<byte[]> splits = new ArrayList<>();
for (HRegionInfo region : regions) {
splits[i] = region.getStartKey();
i++;
if (Arrays.equals(region.getStartKey(), HConstants.EMPTY_START_ROW)) {
// CBT client does not accept an empty row as a split.
continue;
}
splits.add(region.getStartKey());
}
LOG.debug("Found {} splits for table {}.", splits.length, table.getNameAsString());
return splits;

LOG.debug("Found {} splits for table {}.", splits.size(), table.getNameAsString());
return splits.toArray(new byte[0][]);
}

@Override
@@ -273,7 +282,7 @@ public ClusterSchemaDefinition readSchema() throws IOException {
/**
* Interface for writing the HBase schema represented by a {@link ClusterSchemaDefinition} object.
*/
interface SchemaWriter {
private interface SchemaWriter {

void writeSchema(ClusterSchemaDefinition schemaDefinition) throws IOException;
}
@@ -282,6 +291,7 @@ interface SchemaWriter {
* Writes the HBase schema into a file. File contains the JSON representation of the {@link
* ClusterSchemaDefinition} object.
*/
@VisibleForTesting
static class FileBasedSchemaWriter implements SchemaWriter {

private final String outputFilePath;
@@ -304,6 +314,7 @@ public void writeSchema(ClusterSchemaDefinition schemaDefinition) throws IOExcep
* Creates tables in Cloud Bigtable based on the schema provided by the {@link
* ClusterSchemaDefinition} object.
*/
@VisibleForTesting
static class BigtableSchemaWriter implements SchemaWriter {

private final Admin btAdmin;
@@ -353,23 +364,124 @@ public HBaseSchemaTranslator(SchemaTranslationOptions options) throws IOExceptio
options.zookeeperQuorum, options.zookeeperPort, options.tableNameFilter);
}

if (options.schemaMappingFilePath != null) {

this.schemaTransformer =
JsonBasedSchemaTransformer.newSchemaTransformerFromJsonFile(
options.schemaMappingFilePath);
} else {
this.schemaTransformer = new NoopSchemaTransformer();
}

if (options.outputFilePath != null) {
this.schemaWriter = new FileBasedSchemaWriter(options.outputFilePath);
} else {
this.schemaWriter = new BigtableSchemaWriter(options.projectId, options.instanceId);
}
}

/**
* Transforms the {@link ClusterSchemaDefinition} read by {@link SchemaReader} before writing it
* to {@link SchemaWriter}.
*/
private interface SchemaTransformer {

ClusterSchemaDefinition transform(ClusterSchemaDefinition originalSchema)
throws IOException, DeserializationException;
}

/** No-op implementation of @{@link SchemaTransformer}. Returns the original schema definition. */
private static class NoopSchemaTransformer implements SchemaTransformer {

@Override
public ClusterSchemaDefinition transform(ClusterSchemaDefinition originalSchema) {
return originalSchema;
}
}

/**
* Transforms the @{@link ClusterSchemaDefinition} based on a provided JSON map. It can rename
* tables before writing them to {@link SchemaWriter}.
*
* <p>JSON map should look like { "SourceTable": "DestinationTable",
* "sourceTable-2":"DestinationTable-2"}
*/
@VisibleForTesting
static class JsonBasedSchemaTransformer implements SchemaTransformer {

// Map from old-tableName -> new-tableName
@VisibleForTesting Map<String, String> tableNameMappings;

@VisibleForTesting
JsonBasedSchemaTransformer(Map<String, String> tableNameMappings) {
this.tableNameMappings = tableNameMappings;
LOG.info("Creating SchemaTransformer with schema mapping: {}", tableNameMappings);
}

public static JsonBasedSchemaTransformer newSchemaTransformerFromJsonFile(
String mappingFilePath) throws IOException {

Map<String, String> tableNameMappings = null;
Type mapType = new TypeToken<Map<String, String>>() {}.getType();

try (Reader jsonReader = new FileReader(mappingFilePath)) {
tableNameMappings = new Gson().fromJson(jsonReader, mapType);
}

if (tableNameMappings == null) {
throw new IllegalStateException(
"SchemaMapping file does not contain valid schema mappings");
}

return new JsonBasedSchemaTransformer(tableNameMappings);
}

@Override
public ClusterSchemaDefinition transform(ClusterSchemaDefinition originalSchema)
throws DeserializationException, IOException {
ClusterSchemaDefinition transformedSchema = new ClusterSchemaDefinition();

// Apply the transformations.
for (TableSchemaDefinition tableSchemaDefinition : originalSchema.tableSchemaDefinitions) {
String newTableName = tableSchemaDefinition.name;
HTableDescriptor tableDescriptor = tableSchemaDefinition.getHbaseTableDescriptor();
HTableDescriptor newTableDescriptor = tableDescriptor;

// Override the table name if it's present in the mapping file
if (tableNameMappings.containsKey(newTableName)) {
newTableName = tableNameMappings.get(newTableName);
// Rename the table and copy all the other configs, including the column families.
newTableDescriptor =
new HTableDescriptor(TableName.valueOf(newTableName), tableDescriptor);
LOG.info("Renaming table {} to {}.", tableSchemaDefinition.name, newTableName);
}

// finalize the transformed schema
transformedSchema.addTableSchemaDefinition(
newTableDescriptor, tableSchemaDefinition.splits);
}
return transformedSchema;
}
}

@VisibleForTesting
HBaseSchemaTranslator(SchemaReader schemaReader, SchemaWriter schemaWriter) {
this(schemaReader, schemaWriter, new NoopSchemaTransformer());
}

@VisibleForTesting
HBaseSchemaTranslator(
SchemaReader schemaReader, SchemaWriter schemaWriter, SchemaTransformer schemaTransformer) {
this.schemaReader = schemaReader;
this.schemaWriter = schemaWriter;
this.schemaTransformer = schemaTransformer;
}

public void translate() throws IOException {
public void translate() throws IOException, DeserializationException {
ClusterSchemaDefinition schemaDefinition = schemaReader.readSchema();
LOG.info("Read schema with {} tables.", schemaDefinition.tableSchemaDefinitions.size());
this.schemaWriter.writeSchema(schemaDefinition);

this.schemaWriter.writeSchema(schemaTransformer.transform(schemaDefinition));
}

/*
@@ -418,9 +530,13 @@ private static void usage(final String errorMsg) {
System.err.println(
" Additionally, you can filter tables to create when using HBase as source");
System.err.println(" -D " + TABLE_NAME_FILTER_KEY + "=<table name regex>");
System.err.println(
" Optionally, the tables can be renamed by providing a JSON map. Example JSON "
+ "{\"source-table\": \"destination-table\", \"namespace:source-table2\": \"namespace-destination-table2\"}.");
System.err.println(" -D " + SCHEMA_MAPPING_FILEPATH + "=/schema/mapping/file/path.json");
}

public static void main(String[] args) throws IOException {
public static void main(String[] args) throws IOException, DeserializationException {
// Configure the logger.
BasicConfigurator.configure();

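For completeness, a run that copies the schema from HBase into Bigtable and renames tables along the way would combine the new mapping-file property with the existing flags, following the invocation pattern shown in the updated javadoc (the jar version and property values are placeholders):

    java -jar bigtable-hbase-1.x-tools-<your-version>-jar-with-dependencies.jar com.google.cloud.bigtable.hbase.tools.HBaseSchemaTranslator \
      -Dhbase.zookeeper.quorum=$ZOOKEEPER_QUORUM \
      -Dhbase.zookeeper.property.clientPort=$ZOOKEEPER_PORT \
      -Dgoogle.bigtable.table.filter=$TABLE_NAME_REGEX \
      -Dgoogle.bigtable.project.id=$PROJECT_ID \
      -Dgoogle.bigtable.instance.id=$INSTANCE_ID \
      -Dgoogle.bigtable.schema.mapping.filepath=/schema/mapping/file/path.json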
