Skip to content

Commit

Permalink
WIP Experiment
Browse files Browse the repository at this point in the history
WIP
  • Loading branch information
mdellweg committed Aug 31, 2023
1 parent bcb84d8 commit cfd8985
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 28 deletions.
20 changes: 20 additions & 0 deletions pulpcore/app/migrations/0109_task_result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Generated by Django 4.2.3 on 2023-07-10 09:05

import django.core.serializers.json
from django.db import migrations, models


class Migration(migrations.Migration):
    """Add a nullable JSON ``result`` field to the ``task`` model."""

    # Must be applied on top of the previous core migration.
    dependencies = [("core", "0108_task_versions")]

    operations = [
        migrations.AddField(
            model_name="task",
            name="result",
            # Nullable JSON column, encoded with Django's JSON encoder.
            field=models.JSONField(
                null=True,
                encoder=django.core.serializers.json.DjangoJSONEncoder,
            ),
        ),
    ]
19 changes: 15 additions & 4 deletions pulpcore/app/models/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from pulpcore.constants import TASK_CHOICES, TASK_INCOMPLETE_STATES, TASK_STATES
from pulpcore.exceptions import AdvisoryLockError, exception_to_dict
from pulpcore.app.util import get_domain_pk, current_task
from pulpcore.app.loggers import deprecation_logger

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -174,6 +175,7 @@ class Task(BaseModel, AutoAddObjPermsMixin):

args = models.JSONField(null=True, encoder=DjangoJSONEncoder)
kwargs = models.JSONField(null=True, encoder=DjangoJSONEncoder)
result = models.JSONField(null=True, encoder=DjangoJSONEncoder)

worker = models.ForeignKey("Worker", null=True, related_name="tasks", on_delete=models.SET_NULL)

Expand Down Expand Up @@ -247,17 +249,26 @@ def set_running(self):
with suppress(AttributeError):
del self.error

