Skip to content

Commit

Permalink
Support procedure expire_snapshots for iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
hantangwangd committed Apr 25, 2024
1 parent d5e1222 commit 018a29c
Show file tree
Hide file tree
Showing 5 changed files with 441 additions and 0 deletions.
34 changes: 34 additions & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
Expand Up @@ -912,6 +912,40 @@ procedure on the catalog's ``system`` schema::
``unregister_table`` only when using the Hive catalog. This is similar to
the behavior listed above for the ``DROP TABLE`` command.

Expire snapshots
^^^^^^^^^^^^^^^^

Each DML operation in Iceberg produces a new snapshot while keeping the old data and metadata around for snapshot isolation and time travel. The ``expire_snapshots`` procedure can be used to remove older snapshots, and their files, that are no longer needed.

This procedure will remove old snapshots and the corresponding snapshot files, and it will never remove files which are still required by a non-expired snapshot.

The following arguments are available:

===================== ========== =============== =======================================================================
Argument Name required type Description
===================== ========== =============== =======================================================================
``schema`` ✔️ string Schema of the table to update

``table_name`` ✔️ string Name of the table to update

``older_than`` timestamp Timestamp before which snapshots will be removed (Default: 5 days ago)

``retain_last`` int Number of ancestor snapshots to preserve regardless of older_than
(defaults to 1)

``snapshot_ids`` array of long Array of snapshot IDs to expire
===================== ========== =============== =======================================================================

Examples:

* Remove snapshots older than a specific day and time, but retain the last 10 snapshots::

CALL iceberg.system.expire_snapshots('schema_name', 'table_name', TIMESTAMP '2023-08-31 00:00:00.000', 10);

* Remove the snapshots with snapshot IDs 10001 and 10002 (note that neither of these snapshot IDs should be the current snapshot)::

CALL iceberg.system.expire_snapshots(schema => 'schema_name', table_name => 'table_name', snapshot_ids => ARRAY[10001, 10002]);

Schema Evolution
-----------------

