
Advanced recovery - part one #460

Open
wants to merge 13 commits into feature/advanced_recovery from advanced_recovery_part_one

Conversation

Psykopear
Contributor

First PR for the advanced recovery proposal.
I'm using a feature/advanced_recovery branch as the base so we can review this first part and then work on the rest in a separate PR, as this one is already quite big on its own.

Introduces the StateStore as the way stateful operators interact with their own state.

All state is now stored in the worker's global store, which keeps the state in memory and saves snapshots to a local db.

Snapshots from all the operators are then passed to the recovery section of the dataflow, which uses the Backup class to save state segments for each epoch to durable storage.
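
To make the moving pieces concrete, here is a rough sketch of the flow described above; all names and signatures are illustrative placeholders, not the actual code in this PR:

```rust
use std::path::PathBuf;

// Illustrative stand-ins only.
struct SerializedSnapshot; // one step's state for one key at one epoch

/// Per-worker global store: stateful operators read and write their state
/// only through this; it keeps state in memory and snapshots it to a local db.
struct StateStore;

impl StateStore {
    /// Hand the serialized snapshots for an epoch to the recovery section.
    fn snapshots_for_epoch(&self, _epoch: u64) -> Vec<SerializedSnapshot> {
        Vec::new() // placeholder
    }
}

/// Durable side: the recovery section uses a Backup implementation to copy
/// finished per-epoch segment files to durable storage.
trait Backup {
    fn upload(&self, segment: PathBuf);
}
```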

This PR doesn't include:

  • Unloading of the in-memory state to the local db to handle larger-than-memory state
  • Full resume (on rescale, or on problems with local db for fast resume)
  • Compaction
  • Garbage collection


codspeed-hq bot commented May 9, 2024

CodSpeed Performance Report

Merging #460 will not alter performance

Comparing advanced_recovery_part_one (108fd34) with main (3591000)

Summary

✅ 6 untouched benchmarks

@Psykopear marked this pull request as draft May 9, 2024 13:32
@Psykopear marked this pull request as ready for review May 15, 2024 14:05
Contributor

@davidselassie left a comment

This is a half-finished review; I wanted to get some comments out there and will have more later. I'm trying to think through ways of clarifying the different layers of this implementation. It feels like there should be some way to better isolate the steps and levels of state.

@@ -42,4 +32,3 @@ def _parse_args():

if __name__ == "__main__":
args = _parse_args()
Contributor

Don't need any of this module's script anymore.

@@ -220,12 +217,28 @@ def _create_arg_parser():
"-r",
Contributor

I want to highlight line 214 above but it won't let me: link to https://docs.bytewax.io/stable/guide/concepts/recovery.html, since that's what will have the recovery info now, not the module docstring. Yeah, the docs haven't been updated yet and that's fine, but we'll eventually do that.

@@ -220,12 +217,28 @@ def _create_arg_parser():
"-r",
"--recovery-directory",
type=Path,
help="""Local file system directory to look for pre-initialized recovery
partitions; see `python -m bytewax.recovery` for how to init partitions""",
help="""Local file system directory to use to save recovery partitions""",
Contributor

Mention that the default for this value is a tmp dir?

Contributor Author

I did put the default value there, but I was planning on removing it before merging into main, as choosing a default value is not that easy given the dataflow could be run in any number of environments.

Contributor

Doesn't our current design require some on-disk directory by default? I'd also say that having access to a RW on-disk temp directory is an OK operational requirement, and I don't think it'll cause that much trouble.

@@ -220,12 +217,28 @@ def _create_arg_parser():
"-r",
"--recovery-directory",
Contributor

Do we maybe want to rename this now to something like -d / --local-state-dir or --local-store-dir? Eventually we're going to need to be pretty careful about being explicit between "local store" and "durable store" to not confuse people. I'd like to phase out using "recovery store" to ensure that we know where we need to update the docs.

Contributor Author

Sure, will change this

@@ -97,6 +658,13 @@ pub(crate) struct PartitionMeta(PartitionIndex, PartitionCount);
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub(crate) struct ExecutionNumber(u64);

impl ExecutionNumber {
pub(crate) fn next(mut self) -> Self {
Contributor

What's going on here with mut self? Why is this not return Self(self.0 + 1)? Is there some benefit to mutating the execution in-place? Makes me nervous about accidentally mutating an already existing ex number.

Contributor Author

I used mut self so that the function consumes the struct, to make sure the old execution number is not used again by mistake, but I'm OK with returning a separate instance too; we still need to be careful either way.

Contributor

Wouldn't consuming the old struct so it can't be used again just be plain self? I guess either mut self or self takes ownership, so the calling API still gives you what you want. mut self implies some special trickery is going on, but since all the types are Copy, to me this seems like a distraction.
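
For illustration, a minimal sketch of the two variants being discussed (the method names are just for contrast, not what's in the PR):

```rust
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub(crate) struct ExecutionNumber(u64);

impl ExecutionNumber {
    // Variant from the PR: take the receiver by value and mutate the local
    // binding before returning it.
    pub(crate) fn next_mut(mut self) -> Self {
        self.0 += 1;
        self
    }

    // Suggested variant: plain `self`, return a fresh instance. Ownership of
    // the old value is taken either way, and the type is Copy, so callers can
    // keep copies regardless.
    pub(crate) fn next(self) -> Self {
        Self(self.0 + 1)
    }
}
```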

#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize, FromPyObject)]
pub(crate) struct PartitionCount(usize);
/// Represents the possible types of state that the `StateStore` can hold.
pub(crate) enum StatefulLogicKind {
Contributor

This feels like an inversion of abstraction: bringing the kinds of operators down into the state store rather than wrapping the functionality. It seems like the core requirement here is needing to be able to know how to snapshot and hydrate any state (which requires closing over operator parameters). Also it means that to follow how the operators work you have to ping-pong between logic kind code, state manager code, and state store code.

Is there a way to get rid of this and instead re-use the hydration callback? Like, we assume that all state PyObjects have .snapshot(), but then something like an operator "registers" into a HashMap<StepId, PyCallable> (the callable would have the signature def build(resume_state: Any) -> Any, a "here's how to hydrate state for this step-id" callback) that the state store then knows how to use whenever necessary. Then the API for the state store is just get and set PyObject and doesn't need to know about possible operator types.

I think, in general, there's some ways to leverage the dynamism of Python when it makes sense.
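
For illustration, a rough sketch of that registration idea, assuming pyo3 (the names StateStore, register_builder, etc. are hypothetical here, not this PR's actual API):

```rust
use std::collections::HashMap;

use pyo3::prelude::*;

type StepId = String;
type StateKey = String;

/// Hypothetical store that only deals in PyObjects plus one registered
/// "builder" callable per step.
#[derive(Default)]
struct StateStore {
    /// `def build(resume_state: Any) -> Any` callables, keyed by step.
    builders: HashMap<StepId, PyObject>,
    /// Live state objects; each is assumed to expose a `.snapshot()` method.
    state: HashMap<(StepId, StateKey), PyObject>,
}

impl StateStore {
    /// Each stateful operator registers how to hydrate its own state.
    fn register_builder(&mut self, step_id: StepId, builder: PyObject) {
        self.builders.insert(step_id, builder);
    }

    /// Build (or rebuild) state for a key from an optional resume snapshot.
    fn hydrate(
        &mut self,
        py: Python,
        step_id: &StepId,
        key: StateKey,
        resume_state: Option<PyObject>,
    ) -> PyResult<()> {
        let state = self.builders[step_id].call1(py, (resume_state,))?;
        self.state.insert((step_id.clone(), key), state);
        Ok(())
    }

    /// Snapshot a single key by duck-typing on `.snapshot()`.
    fn snap(&self, py: Python, step_id: &StepId, key: &StateKey) -> PyResult<PyObject> {
        self.state[&(step_id.clone(), key.clone())].call_method0(py, "snapshot")
    }
}
```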

Contributor Author

I opted for this to keep using the Rust structures that wrap the Python objects inside the operators (to use the various next_batch, write_batch, etc. methods), and to avoid having to either use call_method without type checking, or do the extract each time I needed to use the methods. But I can find a way to make it work with raw PyObjects in the state and remove the enum. I'll work on this.

Contributor

The type checking in this area is duck / protocol-based anyway (even though yes, we have a few concrete types which implement snapshot / builder, they don't actually share a typing relationship). So I don't think adding concrete types here really represents the Python-side behavior anyway.

Another option, though I don't know that it's worth the complexity, would be to define a Python-side typing.Protocol for having snapshot and produce a Rust-side typecheck class for it. But then you still haven't encoded the builder side. So that'd be... another protocol? But you aren't coordinating across the two. You can't combine them into one because of the need to close over runtime config from the Dataflow itself... If you want to go that direction you could try it out. I think doing a Stateful protocol and leaving the builder as a Callable would probably be the right work:payoff.
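
As a sketch of what the Rust-side check could amount to (a hypothetical helper, not code from this PR), the minimal duck-typed version is just:

```rust
use pyo3::exceptions::PyTypeError;
use pyo3::prelude::*;

/// Hypothetical check mirroring a Python-side `Stateful` Protocol: accept any
/// object that has a `snapshot` attribute; the builder stays a plain Callable.
fn check_stateful(py: Python, obj: &PyObject) -> PyResult<()> {
    if obj.getattr(py, "snapshot").is_ok() {
        Ok(())
    } else {
        Err(PyTypeError::new_err(
            "state object must have a `snapshot()` method",
        ))
    }
}
```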

/// Use this to manage a single recovery segment.
/// This is intended to be used only once, so the struct consumes itself
/// whenever you do the write to the db.
pub(crate) struct Segment {
Contributor

My thinking was that the operations on a segment and the local store were identical and thus didn't need to be separate types with separate implementations. It might make sense to move some of the "when do I need to open a DB, segment number, file naming" logic up a level so this doesn't need to be repeated.

Contributor Author

Ok, makes sense, will work on that

/// Trait to group common operations that are used in all stateful operators.
/// It just needs to know how to borrow the StateStore and how to retrieve
/// the step_id to manipulate state and local store.
pub(crate) trait StateManager {
Contributor

I'm a little confused by the purpose of this trait. I think the idea is to wrap up the concept of the step ID into a struct so you don't need to remember to use that in each operator, but I think that makes more sense as a struct than a trait. Especially if you buy into the idea of using PyObject and hydrate as the interface, as in the other comment.
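
A minimal sketch of the struct version of that idea (illustrative names, with stubbed-out stand-ins for the PR's StateStore and StepId types):

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Stubs so the sketch is self-contained; the real StateStore and StepId
// types live elsewhere in the PR.
type StepId = String;
struct StateStore;

/// Hypothetical concrete replacement for the StateManager trait: bundle the
/// shared store with the step it belongs to, instead of asking every stateful
/// operator to implement borrow_state() / step_id().
struct StepStateHandle {
    step_id: StepId,
    store: Rc<RefCell<StateStore>>,
}

impl StepStateHandle {
    /// Every state operation goes through here with the step ID already in
    /// hand, so operators can't forget to pass it.
    fn with_store<R>(&self, f: impl FnOnce(&mut StateStore, &StepId) -> R) -> R {
        let mut store = self.store.borrow_mut();
        f(&mut *store, &self.step_id)
    }
}
```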

Contributor Author

I wrote this trait to gather all the operations common to the state objects, basically. But yeah, I'll switch to the PyObjects and hydrate interface, so I'll see how it turns out with a concrete struct here.

self.borrow_state()
.snap(py, self.step_id().clone(), key, epoch)
}
fn write_snapshots(&self, snaps: Vec<SerializedSnapshot>) {
Contributor

What is the purpose of the bulk write and start_at at this level? The operator interface shouldn't need those things.

conn.pragma_update(None, "journal_mode", "WAL").unwrap();
conn.pragma_update(None, "busy_timeout", "5000").unwrap();
get_migrations(py).to_latest(&mut conn).unwrap();
pub(crate) trait WriteFrontiersOp<S>
Contributor

My thinking was that this WriteFrontierOp, CompactFrontierOp, and CompactorOp are all actually the same operator, because they take from upstream a union of snapshots and items which contain frontier updates. The upstream is a change log of things that could happen to the DB. This also means this operator can be re-used for the full resume dataflow.

The same operation happens in all the contexts where a compactor operator is used: the operator is managing a specific SQLite DB path. Upstream is (something like) Stream<SnapshotOrFrontier>, downstream is (something like) Stream<PathBuf>. On receiving a snapshot from upstream, write it to the current DB file. On receiving a frontier from upstream, write the new frontier to the current DB file and GC state. Only emit downstream the original filename if the file is swapped out (described below).

Then optionally for the "snapshot segment compactor" and the "frontier segment compactor", it can swap out to a new file. The original file name is emitted downstream for the backup operator to actually copy based on what kind of swap mode the operator is in. This would mean that the init for this operator either takes a specific DB file path (for retained / "don't swap" mode) or a directory (for batch or immediate / "swap and emit old filename" mode). The swap happens depending on this mode: either never, at the end of the epoch, or after each batch of upstream items.

In some of the contexts where we use the compactor, we need to monitor the write progress of the compaction even though there's no swapping out of the underlying DB (e.g. the "local store compactor"). Any probes or downstream operators still see progress messages even if no paths are emitted.

I did not describe this process correctly in the proposal because I didn't think about the interaction between backup and compactor. I've PR'd a clarification of the process: https://github.com/bytewax/proposals/pull/2. It just repeats what I posted here, but might be a little easier to understand.
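
For illustration, the shapes described above could look roughly like this (hypothetical names, not what the PR defines):

```rust
use std::path::PathBuf;

struct SerializedSnapshot; // stand-in for the PR's serialized snapshot type

/// Upstream of every compactor: a change log of things that can happen to
/// the DB, i.e. a union of snapshots and frontier updates.
enum SnapshotOrFrontier {
    Snapshot(SerializedSnapshot),
    Frontier(u64),
}

/// How a compactor handles its underlying SQLite DB file. Downstream is a
/// stream of PathBuf: a filename is only emitted when a file is swapped out,
/// so the backup operator knows what to copy.
enum SwapMode {
    /// "Local store compactor": one fixed DB path, never swapped or emitted.
    Retain(PathBuf),
    /// Segment compactors writing into a directory: swap to a new file and
    /// emit the old filename at the end of each epoch...
    SwapAtEpochEnd(PathBuf),
    /// ...or after each batch of upstream items (immediate mode).
    SwapEachBatch(PathBuf),
}
```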

Contributor

Actually, I think I forgot something: the "snapshot segment compactor" shouldn't write frontier updates because that would be advancing the frontier used for a full resume before all backup is complete. Let me think more about this tomorrow. Sorry.

Contributor

I updated the first comment in this thread and the proposal PR to reflect what I think makes sense.

Contributor

@davidselassie left a comment

I think these are the last of my comments for this pass. Thanks for working on this. Let me know once you have a version with the snapshot / builder design working and we can look over it again.

where
S: Scope;
}
fn durable_backup(&self, backup: Backup, immediate_backup: bool) -> ClockStream<S> {
Contributor

Does the backup operator also need to know the snapshot mode? All the complexity is in the "did we make a new segment" which is encapsulated in the compactor operator(s?). I think if this operator gets a segment filename, it backs it up.

backup_interval: BackupInterval,
backup: Backup,
#[pyo3(get)]
pub(crate) batch_backup: bool,
Contributor

Could we make an enum for this setting? SnapshotMode::{Batch,Immediate}? Some components take a batch_backup and others take an immediate_backup and they're opposite each other, and it's easy to forget a !. An enum would help with understanding the interpretation.
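
For example, a minimal sketch of that enum (assuming the suggested SnapshotMode name; the helper methods are just illustrative):

```rust
/// Whether snapshot segments are backed up once per epoch or as soon as
/// they are written.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum SnapshotMode {
    Batch,
    Immediate,
}

impl SnapshotMode {
    /// Replacements for the current batch_backup / immediate_backup bools,
    /// so a forgotten `!` can't silently flip the meaning.
    fn is_batch(self) -> bool {
        matches!(self, SnapshotMode::Batch)
    }

    fn is_immediate(self) -> bool {
        matches!(self, SnapshotMode::Immediate)
    }
}
```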

where
S: Scope;
}
fn durable_backup(&self, backup: Backup, immediate_backup: bool) -> ClockStream<S> {
Contributor

FWIW, there's no "local backup", so I think it's fine to call this just backup.

S: Scope;
}
fn durable_backup(&self, backup: Backup, immediate_backup: bool) -> ClockStream<S> {
let mut op_builder = OperatorBuilder::new("compactor".to_string(), self.scope());
Contributor

Suggested change:
- let mut op_builder = OperatorBuilder::new("compactor".to_string(), self.scope());
+ let mut op_builder = OperatorBuilder::new("backup".to_string(), self.scope());

// TODO: This doesn't really need an output, but using `sink` I can't
// probe the dataflow externally, so it would probably need its own
// custom operator.
.unary_frontier(Pipeline, "get_max_epoch", |cap: Capability<u64>, _info| {
Contributor

I don't know that a custom operator is needed here. You should be able to close over cluster_resume_epoch in a map step and keep the max there, then send clock downstream. The worker will pick the max value out of that when the probe indicates the dataflow is complete.
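
A rough sketch of that suggestion using timely's stock map operator (the helper name and the assumption that the stream carries epochs as u64 data are mine, not the PR's):

```rust
use std::cell::Cell;
use std::rc::Rc;

use timely::dataflow::operators::Map;
use timely::dataflow::{Scope, Stream};

/// Track the max epoch seen by closing over a shared cell in a plain `map`
/// step, passing items through unchanged so the dataflow can still be probed.
fn track_max_epoch<S: Scope>(
    epochs: &Stream<S, u64>,
    cluster_resume_epoch: Rc<Cell<u64>>,
) -> Stream<S, u64> {
    epochs.map(move |epoch| {
        cluster_resume_epoch.set(cluster_resume_epoch.get().max(epoch));
        epoch
    })
}
```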
