Skip to content

Commit

Permalink
Support procedure expire_snapshots for iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
hantangwangd committed May 7, 2024
1 parent c6909d1 commit 5fbfac8
Show file tree
Hide file tree
Showing 5 changed files with 445 additions and 0 deletions.
34 changes: 34 additions & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
Expand Up @@ -912,6 +912,40 @@ procedure on the catalog's ``system`` schema::
``unregister_table`` only when using the Hive catalog. This is similar to
the behavior listed above for the ``DROP TABLE`` command.

Expire snapshots
^^^^^^^^^^^^^^^^

Each DML (Data Manipulation Language) action in Iceberg produces a new snapshot while keeping the old data and metadata for snapshot isolation and time travel. Use ``expire_snapshots`` to remove older snapshots and their files.

This procedure removes old snapshots and their corresponding files, but never removes files that are still required by a non-expired snapshot.

The following arguments are available:

===================== ========== =============== =======================================================================
Argument Name required type Description
===================== ========== =============== =======================================================================
``schema`` ✔️ string Schema of the table to update

``table_name`` ✔️ string Name of the table to update

``older_than`` timestamp Timestamp before which snapshots will be removed (Default: 5 days ago)

``retain_last`` int Number of ancestor snapshots to preserve regardless of older_than
(defaults to 1)

``snapshot_ids`` array of long Array of snapshot IDs to expire
===================== ========== =============== =======================================================================

Examples:

* Remove snapshots older than a specific day and time, but retain the last 10 snapshots::

CALL iceberg.system.expire_snapshots('schema_name', 'table_name', TIMESTAMP '2023-08-31 00:00:00.000', 10);

* Remove the snapshots with IDs 10001 and 10002 (note that neither of these may be the current snapshot)::

CALL iceberg.system.expire_snapshots(schema => 'schema_name', table_name => 'table_name', snapshot_ids => ARRAY[10001, 10002]);

Schema Evolution
-----------------

