Skip to content

alexandrnikitin/MPMCQueue.NET

Repository files navigation

MPMCQueue.NET

Build Status

Bounded multiple producers multiple consumers queue for .NET

Note: ConcurrentQueue<T> uses this algorithm since .NET Core and .NET >5.0. Please use that instead. Unless you want to modify this implementation for you needs, e.g. MPSC, SCMP.

Overview

This is an attempt to port the famous Bounded MPMC queue algorithm by Dmitry Vyukov to .NET. All credit goes to Dmitry Vyukov. I let myself quote the description:

According to the classification it's MPMC, array-based, fails on overflow, does not require GC, w/o priorities, causal FIFO, blocking producers and consumers queue. The algorithm is pretty simple and fast. It's not lockfree in the official meaning, just implemented by means of atomic RMW operations w/o mutexes.

The cost of enqueue/dequeue is 1 CAS per operation. No amortization, just 1 CAS. No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue), i.e. do not touch the same data while queue is not empty.

Implementation

The queue class layout is shown below. The _buffer field stores enqueued elements and their sequences. It has size that is a power of two. The _bufferMask field is used to avoid the expensive modulo operation and use AND instead. There's padding applied to avoid false sharing of _enqueuePos and _dequeuePos counters. And Volatile.Read/Write to suppress memory instructions reordering when read/write cell.Sequence.

[StructLayout(LayoutKind.Explicit, Size = 192, CharSet = CharSet.Ansi)]
public class MPMCQueue
{
    [FieldOffset(0)]
    private readonly Cell[] _buffer;
    [FieldOffset(8)]
    private readonly int _bufferMask;
    [FieldOffset(64)]
    private int _enqueuePos;
    [FieldOffset(128)]
    private int _dequeuePos;

    ...
}

The enqueue algorithm:

public bool TryEnqueue(object item)
{
    do
    {
        var buffer = _buffer; // prefetch the buffer pointer
        var pos = _enqueuePos; // fetch the current position where to enqueue the item
        var index = pos & _bufferMask; // precalculate the index in the buffer for that position
        var cell = buffer[index]; // fetch the cell by the index
        // If its sequence wasn't touched by other producers
        // and we can increment the enqueue position
        if (cell.Sequence == pos && Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos)
        {
            // write the item we want to enqueue
            buffer[index].Element = item;
            // bump the sequence
            Volatile.Write(ref buffer[index].Sequence, pos + 1);
            return true;
        }

        // If the queue is full we cannot enqueue and just return false
        if (cell.Sequence < pos)
        {
            return false;
        }

        // repeat the process if other producer managed to enqueue before us
    } while (true);
}

The dequeue algorithm:

public bool TryDequeue(out object result)
{
    do
    {
        var buffer = _buffer; // prefetch the buffer pointer
        var bufferMask = _bufferMask; // prefetch the buffer mask
        var pos = _dequeuePos; // fetch the current position from where we can dequeue an item
        var index = pos & bufferMask; // precalculate the index in the buffer for that position
        var cell = buffer[index]; // fetch the cell by the index
        // If its sequence was changed by a producer and wasn't changed by other consumers
        // and we can increment the dequeue position
        if (cell.Sequence == pos + 1 && Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos)
        {
            // read the item
            result = cell.Element;
            // no more reference the dequeue data
            buffer[index].Element = null;
            // update for the next round of the buffer
            Volatile.Write(ref buffer[index].Sequence, pos + bufferMask + 1);
            return true;
        }

        // If the queue is empty return false
        if (cell.Sequence < pos + 1)
        {
            result = default(object);
            return false;
        }

        // repeat the process if other consumer managed to dequeue before us
    } while (true);
}

Benchmarks

Host Process Environment Information:
BenchmarkDotNet.Core=v0.9.9.0
OS=Microsoft Windows NT 6.2.9200.0
Processor=Intel(R) Core(TM) i7-4600U CPU 2.10GHz, ProcessorCount=4
Frequency=2630626 ticks, Resolution=380.1377 ns, Timer=TSC
CLR=MS.NET 4.0.30319.42000, Arch=64-bit RELEASE [RyuJIT]
GC=Concurrent Workstation
JitModules=clrjit-v4.6.1586.0

MPMCQueue.NET.MPMCQueue

Method NumberOfThreads Median StdDev
EnqueueDequeue 1 24.5941 ns 6.0686 ns
EnqueueDequeue 2 45.5109 ns 12.0462 ns
EnqueueDequeue 4 49.1997 ns 4.3251 ns

System.Collections.Concurrent.ConcurrentQueue

Method NumberOfThreads Median StdDev
EnqueueDequeue 1 34.1918 ns 0.5379 ns
EnqueueDequeue 2 72.1948 ns 2.8465 ns
EnqueueDequeue 4 63.6846 ns 3.6718 ns

