Skip to content

Commit

Permalink
Remove threshold from client SSE pipe (#1577)
Browse files Browse the repository at this point in the history
  • Loading branch information
BrennanConroy committed Mar 12, 2018
1 parent 0b81658 commit 489bd80
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 138 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,28 +1,13 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Buffers;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace System.IO.Pipelines
{
internal static class StreamExtensions
{
/// <summary>
/// Copies the content of a <see cref="Stream"/> into a <see cref="PipeWriter"/>.
/// </summary>
/// <param name="stream"></param>
/// <param name="writer"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static Task CopyToAsync(this Stream stream, PipeWriter writer, CancellationToken cancellationToken = default)
{
// 81920 is the default bufferSize, there is not stream.CopyToAsync overload that takes only a cancellationToken
return stream.CopyToAsync(new PipelineWriterStream(writer), bufferSize: 81920, cancellationToken: cancellationToken);
}

public static async Task CopyToEndAsync(this Stream stream, PipeWriter writer, CancellationToken cancellationToken = default)
{
try
Expand All @@ -38,62 +23,16 @@ public static async Task CopyToEndAsync(this Stream stream, PipeWriter writer, C
}

/// <summary>
/// Copies a <see cref="ReadOnlySequence{Byte}"/> to a <see cref="Stream"/> asynchronously
/// Copies the content of a <see cref="Stream"/> into a <see cref="PipeWriter"/>.
/// </summary>
/// <param name="buffer">The <see cref="ReadOnlySequence{Byte}"/> to copy</param>
/// <param name="stream">The target <see cref="Stream"/></param>
/// <param name="stream"></param>
/// <param name="writer"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static Task CopyToAsync(this ReadOnlySequence<byte> buffer, Stream stream)
{
if (buffer.IsSingleSegment)
{
return WriteToStream(stream, buffer.First);
}

return CopyMultipleToStreamAsync(buffer, stream);
}

private static async Task CopyMultipleToStreamAsync(this ReadOnlySequence<byte> buffer, Stream stream)
{
foreach (var memory in buffer)
{
await WriteToStream(stream, memory);
}
}

private static async Task WriteToStream(Stream stream, ReadOnlyMemory<byte> readOnlyMemory)
{
var memory = MemoryMarshal.AsMemory(readOnlyMemory);
if (MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> data))
{
await stream.WriteAsync(data.Array, data.Offset, data.Count)
.ConfigureAwait(continueOnCapturedContext: false);
}
else
{
// Copy required
var array = memory.Span.ToArray();
await stream.WriteAsync(array, 0, array.Length).ConfigureAwait(continueOnCapturedContext: false);
}
}

public static Task CopyToEndAsync(this PipeReader input, Stream stream)
private static Task CopyToAsync(this Stream stream, PipeWriter writer, CancellationToken cancellationToken = default)
{
return input.CopyToEndAsync(stream, 4096, CancellationToken.None);
}

public static async Task CopyToEndAsync(this PipeReader input, Stream stream, int bufferSize, CancellationToken cancellationToken)
{
try
{
await input.CopyToAsync(stream, bufferSize, cancellationToken);
}
catch (Exception ex)
{
input.Complete(ex);
return;
}
return;
// 81920 is the default bufferSize, there is not stream.CopyToAsync overload that takes only a cancellationToken
return stream.CopyToAsync(new PipelineWriterStream(writer), bufferSize: 81920, cancellationToken: cancellationToken);
}

private class PipelineWriterStream : Stream
Expand Down Expand Up @@ -148,8 +87,7 @@ public override async Task WriteAsync(byte[] buffer, int offset, int count, Canc
{
cancellationToken.ThrowIfCancellationRequested();

_writer.Write(new ReadOnlySpan<byte>(buffer, offset, count));
await _writer.FlushAsync(cancellationToken);
await _writer.WriteAsync(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,8 @@

namespace System.IO.Pipelines
{
internal class StreamPipeConnection : IDuplexPipe
internal class StreamPipeConnection
{
public StreamPipeConnection(PipeOptions options, Stream stream)
{
Input = CreateReader(options, stream);
Output = CreateWriter(options, stream);
}

public PipeReader Input { get; }

public PipeWriter Output { get; }

public void Dispose()
{
Input.Complete();
Output.Complete();
}

public static PipeReader CreateReader(PipeOptions options, Stream stream)
{
if (!stream.CanRead)
Expand All @@ -29,22 +13,9 @@ public static PipeReader CreateReader(PipeOptions options, Stream stream)
}

var pipe = new Pipe(options);
var ignore = stream.CopyToEndAsync(pipe.Writer);
_ = stream.CopyToEndAsync(pipe.Writer);

return pipe.Reader;
}

public static PipeWriter CreateWriter(PipeOptions options, Stream stream)
{
if (!stream.CanWrite)
{
throw new NotSupportedException();
}

var pipe = new Pipe(options);
var ignore = pipe.Reader.CopyToEndAsync(stream);

return pipe.Writer;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public ParseResult ParseMessage(ReadOnlySequence<byte> buffer, out SequencePosit
{
if (!(buffer.PositionOf(ByteLF) is SequencePosition lineEnd))
{
// For the case of data: Foo\r\n\r\<Anytine except \n>
// For the case of data: Foo\r\n\r\<Anything except \n>
if (_internalParserState == InternalParseState.ReadEndOfMessage)
{
if (ConvertBufferToSpan(buffer.Slice(start, buffer.End)).Length > 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ private async Task OpenConnection(IDuplexPipe application, Uri url, Cancellation

using (var stream = await response.Content.ReadAsStreamAsync())
{
var pipelineReader = StreamPipeConnection.CreateReader(PipeOptions.Default, stream);
var pipeOptions = new PipeOptions(pauseWriterThreshold: 0, resumeWriterThreshold: 0);
var pipelineReader = StreamPipeConnection.CreateReader(pipeOptions, stream);
var readCancellationRegistration = cancellationToken.Register(
reader => ((PipeReader)reader).CancelPendingRead(), pipelineReader);
try
Expand All @@ -99,7 +100,6 @@ private async Task OpenConnection(IDuplexPipe application, Uri url, Cancellation

var consumed = input.Start;
var examined = input.End;

try
{
Log.ParsingSSE(_logger, input.Length);
Expand Down

0 comments on commit 489bd80

Please sign in to comment.