new lib: rtmpmc.h
This commit is contained in:
parent
6c6cd118c3
commit
cc7ef60be2
190
rtmpmc.h
Normal file
190
rtmpmc.h
Normal file
@ -0,0 +1,190 @@
|
||||
#ifndef RT_MPMC_QUEUE_H
|
||||
#define RT_MPMC_QUEUE_H
|
||||
|
||||
/*
|
||||
* rtmpmc.h - Lockfree, bounded multi producer multi consumer queue for C
|
||||
* Copyright (C) 2026 Kevin Trogant
|
||||
*
|
||||
* Lockfree, bounded multi producer multi consumer queue.
|
||||
* Based on the C++11 queue by Erik Rigtorp <erik@rigtop.se>
|
||||
* https://github.com/rigtorp/MPMCQueue
|
||||
*
|
||||
* This version only supports u32 values, because that is
|
||||
* the use-case I currently have. It would be possible
|
||||
* (via macro-magic or by passing element-sizes) to adapt
|
||||
* this to other types.
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
* of this software and associated documentation files (the "Software"), to deal
|
||||
* in the Software without restriction, including without limitation the rights
|
||||
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
* copies of the Software, and to permit persons to whom the Software is
|
||||
* furnished to do so, subject to the following conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be included in
|
||||
* all copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
* SOFTWARE.
|
||||
*/
|
||||
|
||||
#include "rtcore.h"
|
||||
|
||||
#ifndef RTMPMC_API
|
||||
#define RTMPMC_API RTC_API
|
||||
#endif
|
||||
|
||||
typedef struct
|
||||
{
|
||||
u32 turn;
|
||||
u32 value;
|
||||
char _pad[64 - 2 * sizeof(u32)];
|
||||
} mpmc_slot;
|
||||
|
||||
typedef struct
|
||||
{
|
||||
mpmc_slot *slots;
|
||||
u32 capacity;
|
||||
u32 mod;
|
||||
char _pad0[64 - 2 * sizeof(u32) - sizeof(mpmc_slot *)];
|
||||
u32 head;
|
||||
char _pad1[64 - sizeof(u32)];
|
||||
u32 tail;
|
||||
char _pad2[64 - sizeof(u32)];
|
||||
} mpmc_queue;
|
||||
|
||||
/* Construct a new queue on the given arena.
|
||||
* Capacity must be a power of two */
|
||||
RTMPMC_API mpmc_queue NewMPMCQueue(u32 capacity, arena *a);
|
||||
|
||||
/* Push a new element onto the queue. Blocks if the queue is full */
|
||||
RTMPMC_API void MPMCPush(mpmc_queue *q, u32 v);
|
||||
|
||||
/* Pop an element from the queue. Blocks if the queue is empty */
|
||||
RTMPMC_API u32 MPMCPop(mpmc_queue *q);
|
||||
|
||||
/* Tries to push an element onto the queue. Returns `false` if the queue is full,
|
||||
* `true` otherwise. */
|
||||
RTMPMC_API b32 MPMCTryPush(mpmc_queue *q, u32 v);
|
||||
|
||||
/* Tries to pop an element from the queue. Returns `false` if the queue is empty,
|
||||
* `true` otherwise. */
|
||||
RTMPMC_API b32 MPMCTryPop(mpmc_queue *q, u32 *elem);
|
||||
|
||||
#endif
|
||||
|
||||
#ifdef RT_MPMC_IMPLEMENTATION
|
||||
|
||||
#define IDX(_n, _q) ((_n) & ((_q)->mod))
|
||||
#define TURN(_n, _q) ((u32)((_n) / (_q)->capacity))
|
||||
|
||||
RTMPMC_API mpmc_queue
|
||||
NewMPMCQueue(u32 capacity, arena *a)
|
||||
{
|
||||
mpmc_queue q = {0};
|
||||
q.capacity = capacity;
|
||||
q.mod = capacity - 1;
|
||||
q.slots = alloc(a, mpmc_slot, capacity + 1);
|
||||
return q;
|
||||
}
|
||||
|
||||
RTMPMC_API void
|
||||
MPMCPush(mpmc_queue *q, u32 v)
|
||||
{
|
||||
u32 head = AtomicFetchAdd32(&q->head, 1);
|
||||
mpmc_slot *slot = &q->slots[IDX(head, q)];
|
||||
/* Wait until it is our turn (i.e. previous reads/writes to this slot have finished) */
|
||||
while (TURN(head, q) * 2 != AtomicLoadAcquire(&slot->turn))
|
||||
_mm_pause();
|
||||
slot->value = v;
|
||||
AtomicStoreRelease(&slot->turn, TURN(head, q) * 2 + 1);
|
||||
}
|
||||
|
||||
RTMPMC_API u32
|
||||
MPMCPop(mpmc_queue *q)
|
||||
{
|
||||
u32 tail = AtomicFetchAdd32(&q->tail, 1);
|
||||
mpmc_slot *slot = &q->slots[IDX(tail, q)];
|
||||
/* Wait until it is our turn (i.e. previous reads/writes to this slot have finished) */
|
||||
while (TURN(tail, q) * 2 + 1 != AtomicLoadAcquire(&slot->turn))
|
||||
_mm_pause();
|
||||
u32 v = slot->value;
|
||||
AtomicStoreRelease(&slot->turn, TURN(tail, q) * 2 + 2);
|
||||
return v;
|
||||
}
|
||||
|
||||
RTMPMC_API b32
|
||||
MPMCTryPush(mpmc_queue *q, u32 v)
|
||||
{
|
||||
u32 head = AtomicLoadAcquire(&q->head);
|
||||
while (1)
|
||||
{
|
||||
mpmc_slot *slot = &q->slots[IDX(head, q)];
|
||||
if (TURN(head, q) * 2 == AtomicLoadAcquire(&slot->turn))
|
||||
{
|
||||
/* It would be our turn to get this element.
|
||||
* Check if our head value fetched above is still valid */
|
||||
if (AtomicCompareExchange32(&q->head, head, head + 1))
|
||||
{
|
||||
slot->value = v;
|
||||
AtomicStoreRelease(&slot->turn, TURN(head, q) * 2 + 1);
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/* No. We need to get the new head. */
|
||||
u32 prev_head = head;
|
||||
head = AtomicLoadAcquire(&q->head);
|
||||
if (prev_head == head)
|
||||
{
|
||||
/* Actually, the queue is full */
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Tries to pop an element from the queue. Returns `false` if the queue is empty,
|
||||
* `true` otherwise. */
|
||||
RTMPMC_API b32
|
||||
MPMCTryPop(mpmc_queue *q, u32 *elem)
|
||||
{
|
||||
u32 tail = AtomicLoadAcquire(&q->tail);
|
||||
while (1)
|
||||
{
|
||||
mpmc_slot *slot = &q->slots[IDX(tail, q)];
|
||||
if (TURN(tail, q) * 2 + 1 == AtomicLoadAcquire(&slot->turn))
|
||||
{
|
||||
/* It would be our turn to get this element.
|
||||
* Check if the tail value fetched above is still valid */
|
||||
if (AtomicCompareExchange32(&q->tail, tail, tail + 1))
|
||||
{
|
||||
*elem = slot->value;
|
||||
AtomicStoreRelease(&slot->turn, TURN(tail, q) * 2 + 2);
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/* No, we need to get the new tail. */
|
||||
u32 prev_tail = tail;
|
||||
tail = AtomicLoadAcquire(&q->tail);
|
||||
if (prev_tail == tail)
|
||||
{
|
||||
/* Actually, the queue is full */
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#undef IDX
|
||||
#undef TURN
|
||||
|
||||
#endif
|
||||
Loading…
Reference in New Issue
Block a user