Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async Client #108

Open
wants to merge 41 commits into
base: master
Choose a base branch
from
Open

Async Client #108

wants to merge 41 commits into from

Conversation

jonas32
Copy link
Contributor

@jonas32 jonas32 commented Aug 10, 2021

This first commit includes most of the basic changes to the client to get it running on async-std and tokio.
There are surely a few more spots that need work or block threads. Feel free to have a look.
The client is not really working yet. I did never test it at least since unit tests are still broken.

Tests are not working yet. There are still a few logical things that need to be changed for async, but that would again be a bigger change itself (thread pools etc.).

  • Async Networking stack
  • Async tend thread
  • Async Client API
  • Runtime on tokio
  • Runtime on async-std
  • Runtime on actix-rt (not required for the client scope since we dont test against actix)
  • Sync Client API
  • Individual tests for sync and async APIs
  • Threadpool rework (optional, it works like it is, but a threadpool on async seems a little off goal)

@jonas32 jonas32 mentioned this pull request Aug 10, 2021
@jonas32
Copy link
Contributor Author

jonas32 commented Aug 14, 2021

In general, the async implementation seems to work. There is still one bug somewhere, that causes the CI to be stuck (see https://github.com/aerospike/aerospike-client-rust/runs/3326575266). Please cancel this CI run since it does not look like it will do by itself and I'm not able to stop it.

I'm not sure yet where this comes from, since the CI does not give me that much info about it and I'm not able to reproduce it locally. Maybe its related to the CI setup. Please pull this version and try to run the async-std tests locally a few times so i get to see if its related to the client or the CI env.

The async-std CI bug is fixed. Looks like a limitation of concurrent connections in the CI env.
Since the lazy static solution with the client instance is not working anymore due to async, every test needs to open its own instance of the Client. There was a Problem in the tests for expressions, where a new client was opened for every single expression.

The benchmark tool is still broken. Sadly, the bencher lib does not support the async setup. Testing against the sync API would be useless since the sync setup is just a block_on wrapper to the async one. That just adds a lot of overhead and slows it down.

Please also check the code and tell me if you find anything that i might have forgotten. I think the optimization is good, but surely there is more potential to it. (Ignore batch reads for now. I'm still working on that. The current implementation is still a more or less hacky workaround to get it working).

The client API might still be inconsistent between a few functions. That's mainly related to moving values between threads. It removes some comfort features from the related functions (operate, query and batch). I would like to keep it that way so this comfort can stay for the other functions, but that's up to you.

Right now i hardcoded an older Aerospike version (5.5.0.2) in the CI file to make it run without the required partitioning update. That needs to be removed later.

@jonas32 jonas32 marked this pull request as ready for review August 15, 2021 02:27
@jonas32
Copy link
Contributor Author

jonas32 commented Aug 15, 2021

Ready to review from my side.

I'm a little unsure about the other open PRs. This one moved the whole client to the aerospike-core folder.
In theory, git tracked the files as moved and not recreated, so i guess it should also be able to correlate the changes.
If not, #100 and #107 need to be changed for this. Please don't merge them before. I think a conflict in this one would be much more work... #105 would be broken because the cargo file was not moved and the updated dependencies were partly removed.

@khaf you said you are working on a rework of the partitions. If you already got anything running there, would you please share it as a draft on a own branch for example so i can check if it fits in without a problem or even already integrate it?

I think unit tests and a new benchmark should be implemented, but i will do that later in another PR. I guess there is already enough to review here with 117 changed files.

# Conflicts:
#	aerospike-core/src/client.rs
#	aerospike-core/src/cluster/mod.rs
#	aerospike-core/src/commands/admin_command.rs
#	aerospike-core/src/commands/stream_command.rs
@jonas32
Copy link
Contributor Author

jonas32 commented Nov 30, 2022

Hey @khaf
thanks for clarifying that.
I understand why you don't want to go for that commitment with the API included that quick.
It still is an API that was designed to be sync from its base, just ported to async with breaking as few things as possible and users just having to add an .await to the function out of the need for an async version quickly internally. There is surely a lot that could be cleaner due to this.

Also a lot has changed in the ecosystem and the language itself since the clients initial design.
Moving the PR to a separate repository as something like a "legacy version" sounds like a good plan to me. That would give more room to improve the quirks that the client API has sometimes. Also, this huge PR is probably a nightmare to review and test...

I would also be up to help you in the process of researching/designing/developing a more fitting and cleaner solution than this PR/API is for the addressed problems if that helps you to get this client more "running again" next to your other commitments.

@khaf
Copy link
Collaborator

khaf commented Nov 30, 2022

@jonas32 Thanks Jonas, both for your amazing contributions and understanding. I didn't mean that we'd want to move this PR to another repo to put it to pasture, since it is a great piece of code and unless there is reason to the contrary we'll get it in the repo. No reason to let good work go to waste.

What I meant was that as we learn more about how systems work both locally and at scale, new perspectives emerge regarding the use cases different approaches serve. In this specific case, I meant we may want to preserve the threaded version and move the async version to another repo. It is becoming more clear to me that optimizing for latency and throughput are two different goals and async is geared towards the latter. Some of our customers are all about latency and would gladly sacrifice a bit of throughput for very low 99 percentile tail latencies.

There are many more things that I'd like to revisit, including testing, life cycles, serialization, instrumentation, monitoring, self-balancing and healing among others. As I begin to think about them, I'll file new discussions on the relevant Github section so that we get the ball rolling. Of course in the meanwhile, I'll have to get this client updated to support the latest features of the server to keep the hype up :)

Thank you for your help in keeping this branch up to date and going. I'll take you up on your offer for help, since I've grown quite rusty away from Rust.

We invite everyone to take part as we continue to think about the most efficient ways to store and move data at scale.

@CMoore-Darwinium
Copy link

Hi, just letting everyone know that my company has started to use @jonas32 's branch in production.

It introduces a bug where batch reads on clusters with size > 1 will read batch keys out of order (I've got a pull request fixing this bug pointed towards Jonas' branch).

However, it has brought great improvements to both latency and throughput.

The greater improvements to latency are because in real usage, it's typically rare to ever do anything once... we can now do lots of operations from a single thread using join! and its far faster and simpler than spinning up threads to do it.

It also brings a greater sense of simplicity to the internal design of the crate as it is no longer responsible for managing thread pools or queues in order to do operations in parallel. I trust this could be leveraged to make further improvements in future.

Regarding 99th latency. My previous company (one of Aerospike's earliest customers) found that the best way to solve this was using rapid retries using READ_SEQUENCE. I'm hoping to write a patch based on Jonas' branch to all this also.

@vamshiaruru-virgodesigns

@CMoore-Darwinium thanks for the info. I am also planning on using that branch for one of my personal projects. I wonder, have you tried running the sync client with tokio::spawn_blocking vs just running the async client, to see how much performance difference is there?

@CMoore-Darwinium
Copy link

@vamshiaruru-virgodesigns we were using spawn::blocking on the sync client until we switched.

I don't really think we have an apples-to-apples comparison because the async version allows us to do some stuff that we weren't before. With spawn_blocking we needed to be very disciplined about how many tasks we created, since had to contend with blocking pool exhaustion with high traffic.

For the "finalization" part of our transactions it was taking ~30ms to perform one write and maybe 8-9 operations in series. This was also highly variable based on load.

Now we use join_all to perform all the async writes on the same thread. This part is consistently over and done with in 1.2ms-1.5ms.

Granted, this is all very unscientific because obviously there were some major structural code changes to go from sync to async. However, I'd credit the async client for allowing us to make those code changes.

@jonas32
Copy link
Contributor Author

jonas32 commented Nov 30, 2022

@khaf don't get me wrong there. I didn't read your message as if you were saying that the PR should be moved out of sight or should be skipped. I just understand why you are not comfortable with committing to the API that this PR introduces on the longer run. Not focusing on the async client exactly as this PR proposes is totally fine for me, since i agree to the problems that you pointed out before.

@CMoore-Darwinium

It introduces a bug where batch reads on clusters with size > 1 will read batch keys out of order (I've got a pull request fixing this bug pointed towards Jonas' branch).

Oh, i noticed this too while using the branch, but i did not expect it to be ordered at all in the first place.
But good to know. I guess i can skip fixing it in here since it looks like you did that on your PR already.

@vamshiaruru-virgodesigns
I'm not sure if that would be a fair benchmark between sync and async.
The sync client in this branch has no own implementation at all, but rather wraps the async codebase and just block on the futures.

let client = block_on(aerospike_core::Client::new(policy, hosts))?;

The aerospike_core crate is simply the async client.
I didn't check, but i guess it can not win that test. Its just adding extra overhead by blocking futures so the user does not have to as a compatibility layer to the current release.
Testing the current master version against the async one would be more accurate.
That way, its still great if you want to go for throughput, but not ideal for the scenario that @khaf is talking about where customers want to go for latency instead.

When i built this, i tested the performance under a few different scenarios.
It turned out, that the way the thread pool is managed is a big factor.
Just opening a tokio runtime and spamming the async functions will not be very efficient and might even decrease the performance, at least for short running operations. Long running ones would profit again in that case. Same probably counts for the new sync wrapper here.
The real strength is in a scenario like actix web, where the webserver handles the runtime and its threads more efficiently.
I saw a great performance increase in that case.
Really benchmarking the async client and getting solid numbers is not easy, since a lot of factors can count into that. That's also the reason why this version does not ship async benchmarks at all currently.

@CMoore-Darwinium
Copy link

@khaf just letting you know, I have a patch in https://github.com/darwinium-com/aerospike-client-rust to make the client rack aware, as well as introducing replica retry policies. As far as I know, this is the main missing feature in the Rust client and it works now.

We're currently using this async patch, so this patch is a precondition.

@Hartigan
Copy link

Hartigan commented Jan 16, 2023

Hello @CMoore-Darwinium @jonas32 I created patch with TLS support for client from this PR. If anyone can test it with EE version of aerospike that would be great. Also, I want create PR with trust-dns-resolver for asynchronous resolving aerospike hosts.

@@ -19,6 +19,7 @@ use serde::Serialize;
use std::convert::From;

/// Container object for a record bin, comprising a name and a value.
#[derive(Clone)]
pub struct Bin<'a> {
/// Bin name
pub name: &'a str,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that so many life times have been removed, removing this does not seem to be much of a controversial issue. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you talking about the Clone or the lifetime on the struct?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Struct's name field. Can be a String, and life will be easier ever after.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. That will simplify it a lot.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be frank, I'm worried about the CPU cost of all these little conversions, but my impression is we are saturating the io long before the CPU (async itself is very CPU heavy). I hope we don't come to regret this later. On another note, is there an idiomatic way of accepting &str, String and &'static str in all user-facing APIs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes there is. This would work:

fn main() {
    let stat: &'static str = "static";
    accept_all("&str");
    accept_all("String".to_string());
    accept_all(stat);
}

fn accept_all<S: Into<String>>(value: S) {
    println!("{}", value.into());
}

&self,
policy: &ScanPolicy,
namespace: &str,
set_name: &str,
bins: T,
) -> Result<Arc<Recordset>>
where
T: Into<Bins>,
T: Into<Bins> + Send + Sync + 'static,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Preferably, this IntoBins trait should allow a lazy iterator so that everything fit on the stack and no heavy allocation would be needed. Thoughts?

@@ -247,6 +247,9 @@ pub fn append<'a>(policy: &ListPolicy, bin: &'a str, value: &'a Value) -> Operat

/// Create list append items operation. Server appends each input list item to the end of list
/// bin. Server returns list size.
///
/// # Panics
/// Will panic if values is empty
pub fn append_items<'a>(policy: &ListPolicy, bin: &'a str, values: &'a [Value]) -> Operation<'a> {
assert!(!values.is_empty());

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

&str probably should become String here as well.

Some(timeout) => Some(Instant::now() + timeout),
None => None,
}
self.timeout.map(|timeout| Instant::now() + timeout)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sigh. I guess I had forgotten my Scala days.

@khaf
Copy link
Collaborator

khaf commented Feb 10, 2023

Alright, I think the best way to move forward would be to create a branch named async, and merge this PR and all of the work based on it into that branch as I review them. This way we can take some concrete steps towards merging the branch and all other related work into the master, while having a central branch for future PRs. Looking forward to feedback.

I'll create that branch latest Sunday evening CET if I don't hear back from anyone.

@khaf
Copy link
Collaborator

khaf commented Feb 13, 2023

The async branch is created and can be used for future development. I'm looking into merging @CMoore-Darwinium 's and @Hartigan 's branches next to merge next.

@jonas32
Copy link
Contributor Author

jonas32 commented Feb 14, 2023

Hey @khaf
sounds like a good plan.
Should I fix the stuff you already commented on myself on this or should I wait for you to finish?

@khaf
Copy link
Collaborator

khaf commented Feb 14, 2023

@jonas32 Don't worry about it, I'll take care of them. If you have some free time and looking for an interesting problem, I believe the lazy iterator for the serializers is the one.

@jonas32
Copy link
Contributor Author

jonas32 commented Feb 14, 2023

the lazy iterator for the serializers

Just for clarification, you are talking about something in the direction of impl From<Iterator> for Bins right?

@khaf
Copy link
Collaborator

khaf commented Feb 15, 2023

It's about PR #126 .
At the moment those derive macros implement:

pub trait ToBins {
    /// Convert the struct to a `Bin` Array
    fn to_bins(&self) -> Vec<Bin>;
}

That Vec is expensive and unnecessary since you don't need the bins all at the same time. It would be better if we could get this:

fn to_bin_iter(&self, bins: Option<[String]>) -> iter::Iterator<Bin>;

This way the bin is allocated on the stack and it does not matter how big the struct is.

@khaf
Copy link
Collaborator

khaf commented Jun 13, 2023

@jonas32 I'm trying to use the aerospike-sync crate for a library, but I encounter this error for rt-tokio. Are you familiar with the issue?

thread '<unnamed>' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime'

I see a lot of mentions on the internet, but scarcely anyone provides a workaround that works. How did you test it?

@jonas32
Copy link
Contributor Author

jonas32 commented Jun 13, 2023

This happens because the Runtime is not initialized.
The sync client works by basically blocking on the async futures. But for that, tokio still needs to be running.
Currently the client does not open its own runtime since im not sure if that should be part of it.

So put #[tokio::main] over your main and it should work

@khaf
Copy link
Collaborator

khaf commented Jun 13, 2023

That I figured, but I'm using it in a library (that is itself a C plugin). There is no main.

@jonas32
Copy link
Contributor Author

jonas32 commented Jun 13, 2023

i think the easiest way would be using the async client and manually running the runtime.
Here, see that example. Could work similar.
https://tokio.rs/tokio/topics/bridging#a-synchronous-interface-to-mini-redis

@khaf
Copy link
Collaborator

khaf commented Jun 13, 2023

Shouldn't this be how the aerospike-sync works? Basically store the runtime inside and initialize it with the client. I don't think a sync user cares much about the inner workings of the library, they just want to call it.
I guess the futures is already doing that, I just don't understand how to initialize that default runtime, and can't find it anywhere.

@Hartigan
Copy link

@khaf you can create runtime without main https://docs.rs/tokio/latest/tokio/runtime/index.html#usage

@jonas32
Copy link
Contributor Author

jonas32 commented Jun 13, 2023

without main annotation, you cant really make a default one. You need to spawn manually then. But yes, it might be worth to integrate that into the client. Ill have a look at that later.

}
}

