Adding Delta Table Format to OpenHouse, a proof-of-concept #40

Draft · wants to merge 1 commit into main
Spark image Dockerfile:
@@ -82,6 +82,7 @@ RUN mkdir -p "${LIVY_HOME}/logs"
 COPY /infra/recipes/docker-compose/common/spark/start-spark.sh /
 COPY /build/openhouse-spark-runtime_2.12/libs/*[^sources][^javadoc].jar $SPARK_HOME/openhouse-spark-runtime_2.12-latest-all.jar
 COPY /build/openhouse-spark-apps_2.12/libs/openhouse-spark-apps_2.12-*-all.jar $SPARK_HOME/openhouse-spark-apps_2.12-latest-all.jar
+COPY /build/spark-delta-client/libs/spark-delta-client-*-all.jar $SPARK_HOME/spark-delta-client-latest-all.jar
 COPY /build/dummytokens/libs/dummytokens*.jar /dummytokens.jar
 RUN java -jar /dummytokens.jar -d /var/config/

66 changes: 66 additions & 0 deletions integrations/spark-delta-client/build.gradle
@@ -0,0 +1,66 @@
plugins {
id 'openhouse.java-minimal-conventions'
id 'com.github.johnrengelman.shadow' version '6.0.0'
}

repositories {
mavenCentral()
}

dependencies {

compileOnly("org.apache.spark:spark-sql_2.12:3.1.1")

implementation("io.delta:delta-core_2.12:1.0.+")
implementation("io.delta:delta-standalone_2.12:0.5.0")
implementation("io.delta:delta-storage:1.+")
implementation("org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.2.0")

implementation(project(':client:secureclient')){
exclude group: 'org.slf4j'
exclude group: 'ch.qos.logback'
exclude group: 'org.apache.logging.slf4j'
exclude group: 'org.apache.logging.log4j'
exclude group: 'io.netty'
}
implementation(project(':client:tableclient')){
exclude group: 'org.slf4j'
exclude group: 'ch.qos.logback'
exclude group: 'org.apache.logging.slf4j'
exclude group: 'org.apache.logging.log4j'
exclude group: 'io.netty'
}
compileOnly("org.springframework.boot:spring-boot-starter-webflux:2.6.1"){
exclude group: 'org.slf4j'
exclude group: 'ch.qos.logback'
exclude group: 'org.apache.logging.slf4j'
exclude group: 'org.apache.logging.log4j'
exclude group: 'io.netty'
}

// Libraries required for spark
testImplementation("org.apache.spark:spark-sql_2.12:3.1.1"){
exclude group: "io.netty"
}
testImplementation("org.apache.spark:spark-core_2.12:3.1.1") {
exclude group: "io.netty"
}
// Libraries required for tables-test-fixtures
// TODO: [BDP-18150] This is a temp workaround for jasper-oh integration
testImplementation(project(path: ':tables-test-fixtures_2.12')){
exclude group: 'org.slf4j'
exclude group: 'ch.qos.logback'
exclude group: 'org.apache.logging.slf4j'
exclude group: 'org.apache.logging.log4j'
exclude group: 'io.netty'
}
testRuntime("org.eclipse.jetty:jetty-server:11.0.2")
testImplementation "com.fasterxml.jackson.module:jackson-module-scala_2.12:2.13.1"
testRuntimeOnly 'io.netty:netty-resolver-dns-native-macos:4.1.70.Final:osx-x86_64'
testImplementation 'io.netty:netty-codec-http:4.1.75.Final'
testImplementation 'io.netty:netty-codec-http2:4.1.75.Final'

}
OHCatalog.java (new file, package com.linkedin.openhouse.delta)
@@ -0,0 +1,170 @@
package com.linkedin.openhouse.delta;

import com.linkedin.openhouse.client.ssl.HttpConnectionStrategy;
import com.linkedin.openhouse.client.ssl.TablesApiClientFactory;
import com.linkedin.openhouse.tables.client.api.DeltaSnapshotApi;
import com.linkedin.openhouse.tables.client.api.TableApi;
import com.linkedin.openhouse.tables.client.invoker.ApiClient;
import com.linkedin.openhouse.tables.client.model.CreateUpdateTableRequestBody;
import com.linkedin.openhouse.tables.client.model.GetAllTablesResponseBody;
import com.linkedin.openhouse.tables.client.model.GetTableResponseBody;
import java.net.MalformedURLException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.net.ssl.SSLException;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.catalog.TableChange;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.delta.catalog.DeltaTableV2;
import org.apache.spark.sql.delta.storage.OHLogStore;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import scala.Option;

public class OHCatalog implements TableCatalog {
private static final String INITIAL_TABLE_VERSION = "-1";
private static final String AUTH_TOKEN = "auth-token";
private static final String TRUST_STORE = "trust-store";
private static final String HTTP_CONNECTION_STRATEGY = "http-connection-strategy";
  // Public (not private) so OHLogStore can reuse the initialized API clients.
  public TableApi tableApi;

  public DeltaSnapshotApi deltaSnapshotApi;
  private String clusterName;

@Override
public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
if (namespace.length > 1) {
      throw new ValidationException(
          "Input namespace has more than one level: " + String.join(".", namespace));
} else if (namespace.length == 0) {
throw new ValidationException(
"DatabaseId was not provided, for SQL please run \"SHOW TABLES IN <databaseId>\" instead");
}
List<Identifier> tables =
tableApi
.searchTablesV1(namespace[0])
.map(GetAllTablesResponseBody::getResults)
.flatMapMany(Flux::fromIterable)
.map(
x -> {
assert x.getTableId() != null;
return Identifier.of(new String[] {x.getDatabaseId()}, x.getTableId());
})
.collectList()
.block();
assert tables != null;
return tables.toArray(new Identifier[0]);
}

@Override
public Table loadTable(Identifier ident) throws NoSuchTableException {
    Optional<GetTableResponseBody> response =
        tableApi
            .getTableV1(ident.namespace()[0], ident.name())
            .onErrorResume(WebClientResponseException.NotFound.class, e -> Mono.empty())
            .blockOptional();
    if (!response.isPresent()) {
      throw new NoSuchTableException(ident);
    }

    assert response.get().getTableLocation() != null;
    assert response.get().getTableProperties() != null;

    // Cache the response so OHLogStore can later resolve the table identity from its location.
    OHLogStore.TABLE_CACHE.put(ident, response.get());
    return new DeltaTableV2(
        SparkSession.active(),
        new Path(response.get().getTableLocation()),
        Option.empty(),
        Option.apply(ident.toString()),
        Option.empty(),
        new CaseInsensitiveStringMap(response.get().getTableProperties()));
}

@Override
public Table createTable(
Identifier ident, StructType schema, Transform[] partitions, Map<String, String> properties)
throws TableAlreadyExistsException, NoSuchNamespaceException {
CreateUpdateTableRequestBody createUpdateTableRequestBody = new CreateUpdateTableRequestBody();
createUpdateTableRequestBody.setTableId(ident.name());
createUpdateTableRequestBody.setDatabaseId(ident.namespace()[0]);
createUpdateTableRequestBody.setClusterId(clusterName);
createUpdateTableRequestBody.setBaseTableVersion(INITIAL_TABLE_VERSION);
createUpdateTableRequestBody.setSchema(schema.json());
createUpdateTableRequestBody.setTableProperties(properties);
    GetTableResponseBody response =
        tableApi.createTableV1(ident.namespace()[0], createUpdateTableRequestBody).block();
    assert response != null;
    assert response.getTableLocation() != null;
    assert response.getTableProperties() != null;
    return new DeltaTableV2(
        SparkSession.active(),
        new Path(response.getTableLocation()),
        Option.empty(),
        Option.apply(ident.toString()),
        Option.empty(),
        new CaseInsensitiveStringMap(response.getTableProperties()));
}

@Override
  public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
    // Altering tables is not supported in this proof-of-concept.
    return null;
  }

@Override
public boolean dropTable(Identifier ident) {
if (!tableExists(ident)) {
return false;
}
tableApi.deleteTableV1(ident.namespace()[0], ident.name()).block();
return true;
}

@Override
  public void renameTable(Identifier oldIdent, Identifier newIdent)
      throws NoSuchTableException, TableAlreadyExistsException {
    // Renaming tables is not supported in this proof-of-concept.
  }

@Override
public void initialize(String name, CaseInsensitiveStringMap options) {
try {
String uri = options.get(CatalogProperties.URI);
Preconditions.checkNotNull(uri, "OpenHouse Table Service URI is required");
String truststore = options.getOrDefault(TRUST_STORE, "");
String token = options.getOrDefault(AUTH_TOKEN, null);
String httpConnectionStrategy = options.getOrDefault(HTTP_CONNECTION_STRATEGY, null);
clusterName = options.get("cluster");
ApiClient apiClient = null;
try {
TablesApiClientFactory tablesApiClientFactory = TablesApiClientFactory.getInstance();
tablesApiClientFactory.setStrategy(
HttpConnectionStrategy.fromString(httpConnectionStrategy));
apiClient = TablesApiClientFactory.getInstance().createApiClient(uri, token, truststore);
} catch (MalformedURLException | SSLException e) {
throw new RuntimeException(
"OpenHouse Catalog initialization failed: Failure while initializing ApiClient", e);
}
tableApi = new TableApi(apiClient);
deltaSnapshotApi = new DeltaSnapshotApi(apiClient);
    } catch (Exception e) {
      // Fail fast instead of swallowing the error and leaving the API clients null.
      throw new RuntimeException("OpenHouse Catalog initialization failed", e);
    }
}

@Override
public String name() {
return "openhouse";
}
}
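For context, here is a minimal sketch (not part of the PR) of how a Spark application would wire this catalog up; the configuration keys mirror the test fixture further down in this diff, while the URI, cluster name, and table names are placeholders:

import org.apache.spark.sql.SparkSession;

public class OHCatalogUsageSketch {
  public static void main(String[] args) {
    // Placeholder endpoint/cluster; real values come from your OpenHouse deployment.
    SparkSession spark =
        SparkSession.builder()
            .master("local[1]")
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .config("spark.sql.catalog.openhouse", "com.linkedin.openhouse.delta.OHCatalog")
            .config("spark.sql.catalog.openhouse.uri", "http://localhost:8000")
            .config("spark.sql.catalog.openhouse.cluster", "local-cluster")
            .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.OHLogStore")
            .getOrCreate();

    // CREATE goes through OHCatalog.createTable(); INSERT commits through OHLogStore.write().
    spark.sql("CREATE TABLE openhouse.db.demo (col1 STRING, col2 STRING)");
    spark.sql("INSERT INTO openhouse.db.demo VALUES ('a', 'b')");
    spark.sql("SELECT * FROM openhouse.db.demo").show();
    spark.sql("DROP TABLE openhouse.db.demo");
  }
}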
OHLogStore.java (new file, package org.apache.spark.sql.delta.storage)
@@ -0,0 +1,70 @@
package org.apache.spark.sql.delta.storage;

import com.linkedin.openhouse.delta.OHCatalog;
import com.linkedin.openhouse.tables.client.api.DeltaSnapshotApi;
import com.linkedin.openhouse.tables.client.api.TableApi;
import com.linkedin.openhouse.tables.client.model.DeltaActionsRequestBody;
import com.linkedin.openhouse.tables.client.model.GetTableResponseBody;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.delta.actions.Action;
import org.apache.spark.sql.delta.actions.CommitInfo;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import scala.collection.Iterator;

public class OHLogStore extends HDFSLogStore {

TableApi _tableApi;
DeltaSnapshotApi _deltaSnapshotApi;

public static final ConcurrentHashMap<Identifier, GetTableResponseBody> TABLE_CACHE =
new ConcurrentHashMap<>();

public OHLogStore(SparkConf sparkConf, Configuration defaultHadoopConf) {
super(sparkConf, defaultHadoopConf);
    // Rebuild the OpenHouse catalog from the "spark.sql.catalog.openhouse." Spark conf entries
    // so the log store shares the same API clients as the catalog.
    String prefix = "spark.sql.catalog.openhouse.";
    Map<String, String> catalogOptions = new HashMap<>();
    Arrays.stream(sparkConf.getAllWithPrefix(prefix)).forEach(x -> catalogOptions.put(x._1, x._2));
    OHCatalog ohCatalog = new OHCatalog();
    ohCatalog.initialize("openhouse", new CaseInsensitiveStringMap(catalogOptions));
    _tableApi = ohCatalog.tableApi;
    _deltaSnapshotApi = ohCatalog.deltaSnapshotApi;
}

@Override
public void write(Path path, Iterator<String> actions, boolean overwrite) {
    // The commit is delegated to the tables service rather than written to the _delta_log
    // directly, so we first need the table's identity (databaseId, tableId).
    // Hack: recover it from the TABLE_CACHE populated by OHCatalog.loadTable(); the proper
    // fix would be a server-side lookup of the table by its location (see the sketch below).
    GetTableResponseBody lastLoadedTable =
        TABLE_CACHE.entrySet().stream()
            .filter(x -> path.toString().contains(x.getValue().getTableLocation()))
            .findFirst()
            .orElseThrow(() -> new IllegalStateException("No cached table matches path " + path))
            .getValue();

DeltaActionsRequestBody deltaActionsRequestBody = new DeltaActionsRequestBody();

    // Materialize the Delta action JSON lines so they can be posted to the tables service.
    List<String> actionsList = new ArrayList<>();
    while (actions.hasNext()) {
      actionsList.add(actions.next());
    }
    deltaActionsRequestBody.setJsonActions(actionsList);

    // The first action of a commit is its CommitInfo; readVersion is the table version this
    // commit read from, sent to the service as the base version for the snapshot patch.
    Integer baseVersion =
        ((Long) ((CommitInfo) Action.fromJson(actionsList.get(0))).readVersion().get()).intValue();

deltaActionsRequestBody.setBaseTableVersion(baseVersion.toString());
_deltaSnapshotApi
.patchDeltaActionsV1(
lastLoadedTable.getDatabaseId(), lastLoadedTable.getTableId(), deltaActionsRequestBody)
.block();
}
}
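As noted in the comment inside write(), the TABLE_CACHE lookup is a stopgap. Below is a hedged sketch of one possible replacement, assuming table locations follow a <root>/<databaseId>/<tableId>-<uuid>/ layout; that layout is an assumption about OpenHouse path conventions, not something this PR guarantees, and the helper is hypothetical rather than an existing API:

import org.apache.hadoop.fs.Path;

final class TableIdentityFromPath {
  // Hypothetical helper: derive (databaseId, tableId) from a _delta_log commit path,
  // assuming locations look like <root>/<databaseId>/<tableId>-<uuid>/_delta_log/<n>.json.
  // A real fix would instead add a server-side "search tables by location" endpoint.
  static String[] resolve(Path commitPath) {
    Path tableRoot = commitPath.getParent().getParent(); // strip _delta_log/<file>
    String tableDir = tableRoot.getName();               // e.g. "t3-8f14e45f"
    String databaseId = tableRoot.getParent().getName(); // e.g. "db"
    int dash = tableDir.indexOf('-');
    String tableId = dash > 0 ? tableDir.substring(0, dash) : tableDir;
    return new String[] {databaseId, tableId};
  }
}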
BaseLiOHTest.java (new file, test sources, package com.linkedin.openhouse.delta)
@@ -0,0 +1,30 @@
package com.linkedin.openhouse.delta;

import com.linkedin.openhouse.tablestest.OpenHouseLocalServer;
import com.linkedin.openhouse.tablestest.OpenHouseSparkITest;
import java.net.URI;
import org.apache.spark.sql.SparkSession;

public class BaseLiOHTest extends OpenHouseSparkITest {

private static OpenHouseLocalServer openHouseLocalServer = null;

@Override
  protected synchronized SparkSession getSparkSession() throws Exception {
    // Lazily start one local OpenHouse tables service, shared by all tests in the JVM.
    if (openHouseLocalServer == null) {
      openHouseLocalServer = new OpenHouseLocalServer();
      openHouseLocalServer.start();
    }
return SparkSession.builder()
.master("local[1]")
.config(
"spark.sql.catalog.openhouse.uri",
URI.create("http://localhost:" + openHouseLocalServer.getPort()).toString())
.config("spark.sql.catalog.openhouse.cluster", "local-cluster")
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.openhouse", "com.linkedin.openhouse.delta.OHCatalog")
.config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.OHLogStore")
.getOrCreate();
}
}
SimpleTest.java (new file, test sources, package com.linkedin.openhouse.delta.tests)
@@ -0,0 +1,18 @@
package com.linkedin.openhouse.delta.tests;

import com.linkedin.openhouse.delta.BaseLiOHTest;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Test;

public class SimpleTest extends BaseLiOHTest {

@Test
public void basicSQLCatalog() throws Exception {
SparkSession sparkSession = getSparkSession();
sparkSession.sql("CREATE TABLE openhouse.db.t3 (col1 string, col2 string, col3 string)").show();
sparkSession.sql("INSERT INTO openhouse.db.t3 VALUES ('a', 'b', 'c')").show();
sparkSession.sql("INSERT INTO openhouse.db.t3 VALUES ('c', 'd', 'd')").show();
sparkSession.sql("SELECT * FROM openhouse.db.t3").show();
sparkSession.sql("DROP TABLE openhouse.db.t3").show();
}
}
5 changes: 5 additions & 0 deletions services/tables/build.gradle
@@ -20,13 +20,18 @@ plugins {
 id 'openhouse.service-specgen-convention'
 }

+repositories {
+mavenCentral()
+}
+
 dependencies {
 implementation project(':services:common')
 implementation project(':iceberg:openhouse:internalcatalog')
 implementation project(':client:hts')
 implementation project(':cluster:configs')
 implementation project(':cluster:storage')
 implementation project(':cluster:metrics')
+implementation("io.delta:delta-standalone_2.12:0.5.0")
 implementation 'org.springframework.security:spring-security-config:' + '5.7.2'
 implementation 'org.springframework.boot:spring-boot-starter-webflux:2.6.6'
 testCompile 'org.junit.jupiter:junit-jupiter-engine:5.8.1'