Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support procedure expire_snapshots for iceberg #22609

Merged
merged 2 commits into from May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 34 additions & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
Expand Up @@ -912,6 +912,40 @@ procedure on the catalog's ``system`` schema::
``unregister_table`` only when using the Hive catalog. This is similar to
the behavior listed above for the ``DROP TABLE`` command.

Expire snapshots
^^^^^^^^^^^^^^^^

Each DML (Data Manipulation Language) action in Iceberg produces a new snapshot while keeping the old data and metadata for snapshot isolation and time travel. Use `expire_snapshots` to remove older snapshots and their files.

This procedure removes old snapshots and their corresponding files, and never removes files which are required by a non-expired snapshot.

The following arguments are available:

===================== ========== =============== =======================================================================
Argument Name required type Description
===================== ========== =============== =======================================================================
``schema`` ✔️ string Schema of the table to update

``table_name`` ✔️ string Name of the table to update

``older_than`` timestamp Timestamp before which snapshots will be removed (Default: 5 days ago)

``retain_last`` int Number of ancestor snapshots to preserve regardless of older_than
(defaults to 1)

``snapshot_ids`` array of long Array of snapshot IDs to expire
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is a word missing after "array of long"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, what I want to express is that the type is array of long. Is it OK with this expression in your opinion?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fine! I was unfamiliar with the phrase and I should have looked it up first. This is good as is. Thanks!

===================== ========== =============== =======================================================================

Examples:

* Remove snapshots older than a specific day and time, but retain the last 10 snapshots::

CALL iceberg.system.expire_snapshots('schema_name', 'table_name', TIMESTAMP '2023-08-31 00:00:00.000', 10);

* Remove snapshots with snapshot ID 10001 and 10002 (note that these snapshot IDs should not be the current snapshot)::

CALL iceberg.system.expire_snapshots(schema => 'schema_name', table_name => 'table_name', snapshot_ids => ARRAY[10001, 10002]);

Schema Evolution
-----------------

