Skip to content

Commit

Permalink
Add migrate endpoint to move artifacts to another storage backend
Browse files Browse the repository at this point in the history
fixes: pulp#3358
  • Loading branch information
gerrod3 committed Aug 17, 2023
1 parent 20afd1e commit aa7fea2
Show file tree
Hide file tree
Showing 10 changed files with 336 additions and 44 deletions.
1 change: 1 addition & 0 deletions CHANGES/3358.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added a new `/migrate/` endpoint to Domains that allows migrating artifacts from one storage backend to another.
2 changes: 1 addition & 1 deletion pulpcore/app/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def __init__(self, task, request):
"""
kwargs = {"pk": task.pk}
if settings.DOMAIN_ENABLED:
kwargs["pulp_domain"] = request.pulp_domain.name
kwargs["pulp_domain"] = task.pulp_domain.name
resp = {"task": reverse("tasks-detail", kwargs=kwargs, request=None)}
super().__init__(data=resp, status=202)

Expand Down
2 changes: 1 addition & 1 deletion pulpcore/app/serializers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
SigningServiceSerializer,
SingleArtifactContentSerializer,
)
from .domain import DomainSerializer
from .domain import DomainSerializer, DomainBackendMigratorSerializer
from .exporter import (
ExporterSerializer,
ExportSerializer,
Expand Down
90 changes: 68 additions & 22 deletions pulpcore/app/serializers/domain.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
from gettext import gettext as _
import json

from django.conf import settings
from django.core.files.storage import import_string
from django.utils.encoding import force_bytes, force_str
from django.core.exceptions import ImproperlyConfigured
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import extend_schema_field

from rest_framework import serializers
from rest_framework.validators import UniqueValidator

from pulpcore.app import models
from pulpcore.app.models import Domain, fields
from pulpcore.app.serializers import IdentityField, ModelSerializer, HiddenFieldsMixin


Expand Down Expand Up @@ -278,6 +280,15 @@ def to_internal_value(self, data):
"""Appropriately convert the incoming data based on the Domain's storage class."""
# Handle Creating & Updating
storage_settings = self.root.initial_data.get("storage_settings", {})
if not isinstance(storage_settings, dict):
if isinstance(storage_settings, str):
try:
storage_settings = json.loads(storage_settings)
except json.JSONDecodeError:
raise serializers.ValidationError("Improper JSON string passed in")
else:
raise serializers.ValidationError("Storage settings should be a JSON object.")

if self.root.instance:
storage_class = self.root.instance.storage_class
storage_settings = {**self.root.instance.storage_settings, **storage_settings}
Expand All @@ -293,13 +304,36 @@ def to_internal_value(self, data):
return {"storage_settings": ret}


class DomainSerializer(ModelSerializer):
class BackendSettingsValidator:
    """Mixin to handle validating `storage_class` and `storage_settings`.

    Serializers that accept a storage backend definition inherit from this to
    reject backends that cannot be imported or constructed.
    """

    @staticmethod
    def _validate_storage_backend(storage_class, storage_settings):
        """Ensure that the backend can be used.

        Args:
            storage_class (str): Import path of the storage backend class, as accepted by
                `import_string`.
            storage_settings (dict): Keyword arguments passed to the backend's constructor.

        Raises:
            serializers.ValidationError: Keyed on `storage_class` when the backend cannot be
                imported, or keyed on `storage_settings` when constructing the backend raises
                `ImproperlyConfigured`.
        """
        try:
            backend = import_string(storage_class)
        except (ImportError, ImproperlyConfigured) as e:
            raise serializers.ValidationError(
                detail={"storage_class": _("Backend is not installed on Pulp.")}
            ) from e

        try:
            # Instantiating the backend is what surfaces invalid settings.
            backend(**storage_settings)
        except ImproperlyConfigured as e:
            # Translate the constant template first and interpolate afterwards; formatting
            # before the gettext call makes the lookup key depend on the exception text.
            msg = _("Backend settings contain incorrect values: {}").format(e)
            raise serializers.ValidationError(detail={"storage_settings": msg}) from e


class DomainSerializer(BackendSettingsValidator, ModelSerializer):
"""Serializer for Domain."""