MPMCQueue shows worse than ConcurrentQueue results on many core and multi socket CPUs systems because cmpxchg instruction doesn't scale well, read more

Assembly (RyuJIT x64, clrjit-v4.6.1586.0)

MPMCQueue.NET.MPMCQueue.TryEnqueue(System.Object)

var buffer = _buffer;
>>> 
00007ffb`3a790660 57              push    rdi
00007ffb`3a790661 56              push    rsi
00007ffb`3a790662 53              push    rbx
00007ffb`3a790663 4883ec20        sub     rsp,20h
00007ffb`3a790667 4c8b4108        mov     r8,qword ptr [rcx+8]

var pos = _enqueuePos;
00007ffb`3a79066b 8b7148          mov     esi,dword ptr [rcx+48h]

var index = pos & _bufferMask;
00007ffb`3a79066e 448bce          mov     r9d,esi
00007ffb`3a790671 44234910        and     r9d,dword ptr [rcx+10h]

var cell = buffer[index];
00007ffb`3a790675 453b4808        cmp     r9d,dword ptr [r8+8]
00007ffb`3a790679 735e            jae     00007ffb`3a7906d9
00007ffb`3a79067b 4963c1          movsxd  rax,r9d
00007ffb`3a79067e 48c1e004        shl     rax,4
00007ffb`3a790682 498d7c0010      lea     rdi,[r8+rax+10h]
00007ffb`3a790687 488bc7          mov     rax,rdi
00007ffb`3a79068a 8b5808          mov     ebx,dword ptr [rax+8]

if (cell.Sequence == pos && Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos)
00007ffb`3a79068d 3bde            cmp     ebx,esi
00007ffb`3a79068f 753a            jne     00007ffb`3a7906cb
00007ffb`3a790691 4c8d5148        lea     r10,[rcx+48h]
00007ffb`3a790695 448d5e01        lea     r11d,[rsi+1]
00007ffb`3a790699 8bc6            mov     eax,esi
00007ffb`3a79069b f0450fb11a      lock cmpxchg dword ptr [r10],r11d
00007ffb`3a7906a0 3bc6            cmp     eax,esi
00007ffb`3a7906a2 7527            jne     00007ffb`3a7906cb

