Advanced recovery - part one #460
base: feature/advanced_recovery
Conversation
CodSpeed Performance Report: merging #460 will not alter performance.
Force-pushed from 297d9f8 to 108fd34.
This is a half-finished review, but I wanted to get some comments out there; I'll have more later. I'm trying to think through ways of clarifying the different layers of this implementation. It feels like there should be some ways to better isolate the steps and levels of state.
@@ -42,4 +32,3 @@ def _parse_args():

if __name__ == "__main__":
    args = _parse_args()
Don't need any of this module's script anymore.
@@ -220,12 +217,28 @@ def _create_arg_parser():
    "-r",
I want to highlight line 214 above but it won't let me: link to https://docs.bytewax.io/stable/guide/concepts/recovery.html since that's what will have recovery info, not the module docstring anymore. Yeah, the docs haven't been updated and that's fine; we'll eventually do that.
@@ -220,12 +217,28 @@ def _create_arg_parser():
    "-r",
    "--recovery-directory",
    type=Path,
-   help="""Local file system directory to look for pre-initialized recovery
-   partitions; see `python -m bytewax.recovery` for how to init partitions""",
+   help="""Local file system directory to use to save recovery partitions""",
Mention that the default for this value is a tmp dir?
I did put the default value there, but I was planning on removing it before merging into main, as choosing a default value is not that easy given that the dataflow could be run in a number of different environments.
Doesn't our current design require some on-disk directory by default? I also would say that having access to a RW on-disk temp directory is an ok operational requirement and don't think it'll cause that much trouble.
@@ -220,12 +217,28 @@ def _create_arg_parser():
    "-r",
    "--recovery-directory",
Do we maybe want to rename this now to something like `-d` / `--local-state-dir` or `--local-store-dir`? Eventually we're going to need to be pretty careful about being explicit between "local store" and "durable store" to not confuse people. I'd like to phase out using "recovery store" to ensure that we know where we need to update the docs.
Sure, will change this
@@ -97,6 +658,13 @@ pub(crate) struct PartitionMeta(PartitionIndex, PartitionCount);
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub(crate) struct ExecutionNumber(u64);

impl ExecutionNumber {
    pub(crate) fn next(mut self) -> Self {
What's going on here with `mut self`? Why is this not `return Self(self.0 + 1)`? Is there some benefit to mutating the execution in-place? Makes me nervous about accidentally mutating an already existing ex number.
I used `mut self` so that the function consumes the struct, to make sure the old execution number is not used again by mistake, but I'm OK with returning a separate instance too; we still need to be careful either way.
Wouldn't consuming the old struct so it can't be used again just be plain `self`? I guess either `mut self` or `self` takes ownership, so the calling API still gives you what you want. `mut self` implies some special trickery is going on, but since all the types are `Copy`, to me this seems like a distraction.
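A minimal sketch of the plain `self` variant being discussed (the struct name mirrors the PR; everything else here is illustrative):

```rust
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct ExecutionNumber(u64);

impl ExecutionNumber {
    // Taking plain `self` (by value) is already enough to consume the
    // receiver; no `mut` is needed because we construct a new instance
    // instead of mutating in place. Note that because this type derives
    // `Copy`, the old value stays usable after the call either way.
    fn next(self) -> Self {
        Self(self.0 + 1)
    }
}

fn main() {
    let ex = ExecutionNumber(0);
    let next = ex.next();
    assert_eq!(next, ExecutionNumber(1));
}
```

Since `ExecutionNumber` is `Copy`, neither `self` nor `mut self` actually prevents reuse of the old number at the call site; the discipline has to come from the calling code.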
#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize, FromPyObject)]
pub(crate) struct PartitionCount(usize);

/// Represents the possible types of state that the `StateStore` can hold.
pub(crate) enum StatefulLogicKind {
This feels like an inversion of abstraction: bringing the kinds of operators down into the state store rather than wrapping the functionality. It seems like the core requirement here is needing to be able to know how to snapshot and hydrate any state (which requires closing over operator parameters). Also it means that to follow how the operators work you have to ping-pong between logic kind code, state manager code, and state store code.

Is there a way to get rid of this and instead re-use the hydration callback? Like, we assume that all state `PyObject`s have `.snapshot()`, but then something like an operator "registers" into a `HashMap<StepId, PyCallable>` (the callable would have the signature `def build(resume_state: Any) -> Any`) a "here's how to hydrate state for this step-id" callback that the state store then knows how to use whenever necessary. Then the API for the state store is just get and set `PyObject` and doesn't need to know about possible operator types.

I think, in general, there's some ways to leverage the dynamism of Python when it makes sense.
I opted for this to keep using the Rust structures that wrap the Python objects inside the operators (to use the various `next_batch`, `write_batch`, etc. methods), and to avoid having to either use `call_method` without type checking, or do the `extract` each time I needed to use the methods. But I can find a way to make it work with raw `PyObject`s in the state and remove the enum. I'll work on this.
The type checking in this area is duck/protocol-based anyway (even though, yes, we have a few concrete types which implement `snapshot` / `builder`, they don't actually share a typing relationship). So I don't think adding concrete types here really represents the Python-side behavior anyway.

Another option, though I don't know that it's worth the complexity, would be to define a Python-side `typing.Protocol` for having `snapshot` and produce a Rust-side typecheck class for it. But then you still don't have the `builder` side encoded. So that'd be a... another protocol? But you aren't coordinating across the two. You can't combine them into one because they need to close over runtime config from the `Dataflow` itself... If you want to go that direction you could try it out. I think doing a `Stateful` protocol and leaving the builder as `Callable` would probably be the right work:payoff.
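A rough sketch of the "register a hydrate callback per step-id" idea in plain Rust, with `String` standing in for `StepId`, `Vec<u8>` standing in for a serialized snapshot, and a trait object standing in for the opaque `PyObject` state (all names here are illustrative, not from the PR):

```rust
use std::collections::HashMap;

// Opaque state: the store only needs to know how to snapshot it.
trait Snapshot {
    fn snapshot(&self) -> Vec<u8>;
}

// "Here's how to hydrate state for this step-id": given an optional
// resume snapshot, build the live state object.
type Builder = Box<dyn Fn(Option<Vec<u8>>) -> Box<dyn Snapshot>>;

#[derive(Default)]
struct StateStore {
    builders: HashMap<String, Builder>,
    state: HashMap<String, Box<dyn Snapshot>>,
}

impl StateStore {
    // Operators register how to hydrate their state, so the store never
    // has to enumerate concrete operator types.
    fn register(&mut self, step_id: &str, builder: Builder) {
        self.builders.insert(step_id.to_string(), builder);
    }

    fn hydrate(&mut self, step_id: &str, resume: Option<Vec<u8>>) {
        let builder = self.builders.get(step_id).expect("no builder registered");
        let state = builder(resume);
        self.state.insert(step_id.to_string(), state);
    }

    fn snap(&self, step_id: &str) -> Vec<u8> {
        self.state[step_id].snapshot()
    }
}

// One concrete state type, purely for demonstration.
struct Counter(u64);

impl Snapshot for Counter {
    fn snapshot(&self) -> Vec<u8> {
        self.0.to_le_bytes().to_vec()
    }
}

fn main() {
    let mut store = StateStore::default();
    store.register(
        "count",
        Box::new(|resume| {
            let n = resume
                .map(|b| u64::from_le_bytes(b.try_into().unwrap()))
                .unwrap_or(0);
            Box::new(Counter(n)) as Box<dyn Snapshot>
        }),
    );
    store.hydrate("count", None);
    assert_eq!(store.snap("count"), 0u64.to_le_bytes().to_vec());
}
```

The point of the shape: `StateStore`'s API is just register/hydrate/snap over opaque state, and per-operator knowledge lives entirely in the registered closure, which closes over whatever operator parameters it needs.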
/// Use this to manage a single recovery segment.
/// This is intended to be used only once, so the struct consumes itself
/// whenever you do the write to the db.
pub(crate) struct Segment {
My thinking was that the operations on a segment and the local store were identical and thus didn't need to be separate types with separate implementations. It might make sense to move some of the "when do I need to open a DB, segment number, file naming" logic up a level so this doesn't need to be repeated.
Ok, makes sense, will work on that
/// Trait to group common operations that are used in all stateful operators.
/// It just needs to know how to borrow the StateStore and how to retrieve
/// the step_id to manipulate state and local store.
pub(crate) trait StateManager {
I'm a little confused by the purpose of this trait. I think the idea is to wrap up the concept of the step ID into a struct so you don't need to remember to use that in each operator, but I think that makes sense more as a struct and not a trait. Especially if you buy into the idea of using `PyObject` and hydrate as the interface, as in the other comment.
I wrote this trait to gather all the common operations between state objects, basically, but yeah, I'll do the switch to `PyObject`s and the hydrate interface, so I'll see how it turns out with a concrete struct here.
self.borrow_state()
    .snap(py, self.step_id().clone(), key, epoch)
}

fn write_snapshots(&self, snaps: Vec<SerializedSnapshot>) {
What is the purpose of this bulk write and `start_at` at this level? Shouldn't the operator interface not need those things?
Co-authored-by: David Selassie <david@bytewax.io> Signed-off-by: Federico Dolce <psykopear@gmail.com>
conn.pragma_update(None, "journal_mode", "WAL").unwrap();
conn.pragma_update(None, "busy_timeout", "5000").unwrap();
get_migrations(py).to_latest(&mut conn).unwrap();

pub(crate) trait WriteFrontiersOp<S>
My thinking was that this `WriteFrontiersOp`, `CompactFrontierOp`, and `CompactorOp` are actually all the same operator, because they take from upstream a union of snapshots and items which contain frontier updates. The upstream is a change log of things that could happen to the DB. This also means this operator can be re-used for the full resume dataflow.

The same operation happens in all the contexts a `compactor` operator is used: the operator is managing a specific SQLite DB path. Upstream is (something like) `Stream<SnapshotOrFrontier>`, downstream is (something like) `Stream<PathBuf>`. On receiving a snapshot from upstream, write it to the current DB file. On receiving a frontier from upstream, write the new frontier to the current DB file and GC state. Only emit downstream the original filename if the file is swapped out (described below).

Then, optionally, for the "snapshot segment compactor" and the "frontier segment compactor", it can swap out to a new file. The original file name is emitted downstream for the backup operator to actually copy, based on what kind of swap mode the operator is in. This would mean that the init for this operator takes either a specific DB file path (for retained / "don't swap" mode) or a directory (for batch or immediate / "swap and emit old filename" mode). The swap happens depending on this mode: either never, at the end of the epoch, or after each batch of upstream items.

In some of the contexts we use `compactor`, we need to monitor write progress of the compaction even though there's no swapping out of the underlying DB (e.g. "local store compactor"). Any probes or downstream operators still see progress messages even if no paths are emitted.

I did not describe this process correctly in the proposal because I didn't think about the interaction between `backup` and `compactor`. I've PR'd a clarification of the process: https://github.com/bytewax/proposals/pull/2. It just repeats what I posted here, but might be a little easier to understand.
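The "swap happens depending on this mode" rule above could be captured in a single enum; a sketch (names and helper method are illustrative, not from the PR):

```rust
/// When the compactor swaps out its current DB file and emits the old
/// filename downstream for the backup operator to copy.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum SwapMode {
    /// Retained / "don't swap" mode: manage one fixed DB file, never emit paths.
    Never,
    /// Batch mode: swap at the close of each epoch.
    EndOfEpoch,
    /// Immediate mode: swap after each batch of upstream items.
    PerBatch,
}

impl SwapMode {
    /// Whether a swap (and thus a downstream path emission) should
    /// happen at this point in processing.
    fn should_swap(self, epoch_closed: bool, batch_done: bool) -> bool {
        match self {
            SwapMode::Never => false,
            SwapMode::EndOfEpoch => epoch_closed,
            SwapMode::PerBatch => batch_done,
        }
    }
}

fn main() {
    assert!(!SwapMode::Never.should_swap(true, true));
    assert!(SwapMode::EndOfEpoch.should_swap(true, false));
    assert!(SwapMode::PerBatch.should_swap(false, true));
}
```

Under this shape, the init taking "a DB file path vs. a directory" maps onto `Never` vs. the two swapping modes.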
Actually, I think I forgot something: the "snapshot segment compactor" shouldn't write frontier updates because that would be advancing the frontier used for a full resume before all backup is complete. Let me think more about this tomorrow. Sorry.
I updated the first comment in this thread and the proposal PR to reflect what I think makes sense.
I think these are the last of my comments for this pass. Thanks for working on this. Let me know once you have a version with the snapshot / builder design working and we can look over it again.
where
    S: Scope;
}

fn durable_backup(&self, backup: Backup, immediate_backup: bool) -> ClockStream<S> {
Does the backup operator also need to know the snapshot mode? All the complexity is in the "did we make a new segment" part, which is encapsulated in the `compactor` operator(s?). I think if this operator gets a segment filename, it backs it up.
backup_interval: BackupInterval,
backup: Backup,
#[pyo3(get)]
pub(crate) batch_backup: bool,
Could we make an enum for this setting? `SnapshotMode::{Batch, Immediate}`? Some components take a `batch_backup` and others take an `immediate_backup`, and they're opposite each other, so it's easy to forget a `!`. An enum would help with understanding the interpretation.
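A sketch of the suggested enum (only the `SnapshotMode::{Batch, Immediate}` name comes from the comment; the doc text and helper method are illustrative):

```rust
/// Replaces the pair of opposite booleans (`batch_backup` /
/// `immediate_backup`) with one explicit, self-describing setting.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum SnapshotMode {
    /// Back up segments once per epoch.
    Batch,
    /// Back up each segment as soon as it is written.
    Immediate,
}

impl SnapshotMode {
    // A single conversion point for components that still want a flag,
    // so no call site ever has to remember which boolean to negate.
    fn is_immediate(self) -> bool {
        matches!(self, SnapshotMode::Immediate)
    }
}

fn main() {
    assert!(SnapshotMode::Immediate.is_immediate());
    assert!(!SnapshotMode::Batch.is_immediate());
}
```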
where
    S: Scope;
}

fn durable_backup(&self, backup: Backup, immediate_backup: bool) -> ClockStream<S> {
FWIW, there's no "local backup", so I think it's fine to call this just `backup`.
    S: Scope;
}

fn durable_backup(&self, backup: Backup, immediate_backup: bool) -> ClockStream<S> {
    let mut op_builder = OperatorBuilder::new("compactor".to_string(), self.scope());
- let mut op_builder = OperatorBuilder::new("compactor".to_string(), self.scope());
+ let mut op_builder = OperatorBuilder::new("backup".to_string(), self.scope());
// TODO: This doesn't really need an output, but using `sink` I can't
// probe the dataflow externally, so it would probably need its own
// custom operator.
.unary_frontier(Pipeline, "get_max_epoch", |cap: Capability<u64>, _info| {
I don't know that a custom operator is needed here. You should be able to close over `cluster_resume_epoch` in a `map` step and keep the max there, then send clock downstream. The worker will pick the max value out of that when the probe indicates the dataflow is complete.
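A minimal sketch of the "close over shared state in a map step" idea, outside Timely: plain Rust with an iterator `map` standing in for the dataflow operator and `Rc<Cell<_>>` standing in for the shared `cluster_resume_epoch` slot (names are illustrative):

```rust
use std::cell::Cell;
use std::rc::Rc;

fn main() {
    // Shared slot the worker reads once the probe says the dataflow
    // is complete; here just an Rc<Cell<_>>.
    let cluster_resume_epoch = Rc::new(Cell::new(0u64));
    let slot = Rc::clone(&cluster_resume_epoch);

    // Stand-in for a `map` step over the epoch stream: update the
    // running max in the closed-over slot and pass items through
    // unchanged (the "clock" sent downstream).
    let epochs = vec![3u64, 7, 5];
    let clock: Vec<u64> = epochs
        .into_iter()
        .map(|epoch| {
            slot.set(slot.get().max(epoch));
            epoch
        })
        .collect();

    assert_eq!(clock, vec![3, 7, 5]);
    assert_eq!(cluster_resume_epoch.get(), 7);
}
```

In the real dataflow the closure would be handed to the Timely `map` operator instead of an iterator, but the ownership shape (clone the `Rc` into the closure, read the cell after completion) is the same.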
First PR for the advanced recovery proposal.

I'm using a feature/advanced_recovery branch as base so we can review this first part and then work on the rest in a separate PR, as this is already quite big on its own.

Introduces the StateStore as the way stateful operators interact with their own state. All the state is now stored in the worker's global store, which keeps the state in memory and saves the snapshots to a local DB. Snapshots from all the operators are then passed to the recovery section of the dataflow, which uses the Backup class to save state segments of each epoch to a durable storage.

This PR doesn't include: