Skip to content

Commit

Permalink
More robust communication with keep-alive pings
Browse files Browse the repository at this point in the history
  • Loading branch information
haneytron committed Oct 3, 2014
1 parent 105d943 commit fcc4add
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 140 deletions.
172 changes: 89 additions & 83 deletions SimplSockets/SimplSocketClient.cs
Expand Up @@ -140,14 +140,7 @@ public void Connect(EndPoint endPoint)
}
catch (SocketException ex)
{
lock (_isConnectedLock)
{
if (_isConnected)
{
HandleCommunicationError(_socket, ex);
_isConnected = false;
}
}
HandleCommunicationError(_socket, ex);
}
catch (ObjectDisposedException)
{
Expand Down Expand Up @@ -179,14 +172,7 @@ public void Send(byte[] message)
}
catch (SocketException ex)
{
lock (_isConnectedLock)
{
if (_isConnected)
{
HandleCommunicationError(_socket, ex);
_isConnected = false;
}
}
HandleCommunicationError(_socket, ex);
}
catch (ObjectDisposedException)
{
Expand Down Expand Up @@ -222,15 +208,7 @@ public byte[] SendReceive(byte[] message)
}
catch (SocketException ex)
{
lock (_isConnectedLock)
{
if (_isConnected)
{
HandleCommunicationError(_socket, ex);
_isConnected = false;
}
}

HandleCommunicationError(_socket, ex);
return null;
}
catch (ObjectDisposedException)
Expand Down Expand Up @@ -273,10 +251,7 @@ public void Close()
_socket.Close();

// No longer connected
lock (_isConnectedLock)
{
_isConnected = false;
}
_isConnected = false;
}

/// <summary>
Expand Down Expand Up @@ -322,15 +297,7 @@ private void ConnectCallback(IAsyncResult asyncResult)
}
catch (SocketException ex)
{
lock (_isConnectedLock)
{
if (_isConnected)
{
HandleCommunicationError(_socket, ex);
_isConnected = false;
}
}

HandleCommunicationError(_socket, ex);
return;
}
catch (ObjectDisposedException)
Expand All @@ -355,13 +322,55 @@ private void ConnectCallback(IAsyncResult asyncResult)
// Post a receive to the socket as the client will be continuously receiving messages to be pushed to the queue
_socket.BeginReceive(messageState.Buffer, 0, messageState.Buffer.Length, 0, ReceiveCallback, messageState);

// Spin up the keep-alive
KeepAlive(_socket);

// Process all incoming messages
var processMessageState = _messageStatePool.Pop();
processMessageState.Handler = _socket;

ProcessReceivedMessage(processMessageState);
}

private void KeepAlive(Socket handler)
{
int availableTest = 0;

// If the socket is disposed, we're done
try
{
availableTest = handler.Available;
}
catch (ObjectDisposedException)
{
// Peace out!
return;
}

// Do the keep-alive
try
{
handler.BeginSend(_controlBytesPlaceholder, 0, _controlBytesPlaceholder.Length, 0, KeepAliveCallback, handler);
}
catch (SocketException ex)
{
HandleCommunicationError(handler, ex);
}
catch (ObjectDisposedException)
{
// If disposed, handle communication error was already done and we're just catching up on other threads. Supress it.
}
}

private void KeepAliveCallback(IAsyncResult asyncResult)
{
SendCallback(asyncResult);

Thread.Sleep(1000);

KeepAlive((Socket)asyncResult.AsyncState);
}

private void SendCallback(IAsyncResult asyncResult)
{
// Get the socket to complete on
Expand All @@ -374,15 +383,7 @@ private void SendCallback(IAsyncResult asyncResult)
}
catch (SocketException ex)
{
lock (_isConnectedLock)
{
if (_isConnected)
{
HandleCommunicationError(_socket, ex);
_isConnected = false;
}
}

HandleCommunicationError(_socket, ex);
return;
}
catch (ObjectDisposedException)
Expand All @@ -405,15 +406,7 @@ private void ReceiveCallback(IAsyncResult asyncResult)
}
catch (SocketException ex)
{
lock (_isConnectedLock)
{
if (_isConnected)
{
HandleCommunicationError(_socket, ex);
_isConnected = false;
}
}

HandleCommunicationError(_socket, ex);
return;
}
catch (ObjectDisposedException)
Expand Down Expand Up @@ -491,27 +484,34 @@ private void ProcessReceivedMessage(MessageState messageState)

// Have control bytes, get message bytes

// Initialize buffer if needed
if (messageState.Buffer == null)
// SPECIAL CASE: if empty message, skip a bunch of stuff
if (messageState.BytesToRead != 0)
{
messageState.Buffer = new byte[messageState.BytesToRead];
}
// Initialize buffer if needed
if (messageState.Buffer == null)
{
messageState.Buffer = new byte[messageState.BytesToRead];
}

var bytesAvailable = bytesRead - currentOffset;
var bytesAvailable = bytesRead - currentOffset;

var bytesToCopy = Math.Min(messageState.BytesToRead, bytesAvailable);
var bytesToCopy = Math.Min(messageState.BytesToRead, bytesAvailable);

// Copy bytes to buffer
Buffer.BlockCopy(buffer, currentOffset, messageState.Buffer, messageState.Buffer.Length - messageState.BytesToRead, bytesToCopy);
// Copy bytes to buffer
Buffer.BlockCopy(buffer, currentOffset, messageState.Buffer, messageState.Buffer.Length - messageState.BytesToRead, bytesToCopy);

currentOffset += bytesToCopy;
messageState.BytesToRead -= bytesToCopy;
currentOffset += bytesToCopy;
messageState.BytesToRead -= bytesToCopy;
}

// Check if we're done
if (messageState.BytesToRead == 0)
{
// Done, add to complete received messages
CompleteMessage(messageState.Handler, messageState.ThreadId, messageState.Buffer);
if (messageState.Buffer != null)
{
// Done, add to complete received messages
CompleteMessage(messageState.Handler, messageState.ThreadId, messageState.Buffer);
}

// Reset message state
messageState.Buffer = null;
Expand Down Expand Up @@ -566,24 +566,30 @@ private void CompleteMessage(Socket handler, int threadId, byte[] message)
/// <param name="ex">The exception that the socket raised.</param>
private void HandleCommunicationError(Socket socket, Exception ex)
{
// Close the socket
try
{
socket.Shutdown(SocketShutdown.Both);
}
catch (SocketException)
lock (socket)
{
// Socket was not able to be shutdown, likely because it was never opened
}
catch (ObjectDisposedException)
{
// Socket was already closed/disposed, so return out to prevent raising the Error event multiple times
// This is most likely to happen when an error occurs during heavily multithreaded use
return;
// Close the socket
try
{
socket.Shutdown(SocketShutdown.Both);
}
catch (SocketException)
{
// Socket was not able to be shutdown, likely because it was never opened
}
catch (ObjectDisposedException)
{
// Socket was already closed/disposed, so return out to prevent raising the Error event multiple times
// This is most likely to happen when an error occurs during heavily multithreaded use
return;
}

// Close / dispose the socket
socket.Close();
}

// Close / dispose the socket
socket.Close();
// No longer connected
_isConnected = false;

// Clear receive queue for this client
_receiveBufferQueue.Clear();
Expand Down

0 comments on commit fcc4add

Please sign in to comment.