rtcore/rtmpmc.h
2026-03-09 15:40:35 +01:00

190 lines
5.2 KiB
C

#ifndef RT_MPMC_QUEUE_H
#define RT_MPMC_QUEUE_H
/*
* rtmpmc.h - Lockfree, bounded multi producer multi consumer queue for C
* Copyright (C) 2026 Kevin Trogant
*
* Lockfree, bounded multi producer multi consumer queue.
* Based on the C++11 queue by Erik Rigtorp <erik@rigtorp.se>
* https://github.com/rigtorp/MPMCQueue
*
* This version only supports u32 values, because that is
* the use-case I currently have. It would be possible
* (via macro-magic or by passing element-sizes) to adapt
* this to other types.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#include "rtcore.h"
#ifndef RTMPMC_API
#define RTMPMC_API RTC_API
#endif
/* One cell of the ring buffer.
 * `turn` encodes both the lap ("turn", see TURN) and the state of the cell:
 *   - an even value 2*t means the cell is free for the producer of lap t,
 *   - an odd value 2*t + 1 means the cell holds the value written in lap t
 *     and is ready for the consumer of lap t.
 * `turn` is only accessed through acquire/release atomics; `value` is a plain
 * field whose accesses are ordered by the release/acquire on `turn`. */
typedef struct
{
u32 turn;
u32 value;
/* Pads the slot to 64 bytes so that neighbouring slots do not share a cache
 * line (assumes 64-byte cache lines). NOTE(review): this only fully avoids
 * false sharing if the slot array itself is 64-byte aligned, which the arena
 * allocation is not shown to guarantee. */
char _pad[64 - 2 * sizeof(u32)];
} mpmc_slot;
/* State of a bounded multi-producer/multi-consumer queue of u32 values.
 * `head` (producers) and `tail` (consumers) are ticket counters that are
 * incremented atomically; ticket n maps to slot `n & mod` and lap
 * `n / capacity`. `slots`, `capacity` and `mod` are set once by
 * NewMPMCQueue and only read afterwards.
 * Because the atomic counters live inside this struct, all threads must
 * operate on the same instance (through a pointer), never on copies. */
typedef struct
{
mpmc_slot *slots;
u32 capacity;
/* capacity - 1; used as a bit mask to turn a ticket into a slot index. */
u32 mod;
/* The padding fields keep the read-only fields, `head` and `tail` 64 bytes
 * apart, so producers and consumers do not contend on one cache line. */
char _pad0[64 - 2 * sizeof(u32) - sizeof(mpmc_slot *)];
u32 head;
char _pad1[64 - sizeof(u32)];
u32 tail;
char _pad2[64 - sizeof(u32)];
} mpmc_queue;
/* Construct a new queue on the given arena.
* Capacity must be a non-zero power of two: slot indices are computed with
* `ticket & (capacity - 1)`. The slot array is allocated from `a`; this header
* declares no destroy function, so the slots live as long as the arena.
* NOTE(review): correctness relies on every slot's `turn` starting at 0, i.e.
* on `alloc` returning zeroed memory — confirm. */
RTMPMC_API mpmc_queue NewMPMCQueue(u32 capacity, arena *a);
/* Push a new element onto the queue. Blocks if the queue is full.
* Blocking is a busy-wait (spin with _mm_pause), not a sleep. */
RTMPMC_API void MPMCPush(mpmc_queue *q, u32 v);
/* Pop an element from the queue. Blocks if the queue is empty.
* Blocking is a busy-wait (spin with _mm_pause), not a sleep. */
RTMPMC_API u32 MPMCPop(mpmc_queue *q);
/* Tries to push an element onto the queue. Returns `false` if the queue is full,
* `true` otherwise. */
RTMPMC_API b32 MPMCTryPush(mpmc_queue *q, u32 v);
/* Tries to pop an element from the queue. Returns `false` if the queue is empty,
* `true` otherwise. On success the element is stored in `*elem`; on failure
* `*elem` is not written. */
RTMPMC_API b32 MPMCTryPop(mpmc_queue *q, u32 *elem);
#endif
#ifdef RT_MPMC_IMPLEMENTATION
/* Slot index of ticket _n. This equals _n % capacity only because capacity is
 * required to be a power of two (mod == capacity - 1). */
#define IDX(_n, _q) ((_n) & ((_q)->mod))
/* Lap ("turn") of ticket _n: how many full passes over the ring precede it.
 * A slot's `turn` field holds 2 * lap while free and 2 * lap + 1 while full. */
