Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use System.IO.Pipelines for sending #303

Merged
merged 30 commits into from Jan 10, 2024
Merged

Use System.IO.Pipelines for sending #303

merged 30 commits into from Jan 10, 2024

Conversation

caleblloyd
Copy link
Contributor

Refactors command sending to use System.IO.Pipelines

  • Serializes full command into Pipe Writer, returning any Serialization Exceptions to PublishAsync
  • Kicks off Flush to Pipe Reader
  • At this point, the Command is considered "Enqueued for Sending", and no further Cancellation is possible. A successful status is returned to the caller

There is a possibility that the Connection drops before the Pipe Reader is read and actually sends the bytes on the Network. If so, the bytes remain in the Pipe Reader and will be sent if the Connection reconnects. So the only way to ensure that the bytes were actually sent to the server is to perform a round-trip with PingAsync. I believe that this behavior is similar to the Go client.

Performance

Performance is similar to WaitUntilSent=false but a little better due to some reduced allocations

Before:

Method Iter Mean Error StdDev Gen0 Allocated
WaitUntilSentTrue 64 981.8 μs 945.78 μs 51.84 μs 1.9531 6.82 KB
WaitUntilSentFalse 64 119.6 μs 61.53 μs 3.37 μs - 1.1 KB
WaitUntilSentTrue 512 7,139.8 μs 6,944.54 μs 380.65 μs 15.6250 52.21 KB
WaitUntilSentFalse 512 549.3 μs 851.19 μs 46.66 μs 2.9297 10.43 KB
WaitUntilSentTrue 1024 16,415.3 μs 5,228.92 μs 286.61 μs 31.2500 104.26 KB
WaitUntilSentFalse 1024 1,015.8 μs 1,783.37 μs 97.75 μs 13.6719 43.74 KB

After

Method Iter Mean Error StdDev Allocated
PublishAsync 64 115.5 μs 68.81 μs 3.77 μs 510 B
PublishAsync 512 439.4 μs 778.86 μs 42.69 μs 535 B
PublishAsync 1024 806.7 μs 1,379.06 μs 75.59 μs 542 B

Backwards Compatibility

I aimed to make the changes as Backward Compatible as possible, but there are some considerations:

  • NatsPubOpts.WaitUntilSent and NatsPubOpts.ErrorHandler have been marked as Obsolete
    • Those were the only 2 options in NatsPubOpts
    • There still could be future options in NatsPubOpts, such as a per-publish CommandTimeout override. So we probably shouldn't get rid of it
  • CommandTimeout was lowered from 1 minute by default to 5 seconds. It is used to throw a Timeout exception if anything in the path of sending the command times out, including:
    • Connection does not enter Connected State
    • Cannot acquire Semaphore required to access Pipe Writer
    • Pipe is full and cannot Flush the last command that was enqueued
  • JetStream AckOpts were not in a format that matched NatsPubOpts and NatsSubOpts so I standardized those and got rid of WaitUntilSent

@mtmk
Copy link
Collaborator

mtmk commented Jan 2, 2024

I just started a long running test with JS publisher and consumer just to make sure there are no regressions.

@stebet
Copy link
Contributor

stebet commented Jan 3, 2024

It is also useful and simplifies a lot just to use a BoundedChannel instead of SemaphoreSlim to synchronize writes to the PipeWriter, since SemaphoreSlim, despite it's name, is not really a lightweight synchronization primitive considering it's pretty likely the writes might be happening from multiple threads for a highly concurrent application. Channels also have some nice optimizations for multiple-writers/single-reader scenarios.

That should simplify the CommandWriter code quite a bit, and connection exceptions can be bubbled up in the WriteLoop by completing the ChannelWriter with an exception for example.

With the Channel, you can also in many/most cases avoid the async state machine when the Channel isn't full, by doing a TryWrite and only falling down to WriteAsync when that fails (the bounded channel is full and waiting for the Write loop to pull stuff off it first). That makes PublishAsync very cheap for the cases where the Pipeline buffer can keep up with the channel ingress.

@caleblloyd
Copy link
Contributor Author

It is also useful and simplifies a lot just to use a BoundedChannel instead of SemaphoreSlim to synchronize writes to the PipeWriter, since SemaphoreSlim, despite it's name, is not really a lightweight synchronization primitive

We could, but we would have to introduce another layer of memory and copying. Right now Command Writer is writing directly to the Pipe Writer's buffers, which are the same buffers that get flushed to the Pipe Reader and sent on the wire. Access to the Pipe Writer's buffers must be serialized.

Worse case scenario for adding a BoundedChannel<Memory<byte>> is another (Max Message Size * Bounded Channel Size) bytes of memory, and adding a copy to the Pipe Writer for every message. It could end up being faster for highly parallel environments, would have to benchmark. Maybe we save that for a potential future improvement though?

