From 1a4a2109ca1022945af7e603325294583007763f Mon Sep 17 00:00:00 2001 From: Kevin Trogant Date: Wed, 22 Nov 2023 14:25:37 +0100 Subject: [PATCH] feat: async io for windows Submit async-io to the OS. Next step is replacing all FIO calls with async-io and removing the old code. --- docs/NOTES_aio_assets.md | 29 ++++ meson.build | 4 + src/runtime/aio.c | 247 +++++++++++++++++++++++++++++++++ src/runtime/aio.h | 66 +++++++++ src/runtime/app.c | 11 ++ src/runtime/file_tab.c | 131 +++++++++++++++++ src/runtime/file_tab.h | 26 ++++ src/runtime/fio.c | 164 ---------------------- src/runtime/fio.h | 13 +- src/runtime/threading_cond.c | 86 ++++++++++++ src/runtime/threading_mutex.c | 28 +++- src/runtime/threading_thread.c | 100 ++++++++++++- 12 files changed, 726 insertions(+), 179 deletions(-) create mode 100644 docs/NOTES_aio_assets.md create mode 100644 src/runtime/aio.c create mode 100644 src/runtime/aio.h create mode 100644 src/runtime/file_tab.c create mode 100644 src/runtime/file_tab.h diff --git a/docs/NOTES_aio_assets.md b/docs/NOTES_aio_assets.md new file mode 100644 index 0000000..b7c29d3 --- /dev/null +++ b/docs/NOTES_aio_assets.md @@ -0,0 +1,29 @@ +# AIO & Assets + +## Async IO: +- Batches of loads (vy_load_batch) +- Each load: file-id, offset-in-file, num-bytes, destination buffer +- SubmitLoadBatch() -> Handle for whole batch or handles for each file op? +- Reason: Better saturation of disk interface +- Batch interacts nicely with asset system described below: + +## Assets +- Have a tool that detects asset dependencies (maybe the same tool that bakes assets into their runtime format) +- i.e. world-cell -> meshes -> textures +- Bake into a single binary (asset_meta.bin) +- Have that file loaded at runtime at all times +- DetermineAssetDependencies() -> CreateLoadBatch(asset-id) +- CreateLoadBatch() could take a cache into account + +### File Storage in Memory +- Linked lists of block-regions with fixed size blocks (bitmap allocator) +- E.g. 
1, 2, 3, 5 mb (maybe down to kb) blocks +- Blocks have refcounts, explicitly increased/decreased +- Either lock the whole region or lock individual blocks + +### Cache +- Hold one reference to storage +- Track how much space is taken for cache +- Evict LRU(?) once taken space exceeds threshold +- Copies could be managers handed to the cache + diff --git a/meson.build b/meson.build index 25056cd..265feb9 100644 --- a/meson.build +++ b/meson.build @@ -50,6 +50,8 @@ runtime_lib = library('vyrt', 'src/runtime/app.h', 'src/runtime/dynamic_libs.h', 'src/runtime/jobs.h', + 'src/runtime/aio.h', + 'src/runtime/file_tab.h', 'src/runtime/error_report.c', 'src/runtime/gfx_main.c', @@ -63,6 +65,8 @@ runtime_lib = library('vyrt', 'src/runtime/app.c', 'src/runtime/dynamic_libs.c', 'src/runtime/jobs.c', + 'src/runtime/aio.c', + 'src/runtime/file_tab.c', # Contrib Sources 'contrib/xxhash/xxhash.c', diff --git a/src/runtime/aio.c b/src/runtime/aio.c new file mode 100644 index 0000000..f7f76e6 --- /dev/null +++ b/src/runtime/aio.c @@ -0,0 +1,247 @@ +#include "aio.h" +#include "threading.h" + +#ifdef _WIN32 +#define WIN32_LEAN_AND_MEAN +#include +#endif + +#include +#include + +/* Maintain a ringbuffer of pending operations */ + + +typedef struct { +#ifdef _WIN32 + HANDLE file_handle; + OVERLAPPED overlapped; +#endif + volatile vy_aio_state state; +} vy_aio; + +typedef struct { + vy_mutex *guard; + + vy_aio *storage; + uint32_t capacity; + uint32_t head; + uint32_t tail; +} vy_aio_ringbuffer; + +typedef struct { + vy_aio *a; + vy_aio *b; + uint32_t a_count; +} vy_ringbuffer_space; + +static vy_aio_ringbuffer _ringbuffer; + +static vy_ringbuffer_space ReserveRingbufferSpace(uint32_t count) { + if (!vyLockMutex(_ringbuffer.guard)) { + vy_ringbuffer_space failed = {NULL, NULL, 0}; + return failed; + } + + vy_ringbuffer_space result = {NULL, NULL, 0}; + + if (_ringbuffer.head >= _ringbuffer.tail) { + if (_ringbuffer.head + count <= _ringbuffer.capacity) { + result.a_count = count; + 
result.a = &_ringbuffer.storage[_ringbuffer.head]; + _ringbuffer.head = + (_ringbuffer.head + count) % _ringbuffer.capacity; + } else { + /* Check if enough space is free at the end */ + uint32_t a_count = _ringbuffer.capacity - _ringbuffer.head; + uint32_t b_count = count - a_count; + + if (b_count <= _ringbuffer.tail) { + result.a_count = a_count; + result.a = &_ringbuffer.storage[_ringbuffer.head]; + result.b = &_ringbuffer.storage[0]; + _ringbuffer.head = b_count; + } else { + /* Not enough space, we would overwrite the tail */ + vyLog("aio", "Ringbuffer is full."); + } + } + } else { + /* Head is lower than tail */ + uint32_t num_free = _ringbuffer.tail - _ringbuffer.head; + if (count < num_free) { + result.a_count = count; + result.a = &_ringbuffer.storage[_ringbuffer.head]; + _ringbuffer.head = + (_ringbuffer.head + count) % _ringbuffer.capacity; + } else { + /* Not enough space, we would overwrite the tail */ + vyLog("aio", "Ringbuffer is full."); + } + } + + vyUnlockMutex(_ringbuffer.guard); + return result; +} + +#ifdef _WIN32 +static void win32CompletionRoutine(DWORD error_code, + DWORD num_bytes_transfered, + LPOVERLAPPED overlapped) { + vy_aio *op = (vy_aio*)overlapped->hEvent; + assert(op->state == VY_AIO_STATE_PENDING); + + if (error_code != ERROR_SUCCESS) { + op->state = VY_AIO_STATE_FAILED; + vyLog("aio", "Async io failed: %u", error_code); + } else { + op->state = VY_AIO_STATE_FINISHED; + } + + CloseHandle(op->file_handle); +} +#endif + + +VY_DLLEXPORT vy_result vyInitAIO(unsigned int max_concurrent_operations) { + _ringbuffer.guard = vyCreateMutex(); + if (!_ringbuffer.guard) { + return VY_AIO_OUT_OF_MEMORY; + } + if (max_concurrent_operations == 0) + max_concurrent_operations = 1024; + + _ringbuffer.storage = calloc(max_concurrent_operations, sizeof(vy_aio)); + if (!_ringbuffer.storage) + return VY_AIO_OUT_OF_MEMORY; + _ringbuffer.head = 0; + _ringbuffer.tail = 0; + _ringbuffer.capacity = max_concurrent_operations; + return VY_SUCCESS; +} + 
+/** Shuts down the async io system.
+ *
+ * Destroys the guard mutex and frees the ringbuffer storage. With the
+ * capacity set to 0, vyGetAIOState() and vyReleaseAIO() reject every handle.
+ *
+ * NOTE(review): operations that are still pending are neither cancelled nor
+ * waited for here; presumably callers have to drain them first - confirm.
+ */
+VY_DLLEXPORT void vyShutdownAIO(void) {
+    vyDestroyMutex(_ringbuffer.guard);
+    free(_ringbuffer.storage);
+    /* Do not keep pointers to the destroyed mutex and the freed storage. */
+    _ringbuffer.guard    = NULL;
+    _ringbuffer.storage  = NULL;
+    _ringbuffer.capacity = 0;
+    _ringbuffer.head     = 0;
+    _ringbuffer.tail     = 0;
+}
+
+/* Marks a load that could not be started.
+ *
+ * The caller receives no usable handle and therefore can never release the
+ * slot, so the slot has to be INVALID: that is the only state the tail scan
+ * in vyReleaseAIO() skips over.
+ */
+static void MarkLoadFailed(vy_aio *op, vy_aio_handle *handle) {
+    op->state = VY_AIO_STATE_INVALID;
+    *handle   = VY_AIO_INVALID_HANDLE;
+}
+
+/** Starts all loads of a batch.
+ *
+ * One ringbuffer slot is reserved per load. A load that can not be started
+ * is reported and gets VY_AIO_INVALID_HANDLE, but does not abort the rest of
+ * the batch.
+ *
+ * Only the _WIN32 path submits reads to the OS and writes the handles of
+ * successfully started loads.
+ *
+ * @param batch   The loads to start. num_loads must not exceed
+ *                VY_LOAD_BATCH_MAX_SIZE.
+ * @param handles Receives one handle per load; needs room for
+ *                batch->num_loads entries.
+ * @return VY_AIO_LOAD_TOO_LARGE if the batch contains too many loads,
+ *         VY_AIO_TOO_MANY_OPERATIONS if no ringbuffer space could be
+ *         reserved, VY_SUCCESS otherwise (even if individual loads failed).
+ */
+VY_DLLEXPORT vy_result vySubmitLoadBatch(const vy_load_batch *batch,
+                                         vy_aio_handle *handles) {
+    if (batch->num_loads > VY_LOAD_BATCH_MAX_SIZE) {
+        return VY_AIO_LOAD_TOO_LARGE;
+    }
+
+    vy_ringbuffer_space rbspace = ReserveRingbufferSpace(batch->num_loads);
+    if (!rbspace.a) {
+        vyReportError("aio", "Too many pending file operations");
+        return VY_AIO_TOO_MANY_OPERATIONS;
+    }
+
+    for (unsigned int i = 0; i < batch->num_loads; ++i) {
+        const vy_file_load *load = &batch->loads[i];
+
+        /* The reservation may wrap around the end of the ringbuffer: the
+         * first a_count slots are in a, the remaining ones in b. */
+        vy_aio *op = (i < rbspace.a_count) ? &rbspace.a[i]
+                                           : &rbspace.b[i - rbspace.a_count];
+        op->state = VY_AIO_STATE_PENDING;
+
+        const char *file_path = vyGetFilePath(load->file);
+        if (!file_path) {
+            vyReportError("aio",
+                          "Failed to resolve file path for a batched load");
+            MarkLoadFailed(op, &handles[i]);
+            continue;
+        }
+#ifdef _WIN32
+        /* ReadFileEx takes the byte count as a DWORD; refuse to silently
+         * truncate larger loads. */
+        if ((uint64_t)load->num_bytes > (uint64_t)MAXDWORD) {
+            vyReportError("aio",
+                          "Load from file: %s is too large for a single read",
+                          file_path);
+            MarkLoadFailed(op, &handles[i]);
+            continue;
+        }
+
+        /* Widen first: size_t may be 32 bits, and shifting a 32 bit value
+         * by 32 is undefined. */
+        const uint64_t offset = (uint64_t)load->offset;
+        op->overlapped = (OVERLAPPED){
+            /* ReadFileEx does not use hEvent and we are free to use it for
+             * our own purposes. */
+            .hEvent       = (HANDLE)(op),
+            .Internal     = 0,
+            .InternalHigh = 0,
+            /* Pointer shares a union with Offset/OffsetHigh and must not be
+             * initialized as well: a later initializer for it would replace
+             * the offset. */
+            .Offset       = (DWORD)(offset & MAXDWORD),
+            .OffsetHigh   = (DWORD)(offset >> 32),
+        };
+
+        WCHAR wpath[MAX_PATH];
+        /* For CP_UTF8 the flags have to be 0 or MB_ERR_INVALID_CHARS. */
+        if (MultiByteToWideChar(CP_UTF8,
+                                MB_ERR_INVALID_CHARS,
+                                file_path,
+                                -1,
+                                wpath,
+                                VY_ARRAY_COUNT(wpath)) == 0) {
+            vyReportError("aio",
+                          "MultiByteToWideChar failed with error code: %u",
+                          GetLastError());
+            MarkLoadFailed(op, &handles[i]);
+            continue;
+        }
+
+        HANDLE file_handle = CreateFileW(wpath,
+                                         GENERIC_READ,
+                                         FILE_SHARE_READ,
+                                         NULL,
+                                         OPEN_EXISTING,
+                                         FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED,
+                                         NULL);
+        if (file_handle == INVALID_HANDLE_VALUE) {
+            vyReportError("aio",
+                          "CreateFileW failed for file: %s with error code: %u",
+                          file_path,
+                          GetLastError());
+            MarkLoadFailed(op, &handles[i]);
+            continue;
+        }
+        op->file_handle = file_handle;
+
+        BOOL result = ReadFileEx(file_handle,
+                                 load->dest,
+                                 (DWORD)load->num_bytes,
+                                 &op->overlapped,
+                                 win32CompletionRoutine);
+        DWORD err = GetLastError();
+        if (!result) {
+            vyReportError("aio", "ReadFileEx failed with error code: %u", err);
+            CloseHandle(file_handle);
+            op->file_handle = NULL;
+            MarkLoadFailed(op, &handles[i]);
+            /* Must not fall through to the handle assignment below. */
+            continue;
+        }
+        if (err != ERROR_SUCCESS) {
+            /* The read was queued and the completion routine will still
+             * run and close the file, so only report the condition. */
+            vyLog("aio", "ReadFileEx succeeded with code: %u", err);
+        }
+
+        /* Handle is the index into the ringbuffer + 1 */
+        ptrdiff_t op_idx = op - _ringbuffer.storage;
+        handles[i]       = (uint32_t)op_idx + 1;
+#endif
+    }
+
+    return VY_SUCCESS;
+}
+
+/** Queries the current state of an async io operation.
+ *
+ * @param handle A handle written by vySubmitLoadBatch().
+ * @return The state stored in the slot of the operation, or
+ *         VY_AIO_STATE_INVALID if the handle is VY_AIO_INVALID_HANDLE or
+ *         larger than the ringbuffer capacity.
+ */
+VY_DLLEXPORT volatile vy_aio_state vyGetAIOState(vy_aio_handle handle) {
+    if (handle == VY_AIO_INVALID_HANDLE || handle > _ringbuffer.capacity)
+        return VY_AIO_STATE_INVALID;
+#ifdef _WIN32
+    /* Give the completion routine an opportunity to run */
+    SleepEx(0, TRUE);
+#endif
+    /* vyLockMutex waits alertably on Windows and reports false if the wait
+     * ended for any reason other than acquiring the mutex. The state is
+     * still read in that case (as before), but a mutex that is not held
+     * must not be released. */
+    bool locked = vyLockMutex(_ringbuffer.guard);
+    vy_aio_state state = _ringbuffer.storage[handle - 1].state;
+    if (locked)
+        vyUnlockMutex(_ringbuffer.guard);
+    return state;
+}
+
+
VY_DLLEXPORT void vyReleaseAIO(vy_aio_handle handle) { + if (handle == VY_AIO_INVALID_HANDLE || handle > _ringbuffer.capacity) { + return; + } + vyLockMutex(_ringbuffer.guard); + _ringbuffer.storage[handle - 1].state = VY_AIO_STATE_INVALID; + if (handle - 1 == _ringbuffer.tail) { + /* Advance the tail such that it points to the last used slot. (Or to head, if the ringbuffer is now empty) */ + uint32_t i = _ringbuffer.tail; + while ((_ringbuffer.storage[i].state == VY_AIO_STATE_INVALID) && i != _ringbuffer.head) { + i = (i + 1) % _ringbuffer.capacity; + } + _ringbuffer.tail = i; + } + vyUnlockMutex(_ringbuffer.guard); +} diff --git a/src/runtime/aio.h b/src/runtime/aio.h new file mode 100644 index 0000000..a263e06 --- /dev/null +++ b/src/runtime/aio.h @@ -0,0 +1,66 @@ +#ifndef VY_AIO_H +#define VY_AIO_H + +#include + +#include "runtime.h" +#include "file_tab.h" + +typedef struct { + size_t offset; /**< Starting offset inside the file in bytes */ + size_t num_bytes; /**< Number of bytes to load */ + /** Destination buffer with at least @ num_bytes bytes. + * Must be valid until the load is finished. + */ + void *dest; + vy_file_id file; /**< Unique identifier for the file */ +} vy_file_load; + +#define VY_LOAD_BATCH_MAX_SIZE 64 + +/** A batch of loads that will be started together. + * + * The aio system will hand these to the OS. + */ +typedef struct { + vy_file_load loads[VY_LOAD_BATCH_MAX_SIZE]; + + /** Must be smaller or equal to @c VY_LOAD_BATCH_MAX_SIZE */ + unsigned int num_loads; +} vy_load_batch; + + +#define VY_AIO_INVALID_HANDLE 0 + +/** Handle for an async io operation. Can be used to query the state and result. 
*/ +typedef uint32_t vy_aio_handle; + +enum { + VY_AIO_LOAD_TOO_LARGE = (VY_SUCCESS + 1), + VY_AIO_TOO_MANY_OPERATIONS, + VY_AIO_OUT_OF_MEMORY, +}; + + +typedef enum { + VY_AIO_STATE_INVALID, + VY_AIO_STATE_PENDING, + VY_AIO_STATE_FINISHED, + VY_AIO_STATE_FAILED, +} vy_aio_state; + +VY_DLLEXPORT vy_result vyInitAIO(unsigned int max_concurrent_operations); + +VY_DLLEXPORT void vyShutdownAIO(void); + +VY_DLLEXPORT vy_result vySubmitLoadBatch(const vy_load_batch *batch, + vy_aio_handle *handles); + +VY_DLLEXPORT volatile vy_aio_state vyGetAIOState(vy_aio_handle handle); + +/* Releases the internal storage for the operation. + * The system is allowed to re-use the same handle value for new operations after this was called. + */ +VY_DLLEXPORT void vyReleaseAIO(vy_aio_handle handle); + +#endif diff --git a/src/runtime/app.c b/src/runtime/app.c index 09e4e0d..30eb4d2 100644 --- a/src/runtime/app.c +++ b/src/runtime/app.c @@ -2,6 +2,7 @@ #include "config.h" #include "fio.h" #include "gfx.h" +#include "aio.h" #include "renderer_api.h" extern void __RegisterRuntimeCVars(void); @@ -43,6 +44,16 @@ VY_DLLEXPORT int vyWin32Entry(HINSTANCE hInstance, return 1; } + if (vyInitFileTab(1024) != VY_SUCCESS) { + vyReportError("FTAB", "Init failed."); + return 1; + } + + if (vyInitAIO(0) != VY_SUCCESS) { + vyReportError("AIO", "Init failed."); + return 1; + } + WNDCLASSEXW wndclass = { .cbSize = sizeof(wndclass), .hInstance = hInstance, diff --git a/src/runtime/file_tab.c b/src/runtime/file_tab.c new file mode 100644 index 0000000..64a7f2e --- /dev/null +++ b/src/runtime/file_tab.c @@ -0,0 +1,131 @@ +#include "file_tab.h" +#include "threading.h" + +#include + +#define NAME_CAP(cap) ((cap)*128) +typedef struct { + vy_file_id *ids; + unsigned int *name_offsets; + char *names; + unsigned int capacity; + unsigned int name_head; + + vy_mutex *mutex; +} vy_file_tab; + +static vy_file_tab _file_tab; + +vy_result vyInitFileTab(unsigned int max_files) { + _file_tab.ids = calloc(max_files, 
sizeof(vy_file_id)); + if (!_file_tab.ids) + return 1; + _file_tab.name_offsets = calloc(max_files, sizeof(unsigned int)); + if (!_file_tab.name_offsets) + return 1; + _file_tab.names = malloc(NAME_CAP(max_files)); + if (!_file_tab.names) + return 1; + + _file_tab.capacity = max_files; + _file_tab.name_head = 0; + _file_tab.mutex = vyCreateMutex(); + return VY_SUCCESS; +} + +void vyShutdownFileTab(void) { + free(_file_tab.ids); + free(_file_tab.names); + free(_file_tab.name_offsets); + vyDestroyMutex(_file_tab.mutex); +} + + +vy_file_id vyGetFileId(const char *path) { + vy_text_span span; + span.start = path; + span.length = (unsigned int)strlen(path); + return vyGetFileIdFromSpan(span); +} + +vy_file_id vyGetFileIdFromSpan(vy_text_span path) { + /* Randomly choosen, aka finger smash keyboard */ + XXH64_hash_t seed = 15340978; + vy_file_id fid = (vy_file_id)XXH64(path.start, path.length, seed); + if (fid == 0) + fid = ~fid; + return fid; +} + +VY_DLLEXPORT vy_file_id vyAddFileFromSpan(vy_text_span path) { + vy_file_id fid = vyGetFileIdFromSpan(path); + + if (!vyLockMutex(_file_tab.mutex)) { + vyReportError("fio", "Failed to lock the guard mutex."); + return 0; + } + + /* Hash Insert */ + unsigned int i = 0; + unsigned int base = (unsigned int)(fid % _file_tab.capacity); + while (i < _file_tab.capacity) { + unsigned int at = (base + i) % _file_tab.capacity; + if (_file_tab.ids[at] == 0) { + /* Insert */ + unsigned int slen = (unsigned int)path.length + 1; + if ((_file_tab.name_head + slen) >= NAME_CAP(_file_tab.capacity)) { + /* Out of name storage */ + fid = 0; + break; + } + memcpy(_file_tab.names + _file_tab.name_head, path.start, slen); + _file_tab.name_offsets[at] = _file_tab.name_head; + _file_tab.ids[at] = fid; + _file_tab.name_head += slen; + break; + } else if (_file_tab.ids[at] == fid) { + break; + } + ++i; + } + + /* Out of space */ + if (i == _file_tab.capacity) + fid = 0; + + vyUnlockMutex(_file_tab.mutex); + return fid; +} + +VY_DLLEXPORT vy_file_id 
vyAddFile(const char *path) { + vy_text_span span; + span.start = path; + span.length = (unsigned int)strlen(path); + return vyAddFileFromSpan(span); +} + +VY_DLLEXPORT const char *vyGetFilePath(vy_file_id fid) { + /* Hash Lookup */ + if (fid == 0) + return NULL; + + if (!vyLockMutex(_file_tab.mutex)) { + vyReportError("fio", "Failed to lock the guard mutex."); + return 0; + } + const char *result = NULL; + unsigned int i = 0; + unsigned int base = (unsigned int)(fid % _file_tab.capacity); + while (i < _file_tab.capacity) { + unsigned int at = (base + i) % _file_tab.capacity; + if (_file_tab.ids[at] == fid) { + result = _file_tab.names + _file_tab.name_offsets[at]; + break; + } else if (_file_tab.ids[at] == 0) { + break; + } + ++i; + } + vyUnlockMutex(_file_tab.mutex); + return result; +} diff --git a/src/runtime/file_tab.h b/src/runtime/file_tab.h new file mode 100644 index 0000000..c7c10cc --- /dev/null +++ b/src/runtime/file_tab.h @@ -0,0 +1,26 @@ +#ifndef VY_FILE_TAB_H +#define VY_FILE_TAB_H + +#include "runtime.h" + +#include + +/* used to identify a file (XXH3 hash of the path) */ +typedef uint64_t vy_file_id; + +VY_DLLEXPORT vy_result vyInitFileTab(unsigned int max_files); + +VY_DLLEXPORT void vyShutdownFileTab(void); + +VY_DLLEXPORT vy_file_id vyGetFileId(const char *path); + +VY_DLLEXPORT vy_file_id vyGetFileIdFromSpan(vy_text_span path); + +VY_DLLEXPORT vy_file_id vyAddFile(const char *path); + +VY_DLLEXPORT vy_file_id vyAddFileFromSpan(vy_text_span path); + +VY_DLLEXPORT const char *vyGetFilePath(vy_file_id fid); + + +#endif diff --git a/src/runtime/fio.c b/src/runtime/fio.c index 3c042b9..8727b8c 100644 --- a/src/runtime/fio.c +++ b/src/runtime/fio.c @@ -4,8 +4,6 @@ #include #include -#include - #ifdef __linux__ #include #elif defined(_WIN32) @@ -38,23 +36,7 @@ typedef struct { #endif } vy_fio_queue; -#define NAME_CAP(cap) ((cap)*128) -typedef struct { - vy_file_id *ids; - unsigned int *name_offsets; - char *names; - unsigned int capacity; - unsigned 
int name_head; - -#ifdef __linux__ - pthread_mutex_t mutex; -#elif defined(_WIN32) - HANDLE mutex; -#endif -} vy_file_tab; - static vy_fio_queue _queue; -static vy_file_tab _file_tab; #ifdef __linux__ static pthread_t _thread; @@ -96,43 +78,6 @@ static void ShutdownFIOQueue(void) { #endif } -static bool InitFileTab(unsigned int max_files) { - _file_tab.ids = calloc(max_files, sizeof(vy_file_id)); - if (!_file_tab.ids) - return false; - _file_tab.name_offsets = calloc(max_files, sizeof(unsigned int)); - if (!_file_tab.name_offsets) - return false; - _file_tab.names = malloc(NAME_CAP(max_files)); - if (!_file_tab.names) - return false; - - _file_tab.capacity = max_files; - _file_tab.name_head = 0; - -#ifdef __linux__ - if (pthread_mutex_init(&_file_tab.mutex, NULL) != 0) - return false; -#elif defined(_WIN32) - _file_tab.mutex = CreateMutex(NULL, FALSE, NULL); - if (!_file_tab.mutex) - return false; -#endif - return true; -} - -static void ShutdownFileTab(void) { - free(_file_tab.ids); - free(_file_tab.names); - free(_file_tab.name_offsets); - -#ifdef __linux__ - pthread_mutex_destroy(&_file_tab.mutex); -#elif defined(_WIN32) - CloseHandle(_file_tab.mutex); -#endif -} - #ifdef __linux__ static void *linuxFIOThreadProc(void *); #elif defined(_WIN32) @@ -141,13 +86,8 @@ static DWORD WINAPI win32FIOThreadProc(_In_ LPVOID); VY_DLLEXPORT bool vyInitFIO(const vy_fio_config *config) { unsigned int queue_size = (config->queue_size) ? config->queue_size : 512; - unsigned int max_file_count = - (config->max_file_count) ? 
config->max_file_count : 512; - if (!InitFIOQueue(queue_size)) return false; - if (!InitFileTab(max_file_count)) - return false; #ifdef __linux__ if (pthread_create(&_thread, NULL, linuxFIOThreadProc, NULL) != 0) @@ -180,110 +120,6 @@ VY_DLLEXPORT void vyShutdownFIO(void) { } #endif ShutdownFIOQueue(); - ShutdownFileTab(); -} - -vy_file_id vyGetFileId(const char *path) { - vy_text_span span; - span.start = path; - span.length = (unsigned int)strlen(path); - return vyGetFileIdFromSpan(span); -} - -vy_file_id vyGetFileIdFromSpan(vy_text_span path) { - /* Randomly choosen, aka finger smash keyboard */ - XXH64_hash_t seed = 15340978; - vy_file_id fid = (vy_file_id)XXH64(path.start, path.length, seed); - if (fid == 0) - fid = ~fid; - return fid; -} - -VY_DLLEXPORT vy_file_id vyAddFileFromSpan(vy_text_span path) { - vy_file_id fid = vyGetFileIdFromSpan(path); - -#ifdef __linux__ - pthread_mutex_lock(&_file_tab.mutex); -#elif defined(_WIN32) - if (WaitForSingleObject(_file_tab.mutex, INFINITE) == WAIT_FAILED) { - vyReportError("fio", "WaitForSingleObject failed with WAIT_FAILED!"); - return 0; - } -#endif - /* Hash Insert */ - unsigned int i = 0; - unsigned int base = (unsigned int)(fid % _file_tab.capacity); - while (i < _file_tab.capacity) { - unsigned int at = (base + i) % _file_tab.capacity; - if (_file_tab.ids[at] == 0) { - /* Insert */ - unsigned int slen = (unsigned int)path.length + 1; - if ((_file_tab.name_head + slen) >= NAME_CAP(_file_tab.capacity)) { - /* Out of name storage */ - fid = 0; - break; - } - memcpy(_file_tab.names + _file_tab.name_head, path.start, slen); - _file_tab.name_offsets[at] = _file_tab.name_head; - _file_tab.ids[at] = fid; - _file_tab.name_head += slen; - break; - } else if (_file_tab.ids[at] == fid) { - break; - } - ++i; - } - - /* Out of space */ - if (i == _file_tab.capacity) - fid = 0; - -#ifdef __linux__ - pthread_mutex_unlock(&_file_tab.mutex); -#elif defined(_WIN32) - ReleaseMutex(_file_tab.mutex); -#endif - return fid; -} - 
-VY_DLLEXPORT vy_file_id vyAddFile(const char *path) { - vy_text_span span; - span.start = path; - span.length = (unsigned int)strlen(path); - return vyAddFileFromSpan(span); -} - -VY_DLLEXPORT const char *vyGetFilePath(vy_file_id fid) { - /* Hash Lookup */ - if (fid == 0) - return NULL; -#ifdef __linux__ - pthread_mutex_lock(&_file_tab.mutex); -#elif defined(_WIN32) - if (WaitForSingleObject(_file_tab.mutex, INFINITE) == WAIT_FAILED) { - vyReportError("fio", "WaitForSingleObject failed with WAIT_FAILED!"); - return 0; - } -#endif - const char *result = NULL; - unsigned int i = 0; - unsigned int base = (unsigned int)(fid % _file_tab.capacity); - while (i < _file_tab.capacity) { - unsigned int at = (base + i) % _file_tab.capacity; - if (_file_tab.ids[at] == fid) { - result = _file_tab.names + _file_tab.name_offsets[at]; - break; - } else if (_file_tab.ids[at] == 0) { - break; - } - ++i; - } -#ifdef __linux__ - pthread_mutex_unlock(&_file_tab.mutex); -#elif defined(_WIN32) - ReleaseMutex(_file_tab.mutex); -#endif - return result; } VY_DLLEXPORT vy_fio_handle vyEnqueueRead(vy_file_id fid) { diff --git a/src/runtime/fio.h b/src/runtime/fio.h index 7f49fbc..990282f 100644 --- a/src/runtime/fio.h +++ b/src/runtime/fio.h @@ -6,6 +6,7 @@ #include #include "runtime.h" +#include "file_tab.h" enum { VY_FILE_BUFFER_FLAG_FILE_NOT_FOUND = 0x1, @@ -22,8 +23,6 @@ static inline bool vyWasFileBufferSuccessful(const vy_file_buffer *fb) { return fb->flags == 0; } -/* used to identify a file (XXH3 hash of the path) */ -typedef uint64_t vy_file_id; typedef unsigned int vy_fio_handle; @@ -36,16 +35,6 @@ VY_DLLEXPORT bool vyInitFIO(const vy_fio_config *config); VY_DLLEXPORT void vyShutdownFIO(void); -VY_DLLEXPORT vy_file_id vyGetFileId(const char *path); - -VY_DLLEXPORT vy_file_id vyGetFileIdFromSpan(vy_text_span path); - -VY_DLLEXPORT vy_file_id vyAddFile(const char *path); - -VY_DLLEXPORT vy_file_id vyAddFileFromSpan(vy_text_span path); - -VY_DLLEXPORT const char 
*vyGetFilePath(vy_file_id fid); - VY_DLLEXPORT vy_fio_handle vyEnqueueRead(vy_file_id fid); VY_DLLEXPORT void vyAbortFIO(vy_fio_handle fio); diff --git a/src/runtime/threading_cond.c b/src/runtime/threading_cond.c index 10213f8..980392a 100644 --- a/src/runtime/threading_cond.c +++ b/src/runtime/threading_cond.c @@ -65,4 +65,90 @@ VY_DLLEXPORT void vyWaitOnConditionVar(vy_condition_var *var) { pthread_cond_wait(&var->cond, &var->mutex); } +#elif defined(_WIN32) + +#define WIN32_LEAN_AND_MEAN + #include +struct vy_condition_var_s { + CRITICAL_SECTION critical_section; + CONDITION_VARIABLE cond; + ptrdiff_t next_reusable; +}; + + #define MAX_CONDS 1024 +vy_condition_var _conds[MAX_CONDS]; +static ptrdiff_t _first_reusable = MAX_CONDS; +static ptrdiff_t _next = 0; +static HANDLE _guard; +static INIT_ONCE _guard_init = INIT_ONCE_STATIC_INIT; + +static BOOL CALLBACK InitGuardFn(PINIT_ONCE initOnce, + PVOID parameter, + PVOID *context) { + VY_UNUSED(initOnce); + VY_UNUSED(parameter); + VY_UNUSED(context); + + _guard = CreateMutexW(NULL, FALSE, NULL); + return _guard != NULL; +} + +VY_DLLEXPORT vy_condition_var *vyCreateConditionVar(void) { + if (!InitOnceExecuteOnce(&_guard_init, InitGuardFn, NULL, NULL)) { + vyReportError("core", "Failed to initialize the guard mutex."); + return NULL; + } + + if (WaitForSingleObjectEx(_guard, INFINITE, TRUE) != WAIT_OBJECT_0) { + vyLog("core", "Failed to lock the guard variable: %u", GetLastError()); + return NULL; + } + if (_first_reusable < MAX_CONDS) { + vy_condition_var *cond = &_conds[_first_reusable]; + _first_reusable = cond->next_reusable; + ReleaseMutex(_guard); + return cond; + } else if (_next < MAX_CONDS) { + vy_condition_var *cond = &_conds[_next]; + if (!InitializeCriticalSectionAndSpinCount(&cond->critical_section, 4000)) { + vyLog("core", "Condition variable creation failed"); + ReleaseMutex(_guard); + return NULL; + } + InitializeConditionVariable(&cond->cond); + cond->next_reusable = MAX_CONDS; + ++_next; + 
ReleaseMutex(_guard); + return cond; + } + vyReportError("core", "Ran out of condition variable objects"); + ReleaseMutex(_guard); + return NULL; +} + +VY_DLLEXPORT void vyDestroyConditionVar(vy_condition_var *var) { + ptrdiff_t index = var - &_conds[0]; + if (WaitForSingleObjectEx(_guard, INFINITE, TRUE) != WAIT_OBJECT_0) { + vyLog("core", "Failed to lock the guard variable: %u", GetLastError()); + return; + } + var->next_reusable = _first_reusable; + _first_reusable = index; + ReleaseMutex(_guard); +} + +VY_DLLEXPORT void vyLockConditionVar(vy_condition_var *var) { + EnterCriticalSection(&var->critical_section); +} + +VY_DLLEXPORT void vyUnlockConditionVar(vy_condition_var *var, bool signal) { + LeaveCriticalSection(&var->critical_section); + if (signal) + WakeAllConditionVariable(&var->cond); +} + +VY_DLLEXPORT void vyWaitOnConditionVar(vy_condition_var *var) { + SleepConditionVariableCS(&var->cond, &var->critical_section, INFINITE); +} + #endif diff --git a/src/runtime/threading_mutex.c b/src/runtime/threading_mutex.c index 9049661..2619064 100644 --- a/src/runtime/threading_mutex.c +++ b/src/runtime/threading_mutex.c @@ -14,8 +14,31 @@ struct vy_mutex_s { static vy_mutex _mutex[MAX_MUTEX]; static ptrdiff_t _first_reusable = MAX_MUTEX; static ptrdiff_t _next = 0; +static HANDLE _guard; +static INIT_ONCE _guard_init = INIT_ONCE_STATIC_INIT; + +static BOOL CALLBACK InitGuardFn(PINIT_ONCE initOnce, + PVOID parameter, + PVOID *context) { + VY_UNUSED(initOnce); + VY_UNUSED(parameter); + VY_UNUSED(context); + + _guard = CreateMutexW(NULL, FALSE, NULL); + return _guard != NULL; +} + VY_DLLEXPORT vy_mutex *vyCreateMutex(void) { + if (!InitOnceExecuteOnce(&_guard_init, InitGuardFn, NULL, NULL)) { + vyReportError("core", "Failed to initialize the guard mutex."); + return NULL; + } + + if (WaitForSingleObjectEx(_guard, INFINITE, TRUE) != WAIT_OBJECT_0) { + vyLog("core", "Failed to lock the guard variable: %u", GetLastError()); + return NULL; + } if (_first_reusable < 
MAX_MUTEX) { vy_mutex *mtx = &_mutex[_first_reusable]; _first_reusable = mtx->next_reusable; @@ -42,9 +65,10 @@ VY_DLLEXPORT void vyDestroyMutex(vy_mutex *mutex) { } VY_DLLEXPORT bool vyLockMutex(vy_mutex *mutex) { - return WaitForSingleObject(mutex->handle, INFINITE) == WAIT_OBJECT_0; + return WaitForSingleObjectEx(mutex->handle, INFINITE, TRUE) == WAIT_OBJECT_0; } -v VY_DLLEXPORT bool vyUnlockMutex(vy_mutex *mutex) { + +VY_DLLEXPORT bool vyUnlockMutex(vy_mutex *mutex) { return ReleaseMutex(mutex->handle) != 0; } diff --git a/src/runtime/threading_thread.c b/src/runtime/threading_thread.c index 7a53fa2..d615c8c 100644 --- a/src/runtime/threading_thread.c +++ b/src/runtime/threading_thread.c @@ -2,6 +2,104 @@ #include "threading.h" #if defined(_WIN32) + +#define WIN32_LEAN_AND_MEAN +#include + +struct vy_thread_s { + HANDLE handle; + + ptrdiff_t next_reusable; + + vy_thread_entry_fn *entry; + void *param; + + bool needs_join; +}; + + #define MAX_THREADS 256 +static vy_thread _threads[MAX_THREADS]; +static ptrdiff_t _first_reusable = MAX_THREADS; +static ptrdiff_t _next = 0; + +static HANDLE _guard; +static INIT_ONCE _guard_init = INIT_ONCE_STATIC_INIT; + +static BOOL CALLBACK InitGuardFn(PINIT_ONCE initOnce, + PVOID parameter, + PVOID *context) { + VY_UNUSED(initOnce); + VY_UNUSED(parameter); + VY_UNUSED(context); + + _guard = CreateMutexW(NULL, FALSE, NULL); + return _guard != NULL; +} + +static DWORD WINAPI win32ThreadWrapper(LPVOID arg) { + vy_thread *user_thread = arg; + user_thread->needs_join = false; + user_thread->entry(user_thread->param); + user_thread->needs_join = true; + return 0; +} + +VY_DLLEXPORT vy_thread *vySpawnThread(vy_thread_entry_fn *entry, void *param) { + + if (!InitOnceExecuteOnce(&_guard_init, InitGuardFn, NULL, NULL)) { + vyReportError("core", "Failed to initialize the guard mutex."); + return NULL; + } + + vy_thread *thrd = NULL; + if (WaitForSingleObject(_guard, INFINITE) != WAIT_OBJECT_0) { + vyLog("core", "Failed to lock the guard 
variable: %u", GetLastError()); + return NULL; + } + if (_first_reusable < MAX_THREADS) { + thrd = &_threads[_first_reusable]; + _first_reusable = thrd->next_reusable; + if (thrd->needs_join) { + WaitForSingleObject(thrd->handle, INFINITE); + CloseHandle(thrd->handle); + thrd->needs_join = false; + } + } else if (_next < MAX_THREADS) { + thrd = &_threads[_next]; + thrd->next_reusable = MAX_THREADS; + ++_next; + } + if (thrd) { + thrd->entry = entry; + thrd->param = param; + thrd->handle = CreateThread(NULL, 0, win32ThreadWrapper, (LPVOID)thrd, 0, NULL); + if (thrd->handle == NULL) { + vyLog("core", "Thread creation failed"); + thrd = NULL; + } + } else { + vyReportError("core", "Ran out of thread objects"); + } + ReleaseMutex(_guard); + return thrd; +} + +VY_DLLEXPORT void vyJoinThread(vy_thread *thread) { + WaitForSingleObject(thread->handle, INFINITE); + CloseHandle(thread->handle); + thread->needs_join = false; + ptrdiff_t index = thread - &_threads[0]; + + if (WaitForSingleObject(_guard, INFINITE) != WAIT_OBJECT_0) { + vyLog("core", "Failed to lock the guard variable: %u", GetLastError()); + return; + } + thread->next_reusable = _first_reusable; + _first_reusable = index; + ReleaseMutex(_guard); +} + + #elif defined(__linux__) #include @@ -51,7 +149,7 @@ VY_DLLEXPORT vy_thread *vySpawnThread(vy_thread_entry_fn *entry, void *param) { thrd->param = param; if (pthread_create(&thrd->handle, NULL, linuxThreadWrapper, thrd) != 0) { - vyLog("core", "Mutex creation failed"); + vyLog("core", "Thread creation failed"); thrd = NULL; } } else {