#define TURN(_n, _q) ((u32)((_n) / (_q)->capacity))
RTMPMC_API mpmc_queue
NewMPMCQueue(u32 capacity, arena *a)
{
    /* Head, tail and all padding are zero-initialized by the designated
     * initializer. The slot array gets one extra element, presumably
     * mirroring the reference C++ implementation, which over-allocates so the
     * last slot does not share a cache line with a neighbouring allocation.
     * `capacity` must be a non-zero power of two (see IDX). */
    mpmc_queue result = {
        .slots = alloc(a, mpmc_slot, capacity + 1),
        .capacity = capacity,
        .mod = capacity - 1,
    };
    return result;
}
RTMPMC_API void
MPMCPush(mpmc_queue *q, u32 v)
{
    /* Claim a ticket. It fixes both the slot and the lap in which we write. */
    u32 ticket = AtomicFetchAdd32(&q->head, 1);
    u32 lap = TURN(ticket, q);
    mpmc_slot *target = q->slots + IDX(ticket, q);

    /* Spin until the consumer of the previous lap has released the slot
     * (the slot's turn becomes the even value 2 * lap). */
    for (;;)
    {
        if (AtomicLoadAcquire(&target->turn) == lap * 2)
            break;
        _mm_pause();
    }

    /* Publish: the release store makes `value` visible to the consumer that
     * acquires the odd turn 2 * lap + 1. */
    target->value = v;
    AtomicStoreRelease(&target->turn, lap * 2 + 1);
}
/* Pop an element, busy-waiting until one is available.
 * Claims a consumer ticket, waits for the producer of the same lap to fill the
 * slot, takes the value and hands the slot to the producer of the next lap. */
RTMPMC_API u32
MPMCPop(mpmc_queue *q)
{
    u32 tail = AtomicFetchAdd32(&q->tail, 1);
    mpmc_slot *slot = &q->slots[IDX(tail, q)];
    /* Wait until a producer has filled this slot for our lap (turn 2*lap+1). */
    while (TURN(tail, q) * 2 + 1 != AtomicLoadAcquire(&slot->turn))
        _mm_pause();
    u32 v = slot->value;
    /* Release the slot to the producer of the next lap, i.e. the producer
     * holding ticket `tail + capacity`. Computing its turn as
     * TURN(tail + capacity) rather than TURN(tail) + 1 keeps the u32 ticket
     * counters usable across wrap-around: on the last lap before `tail`
     * overflows, `tail + capacity` wraps to a small value and we store 0,
     * which is exactly what the producer with the wrapped head (lap 0) waits
     * for. TURN(tail) * 2 + 2 would store 2^33 / capacity there, which is
     * non-zero for capacity >= 4 and deadlocked the queue after 2^32 pushes. */
    AtomicStoreRelease(&slot->turn, TURN(tail + q->capacity, q) * 2);
    return v;
}
/* Non-blocking push. Returns 1 after storing `v`, or 0 if the queue was
 * observed to be full (the slot for the current head is still occupied and
 * head did not move between two reads). */
RTMPMC_API b32
MPMCTryPush(mpmc_queue *q, u32 v)
{
    u32 head = AtomicLoadAcquire(&q->head);
    for (;;)
    {
        mpmc_slot *slot = &q->slots[IDX(head, q)];
        if (TURN(head, q) * 2 == AtomicLoadAcquire(&slot->turn))
        {
            /* The slot is free for this lap. Claim ticket `head` only if no
             * other producer has taken it since we read it. */
            if (AtomicCompareExchange32(&q->head, head, head + 1))
            {
                slot->value = v;
                AtomicStoreRelease(&slot->turn, TURN(head, q) * 2 + 1);
                return 1;
            }
            /* Another producer won this ticket; continue with the current
             * head. Retrying the stale value would spin until the winner
             * publishes its slot, making this "try" call wait on another
             * thread (the reference implementation's compare_exchange
             * refreshes `head` here for the same reason). */
            head = AtomicLoadAcquire(&q->head);
        }
        else
        {
            /* The slot is still occupied from the previous lap. If head has
             * not moved either, the queue is full. */
            u32 prev_head = head;
            head = AtomicLoadAcquire(&q->head);
            if (prev_head == head)
                return 0;
        }
    }
}
/* Tries to pop an element from the queue. Returns `false` if the queue is empty,
 * `true` otherwise. On success the element is stored in `*elem`; on failure
 * `*elem` is not written. */
RTMPMC_API b32
MPMCTryPop(mpmc_queue *q, u32 *elem)
{
    u32 tail = AtomicLoadAcquire(&q->tail);
    for (;;)
    {
        mpmc_slot *slot = &q->slots[IDX(tail, q)];
        if (TURN(tail, q) * 2 + 1 == AtomicLoadAcquire(&slot->turn))
        {
            /* The slot holds a value for this lap. Claim ticket `tail` only
             * if no other consumer has taken it since we read it. */
            if (AtomicCompareExchange32(&q->tail, tail, tail + 1))
            {
                *elem = slot->value;
                /* Hand the slot to the producer of the next lap (ticket
                 * tail + capacity). Unsigned wrap-around of `tail + capacity`
                 * makes this store 0 on the last lap before the u32 counter
                 * overflows, matching the wrapped producer's expectation;
                 * TURN(tail) * 2 + 2 would deadlock there for capacity >= 4. */
                AtomicStoreRelease(&slot->turn, TURN(tail + q->capacity, q) * 2);
                return 1;
            }
            /* Another consumer won this ticket; continue with the current
             * tail instead of spinning on the stale one. */
            tail = AtomicLoadAcquire(&q->tail);
        }
        else
        {
            /* No value for this lap yet. If tail has not moved either, the
             * queue is empty. */
            u32 prev_tail = tail;
            tail = AtomicLoadAcquire(&q->tail);
            if (prev_tail == tail)
                return 0;
        }
    }
}
#undef IDX
#undef TURN
#endif