feat: async io for windows

Submit async-io to the OS.
Next step is replacing all FIO calls with async-io and removing the old
code.
This commit is contained in:
Kevin Trogant 2023-11-22 14:25:37 +01:00
parent 97adb0dffa
commit 1a4a2109ca
12 changed files with 726 additions and 179 deletions

29
docs/NOTES_aio_assets.md Normal file
View File

@ -0,0 +1,29 @@
# AIO & Assets
## Async IO:
- Batches of loads (vy_load_batch)
- Each load: file-id, offset-in-file, num-bytes, destination buffer
- SubmitLoadBatch() -> Handle for whole batch or handles for each file op?
- Reason: Better saturation of disk interface
- Batch interacts nicely with asset system described below:
## Assets
- Have a tool that detects asset dependencies (maybe the same tool that bakes assets into their runtime format)
- i.e. world-cell -> meshes -> textures
- Bake into a single binary (asset_meta.bin)
- Have that file loaded at runtime at all times
- DetermineAssetDependencies() -> CreateLoadBatch(asset-id)
- CreateLoadBatch() could take a cache into account
### File Storage in Memory
- Linked lists of block-regions with fixed size blocks (bitmap allocator)
- E.g. 1, 2, 3, or 5 MB blocks (maybe down to KB-sized blocks)
- Blocks have refcounts, explicitly increased/decreased
- Either lock the whole region or lock individual blocks
### Cache
- Hold one reference to storage
- Track how much space is taken for cache
- Evict LRU(?) once taken space exceeds threshold
- Copies could be managers handed to the cache

View File

