[FEATURE] non time-based windows / epochs #284

Open
rabernat opened this issue Aug 18, 2023 · 5 comments
Labels
type:feature New feature request

Comments

@rabernat
Contributor

Thanks for this amazing library! 🙏 I'm very excited to be able to play with timely dataflow concepts from Python.

Is your feature request related to a problem? Please describe.

One thing I like about the timely dataflow paradigm is this definition of "timestamps":

> The standard approach to this problem is to install timestamps on the data. Each record gets a logical timestamp associated with it that indicates when it should be thought to happen. This is not necessarily "when" in terms of the date, time, or specific nanosecond the record was emitted; a timestamp could simply be a sequence number identifying a batch of input records.

I would like to do exactly this with bytewax: create a clock that is based only on an integer sequence number (which I control from another system that emits the records to be processed). I don't care about the actual date, but I still want to apply windowed streaming operations over this dimension.

I read the guides and the API docs for windows, which state right at the top: "Time-based windows." I could not find any examples of anything other than time-based windows being supported.

Describe the solution you'd like

If this is supported, I'd like to see documentation on how to create events and windows using a "clock" that is not datetime-based at all, but instead uses a custom, user-defined integer clock. I'm thinking of something like the example in section 1.2 of the timely dataflow book.

If it is not yet supported, I'd be curious to know how hard it would be to add.

Describe alternatives you've considered

I have considered just spoofing what I want by abusing datetimes. I could define an EventClockConfig that converts my integer to some known timestamp, e.g.

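from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

from bytewax.window import EventClockConfig


@dataclass
class MyEvent:
    sequence_number: int


align_to = datetime(2022, 1, 1, tzinfo=timezone.utc)

# Pretend each sequence number is one second after align_to.
clock_config = EventClockConfig(
    lambda e: align_to + timedelta(seconds=e.sequence_number),
    wait_for_system_duration=timedelta(seconds=0),
)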

However, this feels inefficient. Working with datetimes will always be slower and more error-prone than working with the much simpler integer type.
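A quick way to check that the spoofed clock behaves as intended is to confirm that the integer-to-datetime mapping round-trips and that a tumbling window of N seconds covers exactly N sequence numbers. This is a plain-Python sketch, not bytewax code; it assumes one second per sequence number and the same `align_to` as the snippet above, and `seq_to_dt`, `dt_to_seq`, and `window_index` are hypothetical helpers:

```python
from datetime import datetime, timedelta, timezone

# Same alignment point as the EventClockConfig snippet; one second per sequence number.
ALIGN_TO = datetime(2022, 1, 1, tzinfo=timezone.utc)
STEP = timedelta(seconds=1)


def seq_to_dt(sequence_number: int) -> datetime:
    """Spoof an integer sequence number as a UTC datetime."""
    return ALIGN_TO + STEP * sequence_number


def dt_to_seq(dt: datetime) -> int:
    """Recover the sequence number from a spoofed datetime."""
    return (dt - ALIGN_TO) // STEP


def window_index(sequence_number: int, length: timedelta) -> int:
    """Index of the tumbling window (aligned to ALIGN_TO) that contains the item."""
    return (seq_to_dt(sequence_number) - ALIGN_TO) // length
```

With 5-second windows, sequence numbers 0 to 4 land in window 0 and 5 to 9 in window 1, so the spoof preserves integer windowing as long as the step stays constant.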

@github-actions github-actions bot added the needs triage New issue, needs triage label Aug 18, 2023
@Psykopear
Contributor

If I understood correctly, you can achieve what you want by writing a custom input source.
One recent example I worked on, a periodic input, does something similar: it emits an increasing counter every x seconds. It uses the new next_awake method (still unreleased) to avoid spurious wake-ups, but you can do this with the current version of bytewax too, just without the optimization.

Then, you can use the reduce operator and check the sequence number in the is_complete function to decide when to open/close windows manually. Or you can use stateful_map if you need to do more complicated stuff.

As an example, you can add a reduce operator to the periodic input and have it emit a window every time the counter % 5 is 0, like this:

from datetime import datetime, timedelta, timezone

from bytewax.connectors.stdio import StdOutput
from bytewax.dataflow import Dataflow
from bytewax.inputs import DynamicInput, StatelessSource


# Emits an increasing counter once every `frequency` (a timedelta).
# Uses the next_batch/next_awake source API, which was still unreleased at the time.
class PeriodicSource(StatelessSource):
    def __init__(self, frequency):
        self.frequency = frequency
        self._next_awake = datetime.now(timezone.utc)
        self._counter = 0

    def next_awake(self):
        # next_batch should not be called again before this time.
        return self._next_awake

    def next_batch(self):
        self._counter += 1
        # How late this wake-up is compared to the scheduled time.
        delay = datetime.now(timezone.utc) - self._next_awake
        self._next_awake += self.frequency
        # A batch containing one (counter, message) item.
        return [(self._counter, f"delay (ms): {delay.total_seconds() * 1000:.3f}")]


class PeriodicInput(DynamicInput):
    def __init__(self, frequency):
        self.frequency = frequency

    def build(self, worker_index, worker_count):
        return PeriodicSource(frequency=self.frequency)


def reducer(acc, data):
    # The first item arrives as a bare tuple; start a list on the second one.
    if isinstance(acc, list):
        acc.append(data)
    else:
        acc = [acc, data]
    return acc


def is_complete(acc):
    if not isinstance(acc, list):
        return False
    # Close the window when the latest counter is a multiple of 5.
    counter = acc[-1][0]
    return counter % 5 == 0


stateless_flow = Dataflow()
stateless_flow.input("periodic", PeriodicInput(timedelta(seconds=1)))
# Add a placeholder key so all items go to the same stateful reduce.
stateless_flow.map(lambda x: ("ALL", x))
stateless_flow.reduce("counter_window", reducer, is_complete)
stateless_flow.output("stdout", StdOutput())

Is that in line with what you wanted?

@rabernat
Contributor Author

rabernat commented Aug 18, 2023

Thanks for the reply @Psykopear!

I tried to run your example, but it didn't work. I got this error:

TypeError: (src/run.rs:133:49) worker error
Caused by => TypeError: (src/worker.rs:149:14) error building Dataflow
Caused by => TypeError: (src/worker.rs:411:30) error building DynamicInput
Caused by => TypeError: (src/inputs.rs:443:14) error building DynamicInput
Caused by => TypeError: Can't instantiate abstract class PeriodicSource with abstract method next
Traceback (most recent call last):
  File "*/periodic.py", line 29, in build
    return PeriodicSource(frequency=self.frequency)

I tried changing

-    def next_batch(self):
+    def next(self):

to match the API docs. Then I ended up with this error:

  File "*/periodic.py", line 44, in is_complete
    return counter % 5 == 0
           ~~~~~~~~^~~
TypeError: unsupported operand type(s) for %: 'tuple' and 'int'

at which point I stopped trying to debug.
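The second `TypeError` can be reproduced without bytewax. The sketch below uses a hypothetical `simulate_reduce` that models `reduce` for a single key (an assumption about its semantics: the first value becomes the accumulator, later values are folded in with `reducer`, and the accumulator is emitted and reset once `is_complete` returns true). It also assumes that the renamed `next()` passes its return value downstream as a single item, so each item becomes a one-element list:

```python
def simulate_reduce(values, reducer, is_complete):
    """Hypothetical single-key model of reduce; returns the emitted accumulators."""
    emitted = []
    acc = None
    for value in values:
        acc = value if acc is None else reducer(acc, value)
        if is_complete(acc):
            emitted.append(acc)
            acc = None  # the next value starts a fresh accumulator
    return emitted


def reducer(acc, data):
    if isinstance(acc, list):
        acc.append(data)
    else:
        acc = [acc, data]
    return acc


def is_complete(acc):
    if not isinstance(acc, list):
        return False
    counter = acc[-1][0]
    return counter % 5 == 0


# Items as the example intended: bare (counter, message) tuples.
ok = simulate_reduce([(i, "msg") for i in range(1, 11)], reducer, is_complete)

# Items wrapped in one-element lists. The first accumulator is already a list,
# so the second item is appended as a list and acc[-1][0] becomes a tuple.
error_message = None
try:
    simulate_reduce([[(i, "msg")] for i in range(1, 11)], reducer, is_complete)
except TypeError as err:
    error_message = str(err)
```

Under these assumptions the bare tuples group into windows of five, while the list-wrapped items fail on the second item with the same message as the traceback.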


I guess I still don't understand whether your example is still using a time-based clock or not. There is still a lot of time-related stuff in the code.

Perhaps it would be helpful if you could explain the Source API a bit more. How does one control the timestamp associated with a Source when emitting events?

@Psykopear
Contributor

Sorry, I see now that the example is a bit off from what you asked.

> I guess I still don't understand whether your example is still using a time-based clock or not. There is still a lot of time-related stuff in the code.

Not really, it's just using datetime to emit an event each second, but that's unrelated to your question, and it's probably making things less clear.

> Perhaps it would be helpful if you could explain the Source API a bit more. How does one control the timestamp associated with a Source when emitting events?

You don't have direct control over timely's epochs/timestamps, since we use those internally and hide them from the Python layer. So if you want to use a sequence number to window items (or for anything else), you need to add it to the data, but I see that you already did this with the sequence_number field in the event. I'm not sure how you are adding that to the data (or whether it's present in the original data), but in general, if it's not in the data itself, the place to add it is the input source, so that you can properly use the recovery system to handle the state between executions. But that's not your main question.
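As a framework-agnostic illustration of stamping records with a sequence number at the input, here is a minimal sketch. `SequencedSource`, `read`, and `snapshot` are hypothetical names, not bytewax's source API; the point is only that the counter is part of the source's resumable state, so a restarted source continues the sequence instead of starting over at 0:

```python
from typing import Iterable, Iterator, List, Optional, Tuple


class SequencedSource:
    """Hypothetical source that stamps each record with an increasing sequence number."""

    def __init__(self, records: Iterable[str], resume_state: Optional[int] = None):
        # resume_state is the next sequence number to assign; None means a fresh start.
        self._next_seq = 0 if resume_state is None else resume_state
        self._records: Iterator[str] = iter(records)
        # Assumes the records can be replayed from the start: skip those already emitted.
        for _ in range(self._next_seq):
            next(self._records, None)

    def read(self) -> List[Tuple[int, str]]:
        """Return the next record as a one-item batch, or [] when exhausted."""
        record = next(self._records, None)
        if record is None:
            return []
        item = (self._next_seq, record)
        self._next_seq += 1
        return [item]

    def snapshot(self) -> int:
        """State to persist; resuming from it continues the same sequence."""
        return self._next_seq
```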

The point of my previous answer is that you can manually implement windowing logic using the reduce operator, since as you noted, our own window operators are tied to some concept of time (either system or event time).

But I see your point now: having windowing operators able to work with any kind of orderable/comparable data type makes sense, but it will need some changes in the API of the operators. We'll talk about this with the rest of the team.
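For illustration, the arithmetic behind tumbling-window assignment only needs subtraction, floor division, and multiplication on the key type, so the same code can serve any type that supports them. A hypothetical helper (`window_start` is not a bytewax function) that works unchanged for integer sequence numbers and for datetimes with `timedelta` lengths:

```python
from datetime import datetime, timedelta, timezone


def window_start(key, align_to, length):
    """Start of the tumbling window containing `key`.

    Works for any key type where (key - align_to) // length is an int and
    align_to + int * length is again a key, e.g. int/int or datetime/timedelta.
    """
    index = (key - align_to) // length
    return align_to + index * length
```

For example, `window_start(12, 0, 5)` gives `10`, and a datetime at 12:07 with 5-minute windows aligned to midnight maps to 12:05.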

Thanks for opening this, and sorry for the initial confusion.


Just for completeness, here's a simplified version of the previous example. I use the TestingInput to generate a growing sequence number as example input, then use reduce to create "windows" of items that are tied to the sequence_number value. In this case, it only closes a window when the sequence_number is a multiple of 5, but that's just an example of what you can do:

from bytewax.connectors.stdio import StdOutput
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingInput


def reducer(acc, data):
    # The first item arrives as a bare int; start a list on the second one.
    if isinstance(acc, list):
        acc.append(data)
    else:
        acc = [acc, data]
    return acc


def is_complete(acc):
    if not isinstance(acc, list):
        return False
    # Emit the window once the newest sequence number is a multiple of 5.
    sequence_number = acc[-1]
    return sequence_number % 5 == 0


flow = Dataflow()
flow.input("sequence_number", TestingInput(range(100)))
# Add a placeholder key for the stateful operator
flow.map(lambda x: ("ALL", x))
flow.reduce("accumulator", reducer, is_complete)
flow.output("stdout", StdOutput())
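To see which windows that example emits, here is a plain-Python model of `reduce` for a single key. It is an assumption about the operator's semantics, not bytewax code: the first value becomes the accumulator, later values are folded in with `reducer`, and the accumulator is emitted and reset once `is_complete` returns true. Under that model the first window is `[0, ..., 5]`, because the lone `0` is not yet a list, and the trailing `96` to `99` is never emitted because no later multiple of 5 arrives:

```python
def simulate_reduce(values, reducer, is_complete):
    """Hypothetical single-key model of reduce; returns the emitted accumulators."""
    emitted = []
    acc = None
    for value in values:
        acc = value if acc is None else reducer(acc, value)
        if is_complete(acc):
            emitted.append(acc)
            acc = None  # the next value starts a new window
    return emitted


def reducer(acc, data):
    if isinstance(acc, list):
        acc.append(data)
    else:
        acc = [acc, data]
    return acc


def is_complete(acc):
    if not isinstance(acc, list):
        return False
    sequence_number = acc[-1]
    return sequence_number % 5 == 0


windows = simulate_reduce(range(100), reducer, is_complete)
```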

@Psykopear Psykopear added type:feature New feature request and removed needs triage New issue, needs triage labels Aug 18, 2023
@rabernat
Contributor Author

Thanks so much for your reply.

> having windowing operators able to work with any kind of orderable/comparable data type makes sense

Yes, this is exactly what I was getting at. But I have a completely functional workaround for now, so I would not consider this urgent.

Thanks for taking the time to help me understand the library better. It's very cool!

@davidselassie
Contributor

Unfortunately, this is not currently implemented in the general case; our windowing code specifically requires datetimes.

Discussion

It's possible, but we'd have to move much of our windowing implementation from Rust into Python to allow for dynamic types in the relevant spots.

Also note that windowing in a streaming system can't be totally divorced from system time because of late, missing, and out-of-order data. Because a stream could be infinite, you need to ask "should I wait longer for possible out-of-order data?" The way this is traditionally done is via some sort of system-time lateness timeout: you delay closing a window (whatever that means, even if it's not based on datetimes) until, e.g., 10 more seconds have passed, so that any out-of-order data that belongs in that window can still be incorporated.
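A minimal sketch of that idea for integer windows, under stated assumptions: window IDs are `sequence_number // width`, a window becomes eligible to close once an item from a later window has been seen, and it is only emitted after a further `lateness_seconds` of system time so that stragglers can still join it. `LateClosingWindower` is a hypothetical name, and the clock is injectable so the behavior can be tested without sleeping:

```python
import time
from typing import Callable, Dict, List, Set, Tuple


class LateClosingWindower:
    """Hypothetical logical tumbling windower with a system-time lateness timeout."""

    def __init__(
        self,
        width: int,
        lateness_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.width = width
        self.lateness = lateness_seconds
        self.clock = clock
        self.open: Dict[int, List[int]] = {}
        # Window ID -> system time at which a later window was first seen.
        self.superseded_at: Dict[int, float] = {}
        self.emitted: Set[int] = set()
        self.max_win = -1

    def push(self, seq: int) -> List[Tuple[int, List[int]]]:
        """Add one item and return any windows that are now ready to close."""
        win = seq // self.width
        if win in self.emitted:
            # Too late: this window was already emitted, so the item is dropped.
            return self.poll()
        self.open.setdefault(win, []).append(seq)
        self.max_win = max(self.max_win, win)
        now = self.clock()
        for other in self.open:
            if other < self.max_win:
                # Start the lateness timer the first time a later window is seen.
                self.superseded_at.setdefault(other, now)
        return self.poll()

    def poll(self) -> List[Tuple[int, List[int]]]:
        """Emit superseded windows whose lateness timeout has elapsed, in ID order."""
        now = self.clock()
        ready = sorted(
            w for w, since in self.superseded_at.items() if now - since >= self.lateness
        )
        out = []
        for w in ready:
            out.append((w, sorted(self.open.pop(w))))
            del self.superseded_at[w]
            self.emitted.add(w)
        return out
```

The design choice here is that logical order decides which window an item belongs to, while system time only decides when it is safe to stop waiting for it.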

Very inside baseball for this hypothetical implementation: This would be implemented via some new concept "logical watermark advancement"? You'd need a way of translating how the logical watermark of the clock should advance as system time marches forward, since the watermark is how you know a window is truly closed. The current event time clock (since the watermark is a datetime itself) just adds the system time duration to the old watermark.
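One possible reading of "logical watermark advancement", sketched under assumptions: the watermark is measured in sequence-number units, each observed item can raise it to `seq - allowed_lag`, and between items it advances at an assumed `units_per_second` logical units per second of system time, analogous to the datetime event clock adding elapsed system time to its watermark. `LogicalWatermark`, `allowed_lag`, and `units_per_second` are hypothetical:

```python
from typing import Optional


class LogicalWatermark:
    """Hypothetical watermark in sequence-number units that also advances with system time."""

    def __init__(self, allowed_lag: int, units_per_second: float):
        self.allowed_lag = allowed_lag
        self.units_per_second = units_per_second
        self._base: Optional[float] = None  # watermark at the last update
        self._base_time = 0.0  # system time of the last update

    def current(self, now: float) -> Optional[float]:
        """Watermark at system time `now`, or None before the first item."""
        if self._base is None:
            return None
        return self._base + (now - self._base_time) * self.units_per_second

    def observe(self, seq: int, now: float) -> float:
        """Account for an item with sequence number `seq` seen at system time `now`."""
        candidate = seq - self.allowed_lag
        advanced = self.current(now)
        # The watermark never moves backwards.
        self._base = candidate if advanced is None else max(advanced, candidate)
        self._base_time = now
        return self._base

    def is_closed(self, window_end: int, now: float) -> bool:
        """A window covering sequence numbers < window_end closes once the watermark reaches it."""
        wm = self.current(now)
        return wm is not None and wm >= window_end
```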

You might already be familiar, but if you're interested in more info on timely streaming, Flink has some great visualizations on how these concepts work in their docs. Bytewax is not implemented in exactly the same way, but the concepts are the same.

Hacky Hacky Hack

You said you came up with a workaround, but to add possibly another: if you don't care about system latency (you'll only close a logical window once an item of the next window shows up, even if that takes forever in wall-clock time) and can guarantee that data will not arrive out of order (not possible with multi-partitioned input), you could re-create a lightweight version of logical tumbling windowing using the stateful_map operator.

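from dataclasses import dataclass

from bytewax.connectors.stdio import StdOutput
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingInput

WINDOW_WIDTH = 5

@dataclass
class MyEvent:
    sequence_number: int

flow = Dataflow()
flow.input("inp", TestingInput([MyEvent(i) for i in range(12)]))
# Floor division gives the tumbling window ID: 0-4 -> 0, 5-9 -> 1, ...
flow.map(lambda item: (item.sequence_number // WINDOW_WIDTH, item))
# Single placeholder key so every item goes through the same state.
flow.map(lambda win_item: ("ALL", win_item))

def builder():
    # Assumes item.sequence_number >= 0, so -1 will never be a window ID.
    return (-1, [])

def windower(win_l, win_item):
    current_win, l = win_l
    item_win, item = win_item
    if item_win > current_win:
        # Window just closed because we saw an item in the next window: set up the new window and emit the old one.
        return (item_win, [item]), l
    else:
        # Window still open; emit an empty list, which is filtered out below.
        l.append(item)
        return win_l, []

flow.stateful_map("logical_tumbling_windower", builder, windower)
# stateful_map emits an output for every item, so drop the empty placeholder lists.
flow.filter(lambda key_win: len(key_win[1]) > 0)
flow.output("stdout", StdOutput())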

If you want to handle partial out-of-order data, you could add in a watermark to this, but it gets complicated fast.
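The windower can be exercised without bytewax by folding it over a list, which is roughly what `stateful_map` does for a single key (an assumption for illustration; `run` is a hypothetical helper). The fold also shows why the window ID should come from floor division (`//`) rather than `%`: with `%` the IDs cycle through 0 to 4, so each early item closes its own window and later items all pile into window 4:

```python
WINDOW_WIDTH = 5


def windower(win_l, win_item):
    current_win, l = win_l
    item_win, item = win_item
    if item_win > current_win:
        # An item from a later window arrived: start it and emit the old window.
        return (item_win, [item]), l
    l.append(item)
    return win_l, []


def run(sequence_numbers, window_id):
    """Fold windower over (window_id, item) pairs, dropping empty outputs."""
    state = (-1, [])
    emitted = []
    for seq in sequence_numbers:
        state, out = windower(state, (window_id(seq), seq))
        if out:
            emitted.append(out)
    return emitted, state


tumbling, still_open = run(range(12), lambda seq: seq // WINDOW_WIDTH)
cyclic, _ = run(range(12), lambda seq: seq % WINDOW_WIDTH)
```

With `//`, items 0 to 9 are emitted as two windows and window 2 (items 10 and 11) stays open until an item from window 3 shows up, matching the "close only when the next window starts" trade-off described above.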
