Skip to content

Commit

Permalink
Slight code refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
haneytron committed Jul 15, 2014
1 parent 73bfd46 commit 6b084ba
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 47 deletions.
16 changes: 5 additions & 11 deletions README.md
@@ -1,4 +1,4 @@
SIMPLSOCKETS 1.2.0
SIMPLSOCKETS 1.2.1
===========


Expand All @@ -15,18 +15,12 @@ VERSION INFORMATION
============================================


1.2.0
1.2.1
------------------

- SUBSTANTIALLY optimized performance and memory usage. Much less memory used and buffer object creation. The result is much faster sockets!
- Very minor refactor of code

- BREAKING CHANGES: altered interfaces and methods to be more OO-friendly and to enable client and server to have access to same operations

- Exposed events for MessageReceived and Error that you can hook into to receive (and process) messages, and to handle any communication errors

- Pool and Blocking Queue optimizations

- Refactored almost all code to be much more efficient
- Removed IDisposable from the SimplSocket implementation, leaving it only on the ISimplSocket interface.


INSTALLATION INSTRUCTIONS
Expand All @@ -37,4 +31,4 @@ Just include the DLL in your project ([NuGet](http://www.nuget.org/packages/Simp

To create a client or server:

`var clientOrServer = new SimplSocket()`
`var clientOrServer = new SimplSocket(...)`
2 changes: 1 addition & 1 deletion SimplSockets/Properties/AssemblyInfo.cs
Expand Up @@ -20,4 +20,4 @@
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.2.0.*")]
[assembly: AssemblyVersion("1.2.1.*")]
69 changes: 34 additions & 35 deletions SimplSockets/SimplSocket.cs
Expand Up @@ -11,7 +11,7 @@ namespace SimplSockets
/// <summary>
/// Wraps sockets and provides intuitive, extremely efficient, scalable methods for client-server communication.
/// </summary>
public class SimplSocket : ISimplSocket, IDisposable
public class SimplSocket : ISimplSocket
{
// The function that creates a new socket
private readonly Func<Socket> _socketFunc = null;
Expand Down Expand Up @@ -53,8 +53,8 @@ public class SimplSocket : ISimplSocket, IDisposable
private readonly Pool<MessageState> _messageStatePool = null;
// The pool of buffers
private readonly Pool<byte[]> _bufferPool = null;
// The pool of receive messages
private readonly Pool<ReceivedMessage> _receiveMessagePool = null;
// The pool of received messages
private readonly Pool<ReceivedMessage> _receivedMessagePool = null;

// The control bytes placeholder - the first 4 bytes are little endian message length, the last 4 are thread id
private static readonly byte[] _controlBytesPlaceholder = new byte[] { 0, 0, 0, 0, 0, 0, 0, 0 };
Expand Down Expand Up @@ -90,10 +90,10 @@ public SimplSocket(Func<Socket> socketFunc, int messageBufferSize, int maximumCo

_currentlyConnectedClients = new List<Socket>(maximumConnections);

_receiveBufferQueue = new Dictionary<int, BlockingQueue<KeyValuePair<byte[], int>>>(10);
_receiveBufferQueue = new Dictionary<int, BlockingQueue<KeyValuePair<byte[], int>>>(maximumConnections);

// Initialize the client multiplexer
_clientMultiplexer = new Dictionary<int, MultiplexerData>(10);
_clientMultiplexer = new Dictionary<int, MultiplexerData>(64);

// Create the pools
_messageStatePool = new Pool<MessageState>(maximumConnections, () => new MessageState(), messageState =>
Expand All @@ -105,7 +105,7 @@ public SimplSocket(Func<Socket> socketFunc, int messageBufferSize, int maximumCo
});
_manualResetEventPool = new Pool<ManualResetEvent>(maximumConnections, () => new ManualResetEvent(false), manualResetEvent => manualResetEvent.Reset());
_bufferPool = new Pool<byte[]>(maximumConnections, () => new byte[messageBufferSize], null);
_receiveMessagePool = new Pool<ReceivedMessage>(maximumConnections, () => new ReceivedMessage(), receivedMessage =>
_receivedMessagePool = new Pool<ReceivedMessage>(maximumConnections, () => new ReceivedMessage(), receivedMessage =>
{
receivedMessage.Message = null;
receivedMessage.Socket = null;
Expand All @@ -117,7 +117,7 @@ public SimplSocket(Func<Socket> socketFunc, int messageBufferSize, int maximumCo
_messageStatePool.Push(new MessageState());
_manualResetEventPool.Push(new ManualResetEvent(false));
_bufferPool.Push(new byte[messageBufferSize]);
_receiveMessagePool.Push(new ReceivedMessage());
_receivedMessagePool.Push(new ReceivedMessage());
}
}

Expand Down Expand Up @@ -178,11 +178,12 @@ public bool Connect(EndPoint endPoint)
messageState.Handler = _socket;
messageState.Buffer = _bufferPool.Pop();

// Create receive queue for this client
// Create receive buffer queue for this client
var receiveBufferQueue = new BlockingQueue<KeyValuePair<byte[], int>>(64);
_receiveBufferQueueLock.EnterWriteLock();
try
{
_receiveBufferQueue[messageState.Handler.GetHashCode()] = new BlockingQueue<KeyValuePair<byte[], int>>(10);
_receiveBufferQueue[messageState.Handler.GetHashCode()] = receiveBufferQueue;
}
finally
{
Expand All @@ -196,7 +197,7 @@ public bool Connect(EndPoint endPoint)
var processMessageState = _messageStatePool.Pop();
processMessageState.Handler = _socket;

Task.Factory.StartNew(() => ProcessReceivedMessage(processMessageState));
Task.Factory.StartNew(() => ProcessReceivedMessage(processMessageState, receiveBufferQueue));

return true;
}
Expand Down Expand Up @@ -515,11 +516,12 @@ private void AcceptCallback(IAsyncResult asyncResult)
return;
}

// Create receive queue for this client
// Create receive buffer queue for this client
var receiveBufferQueue = new BlockingQueue<KeyValuePair<byte[], int>>(64);
_receiveBufferQueueLock.EnterWriteLock();
try
{
_receiveBufferQueue[messageState.Handler.GetHashCode()] = new BlockingQueue<KeyValuePair<byte[], int>>(10);
_receiveBufferQueue[messageState.Handler.GetHashCode()] = receiveBufferQueue;
}
finally
{
Expand All @@ -529,9 +531,7 @@ private void AcceptCallback(IAsyncResult asyncResult)
// Process all incoming messages
var processMessageState = _messageStatePool.Pop();
processMessageState.Handler = handler;

// Process all incoming messages
ProcessReceivedMessage(processMessageState);
ProcessReceivedMessage(processMessageState, receiveBufferQueue);
}

private void SendCallback(IAsyncResult asyncResult)
Expand Down Expand Up @@ -614,31 +614,16 @@ private void ReceiveCallback(IAsyncResult asyncResult)
}
}

private void ProcessReceivedMessage(MessageState messageState)
private void ProcessReceivedMessage(MessageState messageState, BlockingQueue<KeyValuePair<byte[], int>> receiveBufferQueue)
{
// Get the receive buffer queue
BlockingQueue<KeyValuePair<byte[], int>> queue = null;
_receiveBufferQueueLock.EnterReadLock();
try
{
if (!_receiveBufferQueue.TryGetValue(messageState.Handler.GetHashCode(), out queue))
{
throw new Exception("FATAL: No receive queue created for current socket");
}
}
finally
{
_receiveBufferQueueLock.ExitReadLock();
}

int controlBytesOffset = 0;
byte[] protocolBuffer = new byte[_controlBytesPlaceholder.Length];

// Loop until socket is done
while (_isDoingSomething)
{
// Get the next buffer from the queue
var receiveBufferEntry = queue.Dequeue();
var receiveBufferEntry = receiveBufferQueue.Dequeue();
var buffer = receiveBufferEntry.Key;
int bytesRead = receiveBufferEntry.Value;

Expand Down Expand Up @@ -722,20 +707,23 @@ private void CompleteMessage(Socket handler, int threadId, byte[] message)
}

// No multiplexer
var receivedMessage = _receiveMessagePool.Pop();
var receivedMessage = _receivedMessagePool.Pop();
receivedMessage.Socket = handler;
receivedMessage.ThreadId = threadId;
receivedMessage.Message = message;

// Fire the event if needed
var messageReceived = MessageReceived;
if (messageReceived != null)
{
// Create the message received args
var messageReceivedArgs = new MessageReceivedArgs(receivedMessage);
// Fire the event
messageReceived(this, messageReceivedArgs);
}

// Put the received message back on the pool
_receiveMessagePool.Push(receivedMessage);
// Place received message back in pool
_receivedMessagePool.Push(receivedMessage);
}

/// <summary>
Expand Down Expand Up @@ -792,6 +780,17 @@ private void HandleCommunicationError(Socket socket, Exception ex)
_currentlyConnectedClientsLock.ExitWriteLock();
}

// Remove receive queue for this client
_receiveBufferQueueLock.EnterWriteLock();
try
{
_receiveBufferQueue.Remove(socket.GetHashCode());
}
finally
{
_receiveBufferQueueLock.ExitWriteLock();
}

// Decrement the counter keeping track of the total number of clients connected to the server
Interlocked.Decrement(ref _currentlyConnectedClientCount);

Expand Down

0 comments on commit 6b084ba

Please sign in to comment.