Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to be able to query new Delta protocols #22596

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
56 changes: 45 additions & 11 deletions presto-delta/pom.xml
Expand Up @@ -14,33 +14,61 @@
<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<enforcer.skip>true</enforcer.skip>
<io.delta.delta-kernel-api.version>3.1.0</io.delta.delta-kernel-api.version>
</properties>

<dependencies>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-standalone_2.12</artifactId>
<version>0.6.0</version>
<artifactId>delta-kernel-api</artifactId>
<version>${io.delta.delta-kernel-api.version}</version>
<exclusions>
<exclusion>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_2.12</artifactId>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-kernel-defaults</artifactId>
<version>${io.delta.delta-kernel-api.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<groupId>org.roringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
</exclusion>
</exclusions>
</dependency>
Expand Down Expand Up @@ -292,6 +320,12 @@
<artifactId>javax.servlet-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<version>19.0.0</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
167 changes: 114 additions & 53 deletions presto-delta/src/main/java/com/facebook/presto/delta/DeltaClient.java
Expand Up @@ -19,22 +19,27 @@
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import io.delta.standalone.DeltaLog;
import io.delta.standalone.Snapshot;
import io.delta.standalone.actions.AddFile;
import io.delta.standalone.actions.Metadata;
import io.delta.standalone.data.CloseableIterator;
import io.delta.kernel.Snapshot;
import io.delta.kernel.Table;
import io.delta.kernel.TableNotFoundException;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.defaults.client.DefaultTableClient;
import io.delta.kernel.internal.InternalScanFileUtils;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.utils.CloseableIterator;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import javax.inject.Inject;

import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static com.facebook.presto.delta.DeltaErrorCode.DELTA_UNSUPPORTED_DATA_FORMAT;
Expand All @@ -51,6 +56,7 @@
*/
public class DeltaClient
{
private static final String TABLE_NOT_FOUND_ERROR_TEMPLATE = "Delta table (%s.%s) no longer exists.";
private final HdfsEnvironment hdfsEnvironment;

@Inject
Expand All @@ -70,72 +76,89 @@ public DeltaClient(HdfsEnvironment hdfsEnvironment)
* @return If the table is found return {@link DeltaTable}.
*/
public Optional<DeltaTable> getTable(
DeltaConfig config,
ConnectorSession session,
SchemaTableName schemaTableName,
String tableLocation,
Optional<Long> snapshotId,
Optional<Long> snapshotAsOfTimestampMillis)
{
Optional<DeltaLog> deltaLog = loadDeltaTableLog(session, new Path(tableLocation), schemaTableName);
if (!deltaLog.isPresent()) {
Path location = new Path(tableLocation);
Optional<TableClient> deltaTableClient = loadDeltaTableClient(session, location,
schemaTableName);
if (!deltaTableClient.isPresent()) {
return Optional.empty();
}

Table deltaTable = loadDeltaTable(location.toString(), deltaTableClient.get());
// Fetch the snapshot info for given snapshot version. If no snapshot version is given, get the latest snapshot info.
// Lock the snapshot version here and use it later in the rest of the query (such as fetching file list etc.).
// If we don't lock the snapshot version here, the query may end up with schema from one version and data files from another
// version when the underlying delta table is changing while the query is running.
Snapshot snapshot;
if (snapshotId.isPresent()) {
snapshot = getSnapshotById(deltaLog.get(), snapshotId.get(), schemaTableName);
snapshot = getSnapshotById(deltaTable, deltaTableClient.get(), snapshotId.get(), schemaTableName);
}
else if (snapshotAsOfTimestampMillis.isPresent()) {
snapshot = getSnapshotAsOfTimestamp(deltaLog.get(), snapshotAsOfTimestampMillis.get(), schemaTableName);
snapshot = getSnapshotAsOfTimestamp(deltaTable, deltaTableClient.get(),
snapshotAsOfTimestampMillis.get(), schemaTableName);
}
else {
snapshot = deltaLog.get().snapshot(); // get the latest snapshot
try {
snapshot = deltaTable.getLatestSnapshot(deltaTableClient.get()); // get the latest snapshot
}
catch (TableNotFoundException e) {
throw new PrestoException(GENERIC_INTERNAL_ERROR,
format("Could not move to latest snapshot on table '%s.%s'", schemaTableName.getSchemaName(),
schemaTableName.getTableName()));
}
}

Metadata metadata = snapshot.getMetadata();
String format = metadata.getFormat().getProvider();
if (!PARQUET.name().equalsIgnoreCase(format)) {
throw new PrestoException(DELTA_UNSUPPORTED_DATA_FORMAT,
format("Delta table %s has unsupported data format: %s. Currently only Parquet data format is supported", schemaTableName, format));
if (snapshot instanceof SnapshotImpl) {
String format = ((SnapshotImpl) snapshot).getMetadata().getFormat().getProvider();
if (!PARQUET.name().equalsIgnoreCase(format)) {
throw new PrestoException(DELTA_UNSUPPORTED_DATA_FORMAT,
format("Delta table %s has unsupported data format: %s. Currently only Parquet data format is supported", schemaTableName, format));
}
}

return Optional.of(new DeltaTable(
schemaTableName.getSchemaName(),
schemaTableName.getTableName(),
tableLocation,
Optional.of(snapshot.getVersion()), // lock the snapshot version
getSchema(schemaTableName, metadata)));
Optional.of(snapshot.getVersion(deltaTableClient.get())), // lock the snapshot version
getSchema(config, schemaTableName, deltaTable, deltaTableClient.get())));
}

/**
* Get the list of files corresponding to the given Delta table.
*
* @return Closeable iterator of files. It is responsibility of the caller to close the iterator.
*/
public CloseableIterator<AddFile> listFiles(ConnectorSession session, DeltaTable deltaTable)
public CloseableIterator<FilteredColumnarBatch> listFiles(ConnectorSession session, DeltaTable deltaTable)
{
checkArgument(deltaTable.getSnapshotId().isPresent(), "Snapshot id is missing from the Delta table");
Optional<DeltaLog> deltaLog = loadDeltaTableLog(
session,
Optional<TableClient> deltaTableClient = loadDeltaTableClient(session,
new Path(deltaTable.getTableLocation()),
new SchemaTableName(deltaTable.getSchemaName(), deltaTable.getTableName()));
if (!deltaTableClient.isPresent()) {
throw new PrestoException(GENERIC_INTERNAL_ERROR,
format("Could not obtain Delta table client in '%s'", deltaTable.getTableLocation()));
}
checkArgument(deltaTable.getSnapshotId().isPresent(), "Snapshot id is missing from the Delta table");
Table sourceTable = loadDeltaTable(deltaTable.getTableLocation(), deltaTableClient.get());

if (!deltaLog.isPresent()) {
try {
return sourceTable.getLatestSnapshot(deltaTableClient.get()).getScanBuilder(deltaTableClient.get()).build()
.getScanFiles(deltaTableClient.get());
}
catch (TableNotFoundException e) {
throw new PrestoException(NOT_FOUND,
format("Delta table (%s.%s) no longer exists.", deltaTable.getSchemaName(), deltaTable.getTableName()));
format("Delta table not found in '%s'", deltaTable.getTableLocation()));
}

return deltaLog.get()
.getSnapshotForVersionAsOf(deltaTable.getSnapshotId().get())
.scan()
.getFiles();
}

private Optional<DeltaLog> loadDeltaTableLog(ConnectorSession session, Path tableLocation, SchemaTableName schemaTableName)
private Optional<TableClient> loadDeltaTableClient(ConnectorSession session, Path tableLocation,
SchemaTableName schemaTableName)
{
try {
HdfsContext hdfsContext = new HdfsContext(
Expand All @@ -148,32 +171,47 @@ private Optional<DeltaLog> loadDeltaTableLog(ConnectorSession session, Path tabl
if (!fileSystem.isDirectory(tableLocation)) {
return Optional.empty();
}
return Optional.of(DeltaLog.forTable(
hdfsEnvironment.getConfiguration(hdfsContext, tableLocation),
tableLocation));
return Optional.of(DefaultTableClient.create(fileSystem.getConf()));
}
catch (IOException ioException) {
throw new PrestoException(GENERIC_INTERNAL_ERROR, "Failed to load Delta table: " + ioException.getMessage(), ioException);
}
}

private static Snapshot getSnapshotById(DeltaLog deltaLog, long snapshotId, SchemaTableName schemaTableName)
private Table loadDeltaTable(String tableLocation, TableClient deltaTableClient)
{
try {
return deltaLog.getSnapshotForVersionAsOf(snapshotId);
return Table.forPath(deltaTableClient, tableLocation);
}
catch (TableNotFoundException e) {
throw new PrestoException(NOT_FOUND,
format("Could not obtain Delta table client in '%s'", tableLocation));
}
}

private static Snapshot getSnapshotById(Table deltaTable, TableClient deltaTableClient, long snapshotId, SchemaTableName schemaTableName)
{
try {
return deltaTable.getLatestSnapshot(deltaTableClient);
}
catch (IllegalArgumentException exception) {
throw new PrestoException(
NOT_FOUND,
format("Snapshot version %d does not exist in Delta table '%s'.", snapshotId, schemaTableName),
exception);
}
catch (TableNotFoundException e) {
throw new PrestoException(NOT_FOUND,
format(TABLE_NOT_FOUND_ERROR_TEMPLATE, schemaTableName.getSchemaName(),
schemaTableName.getTableName()));
}
}

private static Snapshot getSnapshotAsOfTimestamp(DeltaLog deltaLog, long snapshotAsOfTimestampMillis, SchemaTableName schemaTableName)
private static Snapshot getSnapshotAsOfTimestamp(Table deltaTable, TableClient deltaTableClient,
long snapshotAsOfTimestampMillis, SchemaTableName schemaTableName)
{
try {
return deltaLog.getSnapshotForTimestampAsOf(snapshotAsOfTimestampMillis);
return deltaTable.getLatestSnapshot(deltaTableClient);
}
catch (IllegalArgumentException exception) {
throw new PrestoException(
Expand All @@ -184,27 +222,50 @@ private static Snapshot getSnapshotAsOfTimestamp(DeltaLog deltaLog, long snapsho
Instant.ofEpochMilli(snapshotAsOfTimestampMillis)),
exception);
}
catch (TableNotFoundException e) {
throw new PrestoException(NOT_FOUND,
format(TABLE_NOT_FOUND_ERROR_TEMPLATE, schemaTableName.getSchemaName(),
schemaTableName.getTableName()));
}
}

/**
* Utility method that returns the columns in given Delta metadata. Returned columns include regular and partition types.
* Data type from Delta is mapped to appropriate Presto data type.
*/
private static List<DeltaColumn> getSchema(SchemaTableName tableName, Metadata metadata)
private static List<DeltaColumn> getSchema(DeltaConfig config, SchemaTableName tableName, Table deltaTable,
TableClient deltaTableClient)
{
Set<String> partitionColumns = metadata.getPartitionColumns().stream()
.map(String::toLowerCase)
.collect(Collectors.toSet());

return Arrays.stream(metadata.getSchema().getFields())
.map(field -> {
String columnName = field.getName().toLowerCase(US);
TypeSignature prestoType = DeltaTypeUtils.convertDeltaDataTypePrestoDataType(tableName, columnName, field.getDataType());
return new DeltaColumn(
columnName,
prestoType,
field.isNullable(),
partitionColumns.contains(columnName));
}).collect(Collectors.toList());
try {
Snapshot latestSnapshot = deltaTable.getLatestSnapshot(deltaTableClient);
CloseableIterator<FilteredColumnarBatch> columnBatches = latestSnapshot.getScanBuilder(deltaTableClient)
.build().getScanFiles(deltaTableClient);
Row row = null;
while (columnBatches.hasNext()) {
CloseableIterator<Row> rows = columnBatches.next().getRows();
if (rows.hasNext()) {
row = rows.next();
break;
}
}
Map<String, String> partitionValues = row != null ?
InternalScanFileUtils.getPartitionValues(row) : new HashMap<>(0);
return latestSnapshot.getSchema(deltaTableClient).fields().stream()
.map(field -> {
String columnName = config.isCaseSensitivePartitionsEnabled() ? field.getName() :
field.getName().toLowerCase(US);
TypeSignature prestoType = DeltaTypeUtils.convertDeltaDataTypePrestoDataType(tableName,
columnName, field.getDataType());
return new DeltaColumn(
columnName,
prestoType,
field.isNullable(),
partitionValues.containsKey(columnName));
}).collect(Collectors.toList());
}
catch (TableNotFoundException e) {
throw new PrestoException(NOT_FOUND,
format(TABLE_NOT_FOUND_ERROR_TEMPLATE, tableName.getSchemaName(), tableName.getTableName()));
}
}
}
Expand Up @@ -21,6 +21,7 @@ public class DeltaConfig
{
private int maxSplitsBatchSize = 200;
private boolean parquetDereferencePushdownEnabled = true;
private boolean caseSensitivePartitionsEnabled = true;

@NotNull
public boolean isParquetDereferencePushdownEnabled()
Expand All @@ -46,4 +47,16 @@ public DeltaConfig setMaxSplitsBatchSize(int maxSplitsBatchSize)
this.maxSplitsBatchSize = maxSplitsBatchSize;
return this;
}

public boolean isCaseSensitivePartitionsEnabled()
{
return this.caseSensitivePartitionsEnabled;
}

@Config("delta.case-sensitive-partitions-enabled")
public DeltaConfig setCaseSensitivePartitionsEnabled(boolean caseSensitivePartitionsEnabled)
{
this.caseSensitivePartitionsEnabled = caseSensitivePartitionsEnabled;
return this;
}
}