Expand Down
Expand Up @@ -39,6 +39,7 @@
import com.facebook.presto.hive.gcs.HiveGcsConfigurationInitializer;
import com.facebook.presto.iceberg.nessie.NessieConfig;
import com.facebook.presto.iceberg.optimizer.IcebergPlanOptimizerProvider;
import com.facebook.presto.iceberg.procedure.ExpireSnapshotsProcedure;
import com.facebook.presto.iceberg.procedure.RegisterTableProcedure;
import com.facebook.presto.iceberg.procedure.RollbackToSnapshotProcedure;
import com.facebook.presto.iceberg.procedure.UnregisterTableProcedure;
Expand Down Expand Up @@ -149,6 +150,7 @@ public void setup(Binder binder)
procedures.addBinding().toProvider(RollbackToSnapshotProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(RegisterTableProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(UnregisterTableProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(ExpireSnapshotsProcedure.class).in(Scopes.SINGLETON);

// for orc
binder.bind(EncryptionLibrary.class).annotatedWith(HiveDwrfEncryptionProvider.ForCryptoService.class).to(UnsupportedEncryptionLibrary.class).in(Scopes.SINGLETON);
Expand Down
@@ -0,0 +1,107 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg.procedure;

import com.facebook.presto.common.type.SqlTimestamp;
import com.facebook.presto.iceberg.IcebergAbstractMetadata;
import com.facebook.presto.iceberg.IcebergMetadataFactory;
import com.facebook.presto.iceberg.IcebergUtil;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
import com.facebook.presto.spi.procedure.Procedure;
import com.google.common.collect.ImmutableList;
import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.Table;

import javax.inject.Inject;
import javax.inject.Provider;

import java.lang.invoke.MethodHandle;
import java.util.List;

import static com.facebook.presto.common.block.MethodHandleUtil.methodHandle;
import static com.facebook.presto.common.type.StandardTypes.INTEGER;
import static com.facebook.presto.common.type.StandardTypes.TIMESTAMP;
import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
import static java.util.Objects.requireNonNull;

/**
 * Provider of the {@code system.expire_snapshots} procedure.
 *
 * <p>The procedure takes a schema and table name plus three optional arguments
 * ({@code older_than}, {@code retain_last}, {@code snapshot_ids}), each of which
 * defaults to {@code null}. Only the arguments that are actually supplied are
 * forwarded to the Iceberg {@link ExpireSnapshots} operation.
 */
public class ExpireSnapshotsProcedure
        implements Provider<Procedure>
{
    // Handle to expireSnapshots(...); bound to this instance when the procedure is built.
    private static final MethodHandle EXPIRE_SNAPSHOTS = methodHandle(
            ExpireSnapshotsProcedure.class,
            "expireSnapshots",
            ConnectorSession.class,
            String.class,
            String.class,
            SqlTimestamp.class,
            Integer.class,
            List.class);

    private final IcebergMetadataFactory metadataFactory;

    /**
     * @param metadataFactory factory used to create the metadata through which the table is loaded; must not be null
     */
    @Inject
    public ExpireSnapshotsProcedure(IcebergMetadataFactory metadataFactory)
    {
        this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null");
    }

    /**
     * Builds the procedure definition: schema {@code system}, name {@code expire_snapshots},
     * two leading VARCHAR arguments followed by three arguments declared with a {@code null} default.
     *
     * @return the procedure, with its method handle bound to this instance
     */
    @Override
    public Procedure get()
    {
        List<Procedure.Argument> arguments = ImmutableList.of(
                new Procedure.Argument("schema", VARCHAR),
                new Procedure.Argument("table_name", VARCHAR),
                new Procedure.Argument("older_than", TIMESTAMP, false, null),
                new Procedure.Argument("retain_last", INTEGER, false, null),
                new Procedure.Argument("snapshot_ids", "array(bigint)", false, null));

        return new Procedure("system", "expire_snapshots", arguments, EXPIRE_SNAPSHOTS.bindTo(this));
    }

    /**
     * Entry point invoked through {@link #EXPIRE_SNAPSHOTS}. Loads the target table and commits an
     * expire-snapshots operation with expired-file cleanup enabled.
     *
     * @param clientSession session used to load the table
     * @param schema schema of the target table
     * @param tableName name of the target table
     * @param olderThan timestamp passed to {@code expireOlderThan}; skipped when null
     * @param retainLast value passed to {@code retainLast}; skipped when null
     * @param snapshotIds ids passed one by one to {@code expireSnapshotId}; skipped when null
     */
    public void expireSnapshots(ConnectorSession clientSession, String schema, String tableName, SqlTimestamp olderThan, Integer retainLast, List<Long> snapshotIds)
    {
        // Run everything with this class's classloader installed as the thread context classloader.
        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
            IcebergAbstractMetadata icebergMetadata = (IcebergAbstractMetadata) metadataFactory.create();
            Table table = IcebergUtil.getIcebergTable(icebergMetadata, clientSession, new SchemaTableName(schema, tableName));

            ExpireSnapshots expiration = table.expireSnapshots();

            if (snapshotIds != null) {
                for (long snapshotId : snapshotIds) {
                    expiration = expiration.expireSnapshotId(snapshotId);
                }
            }

            if (olderThan != null) {
                expiration = expiration.expireOlderThan(millisOf(olderThan));
            }

            if (retainLast != null) {
                expiration = expiration.retainLast(retainLast);
            }

            expiration.cleanExpiredFiles(true).commit();
        }
    }

    // Picks getMillisUtc() for legacy timestamps and getMillis() otherwise.
    private static long millisOf(SqlTimestamp timestamp)
    {
        if (timestamp.isLegacyTimestamp()) {
            return timestamp.getMillisUtc();
        }
        return timestamp.getMillis();
    }
}

0 comments on commit 5fbfac8

Please sign in to comment.