Expand Down
Expand Up @@ -39,6 +39,7 @@
import com.facebook.presto.hive.gcs.HiveGcsConfigurationInitializer;
import com.facebook.presto.iceberg.nessie.NessieConfig;
import com.facebook.presto.iceberg.optimizer.IcebergPlanOptimizerProvider;
import com.facebook.presto.iceberg.procedure.ExpireSnapshotsProcedure;
import com.facebook.presto.iceberg.procedure.RegisterTableProcedure;
import com.facebook.presto.iceberg.procedure.RollbackToSnapshotProcedure;
import com.facebook.presto.iceberg.procedure.UnregisterTableProcedure;
Expand Down Expand Up @@ -149,6 +150,7 @@ public void setup(Binder binder)
procedures.addBinding().toProvider(RollbackToSnapshotProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(RegisterTableProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(UnregisterTableProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(ExpireSnapshotsProcedure.class).in(Scopes.SINGLETON);

// for orc
binder.bind(EncryptionLibrary.class).annotatedWith(HiveDwrfEncryptionProvider.ForCryptoService.class).to(UnsupportedEncryptionLibrary.class).in(Scopes.SINGLETON);
Expand Down
@@ -0,0 +1,107 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg.procedure;

import com.facebook.presto.common.type.SqlTimestamp;
import com.facebook.presto.iceberg.IcebergAbstractMetadata;
import com.facebook.presto.iceberg.IcebergMetadataFactory;
import com.facebook.presto.iceberg.IcebergUtil;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
import com.facebook.presto.spi.procedure.Procedure;
import com.google.common.collect.ImmutableList;
import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.Table;

import javax.inject.Inject;
import javax.inject.Provider;

import java.lang.invoke.MethodHandle;
import java.util.List;

import static com.facebook.presto.common.block.MethodHandleUtil.methodHandle;
import static com.facebook.presto.common.type.StandardTypes.INTEGER;
import static com.facebook.presto.common.type.StandardTypes.TIMESTAMP;
import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
import static java.util.Objects.requireNonNull;

/**
 * Provides the {@code system.expire_snapshots} procedure for the Iceberg connector.
 *
 * <p>The procedure takes a schema name and a table name, plus three optional
 * arguments: {@code older_than} (timestamp), {@code retain_last} (integer) and
 * {@code snapshot_ids} (array of bigint). Each optional argument that is supplied
 * is forwarded to Iceberg's {@link ExpireSnapshots} operation; arguments that are
 * omitted are simply not applied, leaving the choice of default to Iceberg.
 */
public class ExpireSnapshotsProcedure
        implements Provider<Procedure>
{
    // Handle to expireSnapshots(...); the parameter types listed here must stay in
    // sync with that method's signature, otherwise the lookup fails at class load.
    private static final MethodHandle EXPIRE_SNAPSHOTS = methodHandle(
            ExpireSnapshotsProcedure.class,
            "expireSnapshots",
            ConnectorSession.class,
            String.class,
            String.class,
            SqlTimestamp.class,
            Integer.class,
            List.class);

    private final IcebergMetadataFactory metadataFactory;

    /**
     * @param metadataFactory factory used to create the connector metadata through
     *        which the target table is resolved; must not be null
     */
    @Inject
    public ExpireSnapshotsProcedure(IcebergMetadataFactory metadataFactory)
    {
        this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null");
    }

    /**
     * Builds the procedure definition: {@code schema} and {@code table_name} are
     * required, while {@code older_than}, {@code retain_last} and
     * {@code snapshot_ids} are optional and default to {@code null}.
     *
     * @return the {@code system.expire_snapshots} procedure bound to this instance
     */
    @Override
    public Procedure get()
    {
        return new Procedure(
                "system",
                "expire_snapshots",
                ImmutableList.of(
                        new Procedure.Argument("schema", VARCHAR),
                        new Procedure.Argument("table_name", VARCHAR),
                        new Procedure.Argument("older_than", TIMESTAMP, false, null),
                        new Procedure.Argument("retain_last", INTEGER, false, null),
                        new Procedure.Argument("snapshot_ids", "array(bigint)", false, null)),
                EXPIRE_SNAPSHOTS.bindTo(this));
    }

    /**
     * Entry point invoked through {@link #EXPIRE_SNAPSHOTS}. Delegates to
     * {@code doExpireSnapshots} inside a {@link ThreadContextClassLoader} scope
     * created from this class's class loader.
     *
     * @param clientSession session of the caller
     * @param schema schema of the table whose snapshots are expired
     * @param tableName name of the table whose snapshots are expired
     * @param olderThan optional cut-off timestamp; may be null
     * @param retainLast optional number of snapshots to retain; may be null
     * @param snapshotIds optional explicit snapshot IDs to expire; may be null,
     *        but must not contain null elements
     * @throws NullPointerException if {@code snapshotIds} contains a null element
     */
    public void expireSnapshots(ConnectorSession clientSession, String schema, String tableName, SqlTimestamp olderThan, Integer retainLast, List<Long> snapshotIds)
    {
        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
            doExpireSnapshots(clientSession, schema, tableName, olderThan, retainLast, snapshotIds);
        }
    }

    // Resolves the Iceberg table, applies whichever optional criteria were supplied,
    // and commits the expiration with expired-file cleanup requested.
    private void doExpireSnapshots(ConnectorSession clientSession, String schema, String tableName, SqlTimestamp olderThan, Integer retainLast, List<Long> snapshotIds)
    {
        IcebergAbstractMetadata metadata = (IcebergAbstractMetadata) metadataFactory.create();
        SchemaTableName schemaTableName = new SchemaTableName(schema, tableName);
        Table icebergTable = IcebergUtil.getIcebergTable(metadata, clientSession, schemaTableName);

        ExpireSnapshots expireSnapshots = icebergTable.expireSnapshots();

        if (snapshotIds != null) {
            // Iterate as boxed Long: a NULL array element would otherwise fail on
            // implicit unboxing with a message-less NullPointerException.
            for (Long id : snapshotIds) {
                requireNonNull(id, "snapshot_ids must not contain null elements");
                expireSnapshots = expireSnapshots.expireSnapshotId(id);
            }
        }

        if (olderThan != null) {
            // Use the millisecond accessor that matches the timestamp's semantics
            // (legacy vs. non-legacy).
            expireSnapshots = expireSnapshots.expireOlderThan(olderThan.isLegacyTimestamp() ? olderThan.getMillisUtc() : olderThan.getMillis());
        }

        if (retainLast != null) {
            expireSnapshots = expireSnapshots.retainLast(retainLast);
        }

        expireSnapshots.cleanExpiredFiles(true)
                .commit();
    }
}

0 comments on commit 018a29c

Please sign in to comment.