RowIterator+AsyncIterator are not cancellation safe #158

Open
ivankelly opened this issue Jun 1, 2023 · 5 comments · Fixed by #162

Comments

@ivankelly
Contributor

To repro:

  • create a database and a change stream
  • query the change stream in a tokio::select! loop like
loop {
    tokio::select! {
        _ = some_tokio_interval.tick() => {
            do_periodic_task();
        },
        next_entry_res = iterator.next() => {
            next_entry_res?;
        }
    }
}
  • wait until it's getting heartbeats from the iterator
  • delete the change stream

At this point, the iterator should error and we should return from the loop. However, the client goes into a retry, which gives the tick() future a chance to complete. The iterator.next() future is then cancelled and the error is never returned to next_entry_res. The iterator goes into an error state, but in this state every next() call returns None. This is particularly bad for change streams, because None indicates that you've read everything from the stream, which may not be the case. So this loops forever.

@yoshidan
Owner

yoshidan commented Jun 2, 2023

Thanks for pointing this out. I am not sure I understand it.

  • How can I drop the iterator in that code?
  • Normally, you would add code that breaks out of the loop based on the result of next():
    let config = ClientConfig::default();
    let client = Client::new("projects/local-project/instances/test-instance/databases/local-database", config).await.unwrap();
    let task = tokio::spawn(async move {
        let key = Key::new(&"user_x_x");
        let mut tx = client.read_only_transaction().await.unwrap();
        let mut iterator = tx.read("User", &["UserId"], key).await.unwrap();
        let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(10));
        loop {
            tokio::select! {
                _ = interval.tick() => {
                    tracing::info!("tick");
                }
                next_entry_res = iterator.next() => {
                    match next_entry_res.unwrap() {
                        Some(_row) => tracing::info!("handle row"),
                        None => break,
                    }
                }
            }
        }
    });
    // Await the timeout; without `.await` the timeout future is never polled.
    let _ = tokio::time::timeout(tokio::time::Duration::from_secs(2), task).await;

@ivankelly
Contributor Author

The iterator itself doesn't get dropped, but the future returned by iterator.next() can be dropped if interval.tick() completes first. I don't think this is normally a problem because the iterator is mostly cancellation safe. But if you get an error from Spanner for a future that is dropped, the error never bubbles up, and the next call to iterator.next() returns None, which looks like a normal partition completion (when using change streams).

I think the problem is in try_recv, which calls self.streaming.message().await, which calls poll_next():

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            if let State::Error = &self.inner.state {
                return Poll::Ready(None);
            }

If you miss the error that put the inner state into State::Error, you'll get None forever. This may not even be a spanner client issue. It could well be tonic.
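
To make the failure mode concrete, here is a minimal std-only sketch. It is a toy model, not the real client or tonic code: `ToyStream`, `ToyNext`, and `NoopWake` are hypothetical names, and the "retry" is modelled as a future that simply stays pending. It only assumes the behaviour described above: the stream reports an error once, then returns None.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the inner stream: it yields an error once,
/// then reports end-of-stream (`None`) on every later call.
struct ToyStream {
    items: VecDeque<Result<u32, String>>,
    errored: bool,
}

impl ToyStream {
    fn message(&mut self) -> Option<Result<u32, String>> {
        if self.errored {
            return None;
        }
        let item = self.items.pop_front();
        if matches!(item, Some(Err(_))) {
            self.errored = true;
        }
        item
    }
}

/// Hypothetical stand-in for the future returned by `iterator.next()`.
/// On error it starts a "retry" that stays pending, so the error lives
/// only inside this future.
struct ToyNext<'a> {
    stream: &'a mut ToyStream,
    retrying: bool,
}

impl Future for ToyNext<'_> {
    type Output = Result<Option<u32>, String>;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.retrying {
            return Poll::Pending;
        }
        let item = self.stream.message();
        match item {
            Some(Ok(row)) => Poll::Ready(Ok(Some(row))),
            Some(Err(_swallowed)) => {
                // The error is consumed here and never handed to the caller.
                self.retrying = true;
                Poll::Pending
            }
            None => Poll::Ready(Ok(None)),
        }
    }
}

/// Waker that does nothing; enough to poll a future by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}

/// Returns (first call was pending, result of the second call).
fn demo() -> (bool, Result<Option<u32>, String>) {
    let mut stream = ToyStream {
        items: VecDeque::from(vec![Err("change stream deleted".to_string())]),
        errored: false,
    };
    let mut first = ToyNext { stream: &mut stream, retrying: false };
    let first_pending = poll_once(&mut first).is_pending();
    drop(first); // what select! does when the tick branch wins

    let mut second = ToyNext { stream: &mut stream, retrying: false };
    let second_result = match poll_once(&mut second) {
        Poll::Ready(result) => result,
        Poll::Pending => panic!("second call should be ready"),
    };
    (first_pending, second_result)
}

fn main() {
    let (first_pending, second_result) = demo();
    assert!(first_pending);
    // The caller sees a clean end-of-stream instead of the error.
    assert_eq!(second_result, Ok(None));
}
```

In this model the caller never observes the error: the first future swallows it and is dropped, and the second call reports a normal end of stream.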

@yoshidan
Owner

yoshidan commented Jun 9, 2023

Thank you for the detailed information. I was able to reproduce the phenomenon.
When a future is dropped by tokio::select! after streaming.message() has produced an error in try_recv, self.reader.read remains in the Pending state and the resume is never completed.

let result = self.reader.read(self.session, option).await?;

If try_recv is then called from another future, for example after a tick has completed, streaming.message() always returns None.

@yoshidan
Owner

I added an enable_resume option to QueryOptions. If it is set to false when a query is executed, the RowIterator is cancellation safe because it will not be resumed automatically.

#162

@ivankelly
Contributor Author

👍 good stuff. In our case we're just going to assume nothing is cancellation safe. It's too sharp an edge to be load bearing for correctness. Our final solution used futures::future::select: https://docs.rs/futures/latest/futures/future/fn.select.html. It's ugly as hell but it's safe.

'mainloop: loop {
    let mut data_future = iterator.next();
    'innerloop: loop {
        let tick = timed_flush_check_interval.tick();
        pin_mut!(tick);
        match select(data_future, tick).await {
            Either::Left((data, _)) => {
                let status = process_the_data(data).await?;
                if status == Status::Finished {
                    break 'mainloop;
                };
                break 'innerloop;
            }
            Either::Right((_, future)) => {
                data_future = future;
            }
        }
        things_that_need_to_run_periodically().await?;
    }
}
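
The reason this pattern is safe can be shown with a small std-only sketch. It is a toy model that does not use tokio or the futures crate: `Slow`, `NoopWake`, `recreate_each_round`, and `keep_across_rounds` are hypothetical names. It compares dropping and recreating the data future on every round (what select! does with iterator.next()) with keeping one future alive across rounds (what the loop above does by reassigning data_future).

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future that is pending for `remaining` polls, then ready.
struct Slow {
    remaining: u32,
}

impl Future for Slow {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            Poll::Ready("done")
        } else {
            self.remaining -= 1;
            Poll::Pending
        }
    }
}

/// Waker that does nothing; enough to poll a future by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Creates a fresh future each round and drops it if it is not ready,
/// so any progress made inside the future is lost.
fn recreate_each_round(rounds: u32) -> Option<&'static str> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    for _ in 0..rounds {
        let mut fut = Slow { remaining: 2 };
        if let Poll::Ready(value) = Pin::new(&mut fut).poll(&mut cx) {
            return Some(value);
        }
        // `fut` is dropped here, like the losing branch of select!.
    }
    None
}

/// Keeps the same future across rounds, so its progress is preserved.
fn keep_across_rounds(rounds: u32) -> Option<&'static str> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Slow { remaining: 2 };
    for _ in 0..rounds {
        if let Poll::Ready(value) = Pin::new(&mut fut).poll(&mut cx) {
            return Some(value);
        }
    }
    None
}

fn main() {
    assert_eq!(recreate_each_round(10), None);
    assert_eq!(keep_across_rounds(10), Some("done"));
}
```

Whatever state the future accumulates between polls, including an error it has already received, survives only in the second variant.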

Thanks for your help on this. I have another bug incoming, but this will come as a PR. Session leakage.