Expand Down
Expand Up @@ -194,9 +194,9 @@ protected final Table getIcebergTable(ConnectorSession session, SchemaTableName

protected abstract boolean tableExists(ConnectorSession session, SchemaTableName schemaTableName);

protected abstract void registerTable(ConnectorSession clientSession, SchemaTableName schemaTableName, Path metadataLocation);
public abstract void registerTable(ConnectorSession clientSession, SchemaTableName schemaTableName, Path metadataLocation);

protected abstract void unregisterTable(ConnectorSession clientSession, SchemaTableName schemaTableName);
public abstract void unregisterTable(ConnectorSession clientSession, SchemaTableName schemaTableName);

/**
* This class implements the default implementation for getTableLayoutForConstraint which will be used in the case of a Java Worker
Expand Down
Expand Up @@ -39,6 +39,10 @@
import com.facebook.presto.hive.gcs.HiveGcsConfigurationInitializer;
import com.facebook.presto.iceberg.nessie.NessieConfig;
import com.facebook.presto.iceberg.optimizer.IcebergPlanOptimizerProvider;
import com.facebook.presto.iceberg.procedure.ExpireSnapshotsProcedure;
import com.facebook.presto.iceberg.procedure.RegisterTableProcedure;
import com.facebook.presto.iceberg.procedure.RollbackToSnapshotProcedure;
import com.facebook.presto.iceberg.procedure.UnregisterTableProcedure;
import com.facebook.presto.orc.CachingStripeMetadataSource;
import com.facebook.presto.orc.DwrfAwareStripeMetadataSourceFactory;
import com.facebook.presto.orc.EncryptionLibrary;
Expand Down Expand Up @@ -146,6 +150,7 @@ public void setup(Binder binder)
procedures.addBinding().toProvider(RollbackToSnapshotProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(RegisterTableProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(UnregisterTableProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(ExpireSnapshotsProcedure.class).in(Scopes.SINGLETON);

// for orc
binder.bind(EncryptionLibrary.class).annotatedWith(HiveDwrfEncryptionProvider.ForCryptoService.class).to(UnsupportedEncryptionLibrary.class).in(Scopes.SINGLETON);
Expand Down
@@ -0,0 +1,107 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg.procedure;

import com.facebook.presto.common.type.SqlTimestamp;
import com.facebook.presto.iceberg.IcebergAbstractMetadata;
import com.facebook.presto.iceberg.IcebergMetadataFactory;
import com.facebook.presto.iceberg.IcebergUtil;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
import com.facebook.presto.spi.procedure.Procedure;
import com.google.common.collect.ImmutableList;
import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.Table;

import javax.inject.Inject;
import javax.inject.Provider;

import java.lang.invoke.MethodHandle;
import java.util.List;

import static com.facebook.presto.common.block.MethodHandleUtil.methodHandle;
import static com.facebook.presto.common.type.StandardTypes.INTEGER;
import static com.facebook.presto.common.type.StandardTypes.TIMESTAMP;
import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
import static java.util.Objects.requireNonNull;

public class ExpireSnapshotsProcedure
implements Provider<Procedure>
{
private static final MethodHandle EXPIRE_SNAPSHOTS = methodHandle(
ExpireSnapshotsProcedure.class,
"expireSnapshots",
ConnectorSession.class,
String.class,
String.class,
SqlTimestamp.class,
Integer.class,
List.class);
private final IcebergMetadataFactory metadataFactory;

@Inject
public ExpireSnapshotsProcedure(IcebergMetadataFactory metadataFactory)
{
this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null");
}

@Override
public Procedure get()
{
return new Procedure(
"system",
"expire_snapshots",
ImmutableList.of(
new Procedure.Argument("schema", VARCHAR),
new Procedure.Argument("table_name", VARCHAR),
new Procedure.Argument("older_than", TIMESTAMP, false, null),
new Procedure.Argument("retain_last", INTEGER, false, null),
new Procedure.Argument("snapshot_ids", "array(bigint)", false, null)),
EXPIRE_SNAPSHOTS.bindTo(this));
}

public void expireSnapshots(ConnectorSession clientSession, String schema, String tableName, SqlTimestamp olderThan, Integer retainLast, List<Long> snapshotIds)
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
doExpireSnapshots(clientSession, schema, tableName, olderThan, retainLast, snapshotIds);
}
}

private void doExpireSnapshots(ConnectorSession clientSession, String schema, String tableName, SqlTimestamp olderThan, Integer retainLast, List<Long> snapshotIds)
{
IcebergAbstractMetadata metadata = (IcebergAbstractMetadata) metadataFactory.create();
SchemaTableName schemaTableName = new SchemaTableName(schema, tableName);
Table icebergTable = IcebergUtil.getIcebergTable(metadata, clientSession, schemaTableName);

ExpireSnapshots expireSnapshots = icebergTable.expireSnapshots();

if (snapshotIds != null) {
for (long id : snapshotIds) {
expireSnapshots = expireSnapshots.expireSnapshotId(id);
}
}

if (olderThan != null) {
expireSnapshots = expireSnapshots.expireOlderThan(olderThan.isLegacyTimestamp() ? olderThan.getMillisUtc() : olderThan.getMillis());
}

if (retainLast != null) {
expireSnapshots = expireSnapshots.retainLast(retainLast);
}

expireSnapshots.cleanExpiredFiles(true)
.commit();
}
}
Expand Up @@ -11,10 +11,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;
package com.facebook.presto.iceberg.procedure;

import com.facebook.presto.hive.HdfsContext;
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.iceberg.IcebergAbstractMetadata;
import com.facebook.presto.iceberg.IcebergMetadataFactory;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.PrestoWarning;
Expand Down
Expand Up @@ -11,8 +11,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;
package com.facebook.presto.iceberg.procedure;

import com.facebook.presto.iceberg.IcebergMetadataFactory;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.connector.ConnectorMetadata;
Expand Down
Expand Up @@ -11,10 +11,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;
package com.facebook.presto.iceberg.procedure;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.iceberg.IcebergConfig;
import com.facebook.presto.iceberg.IcebergMetadataFactory;
import com.facebook.presto.iceberg.IcebergResourceFactory;
import com.facebook.presto.iceberg.IcebergTableName;
import com.facebook.presto.iceberg.IcebergUtil;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.connector.ConnectorMetadata;
Expand Down
Expand Up @@ -11,8 +11,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;
package com.facebook.presto.iceberg.procedure;

import com.facebook.presto.iceberg.IcebergAbstractMetadata;
import com.facebook.presto.iceberg.IcebergMetadataFactory;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.SchemaNotFoundException;
import com.facebook.presto.spi.SchemaTableName;
Expand Down
Expand Up @@ -42,8 +42,8 @@
import static com.facebook.presto.iceberg.IcebergQueryRunner.TEST_CATALOG_DIRECTORY;
import static com.facebook.presto.iceberg.IcebergQueryRunner.TEST_DATA_DIRECTORY;
import static com.facebook.presto.iceberg.IcebergUtil.MIN_FORMAT_VERSION_FOR_DELETE;
import static com.facebook.presto.iceberg.RegisterTableProcedure.METADATA_FOLDER_NAME;
import static com.facebook.presto.iceberg.TestIcebergRegisterProcedure.getMetadataFileLocation;
import static com.facebook.presto.iceberg.procedure.RegisterTableProcedure.METADATA_FOLDER_NAME;
import static com.facebook.presto.iceberg.procedure.TestIcebergRegisterProcedure.getMetadataFileLocation;
import static com.facebook.presto.testing.MaterializedResult.resultBuilder;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.Iterables.getOnlyElement;
Expand Down