pulp_href = IdentityField(view_name="domains-detail")
name = serializers.SlugField(
help_text=_("A name for this domain."),
validators=[UniqueValidator(queryset=models.Domain.objects.all())],
validators=[UniqueValidator(queryset=Domain.objects.all())],
)
description = serializers.CharField(
help_text=_("An optional description."), required=False, allow_null=True
Expand All @@ -326,24 +360,6 @@ def validate_name(self, value):
raise serializers.ValidationError(_("Name can not be 'api' or 'content'."))
return value

def _validate_storage_backend(self, storage_class, storage_settings):
"""Ensure that the backend can be used."""
try:
backend = import_string(storage_class)
except (ImportError, ImproperlyConfigured):
raise serializers.ValidationError(
detail={"storage_class": _("Backend is not installed on Pulp.")}
)

try:
backend(**storage_settings)
except ImproperlyConfigured as e:
raise serializers.ValidationError(
detail={
"storage_settings": _("Backend settings contain incorrect values: {}".format(e))
}
)

def validate(self, data):
"""Ensure that Domain settings are valid."""
# Validate for update gets called before ViewSet default check
Expand All @@ -369,7 +385,7 @@ def validate(self, data):
return data

class Meta:
model = models.Domain
model = Domain
fields = ModelSerializer.Meta.fields + (
"name",
"description",
Expand All @@ -378,3 +394,33 @@ class Meta:
"redirect_to_object_storage",
"hide_guarded_distributions",
)


class DomainBackendMigratorSerializer(BackendSettingsValidator, serializers.Serializer):
    """Serializer describing the storage backend a Domain should be migrated to."""

    storage_class = serializers.ChoiceField(
        choices=BACKEND_CHOICES,
        help_text=_("The new backend storage class to migrate to."),
    )
    storage_settings = StorageSettingsSerializer(
        help_text=_("The settings for the new storage class to migrate to."),
        source="*",
    )

    def validate(self, data):
        """Check that the requested backend can be imported and instantiated."""
        new_class = data["storage_class"]
        new_settings = data["storage_settings"]
        self._validate_storage_backend(new_class, new_settings)
        return data

    def encrypt_data(self):
        """Dump the validated data to JSON and return it encrypted, as a text string."""
        payload = json.dumps(self.validated_data)
        cipher = fields._fernet()
        token = cipher.encrypt(force_bytes(payload))
        return force_str(token)

    @classmethod
    def decrypt_data(cls, encrypted_data):
        """Reverse `encrypt_data`: decrypt the string and parse the JSON it contains."""
        cipher = fields._fernet()
        payload = cipher.decrypt(force_bytes(encrypted_data))
        return json.loads(force_str(payload))
2 changes: 2 additions & 0 deletions pulpcore/app/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

from .importer import pulp_import

from .migrate import migrate_backend

from .orphan import orphan_cleanup

from .purge import purge
Expand Down
60 changes: 60 additions & 0 deletions pulpcore/app/tasks/migrate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import logging
from gettext import gettext as _

from django.utils.timezone import now
from pulpcore.app.models import Artifact, Domain, storage, ProgressReport
from pulpcore.app.serializers import DomainBackendMigratorSerializer
from pulpcore.app.util import get_domain
from pulpcore.constants import TASK_STATES

_logger = logging.getLogger(__name__)


def migrate_backend(data):
    """
    Copy the artifacts from the current storage backend to a new one. Then update backend settings.

    Files already present on the new backend are not copied again. Nothing is deleted from the
    old backend.

    Args:
        data (str): encrypted json string of the new storage backend settings
    """
    data = DomainBackendMigratorSerializer.decrypt_data(data)
    domain = get_domain()
    old_storage = domain.get_storage()
    # An unsaved Domain built from the new settings is only used to construct the new storage.
    new_storage = Domain(**data).get_storage()

    artifacts = Artifact.objects.filter(pulp_domain=domain)
    # Remember when this pass started so artifacts created during the copy can be found later.
    date = now()

    with ProgressReport(
        message=_("Migrating Artifacts"), code="migrate", total=artifacts.count()
    ) as pb:
        while True:
            for digest in pb.iter(artifacts.values_list("sha256", flat=True)):
                filename = storage.get_artifact_path(digest)
                if not new_storage.exists(filename):
                    # Close the source file once it is copied; otherwise one handle stays
                    # open per migrated artifact for the lifetime of the task.
                    with old_storage.open(filename) as source_file:
                        new_storage.save(filename, source_file)
            # Handle new artifacts saved by the content app
            artifacts = Artifact.objects.filter(pulp_domain=domain, pulp_created__gte=date)
            if count := artifacts.count():
                pb.total += count
                pb.save()
                date = now()
                continue
            break

    # Update the current domain to the new storage backend settings
    msg = _("Update Domain({domain})'s Backend Settings").format(domain=domain.name)
    with ProgressReport(message=msg, code="update", total=1) as pb:
        # Special handling for default domain
        if domain.name == "default":
            # The default domain's record is left untouched here; the operator is told to
            # change the config file instead, and the update step is marked as skipped.
            msg = _(
                "PLEASE UPDATE THE CONFIG FILE WITH THE NEW STORAGE SETTINGS BEFORE RESTARTING PULP!"
            )
            _logger.warning(msg)
            ProgressReport(message=msg, code="URGENT", state=TASK_STATES.SKIPPED).save()
            pb.state = TASK_STATES.SKIPPED
        else:
            domain.storage_class = data["storage_class"]
            domain.storage_settings = data["storage_settings"]
            domain.save(update_fields=["storage_class", "storage_settings"])
            pb.increment()
48 changes: 46 additions & 2 deletions pulpcore/app/viewsets/domain.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,21 @@

from drf_spectacular.utils import extend_schema
from rest_framework import mixins
from rest_framework.decorators import action
from rest_framework.exceptions import ValidationError

from pulpcore.filters import BaseFilterSet
from pulpcore.app.models import Domain
from pulpcore.app.serializers import DomainSerializer, AsyncOperationResponseSerializer
from pulpcore.app.response import OperationPostponedResponse
from pulpcore.app.serializers import (
DomainSerializer,
DomainBackendMigratorSerializer,
AsyncOperationResponseSerializer,
)
from pulpcore.app.tasks import migrate_backend
from pulpcore.app.viewsets import NamedModelViewSet, AsyncRemoveMixin, AsyncUpdateMixin
from pulpcore.app.viewsets.base import NAME_FILTER_OPTIONS
from pulpcore.tasking.tasks import dispatch


class DomainFilter(BaseFilterSet):
Expand Down Expand Up @@ -57,7 +65,7 @@ class DomainViewSet(
"condition": "has_model_or_obj_perms:core.view_domain",
},
{
"action": ["update", "partial_update"],
"action": ["update", "partial_update", "migrate"],
"principal": "authenticated",
"effect": "allow",
"condition": "has_model_or_obj_perms:core.change_domain",
Expand Down Expand Up @@ -118,3 +126,39 @@ def destroy(self, request, pk, **kwargs):
raise ValidationError(_("Default domain can not be deleted."))

return super().destroy(request, pk, **kwargs)

@extend_schema(
    summary="Migrate storage backend",
    request=DomainBackendMigratorSerializer,
    responses={202: AsyncOperationResponseSerializer},
)
@action(detail=True, methods=["patch"])
def migrate(self, request, **kwargs):
    """
    Migrate the domain's storage backend to a new one.
    Launches a background task to copy the domain's artifacts over to the supplied storage
    backend. Then updates the domain's storage settings to the new storage backend. This task
    does not delete the stored files of the artifacts from the previous backend.
    **IMPORTANT** This task will block all other tasks within the domain until the migration is
    completed, essentially putting the domain into a read only state. Content will still be
    served from the old storage backend until the migration has completed, so don't remove
    the old backend until then. Note, if you are performing this operation on the default domain
    then you must update the backend settings in your config file after the migration and then
    restart Pulp to start serving from the new backend.
    This feature is in Tech Preview and is subject to future change and thus not guaranteed to
    be backwards compatible.
    """
    domain = self.get_object()

    # Reject the request before any task is created if the new backend is unusable.
    migrator = DomainBackendMigratorSerializer(data=request.data)
    migrator.is_valid(raise_exception=True)

    # The new settings are handed to the task as an encrypted string, which
    # migrate_backend decrypts on its side.
    encrypted_settings = migrator.encrypt_data()

    # The task is dispatched into the target domain and holds it as an exclusive resource.
    migration_task = dispatch(
        migrate_backend,
        args=(encrypted_settings,),
        exclusive_resources=[domain],
        domain=domain,
    )
    return OperationPostponedResponse(migration_task, request)
15 changes: 10 additions & 5 deletions pulpcore/tasking/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import traceback
import sys

from django.conf import settings
from django.db import transaction, connection
from django.db.models import Model, Q
from django.utils import timezone
Expand Down Expand Up @@ -88,6 +87,7 @@ def dispatch(
immediate=False,
deferred=True,
versions=None,
domain=None,
):
"""
Enqueue a message to Pulp workers with a reservation.
Expand Down Expand Up @@ -118,6 +118,8 @@ def dispatch(
`True`. `immediate` and `deferred` cannot both be `False`.
versions (Optional[Dict[str, str]]): Minimum versions of components by app_label the worker
must provide to handle the task.
domain (Optional[pulpcore.app.models.Domain]): A specific domain you want the task to run
in. Defaults to get_domain().
Returns (pulpcore.app.models.Task): The Pulp Task that was created.
Expand Down Expand Up @@ -153,10 +155,12 @@ def dispatch(
shared_resources = []
else:
shared_resources = _validate_and_get_resources(shared_resources)
if settings.DOMAIN_ENABLED:
domain_url = get_url(get_domain())
if domain_url not in exclusive_resources:
shared_resources.append(domain_url)

# A task that is exclusive on a domain will block all tasks within that domain
domain = domain or get_domain()
domain_url = get_url(domain)
if domain_url not in exclusive_resources:
shared_resources.append(domain_url)
resources = exclusive_resources + [f"shared:{resource}" for resource in shared_resources]

notify_workers = False
Expand All @@ -172,6 +176,7 @@ def dispatch(
parent_task=Task.current(),
reserved_resources_record=resources,
versions=versions,
pulp_domain=domain,
)
if immediate:
# Grab the advisory lock before the task hits the db.
Expand Down

0 comments on commit aa7fea2

Please sign in to comment.