rtengine/src/runtime/aio.c
Kevin Trogant 0c89d63716 load runtime asset dependency data
removed init/shutdown functions from headers.
2023-12-21 00:19:48 +01:00

280 lines
9.1 KiB
C

#include "aio.h"
#include "threading.h"
#include "config.h"
#ifdef _WIN32
#define WIN32_LEAN_AND_MEAN
#include <Windows.h>
#elif defined(__linux__)
#include <sched.h>
#endif
#include <assert.h>
#include <stdlib.h>
/* Maintain a ringbuffer of pending operations */
/* A single asynchronous I/O operation. Instances live in the ringbuffer
 * storage array; a public handle is the slot index + 1. */
typedef struct {
#ifdef _WIN32
/* File the operation reads from. Opened when the load is submitted and closed
 * by the completion routine (or by the submit path if ReadFileEx fails). */
HANDLE file_handle;
/* Passed to ReadFileEx. Its hEvent member is repurposed to store a pointer
 * back to this vy_aio, which the completion routine reads. */
OVERLAPPED overlapped;
#endif
/* Current state of the operation. Written by the submit path, the completion
 * routine and vyReleaseAIO; read by vyGetAIOState. */
volatile vy_aio_state state;
} vy_aio;
/* Ringbuffer bookkeeping for the pool of vy_aio slots. */
typedef struct {
/* Locked around accesses to head, tail and slot states. */
vy_mutex *guard;
/* Array of `capacity` slots, allocated in InitAIO. */
vy_aio *storage;
/* Number of slots in `storage`. */
uint32_t capacity;
/* Index of the next slot that ReserveRingbufferSpace hands out. */
uint32_t head;
/* Index of the oldest slot still considered in use; advanced by vyReleaseAIO
 * past slots whose state is VY_AIO_STATE_INVALID. */
uint32_t tail;
} vy_aio_ringbuffer;
/* Result of a ringbuffer reservation. The reserved slots are described as up
 * to two contiguous spans because a reservation may wrap around the end of the
 * storage array. */
typedef struct {
/* First span, starting at the old head. NULL if the reservation failed. */
vy_aio *a;
/* Second span, starting at storage[0]; only set when the reservation wraps. */
vy_aio *b;
/* Number of slots in span `a`; any remaining requested slots are in `b`. */
uint32_t a_count;
} vy_ringbuffer_space;
/* The single, file-local ringbuffer of pending operations. Set up by InitAIO
 * and torn down by ShutdownAIO. */
static vy_aio_ringbuffer _ringbuffer;
/*
 * Reserves `count` slots at the head of the ringbuffer.
 *
 * On success, result.a points at the first reserved slot and result.a_count is
 * the number of slots in that span. If the reservation wraps around the end of
 * the storage array, the remaining (count - a_count) slots start at result.b
 * (== &storage[0]); otherwise result.b is NULL.
 *
 * On failure (the guard mutex could not be locked, or there are not enough
 * free slots) every member of the result is NULL / 0.
 *
 * The head is never allowed to land on the tail: head == tail always means
 * "empty". Without this rule a completely filled buffer would be
 * indistinguishable from an empty one and the next reservation would overwrite
 * pending operations. As a consequence at most (capacity - 1) slots can be in
 * use at the same time.
 */
static vy_ringbuffer_space ReserveRingbufferSpace(uint32_t count) {
    vy_ringbuffer_space result = {NULL, NULL, 0};
    if (!vyLockMutex(_ringbuffer.guard)) {
        return result;
    }

    const uint32_t capacity = _ringbuffer.capacity;
    const uint32_t head     = _ringbuffer.head;
    const uint32_t tail     = _ringbuffer.tail;

    /* Number of slots between tail and head, i.e. currently handed out. */
    const uint32_t used = (head >= tail) ? (head - tail) : (capacity - tail + head);

    /* capacity == 0 means the ringbuffer was never initialized. The second
     * test rejects every request that would make head catch up with tail. */
    if (capacity == 0 || count >= capacity - used) {
        /* Not enough space, we would overwrite the tail */
        vyLog("aio", "Ringbuffer is full.");
        vyUnlockMutex(_ringbuffer.guard);
        return result;
    }

    /* Slots available before the end of the storage array. */
    const uint32_t until_end = capacity - head;

    result.a = &_ringbuffer.storage[head];
    if (count < until_end) {
        result.a_count   = count;
        _ringbuffer.head = head + count;
    } else {
        /* The reservation reaches (or wraps past) the end of the array. */
        result.a_count = until_end;
        if (count > until_end) {
            result.b = &_ringbuffer.storage[0];
        }
        _ringbuffer.head = count - until_end;
    }

    vyUnlockMutex(_ringbuffer.guard);
    return result;
}
#ifdef _WIN32
/*
 * Completion callback handed to ReadFileEx.
 *
 * The submit path stores a pointer to the owning vy_aio in overlapped->hEvent,
 * so the operation can be recovered from the OVERLAPPED alone. The routine
 * records the outcome in the operation's state and closes the file handle.
 */