caleblloyd and others added 21 commits January 3, 2024 22:37
Signed-off-by: Caleb Lloyd <caleb@synadia.com>
Signed-off-by: Caleb Lloyd <caleb@synadia.com>
Signed-off-by: Caleb Lloyd <caleb@synadia.com>
Signed-off-by: Caleb Lloyd <caleb@synadia.com>
Signed-off-by: Caleb Lloyd <caleblloyd@gmail.com>
Signed-off-by: Caleb Lloyd <caleb@synadia.com>
Signed-off-by: Caleb Lloyd <caleb@synadia.com>
Signed-off-by: Caleb Lloyd <caleblloyd@gmail.com>
Signed-off-by: Caleb Lloyd <caleb@synadia.com>
Signed-off-by: Caleb Lloyd <caleb@synadia.com>
Signed-off-by: Caleb Lloyd <caleb@synadia.com>
Signed-off-by: Caleb Lloyd <caleb@synadia.com>
Signed-off-by: Caleb Lloyd <caleb@synadia.com>
Signed-off-by: Caleb Lloyd <caleb@synadia.com>
Signed-off-by: Caleb Lloyd <caleb@synadia.com>
Signed-off-by: Caleb Lloyd <caleb@synadia.com>
Signed-off-by: Caleb Lloyd <caleb@synadia.com>
Signed-off-by: Caleb Lloyd <caleb@synadia.com>
Signed-off-by: Caleb Lloyd <caleb@synadia.com>
Signed-off-by: Caleb Lloyd <caleb@synadia.com>
Signed-off-by: Caleb Lloyd <caleb@synadia.com>
Copy link
Collaborator

@mtmk mtmk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good at first glance. Important thing is the bit about pipe reader try-finally thing. Others are nits.

src/NATS.Client.Core/Commands/CommandWriter.cs Outdated Show resolved Hide resolved
src/NATS.Client.Core/Commands/CommandWriter.cs Outdated Show resolved Hide resolved
src/NATS.Client.Core/Internal/BufferWriterExtensions.cs Outdated Show resolved Hide resolved
Signed-off-by: Caleb Lloyd <caleb@synadia.com>
Signed-off-by: Caleb Lloyd <caleb@synadia.com>
Copy link
Contributor

@to11mtm to11mtm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still need to look at some things closer but wanted to leave what feedback I have

@to11mtm
Copy link
Contributor

to11mtm commented Jan 4, 2024

I'm still reviewing but wanted to try to add some flavor to all of this;

We could, but we would have to introduce another layer of memory and copying. Right now Command Writer is writing directly to the Pipe Writer's buffers, which are the same buffers that get flushed to the Pipe Reader and sent on the wire. Access to the Pipe Writer's buffers must be serialized.

At this point I probably -should- pull down the PR and look, How does SemaphoreSlim help here? Main reason I ask is that it does not, AFAIK, have FIFO behavior as a guarantee.

Worse case scenario for adding a BoundedChannel<Memory<byte>> is another (Max Message Size * Bounded Channel Size) bytes of memory, and adding a copy to the Pipe Writer for every message. It could end up being faster for highly parallel environments, would have to benchmark. Maybe we save that for a potential future improvement though?

It's possible that it was being suggested, rather than use a BoundedChannel<Memory<byte>>... but instead to use Channel<T> (in the past I've done Channel<Exception> with sentinels or Channel<int> with numbers for state... [0]) with in a specific way to mimic a SemaphoreSlim.

Getting more specific, as a semi-contrived example, SubscribeAsync now does a TryWait, if that fails it then dumps into the state machine. but I'd have to guess that the TryWait->WaitAsync will have more interlocked/volatile operations than a ReadAsync or WaitToReadAsync on a ChannelReader. [1]

[0] - In fact, I have done so on more than one occasion to 'gate' NatsConnection and various things on disconnect/reconnect.

[1] - I've seen this myself, where WaitToReadAsync on a BoundedChannel worked better than SemaphoreSlim.

Signed-off-by: Caleb Lloyd <caleb@synadia.com>
Signed-off-by: Caleb Lloyd <caleb@synadia.com>
Copy link
Collaborator

@mtmk mtmk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Signed-off-by: Caleb Lloyd <caleb@synadia.com>
Signed-off-by: Caleb Lloyd <caleb@synadia.com>
Signed-off-by: Caleb Lloyd <caleb@synadia.com>
Signed-off-by: Caleb Lloyd <caleb@synadia.com>
@mtmk mtmk merged commit 421463c into main Jan 10, 2024
9 checks passed
@mtmk mtmk deleted the pipelines-send branch January 10, 2024 21:23
mtmk added a commit that referenced this pull request Jan 11, 2024
* Use System.IO.Pipelines for sending (#303)
* Suggested fixes to interfaces (#297)
* JS Message reply-to exposed (#314)
@mtmk mtmk mentioned this pull request Jan 11, 2024
mtmk added a commit that referenced this pull request Jan 11, 2024
* Use System.IO.Pipelines for sending (#303)
* Suggested fixes to interfaces (#297)
* JS Message reply-to exposed (#314)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants