The current read index mechanism can't update max_ts #16823

Open
CalvinNeo opened this issue Apr 12, 2024 · 1 comment
Labels
type/question Type: Issue - Question

Comments

CalvinNeo (Member) commented Apr 12, 2024

Bug Report

What version of TiKV are you using?

ALL

What operating system and CPU are you using?

ALL

Steps to reproduce

Run the following test in kv_service.rs:

use std::time::Duration;

use futures::executor::block_on;
use kvproto::kvrpcpb::*;
use kvproto::raft_cmdpb::{CmdType, RaftCmdRequest, RaftRequestHeader, Request as RaftRequest};
use raftstore::router::RaftStoreRouter;
use raftstore::store::msg::Callback;
use raftstore::store::RaftCmdExtraOpts;
use test_raftstore::*;
use tokio::sync::oneshot;
use txn_types::{Key, Lock, LockType};

#[test]
fn test_read_index_check_memory_locks() {
    let mut cluster = new_server_cluster(0, 1);
    cluster.run();

    let cm = cluster.sim.read().unwrap().get_concurrency_manager(1);
    let keys: Vec<_> = vec![b"k", b"l"]
        .into_iter()
        .map(|k| Key::from_raw(k))
        .collect();
    let guards = block_on(cm.lock_keys(keys.iter()));
    let lock = Lock::new(
        LockType::Put,
        b"k".to_vec(),
        1.into(),
        20000,
        None,
        1.into(),
        1,
        2.into(),
        false,
    );
    guards[0].with_lock(|l| *l = Some(lock.clone()));

    let region = cluster.get_region(b"");
    let leader = region.get_peers()[0].clone();

    let mut ctx = Context::default();
    let region_id = region.get_id();
    ctx.set_region_id(region.get_id());
    ctx.set_region_epoch(region.get_region_epoch().clone());
    ctx.set_peer(leader);

    let read_index = |ranges: &[(&[u8], &[u8])]| {
        let start_ts = block_on(cluster.pd_client.get_tso()).unwrap();

        // Build a raw ReadIndex raft command that carries the reader's
        // start_ts and the key ranges to be checked against memory locks.
        let mut cmd = RaftCmdRequest::default();
        {
            let mut inner_req = RaftRequest::default();
            inner_req.set_cmd_type(CmdType::ReadIndex);
            inner_req
                .mut_read_index()
                .set_start_ts(start_ts.into_inner());
            for &(start_key, end_key) in ranges {
                let mut range = KeyRange::default();
                range.set_start_key(start_key.to_vec());
                range.set_end_key(end_key.to_vec());
                inner_req.mut_read_index().mut_key_ranges().push(range);
            }

            let mut header = RaftRequestHeader::default();
            header.set_region_id(region_id);
            header.set_peer(ctx.get_peer().clone());
            header.set_region_epoch(ctx.get_region_epoch().clone());
            cmd.set_header(header);
            cmd.set_requests(vec![inner_req].into());
        }

        let (result_tx, result_rx) = oneshot::channel();
        let router = cluster.get_router(1).unwrap();
        if let Err(e) = router.send_command(
            cmd,
            Callback::read(Box::new(move |resp| {
                result_tx.send(resp.response).unwrap();
            })),
            RaftCmdExtraOpts {
                deadline: None,
                disk_full_opt: DiskFullOpt::AllowedOnAlmostFull,
            },
        ) {
            panic!("router send msg failed, error: {}", e);
        }

        let resp = block_on(result_rx).unwrap();
        (resp.get_responses()[0].get_read_index().clone(), start_ts)
    };

    // wait a while until the node updates its own max ts
    std::thread::sleep(Duration::from_millis(300));

    let prev_cm_max_ts = cm.max_ts();
    let (resp, start_ts) = read_index(&[(b"l", b"yz")]);
    assert!(!resp.has_locked());
    // In practice this assertion fails: the read index command does not
    // advance max_ts, which is what this issue reports.
    assert_ne!(cm.max_ts(), prev_cm_max_ts);
    assert_eq!(cm.max_ts(), start_ts);

    let (resp, start_ts) = read_index(&[(b"a", b"b"), (b"j", b"k0")]);
    assert_eq!(resp.get_locked(), &lock.into_lock_info(b"k".to_vec()));
    assert_eq!(cm.max_ts(), start_ts);

    drop(guards);

    let (resp, start_ts) = read_index(&[(b"a", b"z")]);
    assert!(!resp.has_locked());
    assert_eq!(cm.max_ts(), start_ts);
}

What did you expect?

Expected the test to pass, because the read index mechanism is supposed to update max_ts.

What happened?

The test fails: the read index command leaves max_ts unchanged, so the assertions on cm.max_ts() do not hold.
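
For reference, "updating max_ts" means advancing the concurrency manager's watermark with the reader's start_ts, as the kvrpcpb read paths do, so that a later async-commit prewrite picks a min_commit_ts above any observed reader. A minimal sketch of that expectation (the helper advance_max_ts is hypothetical, for illustration only):

use concurrency_manager::ConcurrencyManager;
use txn_types::TimeStamp;

// Hypothetical helper: the behavior the test above expects from the read
// index path. Advancing max_ts makes a later prewrite on the same range
// choose a min_commit_ts greater than this reader's start_ts.
fn advance_max_ts(cm: &ConcurrencyManager, start_ts: TimeStamp) {
    cm.update_max_ts(start_ts);
    assert!(cm.max_ts() >= start_ts);
}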

CalvinNeo added the type/bug Type: Issue - Confirmed a bug label Apr 12, 2024
cfzjywxk (Collaborator) commented:
The max_ts is not supposed to be updated during raft command processing, and the read_index RPC interface has been deprecated since #13832.
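
For context, the deprecated interface is the kvrpcpb ReadIndex RPC exposed as TikvClient::read_index, which goes through the gRPC service layer rather than sending a raw raft command. A minimal sketch of a call through it (the helper read_index_rpc and the fixed a..z range are illustrative, not from the issue):

use kvproto::kvrpcpb::{Context, KeyRange, ReadIndexRequest, ReadIndexResponse};
use kvproto::tikvpb::TikvClient;

// Illustrative only: one call through the deprecated kvrpcpb ReadIndex RPC.
fn read_index_rpc(client: &TikvClient, ctx: Context, start_ts: u64) -> ReadIndexResponse {
    let mut req = ReadIndexRequest::default();
    req.set_context(ctx);
    req.set_start_ts(start_ts);
    let mut range = KeyRange::default();
    range.set_start_key(b"a".to_vec());
    range.set_end_key(b"z".to_vec());
    req.mut_ranges().push(range);
    client.read_index(&req).unwrap()
}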

cfzjywxk added type/question Type: Issue - Question and removed type/bug Type: Issue - Confirmed a bug labels Apr 15, 2024