From cc7ef60be27fa27549760d8b0519bd82d4eeb4d0 Mon Sep 17 00:00:00 2001 From: Kevin Trogant Date: Mon, 9 Mar 2026 15:40:35 +0100 Subject: [PATCH] new lib: rtmpmc.h --- rtmpmc.h | 190 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 190 insertions(+) create mode 100644 rtmpmc.h diff --git a/rtmpmc.h b/rtmpmc.h new file mode 100644 index 0000000..c106b83 --- /dev/null +++ b/rtmpmc.h @@ -0,0 +1,190 @@ +#ifndef RT_MPMC_QUEUE_H +#define RT_MPMC_QUEUE_H + +/* + * rtmpmc.h - Lockfree, bounded multi producer multi consumer queue for C + * Copyright (C) 2026 Kevin Trogant + * + * Lockfree, bounded multi producer multi consumer queue. + * Based on the C++11 queue by Erik Rigtorp + * https://github.com/rigtorp/MPMCQueue + * + * This version only supports u32 values, because that is + * the use-case I currently have. It would be possible + * (via macro-magic or by passing element-sizes) to adapt + * this to other types. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include "rtcore.h" + +#ifndef RTMPMC_API + #define RTMPMC_API RTC_API +#endif + +typedef struct +{ + u32 turn; + u32 value; + char _pad[64 - 2 * sizeof(u32)]; +} mpmc_slot; + +typedef struct +{ + mpmc_slot *slots; + u32 capacity; + u32 mod; + char _pad0[64 - 2 * sizeof(u32) - sizeof(mpmc_slot *)]; + u32 head; + char _pad1[64 - sizeof(u32)]; + u32 tail; + char _pad2[64 - sizeof(u32)]; +} mpmc_queue; + +/* Construct a new queue on the given arena. + * Capacity must be a power of two */ +RTMPMC_API mpmc_queue NewMPMCQueue(u32 capacity, arena *a); + +/* Push a new element onto the queue. Blocks if the queue is full */ +RTMPMC_API void MPMCPush(mpmc_queue *q, u32 v); + +/* Pop an element from the queue. Blocks if the queue is empty */ +RTMPMC_API u32 MPMCPop(mpmc_queue *q); + +/* Tries to push an element onto the queue. Returns `false` if the queue is full, + * `true` otherwise. */ +RTMPMC_API b32 MPMCTryPush(mpmc_queue *q, u32 v); + +/* Tries to pop an element from the queue. Returns `false` if the queue is empty, + * `true` otherwise. */ +RTMPMC_API b32 MPMCTryPop(mpmc_queue *q, u32 *elem); + +#endif + +#ifdef RT_MPMC_IMPLEMENTATION + +#define IDX(_n, _q) ((_n) & ((_q)->mod)) +#define TURN(_n, _q) ((u32)((_n) / (_q)->capacity)) + +RTMPMC_API mpmc_queue +NewMPMCQueue(u32 capacity, arena *a) +{ + mpmc_queue q = {0}; + q.capacity = capacity; + q.mod = capacity - 1; + q.slots = alloc(a, mpmc_slot, capacity + 1); + return q; +} + +RTMPMC_API void +MPMCPush(mpmc_queue *q, u32 v) +{ + u32 head = AtomicFetchAdd32(&q->head, 1); + mpmc_slot *slot = &q->slots[IDX(head, q)]; + /* Wait until it is our turn (i.e. previous reads/writes to this slot have finished) */ + while (TURN(head, q) * 2 != AtomicLoadAcquire(&slot->turn)) + _mm_pause(); + slot->value = v; + AtomicStoreRelease(&slot->turn, TURN(head, q) * 2 + 1); +} + +RTMPMC_API u32 +MPMCPop(mpmc_queue *q) +{ + u32 tail = AtomicFetchAdd32(&q->tail, 1); + mpmc_slot *slot = &q->slots[IDX(tail, q)]; + /* Wait until it is our turn (i.e. previous reads/writes to this slot have finished) */ + while (TURN(tail, q) * 2 + 1 != AtomicLoadAcquire(&slot->turn)) + _mm_pause(); + u32 v = slot->value; + AtomicStoreRelease(&slot->turn, TURN(tail, q) * 2 + 2); + return v; +} + +RTMPMC_API b32 +MPMCTryPush(mpmc_queue *q, u32 v) +{ + u32 head = AtomicLoadAcquire(&q->head); + while (1) + { + mpmc_slot *slot = &q->slots[IDX(head, q)]; + if (TURN(head, q) * 2 == AtomicLoadAcquire(&slot->turn)) + { + /* It would be our turn to get this element. + * Check if our head value fetched above is still valid */ + if (AtomicCompareExchange32(&q->head, head, head + 1)) + { + slot->value = v; + AtomicStoreRelease(&slot->turn, TURN(head, q) * 2 + 1); + return 1; + } + } + else + { + /* No. We need to get the new head. */ + u32 prev_head = head; + head = AtomicLoadAcquire(&q->head); + if (prev_head == head) + { + /* Actually, the queue is full */ + return 0; + } + } + } +} + +/* Tries to pop an element from the queue. Returns `false` if the queue is empty, + * `true` otherwise. */ +RTMPMC_API b32 +MPMCTryPop(mpmc_queue *q, u32 *elem) +{ + u32 tail = AtomicLoadAcquire(&q->tail); + while (1) + { + mpmc_slot *slot = &q->slots[IDX(tail, q)]; + if (TURN(tail, q) * 2 + 1 == AtomicLoadAcquire(&slot->turn)) + { + /* It would be our turn to get this element. + * Check if the tail value fetched above is still valid */ + if (AtomicCompareExchange32(&q->tail, tail, tail + 1)) + { + *elem = slot->value; + AtomicStoreRelease(&slot->turn, TURN(tail, q) * 2 + 2); + return 1; + } + } + else + { + /* No, we need to get the new tail. */ + u32 prev_tail = tail; + tail = AtomicLoadAcquire(&q->tail); + if (prev_tail == tail) + { + /* Actually, the queue is full */ + return 0; + } + } + } +} + +#undef IDX +#undef TURN + +#endif \ No newline at end of file