static void win32CompletionRoutine(DWORD error_code,
                                   DWORD num_bytes_transfered,
                                   LPOVERLAPPED overlapped) {
    (void)num_bytes_transfered;

    vy_aio *completed = (vy_aio *)overlapped->hEvent;
    assert(completed->state == VY_AIO_STATE_PENDING);

    if (error_code == ERROR_SUCCESS) {
        completed->state = VY_AIO_STATE_FINISHED;
    } else {
        completed->state = VY_AIO_STATE_FAILED;
        vyLog("aio", "Async io failed: %u", error_code);
    }

    CloseHandle(completed->file_handle);
}
#endif
/* Configuration variable read by InitAIO to size the ringbuffer, i.e. the
 * number of vy_aio slots that are allocated. InitAIO falls back to 1024 if the
 * value is 0. */
VY_CVAR_I(rt_MaxConcurrentAsyncIO,
"Maximum number of concurrent async. I/O operations. Default: 1024",
1024);
/*
 * Initializes the async I/O system: creates the guard mutex and allocates the
 * ringbuffer storage with rt_MaxConcurrentAsyncIO slots (1024 if the cvar
 * is 0).
 *
 * Returns VY_SUCCESS, or VY_AIO_OUT_OF_MEMORY if the mutex or the storage
 * could not be created. On failure nothing stays allocated.
 */
vy_result InitAIO(void) {
    unsigned int max_concurrent_operations = rt_MaxConcurrentAsyncIO.i;
    if (max_concurrent_operations == 0) {
        max_concurrent_operations = 1024;
    }

    _ringbuffer.guard = vyCreateMutex();
    if (!_ringbuffer.guard) {
        return VY_AIO_OUT_OF_MEMORY;
    }

    _ringbuffer.storage = calloc(max_concurrent_operations, sizeof *_ringbuffer.storage);
    if (!_ringbuffer.storage) {
        /* Do not leak the mutex that was created above. */
        vyDestroyMutex(_ringbuffer.guard);
        _ringbuffer.guard = NULL;
        return VY_AIO_OUT_OF_MEMORY;
    }

    _ringbuffer.head     = 0;
    _ringbuffer.tail     = 0;
    _ringbuffer.capacity = max_concurrent_operations;
    return VY_SUCCESS;
}
/*
 * Releases the resources acquired by InitAIO and resets the ringbuffer to its
 * empty, uninitialized state.
 *
 * All pointers are cleared so that a repeated call (or a call after a failed
 * InitAIO) neither frees the storage twice nor destroys a mutex that does not
 * exist. With capacity == 0 the public query/release functions reject every
 * handle.
 */
void ShutdownAIO(void) {
    if (_ringbuffer.guard) {
        vyDestroyMutex(_ringbuffer.guard);
        _ringbuffer.guard = NULL;
    }

    free(_ringbuffer.storage);
    _ringbuffer.storage = NULL;

    _ringbuffer.capacity = 0;
    _ringbuffer.head     = 0;
    _ringbuffer.tail     = 0;
}
/*
 * Submits all loads of `batch` as asynchronous read operations.
 *
 * batch   - the loads to submit; num_loads must not exceed
 *           VY_LOAD_BATCH_MAX_SIZE.
 * handles - output array with at least batch->num_loads entries. handles[i]
 *           receives the handle for batch->loads[i], or VY_AIO_INVALID_HANDLE
 *           if that particular load could not be started.
 *
 * Returns VY_AIO_LOAD_TOO_LARGE if the batch is too big,
 * VY_AIO_TOO_MANY_OPERATIONS if no ringbuffer space could be reserved (in both
 * cases `handles` is left untouched), and VY_SUCCESS otherwise. VY_SUCCESS
 * does not imply that every individual load was started; check each handle.
 *
 * A slot whose load could not be started is marked VY_AIO_STATE_INVALID.
 * Nobody holds a handle to such a slot, so it can only be reclaimed by the
 * tail advancing over it, which requires the INVALID state.
 */
