task collection scheme seems to invalidate batch_method=sum #3241

Open
kstech-roadie opened this issue May 12, 2023 · 2 comments

Comments


kstech-roadie commented May 12, 2023

TL;DR: Instances of Task are apparently never considered unique per instance; instead they are unique by a task_id that is a string built from the class name and parameter values. This creates Task-instance parsimony, which would be a nice optional feature. It does not, however, appear to be optional, which limits the framework's flexibility by conflicting with many use cases for the batching mechanism. The batch_method feature seems to have been designed only with min and max in mind, which tolerate the parsimonious instantiation approach, but it breaks batching functions that rely on all values being present, such as len, sum, average, and variance.
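To make the dedup concrete, here is a minimal sketch (with an illustrative, stripped-down Resource, not my real task) of what I mean:

import luigi


class Resource(luigi.Task):
    count = luigi.IntParameter()


a = Resource(count=1)
b = Resource(count=1)

# On my reading of the task register, both calls return one cached
# instance whose task_id is derived from the class name and parameter
# values, so the scheduler never sees two distinct Resource(count=1) tasks.
print(a.task_id)               # something like Resource_1_<hash>
print(a is b)                  # True
print(a.task_id == b.task_id)  # True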

I'm passing, via IntParameter, a count of desired/requested resources to a class named Resource(luigi.Task) that creates these resources against an API concurrently and then outputs their IDs for the requesting Task to use. Many dependencies call this task with their desired count of resources passed into the IntParameter. I'm declaring this parameter as IntParameter(batch_method=sum) and it's working very well....

...except for the frequently occurring case of asking for the same number of resources. All the Resource(1) requests collapse into a single Task, and the specified batch_method=sum never happens. The summation only happens across unique Resource/Param combinations.
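For context, the setup looks roughly like this (names are illustrative; the real task creates the resources against an API and writes their IDs to its output):

import luigi


class Resource(luigi.Task):
    # Requested number of resources. batch_method=sum is meant to let the
    # scheduler merge several pending Resource tasks into a single run
    # that creates the combined total.
    count = luigi.IntParameter(batch_method=sum)

    def run(self):
        ...  # create `self.count` resources and record their IDs


class Requester(luigi.Task):
    def requires(self):
        return Resource(count=2)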

Is there any way around this issue? Initially I tried adding a "salt" Param and passing in values from randint. I also tried generating various fixed salts and hashes on objects, classes, and modules. If the salt param is significant, then nothing is summation-batched. If it's insignificant, then I'm back to the old behavior. I also tried overriding task_id. None of this works. I'm looking for some kind of solution that unblocks what seems like a perfectly fitting use case for Luigi.
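For reference, the salt attempt was roughly this (a sketch, shown with the insignificant variant):

import random

import luigi


class Resource(luigi.Task):
    count = luigi.IntParameter(batch_method=sum)
    # Insignificant parameters are left out of the task_id, so this salt
    # does not make Resource(count=1) requests unique -- back to the old
    # behavior. Marking it significant makes every request unique, but
    # then nothing gets summation-batched either.
    salt = luigi.IntParameter(significant=False, default=0)


# Caller side:
task = Resource(count=1, salt=random.randint(0, 2**32))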

UPDATE 1:

I can force int values to be unique by sending them as a FloatParameter with unique decimal values and then casting back to int when they arrive in my Task's run method. This allows batch_method=sum to do its job without interference from the Task-instance parsimony behavior. I would prefer to extend the Task or Parameter classes to fold this behavior in, so the consumer is unaware of how their CountParameter(luigi.Parameter) class is making this happen. Tips and pointers appreciated.
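Spelled out, this UPDATE 1 workaround is roughly the following (a sketch with illustrative names):

import itertools

import luigi

# Monotonic counter used to give each request a unique fractional part.
_seq = itertools.count(1)


def salted(count):
    # 1 -> 1.000000001, 1.000000002, ...: unique values that still sum to
    # (approximately) the right total across a batch.
    return count + next(_seq) * 1e-9


class Resource(luigi.Task):
    count = luigi.FloatParameter(batch_method=sum)

    def run(self):
        count = int(self.count)  # cast back before creating resources
        ...


# Caller side: every request is unique, so nothing is deduplicated and
# batch_method=sum can aggregate the pending Resource tasks.
task = Resource(count=salted(1))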

UPDATE 2:

If I could just get my CountParameter to somehow return int values to the Task, I would be content with this hack:

from luigi import Parameter


class CountParameter(Parameter):
    # Tiny, ever-growing offset added to integer values so that each
    # Resource(...) request gets a unique parameter value (and task_id).
    _salt_base = 1e-09
    _salt = 0

    def normalize(self, x):
        # Only salt values that are whole numbers; the fractional part is
        # small enough to survive summation and be stripped by int() later.
        if x == int(x):
            self._salt += self._salt_base
            return x + self._salt
        return x

    def parse(self, x):
        # Command-line values arrive as strings; treat them as floats.
        return float(x)

Using this parameter class I can declare a Task like:

class Resource(Task):
    pm = CountParameter(batch_method=sum)

And inside my run method:

    def run(self):
        pm = int(self.pm)

I've spent a fair bit of time reading the code and I'm not seeing another way, but I would love to have a cleaner solution to this.

Pros:

  • Works. If I have 10 calls to Resource(pm=1), then I get a dynamically built Task of Resource(pm=10.00000003) or similar that I can cast back to int to get my desired result.

Cons:

  • task_ids look crazy, with lots of strange float notation, when the user passed ints.
  • the user has to know to cast back to int, leaking the implementation across the Parameter interface and into the user's Task implementation

Really it is this last detail that irks me. I don't care much if the task_ids in the console output look crazy. But I really need to be able to cast these floats back to int before passing them back to the calling Task. Either that, or find a whole new workaround for this issue.
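For anyone else stuck here, the best I can come up with to at least hide the cast inside the task is a property (a sketch building on the CountParameter above, not a built-in Luigi feature; create_resources is just a stand-in for my API call):

class Resource(Task):
    pm = CountParameter(batch_method=sum)

    @property
    def pm_count(self):
        # Recover the integer the caller asked for (or, after batching,
        # the summed total) from the salted float.
        return int(self.pm)

    def run(self):
        create_resources(self.pm_count)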

@kstech-roadie kstech-roadie changed the title batch_method=sum conflicts with task memoization? task collection scheme seems to invalidate batch_method=sum May 13, 2023
@kstech-roadie (Author)

The Task/Parameter parsimony design decision irks me. It seems to be embedded in the core assumptions of this framework, with no way to disable it. It annoys me that the very identity of a Task instance is defined by its class name and parameters. This goes against the object-oriented design that the framework builds on and benefits from. While relying on unique object identity itself, it then cuts off and buries this functionality, preventing the user from benefiting from it. This would have made a nice optional feature, but as it stands the decision seems to completely cut off a lot of flexibility. I am starting to think I should have just built my project in RxPY, or something else that exposes the underlying language features and leaves them open to extension, rather than forcing an opinionated view of what a Task or a Parameter is and how it may be used.

I would love to be wrong about this point of view. Please correct me if I am mistaken and I'm overlooking a framework feature that allows for this.


kstech-roadie commented May 16, 2023

For anyone who comes across this post and reads this far: Luigi is a funky, clunky, hacky, magical system that can handle simple workflows with heavier processing needs. If you are building complex workflows that are more of the cooperative-concurrency flavor, take a look at ReactiveX, RxPY, and especially aioreactive, which really brings it all together in a tidy package. I'm not sure why Luigi has so many stars and forks. It seems to occupy a space where it's neither computationally powerful like the distributed platforms Airflow and Spark, nor programmatically robust/flexible like ReactiveX. It is very specifically single-node multiprocessing. I did not appreciate that when I started.
