Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Suggestion: Provides test suite for ensuring the storage implementation is correct #538

Open
jopemachine opened this issue Mar 11, 2024 · 1 comment

Comments

@jopemachine
Copy link
Contributor

jopemachine commented Mar 11, 2024

I am currently developing a high-level Raft framework based on raft-rs, as shown below.
https://github.com/lablup/raftify

One thing I've realized while developing this framework is that even a slightly incorrect storage implementation can lead to totally unexpected bugs in certain scenarios.

It seems like openraft provides a test suite that allows the storage developers to test their own custom storage, as shown in this link to prevent such situations.

I think passing these test suite can alleviate the anxiety storage developers might have about encountering unforeseen errors in specific scenarios.

Documentation about this:
https://docs.rs/openraft/latest/openraft/docs/getting_started/index.html#ensure-the-implementation-of-raftstorage-is-correct

I think it would be very helpful if raft-rs provide a similar test suite if possible.

@BusyJay
Copy link
Member

BusyJay commented Mar 11, 2024

Good idea! One approach is to adapt the implementations in

raft-rs/src/storage.rs

Lines 522 to 813 in 65a0062

#[cfg(test)]
mod test {
use std::panic::{self, AssertUnwindSafe};
use protobuf::Message as PbMessage;
use crate::eraftpb::{ConfState, Entry, Snapshot};
use crate::errors::{Error as RaftError, StorageError};
use super::{GetEntriesContext, MemStorage, Storage};
fn new_entry(index: u64, term: u64) -> Entry {
let mut e = Entry::default();
e.term = term;
e.index = index;
e
}
fn size_of<T: PbMessage>(m: &T) -> u32 {
m.compute_size()
}
fn new_snapshot(index: u64, term: u64, voters: Vec<u64>) -> Snapshot {
let mut s = Snapshot::default();
s.mut_metadata().index = index;
s.mut_metadata().term = term;
s.mut_metadata().mut_conf_state().voters = voters;
s
}
#[test]
fn test_storage_term() {
let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
let mut tests = vec![
(2, Err(RaftError::Store(StorageError::Compacted))),
(3, Ok(3)),
(4, Ok(4)),
(5, Ok(5)),
(6, Err(RaftError::Store(StorageError::Unavailable))),
];
for (i, (idx, wterm)) in tests.drain(..).enumerate() {
let storage = MemStorage::new();
storage.wl().entries = ents.clone();
let t = storage.term(idx);
if t != wterm {
panic!("#{}: expect res {:?}, got {:?}", i, wterm, t);
}
}
}
#[test]
fn test_storage_entries() {
let ents = vec![
new_entry(3, 3),
new_entry(4, 4),
new_entry(5, 5),
new_entry(6, 6),
];
let max_u64 = u64::max_value();
let mut tests = vec![
(
2,
6,
max_u64,
Err(RaftError::Store(StorageError::Compacted)),
),
(3, 4, max_u64, Ok(vec![new_entry(3, 3)])),
(4, 5, max_u64, Ok(vec![new_entry(4, 4)])),
(4, 6, max_u64, Ok(vec![new_entry(4, 4), new_entry(5, 5)])),
(
4,
7,
max_u64,
Ok(vec![new_entry(4, 4), new_entry(5, 5), new_entry(6, 6)]),
),
// even if maxsize is zero, the first entry should be returned
(4, 7, 0, Ok(vec![new_entry(4, 4)])),
// limit to 2
(
4,
7,
u64::from(size_of(&ents[1]) + size_of(&ents[2])),
Ok(vec![new_entry(4, 4), new_entry(5, 5)]),
),
(
4,
7,
u64::from(size_of(&ents[1]) + size_of(&ents[2]) + size_of(&ents[3]) / 2),
Ok(vec![new_entry(4, 4), new_entry(5, 5)]),
),
(
4,
7,
u64::from(size_of(&ents[1]) + size_of(&ents[2]) + size_of(&ents[3]) - 1),
Ok(vec![new_entry(4, 4), new_entry(5, 5)]),
),
// all
(
4,
7,
u64::from(size_of(&ents[1]) + size_of(&ents[2]) + size_of(&ents[3])),
Ok(vec![new_entry(4, 4), new_entry(5, 5), new_entry(6, 6)]),
),
];
for (i, (lo, hi, maxsize, wentries)) in tests.drain(..).enumerate() {
let storage = MemStorage::new();
storage.wl().entries = ents.clone();
let e = storage.entries(lo, hi, maxsize, GetEntriesContext::empty(false));
if e != wentries {
panic!("#{}: expect entries {:?}, got {:?}", i, wentries, e);
}
}
}
#[test]
fn test_storage_last_index() {
let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
let storage = MemStorage::new();
storage.wl().entries = ents;
let wresult = Ok(5);
let result = storage.last_index();
if result != wresult {
panic!("want {:?}, got {:?}", wresult, result);
}
storage.wl().append(&[new_entry(6, 5)]).unwrap();
let wresult = Ok(6);
let result = storage.last_index();
if result != wresult {
panic!("want {:?}, got {:?}", wresult, result);
}
}
#[test]
fn test_storage_first_index() {
let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
let storage = MemStorage::new();
storage.wl().entries = ents;
assert_eq!(storage.first_index(), Ok(3));
storage.wl().compact(4).unwrap();
assert_eq!(storage.first_index(), Ok(4));
}
#[test]
fn test_storage_compact() {
let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
let mut tests = vec![(2, 3, 3, 3), (3, 3, 3, 3), (4, 4, 4, 2), (5, 5, 5, 1)];
for (i, (idx, windex, wterm, wlen)) in tests.drain(..).enumerate() {
let storage = MemStorage::new();
storage.wl().entries = ents.clone();
storage.wl().compact(idx).unwrap();
let index = storage.first_index().unwrap();
if index != windex {
panic!("#{}: want {}, index {}", i, windex, index);
}
let term = if let Ok(v) =
storage.entries(index, index + 1, 1, GetEntriesContext::empty(false))
{
v.first().map_or(0, |e| e.term)
} else {
0
};
if term != wterm {
panic!("#{}: want {}, term {}", i, wterm, term);
}
let last = storage.last_index().unwrap();
let len = storage
.entries(index, last + 1, 100, GetEntriesContext::empty(false))
.unwrap()
.len();
if len != wlen {
panic!("#{}: want {}, term {}", i, wlen, len);
}
}
}
#[test]
fn test_storage_create_snapshot() {
let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
let nodes = vec![1, 2, 3];
let mut conf_state = ConfState::default();
conf_state.voters = nodes.clone();
let unavailable = Err(RaftError::Store(
StorageError::SnapshotTemporarilyUnavailable,
));
let mut tests = vec![
(4, Ok(new_snapshot(4, 4, nodes.clone())), 0),
(5, Ok(new_snapshot(5, 5, nodes.clone())), 5),
(5, Ok(new_snapshot(6, 5, nodes)), 6),
(5, unavailable, 6),
];
for (i, (idx, wresult, windex)) in tests.drain(..).enumerate() {
let storage = MemStorage::new();
storage.wl().entries = ents.clone();
storage.wl().raft_state.hard_state.commit = idx;
storage.wl().raft_state.hard_state.term = idx;
storage.wl().raft_state.conf_state = conf_state.clone();
if wresult.is_err() {
storage.wl().trigger_snap_unavailable();
}
let result = storage.snapshot(windex, 0);
if result != wresult {
panic!("#{}: want {:?}, got {:?}", i, wresult, result);
}
}
}
#[test]
fn test_storage_append() {
let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)];
let mut tests = vec![
(
vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)],
Some(vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)]),
),
(
vec![new_entry(3, 3), new_entry(4, 6), new_entry(5, 6)],
Some(vec![new_entry(3, 3), new_entry(4, 6), new_entry(5, 6)]),
),
(
vec![
new_entry(3, 3),
new_entry(4, 4),
new_entry(5, 5),
new_entry(6, 5),
],
Some(vec![
new_entry(3, 3),
new_entry(4, 4),
new_entry(5, 5),
new_entry(6, 5),
]),
),
// overwrite compacted raft logs is not allowed
(
vec![new_entry(2, 3), new_entry(3, 3), new_entry(4, 5)],
None,
),
// truncate the existing entries and append
(
vec![new_entry(4, 5)],
Some(vec![new_entry(3, 3), new_entry(4, 5)]),
),
// direct append
(
vec![new_entry(6, 6)],
Some(vec![
new_entry(3, 3),
new_entry(4, 4),
new_entry(5, 5),
new_entry(6, 6),
]),
),
];
for (i, (entries, wentries)) in tests.drain(..).enumerate() {
let storage = MemStorage::new();
storage.wl().entries = ents.clone();
let res = panic::catch_unwind(AssertUnwindSafe(|| storage.wl().append(&entries)));
if let Some(wentries) = wentries {
let _ = res.unwrap();
let e = &storage.wl().entries;
if *e != wentries {
panic!("#{}: want {:?}, entries {:?}", i, wentries, e);
}
} else {
res.unwrap_err();
}
}
}
#[test]
fn test_storage_apply_snapshot() {
let nodes = vec![1, 2, 3];
let storage = MemStorage::new();
// Apply snapshot successfully
let snap = new_snapshot(4, 4, nodes.clone());
storage.wl().apply_snapshot(snap).unwrap();
// Apply snapshot fails due to StorageError::SnapshotOutOfDate
let snap = new_snapshot(3, 3, nodes);
storage.wl().apply_snapshot(snap).unwrap_err();
}
}

to accept arbitrary engine. Actually, TiKV itself adapts these cases to test its own PeerStorage in the first version.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants