Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deferred serializer #3997

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 20 additions & 0 deletions pulpcore/app/migrations/0109_task_result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Generated by Django 4.2.3 on 2023-07-10 09:05

import django.core.serializers.json
from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("core", "0108_task_versions"),
]

operations = [
migrations.AddField(
model_name="task",
name="result",
field=models.JSONField(
encoder=django.core.serializers.json.DjangoJSONEncoder, null=True
),
),
]
19 changes: 15 additions & 4 deletions pulpcore/app/models/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from pulpcore.constants import TASK_CHOICES, TASK_INCOMPLETE_STATES, TASK_STATES
from pulpcore.exceptions import AdvisoryLockError, exception_to_dict
from pulpcore.app.util import get_domain_pk, current_task
from pulpcore.app.loggers import deprecation_logger

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -174,6 +175,7 @@ class Task(BaseModel, AutoAddObjPermsMixin):

args = models.JSONField(null=True, encoder=DjangoJSONEncoder)
kwargs = models.JSONField(null=True, encoder=DjangoJSONEncoder)
result = models.JSONField(null=True, encoder=DjangoJSONEncoder)

worker = models.ForeignKey("Worker", null=True, related_name="tasks", on_delete=models.SET_NULL)

Expand Down Expand Up @@ -247,17 +249,26 @@ def set_running(self):
with suppress(AttributeError):
del self.error

def set_completed(self):
def set_completed(self, result):
"""
Set this Task to the completed state, save it, and log output in warning cases.

This updates the :attr:`finished_at` and sets the :attr:`state` to :attr:`COMPLETED`.
"""
# Only set the state to finished if it's running. This is important for when the task has
# been canceled, so we don't move the task from canceled to finished.
rows = Task.objects.filter(pk=self.pk, state=TASK_STATES.RUNNING).update(
state=TASK_STATES.COMPLETED, finished_at=timezone.now()
)
try:
rows = Task.objects.filter(pk=self.pk, state=TASK_STATES.RUNNING).update(
state=TASK_STATES.COMPLETED, finished_at=timezone.now(), result=result
)
except TypeError:
deprecation_logger.warning(
f"Task {self.pk} returned unserialized value."
" This is deprecated and will be turned into an error with pulpcore>=3.40."
)
rows = Task.objects.filter(pk=self.pk, state=TASK_STATES.RUNNING).update(
state=TASK_STATES.COMPLETED, finished_at=timezone.now()
)
if rows != 1:
raise RuntimeError(
_("Task set_completed() occurred but Task {} is not RUNNING.").format(self.pk)
Expand Down
11 changes: 11 additions & 0 deletions pulpcore/app/serializers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,17 @@ def _patched_reverse(viewname, args=None, kwargs=None, **extra):
return reverse


class DeferredSerializerMixin:
def validate(self, data):
data = super().validate(data)
if "task" in self.context:
data = self.deferred_validate(data)
return data

def deferred_validate(self, data):
return data


class HrefFieldMixin:
"""A mixin to configure related fields to generate relative hrefs."""

Expand Down
51 changes: 51 additions & 0 deletions pulpcore/app/serializers/orphans.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
from gettext import gettext as _

from django.conf import settings
from rest_framework import fields, serializers

from pulpcore.app.models import Content
from pulpcore.app.serializers import ValidateFieldsMixin
from pulpcore.app.tasks.base import general_serializer_task
from pulpcore.tasking.tasks import dispatch


class OrphansCleanupSerializer(serializers.Serializer, ValidateFieldsMixin):
content_hrefs = fields.ListField(
required=False,
help_text=_("Will delete specified content and associated Artifacts if they are orphans."),
write_only=True,
)
orphan_protection_time = serializers.IntegerField(
help_text=(
Expand All @@ -21,6 +25,13 @@ class OrphansCleanupSerializer(serializers.Serializer, ValidateFieldsMixin):
),
allow_null=True,
required=False,
write_only=True,
)
deleted_artifacts = serializers.IntegerField(
read_only=True,
)
deleted_content = serializers.IntegerField(
read_only=True,
)

def validate_content_hrefs(self, value):
Expand All @@ -42,3 +53,43 @@ def validate_content_hrefs(self, value):
pks_to_return.append(NamedModelViewSet.get_resource(href, Content).pk)

return pks_to_return

def validate_orphan_protection_time(self, value):
if value is None:
value = settings.ORPHAN_PROTECTION_TIME
return value

def resources(self):
uri = "/api/v3/orphans/cleanup/"
if settings.DOMAIN_ENABLED:
request = self.context["request"]
uri = f"/{request.pulp_domain.name}{uri}"
return [uri], None

def cleanup(self, validated_data):
assert "task" in self.context

from pulpcore.app.tasks.orphan import orphan_cleanup

# TODO Think about moving the implementation here.

orphan_cleanup(**validated_data)
return {
"deleted_" + pr.code.split(".")[1]: pr.done
for pr in self.context["task"].progress_reports.all()
}

def dispatch(self, method):
serializer_id = self.__class__.__module__ + ":" + self.__class__.__qualname__
exclusive_resources, shared_resources = self.resources()
return dispatch(
general_serializer_task,
exclusive_resources=exclusive_resources,
shared_resources=shared_resources,
kwargs={
"serializer_id": serializer_id,
"method": method,
"data": self.initial_data,
"partial": self.partial,
},
)
5 changes: 5 additions & 0 deletions pulpcore/app/serializers/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ class TaskSerializer(ModelSerializer):
help_text=_("A list of resources required by that task."),
read_only=True,
)
result = serializers.JSONField(
help_text=_("The result of the task."),
read_only=True,
)