impl Drop for PooledConnection {
fn drop(&mut self) {
if let Some(conn) = self.conn.take() {
self.queue.put_back(conn);
block_on(self.queue.put_back(conn));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using futures::executor::block_on underneath an async function can cause a deadlock.

We had a production outage this week because of this line. I'm about to make a special patch to this branch to address it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for this inconvenience, i didnt know about that.
You should now be able to directly push to my branch in case its needed, since I currently dont have the time to work on this branch myself.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I'll run my fix past you and @khaf to see if everyone's onboard with it first.

I changed internals to a synchronous mutex, to allow it to be used more easily in the drop function.
Synchronous mutexes are safe in async code (and actually outperform async mutexes) with the proviso that they are never held across .await points... otherwise they deadlock.

So the put_back function now looks like this:

    pub fn put_back(&self, mut conn: Connection) {
        let mut internals = self.0.internals.lock().unwrap();
        if internals.num_conns < self.0.capacity {
            internals.connections.push_back(IdleConnection(conn));
        } else {
            aerospike_rt::spawn(async move { conn.close().await });
            internals.num_conns -= 1;
        }
    }

Because of the limitation on holding mutexes across await points, I also had to restructure fn get(&self) to do the synchronous code first and the asynchonous code later, as so:

    pub async fn get(&self) -> Result<PooledConnection> {
        let mut connections_to_free = Vec::new();
        let mut conn: Option<Connection> = None;
        {
            let mut internals = self.0.internals.lock().unwrap();
            while let Some(IdleConnection(this_conn)) = internals.connections.pop_front() {
                if this_conn.is_idle() {
                    connections_to_free.push(this_conn);
                    internals.num_conns -= 1;
                } else {
                    conn = Some(this_conn);
                    break;
                }
            }

            if conn.is_none() {
                if internals.num_conns >= self.0.capacity {
                    bail!(ErrorKind::NoMoreConnections);
                }
    
                internals.num_conns += 1;
            }
        }

        for mut connection in connections_to_free {
            connection.close().await;
        }

        if conn.is_none() {
            let new_conn = aerospike_rt::timeout(
                Duration::from_secs(5),
                Connection::new(&self.0.host.address(), &self.0.policy),
            )
            .await;

            let Ok(Ok(new_conn)) = new_conn else {
                self.0.internals.lock().unwrap().num_conns -= 1;
                bail!(ErrorKind::Connection(
                    "Could not open network connection".to_string()
                ));
            };

            conn = Some(new_conn);
        }

        Ok(PooledConnection {
            queue: self.clone(),
            conn,
        })

Not only does it allow the synchonous mutex to be used here, but it also does not hold a lock while calling close(), so it should be faster under load.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh by the way. I mentioned the outage, but I didn't mention how great it is to have this patch at all. I'm not sure if we'd have a production environment without it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

8 participants