def set_completed(self):
def set_completed(self, result):
"""
Set this Task to the completed state, save it, and log output in warning cases.
This updates the :attr:`finished_at` and sets the :attr:`state` to :attr:`COMPLETED`.
"""
# Only set the state to finished if it's running. This is important for when the task has
# been canceled, so we don't move the task from canceled to finished.
rows = Task.objects.filter(pk=self.pk, state=TASK_STATES.RUNNING).update(
state=TASK_STATES.COMPLETED, finished_at=timezone.now()
)
try:
rows = Task.objects.filter(pk=self.pk, state=TASK_STATES.RUNNING).update(
state=TASK_STATES.COMPLETED, finished_at=timezone.now(), result=result
)
except TypeError:
deprecation_logger.warning(
f"Task {self.pk} returned unserialized value."
" This is deprecated and will be turned into an error with pulpcore>=3.40."
)
rows = Task.objects.filter(pk=self.pk, state=TASK_STATES.RUNNING).update(
state=TASK_STATES.COMPLETED, finished_at=timezone.now()
)
if rows != 1:
raise RuntimeError(
_("Task set_completed() occurred but Task {} is not RUNNING.").format(self.pk)
Expand Down
11 changes: 11 additions & 0 deletions pulpcore/app/serializers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,17 @@ def _patched_reverse(viewname, args=None, kwargs=None, **extra):
return reverse


class DeferredSerializerMixin:
    """
    Mixin adding a validation hook that only runs when a task is in the serializer context.

    ``validate`` first delegates to the next class in the MRO. If the key ``"task"`` is
    present in ``self.context``, the result is additionally passed through
    ``deferred_validate``, which subclasses can override.
    """

    def validate(self, data):
        """Run the regular validation and, when ``"task"`` is in the context, the deferred one."""
        validated = super().validate(data)
        if "task" not in self.context:
            return validated
        return self.deferred_validate(validated)

    def deferred_validate(self, data):
        """Hook for validation tied to the task context; returns ``data`` unchanged by default."""
        return data


class HrefFieldMixin:
"""A mixin to configure related fields to generate relative hrefs."""

Expand Down
51 changes: 51 additions & 0 deletions pulpcore/app/serializers/orphans.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
from gettext import gettext as _

from django.conf import settings
from rest_framework import fields, serializers

from pulpcore.app.models import Content
from pulpcore.app.serializers import ValidateFieldsMixin
from pulpcore.app.tasks.base import general_serializer_task
from pulpcore.tasking.tasks import dispatch


class OrphansCleanupSerializer(serializers.Serializer, ValidateFieldsMixin):
content_hrefs = fields.ListField(
required=False,
help_text=_("Will delete specified content and associated Artifacts if they are orphans."),
write_only=True,
)
orphan_protection_time = serializers.IntegerField(
help_text=(
Expand All @@ -21,6 +25,13 @@ class OrphansCleanupSerializer(serializers.Serializer, ValidateFieldsMixin):
),
allow_null=True,
required=False,
write_only=True,
)
deleted_artifacts = serializers.IntegerField(
read_only=True,
)
deleted_content = serializers.IntegerField(
read_only=True,
)

def validate_content_hrefs(self, value):
Expand All @@ -42,3 +53,43 @@ def validate_content_hrefs(self, value):
pks_to_return.append(NamedModelViewSet.get_resource(href, Content).pk)

return pks_to_return

def validate_orphan_protection_time(self, value):
    """Return ``value``, or ``settings.ORPHAN_PROTECTION_TIME`` when ``value`` is ``None``."""
    # Only ``None`` triggers the fallback; other falsy values such as 0 are kept as given.
    return settings.ORPHAN_PROTECTION_TIME if value is None else value

def resources(self):
    """
    Build the resources to reserve for the cleanup task.

    Returns:
        tuple: ``([uri], None)``. ``dispatch`` on this serializer unpacks the pair as
        (exclusive resources, shared resources). The uri is prefixed with the name of
        the request's domain when ``settings.DOMAIN_ENABLED`` is set.
    """
    base_uri = "/api/v3/orphans/cleanup/"
    if not settings.DOMAIN_ENABLED:
        return [base_uri], None
    # The request is only looked up in the context when domains are enabled.
    request = self.context["request"]
    return [f"/{request.pulp_domain.name}{base_uri}"], None

def cleanup(self, validated_data):
    """
    Run the orphan cleanup and summarize it from the task's progress reports.

    Args:
        validated_data (dict): Passed as keyword arguments to ``orphan_cleanup``.

    Returns:
        dict: One entry per progress report of the context's task, keyed by
        ``"deleted_"`` plus the second dot-separated segment of the report's ``code``,
        with the report's ``done`` value.
    """
    assert "task" in self.context

    from pulpcore.app.tasks.orphan import orphan_cleanup

    # TODO Think about moving the implementation here.

    orphan_cleanup(**validated_data)

    summary = {}
    for report in self.context["task"].progress_reports.all():
        key = "deleted_" + report.code.split(".")[1]
        summary[key] = report.done
    return summary

def dispatch(self, method):
    """
    Dispatch ``general_serializer_task`` to call ``method`` on this serializer class.

    Args:
        method (str): Name of the serializer method the task should call.

    Returns:
        Whatever the module-level ``dispatch`` function returns.
    """
    serializer_cls = self.__class__
    exclusive_resources, shared_resources = self.resources()
    # The serializer is identified as "<module>:<qualname>" so the task can re-import it.
    task_kwargs = {
        "serializer_id": serializer_cls.__module__ + ":" + serializer_cls.__qualname__,
        "method": method,
        "data": self.initial_data,
        "partial": self.partial,
    }
    # Inside the method body ``dispatch`` resolves to the imported module-level function,
    # not to this method.
    return dispatch(
        general_serializer_task,
        exclusive_resources=exclusive_resources,
        shared_resources=shared_resources,
        kwargs=task_kwargs,
    )
5 changes: 5 additions & 0 deletions pulpcore/app/serializers/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ class TaskSerializer(ModelSerializer):
help_text=_("A list of resources required by that task."),
read_only=True,
)
result = serializers.JSONField(
help_text=_("The result of the task."),
read_only=True,
)

def get_created_by(self, obj):
if task_user_map := self.context.get("task_user_mapping"):
Expand Down Expand Up @@ -114,6 +118,7 @@ class Meta:
"progress_reports",
"created_resources",
"reserved_resources_record",
"result",
)


Expand Down
28 changes: 27 additions & 1 deletion pulpcore/app/tasks/base.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,37 @@
import importlib

from django.db import transaction

from pulpcore.app.apps import get_plugin_config
from pulpcore.app.models import CreatedResource, PulpTemporaryFile
from pulpcore.app.models import CreatedResource, PulpTemporaryFile, Task
from pulpcore.app.files import PulpTemporaryUploadedFile
from pulpcore.plugin.models import MasterModel


