Skip to content

Commit

Permalink
Fixing bugs in receive
Browse files Browse the repository at this point in the history
  • Loading branch information
LiveOrDevTrying committed May 15, 2023
1 parent 887fe02 commit 27c5998
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 133 deletions.
104 changes: 55 additions & 49 deletions Tcp.NET.Client/Handlers/TcpClientHandlerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using PHS.Networking.Utilities;
using System;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
Expand Down Expand Up @@ -89,51 +90,33 @@ public override async Task<bool> ConnectAsync(CancellationToken cancellationToke
}
public override async Task<bool> DisconnectAsync(CancellationToken cancellationToken = default)
{
try
if (_connection != null)
{
if (_connection != null)
if (!_connection.Disposed)
{
if (!_connection.Disposed)
{
_connection.Disposed = true;
_connection.Disposed = true;

if (_parameters.UseDisconnectBytes)
{
await SendAsync(_parameters.DisconnectBytes, cancellationToken).ConfigureAwait(false);
}
if (_parameters.UseDisconnectBytes)
{
await SendAsync(_parameters.DisconnectBytes, cancellationToken).ConfigureAwait(false);
}

_connection?.Dispose();
_connection?.Dispose();

FireEvent(this, CreateConnectionEventArgs(new TcpConnectionEventArgs<Y>
{
ConnectionEventType = ConnectionEventType.Disconnect,
Connection = _connection,
CancellationToken = cancellationToken
}));
FireEvent(this, CreateConnectionEventArgs(new TcpConnectionEventArgs<Y>
{
ConnectionEventType = ConnectionEventType.Disconnect,
Connection = _connection,
CancellationToken = cancellationToken
}));

_connection = null;
_connection = null;

_isRunning = false;
_isRunning = false;

return true;
}
return true;
}
}
catch (Exception ex)
{
FireEvent(this, CreateErrorEventArgs(new TcpErrorEventArgs<Y>
{
Connection = _connection,
Exception = ex,
Message = ex.Message,
CancellationToken = cancellationToken
}));
}

_connection?.Dispose();
_connection = null;

_isRunning = false;

return false;
}
Expand All @@ -145,7 +128,8 @@ public override async Task<bool> SendAsync(string message, CancellationToken can
if (_connection != null &&
_connection.TcpClient != null &&
_connection.TcpClient.Connected &&
!cancellationToken.IsCancellationRequested)
!cancellationToken.IsCancellationRequested &&
!string.IsNullOrWhiteSpace(message))
{
var bytes = Statics.ByteArrayAppend(Encoding.UTF8.GetBytes($"{message}"), _parameters.EndOfLineBytes);
await _connection.TcpClient.Client.SendAsync(new ArraySegment<byte>(bytes), SocketFlags.None, cancellationToken).ConfigureAwait(false);
Expand Down Expand Up @@ -184,7 +168,8 @@ public override async Task<bool> SendAsync(byte[] message, CancellationToken can
if (_connection != null &&
_connection.TcpClient != null &&
_connection.TcpClient.Connected &&
!cancellationToken.IsCancellationRequested)
!cancellationToken.IsCancellationRequested &&
message.Where(x => x != 0).Any())
{
var bytes = Statics.ByteArrayAppend(message, _parameters.EndOfLineBytes);
await _connection.TcpClient.Client.SendAsync(new ArraySegment<byte>(bytes), SocketFlags.None, cancellationToken).ConfigureAwait(false);
Expand Down Expand Up @@ -223,29 +208,40 @@ protected virtual async Task ReceiveAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested && _connection != null && _connection.TcpClient.Connected)
{
using (var ms = new MemoryStream())
var endOfMessage = false;

do
{
if (_connection.TcpClient.Available > 0)
{
var buffer = new ArraySegment<byte>(new byte[_connection.TcpClient.Available]);
var result = await _connection.TcpClient.Client.ReceiveAsync(buffer, SocketFlags.None, cancellationToken).ConfigureAwait(false);
await ms.WriteAsync(buffer.Array.AsMemory(buffer.Offset, result), cancellationToken).ConfigureAwait(false);
await _connection.MemoryStream.WriteAsync(buffer.Array.AsMemory(buffer.Offset, result), cancellationToken).ConfigureAwait(false);

endOfMessage = Statics.ByteArrayContainsSequence(_connection.MemoryStream.GetBuffer(), _parameters.EndOfLineBytes) > -1;
}
else
{
await Task.Delay(1, cancellationToken).ConfigureAwait(false);
continue;
}

var endOfMessage = Statics.ByteArrayContainsSequence(ms.ToArray(), _parameters.EndOfLineBytes) > -1;
}
while (!endOfMessage && _connection != null && _connection.TcpClient.Connected);

if (endOfMessage)
{
var bytes = _connection.MemoryStream.ToArray();
_connection.MemoryStream.SetLength(0);

if (endOfMessage)
while (Statics.ByteArrayContainsSequence(bytes, _parameters.EndOfLineBytes) > -1)
{
var parts = Statics.ByteArraySeparate(ms.ToArray(), _parameters.EndOfLineBytes);
var index = Statics.ByteArrayContainsSequence(bytes, _parameters.EndOfLineBytes);
var sub = bytes.Take(index).ToArray();

bytes = bytes.Skip(index + _parameters.EndOfLineBytes.Length).ToArray();

for (int i = 0; i < parts.Length; i++)
if (sub.Length > 0)
{
if (_parameters.UseDisconnectBytes && Statics.ByteArrayEquals(parts[i], _parameters.DisconnectBytes))
if (_parameters.UseDisconnectBytes && Statics.ByteArrayEquals(sub, _parameters.DisconnectBytes))
{
_connection?.Dispose();

Expand All @@ -261,7 +257,7 @@ protected virtual async Task ReceiveAsync(CancellationToken cancellationToken)
_isRunning = false;
return;
}
else if (_parameters.UsePingPong && Statics.ByteArrayEquals(parts[i], _parameters.PingBytes))
else if (_parameters.UsePingPong && Statics.ByteArrayEquals(sub, _parameters.PingBytes))
{
await SendAsync(_parameters.PongBytes, cancellationToken).ConfigureAwait(false);
}
Expand All @@ -271,13 +267,18 @@ protected virtual async Task ReceiveAsync(CancellationToken cancellationToken)
{
MessageEventType = MessageEventType.Receive,
Connection = _connection,
Message = !_parameters.OnlyEmitBytes ? Encoding.UTF8.GetString(parts[i]) : null,
Bytes = parts[i],
Message = !_parameters.OnlyEmitBytes ? Encoding.UTF8.GetString(sub) : null,
Bytes = sub,
CancellationToken = cancellationToken
}));
}
}
}

