#ifndef RT_MPMC_QUEUE_H #define RT_MPMC_QUEUE_H /* * rtmpmc.h - Lockfree, bounded multi producer multi consumer queue for C * Copyright (C) 2026 Kevin Trogant * * Lockfree, bounded multi producer multi consumer queue. * Based on the C++11 queue by Erik Rigtorp * https://github.com/rigtorp/MPMCQueue * * This version only supports u32 values, because that is * the use-case I currently have. It would be possible * (via macro-magic or by passing element-sizes) to adapt * this to other types. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal * in the Software without restriction, including without limitation the rights * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. */ #include "rtcore.h" #ifndef RTMPMC_API #define RTMPMC_API RTC_API #endif typedef struct { u32 turn; u32 value; char _pad[64 - 2 * sizeof(u32)]; } mpmc_slot; typedef struct { mpmc_slot *slots; u32 capacity; u32 mod; char _pad0[64 - 2 * sizeof(u32) - sizeof(mpmc_slot *)]; u32 head; char _pad1[64 - sizeof(u32)]; u32 tail; char _pad2[64 - sizeof(u32)]; } mpmc_queue; /* Construct a new queue on the given arena. * Capacity must be a power of two */ RTMPMC_API mpmc_queue NewMPMCQueue(u32 capacity, arena *a); /* Push a new element onto the queue. Blocks if the queue is full */ RTMPMC_API void MPMCPush(mpmc_queue *q, u32 v); /* Pop an element from the queue. Blocks if the queue is empty */ RTMPMC_API u32 MPMCPop(mpmc_queue *q); /* Tries to push an element onto the queue. Returns `false` if the queue is full, * `true` otherwise. */ RTMPMC_API b32 MPMCTryPush(mpmc_queue *q, u32 v); /* Tries to pop an element from the queue. Returns `false` if the queue is empty, * `true` otherwise. */ RTMPMC_API b32 MPMCTryPop(mpmc_queue *q, u32 *elem); #endif #ifdef RT_MPMC_IMPLEMENTATION #define IDX(_n, _q) ((_n) & ((_q)->mod)) #define TURN(_n, _q) ((u32)((_n) / (_q)->capacity)) RTMPMC_API mpmc_queue NewMPMCQueue(u32 capacity, arena *a) { mpmc_queue q = {0}; q.capacity = capacity; q.mod = capacity - 1; q.slots = alloc(a, mpmc_slot, capacity + 1); return q; } RTMPMC_API void MPMCPush(mpmc_queue *q, u32 v) { u32 head = AtomicFetchAdd32(&q->head, 1); mpmc_slot *slot = &q->slots[IDX(head, q)]; /* Wait until it is our turn (i.e. previous reads/writes to this slot have finished) */ while (TURN(head, q) * 2 != AtomicLoadAcquire(&slot->turn)) _mm_pause(); slot->value = v; AtomicStoreRelease(&slot->turn, TURN(head, q) * 2 + 1); } RTMPMC_API u32 MPMCPop(mpmc_queue *q) { u32 tail = AtomicFetchAdd32(&q->tail, 1); mpmc_slot *slot = &q->slots[IDX(tail, q)]; /* Wait until it is our turn (i.e. previous reads/writes to this slot have finished) */ while (TURN(tail, q) * 2 + 1 != AtomicLoadAcquire(&slot->turn)) _mm_pause(); u32 v = slot->value; AtomicStoreRelease(&slot->turn, TURN(tail, q) * 2 + 2); return v; } RTMPMC_API b32 MPMCTryPush(mpmc_queue *q, u32 v) { u32 head = AtomicLoadAcquire(&q->head); while (1) { mpmc_slot *slot = &q->slots[IDX(head, q)]; if (TURN(head, q) * 2 == AtomicLoadAcquire(&slot->turn)) { /* It would be our turn to get this element. * Check if our head value fetched above is still valid */ if (AtomicCompareExchange32(&q->head, head, head + 1)) { slot->value = v; AtomicStoreRelease(&slot->turn, TURN(head, q) * 2 + 1); return 1; } } else { /* No. We need to get the new head. */ u32 prev_head = head; head = AtomicLoadAcquire(&q->head); if (prev_head == head) { /* Actually, the queue is full */ return 0; } } } } /* Tries to pop an element from the queue. Returns `false` if the queue is empty, * `true` otherwise. */ RTMPMC_API b32 MPMCTryPop(mpmc_queue *q, u32 *elem) { u32 tail = AtomicLoadAcquire(&q->tail); while (1) { mpmc_slot *slot = &q->slots[IDX(tail, q)]; if (TURN(tail, q) * 2 + 1 == AtomicLoadAcquire(&slot->turn)) { /* It would be our turn to get this element. * Check if the tail value fetched above is still valid */ if (AtomicCompareExchange32(&q->tail, tail, tail + 1)) { *elem = slot->value; AtomicStoreRelease(&slot->turn, TURN(tail, q) * 2 + 2); return 1; } } else { /* No, we need to get the new tail. */ u32 prev_tail = tail; tail = AtomicLoadAcquire(&q->tail); if (prev_tail == tail) { /* Actually, the queue is full */ return 0; } } } } #undef IDX #undef TURN #endif