feat(linux): Ported aio to linux
Some checks failed
Ubuntu Cross to Win64 / Cross Compile with ming64 (1.4.0, ubuntu-latest) (push) Failing after 1m46s

I had to rename the header because the POSIX async I/O header has the same name.
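For context (a minimal sketch, not part of the diff): the Linux implementation has to include the system <aio.h> for struct aiocb and aio_read()/aio_write(), and with the project header also named aio.h an include can resolve to the wrong file depending on the include paths:

#include <aio.h>    /* POSIX AIO: struct aiocb, aio_read(), aio_write(), ... */
#include "aio.h"    /* old name of the runtime header, same basename */

/* after the rename both can be used side by side without ambiguity: */
#include <aio.h>
#include "rt_aio.h"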
This commit is contained in:
Kevin Trogant 2024-07-24 15:10:06 +02:00
parent e371a24761
commit 77f0db7a7c
8 changed files with 519 additions and 390 deletions

View File

@@ -1,7 +1,7 @@
#include "asset_compiler.h"
#include "processor.h"
#include "runtime/aio.h"
#include "runtime/rt_aio.h"
#include "runtime/buffer_manager.h"
#include "runtime/config.h"
#include "runtime/file_tab.h"

View File

@@ -1,377 +0,0 @@
#include "aio.h"
#include "config.h"
#include "threading.h"
#ifdef _WIN32
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
void Win32ErrorToString(DWORD last_error, char *out, int bufsize);
#elif defined(__linux__)
#include <sched.h>
#endif
#include <assert.h>
#include <stdlib.h>
/* Maintain a ringbuffer of pending operations */
typedef struct {
#ifdef _WIN32
HANDLE file_handle;
OVERLAPPED overlapped;
#endif
volatile rt_aio_state state;
} rt_aio;
typedef struct {
rt_mutex *guard;
rt_aio *storage;
uint32_t capacity;
uint32_t head;
uint32_t tail;
} rt_aio_ringbuffer;
typedef struct {
rt_aio *a;
rt_aio *b;
uint32_t a_count;
} rt_ringbuffer_space;
static rt_aio_ringbuffer _ringbuffer;
static rt_ringbuffer_space ReserveRingbufferSpace(uint32_t count) {
if (!rtLockMutex(_ringbuffer.guard)) {
rt_ringbuffer_space failed = {NULL, NULL, 0};
return failed;
}
rt_ringbuffer_space result = {NULL, NULL, 0};
if (_ringbuffer.head >= _ringbuffer.tail) {
if (_ringbuffer.head + count <= _ringbuffer.capacity) {
result.a_count = count;
result.a = &_ringbuffer.storage[_ringbuffer.head];
_ringbuffer.head = (_ringbuffer.head + count) % _ringbuffer.capacity;
} else {
/* Check if enough space is free at the end */
uint32_t a_count = _ringbuffer.capacity - _ringbuffer.head;
uint32_t b_count = count - a_count;
if (b_count <= _ringbuffer.tail) {
result.a_count = a_count;
result.a = &_ringbuffer.storage[_ringbuffer.head];
result.b = &_ringbuffer.storage[0];
_ringbuffer.head = b_count;
} else {
/* Not enough space, we would overwrite the tail */
rtLog("aio", "Ringbuffer is full.");
}
}
} else {
/* Head is lower than tail */
uint32_t num_free = _ringbuffer.tail - _ringbuffer.head;
if (count < num_free) {
result.a_count = count;
result.a = &_ringbuffer.storage[_ringbuffer.head];
_ringbuffer.head = (_ringbuffer.head + count) % _ringbuffer.capacity;
} else {
/* Not enough space, we would overwrite the tail */
rtLog("aio", "Ringbuffer is full.");
}
}
rtUnlockMutex(_ringbuffer.guard);
return result;
}
#ifdef _WIN32
static void
win32CompletionRoutine(DWORD error_code, DWORD num_bytes_transfered, LPOVERLAPPED overlapped) {
rt_aio *op = (rt_aio *)overlapped->hEvent;
assert(op->state == RT_AIO_STATE_PENDING);
if (error_code != ERROR_SUCCESS) {
op->state = RT_AIO_STATE_FAILED;
rtLog("aio", "Async io failed: %u", error_code);
} else {
op->state = RT_AIO_STATE_FINISHED;
}
CloseHandle(op->file_handle);
}
#endif
RT_CVAR_I(rt_MaxConcurrentAsyncIO,
"Maximum number of concurrent async. I/O operations. Default: 1024",
1024);
rt_result InitAIO(void) {
unsigned int max_concurrent_operations = rt_MaxConcurrentAsyncIO.i;
_ringbuffer.guard = rtCreateMutex();
if (!_ringbuffer.guard) {
return RT_AIO_OUT_OF_MEMORY;
}
if (max_concurrent_operations == 0)
max_concurrent_operations = 1024;
_ringbuffer.storage = calloc(max_concurrent_operations, sizeof(rt_aio));
if (!_ringbuffer.storage)
return RT_AIO_OUT_OF_MEMORY;
_ringbuffer.head = 0;
_ringbuffer.tail = 0;
_ringbuffer.capacity = max_concurrent_operations;
return RT_SUCCESS;
}
void ShutdownAIO(void) {
rtDestroyMutex(_ringbuffer.guard);
free(_ringbuffer.storage);
_ringbuffer.capacity = 0;
}
RT_DLLEXPORT rt_result rtSubmitLoadBatch(const rt_load_batch *batch, rt_aio_handle *handles) {
if (batch->num_loads > RT_LOAD_BATCH_MAX_SIZE) {
return RT_AIO_LOAD_TOO_LARGE;
}
rt_ringbuffer_space rbspace = ReserveRingbufferSpace(batch->num_loads);
if (!rbspace.a) {
rtReportError("aio", "Too many pending file operations");
return RT_AIO_TOO_MANY_OPERATIONS;
}
for (unsigned int i = 0; i < batch->num_loads; ++i) {
rt_aio *op = (i < rbspace.a_count) ? &rbspace.a[i] : &rbspace.b[i - rbspace.a_count];
op->state = RT_AIO_STATE_PENDING;
const char *file_path = rtGetFilePath(batch->loads[i].file);
if (!file_path) {
rtReportError("aio", "Failed to resolve file path for a batched load");
op->state = RT_AIO_STATE_INVALID;
handles[i] = RT_AIO_INVALID_HANDLE;
continue;
}
#ifdef _WIN32
op->overlapped = (OVERLAPPED){
/* ReadFileEx does not use hEvent and we are free to use it for our own purposes. */
.hEvent = (HANDLE)(op),
.Internal = 0,
.InternalHigh = 0,
.Offset = (DWORD)(batch->loads[i].offset & MAXDWORD),
.OffsetHigh = (DWORD)(batch->loads[i].offset >> 32),
};
WCHAR wpath[MAX_PATH];
if (MultiByteToWideChar(CP_UTF8,
MB_PRECOMPOSED,
file_path,
-1,
wpath,
RT_ARRAY_COUNT(wpath)) == 0) {
rtReportError("aio", "MultiByteToWideChar failed with error code: %u", GetLastError());
op->state = RT_AIO_STATE_FINISHED;
handles[i] = RT_AIO_INVALID_HANDLE;
continue;
}
HANDLE file_handle = CreateFileW(wpath,
GENERIC_READ,
FILE_SHARE_READ,
NULL,
OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED,
NULL);
if (file_handle == INVALID_HANDLE_VALUE) {
DWORD err = GetLastError();
char error_msg[256];
Win32ErrorToString(err, error_msg, 256);
rtReportError("aio",
"CreateFileW failed for file: %s with error code: %u (%s)",
file_path,
err,
error_msg);
op->state = RT_AIO_STATE_INVALID;
handles[i] = RT_AIO_INVALID_HANDLE;
continue;
}
op->file_handle = file_handle;
BOOL result = ReadFileEx(file_handle,
batch->loads[i].dest,
(DWORD)batch->loads[i].num_bytes,
&op->overlapped,
win32CompletionRoutine);
DWORD err = GetLastError();
if (!result || err != ERROR_SUCCESS) {
char error_msg[256];
Win32ErrorToString(err, error_msg, 256);
rtReportError("aio", "ReadFileEx failed with error code: %u (%s)", err, error_msg);
op->state = RT_AIO_STATE_FINISHED;
handles[i] = RT_AIO_INVALID_HANDLE;
CloseHandle(file_handle);
op->file_handle = NULL;
}
/* Handle is the index into the ringbuffer + 1 */
ptrdiff_t op_idx = op - _ringbuffer.storage;
handles[i] = (uint32_t)op_idx + 1;
#endif
}
return RT_SUCCESS;
}
RT_DLLEXPORT rt_result rtSubmitWriteBatch(const rt_write_batch *batch, rt_aio_handle *handles) {
if (batch->num_writes > RT_LOAD_BATCH_MAX_SIZE) {
return RT_AIO_WRITE_TOO_LARGE;
}
rt_ringbuffer_space rbspace = ReserveRingbufferSpace(batch->num_writes);
if (!rbspace.a) {
rtReportError("aio", "Too many pending file operations");
return RT_AIO_TOO_MANY_OPERATIONS;
}
for (unsigned int i = 0; i < batch->num_writes; ++i) {
rt_aio *op = (i < rbspace.a_count) ? &rbspace.a[i] : &rbspace.b[i - rbspace.a_count];
op->state = RT_AIO_STATE_PENDING;
const char *file_path = rtGetFilePath(batch->writes[i].file);
if (!file_path) {
rtReportError("aio", "Failed to resolve file path for a batched write");
op->state = RT_AIO_STATE_INVALID;
handles[i] = RT_AIO_INVALID_HANDLE;
continue;
}
#ifdef _WIN32
op->overlapped = (OVERLAPPED){
/* WriteFileEx does not use hEvent and we are free to use it for our own purposes. */
.hEvent = (HANDLE)(op),
.Internal = 0,
.InternalHigh = 0,
.Offset = (DWORD)(batch->writes[i].offset & MAXDWORD),
.OffsetHigh = (DWORD)(batch->writes[i].offset >> 32),
};
WCHAR wpath[MAX_PATH];
if (MultiByteToWideChar(CP_UTF8,
MB_PRECOMPOSED,
file_path,
-1,
wpath,
RT_ARRAY_COUNT(wpath)) == 0) {
rtReportError("aio", "MultiByteToWideChar failed with error code: %u", GetLastError());
op->state = RT_AIO_STATE_FINISHED;
handles[i] = RT_AIO_INVALID_HANDLE;
continue;
}
HANDLE file_handle = CreateFileW(wpath,
GENERIC_WRITE,
0,
NULL,
OPEN_ALWAYS,
FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED,
NULL);
if (file_handle == INVALID_HANDLE_VALUE) {
DWORD err = GetLastError();
char error_msg[256];
Win32ErrorToString(err, error_msg, 256);
rtReportError("aio",
"CreateFileW failed for file: %s with error code: %u (%s)",
file_path,
err,
error_msg);
op->state = RT_AIO_STATE_INVALID;
handles[i] = RT_AIO_INVALID_HANDLE;
continue;
}
op->file_handle = file_handle;
BOOL result = WriteFileEx(file_handle,
batch->writes[i].buffer,
(DWORD)batch->writes[i].num_bytes,
&op->overlapped,
win32CompletionRoutine);
DWORD err = GetLastError();
if (!result || (err != ERROR_SUCCESS && err != ERROR_ALREADY_EXISTS)) {
char error_msg[256];
Win32ErrorToString(err, error_msg, 256);
rtReportError("aio", "WriteFileEx failed with error code: %u (%s)", err, error_msg);
op->state = RT_AIO_STATE_FINISHED;
handles[i] = RT_AIO_INVALID_HANDLE;
CloseHandle(file_handle);
op->file_handle = NULL;
}
/* Handle is the index into the ringbuffer + 1 */
ptrdiff_t op_idx = op - _ringbuffer.storage;
handles[i] = (uint32_t)op_idx + 1;
#endif
}
return RT_SUCCESS;
}
RT_DLLEXPORT rt_aio_state rtGetAIOState(rt_aio_handle handle) {
if (handle == RT_AIO_INVALID_HANDLE || handle > _ringbuffer.capacity)
return RT_AIO_STATE_INVALID;
#ifdef _WIN32
/* Give the completion routine an opportunity to run */
SleepEx(0, TRUE);
#endif
rtLockMutex(_ringbuffer.guard);
rt_aio_state state = _ringbuffer.storage[handle - 1].state;
rtUnlockMutex(_ringbuffer.guard);
return state;
}
RT_DLLEXPORT void rtReleaseAIO(rt_aio_handle handle) {
if (handle == RT_AIO_INVALID_HANDLE || handle > _ringbuffer.capacity) {
return;
}
rtLockMutex(_ringbuffer.guard);
_ringbuffer.storage[handle - 1].state = RT_AIO_STATE_INVALID;
if (handle - 1 == _ringbuffer.tail) {
/* Advance the tail such that it points to the last used slot. (Or to head, if the
* ringbuffer is now empty) */
uint32_t i = _ringbuffer.tail;
while ((_ringbuffer.storage[i].state == RT_AIO_STATE_INVALID) && i != _ringbuffer.head) {
i = (i + 1) % _ringbuffer.capacity;
}
_ringbuffer.tail = i;
}
rtUnlockMutex(_ringbuffer.guard);
}
RT_DLLEXPORT rt_aio_state rtWaitForAIOCompletion(rt_aio_handle handle) {
if (handle == RT_AIO_INVALID_HANDLE || handle > _ringbuffer.capacity)
return RT_AIO_STATE_INVALID;
rt_aio_state state;
do {
state = rtGetAIOState(handle);
/* NOTE(Kevin): This is where we could temporarily run a job. */
#ifdef _WIN32
YieldProcessor();
#elif defined(__linux__)
sched_yield();
#endif
} while (state == RT_AIO_STATE_PENDING);
return state;
}
RT_DLLEXPORT rt_result rtSubmitSingleLoad(rt_file_load load, rt_aio_handle *handle) {
rt_load_batch batch;
batch.loads[0] = load;
batch.num_loads = 1;
return rtSubmitLoadBatch(&batch, handle);
}
RT_DLLEXPORT rt_aio_state rtSubmitSingleLoadSync(rt_file_load load) {
rt_aio_handle handle;
if (rtSubmitSingleLoad(load, &handle) != RT_SUCCESS)
return RT_AIO_STATE_FAILED;
rt_aio_state state = rtWaitForAIOCompletion(handle);
rtReleaseAIO(handle);
return state;
}

View File

@@ -3,7 +3,7 @@
#include "runtime.h"
#include "threading.h"
#include "aio.h"
#include "rt_aio.h"
#include "buffer_manager.h"
#include "file_tab.h"
#include "mem_arena.h"

View File

@@ -258,16 +258,13 @@ RT_DLLEXPORT uint64_t rtGetFileModificationTimestamp(const char *path) {
}
RT_DLLEXPORT bool rtSyncReadWholeFile(const char *path, void *dest, size_t dest_size) {
size_t fsz = rtGetFileSize(path);
if (fsz > dest_size) {
return false;
}
FILE *f = fopen(path, "rb");
if (!f)
return false;
fseek(f, SEEK_END, 0);
size_t fsz = (size_t)ftell(f);
fseek(f, SEEK_SET, 0);
if (fsz > dest_size) {
fclose(f);
return false;
}
size_t n = fread(dest, 1, fsz, f);
fclose(f);
return n == fsz;

View File

@@ -7,10 +7,15 @@ xxhash_proj = subproject('xxhash', default_options: ['default_library=static', '
xxhash_dep = xxhash_proj.get_variable('xxhash_dep')
runtime_deps = [thread_dep, m_dep, inih_dep, lz4_dep, xxhash_dep]
if host_machine.system() == 'linux'
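# POSIX AIO (aio_read/aio_write) is provided by glibc's librt, hence the explicit -lrt link on Linux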
rt_dep = declare_dependency(link_args: ['-lrt'])
runtime_deps += rt_dep
endif
runtime_incdirs = contrib_incdir
runtime_lib = library('rt',
# Project Sources
'aio.h',
'atomics.h',
'buffer_manager.h',
'compression.h',
@@ -25,13 +30,13 @@ runtime_lib = library('rt',
'jobs.h',
'mem_arena.h',
'resources.h',
'rt_aio.h',
'runtime.h',
'string_storage.h',
'threading.h',
'threading_helpers.hpp',
'timing.h',
'aio.c',
'assert.c',
'buffer_manager.c',
'compression.c',
@@ -48,6 +53,7 @@ runtime_lib = library('rt',
'jobs.c',
'mem_arena.c',
'resource_manager.c',
'rt_aio.c',
'sprint.c',
'string_storage.c',
'text.c',

View File

@@ -1,4 +1,4 @@
#include "aio.h"
#include "rt_aio.h"
#include "buffer_manager.h"
#include "compression.h"
#include "config.h"

src/runtime/rt_aio.c (new file, 503 lines added)
View File

@@ -0,0 +1,503 @@
#include "rt_aio.h"
#include "config.h"
#include "runtime.h"
#include "threading.h"
#ifdef _WIN32
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
void Win32ErrorToString(DWORD last_error, char *out, int bufsize);
#elif defined(__linux__)
#include <sys/types.h>
#include <sys/file.h>
#include <sched.h>
#include <aio.h>
#include <signal.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#define IO_SIGNAL SIGUSR1
#endif
#include <assert.h>
#include <stdlib.h>
/* Maintain a ringbuffer of pending operations */
typedef struct {
#ifdef _WIN32
HANDLE file_handle;
OVERLAPPED overlapped;
#elif defined(__linux__)
int fd;
struct aiocb64 cb;
#endif
volatile rt_aio_state state;
} rt_aio;
typedef struct {
rt_mutex *guard;
rt_aio *storage;
uint32_t capacity;
uint32_t head;
uint32_t tail;
} rt_aio_ringbuffer;
typedef struct {
rt_aio *a;
rt_aio *b;
uint32_t a_count;
} rt_ringbuffer_space;
static rt_aio_ringbuffer _ringbuffer;
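/* Reserves `count` slots in the ringbuffer. Because the reservation may wrap around the end of
* the storage array, the result is returned as up to two spans: `a` holds the first `a_count`
* slots and `b` (if non-NULL) holds the remaining `count - a_count` slots starting at index 0.
* A NULL `a` means the reservation failed. */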
static rt_ringbuffer_space ReserveRingbufferSpace(uint32_t count) {
if (!rtLockMutex(_ringbuffer.guard)) {
rt_ringbuffer_space failed = {NULL, NULL, 0};
return failed;
}
rt_ringbuffer_space result = {NULL, NULL, 0};
if (_ringbuffer.head >= _ringbuffer.tail) {
if (_ringbuffer.head + count <= _ringbuffer.capacity) {
result.a_count = count;
result.a = &_ringbuffer.storage[_ringbuffer.head];
_ringbuffer.head = (_ringbuffer.head + count) % _ringbuffer.capacity;
} else {
/* Check if enough space is free at the end */
uint32_t a_count = _ringbuffer.capacity - _ringbuffer.head;
uint32_t b_count = count - a_count;
if (b_count <= _ringbuffer.tail) {
result.a_count = a_count;
result.a = &_ringbuffer.storage[_ringbuffer.head];
result.b = &_ringbuffer.storage[0];
_ringbuffer.head = b_count;
} else {
/* Not enough space, we would overwrite the tail */
rtLog("aio", "Ringbuffer is full.");
}
}
} else {
/* Head is lower than tail */
uint32_t num_free = _ringbuffer.tail - _ringbuffer.head;
if (count < num_free) {
result.a_count = count;
result.a = &_ringbuffer.storage[_ringbuffer.head];
_ringbuffer.head = (_ringbuffer.head + count) % _ringbuffer.capacity;
} else {
/* Not enough space, we would overwrite the tail */
rtLog("aio", "Ringbuffer is full.");
}
}
rtUnlockMutex(_ringbuffer.guard);
return result;
}
#ifdef _WIN32
static void
win32CompletionRoutine(DWORD error_code, DWORD num_bytes_transfered, LPOVERLAPPED overlapped) {
rt_aio *op = (rt_aio *)overlapped->hEvent;
assert(op->state == RT_AIO_STATE_PENDING);
if (error_code != ERROR_SUCCESS) {
op->state = RT_AIO_STATE_FAILED;
rtLog("aio", "Async io failed: %u", error_code);
} else {
op->state = RT_AIO_STATE_FINISHED;
}
CloseHandle(op->file_handle);
}
#elif defined(__linux__)
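/* Completion handler for the Linux path: every submitted aiocb is configured with
* SIGEV_SIGNAL/IO_SIGNAL and carries a pointer to its rt_aio slot in sigev_value.sival_ptr,
* so the handler can mark the matching slot finished or failed and close its file descriptor. */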
static void linuxAIOSigHandler(int sig, siginfo_t *si, void *ucontext) {
RT_ASSERT(sig == IO_SIGNAL, "The signal handler was called for an unexpected signal.");
if (si->si_code != SI_ASYNCIO)
return;
rt_aio *op = si->si_value.sival_ptr;
RT_ASSERT(op->state == RT_AIO_STATE_PENDING, "The async io operation was in an unexpected state.");
if (si->si_errno != 0) {
const char *err = strerror(si->si_errno);
rtLog("aio", "Async io failed: %u (%s)", si->si_errno, err);
op->state = RT_AIO_STATE_FAILED;
} else {
op->state = RT_AIO_STATE_FINISHED;
}
close(op->fd);
op->fd = -1;
}
#endif
RT_CVAR_I(rt_MaxConcurrentAsyncIO,
"Maximum number of concurrent async. I/O operations. Default: 1024",
1024);
rt_result InitAIO(void) {
unsigned int max_concurrent_operations = rt_MaxConcurrentAsyncIO.i;
_ringbuffer.guard = rtCreateMutex();
if (!_ringbuffer.guard) {
return RT_AIO_OUT_OF_MEMORY;
}
if (max_concurrent_operations == 0)
max_concurrent_operations = 1024;
_ringbuffer.storage = calloc(max_concurrent_operations, sizeof(rt_aio));
if (!_ringbuffer.storage)
return RT_AIO_OUT_OF_MEMORY;
_ringbuffer.head = 0;
_ringbuffer.tail = 0;
_ringbuffer.capacity = max_concurrent_operations;
#ifdef __linux__
/* Register the handler for the IO completion signal */
struct sigaction sa;
sigemptyset(&sa.sa_mask);
sa.sa_flags = SA_RESTART | SA_SIGINFO;
sa.sa_sigaction = linuxAIOSigHandler;
if (sigaction(IO_SIGNAL, &sa, NULL) == -1) {
return RT_UNKNOWN_ERROR;
}
#endif
return RT_SUCCESS;
}
void ShutdownAIO(void) {
rtDestroyMutex(_ringbuffer.guard);
free(_ringbuffer.storage);
_ringbuffer.capacity = 0;
}
#ifdef _WIN32
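/* Win32 read path: opens the file with FILE_FLAG_OVERLAPPED and submits the transfer with
* ReadFileEx(). The OVERLAPPED hEvent field is repurposed to carry the rt_aio pointer to the
* completion routine. The returned handle is the ringbuffer index + 1. */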
static void win32LoadBatchInner(const char *file_path, const rt_file_load *load, rt_aio *op, rt_aio_handle *handle) {
op->overlapped = (OVERLAPPED){
/* ReadFileEx does not use hEvent and we are free to use it for our own purposes. */
.hEvent = (HANDLE)(op),
.Internal = 0,
.InternalHigh = 0,
.Offset = (DWORD)(load->offset & MAXDWORD),
.OffsetHigh = (DWORD)(load->offset >> 32),
};
WCHAR wpath[MAX_PATH];
if (MultiByteToWideChar(CP_UTF8, MB_PRECOMPOSED, file_path, -1, wpath, RT_ARRAY_COUNT(wpath)) ==
0) {
rtReportError("aio", "MultiByteToWideChar failed with error code: %u", GetLastError());
op->state = RT_AIO_STATE_FINISHED;
*handle = RT_AIO_INVALID_HANDLE;
return;
}
HANDLE file_handle = CreateFileW(wpath,
GENERIC_READ,
FILE_SHARE_READ,
NULL,
OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED,
NULL);
if (file_handle == INVALID_HANDLE_VALUE) {
DWORD err = GetLastError();
char error_msg[256];
Win32ErrorToString(err, error_msg, 256);
rtReportError("aio",
"CreateFileW failed for file: %s with error code: %u (%s)",
file_path,
err,
error_msg);
op->state = RT_AIO_STATE_INVALID;
*handle = RT_AIO_INVALID_HANDLE;
return;
}
op->file_handle = file_handle;
BOOL result = ReadFileEx(file_handle,
load->dest,
(DWORD)load->num_bytes,
&op->overlapped,
win32CompletionRoutine);
DWORD err = GetLastError();
if (!result || err != ERROR_SUCCESS) {
char error_msg[256];
Win32ErrorToString(err, error_msg, 256);
rtReportError("aio", "ReadFileEx failed with error code: %u (%s)", err, error_msg);
op->state = RT_AIO_STATE_FINISHED;
*handle = RT_AIO_INVALID_HANDLE;
CloseHandle(file_handle);
op->file_handle = NULL;
}
/* Handle is the index into the ringbuffer + 1 */
ptrdiff_t op_idx = op - _ringbuffer.storage;
*handle = (uint32_t)op_idx + 1;
}
#endif
#ifdef __linux__
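/* Linux read path: opens the file, describes the transfer in an aiocb64 (offset, buffer,
* byte count, completion signal) and submits it with aio_read64(). The returned handle is
* the ringbuffer index + 1, mirroring the Win32 path. */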
static void linuxLoadBatchInner(const char *file_path, const rt_file_load *load, rt_aio *op, rt_aio_handle *handle) {
memset(&op->cb, 0, sizeof(op->cb));
int fd = open(file_path, O_RDONLY | O_LARGEFILE);
if (fd == -1) {
const char *err = strerror(errno);
rtReportError("aio", "open failed for file: %s with error: %d (%s)", file_path, errno, err);;
op->state = RT_AIO_STATE_INVALID;
*handle = RT_AIO_INVALID_HANDLE;
return;
}
op->fd = fd;
op->cb.aio_fildes = fd;
op->cb.aio_offset = load->offset;
op->cb.aio_buf = load->dest;
op->cb.aio_nbytes = load->num_bytes;
op->cb.aio_reqprio = 0;
op->cb.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
op->cb.aio_sigevent.sigev_signo = IO_SIGNAL;
op->cb.aio_sigevent.sigev_value.sival_ptr = op;
if (aio_read64(&op->cb) == -1) {
const char *err = strerror(errno);
rtReportError("aio", "aio_read64 failed for file: %s with error: %d (%s)", file_path, errno, err);
close(fd);
op->state = RT_AIO_STATE_INVALID;
*handle = RT_AIO_INVALID_HANDLE;
return;
}
ptrdiff_t op_idx = op - _ringbuffer.storage;
*handle = (uint32_t)op_idx + 1;
}
#endif
RT_DLLEXPORT rt_result rtSubmitLoadBatch(const rt_load_batch *batch, rt_aio_handle *handles) {
if (batch->num_loads > RT_LOAD_BATCH_MAX_SIZE) {
return RT_AIO_LOAD_TOO_LARGE;
}
rt_ringbuffer_space rbspace = ReserveRingbufferSpace(batch->num_loads);
if (!rbspace.a) {
rtReportError("aio", "Too many pending file operations");
return RT_AIO_TOO_MANY_OPERATIONS;
}
for (unsigned int i = 0; i < batch->num_loads; ++i) {
rt_aio *op = (i < rbspace.a_count) ? &rbspace.a[i] : &rbspace.b[i - rbspace.a_count];
op->state = RT_AIO_STATE_PENDING;
const char *file_path = rtGetFilePath(batch->loads[i].file);
if (!file_path) {
rtReportError("aio", "Failed to resolve file path for a batched load");
op->state = RT_AIO_STATE_INVALID;
handles[i] = RT_AIO_INVALID_HANDLE;
continue;
}
#ifdef _WIN32
win32LoadBatchInner(file_path, &batch->loads[i], op, &handles[i]);
#elif defined(__linux__)
linuxLoadBatchInner(file_path, &batch->loads[i], op, &handles[i]);
#endif
}
return RT_SUCCESS;
}
#ifdef _WIN32
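/* Win32 write path: same scheme as the read path, but the file is opened with
* GENERIC_WRITE/OPEN_ALWAYS and the transfer is submitted with WriteFileEx(). */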
static void win32WriteBatchInner(const char *file_path, const rt_file_write *write, rt_aio *op, rt_aio_handle *handle) {
op->overlapped = (OVERLAPPED){
/* WriteFileEx does not use hEvent and we are free to use it for our own purposes. */
.hEvent = (HANDLE)(op),
.Internal = 0,
.InternalHigh = 0,
.Offset = (DWORD)(write->offset & MAXDWORD),
.OffsetHigh = (DWORD)(write->offset >> 32),
};
WCHAR wpath[MAX_PATH];
if (MultiByteToWideChar(CP_UTF8, MB_PRECOMPOSED, file_path, -1, wpath, RT_ARRAY_COUNT(wpath)) ==
0) {
rtReportError("aio", "MultiByteToWideChar failed with error code: %u", GetLastError());
op->state = RT_AIO_STATE_FINISHED;
*handle = RT_AIO_INVALID_HANDLE;
return;
}
HANDLE file_handle = CreateFileW(wpath,
GENERIC_WRITE,
0,
NULL,
OPEN_ALWAYS,
FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED,
NULL);
if (file_handle == INVALID_HANDLE_VALUE) {
DWORD err = GetLastError();
char error_msg[256];
Win32ErrorToString(err, error_msg, 256);
rtReportError("aio",
"CreateFileW failed for file: %s with error code: %u (%s)",
file_path,
err,
error_msg);
op->state = RT_AIO_STATE_INVALID;
*handle = RT_AIO_INVALID_HANDLE;
return;
}
op->file_handle = file_handle;
BOOL result = WriteFileEx(file_handle,
write->buffer,
(DWORD)write->num_bytes,
&op->overlapped,
win32CompletionRoutine);
DWORD err = GetLastError();
if (!result || (err != ERROR_SUCCESS && err != ERROR_ALREADY_EXISTS)) {
char error_msg[256];
Win32ErrorToString(err, error_msg, 256);
rtReportError("aio", "WriteFileEx failed with error code: %u (%s)", err, error_msg);
op->state = RT_AIO_STATE_FINISHED;
*handle = RT_AIO_INVALID_HANDLE;
CloseHandle(file_handle);
op->file_handle = NULL;
}
/* Handle is the index into the ringbuffer + 1 */
ptrdiff_t op_idx = op - _ringbuffer.storage;
*handle = (uint32_t)op_idx + 1;
}
#endif
#ifdef __linux__
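/* Linux write path: same scheme as the read path, but the file is opened for writing
* (created if missing) and the transfer is submitted with aio_write64(). */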
static void linuxWriteBatchInner(const char *file_path, const rt_file_write *write, rt_aio *op, rt_aio_handle *handle) {
memset(&op->cb, 0, sizeof(op->cb));
int fd = open(file_path, O_WRONLY | O_CREAT | O_LARGEFILE, 0644); /* O_CREAT requires an explicit mode */
if (fd == -1) {
const char *err = strerror(errno);
rtReportError("aio", "open failed for file: %s with error: %d (%s)", file_path, errno, err);;
op->state = RT_AIO_STATE_INVALID;
*handle = RT_AIO_INVALID_HANDLE;
return;
}
op->fd = fd;
op->cb.aio_fildes = fd;
op->cb.aio_offset = write->offset;
op->cb.aio_buf = (volatile void *)write->buffer;
op->cb.aio_nbytes = write->num_bytes;
op->cb.aio_reqprio = 0;
op->cb.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
op->cb.aio_sigevent.sigev_signo = IO_SIGNAL;
op->cb.aio_sigevent.sigev_value.sival_ptr = op;
if (aio_write64(&op->cb) == -1) {
const char *err = strerror(errno);
rtReportError("aio", "aio_write64 failed for file: %s with error: %d (%s)", file_path, errno, err);
close(fd);
op->state = RT_AIO_STATE_INVALID;
*handle = RT_AIO_INVALID_HANDLE;
return;
}
ptrdiff_t op_idx = op - _ringbuffer.storage;
*handle = (uint32_t)op_idx + 1;
}
#endif
RT_DLLEXPORT rt_result rtSubmitWriteBatch(const rt_write_batch *batch, rt_aio_handle *handles) {
if (batch->num_writes > RT_LOAD_BATCH_MAX_SIZE) {
return RT_AIO_WRITE_TOO_LARGE;
}
rt_ringbuffer_space rbspace = ReserveRingbufferSpace(batch->num_writes);
if (!rbspace.a) {
rtReportError("aio", "Too many pending file operations");
return RT_AIO_TOO_MANY_OPERATIONS;
}
for (unsigned int i = 0; i < batch->num_writes; ++i) {
rt_aio *op = (i < rbspace.a_count) ? &rbspace.a[i] : &rbspace.b[i - rbspace.a_count];
op->state = RT_AIO_STATE_PENDING;
const char *file_path = rtGetFilePath(batch->writes[i].file);
if (!file_path) {
rtReportError("aio", "Failed to resolve file path for a batched write");
op->state = RT_AIO_STATE_INVALID;
handles[i] = RT_AIO_INVALID_HANDLE;
continue;
}
#ifdef _WIN32
win32WriteBatchInner(file_path, &batch->writes[i], op, &handles[i]);
#elif defined(__linux__)
linuxWriteBatchInner(file_path, &batch->writes[i], op, &handles[i]);
#endif
}
return RT_SUCCESS;
}
RT_DLLEXPORT rt_aio_state rtGetAIOState(rt_aio_handle handle) {
if (handle == RT_AIO_INVALID_HANDLE || handle > _ringbuffer.capacity)
return RT_AIO_STATE_INVALID;
#ifdef _WIN32
/* Give the completion routine an opportunity to run */
SleepEx(0, TRUE);
#endif
rtLockMutex(_ringbuffer.guard);
rt_aio_state state = _ringbuffer.storage[handle - 1].state;
rtUnlockMutex(_ringbuffer.guard);
return state;
}
RT_DLLEXPORT void rtReleaseAIO(rt_aio_handle handle) {
if (handle == RT_AIO_INVALID_HANDLE || handle > _ringbuffer.capacity) {
return;
}
rtLockMutex(_ringbuffer.guard);
_ringbuffer.storage[handle - 1].state = RT_AIO_STATE_INVALID;
if (handle - 1 == _ringbuffer.tail) {
/* Advance the tail such that it points to the last used slot. (Or to head, if the
* ringbuffer is now empty) */
uint32_t i = _ringbuffer.tail;
while ((_ringbuffer.storage[i].state == RT_AIO_STATE_INVALID) && i != _ringbuffer.head) {
i = (i + 1) % _ringbuffer.capacity;
}
_ringbuffer.tail = i;
}
rtUnlockMutex(_ringbuffer.guard);
}
RT_DLLEXPORT rt_aio_state rtWaitForAIOCompletion(rt_aio_handle handle) {
if (handle == RT_AIO_INVALID_HANDLE || handle > _ringbuffer.capacity)
return RT_AIO_STATE_INVALID;
rt_aio_state state;
do {
state = rtGetAIOState(handle);
/* NOTE(Kevin): This is where we could temporarily run a job. */
#ifdef _WIN32
YieldProcessor();
#elif defined(__linux__)
sched_yield();
#endif
} while (state == RT_AIO_STATE_PENDING);
return state;
}
RT_DLLEXPORT rt_result rtSubmitSingleLoad(rt_file_load load, rt_aio_handle *handle) {
rt_load_batch batch;
batch.loads[0] = load;
batch.num_loads = 1;
return rtSubmitLoadBatch(&batch, handle);
}
RT_DLLEXPORT rt_aio_state rtSubmitSingleLoadSync(rt_file_load load) {
rt_aio_handle handle;
if (rtSubmitSingleLoad(load, &handle) != RT_SUCCESS)
return RT_AIO_STATE_FAILED;
rt_aio_state state = rtWaitForAIOCompletion(handle);
rtReleaseAIO(handle);
return state;
}