VY_DLLEXPORT vy_result vySubmitLoadBatch(const vy_load_batch *batch, vy_aio_handle *handles) {
    if (batch->num_loads > VY_LOAD_BATCH_MAX_SIZE) {
        return VY_AIO_LOAD_TOO_LARGE;
    }

    vy_ringbuffer_space rbspace = ReserveRingbufferSpace(batch->num_loads);
    if (!rbspace.a) {
        vyReportError("aio", "Too many pending file operations");
        return VY_AIO_TOO_MANY_OPERATIONS;
    }

    for (unsigned int i = 0; i < batch->num_loads; ++i) {
        /* The reservation may be split in two spans if it wrapped around. */
        vy_aio *op = (i < rbspace.a_count) ? &rbspace.a[i] : &rbspace.b[i - rbspace.a_count];
        op->state  = VY_AIO_STATE_PENDING;

        const char *file_path = vyGetFilePath(batch->loads[i].file);
        if (!file_path) {
            vyReportError("aio", "Failed to resolve file path for a batched load");
            op->state  = VY_AIO_STATE_INVALID;
            handles[i] = VY_AIO_INVALID_HANDLE;
            continue;
        }

#ifdef _WIN32
        op->overlapped = (OVERLAPPED){
            /* ReadFileEx does not use hEvent and we are free to use it for our own purposes. */
            .hEvent       = (HANDLE)(op),
            .Internal     = 0,
            .InternalHigh = 0,
            .Offset       = (DWORD)(batch->loads[i].offset & MAXDWORD),
            .OffsetHigh   = (DWORD)(batch->loads[i].offset >> 32),
        };

        WCHAR wpath[MAX_PATH];
        /* For CP_UTF8 the flags must be 0 or MB_ERR_INVALID_CHARS; any other
         * value (including MB_PRECOMPOSED) is documented to fail with
         * ERROR_INVALID_FLAGS. */
        if (MultiByteToWideChar(CP_UTF8, 0, file_path, -1, wpath, VY_ARRAY_COUNT(wpath)) == 0) {
            vyReportError("aio", "MultiByteToWideChar failed with error code: %u", GetLastError());
            op->state  = VY_AIO_STATE_INVALID;
            handles[i] = VY_AIO_INVALID_HANDLE;
            continue;
        }

        HANDLE file_handle = CreateFileW(wpath,
                                         GENERIC_READ,
                                         FILE_SHARE_READ,
                                         NULL,
                                         OPEN_EXISTING,
                                         FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED,
                                         NULL);
        if (file_handle == INVALID_HANDLE_VALUE) {
            vyReportError("aio",
                          "CreateFileW failed for file: %s with error code: %u",
                          file_path,
                          GetLastError());
            op->state  = VY_AIO_STATE_INVALID;
            handles[i] = VY_AIO_INVALID_HANDLE;
            continue;
        }
        op->file_handle = file_handle;

        BOOL result = ReadFileEx(file_handle,
                                 batch->loads[i].dest,
                                 (DWORD)batch->loads[i].num_bytes,
                                 &op->overlapped,
                                 win32CompletionRoutine);
        DWORD err   = GetLastError();
        /* NOTE(review): if ReadFileEx returned TRUE the operation may already
         * be queued even though the last error is set - confirm that treating
         * this case as a hard failure is intended. */
        if (!result || err != ERROR_SUCCESS) {
            vyReportError("aio", "ReadFileEx failed with error code: %u", err);
            CloseHandle(file_handle);
            op->file_handle = NULL;
            op->state       = VY_AIO_STATE_INVALID;
            handles[i]      = VY_AIO_INVALID_HANDLE;
            /* Skip the handle assignment below; the load was not started. */
            continue;
        }

        /* Handle is the index into the ringbuffer + 1 */
        ptrdiff_t op_idx = op - _ringbuffer.storage;
        handles[i]       = (uint32_t)op_idx + 1;
#else
        /* There is no async I/O backend for this platform. Report the load as
         * not started instead of leaving handles[i] unset and the slot
         * pending forever. */
        (void)file_path;
        vyLog("aio", "Async io is not implemented on this platform.");
        op->state  = VY_AIO_STATE_INVALID;
        handles[i] = VY_AIO_INVALID_HANDLE;
#endif
    }

    return VY_SUCCESS;
}
/*
 * Returns the current state of the operation identified by `handle`.
 *
 * Handles are slot indices + 1. VY_AIO_INVALID_HANDLE and handles beyond the
 * ringbuffer capacity yield VY_AIO_STATE_INVALID.
 */