if (bytes.Length > 0)
{
await _connection.MemoryStream.WriteAsync(bytes, cancellationToken);
}
}
}
}
Expand All @@ -292,6 +293,11 @@ protected virtual async Task ReceiveAsync(CancellationToken cancellationToken)
}));
}

if (_connection != null)
{
_connection.Disposed = true;
}

await DisconnectAsync(cancellationToken).ConfigureAwait(false);
}

Expand Down
18 changes: 16 additions & 2 deletions Tcp.NET.Core/Models/ConnectionTcp.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using PHS.Networking.Models;
using System;
using System.IO;
using System.Net.Sockets;

namespace Tcp.NET.Core.Models
Expand All @@ -8,22 +9,35 @@ public class ConnectionTcp : IConnection
{
public string ConnectionId { get; set; }
public TcpClient TcpClient { get; set; }
public MemoryStream MemoryStream { get; set; }
public DateTime NextPing { get; set; }
public bool Disposed { get; set; }

public ConnectionTcp()
{
MemoryStream = new MemoryStream();
}

public virtual void Dispose()
{
Disposed = true;

try
{
TcpClient?.GetStream().Close();
MemoryStream.Close();
MemoryStream.Dispose();
}
catch { }

try
{
TcpClient?.Close();
}
catch { }

try
{
TcpClient?.Dispose();
//TcpClient?.Dispose();
}
catch { }
}
Expand Down

0 comments on commit 27c5998

Please sign in to comment.