@ -50,6 +50,8 @@ runtime_lib = library('vyrt',
'src/runtime/app.h',
'src/runtime/dynamic_libs.h',
'src/runtime/jobs.h',
'src/runtime/aio.h',
'src/runtime/file_tab.h',
'src/runtime/error_report.c',
'src/runtime/gfx_main.c',
@ -63,6 +65,8 @@ runtime_lib = library('vyrt',
'src/runtime/app.c',
'src/runtime/dynamic_libs.c',
'src/runtime/jobs.c',
'src/runtime/aio.c',
'src/runtime/file_tab.c',
# Contrib Sources
'contrib/xxhash/xxhash.c',

247
src/runtime/aio.c Normal file
View File

@ -0,0 +1,247 @@
#include "aio.h"
#include "threading.h"
#ifdef _WIN32
#define WIN32_LEAN_AND_MEAN
#include <Windows.h>
#endif
#include <assert.h>
#include <stdlib.h>
/* Maintain a ringbuffer of pending operations */
/* One slot of the pending-operation ringbuffer; describes a single async read. */
typedef struct {
#ifdef _WIN32
/* File the read was issued on. Closed by the completion routine. */
HANDLE file_handle;
/* OVERLAPPED block handed to ReadFileEx. Its hEvent member carries a pointer
 * back to this slot so the completion routine can find it. */
OVERLAPPED overlapped;
#endif
/* Current state of the operation. Written by the submit path and the
 * completion routine, read by vyGetAIOState. */
volatile vy_aio_state state;
} vy_aio;
/* Ringbuffer bookkeeping for all pending operations. */
typedef struct {
/* Locked while head/tail are read or modified. */
vy_mutex *guard;
/* Array of `capacity` slots, allocated by vyInitAIO. */
vy_aio *storage;
/* Number of slots in `storage`. */
uint32_t capacity;
/* Index of the next slot that will be handed out by a reservation. */
uint32_t head;
/* Index of the oldest slot that is still in use; advanced by vyReleaseAIO. */
uint32_t tail;
} vy_aio_ringbuffer;
/* Result of a ringbuffer reservation. A reservation that wraps around the end
 * of the storage array is split into two runs. */
typedef struct {
/* First run of reserved slots, or NULL if the reservation failed. */
vy_aio *a;
/* Second run (starting at storage[0]) when the reservation wrapped, else NULL. */
vy_aio *b;
/* Number of slots in run `a`; any remaining slots live in run `b`. */
uint32_t a_count;
} vy_ringbuffer_space;
static vy_aio_ringbuffer _ringbuffer;
static vy_ringbuffer_space ReserveRingbufferSpace(uint32_t count) {
if (!vyLockMutex(_ringbuffer.guard)) {
vy_ringbuffer_space failed = {NULL, NULL, 0};
return failed;
}
vy_ringbuffer_space result = {NULL, NULL, 0};
if (_ringbuffer.head >= _ringbuffer.tail) {
if (_ringbuffer.head + count <= _ringbuffer.capacity) {
result.a_count = count;
result.a = &_ringbuffer.storage[_ringbuffer.head];
_ringbuffer.head =
(_ringbuffer.head + count) % _ringbuffer.capacity;
} else {
/* Check if enough space is free at the end */
uint32_t a_count = _ringbuffer.capacity - _ringbuffer.head;
uint32_t b_count = count - a_count;
if (b_count <= _ringbuffer.tail) {
result.a_count = a_count;
result.a = &_ringbuffer.storage[_ringbuffer.head];
result.b = &_ringbuffer.storage[0];
_ringbuffer.head = b_count;
} else {
/* Not enough space, we would overwrite the tail */
vyLog("aio", "Ringbuffer is full.");
}
}
} else {
/* Head is lower than tail */
uint32_t num_free = _ringbuffer.tail - _ringbuffer.head;
if (count < num_free) {
result.a_count = count;
result.a = &_ringbuffer.storage[_ringbuffer.head];
_ringbuffer.head =
(_ringbuffer.head + count) % _ringbuffer.capacity;
} else {
/* Not enough space, we would overwrite the tail */
vyLog("aio", "Ringbuffer is full.");
}
}
vyUnlockMutex(_ringbuffer.guard);
return result;
}
#ifdef _WIN32
static void win32CompletionRoutine(DWORD error_code,
DWORD num_bytes_transfered,
LPOVERLAPPED overlapped) {
vy_aio *op = (vy_aio*)overlapped->hEvent;
assert(op->state == VY_AIO_STATE_PENDING);
if (error_code != ERROR_SUCCESS) {
op->state = VY_AIO_STATE_FAILED;
vyLog("aio", "Async io failed: %u", error_code);
} else {
op->state = VY_AIO_STATE_FINISHED;
}
CloseHandle(op->file_handle);
}
#endif
/* Initializes the async-io system.
 *
 * max_concurrent_operations: number of ringbuffer slots; 0 selects the
 *                            default of 1024.
 * Returns VY_SUCCESS, or VY_AIO_OUT_OF_MEMORY if the guard mutex or the slot
 * storage could not be created. Nothing is left allocated on failure.
 */
VY_DLLEXPORT vy_result vyInitAIO(unsigned int max_concurrent_operations) {
    if (max_concurrent_operations == 0) {
        max_concurrent_operations = 1024;
    }

    vy_mutex *guard = vyCreateMutex();
    if (!guard) {
        return VY_AIO_OUT_OF_MEMORY;
    }

    /* calloc zeroes the slots, i.e. every slot starts as VY_AIO_STATE_INVALID */
    vy_aio *storage = calloc(max_concurrent_operations, sizeof *storage);
    if (!storage) {
        /* Do not leak the mutex that was just created */
        vyDestroyMutex(guard);
        return VY_AIO_OUT_OF_MEMORY;
    }

    _ringbuffer.guard = guard;
    _ringbuffer.storage = storage;
    _ringbuffer.head = 0;
    _ringbuffer.tail = 0;
    _ringbuffer.capacity = max_concurrent_operations;
    return VY_SUCCESS;
}
/* Releases the guard mutex and the slot storage.
 *
 * All pointers are reset afterwards so that calling this twice (or without a
 * successful vyInitAIO) does not destroy/free the same objects again. With
 * capacity set to 0 every handle is rejected by the range checks in
 * vyGetAIOState / vyReleaseAIO.
 */
VY_DLLEXPORT void vyShutdownAIO(void) {
    if (_ringbuffer.guard) {
        vyDestroyMutex(_ringbuffer.guard);
        _ringbuffer.guard = NULL;
    }
    free(_ringbuffer.storage);
    _ringbuffer.storage = NULL;
    _ringbuffer.capacity = 0;
    _ringbuffer.head = 0;
    _ringbuffer.tail = 0;
}
#ifdef _WIN32
/* Opens the file at `file_path` and queues the overlapped read described by
 * `load` on slot `op`. Returns non-zero if the read was queued, 0 otherwise.
 * On failure no file handle is left open.
 */
static int win32StartLoad(vy_aio *op,
                          const vy_file_load *load,
                          const char *file_path) {
    /* ReadFileEx takes the byte count as a DWORD; refuse instead of truncating */
    if (load->num_bytes > MAXDWORD) {
        vyReportError("aio",
                      "Requested load is too large for a single read");
        return 0;
    }

    WCHAR wpath[MAX_PATH];
    /* For CP_UTF8 the flags must be 0 or MB_ERR_INVALID_CHARS; any other
     * flag (e.g. MB_PRECOMPOSED) makes the call fail with
     * ERROR_INVALID_FLAGS. */
    if (MultiByteToWideChar(CP_UTF8,
                            MB_ERR_INVALID_CHARS,
                            file_path,
                            -1,
                            wpath,
                            VY_ARRAY_COUNT(wpath)) == 0) {
        vyReportError("aio",
                      "MultiByteToWideChar failed with error code: %u",
                      (unsigned int)GetLastError());
        return 0;
    }

    HANDLE file_handle = CreateFileW(wpath,
                                     GENERIC_READ,
                                     FILE_SHARE_READ,
                                     NULL,
                                     OPEN_EXISTING,
                                     FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED,
                                     NULL);
    if (file_handle == INVALID_HANDLE_VALUE) {
        vyReportError("aio",
                      "CreateFileW failed for file: %s with error code: %u",
                      file_path,
                      (unsigned int)GetLastError());
        return 0;
    }

    /* Widen first: shifting a 32 bit size_t by 32 would be undefined. */
    const uint64_t offset = (uint64_t)load->offset;
    /* Only Offset/OffsetHigh are initialized from the union. Also initializing
     * .Pointer (which aliases them) would override the offset again.
     * All remaining members are zero-initialized. */
    op->overlapped = (OVERLAPPED){
        /* ReadFileEx does not use hEvent and we are free to use it for our own purposes. */
        .hEvent = (HANDLE)(op),
        .Offset = (DWORD)(offset & MAXDWORD),
        .OffsetHigh = (DWORD)(offset >> 32),
    };
    op->file_handle = file_handle;
    /* The completion routine expects the slot to be PENDING */
    op->state = VY_AIO_STATE_PENDING;

    /* A zero return value is the failure indicator. A non-zero return means
     * the read is queued and the completion routine will run, so the handle
     * must stay open in that case. */
    if (!ReadFileEx(file_handle,
                    load->dest,
                    (DWORD)load->num_bytes,
                    &op->overlapped,
                    win32CompletionRoutine)) {
        vyReportError("aio",
                      "ReadFileEx failed with error code: %u",
                      (unsigned int)GetLastError());
        CloseHandle(file_handle);
        op->file_handle = NULL;
        return 0;
    }
    return 1;
}
#endif

/* Submits every load of `batch` to the OS.
 *
 * batch:   the loads to start; num_loads must not exceed
 *          VY_LOAD_BATCH_MAX_SIZE.
 * handles: array with at least batch->num_loads entries. handles[i] receives
 *          the handle for loads[i], or VY_AIO_INVALID_HANDLE if that load
 *          could not be started. The slot of a load that could not be started
 *          is marked VY_AIO_STATE_INVALID so the ringbuffer can reclaim it
 *          without the caller holding a handle for it.
 *
 * Returns VY_AIO_LOAD_TOO_LARGE if the batch has too many loads,
 * VY_AIO_TOO_MANY_OPERATIONS if no ringbuffer space could be reserved, and
 * VY_SUCCESS otherwise (individual loads may still have failed; check the
 * handles).
 */
VY_DLLEXPORT vy_result vySubmitLoadBatch(const vy_load_batch *batch,
                                         vy_aio_handle *handles) {
    if (batch->num_loads > VY_LOAD_BATCH_MAX_SIZE) {
        return VY_AIO_LOAD_TOO_LARGE;
    }

    vy_ringbuffer_space rbspace = ReserveRingbufferSpace(batch->num_loads);
    if (!rbspace.a) {
        vyReportError("aio", "Too many pending file operations");
        return VY_AIO_TOO_MANY_OPERATIONS;
    }

    for (unsigned int i = 0; i < batch->num_loads; ++i) {
        vy_aio *op = (i < rbspace.a_count) ? &rbspace.a[i]
                                           : &rbspace.b[i - rbspace.a_count];
        const vy_file_load *load = &batch->loads[i];
        int started = 0;

        op->state = VY_AIO_STATE_PENDING;

        const char *file_path = vyGetFilePath(load->file);
        if (!file_path) {
            vyReportError("aio",
                          "Failed to resolve file path for a batched load");
        } else {
#ifdef _WIN32
            started = win32StartLoad(op, load, file_path);
#else
            /* No OS backend on this platform: the load cannot be started. */
            (void)load;
#endif
        }

        if (started) {
            /* Handle is the index into the ringbuffer + 1 */
            ptrdiff_t op_idx = op - _ringbuffer.storage;
            handles[i] = (vy_aio_handle)op_idx + 1;
        } else {
            op->state = VY_AIO_STATE_INVALID;
            handles[i] = VY_AIO_INVALID_HANDLE;
        }
    }
    return VY_SUCCESS;
}
/* Returns the current state of the operation identified by `handle`.
 * Handles that are VY_AIO_INVALID_HANDLE or larger than the ringbuffer
 * capacity yield VY_AIO_STATE_INVALID.
 */
VY_DLLEXPORT volatile vy_aio_state vyGetAIOState(vy_aio_handle handle) {
    const int handle_in_range = (handle != VY_AIO_INVALID_HANDLE) &&
                                (handle <= _ringbuffer.capacity);
    if (!handle_in_range)
        return VY_AIO_STATE_INVALID;
#ifdef _WIN32
    /* Zero-length alertable sleep: lets queued completion routines run */
    SleepEx(0, TRUE);
#endif
    /* Handles are slot index + 1 */
    const uint32_t slot = handle - 1;
    vyLockMutex(_ringbuffer.guard);
    const vy_aio_state current = _ringbuffer.storage[slot].state;
    vyUnlockMutex(_ringbuffer.guard);
    return current;
}
/* Releases the slot behind `handle` so the ringbuffer can re-use it.
 * Invalid or out-of-range handles are ignored.
 *
 * The tail is swept forward on every release, not only when the released
 * handle is the tail slot. Slots of loads that failed during submission are
 * INVALID but have no handle that could ever be released; if such a slot sat
 * at the tail, a "release of the tail handle" would never happen and the
 * ringbuffer would stay blocked.
 *
 * NOTE(review): releasing an operation that is still PENDING marks its slot
 * as free while the OS may still complete it - callers presumably must wait
 * for FINISHED/FAILED first; confirm against the intended contract.
 */
VY_DLLEXPORT void vyReleaseAIO(vy_aio_handle handle) {
    if (handle == VY_AIO_INVALID_HANDLE || handle > _ringbuffer.capacity) {
        return;
    }

    vy_aio *op = &_ringbuffer.storage[handle - 1];
    if (!vyLockMutex(_ringbuffer.guard)) {
        /* Could not take the guard: still mark the slot as free, but leave
         * head/tail alone. A later release sweeps the tail past it. */
        op->state = VY_AIO_STATE_INVALID;
        return;
    }

    op->state = VY_AIO_STATE_INVALID;

    /* Advance the tail such that it points to the last used slot. (Or to head, if the ringbuffer is now empty) */
    uint32_t i = _ringbuffer.tail;
    while ((_ringbuffer.storage[i].state == VY_AIO_STATE_INVALID) &&
           i != _ringbuffer.head) {
        i = (i + 1) % _ringbuffer.capacity;
    }
    _ringbuffer.tail = i;

    vyUnlockMutex(_ringbuffer.guard);
}

66
src/runtime/aio.h Normal file
View File

@ -0,0 +1,66 @@
#ifndef VY_AIO_H
#define VY_AIO_H
#include <stdint.h>
#include "runtime.h"
#include "file_tab.h"
/** Describes one read of a contiguous byte range from a single file. */
typedef struct {
size_t offset; /**< Starting offset inside the file in bytes */
size_t num_bytes; /**< Number of bytes to load */
/** Destination buffer with at least @c num_bytes bytes.
* Must be valid until the load is finished.
*/
void *dest;
vy_file_id file; /**< Unique identifier for the file */
} vy_file_load;
#define VY_LOAD_BATCH_MAX_SIZE 64
/** A batch of loads that will be started together.
*
* The aio system will hand these to the OS.
*/
typedef struct {
/** The loads of this batch; only the first @c num_loads entries are used. */
vy_file_load loads[VY_LOAD_BATCH_MAX_SIZE];
/** Number of valid entries in @c loads.
* Must be less than or equal to @c VY_LOAD_BATCH_MAX_SIZE */
unsigned int num_loads;
} vy_load_batch;
#define VY_AIO_INVALID_HANDLE 0
/** Handle for an async io operation. Can be used to query the state and result. */
typedef uint32_t vy_aio_handle;
/** Error codes returned by the aio functions (in addition to VY_SUCCESS). */
enum {
/** vySubmitLoadBatch: the batch contains more than VY_LOAD_BATCH_MAX_SIZE loads */
VY_AIO_LOAD_TOO_LARGE = (VY_SUCCESS + 1),
/** vySubmitLoadBatch: no space for the batch could be reserved in the operation ringbuffer */
VY_AIO_TOO_MANY_OPERATIONS,
/** vyInitAIO: the guard mutex or the operation storage could not be created */
VY_AIO_OUT_OF_MEMORY,
};
/** State of an async io operation, as reported by vyGetAIOState. */
typedef enum {
/** The handle is invalid or the operation was released; also the value of zero-initialized slots */
VY_AIO_STATE_INVALID,
/** The operation was submitted and has not completed yet */
VY_AIO_STATE_PENDING,
/** The operation completed without an error */
VY_AIO_STATE_FINISHED,
/** The operation completed with an error */
VY_AIO_STATE_FAILED,
} vy_aio_state;
/** Initializes the aio system.
* @param max_concurrent_operations Number of operation slots; 0 selects a default of 1024.
* @return VY_SUCCESS or VY_AIO_OUT_OF_MEMORY
*/
VY_DLLEXPORT vy_result vyInitAIO(unsigned int max_concurrent_operations);
/** Destroys the guard mutex and frees the operation storage. */
VY_DLLEXPORT void vyShutdownAIO(void);
/** Starts all loads of a batch.
* @param batch The loads to start
* @param handles Receives one handle per load. Loads whose file path cannot be
* resolved or whose file cannot be opened get VY_AIO_INVALID_HANDLE.
* @return VY_AIO_LOAD_TOO_LARGE if the batch has too many loads,
* VY_AIO_TOO_MANY_OPERATIONS if no operation slots are available, VY_SUCCESS otherwise.
*/
VY_DLLEXPORT vy_result vySubmitLoadBatch(const vy_load_batch *batch,
vy_aio_handle *handles);
/** Queries the state of an operation.
* Returns VY_AIO_STATE_INVALID for VY_AIO_INVALID_HANDLE or an out-of-range handle.
*/
VY_DLLEXPORT volatile vy_aio_state vyGetAIOState(vy_aio_handle handle);
/* Releases the internal storage for the operation.
* The system is allowed to re-use the same handle value for new operations after this was called.
*/
VY_DLLEXPORT void vyReleaseAIO(vy_aio_handle handle);
#endif

View File

@ -2,6 +2,7 @@
#include "config.h"
#include "fio.h"
#include "gfx.h"
#include "aio.h"
#include "renderer_api.h"
extern void __RegisterRuntimeCVars(void);
@ -43,6 +44,16 @@ VY_DLLEXPORT int vyWin32Entry(HINSTANCE hInstance,
return 1;
}
if (vyInitFileTab(1024) != VY_SUCCESS) {
vyReportError("FTAB", "Init failed.");
return 1;
}
if (vyInitAIO(0) != VY_SUCCESS) {
vyReportError("AIO", "Init failed.");
return 1;
}
WNDCLASSEXW wndclass = {
.cbSize = sizeof(wndclass),
.hInstance = hInstance,

131
src/runtime/file_tab.c Normal file
View File

@ -0,0 +1,131 @@
#include "file_tab.h"
#include "threading.h"
#include <stdlib.h>
#include <string.h>
#include <xxhash/xxhash.h>
/* Size in bytes of the name buffer for a table with `cap` entries (128 bytes per entry) */
#define NAME_CAP(cap) ((cap)*128)
/* Hash table (linear probing) that maps file ids to their paths. */
typedef struct {
/* Per slot: the stored file id, 0 marks an empty slot */
vy_file_id *ids;
/* Per slot: offset of the path inside `names` */
unsigned int *name_offsets;
/* Buffer of NAME_CAP(capacity) bytes holding the paths back to back */
char *names;
/* Number of slots in `ids` and `name_offsets` */
unsigned int capacity;
/* Offset of the first unused byte in `names` */
unsigned int name_head;
/* Locked while the table is searched or modified */
vy_mutex *mutex;
} vy_file_tab;
static vy_file_tab _file_tab;
vy_result vyInitFileTab(unsigned int max_files) {
_file_tab.ids = calloc(max_files, sizeof(vy_file_id));
if (!_file_tab.ids)
return 1;
_file_tab.name_offsets = calloc(max_files, sizeof(unsigned int));
if (!_file_tab.name_offsets)
return 1;
_file_tab.names = malloc(NAME_CAP(max_files));
if (!_file_tab.names)
return 1;
_file_tab.capacity = max_files;
_file_tab.name_head = 0;
_file_tab.mutex = vyCreateMutex();
return VY_SUCCESS;
}
/* Frees the file table and destroys its mutex.
 * All members are reset afterwards so that a second call does not free or
 * destroy the same objects again.
 */
void vyShutdownFileTab(void) {
    free(_file_tab.ids);
    free(_file_tab.names);
    free(_file_tab.name_offsets);
    _file_tab.ids = NULL;
    _file_tab.names = NULL;
    _file_tab.name_offsets = NULL;
    _file_tab.capacity = 0;
    _file_tab.name_head = 0;
    if (_file_tab.mutex) {
        vyDestroyMutex(_file_tab.mutex);
        _file_tab.mutex = NULL;
    }
}
/* Computes the file id of a NUL-terminated path.
 * Wraps the whole string into a span and defers to vyGetFileIdFromSpan.
 */
vy_file_id vyGetFileId(const char *path) {
    vy_text_span whole_path;
    whole_path.length = (unsigned int)strlen(path);
    whole_path.start = path;
    return vyGetFileIdFromSpan(whole_path);
}
/* Computes the file id of a path given as a span: the seeded XXH64 hash of
 * the span's bytes. The value 0 is never returned, because 0 is used as the
 * "no file" id; a hash of 0 is mapped to its bitwise complement.
 */
vy_file_id vyGetFileIdFromSpan(vy_text_span path) {
    /* Arbitrary fixed seed */
    const XXH64_hash_t seed = 15340978;
    const vy_file_id hash = (vy_file_id)XXH64(path.start, path.length, seed);
    return (hash != 0) ? hash : ~hash;
}
/* Registers the path given by `path` in the file table.
 *
 * Returns the file id of the path. If the id is already present in the table
 * the existing entry is kept. Returns 0 if the mutex could not be locked, the
 * table has no free slot, or the name storage is exhausted.
 *
 * The span is not required to be NUL-terminated: exactly path.length bytes
 * are copied and the terminator is written explicitly.
 */
VY_DLLEXPORT vy_file_id vyAddFileFromSpan(vy_text_span path) {
    const vy_file_id fid = vyGetFileIdFromSpan(path);
    if (!vyLockMutex(_file_tab.mutex)) {
        vyReportError("fio", "Failed to lock the guard mutex.");
        return 0;
    }

    const unsigned int capacity = _file_tab.capacity;
    vy_file_id result = 0; /* stays 0 if the table is full or has no slots */

    if (capacity > 0) {
        /* Hash Insert (linear probing) */
        const unsigned int base = (unsigned int)(fid % capacity);
        for (unsigned int i = 0; i < capacity; ++i) {
            const unsigned int at = (base + i) % capacity;
            if (_file_tab.ids[at] == fid) {
                /* Already known */
                result = fid;
                break;
            }
            if (_file_tab.ids[at] != 0) {
                /* Occupied by another file, keep probing */
                continue;
            }

            /* Empty slot: insert */
            const unsigned int name_len = (unsigned int)path.length;
            const unsigned int slen = name_len + 1; /* + terminator */
            if ((_file_tab.name_head + slen) >= NAME_CAP(capacity)) {
                /* Out of name storage */
                break;
            }
            char *dst = _file_tab.names + _file_tab.name_head;
            memcpy(dst, path.start, name_len);
            dst[name_len] = '\0';
            _file_tab.name_offsets[at] = _file_tab.name_head;
            _file_tab.ids[at] = fid;
            _file_tab.name_head += slen;
            result = fid;
            break;
        }
    }

    vyUnlockMutex(_file_tab.mutex);
    return result;
}
/* Registers a NUL-terminated path in the file table.
 * Wraps the whole string into a span and defers to vyAddFileFromSpan.
 */
VY_DLLEXPORT vy_file_id vyAddFile(const char *path) {
    vy_text_span whole_path;
    whole_path.length = (unsigned int)strlen(path);
    whole_path.start = path;
    return vyAddFileFromSpan(whole_path);
}
/* Looks up the path that was registered for `fid`.
 *
 * Returns a pointer into the table's name storage, or NULL if fid is 0, the
 * id is not in the table, the table has no slots, or the mutex could not be
 * locked.
 *
 * NOTE(review): entries are matched by id only, so two different paths with
 * the same hash would be indistinguishable here.
 */
VY_DLLEXPORT const char *vyGetFilePath(vy_file_id fid) {
    if (fid == 0)
        return NULL;
    if (!vyLockMutex(_file_tab.mutex)) {
        vyReportError("fio", "Failed to lock the guard mutex.");
        return NULL;
    }

    const char *result = NULL;
    const unsigned int capacity = _file_tab.capacity;
    /* A table without slots cannot contain anything (and fid % 0 is undefined) */
    if (capacity > 0) {
        /* Hash Lookup (linear probing) */
        const unsigned int base = (unsigned int)(fid % capacity);
        for (unsigned int i = 0; i < capacity; ++i) {
            const unsigned int at = (base + i) % capacity;
            if (_file_tab.ids[at] == fid) {
                result = _file_tab.names + _file_tab.name_offsets[at];
                break;
            }
            if (_file_tab.ids[at] == 0) {
                /* Hit an empty slot: the id was never inserted */
                break;
            }
        }
    }

    vyUnlockMutex(_file_tab.mutex);
    return result;
}

26
src/runtime/file_tab.h Normal file
View File

@ -0,0 +1,26 @@
#ifndef VY_FILE_TAB_H
#define VY_FILE_TAB_H
#include "runtime.h"
#include <stdint.h>
/* Used to identify a file (seeded XXH64 hash of the path). 0 is never a valid id. */
typedef uint64_t vy_file_id;
/* Allocates the file table with room for max_files entries.
 * Returns VY_SUCCESS, or a non-zero value if an allocation failed. */
VY_DLLEXPORT vy_result vyInitFileTab(unsigned int max_files);
/* Frees the file table and destroys its mutex. */
VY_DLLEXPORT void vyShutdownFileTab(void);
/* Computes the id of a NUL-terminated path. Does not access the table. */
VY_DLLEXPORT vy_file_id vyGetFileId(const char *path);
/* Computes the id of a path given as a span. Does not access the table. */
VY_DLLEXPORT vy_file_id vyGetFileIdFromSpan(vy_text_span path);
/* Registers a NUL-terminated path. Returns its id, or 0 on failure. */
VY_DLLEXPORT vy_file_id vyAddFile(const char *path);
/* Registers a path given as a span. Returns its id, or 0 on failure. */
VY_DLLEXPORT vy_file_id vyAddFileFromSpan(vy_text_span path);
/* Returns the registered path for fid, or NULL if fid is 0 or unknown. */
VY_DLLEXPORT const char *vyGetFilePath(vy_file_id fid);
#endif

View File

@ -4,8 +4,6 @@
#include <stdlib.h>
#include <string.h>
#include <xxhash/xxhash.h>
#ifdef __linux__
#include <pthread.h>
#elif defined(_WIN32)
@ -38,23 +36,7 @@ typedef struct {
#endif
} vy_fio_queue;
/* Size in bytes of the name buffer for a table with `cap` entries (128 bytes per entry) */
#define NAME_CAP(cap) ((cap)*128)
/* Hash table (linear probing) that maps file ids to their paths. */
typedef struct {
/* Per slot: the stored file id, 0 marks an empty slot */
vy_file_id *ids;
/* Per slot: offset of the path inside `names` */
unsigned int *name_offsets;
/* Buffer of NAME_CAP(capacity) bytes holding the paths back to back */
char *names;
/* Number of slots in `ids` and `name_offsets` */
unsigned int capacity;
/* Offset of the first unused byte in `names` */
unsigned int name_head;
/* Platform mutex, locked while the table is searched or modified */
#ifdef __linux__
pthread_mutex_t mutex;
#elif defined(_WIN32)
HANDLE mutex;
#endif
} vy_file_tab;
static vy_fio_queue _queue;
static vy_file_tab _file_tab;
#ifdef __linux__
static pthread_t _thread;
@ -96,43 +78,6 @@ static void ShutdownFIOQueue(void) {
#endif
}
static bool InitFileTab(unsigned int max_files) {
_file_tab.ids = calloc(max_files, sizeof(vy_file_id));
if (!_file_tab.ids)
return false;
_file_tab.name_offsets = calloc(max_files, sizeof(unsigned int));
if (!_file_tab.name_offsets)
return false;
_file_tab.names = malloc(NAME_CAP(max_files));
if (!_file_tab.names)
return false;
_file_tab.capacity = max_files;
_file_tab.name_head = 0;
#ifdef __linux__
if (pthread_mutex_init(&_file_tab.mutex, NULL) != 0)
return false;
#elif defined(_WIN32)
_file_tab.mutex = CreateMutex(NULL, FALSE, NULL);
if (!_file_tab.mutex)
return false;
#endif
return true;
}
/* Frees the three table arrays and then destroys the platform mutex. */
static void ShutdownFileTab(void) {
    void *const blocks[] = {
        _file_tab.ids,
        _file_tab.names,
        _file_tab.name_offsets,
    };
    for (size_t b = 0; b < sizeof(blocks) / sizeof(blocks[0]); ++b) {
        free(blocks[b]);
    }
#if defined(__linux__)
    pthread_mutex_destroy(&_file_tab.mutex);
#elif defined(_WIN32)
    CloseHandle(_file_tab.mutex);
#endif
}
#ifdef __linux__
static void *linuxFIOThreadProc(void *);
#elif defined(_WIN32)
@ -141,13 +86,8 @@ static DWORD WINAPI win32FIOThreadProc(_In_ LPVOID);
VY_DLLEXPORT bool vyInitFIO(const vy_fio_config *config) {
unsigned int queue_size = (config->queue_size) ? config->queue_size : 512;
unsigned int max_file_count =
(config->max_file_count) ? config->max_file_count : 512;
if (!InitFIOQueue(queue_size))
return false;
if (!InitFileTab(max_file_count))
return false;
#ifdef __linux__
if (pthread_create(&_thread, NULL, linuxFIOThreadProc, NULL) != 0)
@ -180,110 +120,6 @@ VY_DLLEXPORT void vyShutdownFIO(void) {
}
#endif
ShutdownFIOQueue();
ShutdownFileTab();
}
/* Computes the file id of a NUL-terminated path.
 * Wraps the whole string into a span and defers to vyGetFileIdFromSpan.
 */
vy_file_id vyGetFileId(const char *path) {
    vy_text_span whole_path;
    whole_path.length = (unsigned int)strlen(path);
    whole_path.start = path;
    return vyGetFileIdFromSpan(whole_path);
}
/* Computes the file id of a path given as a span: the seeded XXH64 hash of
 * the span's bytes. A hash of 0 is mapped to its bitwise complement, so 0 is
 * never returned.
 */
vy_file_id vyGetFileIdFromSpan(vy_text_span path) {
    /* Arbitrary fixed seed */
    const XXH64_hash_t seed = 15340978;
    const vy_file_id hash = (vy_file_id)XXH64(path.start, path.length, seed);
    return (hash != 0) ? hash : ~hash;
}
/* Registers the path given by `path` in the file table.
 *
 * Returns the file id of the path (an already present id keeps its existing
 * entry). Returns 0 if the mutex wait failed (Windows), the table has no free
 * slot, or the name storage is exhausted.
 *
 * The span is not required to be NUL-terminated: exactly path.length bytes
 * are copied and the terminator is written explicitly.
 */
VY_DLLEXPORT vy_file_id vyAddFileFromSpan(vy_text_span path) {
    const vy_file_id fid = vyGetFileIdFromSpan(path);
#ifdef __linux__
    pthread_mutex_lock(&_file_tab.mutex);
#elif defined(_WIN32)
    if (WaitForSingleObject(_file_tab.mutex, INFINITE) == WAIT_FAILED) {
        vyReportError("fio", "WaitForSingleObject failed with WAIT_FAILED!");
        return 0;
    }
#endif

    const unsigned int capacity = _file_tab.capacity;
    vy_file_id result = 0; /* stays 0 if the table is full */

    /* Hash Insert (linear probing) */
    const unsigned int base = (unsigned int)(fid % capacity);
    for (unsigned int i = 0; i < capacity; ++i) {
        const unsigned int at = (base + i) % capacity;
        if (_file_tab.ids[at] == fid) {
            /* Already known */
            result = fid;
            break;
        }
        if (_file_tab.ids[at] != 0) {
            /* Occupied by another file, keep probing */
            continue;
        }

        /* Empty slot: insert */
        const unsigned int name_len = (unsigned int)path.length;
        const unsigned int slen = name_len + 1; /* + terminator */
        if ((_file_tab.name_head + slen) >= NAME_CAP(capacity)) {
            /* Out of name storage */
            break;
        }
        char *dst = _file_tab.names + _file_tab.name_head;
        memcpy(dst, path.start, name_len);
        dst[name_len] = '\0';
        _file_tab.name_offsets[at] = _file_tab.name_head;
        _file_tab.ids[at] = fid;
        _file_tab.name_head += slen;
        result = fid;
        break;
    }

#ifdef __linux__
    pthread_mutex_unlock(&_file_tab.mutex);
#elif defined(_WIN32)
    ReleaseMutex(_file_tab.mutex);
#endif
    return result;
}
/* Registers a NUL-terminated path in the file table.
 * Wraps the whole string into a span and defers to vyAddFileFromSpan.
 */
VY_DLLEXPORT vy_file_id vyAddFile(const char *path) {
    vy_text_span whole_path;
    whole_path.length = (unsigned int)strlen(path);
    whole_path.start = path;
    return vyAddFileFromSpan(whole_path);
}
/* Looks up the path registered for `fid`.
 * Returns a pointer into the table's name storage, or NULL if fid is 0, the
 * id is not in the table, or (Windows) waiting for the mutex failed.
 */
VY_DLLEXPORT const char *vyGetFilePath(vy_file_id fid) {
    if (fid == 0)
        return NULL;
#if defined(__linux__)
    pthread_mutex_lock(&_file_tab.mutex);
#elif defined(_WIN32)
    if (WaitForSingleObject(_file_tab.mutex, INFINITE) == WAIT_FAILED) {
        vyReportError("fio", "WaitForSingleObject failed with WAIT_FAILED!");
        return NULL;
    }
#endif
    /* Linear probing, starting at the slot selected by the id */
    const unsigned int slot_count = _file_tab.capacity;
    const unsigned int first = (unsigned int)(fid % slot_count);
    const char *found = NULL;
    for (unsigned int probe = 0; probe < slot_count; ++probe) {
        const unsigned int at = (first + probe) % slot_count;
        const vy_file_id stored = _file_tab.ids[at];
        if (stored == fid) {
            found = _file_tab.names + _file_tab.name_offsets[at];
            break;
        }
        if (stored == 0) {
            /* An empty slot ends the probe sequence: id not present */
            break;
        }
    }
#if defined(__linux__)
    pthread_mutex_unlock(&_file_tab.mutex);
#elif defined(_WIN32)
    ReleaseMutex(_file_tab.mutex);
#endif
    return found;
}
VY_DLLEXPORT vy_fio_handle vyEnqueueRead(vy_file_id fid) {

View File

@ -6,6 +6,7 @@
#include <stdint.h>
#include "runtime.h"
#include "file_tab.h"
enum {
VY_FILE_BUFFER_FLAG_FILE_NOT_FOUND = 0x1,
@ -22,8 +23,6 @@ static inline bool vyWasFileBufferSuccessful(const vy_file_buffer *fb) {
return fb->flags == 0;
}
/* used to identify a file (XXH3 hash of the path) */
typedef uint64_t vy_file_id;
typedef unsigned int vy_fio_handle;
@ -36,16 +35,6 @@ VY_DLLEXPORT bool vyInitFIO(const vy_fio_config *config);
VY_DLLEXPORT void vyShutdownFIO(void);
VY_DLLEXPORT vy_file_id vyGetFileId(const char *path);
VY_DLLEXPORT vy_file_id vyGetFileIdFromSpan(vy_text_span path);
VY_DLLEXPORT vy_file_id vyAddFile(const char *path);
VY_DLLEXPORT vy_file_id vyAddFileFromSpan(vy_text_span path);
VY_DLLEXPORT const char *vyGetFilePath(vy_file_id fid);
VY_DLLEXPORT vy_fio_handle vyEnqueueRead(vy_file_id fid);
VY_DLLEXPORT void vyAbortFIO(vy_fio_handle fio);

View File

@ -65,4 +65,90 @@ VY_DLLEXPORT void vyWaitOnConditionVar(vy_condition_var *var) {
pthread_cond_wait(&var->cond, &var->mutex);
}
#elif defined(_WIN32)
#define WIN32_LEAN_AND_MEAN
#include <Windows.h>
/* Win32 implementation of a condition variable object. */
struct vy_condition_var_s {
/* Lock paired with `cond`; entered by vyLockConditionVar, left by vyUnlockConditionVar */
CRITICAL_SECTION critical_section;
/* The condition variable that vyWaitOnConditionVar sleeps on */
CONDITION_VARIABLE cond;
/* Index of the next entry of the free list; MAX_CONDS marks the end of the list */
ptrdiff_t next_reusable;
};
#define MAX_CONDS 1024
vy_condition_var _conds[MAX_CONDS];
static ptrdiff_t _first_reusable = MAX_CONDS;
static ptrdiff_t _next = 0;
static HANDLE _guard;
static INIT_ONCE _guard_init = INIT_ONCE_STATIC_INIT;
/* One-time initializer (run through InitOnceExecuteOnce) that creates the
 * mutex guarding the condition variable pool. None of the parameters are used.
 * Returns non-zero if the mutex was created.
 */
static BOOL CALLBACK InitGuardFn(PINIT_ONCE initOnce,
                                 PVOID parameter,
                                 PVOID *context) {
    VY_UNUSED(initOnce);
    VY_UNUSED(parameter);
    VY_UNUSED(context);
    HANDLE created = CreateMutexW(NULL, FALSE, NULL);
    _guard = created;
    return created != NULL;
}
/* Hands out a condition variable from the static pool.
 *
 * Previously destroyed entries are re-used first; otherwise the next unused
 * pool entry is initialized. Returns NULL if the guard mutex cannot be
 * initialized or locked, if initializing the critical section fails, or if
 * the pool is exhausted.
 */
VY_DLLEXPORT vy_condition_var *vyCreateConditionVar(void) {
    if (!InitOnceExecuteOnce(&_guard_init, InitGuardFn, NULL, NULL)) {
        vyReportError("core", "Failed to initialize the guard mutex.");
        return NULL;
    }

    /* The wait is alertable: it returns WAIT_IO_COMPLETION (without owning
     * the mutex) whenever a queued completion routine ran. That is not an
     * error, so simply wait again. */
    DWORD wait_result;
    do {
        wait_result = WaitForSingleObjectEx(_guard, INFINITE, TRUE);
    } while (wait_result == WAIT_IO_COMPLETION);
    if (wait_result != WAIT_OBJECT_0) {
        vyLog("core",
              "Failed to lock the guard variable: %u",
              (unsigned int)GetLastError());
        return NULL;
    }

    vy_condition_var *cond = NULL;
    if (_first_reusable < MAX_CONDS) {
        /* Pop the head of the free list; the entry is already initialized */
        cond = &_conds[_first_reusable];
        _first_reusable = cond->next_reusable;
    } else if (_next < MAX_CONDS) {
        cond = &_conds[_next];
        if (!InitializeCriticalSectionAndSpinCount(&cond->critical_section,
                                                   4000)) {
            vyLog("core", "Condition variable creation failed");
            cond = NULL;
        } else {
            InitializeConditionVariable(&cond->cond);
            cond->next_reusable = MAX_CONDS;
            ++_next;
        }
    } else {
        vyReportError("core", "Ran out of condition variable objects");
    }

    ReleaseMutex(_guard);
    return cond;
}
/* Returns a condition variable to the pool by pushing it onto the free list.
 * The underlying critical section stays initialized for re-use.
 * A NULL pointer is ignored.
 */
VY_DLLEXPORT void vyDestroyConditionVar(vy_condition_var *var) {
    if (!var) {
        return;
    }
    ptrdiff_t index = var - &_conds[0];

    /* The wait is alertable: WAIT_IO_COMPLETION only means a completion
     * routine ran, not that locking failed. Giving up in that case would
     * lose the pool entry, so wait again. */
    DWORD wait_result;
    do {
        wait_result = WaitForSingleObjectEx(_guard, INFINITE, TRUE);
    } while (wait_result == WAIT_IO_COMPLETION);
    if (wait_result != WAIT_OBJECT_0) {
        vyLog("core",
              "Failed to lock the guard variable: %u",
              (unsigned int)GetLastError());
        return;
    }

    var->next_reusable = _first_reusable;
    _first_reusable = index;
    ReleaseMutex(_guard);
}
/* Enters the critical section that belongs to the condition variable. */
VY_DLLEXPORT void vyLockConditionVar(vy_condition_var *var) {
    CRITICAL_SECTION *lock = &var->critical_section;
    EnterCriticalSection(lock);
}
/* Leaves the condition variable's critical section. If `signal` is set, all
 * threads waiting on the condition variable are woken afterwards.
 */
VY_DLLEXPORT void vyUnlockConditionVar(vy_condition_var *var, bool signal) {
    LeaveCriticalSection(&var->critical_section);
    if (!signal) {
        return;
    }
    WakeAllConditionVariable(&var->cond);
}
/* Sleeps on the condition variable without a timeout, using the variable's
 * own critical section as the lock. The result of the sleep call is ignored.
 */
VY_DLLEXPORT void vyWaitOnConditionVar(vy_condition_var *var) {
    CRITICAL_SECTION *lock = &var->critical_section;
    CONDITION_VARIABLE *condition = &var->cond;
    SleepConditionVariableCS(condition, lock, INFINITE);
}
#endif

View File

@ -14,8 +14,31 @@ struct vy_mutex_s {
static vy_mutex _mutex[MAX_MUTEX];
static ptrdiff_t _first_reusable = MAX_MUTEX;
static ptrdiff_t _next = 0;
static HANDLE _guard;
static INIT_ONCE _guard_init = INIT_ONCE_STATIC_INIT;
/* One-time initializer (run through InitOnceExecuteOnce) that creates the
 * mutex guarding the mutex pool. None of the parameters are used.
 * Returns non-zero if the mutex was created.
 */
static BOOL CALLBACK InitGuardFn(PINIT_ONCE initOnce,
                                 PVOID parameter,
                                 PVOID *context) {
    VY_UNUSED(initOnce);
    VY_UNUSED(parameter);
    VY_UNUSED(context);
    HANDLE created = CreateMutexW(NULL, FALSE, NULL);
    _guard = created;
    return created != NULL;
}
VY_DLLEXPORT vy_mutex *vyCreateMutex(void) {
if (!InitOnceExecuteOnce(&_guard_init, InitGuardFn, NULL, NULL)) {
vyReportError("core", "Failed to initialize the guard mutex.");
return NULL;
}
if (WaitForSingleObjectEx(_guard, INFINITE, TRUE) != WAIT_OBJECT_0) {
vyLog("core", "Failed to lock the guard variable: %u", GetLastError());
return NULL;
}
if (_first_reusable < MAX_MUTEX) {
vy_mutex *mtx = &_mutex[_first_reusable];
_first_reusable = mtx->next_reusable;
@ -42,9 +65,10 @@ VY_DLLEXPORT void vyDestroyMutex(vy_mutex *mutex) {
}
/* Locks the mutex. Returns true if the calling thread now owns it.
 *
 * The wait is alertable so that queued completion routines (e.g. from async
 * io) can run while waiting. An alertable wait returns WAIT_IO_COMPLETION
 * after running such a routine WITHOUT having acquired the mutex, so the wait
 * is repeated until it ends for another reason.
 *
 * NOTE(review): WAIT_ABANDONED is reported as failure although the system
 * grants ownership in that case - confirm whether callers need to handle it.
 */
VY_DLLEXPORT bool vyLockMutex(vy_mutex *mutex) {
    DWORD wait_result;
    do {
        wait_result = WaitForSingleObjectEx(mutex->handle, INFINITE, TRUE);
    } while (wait_result == WAIT_IO_COMPLETION);
    return wait_result == WAIT_OBJECT_0;
}
/* Unlocks the mutex. Returns true if the release succeeded. */
VY_DLLEXPORT bool vyUnlockMutex(vy_mutex *mutex) {
    return ReleaseMutex(mutex->handle) != 0;
}

View File

@ -2,6 +2,104 @@
#include "threading.h"
#if defined(_WIN32)
#define WIN32_LEAN_AND_MEAN
#include <Windows.h>
/* Win32 implementation of a thread object. */
struct vy_thread_s {
/* Handle returned by CreateThread */
HANDLE handle;
/* Index of the next entry of the free list; MAX_THREADS marks the end of the list */
ptrdiff_t next_reusable;
/* User entry point, called by the thread wrapper */
vy_thread_entry_fn *entry;
/* Argument passed to `entry` */
void *param;
/* Set by the thread wrapper after `entry` returned; cleared again when the thread is joined */
bool needs_join;
};
#define MAX_THREADS 256
static vy_thread _threads[MAX_THREADS];
static ptrdiff_t _first_reusable = MAX_THREADS;
static ptrdiff_t _next = 0;
static HANDLE _guard;
static INIT_ONCE _guard_init = INIT_ONCE_STATIC_INIT;
/* One-time initializer (run through InitOnceExecuteOnce) that creates the
 * mutex guarding the thread pool. None of the parameters are used.
 * Returns non-zero if the mutex was created.
 */
static BOOL CALLBACK InitGuardFn(PINIT_ONCE initOnce,
                                 PVOID parameter,
                                 PVOID *context) {
    VY_UNUSED(initOnce);
    VY_UNUSED(parameter);
    VY_UNUSED(context);
    HANDLE created = CreateMutexW(NULL, FALSE, NULL);
    _guard = created;
    return created != NULL;
}
/* Entry point given to CreateThread. `arg` is the vy_thread object of the
 * new thread. Runs the user's entry function and flags the thread object as
 * "needs join" once that function has returned. Always returns 0.
 */
static DWORD WINAPI win32ThreadWrapper(LPVOID arg) {
    vy_thread *self = (vy_thread *)arg;
    self->needs_join = false;
    (self->entry)(self->param);
    self->needs_join = true;
    return 0;
}
/* Starts a new thread that runs entry(param).
 *
 * The thread object is taken from the static pool: entries returned by
 * vyJoinThread are re-used first, otherwise the next unused entry is taken.
 * Returns NULL if the guard mutex cannot be initialized or locked, the pool
 * is exhausted, or the OS thread could not be created. If thread creation
 * fails, the pool entry is put back on the free list instead of being lost.
 */
VY_DLLEXPORT vy_thread *vySpawnThread(vy_thread_entry_fn *entry, void *param) {
    if (!InitOnceExecuteOnce(&_guard_init, InitGuardFn, NULL, NULL)) {
        vyReportError("core", "Failed to initialize the guard mutex.");
        return NULL;
    }

    vy_thread *thrd = NULL;
    if (WaitForSingleObject(_guard, INFINITE) != WAIT_OBJECT_0) {
        vyLog("core",
              "Failed to lock the guard variable: %u",
              (unsigned int)GetLastError());
        return NULL;
    }

    if (_first_reusable < MAX_THREADS) {
        thrd = &_threads[_first_reusable];
        _first_reusable = thrd->next_reusable;
        if (thrd->needs_join) {
            WaitForSingleObject(thrd->handle, INFINITE);
            CloseHandle(thrd->handle);
            thrd->needs_join = false;
        }
    } else if (_next < MAX_THREADS) {
        thrd = &_threads[_next];
        thrd->next_reusable = MAX_THREADS;
        ++_next;
    }

    if (thrd) {
        thrd->entry = entry;
        thrd->param = param;
        thrd->handle =
            CreateThread(NULL, 0, win32ThreadWrapper, (LPVOID)thrd, 0, NULL);
        if (thrd->handle == NULL) {
            vyLog("core", "Thread creation failed");
            /* Give the unused entry back to the pool */
            thrd->next_reusable = _first_reusable;
            _first_reusable = thrd - &_threads[0];
            thrd = NULL;
        }
    } else {
        vyReportError("core", "Ran out of thread objects");
    }

    ReleaseMutex(_guard);
    return thrd;
}
/* Waits for the thread to end, closes its OS handle and puts the thread
 * object back on the pool's free list. If the guard mutex cannot be locked
 * the failure is logged and the object is not returned to the pool.
 */
VY_DLLEXPORT void vyJoinThread(vy_thread *thread) {
    HANDLE os_thread = thread->handle;
    WaitForSingleObject(os_thread, INFINITE);
    CloseHandle(os_thread);
    thread->needs_join = false;

    const ptrdiff_t slot = thread - &_threads[0];
    if (WaitForSingleObject(_guard, INFINITE) == WAIT_OBJECT_0) {
        /* Push onto the free list */
        thread->next_reusable = _first_reusable;
        _first_reusable = slot;
        ReleaseMutex(_guard);
    } else {
        vyLog("core", "Failed to lock the guard variable: %u", GetLastError());
    }
}
#elif defined(__linux__)
#include <pthread.h>
@ -51,7 +149,7 @@ VY_DLLEXPORT vy_thread *vySpawnThread(vy_thread_entry_fn *entry, void *param) {
thrd->param = param;
if (pthread_create(&thrd->handle, NULL, linuxThreadWrapper, thrd) !=
0) {
vyLog("core", "Mutex creation failed");
vyLog("core", "Thread creation failed");
thrd = NULL;
}
} else {