def get_created_by(self, obj):
if task_user_map := self.context.get("task_user_mapping"):
Expand Down Expand Up @@ -114,6 +118,7 @@ class Meta:
"progress_reports",
"created_resources",
"reserved_resources_record",
"result",
)


Expand Down
28 changes: 27 additions & 1 deletion pulpcore/app/tasks/base.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,37 @@
import importlib

from django.db import transaction

from pulpcore.app.apps import get_plugin_config
from pulpcore.app.models import CreatedResource, PulpTemporaryFile
from pulpcore.app.models import CreatedResource, PulpTemporaryFile, Task
from pulpcore.app.files import PulpTemporaryUploadedFile
from pulpcore.plugin.models import MasterModel


def general_serializer_task(
serializer_id, method, instance_id=None, partial=False, data=None, context=None
):
"""
Call a method of a serializer after validating it's data.
"""
if context is None:
context = {}
context["task"] = Task.current()
module_name, serializer_name = serializer_id.rsplit(":")
module = importlib.import_module(module_name)
serializer_class = getattr(module, serializer_name)
if instance_id is None:
serializer = serializer_class(data=data, context=context)
else:
instance = serializer_class.Meta.model.objects.get(pk=instance_id)
if isinstance(instance, MasterModel):
instance = instance.cast()
serializer = serializer_class(instance, data=data, partial=partial)
serializer.is_valid(raise_exception=True)
result = getattr(serializer, method)(serializer.validated_data)
return serializer_class(result).data


def general_create_from_temp_file(app_label, serializer_name, temp_file_pk, *args, **kwargs):
"""
Create a model instance from contents stored in a temporary file.
Expand Down
3 changes: 2 additions & 1 deletion pulpcore/app/tasks/orphan.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def queryset_iterator(qs, batchsize=2000, gc_collect=True):
gc.collect()


def orphan_cleanup(content_pks=None, orphan_protection_time=settings.ORPHAN_PROTECTION_TIME):
def orphan_cleanup(content_hrefs=None, orphan_protection_time=settings.ORPHAN_PROTECTION_TIME):
"""
Delete all orphan Content and Artifact records.
Go through orphan Content multiple times to remove content from subrepos.
Expand All @@ -44,6 +44,7 @@ def orphan_cleanup(content_pks=None, orphan_protection_time=settings.ORPHAN_PROT
content_pks (list): A list of content pks. If specified, only remove these orphans.

"""
content_pks = content_hrefs # Quick hack
with ProgressReport(
message="Clean up orphan Content",
total=None,
Expand Down
21 changes: 2 additions & 19 deletions pulpcore/app/viewsets/orphans.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
from drf_spectacular.utils import extend_schema
from django.conf import settings
from rest_framework.viewsets import ViewSet

from pulpcore.app.response import OperationPostponedResponse
from pulpcore.app.serializers import AsyncOperationResponseSerializer, OrphansCleanupSerializer
from pulpcore.app.tasks import orphan_cleanup
from pulpcore.tasking.tasks import dispatch


class OrphansCleanupViewset(ViewSet):
Expand All @@ -19,21 +16,7 @@ def cleanup(self, request):
"""
Triggers an asynchronous orphan cleanup operation.
"""
serializer = OrphansCleanupSerializer(data=request.data)
serializer = self.serializer_class(data=request.data)
serializer.is_valid(raise_exception=True)

content_pks = serializer.validated_data.get("content_hrefs", None)
orphan_protection_time = serializer.validated_data.get(
"orphan_protection_time", settings.ORPHAN_PROTECTION_TIME
)
uri = "/api/v3/orphans/cleanup/"
if settings.DOMAIN_ENABLED:
uri = f"/{request.pulp_domain.name}{uri}"

task = dispatch(
orphan_cleanup,
exclusive_resources=[uri],
kwargs={"content_pks": content_pks, "orphan_protection_time": orphan_protection_time},
)

task = serializer.dispatch("cleanup")
return OperationPostponedResponse(task, request)
16 changes: 12 additions & 4 deletions pulpcore/tasking/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,15 @@ def _execute_task(task):
_logger.info(_("Starting task %s"), task.pk)

# Execute task
module_name, function_name = task.name.rsplit(".", 1)
try:
module_name, function_name = task.name.rsplit(":")
except ValueError:
deprecation_logger.warning(
"Old task specification found. "
"This will be turned into an error with pulpcore >=3.40."
)
# When removing this, write a data-migration to update existing task entries.
module_name, function_name = task.name.rsplit(".", 1)
module = importlib.import_module(module_name)
func = getattr(module, function_name)
args = task.args or ()
Expand All @@ -65,15 +73,15 @@ def _execute_task(task):
if asyncio.iscoroutine(result):
_logger.debug(_("Task is coroutine %s"), task.pk)
loop = asyncio.get_event_loop()
loop.run_until_complete(result)
result = loop.run_until_complete(result)

except Exception:
exc_type, exc, tb = sys.exc_info()
task.set_failed(exc, tb)
_logger.info(_("Task %s failed (%s)"), task.pk, exc)
_logger.info("\n".join(traceback.format_list(traceback.extract_tb(tb))))
else:
task.set_completed()
task.set_completed(result=result)
_logger.info(_("Task completed %s"), task.pk)


Expand Down Expand Up @@ -127,7 +135,7 @@ def dispatch(
assert deferred or immediate, "A task must be at least `deferred` or `immediate`."

if callable(func):
function_name = f"{func.__module__}.{func.__name__}"
function_name = f"{func.__module__}:{func.__name__}"
else:
function_name = func

Expand Down