def general_serializer_task(
    serializer_id, method, instance_id=None, partial=False, data=None, context=None
):
    """
    Call a method of a serializer after validating its data.

    Args:
        serializer_id (str): Identifier of the serializer class in the form
            ``"<module path>:<class name>"``. A dotted class name (as found in
            ``__qualname__`` of a nested class) is resolved attribute by attribute.
        method (str): Name of the serializer method to call with the validated data.
        instance_id: Primary key of the instance to bind to the serializer. The instance is
            looked up via ``serializer_class.Meta.model``. If ``None``, the serializer is
            built from ``data`` only.
        partial (bool): Passed to the serializer when an instance is bound.
        data: The data to validate.
        context (dict): Serializer context. The current task is added under ``"task"``.

    Returns:
        The ``data`` of a new ``serializer_class`` instance wrapping the method's result.

    Raises:
        Whatever ``serializer.is_valid(raise_exception=True)`` raises on invalid data.
    """
    if context is None:
        context = {}
    context["task"] = Task.current()
    module_name, serializer_name = serializer_id.rsplit(":")
    # Walk dotted names so that nested serializer classes can be resolved too.
    serializer_class = importlib.import_module(module_name)
    for attribute in serializer_name.split("."):
        serializer_class = getattr(serializer_class, attribute)
    if instance_id is None:
        serializer = serializer_class(data=data, context=context)
    else:
        instance = serializer_class.Meta.model.objects.get(pk=instance_id)
        if isinstance(instance, MasterModel):
            instance = instance.cast()
        # The context must be handed over here as well; otherwise the "task" entry is lost
        # for instance-bound serializers.
        serializer = serializer_class(instance, data=data, partial=partial, context=context)
    serializer.is_valid(raise_exception=True)
    result = getattr(serializer, method)(serializer.validated_data)
    return serializer_class(result).data


def general_create_from_temp_file(app_label, serializer_name, temp_file_pk, *args, **kwargs):
"""
Create a model instance from contents stored in a temporary file.
Expand Down
3 changes: 2 additions & 1 deletion pulpcore/app/tasks/orphan.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def queryset_iterator(qs, batchsize=2000, gc_collect=True):
gc.collect()


def orphan_cleanup(content_pks=None, orphan_protection_time=settings.ORPHAN_PROTECTION_TIME):
def orphan_cleanup(content_hrefs=None, orphan_protection_time=settings.ORPHAN_PROTECTION_TIME):
"""
Delete all orphan Content and Artifact records.
Go through orphan Content multiple times to remove content from subrepos.
Expand All @@ -44,6 +44,7 @@ def orphan_cleanup(content_pks=None, orphan_protection_time=settings.ORPHAN_PROT
content_pks (list): A list of content pks. If specified, only remove these orphans.
"""
content_pks = content_hrefs # Quick hack
with ProgressReport(
message="Clean up orphan Content",
total=None,
Expand Down
21 changes: 2 additions & 19 deletions pulpcore/app/viewsets/orphans.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
from drf_spectacular.utils import extend_schema
from django.conf import settings
from rest_framework.viewsets import ViewSet

from pulpcore.app.response import OperationPostponedResponse
from pulpcore.app.serializers import AsyncOperationResponseSerializer, OrphansCleanupSerializer
from pulpcore.app.tasks import orphan_cleanup
from pulpcore.tasking.tasks import dispatch


class OrphansCleanupViewset(ViewSet):
Expand All @@ -19,21 +16,7 @@ def cleanup(self, request):
"""
Triggers an asynchronous orphan cleanup operation.
"""
serializer = OrphansCleanupSerializer(data=request.data)
serializer = self.serializer_class(data=request.data)
serializer.is_valid(raise_exception=True)

content_pks = serializer.validated_data.get("content_hrefs", None)
orphan_protection_time = serializer.validated_data.get(
"orphan_protection_time", settings.ORPHAN_PROTECTION_TIME
)
uri = "/api/v3/orphans/cleanup/"
if settings.DOMAIN_ENABLED:
uri = f"/{request.pulp_domain.name}{uri}"

task = dispatch(
orphan_cleanup,
exclusive_resources=[uri],
kwargs={"content_pks": content_pks, "orphan_protection_time": orphan_protection_time},
)

task = serializer.dispatch("cleanup")
return OperationPostponedResponse(task, request)
7 changes: 4 additions & 3 deletions pulpcore/tasking/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ def _execute_task(task):
module_name, function_name = task.name.rsplit(":")
except ValueError:
deprecation_logger.warning(
"Old task specification found. This will be turned into an error with pulpcore >=3.40."
"Old task specification found. "
"This will be turned into an error with pulpcore >=3.40."
)
# When removing this, write a data-migration to update existing task entries.
module_name, function_name = task.name.rsplit(".", 1)
Expand All @@ -72,15 +73,15 @@ def _execute_task(task):
if asyncio.iscoroutine(result):
_logger.debug(_("Task is coroutine %s"), task.pk)
loop = asyncio.get_event_loop()
loop.run_until_complete(result)
result = loop.run_until_complete(result)

except Exception:
exc_type, exc, tb = sys.exc_info()
task.set_failed(exc, tb)
_logger.info(_("Task %s failed (%s)"), task.pk, exc)
_logger.info("\n".join(traceback.format_list(traceback.extract_tb(tb))))
else:
task.set_completed()
task.set_completed(result=result)
_logger.info(_("Task completed %s"), task.pk)


Expand Down

0 comments on commit cfd8985

Please sign in to comment.