buffer[index].Element = item;
00007ffb`3a7906a4 4963c9          movsxd  rcx,r9d
00007ffb`3a7906a7 48c1e104        shl     rcx,4
00007ffb`3a7906ab 498d4c0810      lea     rcx,[r8+rcx+10h]
00007ffb`3a7906b0 e83b37605f      call    clr+0x3df0 (00007ffb`99d93df0) (JitHelp: CORINFO_HELP_ASSIGN_REF)

Volatile.Write(ref buffer[index].Sequence, pos + 1);
00007ffb`3a7906b5 488d4708        lea     rax,[rdi+8]
00007ffb`3a7906b9 8d5601          lea     edx,[rsi+1]
00007ffb`3a7906bc 8910            mov     dword ptr [rax],edx

return true;
00007ffb`3a7906be b801000000      mov     eax,1

src\MPMCQueue.NET\MPMCQueue.cs @ 54: (return false;)
00007ffb`3a7906c3 4883c420        add     rsp,20h
00007ffb`3a7906c7 5b              pop     rbx
00007ffb`3a7906c8 5e              pop     rsi
00007ffb`3a7906c9 5f              pop     rdi
00007ffb`3a7906ca c3              ret

if (cell.Sequence < pos)
00007ffb`3a7906cb 3bde            cmp     ebx,esi
00007ffb`3a7906cd 7d98            jge     00007ffb`3a790667

src\MPMCQueue.NET\MPMCQueue.cs @ 54: (return false;)
00007ffb`3a7906cf 33c0            xor     eax,eax
00007ffb`3a7906d1 4883c420        add     rsp,20h
00007ffb`3a7906d5 5b              pop     rbx
00007ffb`3a7906d6 5e              pop     rsi
00007ffb`3a7906d7 5f              pop     rdi
00007ffb`3a7906d8 c3              ret

src\MPMCQueue.NET\MPMCQueue.cs @ 41: 
00007ffb`3a7906d9 e8e21caa5f      call    clr!TranslateSecurityAttributes+0x88050 (00007ffb`9a2323c0) (JitHelp: CORINFO_HELP_RNGCHKFAIL)
00007ffb`3a7906de cc              int     3

MPMCQueue.NET.MPMCQueue.TryDequeue(System.Object ByRef)

var buffer = _buffer;
>>> 
00007ffb`3a790700 4157            push    r15
00007ffb`3a790702 4156            push    r14
00007ffb`3a790704 4154            push    r12
00007ffb`3a790706 57              push    rdi
00007ffb`3a790707 56              push    rsi
00007ffb`3a790708 55              push    rbp
00007ffb`3a790709 53              push    rbx
00007ffb`3a79070a 4883ec20        sub     rsp,20h
00007ffb`3a79070e 4c8bc2          mov     r8,rdx
00007ffb`3a790711 488b7108        mov     rsi,qword ptr [rcx+8]

var bufferMask = _bufferMask;
00007ffb`3a790715 8b7910          mov     edi,dword ptr [rcx+10h]

var pos = _dequeuePos;
00007ffb`3a790718 8b9988000000    mov     ebx,dword ptr [rcx+88h]

var index = pos & bufferMask;
00007ffb`3a79071e 8beb            mov     ebp,ebx
00007ffb`3a790720 23ef            and     ebp,edi

var cell = buffer[index];
00007ffb`3a790722 3b6e08          cmp     ebp,dword ptr [rsi+8]
00007ffb`3a790725 0f8384000000    jae     00007ffb`3a7907af
00007ffb`3a79072b 4863c5          movsxd  rax,ebp
00007ffb`3a79072e 48c1e004        shl     rax,4
00007ffb`3a790732 4c8d740610      lea     r14,[rsi+rax+10h]
00007ffb`3a790737 498bc6          mov     rax,r14
00007ffb`3a79073a 488b10          mov     rdx,qword ptr [rax]
00007ffb`3a79073d 448b7808        mov     r15d,dword ptr [rax+8]

if (cell.Sequence == pos + 1 && Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos)
00007ffb`3a790741 448d6301        lea     r12d,[rbx+1]
00007ffb`3a790745 453bfc          cmp     r15d,r12d
00007ffb`3a790748 7546            jne     00007ffb`3a790790
00007ffb`3a79074a 4c8d8988000000  lea     r9,[rcx+88h]
00007ffb`3a790751 8bc3            mov     eax,ebx
00007ffb`3a790753 f0450fb121      lock cmpxchg dword ptr [r9],r12d
00007ffb`3a790758 3bc3            cmp     eax,ebx
00007ffb`3a79075a 7534            jne     00007ffb`3a790790

result = cell.Element;
00007ffb`3a79075c 498bc8          mov     rcx,r8
00007ffb`3a79075f e85c36605f      call    clr+0x3dc0 (00007ffb`99d93dc0) (JitHelp: CORINFO_HELP_CHECKED_ASSIGN_REF)

buffer[index].Element = null;
00007ffb`3a790764 4863c5          movsxd  rax,ebp
00007ffb`3a790767 48c1e004        shl     rax,4
00007ffb`3a79076b 33d2            xor     edx,edx
00007ffb`3a79076d 4889540610      mov     qword ptr [rsi+rax+10h],rdx

Volatile.Write(ref buffer[index].Sequence, pos + bufferMask + 1);
00007ffb`3a790772 498d4608        lea     rax,[r14+8]
00007ffb`3a790776 03df            add     ebx,edi
00007ffb`3a790778 ffc3            inc     ebx
00007ffb`3a79077a 8918            mov     dword ptr [rax],ebx

return true;
00007ffb`3a79077c b801000000      mov     eax,1

src\MPMCQueue.NET\MPMCQueue.cs @ 79: (return false;)
00007ffb`3a790781 4883c420        add     rsp,20h
00007ffb`3a790785 5b              pop     rbx
00007ffb`3a790786 5d              pop     rbp
00007ffb`3a790787 5e              pop     rsi
00007ffb`3a790788 5f              pop     rdi
00007ffb`3a790789 415c            pop     r12
00007ffb`3a79078b 415e            pop     r14
00007ffb`3a79078d 415f            pop     r15
00007ffb`3a79078f c3              ret

if (cell.Sequence < pos + 1)
00007ffb`3a790790 453bfc          cmp     r15d,r12d
00007ffb`3a790793 0f8d78ffffff    jge     00007ffb`3a790711

result = default(object);
00007ffb`3a790799 33c0            xor     eax,eax
00007ffb`3a79079b 498900          mov     qword ptr [r8],rax

src\MPMCQueue.NET\MPMCQueue.cs @ 79: (return false;)
00007ffb`3a79079e 33c0            xor     eax,eax
00007ffb`3a7907a0 4883c420        add     rsp,20h
00007ffb`3a7907a4 5b              pop     rbx
00007ffb`3a7907a5 5d              pop     rbp
00007ffb`3a7907a6 5e              pop     rsi
00007ffb`3a7907a7 5f              pop     rdi
00007ffb`3a7907a8 415c            pop     r12
00007ffb`3a7907aa 415e            pop     r14
00007ffb`3a7907ac 415f            pop     r15
00007ffb`3a7907ae c3              ret

src\MPMCQueue.NET\MPMCQueue.cs @ 63:
00007ffb`3a7907af e80c1caa5f      call    clr!TranslateSecurityAttributes+0x88050 (00007ffb`9a2323c0) (JitHelp: CORINFO_HELP_RNGCHKFAIL)
00007ffb`3a7907b4 cc              int     3