From 77f0db7a7cec8518c75708e2248e2515869f54f6 Mon Sep 17 00:00:00 2001 From: Kevin Trogant Date: Wed, 24 Jul 2024 15:10:06 +0200 Subject: [PATCH] feat(linux): Ported aio to linux I had to rename the header, because the POSIX async io header has the same name. --- src/asset_compiler/asset_compiler.c | 2 +- src/runtime/aio.c | 377 --------------------- src/runtime/config.c | 2 +- src/runtime/fsutils.c | 11 +- src/runtime/meson.build | 12 +- src/runtime/resource_manager.c | 2 +- src/runtime/rt_aio.c | 503 ++++++++++++++++++++++++++++ src/runtime/{aio.h => rt_aio.h} | 0 8 files changed, 519 insertions(+), 390 deletions(-) delete mode 100644 src/runtime/aio.c create mode 100644 src/runtime/rt_aio.c rename src/runtime/{aio.h => rt_aio.h} (100%) diff --git a/src/asset_compiler/asset_compiler.c b/src/asset_compiler/asset_compiler.c index 6872cbd..a1d47ed 100644 --- a/src/asset_compiler/asset_compiler.c +++ b/src/asset_compiler/asset_compiler.c @@ -1,7 +1,7 @@ #include "asset_compiler.h" #include "processor.h" -#include "runtime/aio.h" +#include "runtime/rt_aio.h" #include "runtime/buffer_manager.h" #include "runtime/config.h" #include "runtime/file_tab.h" diff --git a/src/runtime/aio.c b/src/runtime/aio.c deleted file mode 100644 index 9b94af0..0000000 --- a/src/runtime/aio.c +++ /dev/null @@ -1,377 +0,0 @@ -#include "aio.h" -#include "config.h" -#include "threading.h" - -#ifdef _WIN32 -#define WIN32_LEAN_AND_MEAN -#include - -void Win32ErrorToString(DWORD last_error, char *out, int bufsize); - -#elif defined(__linux__) -#include - -#endif - -#include -#include - -/* Maintain a ringbuffer of pending operations */ - -typedef struct { -#ifdef _WIN32 - HANDLE file_handle; - OVERLAPPED overlapped; -#endif - volatile rt_aio_state state; -} rt_aio; - -typedef struct { - rt_mutex *guard; - - rt_aio *storage; - uint32_t capacity; - uint32_t head; - uint32_t tail; -} rt_aio_ringbuffer; - -typedef struct { - rt_aio *a; - rt_aio *b; - uint32_t a_count; -} rt_ringbuffer_space; - -static rt_aio_ringbuffer _ringbuffer; - -static rt_ringbuffer_space ReserveRingbufferSpace(uint32_t count) { - if (!rtLockMutex(_ringbuffer.guard)) { - rt_ringbuffer_space failed = {NULL, NULL, 0}; - return failed; - } - - rt_ringbuffer_space result = {NULL, NULL, 0}; - - if (_ringbuffer.head >= _ringbuffer.tail) { - if (_ringbuffer.head + count <= _ringbuffer.capacity) { - result.a_count = count; - result.a = &_ringbuffer.storage[_ringbuffer.head]; - _ringbuffer.head = (_ringbuffer.head + count) % _ringbuffer.capacity; - } else { - /* Check if enough space is free at the end */ - uint32_t a_count = _ringbuffer.capacity - _ringbuffer.head; - uint32_t b_count = count - a_count; - - if (b_count <= _ringbuffer.tail) { - result.a_count = a_count; - result.a = &_ringbuffer.storage[_ringbuffer.head]; - result.b = &_ringbuffer.storage[0]; - _ringbuffer.head = b_count; - } else { - /* Not enough space, we would overwrite the tail */ - rtLog("aio", "Ringbuffer is full."); - } - } - } else { - /* Head is lower than tail */ - uint32_t num_free = _ringbuffer.tail - _ringbuffer.head; - if (count < num_free) { - result.a_count = count; - result.a = &_ringbuffer.storage[_ringbuffer.head]; - _ringbuffer.head = (_ringbuffer.head + count) % _ringbuffer.capacity; - } else { - /* Not enough space, we would overwrite the tail */ - rtLog("aio", "Ringbuffer is full."); - } - } - - rtUnlockMutex(_ringbuffer.guard); - return result; -} - -#ifdef _WIN32 -static void -win32CompletionRoutine(DWORD error_code, DWORD num_bytes_transfered, LPOVERLAPPED overlapped) { - rt_aio *op = (rt_aio *)overlapped->hEvent; - assert(op->state == RT_AIO_STATE_PENDING); - - if (error_code != ERROR_SUCCESS) { - op->state = RT_AIO_STATE_FAILED; - rtLog("aio", "Async io failed: %u", error_code); - } else { - op->state = RT_AIO_STATE_FINISHED; - } - - CloseHandle(op->file_handle); -} -#endif - -RT_CVAR_I(rt_MaxConcurrentAsyncIO, - "Maximum number of concurrent async. I/O operations. Default: 1024", - 1024); - -rt_result InitAIO(void) { - unsigned int max_concurrent_operations = rt_MaxConcurrentAsyncIO.i; - _ringbuffer.guard = rtCreateMutex(); - if (!_ringbuffer.guard) { - return RT_AIO_OUT_OF_MEMORY; - } - if (max_concurrent_operations == 0) - max_concurrent_operations = 1024; - - _ringbuffer.storage = calloc(max_concurrent_operations, sizeof(rt_aio)); - if (!_ringbuffer.storage) - return RT_AIO_OUT_OF_MEMORY; - _ringbuffer.head = 0; - _ringbuffer.tail = 0; - _ringbuffer.capacity = max_concurrent_operations; - return RT_SUCCESS; -} - -void ShutdownAIO(void) { - rtDestroyMutex(_ringbuffer.guard); - free(_ringbuffer.storage); - _ringbuffer.capacity = 0; -} - -RT_DLLEXPORT rt_result rtSubmitLoadBatch(const rt_load_batch *batch, rt_aio_handle *handles) { - if (batch->num_loads > RT_LOAD_BATCH_MAX_SIZE) { - return RT_AIO_LOAD_TOO_LARGE; - } - - rt_ringbuffer_space rbspace = ReserveRingbufferSpace(batch->num_loads); - if (!rbspace.a) { - rtReportError("aio", "Too many pending file operations"); - return RT_AIO_TOO_MANY_OPERATIONS; - } - - for (unsigned int i = 0; i < batch->num_loads; ++i) { - rt_aio *op = (i < rbspace.a_count) ? &rbspace.a[i] : &rbspace.b[i - rbspace.a_count]; - op->state = RT_AIO_STATE_PENDING; - const char *file_path = rtGetFilePath(batch->loads[i].file); - if (!file_path) { - rtReportError("aio", "Failed to resolve file path for a batched load"); - op->state = RT_AIO_STATE_INVALID; - handles[i] = RT_AIO_INVALID_HANDLE; - continue; - } -#ifdef _WIN32 - op->overlapped = (OVERLAPPED){ - /* ReadFileEx does not use hEvent and we are free to use it for our own purposes. */ - .hEvent = (HANDLE)(op), - .Internal = 0, - .InternalHigh = 0, - .Offset = (DWORD)(batch->loads[i].offset & MAXDWORD), - .OffsetHigh = (DWORD)(batch->loads[i].offset >> 32), - }; - - WCHAR wpath[MAX_PATH]; - if (MultiByteToWideChar(CP_UTF8, - MB_PRECOMPOSED, - file_path, - -1, - wpath, - RT_ARRAY_COUNT(wpath)) == 0) { - rtReportError("aio", "MultiByteToWideChar failed with error code: %u", GetLastError()); - op->state = RT_AIO_STATE_FINISHED; - handles[i] = RT_AIO_INVALID_HANDLE; - continue; - } - - HANDLE file_handle = CreateFileW(wpath, - GENERIC_READ, - FILE_SHARE_READ, - NULL, - OPEN_EXISTING, - FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, - NULL); - if (file_handle == INVALID_HANDLE_VALUE) { - DWORD err = GetLastError(); - char error_msg[256]; - Win32ErrorToString(err, error_msg, 256); - rtReportError("aio", - "CreateFileW failed for file: %s with error code: %u (%s)", - file_path, - err, - error_msg); - op->state = RT_AIO_STATE_INVALID; - handles[i] = RT_AIO_INVALID_HANDLE; - continue; - } - op->file_handle = file_handle; - BOOL result = ReadFileEx(file_handle, - batch->loads[i].dest, - (DWORD)batch->loads[i].num_bytes, - &op->overlapped, - win32CompletionRoutine); - DWORD err = GetLastError(); - if (!result || err != ERROR_SUCCESS) { - char error_msg[256]; - Win32ErrorToString(err, error_msg, 256); - rtReportError("aio", "ReadFileEx failed with error code: %u (%s)", err, error_msg); - op->state = RT_AIO_STATE_FINISHED; - handles[i] = RT_AIO_INVALID_HANDLE; - CloseHandle(file_handle); - op->file_handle = NULL; - } - - /* Handle is the index into the ringbuffer + 1 */ - ptrdiff_t op_idx = op - _ringbuffer.storage; - handles[i] = (uint32_t)op_idx + 1; -#endif - } - - return RT_SUCCESS; -} - -RT_DLLEXPORT rt_result rtSubmitWriteBatch(const rt_write_batch *batch, rt_aio_handle *handles) { - if (batch->num_writes > RT_LOAD_BATCH_MAX_SIZE) { - return RT_AIO_WRITE_TOO_LARGE; - } - - rt_ringbuffer_space rbspace = ReserveRingbufferSpace(batch->num_writes); - if (!rbspace.a) { - rtReportError("aio", "Too many pending file operations"); - return RT_AIO_TOO_MANY_OPERATIONS; - } - - for (unsigned int i = 0; i < batch->num_writes; ++i) { - rt_aio *op = (i < rbspace.a_count) ? &rbspace.a[i] : &rbspace.b[i - rbspace.a_count]; - op->state = RT_AIO_STATE_PENDING; - const char *file_path = rtGetFilePath(batch->writes[i].file); - if (!file_path) { - rtReportError("aio", "Failed to resolve file path for a batched write"); - op->state = RT_AIO_STATE_INVALID; - handles[i] = RT_AIO_INVALID_HANDLE; - continue; - } -#ifdef _WIN32 - op->overlapped = (OVERLAPPED){ - /* ReadFileEx does not use hEvent and we are free to use it for our own purposes. */ - .hEvent = (HANDLE)(op), - .Internal = 0, - .InternalHigh = 0, - .Offset = (DWORD)(batch->writes[i].offset & MAXDWORD), - .OffsetHigh = (DWORD)(batch->writes[i].offset >> 32), - }; - - WCHAR wpath[MAX_PATH]; - if (MultiByteToWideChar(CP_UTF8, - MB_PRECOMPOSED, - file_path, - -1, - wpath, - RT_ARRAY_COUNT(wpath)) == 0) { - rtReportError("aio", "MultiByteToWideChar failed with error code: %u", GetLastError()); - op->state = RT_AIO_STATE_FINISHED; - handles[i] = RT_AIO_INVALID_HANDLE; - continue; - } - - HANDLE file_handle = CreateFileW(wpath, - GENERIC_WRITE, - 0, - NULL, - OPEN_ALWAYS, - FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, - NULL); - if (file_handle == INVALID_HANDLE_VALUE) { - DWORD err = GetLastError(); - char error_msg[256]; - Win32ErrorToString(err, error_msg, 256); - rtReportError("aio", - "CreateFileW failed for file: %s with error code: %u (%s)", - file_path, - err, - error_msg); - op->state = RT_AIO_STATE_INVALID; - handles[i] = RT_AIO_INVALID_HANDLE; - continue; - } - op->file_handle = file_handle; - BOOL result = WriteFileEx(file_handle, - batch->writes[i].buffer, - (DWORD)batch->writes[i].num_bytes, - &op->overlapped, - win32CompletionRoutine); - DWORD err = GetLastError(); - if (!result || (err != ERROR_SUCCESS && err != ERROR_ALREADY_EXISTS)) { - char error_msg[256]; - Win32ErrorToString(err, error_msg, 256); - rtReportError("aio", "WriteFileEx failed with error code: %u (%s)", err, error_msg); - op->state = RT_AIO_STATE_FINISHED; - handles[i] = RT_AIO_INVALID_HANDLE; - CloseHandle(file_handle); - op->file_handle = NULL; - } - - /* Handle is the index into the ringbuffer + 1 */ - ptrdiff_t op_idx = op - _ringbuffer.storage; - handles[i] = (uint32_t)op_idx + 1; -#endif - } - - return RT_SUCCESS; -} - -RT_DLLEXPORT rt_aio_state rtGetAIOState(rt_aio_handle handle) { - if (handle == RT_AIO_INVALID_HANDLE || handle > _ringbuffer.capacity) - return RT_AIO_STATE_INVALID; -#ifdef _WIN32 - /* Give the compation function an opportunity to run */ - SleepEx(0, TRUE); -#endif - rtLockMutex(_ringbuffer.guard); - rt_aio_state state = _ringbuffer.storage[handle - 1].state; - rtUnlockMutex(_ringbuffer.guard); - return state; -} - -RT_DLLEXPORT void rtReleaseAIO(rt_aio_handle handle) { - if (handle == RT_AIO_INVALID_HANDLE || handle > _ringbuffer.capacity) { - return; - } - rtLockMutex(_ringbuffer.guard); - _ringbuffer.storage[handle - 1].state = RT_AIO_STATE_INVALID; - if (handle - 1 == _ringbuffer.tail) { - /* Advance the tail such that it points to the last used slot. (Or to head, if the - * ringbuffer is now empty) */ - uint32_t i = _ringbuffer.tail; - while ((_ringbuffer.storage[i].state == RT_AIO_STATE_INVALID) && i != _ringbuffer.head) { - i = (i + 1) % _ringbuffer.capacity; - } - _ringbuffer.tail = i; - } - rtUnlockMutex(_ringbuffer.guard); -} - -RT_DLLEXPORT rt_aio_state rtWaitForAIOCompletion(rt_aio_handle handle) { - if (handle == RT_AIO_INVALID_HANDLE || handle > _ringbuffer.capacity) - return RT_AIO_STATE_INVALID; - rt_aio_state state; - do { - state = rtGetAIOState(handle); - /* NOTE(Kevin): This is where we could temporarily run a job. */ -#ifdef _WIN32 - YieldProcessor(); -#elif defined(__linux__) - sched_yield(); -#endif - } while (state == RT_AIO_STATE_PENDING); - return state; -} - -RT_DLLEXPORT rt_result rtSubmitSingleLoad(rt_file_load load, rt_aio_handle *handle) { - rt_load_batch batch; - batch.loads[0] = load; - batch.num_loads = 1; - return rtSubmitLoadBatch(&batch, handle); -} - -RT_DLLEXPORT rt_aio_state rtSubmitSingleLoadSync(rt_file_load load) { - rt_aio_handle handle; - if (rtSubmitSingleLoad(load, &handle) != RT_SUCCESS) - return RT_AIO_STATE_FAILED; - rt_aio_state state = rtWaitForAIOCompletion(handle); - rtReleaseAIO(handle); - return state; -} diff --git a/src/runtime/config.c b/src/runtime/config.c index 1eb4a0b..0914376 100644 --- a/src/runtime/config.c +++ b/src/runtime/config.c @@ -3,7 +3,7 @@ #include "runtime.h" #include "threading.h" -#include "aio.h" +#include "rt_aio.h" #include "buffer_manager.h" #include "file_tab.h" #include "mem_arena.h" diff --git a/src/runtime/fsutils.c b/src/runtime/fsutils.c index 9dc9731..6af5b03 100644 --- a/src/runtime/fsutils.c +++ b/src/runtime/fsutils.c @@ -258,16 +258,13 @@ RT_DLLEXPORT uint64_t rtGetFileModificationTimestamp(const char *path) { } RT_DLLEXPORT bool rtSyncReadWholeFile(const char *path, void *dest, size_t dest_size) { + size_t fsz = rtGetFileSize(path); + if (fsz > dest_size) { + return false; + } FILE *f = fopen(path, "rb"); if (!f) return false; - fseek(f, SEEK_END, 0); - size_t fsz = (size_t)ftell(f); - fseek(f, SEEK_SET, 0); - if (fsz > dest_size) { - fclose(f); - return false; - } size_t n = fread(dest, 1, fsz, f); fclose(f); return n == fsz; diff --git a/src/runtime/meson.build b/src/runtime/meson.build index c027030..ac58403 100644 --- a/src/runtime/meson.build +++ b/src/runtime/meson.build @@ -7,10 +7,15 @@ xxhash_proj = subproject('xxhash', default_options: ['default_library=static', ' xxhash_dep = xxhash_proj.get_variable('xxhash_dep') runtime_deps = [thread_dep, m_dep, inih_dep, lz4_dep, xxhash_dep] + +if host_machine.system() == 'linux' + rt_dep = declare_dependency(link_args: ['-lrt']) + runtime_deps += rt_dep +endif + runtime_incdirs = contrib_incdir runtime_lib = library('rt', # Project Sources - 'aio.h', 'atomics.h', 'buffer_manager.h', 'compression.h', @@ -25,13 +30,13 @@ runtime_lib = library('rt', 'jobs.h', 'mem_arena.h', 'resources.h', + 'rt_aio.h', 'runtime.h', 'string_storage.h', 'threading.h', 'threading_helpers.hpp', 'timing.h', - - 'aio.c', + 'assert.c', 'buffer_manager.c', 'compression.c', @@ -48,6 +53,7 @@ runtime_lib = library('rt', 'jobs.c', 'mem_arena.c', 'resource_manager.c', + 'rt_aio.c', 'sprint.c', 'string_storage.c', 'text.c', diff --git a/src/runtime/resource_manager.c b/src/runtime/resource_manager.c index 5b4cab7..9808186 100644 --- a/src/runtime/resource_manager.c +++ b/src/runtime/resource_manager.c @@ -1,4 +1,4 @@ -#include "aio.h" +#include "rt_aio.h" #include "buffer_manager.h" #include "compression.h" #include "config.h" diff --git a/src/runtime/rt_aio.c b/src/runtime/rt_aio.c new file mode 100644 index 0000000..a65baab --- /dev/null +++ b/src/runtime/rt_aio.c @@ -0,0 +1,503 @@ +#include "rt_aio.h" +#include "config.h" +#include "runtime.h" +#include "threading.h" + +#ifdef _WIN32 +#define WIN32_LEAN_AND_MEAN +#include + +void Win32ErrorToString(DWORD last_error, char *out, int bufsize); + +#elif defined(__linux__) +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +#define IO_SIGNAL SIGUSR1 + +#endif + +#include +#include + +/* Maintain a ringbuffer of pending operations */ + +typedef struct { +#ifdef _WIN32 + HANDLE file_handle; + OVERLAPPED overlapped; +#elif defined(__linux__) + int fd; + struct aiocb64 cb; +#endif + volatile rt_aio_state state; +} rt_aio; + +typedef struct { + rt_mutex *guard; + + rt_aio *storage; + uint32_t capacity; + uint32_t head; + uint32_t tail; +} rt_aio_ringbuffer; + +typedef struct { + rt_aio *a; + rt_aio *b; + uint32_t a_count; +} rt_ringbuffer_space; + +static rt_aio_ringbuffer _ringbuffer; + +static rt_ringbuffer_space ReserveRingbufferSpace(uint32_t count) { + if (!rtLockMutex(_ringbuffer.guard)) { + rt_ringbuffer_space failed = {NULL, NULL, 0}; + return failed; + } + + rt_ringbuffer_space result = {NULL, NULL, 0}; + + if (_ringbuffer.head >= _ringbuffer.tail) { + if (_ringbuffer.head + count <= _ringbuffer.capacity) { + result.a_count = count; + result.a = &_ringbuffer.storage[_ringbuffer.head]; + _ringbuffer.head = (_ringbuffer.head + count) % _ringbuffer.capacity; + } else { + /* Check if enough space is free at the end */ + uint32_t a_count = _ringbuffer.capacity - _ringbuffer.head; + uint32_t b_count = count - a_count; + + if (b_count <= _ringbuffer.tail) { + result.a_count = a_count; + result.a = &_ringbuffer.storage[_ringbuffer.head]; + result.b = &_ringbuffer.storage[0]; + _ringbuffer.head = b_count; + } else { + /* Not enough space, we would overwrite the tail */ + rtLog("aio", "Ringbuffer is full."); + } + } + } else { + /* Head is lower than tail */ + uint32_t num_free = _ringbuffer.tail - _ringbuffer.head; + if (count < num_free) { + result.a_count = count; + result.a = &_ringbuffer.storage[_ringbuffer.head]; + _ringbuffer.head = (_ringbuffer.head + count) % _ringbuffer.capacity; + } else { + /* Not enough space, we would overwrite the tail */ + rtLog("aio", "Ringbuffer is full."); + } + } + + rtUnlockMutex(_ringbuffer.guard); + return result; +} + +#ifdef _WIN32 +static void +win32CompletionRoutine(DWORD error_code, DWORD num_bytes_transfered, LPOVERLAPPED overlapped) { + rt_aio *op = (rt_aio *)overlapped->hEvent; + assert(op->state == RT_AIO_STATE_PENDING); + + if (error_code != ERROR_SUCCESS) { + op->state = RT_AIO_STATE_FAILED; + rtLog("aio", "Async io failed: %u", error_code); + } else { + op->state = RT_AIO_STATE_FINISHED; + } + + CloseHandle(op->file_handle); +} + +#elif defined(__linux__) +static void linuxAIOSigHandler(int sig, siginfo_t *si, void *ucontext) { + RT_ASSERT(sig == IO_SIGNAL, "The signal handler was called for an unexpected signal."); + if (si->si_code !=SI_ASYNCIO) + return; + rt_aio *op = si->si_value.sival_ptr; + RT_ASSERT(op->state == RT_AIO_STATE_PENDING, "The async io operation was in an unexpected state."); + + if (si->si_errno != 0) { + const char *err = strerror(si->si_errno); + rtLog("aio", "Async io failed: %u (%s)", si->si_errno, err); + op->state = RT_AIO_STATE_FAILED; + } + else { + op->state = RT_AIO_STATE_FINISHED; + } + + close(op->fd); + op->fd = -1; +} +#endif + +RT_CVAR_I(rt_MaxConcurrentAsyncIO, + "Maximum number of concurrent async. I/O operations. Default: 1024", + 1024); + +rt_result InitAIO(void) { + unsigned int max_concurrent_operations = rt_MaxConcurrentAsyncIO.i; + _ringbuffer.guard = rtCreateMutex(); + if (!_ringbuffer.guard) { + return RT_AIO_OUT_OF_MEMORY; + } + if (max_concurrent_operations == 0) + max_concurrent_operations = 1024; + + _ringbuffer.storage = calloc(max_concurrent_operations, sizeof(rt_aio)); + if (!_ringbuffer.storage) + return RT_AIO_OUT_OF_MEMORY; + _ringbuffer.head = 0; + _ringbuffer.tail = 0; + _ringbuffer.capacity = max_concurrent_operations; + +#ifdef __linux__ + /* Register the handler for the IO completion signal */ + struct sigaction sa; + sigemptyset(&sa.sa_mask); + sa.sa_flags = SA_RESTART | SA_SIGINFO; + sa.sa_sigaction = linuxAIOSigHandler; + if (sigaction(IO_SIGNAL, &sa, NULL) == -1) { + return RT_UNKNOWN_ERROR; + } +#endif + + return RT_SUCCESS; +} + +void ShutdownAIO(void) { + rtDestroyMutex(_ringbuffer.guard); + free(_ringbuffer.storage); + _ringbuffer.capacity = 0; +} + +#ifdef _WIN32 +static void win32LoadBatchInner(const char *file_path, const rt_file_load *load, rt_aio *op, rt_aio_handle *handle) { + op->overlapped = (OVERLAPPED){ + /* ReadFileEx does not use hEvent and we are free to use it for our own purposes. */ + .hEvent = (HANDLE)(op), + .Internal = 0, + .InternalHigh = 0, + .Offset = (DWORD)(load->offset & MAXDWORD), + .OffsetHigh = (DWORD)(load->offset >> 32), + }; + + WCHAR wpath[MAX_PATH]; + if (MultiByteToWideChar(CP_UTF8, MB_PRECOMPOSED, file_path, -1, wpath, RT_ARRAY_COUNT(wpath)) == + 0) { + rtReportError("aio", "MultiByteToWideChar failed with error code: %u", GetLastError()); + op->state = RT_AIO_STATE_FINISHED; + *handle = RT_AIO_INVALID_HANDLE; + return; + } + + HANDLE file_handle = CreateFileW(wpath, + GENERIC_READ, + FILE_SHARE_READ, + NULL, + OPEN_EXISTING, + FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, + NULL); + if (file_handle == INVALID_HANDLE_VALUE) { + DWORD err = GetLastError(); + char error_msg[256]; + Win32ErrorToString(err, error_msg, 256); + rtReportError("aio", + "CreateFileW failed for file: %s with error code: %u (%s)", + file_path, + err, + error_msg); + op->state = RT_AIO_STATE_INVALID; + *handle = RT_AIO_INVALID_HANDLE; + return; + } + op->file_handle = file_handle; + BOOL result = ReadFileEx(file_handle, + load->dest, + (DWORD)load->num_bytes, + &op->overlapped, + win32CompletionRoutine); + DWORD err = GetLastError(); + if (!result || err != ERROR_SUCCESS) { + char error_msg[256]; + Win32ErrorToString(err, error_msg, 256); + rtReportError("aio", "ReadFileEx failed with error code: %u (%s)", err, error_msg); + op->state = RT_AIO_STATE_FINISHED; + *handle = RT_AIO_INVALID_HANDLE; + CloseHandle(file_handle); + op->file_handle = NULL; + } + + /* Handle is the index into the ringbuffer + 1 */ + ptrdiff_t op_idx = op - _ringbuffer.storage; + *handle = (uint32_t)op_idx + 1; +} +#endif + +#ifdef __linux__ +static void linuxLoadBatchInner(const char *file_path, const rt_file_load *load, rt_aio *op, rt_aio_handle *handle) { + memset(&op->cb, 0, sizeof(op->cb)); + int fd = open(file_path, O_RDONLY | O_LARGEFILE); + if (fd == -1) { + const char *err = strerror(errno); + rtReportError("aio", "open failed for file: %s with error: %d (%s)", file_path, errno, err);; + op->state = RT_AIO_STATE_INVALID; + *handle = RT_AIO_INVALID_HANDLE; + return; + } + op->fd = fd; + op->cb.aio_fildes = fd; + op->cb.aio_offset = load->offset; + op->cb.aio_buf = load->dest; + op->cb.aio_nbytes = load->num_bytes; + op->cb.aio_reqprio = 0; + op->cb.aio_sigevent.sigev_notify = SIGEV_SIGNAL; + op->cb.aio_sigevent.sigev_signo = IO_SIGNAL; + op->cb.aio_sigevent.sigev_value.sival_ptr = op; + + if (aio_read64(&op->cb) == -1) { + const char *err = strerror(errno); + rtReportError("aio", "aio_read64 failed for file: %s with error: %d (%s)", file_path, errno, err); + close(fd); + op->state = RT_AIO_STATE_INVALID; + *handle = RT_AIO_INVALID_HANDLE; + return; + } + + ptrdiff_t op_idx = op - _ringbuffer.storage; + *handle = (uint32_t)op_idx + 1; +} +#endif + +RT_DLLEXPORT rt_result rtSubmitLoadBatch(const rt_load_batch *batch, rt_aio_handle *handles) { + if (batch->num_loads > RT_LOAD_BATCH_MAX_SIZE) { + return RT_AIO_LOAD_TOO_LARGE; + } + + rt_ringbuffer_space rbspace = ReserveRingbufferSpace(batch->num_loads); + if (!rbspace.a) { + rtReportError("aio", "Too many pending file operations"); + return RT_AIO_TOO_MANY_OPERATIONS; + } + + for (unsigned int i = 0; i < batch->num_loads; ++i) { + rt_aio *op = (i < rbspace.a_count) ? &rbspace.a[i] : &rbspace.b[i - rbspace.a_count]; + op->state = RT_AIO_STATE_PENDING; + const char *file_path = rtGetFilePath(batch->loads[i].file); + if (!file_path) { + rtReportError("aio", "Failed to resolve file path for a batched load"); + op->state = RT_AIO_STATE_INVALID; + handles[i] = RT_AIO_INVALID_HANDLE; + continue; + } +#ifdef _WIN32 + win32LoadBatchInner(file_path, &batch->loads[i], op, &handles[i]); +#elif defined(__linux__) + linuxLoadBatchInner(file_path, &batch->loads[i], op, &handles[i]); +#endif + } + + return RT_SUCCESS; +} + +#ifdef _WIN32 +static void win32WriteBatchInner(const char *file_path, const rt_file_write *write, rt_aio *op, rt_aio_handle *handle) { + op->overlapped = (OVERLAPPED){ + /* ReadFileEx does not use hEvent and we are free to use it for our own purposes. */ + .hEvent = (HANDLE)(op), + .Internal = 0, + .InternalHigh = 0, + .Offset = (DWORD)(write->offset & MAXDWORD), + .OffsetHigh = (DWORD)(write->offset >> 32), + }; + + WCHAR wpath[MAX_PATH]; + if (MultiByteToWideChar(CP_UTF8, MB_PRECOMPOSED, file_path, -1, wpath, RT_ARRAY_COUNT(wpath)) == + 0) { + rtReportError("aio", "MultiByteToWideChar failed with error code: %u", GetLastError()); + op->state = RT_AIO_STATE_FINISHED; + *handle = RT_AIO_INVALID_HANDLE; + return; + } + + HANDLE file_handle = CreateFileW(wpath, + GENERIC_WRITE, + 0, + NULL, + OPEN_ALWAYS, + FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, + NULL); + if (file_handle == INVALID_HANDLE_VALUE) { + DWORD err = GetLastError(); + char error_msg[256]; + Win32ErrorToString(err, error_msg, 256); + rtReportError("aio", + "CreateFileW failed for file: %s with error code: %u (%s)", + file_path, + err, + error_msg); + op->state = RT_AIO_STATE_INVALID; + *handle = RT_AIO_INVALID_HANDLE; + return; + } + op->file_handle = file_handle; + BOOL result = WriteFileEx(file_handle, + write->buffer, + (DWORD)write->num_bytes, + &op->overlapped, + win32CompletionRoutine); + DWORD err = GetLastError(); + if (!result || (err != ERROR_SUCCESS && err != ERROR_ALREADY_EXISTS)) { + char error_msg[256]; + Win32ErrorToString(err, error_msg, 256); + rtReportError("aio", "WriteFileEx failed with error code: %u (%s)", err, error_msg); + op->state = RT_AIO_STATE_FINISHED; + *handle = RT_AIO_INVALID_HANDLE; + CloseHandle(file_handle); + op->file_handle = INVALID_HANDLE; + } + + /* Handle is the index into the ringbuffer + 1 */ + ptrdiff_t op_idx = op - _ringbuffer.storage; + *handle = (uint32_t)op_idx + 1; +} +#endif + +#ifdef __linux__ +static void linuxWriteBatchInner(const char *file_path, const rt_file_write *write, rt_aio *op, rt_aio_handle *handle) { + memset(&op->cb, 0, sizeof(op->cb)); + int fd = open(file_path, O_WRONLY | O_CREAT | O_LARGEFILE); + if (fd == -1) { + const char *err = strerror(errno); + rtReportError("aio", "open failed for file: %s with error: %d (%s)", file_path, errno, err);; + op->state = RT_AIO_STATE_INVALID; + *handle = RT_AIO_INVALID_HANDLE; + return; + } + op->fd = fd; + op->cb.aio_fildes = fd; + op->cb.aio_offset = write->offset; + op->cb.aio_buf = (volatile void *)write->buffer; + op->cb.aio_nbytes = write->num_bytes; + op->cb.aio_reqprio = 0; + op->cb.aio_sigevent.sigev_notify = SIGEV_SIGNAL; + op->cb.aio_sigevent.sigev_signo = IO_SIGNAL; + op->cb.aio_sigevent.sigev_value.sival_ptr = op; + + if (aio_write64(&op->cb) == -1) { + const char *err = strerror(errno); + rtReportError("aio", "aio_write64 failed for file: %s with error: %d (%s)", file_path, errno, err); + close(fd); + op->state = RT_AIO_STATE_INVALID; + *handle = RT_AIO_INVALID_HANDLE; + return; + } + + ptrdiff_t op_idx = op - _ringbuffer.storage; + *handle = (uint32_t)op_idx + 1; +} +#endif + +RT_DLLEXPORT rt_result rtSubmitWriteBatch(const rt_write_batch *batch, rt_aio_handle *handles) { + if (batch->num_writes > RT_LOAD_BATCH_MAX_SIZE) { + return RT_AIO_WRITE_TOO_LARGE; + } + + rt_ringbuffer_space rbspace = ReserveRingbufferSpace(batch->num_writes); + if (!rbspace.a) { + rtReportError("aio", "Too many pending file operations"); + return RT_AIO_TOO_MANY_OPERATIONS; + } + + for (unsigned int i = 0; i < batch->num_writes; ++i) { + rt_aio *op = (i < rbspace.a_count) ? &rbspace.a[i] : &rbspace.b[i - rbspace.a_count]; + op->state = RT_AIO_STATE_PENDING; + const char *file_path = rtGetFilePath(batch->writes[i].file); + if (!file_path) { + rtReportError("aio", "Failed to resolve file path for a batched write"); + op->state = RT_AIO_STATE_INVALID; + handles[i] = RT_AIO_INVALID_HANDLE; + continue; + } +#ifdef _WIN32 + win32WriteBatchInner(file_path, &batch->writes[i], op, &handles[i]); +#elif defined(__linux__) + linuxWriteBatchInner(file_path, &batch->writes[i], op, &handles[i]); +#endif + } + + return RT_SUCCESS; +} + +RT_DLLEXPORT rt_aio_state rtGetAIOState(rt_aio_handle handle) { + if (handle == RT_AIO_INVALID_HANDLE || handle > _ringbuffer.capacity) + return RT_AIO_STATE_INVALID; +#ifdef _WIN32 + /* Give the compation function an opportunity to run */ + SleepEx(0, TRUE); +#endif + rtLockMutex(_ringbuffer.guard); + rt_aio_state state = _ringbuffer.storage[handle - 1].state; + rtUnlockMutex(_ringbuffer.guard); + return state; +} + +RT_DLLEXPORT void rtReleaseAIO(rt_aio_handle handle) { + if (handle == RT_AIO_INVALID_HANDLE || handle > _ringbuffer.capacity) { + return; + } + rtLockMutex(_ringbuffer.guard); + _ringbuffer.storage[handle - 1].state = RT_AIO_STATE_INVALID; + if (handle - 1 == _ringbuffer.tail) { + /* Advance the tail such that it points to the last used slot. (Or to head, if the + * ringbuffer is now empty) */ + uint32_t i = _ringbuffer.tail; + while ((_ringbuffer.storage[i].state == RT_AIO_STATE_INVALID) && i != _ringbuffer.head) { + i = (i + 1) % _ringbuffer.capacity; + } + _ringbuffer.tail = i; + } + rtUnlockMutex(_ringbuffer.guard); +} + +RT_DLLEXPORT rt_aio_state rtWaitForAIOCompletion(rt_aio_handle handle) { + if (handle == RT_AIO_INVALID_HANDLE || handle > _ringbuffer.capacity) + return RT_AIO_STATE_INVALID; + rt_aio_state state; + do { + state = rtGetAIOState(handle); + /* NOTE(Kevin): This is where we could temporarily run a job. */ +#ifdef _WIN32 + YieldProcessor(); +#elif defined(__linux__) + sched_yield(); +#endif + } while (state == RT_AIO_STATE_PENDING); + return state; +} + +RT_DLLEXPORT rt_result rtSubmitSingleLoad(rt_file_load load, rt_aio_handle *handle) { + rt_load_batch batch; + batch.loads[0] = load; + batch.num_loads = 1; + return rtSubmitLoadBatch(&batch, handle); +} + +RT_DLLEXPORT rt_aio_state rtSubmitSingleLoadSync(rt_file_load load) { + rt_aio_handle handle; + if (rtSubmitSingleLoad(load, &handle) != RT_SUCCESS) + return RT_AIO_STATE_FAILED; + rt_aio_state state = rtWaitForAIOCompletion(handle); + rtReleaseAIO(handle); + return state; +} diff --git a/src/runtime/aio.h b/src/runtime/rt_aio.h similarity index 100% rename from src/runtime/aio.h rename to src/runtime/rt_aio.h