VY_DLLEXPORT volatile vy_aio_state vyGetAIOState(vy_aio_handle handle) {
    const int handle_in_range =
        (handle != VY_AIO_INVALID_HANDLE) && (handle <= _ringbuffer.capacity);
    if (!handle_in_range) {
        return VY_AIO_STATE_INVALID;
    }

#ifdef _WIN32
    /* Enter an alertable wait so that queued completion routines get a chance
     * to run on this thread. */
    SleepEx(0, TRUE);
#endif

    vy_aio_state current;
    vyLockMutex(_ringbuffer.guard);
    current = _ringbuffer.storage[handle - 1].state;
    vyUnlockMutex(_ringbuffer.guard);
    return current;
}
/*
 * Releases the slot identified by `handle` so that it can be reused.
 *
 * The slot is marked VY_AIO_STATE_INVALID and the tail is then advanced over
 * every leading INVALID slot, stopping at the first slot that is still in use
 * or at head (buffer empty).
 *
 * The tail is advanced on every release, not only when the released slot is
 * the tail itself. The tail slot may already be INVALID without ever being
 * released - a load that failed during submission returns no handle - and
 * such a slot would otherwise pin the tail forever.
 *
 * VY_AIO_INVALID_HANDLE and out-of-range handles are ignored.
 */
VY_DLLEXPORT void vyReleaseAIO(vy_aio_handle handle) {
    if (handle == VY_AIO_INVALID_HANDLE || handle > _ringbuffer.capacity) {
        return;
    }

    vyLockMutex(_ringbuffer.guard);

    _ringbuffer.storage[handle - 1].state = VY_AIO_STATE_INVALID;

    uint32_t i = _ringbuffer.tail;
    while (i != _ringbuffer.head && _ringbuffer.storage[i].state == VY_AIO_STATE_INVALID) {
        i = (i + 1) % _ringbuffer.capacity;
    }
    _ringbuffer.tail = i;

    vyUnlockMutex(_ringbuffer.guard);
}
/*
 * Blocks the calling thread until the operation identified by `handle` is no
 * longer pending and returns the state it ended up in.
 *
 * Returns VY_AIO_STATE_INVALID immediately for VY_AIO_INVALID_HANDLE or a
 * handle beyond the ringbuffer capacity.
 */
VY_DLLEXPORT vy_aio_state vyWaitForAIOCompletion(vy_aio_handle handle) {
    if (handle == VY_AIO_INVALID_HANDLE || handle > _ringbuffer.capacity) {
        return VY_AIO_STATE_INVALID;
    }

    vy_aio_state observed;
    for (;;) {
        observed = vyGetAIOState(handle);

        /* NOTE(Kevin): This is where we could temporarily run a job. */
#ifdef _WIN32
        YieldProcessor();
#elif defined(__linux__)
        sched_yield();
#endif

        if (observed != VY_AIO_STATE_PENDING) {
            break;
        }
    }
    return observed;
}
/*
 * Convenience wrapper: submits `load` as a batch of exactly one element.
 *
 * handle - receives the handle of the submitted operation, as written by
 *          vySubmitLoadBatch.
 *
 * Returns whatever vySubmitLoadBatch returns.
 */
VY_DLLEXPORT vy_result vySubmitSingleLoad(vy_file_load load, vy_aio_handle *handle) {
    vy_load_batch single;
    single.num_loads = 1;
    single.loads[0]  = load;

    vy_result submit_result = vySubmitLoadBatch(&single, handle);
    return submit_result;
}
/*
 * Submits `load`, waits for it to complete and releases its slot again.
 *
 * Returns VY_AIO_STATE_FAILED if the load could not be submitted, otherwise
 * the state reported by vyWaitForAIOCompletion.
 */
VY_DLLEXPORT vy_aio_state vySubmitSingleLoadSync(vy_file_load load) {
    /* Start from the invalid handle: the submit path does not write the
     * handle on every code path (e.g. builds without an I/O backend), and
     * waiting on an indeterminate value would be undefined behavior. */
    vy_aio_handle handle = VY_AIO_INVALID_HANDLE;
    if (vySubmitSingleLoad(load, &handle) != VY_SUCCESS) {
        return VY_AIO_STATE_FAILED;
    }

    vy_aio_state state = vyWaitForAIOCompletion(handle);
    vyReleaseAIO(handle);
    return state;
}