New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RowIterator+AsyncIterator are not cancellation safe #158
Comments
Thanks for pointing this out. I am not sure I understand it.
let config = ClientConfig::default();
let client = Client::new("projects/local-project/instances/test-instance/databases/local-database", config).await.unwrap();
let task = tokio::spawn(async move {
let key = Key::new(&"user_x_x");
let mut tx = client.read_only_transaction().await.unwrap();
let mut iterator = tx.read("User", &["UserId"], key).await.unwrap();
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(10));
loop {
tokio::select! {
_ = interval.tick() => {
tracing::info!("tick");
}
next_entry_res = iterator.next() => {
match next_entry_res.unwrap() {
Some(_row) => tracing::info!("handle row"),
None => break,
}
}
}
}
});
let _ = tokio::time::timeout(tokio::time::Duration::from_secs(2), task); |
The iterator itself doesn't get dropped, but the future returned by I think the problem is in
If you miss the error that put the inner state into |
Thank you for the detailed information. I was able to reproduce the phenomenon. google-cloud-rust/spanner/src/reader.rs Line 232 in 207422c
If |
I added |
👍 good stuff. In our case we're just going to assume nothing is cancellation safe. It's too sharp an edge to be load bearing for correctness. Our final solution used 'mainloop: loop {
let mut data_future = iterator.next();
'innerloop: loop {
let tick = timed_flush_check_interval.tick();
pin_mut!(tick);
match select(data_future, tick).await {
Either::Left((data, _)) => {
let status = process_the_data(data).await?;
if status == Status::Finished {
break 'mainloop;
};
break 'innerloop;
}
Either::Right((_, future)) => {
data_future = future;
}
}
things_that_need_to_run_periodically().await?;
}
} Thanks for your help on this. I have another bug incoming, but this will come as a PR. Session leakage. |
To repro:
At this point, the iterator should error and we return from the loop. However the client goes into a retry, giving the tick() future a chance to complete. the iterator.next() is cancelled and the error never returned to next_entry_res. It goes into an error state,but this error state always returns None for next() calls. This is particularly bad for change streams, because None indicates you've read everything from the stream, which may not be the case. So this loops forever.
The text was updated successfully, but these errors were encountered: