
Ensure putSnapshot path honoring case insensitive contract #85

Merged: 11 commits, May 1, 2024
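The contract under test: OpenHouse table identifiers are case-insensitive on lookup but case-preserving in storage, so a table created as `tT1` must stay addressable as `Tt1` or `tt1`, while the identifiers recorded in table properties keep their creation casing. A minimal sketch of the expected behavior (mirroring the new integration test below; session setup is elided):

// Inside a test with a SparkSession wired to the `openhouse` catalog:
spark.sql("CREATE TABLE openhouse.d1.`tT1` (name string)"); // casing preserved at creation
spark.sql("INSERT INTO openhouse.d1.Tt1 VALUES ('foo')");   // resolvable under any casing
spark.sql("SELECT * FROM openhouse.d1.tt1").show();         // same table, lower-cased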
8 changes: 8 additions & 0 deletions integrations/spark/openhouse-spark-runtime/build.gradle
@@ -45,6 +45,14 @@ dependencies {
exclude group: 'org.roaringbitmap'
}

testImplementation("org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:" + icebergVersion)
testImplementation(project(':tables-test-fixtures_2.12'))
testImplementation('org.apache.spark:spark-sql_2.12:' + spark_version){
// These classes are available from `client-codegen-convention.gradle`
exclude group: "io.netty"
}
testImplementation(project(path: ':integrations:java:openhouse-java-runtime', configuration: 'shadow'))

fatJarPackagedDependencies(project(path: ':integrations:java:openhouse-java-runtime'))
fatJarRuntimeDependencies("org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:" + icebergVersion)

@@ -0,0 +1,91 @@ (new file: CatalogOperationTest.java)
import com.linkedin.openhouse.tablestest.OpenHouseSparkITest;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import scala.collection.JavaConverters;

public class CatalogOperationTest extends OpenHouseSparkITest {
@Test
public void testCasingWithCTAS() throws Exception {
try (SparkSession spark = getSparkSession()) {
// creating a case-preserving table using backticks
spark.sql("CREATE TABLE openhouse.d1.`tT1` (name string)");
// testing write behavior; note that the casing of tT1 is intentionally changed
spark.sql("INSERT INTO openhouse.d1.Tt1 VALUES ('foo')");

// Verify by querying with the all-lower-case name
Assertions.assertEquals(
1, spark.sql("SELECT * from openhouse.d1.tt1").collectAsList().size());
// CTAS, referring to the source table by its lower-cased name
spark.sql("CREATE TABLE openhouse.d1.t2 AS SELECT * from openhouse.d1.tt1");
Assertions.assertEquals(1, spark.sql("SELECT * FROM openhouse.d1.t2").collectAsList().size());
}
}

@Test
public void testCatalogWriteAPI() throws Exception {
try (SparkSession spark = getSparkSession()) {
Catalog icebergCatalog = getOpenHouseCatalog(spark);
// Create a table
Schema schema = new Schema(Types.NestedField.required(1, "name", Types.StringType.get()));
TableIdentifier tableIdentifier = TableIdentifier.of("db", "aaa");
icebergCatalog.createTable(tableIdentifier, schema);

// Write data into the table, with intentionally changed casing in the table name
TableIdentifier tableIdentifierUpperTblName = TableIdentifier.of("db", "AAA");

DataFile fooDataFile =
DataFiles.builder(PartitionSpec.unpartitioned())
.withPath("/path/to/data-a.parquet")
.withFileSizeInBytes(10)
.withRecordCount(1)
.build();
AtomicReference<Table> tableRef = new AtomicReference<>();
Assertions.assertDoesNotThrow(
() -> {
Table loadedTable = icebergCatalog.loadTable(tableIdentifierUpperTblName);
tableRef.set(loadedTable);
});
Table table = tableRef.get();
Assertions.assertDoesNotThrow(
() -> {
table.newAppend().appendFile(fooDataFile).commit();
});
}
}

/**
* This is temporarily a copy of com.linkedin.openhouse.jobs.spark.Operations#getCatalog().
* Refactoring these pieces requires deployment coordination, so we shall create an artifact
* module that can be pulled in by the :apps module.
*/
private Catalog getOpenHouseCatalog(SparkSession spark) {
final Map<String, String> catalogProperties = new HashMap<>();
final String catalogPropertyPrefix = "spark.sql.catalog.openhouse.";
final Map<String, String> sparkProperties = JavaConverters.mapAsJavaMap(spark.conf().getAll());
for (Map.Entry<String, String> entry : sparkProperties.entrySet()) {
if (entry.getKey().startsWith(catalogPropertyPrefix)) {
catalogProperties.put(
entry.getKey().substring(catalogPropertyPrefix.length()), entry.getValue());
}
}
// This initializes the catalog based on the runtime Catalog class passed in the catalog-impl conf.
return CatalogUtil.loadCatalog(
sparkProperties.get("spark.sql.catalog.openhouse.catalog-impl"),
"openhouse",
catalogProperties,
spark.sparkContext().hadoopConfiguration());
}
}
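For orientation, getOpenHouseCatalog strips the `spark.sql.catalog.openhouse.` prefix so that Iceberg's CatalogUtil.loadCatalog receives plain catalog properties. A sketch of the transformation (conf values are illustrative, not the deployment's actual settings):

// Spark conf entries (illustrative):
//   spark.sql.catalog.openhouse.catalog-impl = com.example.OpenHouseCatalog
//   spark.sql.catalog.openhouse.some-key     = some-value
// catalogProperties handed to CatalogUtil.loadCatalog("com.example.OpenHouseCatalog", "openhouse", ...):
//   catalog-impl = com.example.OpenHouseCatalog
//   some-key     = some-value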
@@ -45,22 +45,23 @@ public Pair<TableDto, Boolean> putIcebergSnapshots(

TableDto tableDtoToSave =
tablesMapper.toTableDto(
tableDto.orElse(
TableDto.builder()
.tableId(tableId)
.databaseId(databaseId)
.clusterId(clusterId)
.tableUri(
TableUri.builder()
.tableId(tableId)
.databaseId(databaseId)
.clusterId(clusterId)
.build()
.toString())
.tableUUID(
tableUUIDGenerator.generateUUID(icebergSnapshotRequestBody).toString())
.tableCreator(tableCreatorUpdater)
.build()),
tableDto.orElseGet(
() ->
TableDto.builder()
.tableId(tableId)
.databaseId(databaseId)
.clusterId(clusterId)
.tableUri(
TableUri.builder()
.tableId(tableId)
.databaseId(databaseId)
.clusterId(clusterId)
.build()
.toString())
.tableUUID(
tableUUIDGenerator.generateUUID(icebergSnapshotRequestBody).toString())
.tableCreator(tableCreatorUpdater)
.build()),
icebergSnapshotRequestBody);

if (tableDto.isPresent()) {
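Worth noting on the orElse → orElseGet switch above: Optional.orElse(x) evaluates x eagerly even when the Optional holds a value, so the TableDto builder and tableUUIDGenerator.generateUUID(...) (which can throw RequestValidationFailureException) would run on every call. orElseGet defers that work to the case where no existing TableDto was found. A minimal standalone sketch of the difference (the expensiveDefault helper is illustrative):

import java.util.Optional;

public class OrElseDemo {
  // Illustrative stand-in for the costly, potentially throwing default computation.
  static String expensiveDefault() {
    System.out.println("default computed");
    return "default";
  }

  public static void main(String[] args) {
    Optional<String> present = Optional.of("existing");
    present.orElse(expensiveDefault());              // prints "default computed" despite the value
    present.orElseGet(OrElseDemo::expensiveDefault); // prints nothing: supplier is never invoked
  }
}

The same change is applied to putTable below.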
@@ -106,19 +106,22 @@ public Pair<TableDto, Boolean> putTable(
// FIXME: save method redundantly issue existence check after findById is called above
TableDto tableDtoToSave =
tablesMapper.toTableDto(
tableDto.orElse(
TableDto.builder()
.tableUri(
TableUri.builder()
.tableId(tableId)
.databaseId(databaseId)
.clusterId(createUpdateTableRequestBody.getClusterId())
.build()
.toString())
.tableUUID(
tableUUIDGenerator.generateUUID(createUpdateTableRequestBody).toString())
.tableCreator(tableCreatorUpdater)
.build()),
tableDto.orElseGet(
() ->
TableDto.builder()
.tableUri(
TableUri.builder()
.tableId(tableId)
.databaseId(databaseId)
.clusterId(createUpdateTableRequestBody.getClusterId())
.build()
.toString())
.tableUUID(
tableUUIDGenerator
.generateUUID(createUpdateTableRequestBody)
.toString())
.tableCreator(tableCreatorUpdater)
.build()),
createUpdateTableRequestBody);
try {
return Pair.of(openHouseInternalRepository.save(tableDtoToSave), !tableDto.isPresent());
@@ -31,6 +31,8 @@
public class TableUUIDGenerator {
// TODO: manage r/w of tableProperties in a single place.
private static final String OPENHOUSE_NAMESPACE = "openhouse.";
private static final String DB_RAW_KEY = "databaseId";
private static final String TBL_RAW_KEY = "tableId";

@Autowired FsStorageProvider fsStorageProvider;

@@ -62,14 +64,37 @@ public UUID generateUUID(CreateUpdateTableRequestBody createUpdateTableRequestBody)
* @return UUID
*/
public UUID generateUUID(IcebergSnapshotsRequestBody icebergSnapshotsRequestBody) {
return extractUUIDFromSnapshotJson(
icebergSnapshotsRequestBody.getJsonSnapshots(),
icebergSnapshotsRequestBody.getCreateUpdateTableRequestBody().getDatabaseId(),
icebergSnapshotsRequestBody.getCreateUpdateTableRequestBody().getTableId())
return extractUUIDFromRequestBody(icebergSnapshotsRequestBody)
.orElseGet(
() -> generateUUID(icebergSnapshotsRequestBody.getCreateUpdateTableRequestBody()));
}

/** Simple helper method to obtain tableURI from requestBody. */
private String getTableURI(IcebergSnapshotsRequestBody icebergSnapshotsRequestBody) {
return icebergSnapshotsRequestBody.getCreateUpdateTableRequestBody().getDatabaseId()
+ "."
+ icebergSnapshotsRequestBody.getCreateUpdateTableRequestBody().getTableId();
}

/**
* Extracts the value of the given key from the table-properties map. The main use cases are
* tableId and databaseId, whose values captured in tblproperties preserve the casing from
* creation. This casing is critical if reads/writes for this table occur on a platform with a
* different casing-preservation contract.
*/
private String extractFromTblPropsIfExists(
String tableURI, Map<String, String> tblProps, String rawKey) {
if (tblProps == null
|| !tblProps.containsKey(OPENHOUSE_NAMESPACE + rawKey)
|| tblProps.get(OPENHOUSE_NAMESPACE + rawKey) == null) {
throw new RequestValidationFailureException(
String.format(
"Provided snapshot is invalid for %s since databaseId or tableId is missing in properties",
tableURI));
}
return tblProps.get(OPENHOUSE_NAMESPACE + rawKey);
}
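A concrete illustration of the casing-recovery contract above (the values are hypothetical, and the private helper is called directly for illustration):

Map<String, String> tblProps = new HashMap<>();
tblProps.put("openhouse.databaseId", "D1"); // creation casing preserved in tblproperties
tblProps.put("openhouse.tableId", "tT1");
extractFromTblPropsIfExists("d1.tt1", tblProps, "tableId");        // returns "tT1"
extractFromTblPropsIfExists("d1.tt1", new HashMap<>(), "tableId"); // throws RequestValidationFailureException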

/**
* Helper method to extract UUID from tableProperties. A CTAS command's commit() call provides
* "openhouse.tableUUID"; if a snapshot was not provided, this property is used and its path is
@@ -111,18 +136,10 @@ private void validatePathOfProvidedRequest(
String tableUUIDProperty,
TableType tableType) {

// Ids are read from tableProperties to ensure their casing is presented exactly as it was
// when the table was initially created. Ids carried in the requestBody, if sourced from a
// query engine, may lose the original casing.
String dbIdFromProps = tableProperties.get(OPENHOUSE_NAMESPACE + "databaseId");
String tblIdFromProps = tableProperties.get(OPENHOUSE_NAMESPACE + "tableId");
if (dbIdFromProps == null || tblIdFromProps == null) {
throw new RequestValidationFailureException(
String.format(
"Provided snapshot is invalid for %s.%s since databaseId or tableId is missing in properties",
databaseId, tableId));
}
String dbIdFromProps =
extractFromTblPropsIfExists(databaseId + "." + tableId, tableProperties, DB_RAW_KEY);
String tblIdFromProps =
extractFromTblPropsIfExists(databaseId + "." + tableId, tableProperties, TBL_RAW_KEY);

java.nio.file.Path previousPath =
InternalRepositoryUtils.constructTablePath(
@@ -135,18 +152,32 @@
}

/**
* Helper method to extract UUID from List.of(jsonSnapshots)
* Helper method to extract UUID from Iceberg-Snapshots' RequestBody
*
* <p>If the list is null or empty, an empty Optional is returned. If the list contains a
* snapshot, the snapshot is validated by evaluating its "manifest-list" key.
*
* @param jsonSnapshots
* @param databaseId
* @param tableId
* @param snapshotsRequestBody a complete snapshot request-body
* @return Optional.of(UUID)
*/
private Optional<UUID> extractUUIDFromSnapshotJson(
List<String> jsonSnapshots, String databaseId, String tableId) {
private Optional<UUID> extractUUIDFromRequestBody(
IcebergSnapshotsRequestBody snapshotsRequestBody) {
List<String> jsonSnapshots = snapshotsRequestBody.getJsonSnapshots();
String tableURI = getTableURI(snapshotsRequestBody);
String databaseId =
extractFromTblPropsIfExists(
tableURI,
snapshotsRequestBody.getCreateUpdateTableRequestBody().getTableProperties(),
DB_RAW_KEY);
String tableId =
extractFromTblPropsIfExists(
tableURI,
snapshotsRequestBody.getCreateUpdateTableRequestBody().getTableProperties(),
TBL_RAW_KEY);

String snapshotStr =
Optional.ofNullable(jsonSnapshots)
.filter(l -> !l.isEmpty())
@@ -115,6 +115,7 @@ public void testPutSnapshotsAppendWithStagedTable(GetTableResponseBody getTableResponseBody)
getTableResponseBody
.toBuilder()
.tableVersion(INITIAL_TABLE_VERSION)
.tableProperties(tablePropsHelperForResponseBody(getTableResponseBody))
.tableUUID(beforeUUID)
.build()));
Map<String, String> snapshotRefs =
@@ -127,6 +128,7 @@
buildCreateUpdateTableRequestBody(getTableResponseBody)
.toBuilder()
.baseTableVersion(INITIAL_TABLE_VERSION)
.tableProperties(tablePropsHelperForResponseBody(getTableResponseBody))
.build())
.jsonSnapshots(jsonSnapshots)
.snapshotRefs(snapshotRefs)
@@ -327,4 +329,15 @@ private List<Snapshot> getSnapshotsWithMultipleAppendRequests(
}
return snapshots;
}

/**
* For mock responseBodies, ensures they are equipped with the properties that are critical for
* the casing contract.
*/
private Map<String, String> tablePropsHelperForResponseBody(GetTableResponseBody responseBody) {
Map<String, String> originalProps = responseBody.getTableProperties();
originalProps.put("openhouse.databaseId", responseBody.getDatabaseId());
originalProps.put("openhouse.tableId", responseBody.getTableId());
return originalProps;
}
}
@@ -15,6 +15,8 @@
import com.linkedin.openhouse.tables.repository.impl.InternalRepositoryUtils;
import com.linkedin.openhouse.tables.utils.TableUUIDGenerator;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import lombok.SneakyThrows;
import org.apache.hadoop.fs.Path;
@@ -146,6 +148,7 @@ public void testUUIDFailsForInvalidSnapshot() {
CreateUpdateTableRequestBody.builder()
.tableId("t")
.databaseId("db")
.tableProperties(generateMinimalTestProps("db", "t"))
.clusterId(CLUSTER_NAME)
.build())
.jsonSnapshots(
@@ -189,6 +192,7 @@ public void testUUIDFailsForInvalidSnapshotShortManifestList() {
CreateUpdateTableRequestBody.builder()
.tableId("t")
.databaseId("db")
.tableProperties(generateMinimalTestProps("db", "t"))
.clusterId(CLUSTER_NAME)
.build())
.jsonSnapshots(
@@ -210,6 +214,7 @@ public void testUUIDFailsForInvalidSnapshotWithoutUUID() {
CreateUpdateTableRequestBody.builder()
.tableId("t")
.databaseId("db")
.tableProperties(generateMinimalTestProps("db", "t"))
.clusterId(CLUSTER_NAME)
.build())
.jsonSnapshots(
@@ -298,4 +303,13 @@ private String getIcebergSnapshot(String manifestListValue) {
jsonObject.addProperty(key, manifestListValue);
return jsonObject.toString();
}

private Map<String, String> generateMinimalTestProps(String databaseId, String tableId) {
return new HashMap<String, String>() {
{
put("openhouse.databaseId", databaseId);
put("openhouse.tableId", tableId);
}
};
}
}
@@ -1,6 +1,9 @@
package com.linkedin.openhouse.tablestest;

import com.linkedin.openhouse.internal.catalog.model.HouseTable;
import com.linkedin.openhouse.internal.catalog.model.HouseTablePrimaryKey;
import com.linkedin.openhouse.internal.catalog.repository.HouseTableRepository;
import java.util.Optional;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Repository;

@@ -11,4 +14,13 @@
*/
@Repository
@Primary
public interface HouseTablesH2Repository extends HouseTableRepository {}
public interface HouseTablesH2Repository extends HouseTableRepository {
Optional<HouseTable> findByDatabaseIdIgnoreCaseAndTableIdIgnoreCase(
String databaseId, String tableId);

@Override
default Optional<HouseTable> findById(HouseTablePrimaryKey houseTablePrimaryKey) {
return this.findByDatabaseIdIgnoreCaseAndTableIdIgnoreCase(
houseTablePrimaryKey.getDatabaseId(), houseTablePrimaryKey.getTableId());
}
}
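With this override in place, an H2-backed test resolves a primary key regardless of casing, mirroring the service-side contract. A minimal usage sketch (identifiers are illustrative, and a Lombok-style builder on HouseTablePrimaryKey is assumed):

HouseTablePrimaryKey key =
    HouseTablePrimaryKey.builder()
        .databaseId("D1") // the table may have been stored as "d1"
        .tableId("TT1")   // stored casing may differ; the lookup still succeeds
        .build();
Optional<HouseTable> table = houseTablesH2Repository.findById(key);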