Skip to content

Commit

Permalink
Add support to be able to query new Delta protocols
Browse files Browse the repository at this point in the history
  • Loading branch information
mblanco-denodo committed Apr 24, 2024
1 parent 95ba95b commit da3c3ae
Show file tree
Hide file tree
Showing 359 changed files with 705 additions and 222 deletions.
56 changes: 45 additions & 11 deletions presto-delta/pom.xml
Expand Up @@ -14,33 +14,61 @@
<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<enforcer.skip>true</enforcer.skip>
<io.delta.delta-kernel-api.version>3.1.0</io.delta.delta-kernel-api.version>
</properties>

<dependencies>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-standalone_2.12</artifactId>
<version>0.6.0</version>
<artifactId>delta-kernel-api</artifactId>
<version>${io.delta.delta-kernel-api.version}</version>
<exclusions>
<exclusion>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_2.12</artifactId>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-kernel-defaults</artifactId>
<version>${io.delta.delta-kernel-api.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<groupId>org.roringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
</exclusion>
</exclusions>
</dependency>
Expand Down Expand Up @@ -292,6 +320,12 @@
<artifactId>javax.servlet-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<version>19.0.0</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
167 changes: 114 additions & 53 deletions presto-delta/src/main/java/com/facebook/presto/delta/DeltaClient.java
Expand Up @@ -19,22 +19,27 @@
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import io.delta.standalone.DeltaLog;
import io.delta.standalone.Snapshot;
import io.delta.standalone.actions.AddFile;
import io.delta.standalone.actions.Metadata;
import io.delta.standalone.data.CloseableIterator;
import io.delta.kernel.Snapshot;
import io.delta.kernel.Table;
import io.delta.kernel.TableNotFoundException;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.defaults.client.DefaultTableClient;
import io.delta.kernel.internal.InternalScanFileUtils;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.utils.CloseableIterator;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import javax.inject.Inject;

import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static com.facebook.presto.delta.DeltaErrorCode.DELTA_UNSUPPORTED_DATA_FORMAT;
Expand All @@ -51,6 +56,7 @@
*/
public class DeltaClient
{
private static final String TABLE_NOT_FOUND_ERROR_TEMPLATE = "Delta table (%s.%s) no longer exists.";
private final HdfsEnvironment hdfsEnvironment;

@Inject
Expand All @@ -70,72 +76,89 @@ public DeltaClient(HdfsEnvironment hdfsEnvironment)
* @return If the table is found return {@link DeltaTable}.
*/
public Optional<DeltaTable> getTable(
DeltaConfig config,
ConnectorSession session,
SchemaTableName schemaTableName,
String tableLocation,
Optional<Long> snapshotId,
Optional<Long> snapshotAsOfTimestampMillis)
{
Optional<DeltaLog> deltaLog = loadDeltaTableLog(session, new Path(tableLocation), schemaTableName);
if (!deltaLog.isPresent()) {
Path location = new Path(tableLocation);
Optional<TableClient> deltaTableClient = loadDeltaTableClient(session, location,
schemaTableName);
if (!deltaTableClient.isPresent()) {
return Optional.empty();
}

Table deltaTable = loadDeltaTable(location.toString(), deltaTableClient.get());
// Fetch the snapshot info for given snapshot version. If no snapshot version is given, get the latest snapshot info.
// Lock the snapshot version here and use it later in the rest of the query (such as fetching file list etc.).
// If we don't lock the snapshot version here, the query may end up with schema from one version and data files from another
// version when the underlying delta table is changing while the query is running.
Snapshot snapshot;
if (snapshotId.isPresent()) {
snapshot = getSnapshotById(deltaLog.get(), snapshotId.get(), schemaTableName);
snapshot = getSnapshotById(deltaTable, deltaTableClient.get(), snapshotId.get(), schemaTableName);
}
else if (snapshotAsOfTimestampMillis.isPresent()) {
snapshot = getSnapshotAsOfTimestamp(deltaLog.get(), snapshotAsOfTimestampMillis.get(), schemaTableName);
snapshot = getSnapshotAsOfTimestamp(deltaTable, deltaTableClient.get(),
snapshotAsOfTimestampMillis.get(), schemaTableName);
}
else {
snapshot = deltaLog.get().snapshot(); // get the latest snapshot
try {
snapshot = deltaTable.getLatestSnapshot(deltaTableClient.get()); // get the latest snapshot
}
catch (TableNotFoundException e) {
throw new PrestoException(GENERIC_INTERNAL_ERROR,
format("Could not move to latest snapshot on table '%s.%s'", schemaTableName.getSchemaName(),
schemaTableName.getTableName()));
}
}

Metadata metadata = snapshot.getMetadata();
String format = metadata.getFormat().getProvider();
if (!PARQUET.name().equalsIgnoreCase(format)) {
throw new PrestoException(DELTA_UNSUPPORTED_DATA_FORMAT,
format("Delta table %s has unsupported data format: %s. Currently only Parquet data format is supported", schemaTableName, format));
if (snapshot instanceof SnapshotImpl) {
String format = ((SnapshotImpl) snapshot).getMetadata().getFormat().getProvider();
if (!PARQUET.name().equalsIgnoreCase(format)) {
throw new PrestoException(DELTA_UNSUPPORTED_DATA_FORMAT,
format("Delta table %s has unsupported data format: %s. Currently only Parquet data format is supported", schemaTableName, format));
}
}

return Optional.of(new DeltaTable(
schemaTableName.getSchemaName(),
schemaTableName.getTableName(),
tableLocation,
Optional.of(snapshot.getVersion()), // lock the snapshot version
getSchema(schemaTableName, metadata)));
Optional.of(snapshot.getVersion(deltaTableClient.get())), // lock the snapshot version
getSchema(config, schemaTableName, deltaTable, deltaTableClient.get())));
}

/**
* Get the list of files corresponding to the given Delta table.
*
* @return Closeable iterator of files. It is responsibility of the caller to close the iterator.
*/
public CloseableIterator<AddFile> listFiles(ConnectorSession session, DeltaTable deltaTable)
public CloseableIterator<FilteredColumnarBatch> listFiles(ConnectorSession session, DeltaTable deltaTable)
{
checkArgument(deltaTable.getSnapshotId().isPresent(), "Snapshot id is missing from the Delta table");
Optional<DeltaLog> deltaLog = loadDeltaTableLog(
session,
Optional<TableClient> deltaTableClient = loadDeltaTableClient(session,
new Path(deltaTable.getTableLocation()),
new SchemaTableName(deltaTable.getSchemaName(), deltaTable.getTableName()));
if (!deltaTableClient.isPresent()) {
throw new PrestoException(GENERIC_INTERNAL_ERROR,
format("Could not obtain Delta table client in '%s'", deltaTable.getTableLocation()));
}
checkArgument(deltaTable.getSnapshotId().isPresent(), "Snapshot id is missing from the Delta table");
Table sourceTable = loadDeltaTable(deltaTable.getTableLocation(), deltaTableClient.get());

if (!deltaLog.isPresent()) {
try {
return sourceTable.getLatestSnapshot(deltaTableClient.get()).getScanBuilder(deltaTableClient.get()).build()
.getScanFiles(deltaTableClient.get());
}
catch (TableNotFoundException e) {
throw new PrestoException(NOT_FOUND,
format("Delta table (%s.%s) no longer exists.", deltaTable.getSchemaName(), deltaTable.getTableName()));
format("Delta table not found in '%s'", deltaTable.getTableLocation()));
}

return deltaLog.get()
.getSnapshotForVersionAsOf(deltaTable.getSnapshotId().get())
.scan()
.getFiles();
}

private Optional<DeltaLog> loadDeltaTableLog(ConnectorSession session, Path tableLocation, SchemaTableName schemaTableName)
private Optional<TableClient> loadDeltaTableClient(ConnectorSession session, Path tableLocation,
SchemaTableName schemaTableName)
{
try {
HdfsContext hdfsContext = new HdfsContext(
Expand All @@ -148,32 +171,47 @@ private Optional<DeltaLog> loadDeltaTableLog(ConnectorSession session, Path tabl
if (!fileSystem.isDirectory(tableLocation)) {
return Optional.empty();
}
return Optional.of(DeltaLog.forTable(
hdfsEnvironment.getConfiguration(hdfsContext, tableLocation),
tableLocation));
return Optional.of(DefaultTableClient.create(fileSystem.getConf()));
}
catch (IOException ioException) {
throw new PrestoException(GENERIC_INTERNAL_ERROR, "Failed to load Delta table: " + ioException.getMessage(), ioException);
}
}

private static Snapshot getSnapshotById(DeltaLog deltaLog, long snapshotId, SchemaTableName schemaTableName)
private Table loadDeltaTable(String tableLocation, TableClient deltaTableClient)
{
try {
return deltaLog.getSnapshotForVersionAsOf(snapshotId);
return Table.forPath(deltaTableClient, tableLocation);
}
catch (TableNotFoundException e) {
throw new PrestoException(NOT_FOUND,
format("Could not obtain Delta table client in '%s'", tableLocation));
}
}

private static Snapshot getSnapshotById(Table deltaTable, TableClient deltaTableClient, long snapshotId, SchemaTableName schemaTableName)
{
try {
return deltaTable.getLatestSnapshot(deltaTableClient);
}
catch (IllegalArgumentException exception) {
throw new PrestoException(
NOT_FOUND,
format("Snapshot version %d does not exist in Delta table '%s'.", snapshotId, schemaTableName),
exception);
}
catch (TableNotFoundException e) {
throw new PrestoException(NOT_FOUND,
format(TABLE_NOT_FOUND_ERROR_TEMPLATE, schemaTableName.getSchemaName(),
schemaTableName.getTableName()));
}
}

private static Snapshot getSnapshotAsOfTimestamp(DeltaLog deltaLog, long snapshotAsOfTimestampMillis, SchemaTableName schemaTableName)
private static Snapshot getSnapshotAsOfTimestamp(Table deltaTable, TableClient deltaTableClient,
long snapshotAsOfTimestampMillis, SchemaTableName schemaTableName)
{
try {
return deltaLog.getSnapshotForTimestampAsOf(snapshotAsOfTimestampMillis);
return deltaTable.getLatestSnapshot(deltaTableClient);
}
catch (IllegalArgumentException exception) {
throw new PrestoException(
Expand All @@ -184,27 +222,50 @@ private static Snapshot getSnapshotAsOfTimestamp(DeltaLog deltaLog, long snapsho
Instant.ofEpochMilli(snapshotAsOfTimestampMillis)),
exception);
}
catch (TableNotFoundException e) {
throw new PrestoException(NOT_FOUND,
format(TABLE_NOT_FOUND_ERROR_TEMPLATE, schemaTableName.getSchemaName(),
schemaTableName.getTableName()));
}
}

/**
* Utility method that returns the columns in given Delta metadata. Returned columns include regular and partition types.
* Data type from Delta is mapped to appropriate Presto data type.
*/
private static List<DeltaColumn> getSchema(SchemaTableName tableName, Metadata metadata)
private static List<DeltaColumn> getSchema(DeltaConfig config, SchemaTableName tableName, Table deltaTable,
TableClient deltaTableClient)
{
Set<String> partitionColumns = metadata.getPartitionColumns().stream()
.map(String::toLowerCase)
.collect(Collectors.toSet());

return Arrays.stream(metadata.getSchema().getFields())
.map(field -> {
String columnName = field.getName().toLowerCase(US);
TypeSignature prestoType = DeltaTypeUtils.convertDeltaDataTypePrestoDataType(tableName, columnName, field.getDataType());
return new DeltaColumn(
columnName,
prestoType,
field.isNullable(),
partitionColumns.contains(columnName));
}).collect(Collectors.toList());
try {
Snapshot latestSnapshot = deltaTable.getLatestSnapshot(deltaTableClient);
CloseableIterator<FilteredColumnarBatch> columnBatches = latestSnapshot.getScanBuilder(deltaTableClient)
.build().getScanFiles(deltaTableClient);
Row row = null;
while (columnBatches.hasNext()) {
CloseableIterator<Row> rows = columnBatches.next().getRows();
if (rows.hasNext()) {
row = rows.next();
break;
}
}
Map<String, String> partitionValues = row != null ?
InternalScanFileUtils.getPartitionValues(row) : new HashMap<>(0);
return latestSnapshot.getSchema(deltaTableClient).fields().stream()
.map(field -> {
String columnName = config.isCaseSensitivePartitionsEnabled() ? field.getName() :
field.getName().toLowerCase(US);
TypeSignature prestoType = DeltaTypeUtils.convertDeltaDataTypePrestoDataType(tableName,
columnName, field.getDataType());
return new DeltaColumn(
columnName,
prestoType,
field.isNullable(),
partitionValues.containsKey(columnName));
}).collect(Collectors.toList());
}
catch (TableNotFoundException e) {
throw new PrestoException(NOT_FOUND,
format(TABLE_NOT_FOUND_ERROR_TEMPLATE, tableName.getSchemaName(), tableName.getTableName()));
}
}
}
Expand Up @@ -21,6 +21,7 @@ public class DeltaConfig
{
private int maxSplitsBatchSize = 200;
private boolean parquetDereferencePushdownEnabled = true;
private boolean caseSensitivePartitionsEnabled = true;

@NotNull
public boolean isParquetDereferencePushdownEnabled()
Expand All @@ -46,4 +47,16 @@ public DeltaConfig setMaxSplitsBatchSize(int maxSplitsBatchSize)
this.maxSplitsBatchSize = maxSplitsBatchSize;
return this;
}

public boolean isCaseSensitivePartitionsEnabled()
{
return this.caseSensitivePartitionsEnabled;
}

@Config("delta.case-sensitive-partitions-enabled")
public DeltaConfig setCaseSensitivePartitionsEnabled(boolean caseSensitivePartitionsEnabled)
{
this.caseSensitivePartitionsEnabled = caseSensitivePartitionsEnabled;
return this;
}
}

0 comments on commit da3c3ae

Please sign in to comment.