Feature: new API RaftStorage::append_to_log_cb() #735

Open
Tracked by #724
drmingdrmer opened this issue Mar 22, 2023 · 12 comments

Comments

@drmingdrmer
Member

drmingdrmer commented Mar 22, 2023

Updated:

    /// Nonblocking version of `append_to_log`.
    ///
    /// To ensure correctness:
    ///
    /// - When this method returns, the entries are guaranteed to be readable, i.e., a `LogReader`
    ///   can read these entries.
    ///
    /// - There must not be a **hole** in the logs, because Raft only examines the last log id to
    ///   ensure correctness.
    ///
    /// But it does **NOT** have to guarantee that the entries are persisted on disk before
    /// returning. Instead, an implementation should call the `callback` when the entries are
    /// persisted on disk. The `callback` can be called either before or after this method returns.
    async fn append_to_log_cb(
        &mut self,
        entries: &[C::Entry],
        flush: FlushLog<C::NodeId>,
    ) -> Result<(), StorageError<C::NodeId>> {
        let last = entries.last().map(|e| *e.get_log_id());

        // Default implementation that calls the flush-before-return `append_to_log`.
        self.append_to_log(entries).await?;
        flush.log_io_completed(Ok(last));

        Ok(())
    }

Original:

To parallelize local store IO and network IO, appending log entries should be able to return as soon as the entries are readable, rather than waiting for them to be persisted.

This task can be done with a new RaftStorage method append_to_log_cb():

    /// Nonblocking version of `append_to_log`.
    ///
    /// To ensure correctness:
    ///
    /// - When this method returns, the entries are guaranteed to be readable, i.e., a `LogReader`
    ///   can read these entries.
    ///
    /// - There must not be a **hole** in the logs, because Raft only examines the last log id to
    ///   ensure correctness.
    ///
    /// But it does **NOT** have to guarantee that the entries are persisted on disk before
    /// returning. Instead, an implementation should call the `callback` when the entries are
    /// persisted on disk.
    async fn append_to_log_cb(
        &mut self,
        entries: &[C::Entry],
        callback: &Callback,
    ) -> Result<(), StorageError<C::NodeId>> {
        // Default implementation that calls the flush-before-return `append_to_log`.
        self.append_to_log(entries).await?;
        callback.await;
        Ok(())
    }

Considerations

It is not appropriate to use a separate flush_log(upto_index) method for flushing the log.

After an append-log operation, there could be additional IO operations waiting to be processed before flush_log() is called, such as append-log 1,2,3, then truncate-log [2,+oo), then append-log 2,3,4. In such a case, flush_log(upto=3) is ambiguous, and Openraft could operate on an incorrect state.

Something like fn flush_log(upto_index: u64, callback: LogIoComplete)?

Well, something like it, yes. So you'd call it immediately after append_to_log() and the default implementation would simply call the callback? And in our case, we'd just put the log into the buffer in append_to_log(), then schedule the I/O from the log buffer and call the callback after the I/O completes.

I'd pass the callback by reference, though, to prevent unnecessary refcounting on the channel. (Also, I'd document and guarantee that the callback passed is always the same one, so one could store an instance in the Storage impl and reuse it without further refcounting as an optimization.)

Originally posted by @schreter in #702 (comment)
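
To make the intent concrete, here is a minimal sketch of such an implementation (hypothetical names and types, not Openraft's actual API): append_to_log_cb() only buffers the entries so they become readable, and a background flush task notifies the callback once the entries are durable on disk.

    use tokio::sync::{mpsc, oneshot};

    /// Hypothetical completion handle; designing the real callback type is the
    /// subject of this issue.
    pub struct LogIoComplete(oneshot::Sender<Result<(), std::io::Error>>);

    impl LogIoComplete {
        pub fn log_io_completed(self, result: Result<(), std::io::Error>) {
            // Ignore the error if the receiver (RaftCore) has gone away.
            let _ = self.0.send(result);
        }
    }

    /// Storage that buffers entries in `append_to_log_cb()` and leaves the disk
    /// write to a background flush task.
    pub struct BufferedLog {
        /// In-memory copy that makes appended entries immediately readable.
        readable: Vec<Vec<u8>>,
        /// Work queue for the background flush task.
        flush_tx: mpsc::UnboundedSender<(Vec<Vec<u8>>, LogIoComplete)>,
    }

    impl BufferedLog {
        /// Returns as soon as the entries are readable; durability is reported
        /// later through the callback.
        pub fn append_to_log_cb(&mut self, entries: Vec<Vec<u8>>, cb: LogIoComplete) {
            self.readable.extend(entries.iter().cloned());
            self.flush_tx.send((entries, cb)).expect("flush task terminated");
        }
    }

    /// Background task: write each batch to the WAL, fsync, then fire the callback.
    pub async fn flush_task(mut rx: mpsc::UnboundedReceiver<(Vec<Vec<u8>>, LogIoComplete)>) {
        while let Some((entries, cb)) = rx.recv().await {
            // ... append `entries` to the on-disk WAL and fsync ...
            let _ = &entries;
            cb.log_io_completed(Ok(()));
        }
    }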

@drmingdrmer changed the title from "Feature: new API RaftStorage::flush_log" to "Feature: new API RaftStorage::append_to_log_cb()" on Mar 23, 2023
@drmingdrmer
Member Author

@schreter
What do you think about this new API?

@schreter
Collaborator

schreter commented Mar 23, 2023

What do you think about this new API?

Seems good, except that the default implementation should call a non-blocking callback.log_flushed_up_to() or so to send the notification, instead of callback.await :-).

The Callback should ideally have interface like this:

#[derive(Clone)]
struct Callback { /* ... */ }

impl Callback {
    pub fn log_flushed_up_to(&self, id: LogId) { /* ... */ }
}

This would indicate that everything up to and including the id is now flushed to the disk. Having the ID allows for group commits, where multiple log appends written in a single I/O trigger the callback only once, for the last one (and the other callbacks are ignored). I think this is also in line with the requirements of RaftCore/RaftEngine, since we basically need to advance the persisted log ID and don't strictly need to call the callback for each log call.
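
A rough sketch of that group-commit shape, with simplified placeholder types (not Openraft's actual API):

    #[derive(Clone, Copy)]
    struct LogId { term: u64, index: u64 }

    #[derive(Clone)]
    struct Callback { /* e.g. a channel sender back to RaftCore */ }

    impl Callback {
        fn log_flushed_up_to(&self, _id: LogId) { /* notify RaftCore */ }
    }

    /// One pending append: the last log id it wrote plus the callback it was given.
    struct Pending {
        last_log_id: LogId,
        callback: Callback,
    }

    /// Write all buffered appends with a single fsync, then notify only once with
    /// the last log id; the callbacks of the earlier appends in the batch are dropped.
    fn flush_batch(pending: Vec<Pending>) {
        // ... single write + fsync covering every entry in `pending` ...
        if let Some(last) = pending.last() {
            last.callback.log_flushed_up_to(last.last_log_id);
        }
    }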

Clone support is needed, since if we need to delay the notification (normal case for async flush), then we obviously need to clone the callback (the reference is only valid during the call).

For code robustness, one could send an error to the engine if the drop of the last callback finds out that log I/O is still in progress (likely a panic somewhere in log handling code => Raft must be shut down).

BTW, ideally, it would be nice to get and clone the callback only once and/or guarantee that the object behind the callback is always the same, so it can be cloned only once and stored somewhere, but I don't know how we should express this in the API, so we can address that later (also based on measurements - probably it's not that relevant).

@drmingdrmer
Member Author

Below is the upgraded code:

    /// Nonblocking version of `append_to_log`.
    ///
    /// To ensure correctness:
    ///
    /// - When this method returns, the entries are guaranteed to be readable, i.e., a `LogReader`
    ///   can read these entries.
    ///
    /// - There must not be a **hole** in the logs, because Raft only examines the last log id to
    ///   ensure correctness.
    ///
    /// But it does **NOT** have to guarantee that the entries are persisted on disk before
    /// returning. Instead, an implementation should call the `callback` when the entries are
    /// persisted on disk. The `callback` can be called either before or after this method returns.
    async fn append_to_log_cb(
        &mut self,
        entries: &[C::Entry],
        callback: LogFlushed<C::NodeId>,
    ) -> Result<(), StorageError<C::NodeId>> {
        let last = entries.last().map(|e| *e.get_log_id());

        // Default implementation that calls the flush-before-return `append_to_log`.
        self.append_to_log(entries).await?;
        callback.log_io_completed(Ok(last));

        Ok(())
    }

Clone support is needed, since if we need to delay the notification (normal case for async flush), then we obviously need to clone the callback (the reference is only valid during the call).

I'm afraid it's not easy to do group commit at the storage API level, and not easy to let RaftStorage save a cloned callback for future use.
As I said, the raft logs can be truncated and re-appended. A single log id or log index does not reflect which IO operation has completed. Hence, the callback must be a one-shot sender, indicating that a single IO operation has completed.
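
For illustration, a minimal sketch of those one-shot semantics (simplified, non-generic, hypothetical types): the handle is consumed by value, so each submitted IO operation has exactly one completion, and a completion cannot be mistaken for that of an earlier append of the same log id after a truncate/re-append cycle.

    use tokio::sync::oneshot;

    /// Simplified stand-in for the callback handed to `append_to_log_cb()`.
    pub struct OneShotCallback(oneshot::Sender<Result<(), std::io::Error>>);

    impl OneShotCallback {
        /// Consumes the handle: it cannot be reused for a later IO operation, so the
        /// completion unambiguously refers to the one operation it was created for.
        pub fn log_io_completed(self, result: Result<(), std::io::Error>) {
            let _ = self.0.send(result);
        }
    }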

@schreter
Collaborator

I'm afraid it's not easy to group commit at the storage API level, and not easy to let RaftStorage save a cloned callback for future use.

Hm, maybe not for future use, but one could at least drop the callbacks "in between" and then send a single confirmation at the end.

As I said, the raft logs can be truncated and re-appended. One single log-id or log index does not reflect what IO operation is completed.

Log index probably not, but log ID containing the term does. The truncation will necessarily require a new term (or term + leader). Therefore, it should be sufficient. If you get a confirmation for the old term, then confirm only up to the truncation location. A new append will be sent for a new term to append logs from the truncated location, ideally after the truncation (I think it's fine to await the truncation, since that should be an exceptional case).

Hence, the callback must be a one-shot sender, implying that a single IO operation is completed.

Let's see what the implementation will look like. I don't see how a oneshot channel will help you, since it needs to be explicitly awaited somewhere.

Today, we have a command AppendInputEntries which triggers the log append. This command would get another parameter for the callback and pass it to append_to_log(). My idea was that the callback basically boils down to scheduling a new command AppendDone or so, with the log ID which was appended last, effectively updating the knowledge of the persisted log index in a delayed way. A new counter/index will be required to track which entry was last sent to the log but not yet persisted (instead of awaiting log persistence), so the same log entry is not sent twice.

The callback would effectively be just a thin wrapper over the Raft::tx_api channel, sending an appropriate RaftMsg that updates the engine state.

And, since there is a single tx_api instance in existence, it's in principle possible to prevent frequent refcounting by keeping the instance in the log component (but that's a micro-optimization; we have a lot of optimization potential elsewhere).
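
A sketch of that shape (message and type names here are hypothetical, not Openraft's actual API): the callback just carries the last appended log id and sends one message back to RaftCore when the I/O completes.

    use tokio::sync::mpsc;

    #[derive(Clone, Copy, Debug)]
    struct LogId { term: u64, index: u64 }

    /// Messages processed by RaftCore's main loop.
    enum RaftMsg {
        /// Log entries up to and including `last_log_id` are persisted on disk.
        AppendDone { last_log_id: LogId },
        // ... other messages ...
    }

    /// The callback handed to the storage layer: a thin wrapper over the channel.
    struct LogFlushed {
        tx_api: mpsc::UnboundedSender<RaftMsg>,
        last_log_id: LogId,
    }

    impl LogFlushed {
        fn log_io_completed(self) {
            // RaftCore advances its persisted log id when it processes this message,
            // i.e. in a delayed, non-blocking way.
            let _ = self.tx_api.send(RaftMsg::AppendDone { last_log_id: self.last_log_id });
        }
    }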

@drmingdrmer
Member Author

In my opinion, having a sender inside a RaftStorage implementation is another possible approach. But it still relies on the assumption that IO progress is a linear value:

Log index probably not, but log ID containing the term does. The truncation will necessarily require a new term (or term + leader). Therefore, it should be sufficient. If you get a confirmation for the old term, then confirm only up to the truncation location. A new append will be sent for a new term to append logs from the truncated location, ideally after the truncation (I think it's fine to await the truncation, since that should be an exceptional case).

Given 3 nodes, it is possible for the 3 nodes to have the following logs in their local stores:

i-j: log in term-i at index-j

N1  1-1  2-2  4-3
N2  1-1
N3  1-1  3-2  5-3
--------------------------------------> log index

If N1 and N3 are alternately elected as leader, they will each replicate their local log at index 2 to N2.

  • N1 replicated 2-2 to N2;
  • N1 crashed;
  • N3 tried to replicate log entry 3-2 to N2; This replication action truncated log entry 2-2 from N2, then appended 3-2 to N2; N3 crashed;
  • N1 tried to replicate log entry 2-2 to N2; This replication action truncated log entry 3-2 from N2, then appended 2-2 to N2;
  • ...

In this case, the IO operations that are submitted on N2 will be:

  • append(2-2);
  • truncate(2-2); append(3-2);
  • truncate(3-2); append(2-2);
  • ...

Consequently, a particular log id (term:3,index:2) may be appended multiple times, and a callback complete(term=3,index=2) fails to accurately reflect the current status of the IO operation.

@drmingdrmer
Member Author

Today, we have a command AppendInputEntries which triggers log append. This command would get another parameter for the callback and pass it to append_to_log(). My idea was that the callback basically boils down to scheduling a new command AppendDone or so, with the log ID which was appended last, effectively updating the knowledge of the persisted log index in a delayed way. A new counter/index will be required telling which entry was sent to the log last, but not yet persisted (instead of awaiting log persistence) to not send the same log entry twice.

You are right. Basically it is the big picture.

@schreter
Collaborator

Uhm, the situation you are describing is IMHO impossible. How can N1 and N3 have different terms for log index 2 while N2 does not yet have this log entry persisted? That directly contradicts the quorum! N2 must already have either 2-2 or 3-2 persisted.

Say N2 has 3-2. If N1+N2 restart, then N2 "wins" with the higher term and replicates 3-2 to N1, overwriting its 2-2. If N2+N3 restart, then either N2 or N3 wins the election, but both already have 3-2.

If I understand it correctly, log ID at a certain index can only go "up", but never "down", i.e., the term can only increase, but never decrease (also in the optimized version with term + leader ID, this still holds with total ordering over term + leader ID).

Or am I missing something?

@drmingdrmer
Member Author

Uhm, the situation you are describing is IMHO impossible. How can N1 and N3 have different term for log index 2 and N2 not have this log entry persisted yet? That directly contradicts the quorum! N2 must already have either 2-2 or 3-2 persisted.

Such an event sequence will produce the situation mentioned above:

  • N1 elected as leader in term 2, proposed a blank log 2-2 and crashed.
  • N3 elected as leader in term 3, proposed a blank log 3-2 and crashed.
Ei:  elect in term-i
Vi:  grant the vote in term-i
Li:  establish leader in term-i
i-j: append a log in term-i at index-j
X:   crash

N1 E2      L2  2-2  X
N2     V2                  V3
N3                     E3      L3 3-2  X
-----------------------------------------------> time

@schreter
Collaborator

Such a event sequence will build the situation mentioned above
[...]

OK, but then it's only a question of orchestrating the truncation. The simplest solution on the Raft side that comes to my mind is a local transient truncation counter, which is passed around in the callback. A callback called with an old truncation counter is simply ignored.

In this case, the log implementation has to orchestrate writes to the log properly. Say there is a log write ongoing for some entries before the truncation, then the truncation is done and a new write with the same log indexes is triggered. This needs to be handled properly on the log side, either by delaying the new log write until the old one is done, or by doing shadow paging, or something else - but that's not your concern.

An alternative would be to block the truncation until the log write up to the last log ID passed to the log is confirmed via the callback, and only then proceed with the truncation. This way, there is no need for a truncation counter and nothing special to implement on the log side, but the code might be somewhat more complex in order to properly delay the truncation until the log write finishes.

In any case, this is orthogonal to the technology used for the callback, since the truncation still needs to be handled properly somehow.
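
A sketch of the truncation-counter variant (hypothetical names, not Openraft's actual API): the counter is bumped on every truncation, travels with the callback, and a confirmation carrying an old counter is simply ignored.

    #[derive(Clone, Copy, Debug)]
    struct LogId { term: u64, index: u64 }

    /// State kept by RaftCore about log IO progress.
    struct LogIoState {
        /// Bumped each time the log is truncated.
        truncation_counter: u64,
        /// Highest log id known to be persisted on disk.
        flushed: Option<LogId>,
    }

    impl LogIoState {
        fn on_truncate(&mut self) {
            self.truncation_counter += 1;
        }

        /// Handle a flush confirmation reported by the storage layer.
        fn on_flush_confirmed(&mut self, counter: u64, last_log_id: LogId) {
            if counter != self.truncation_counter {
                // The confirmation refers to the log as it was before a truncation:
                // ignore it.
                return;
            }
            self.flushed = Some(last_log_id);
        }
    }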

@drmingdrmer
Member Author

drmingdrmer commented Mar 23, 2023

OK, but then, it's only the question about orchestrating the truncation. The simplest solution on Raft side which comes to my mind is a local transient truncation counter, which is passed around in the callback. A callback called with an old truncation counter is simply ignored.

Yes. If it is a multi-shot callback handle, it should not send a log id to report IO progress. Instead, the IO progress should be measured by the WAL of the local store. An alternative solution is to assign a unique id to every IO operation.

@drmingdrmer
Member Author

So, either the RaftStorage implementation should provide an IO operation counter or, at the very least, a truncation counter to send back through the callback.
Alternatively, openraft could pass such a counter to RaftStorage every time it submits a new IO operation. In this way, a new callback handle must be passed to append_to_log().
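
For example, a sketch of that alternative (hypothetical names, not Openraft's actual API): openraft assigns a unique, monotonically increasing id to every submitted IO operation and builds a fresh callback carrying that id, so a completion always identifies exactly one operation, even if the same log id is appended several times.

    use tokio::sync::mpsc;

    /// Completion notice sent back to RaftCore.
    struct IoCompleted {
        io_id: u64,
    }

    /// Fresh callback created for every submitted IO operation (append or truncate).
    struct IoCallback {
        io_id: u64,
        tx_api: mpsc::UnboundedSender<IoCompleted>,
    }

    impl IoCallback {
        fn done(self) {
            let _ = self.tx_api.send(IoCompleted { io_id: self.io_id });
        }
    }

    /// RaftCore side: hands out a new callback carrying the next operation id.
    struct IoSubmitter {
        next_io_id: u64,
        tx_api: mpsc::UnboundedSender<IoCompleted>,
    }

    impl IoSubmitter {
        fn new_callback(&mut self) -> IoCallback {
            self.next_io_id += 1;
            IoCallback { io_id: self.next_io_id, tx_api: self.tx_api.clone() }
        }
    }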
