rtengine/src/fio.c
Kevin Trogant 1dba3d2d63 fix, feat: various issues
- [win32] FIO thread no longer hangs during application exit
- Signal if a file operation was actually successful.
- Add a logging function (currently identical to vyReportError)
2023-10-13 23:15:23 +02:00

477 lines
13 KiB
C

#include "fio.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <xxhash/xxhash.h>
#ifdef __linux__
#include <pthread.h>
#elif defined(_WIN32)
#define WIN32_LEAN_AND_MEAN
#include <Windows.h>
#endif
/* Bit flags stored in vy_file_op.flags. */
/* Set by ProcessRead once an operation has been processed, whether or not
 * the read itself succeeded. */
#define FLAGS_FINISHED 0x001
/* Set by vyRetrieveReadBuffer after the result was copied out to the caller. */
#define FLAGS_RETRIEVED 0x002
/* Set by vyEnqueueRead when it claims a slot for a new operation. */
#define FLAGS_IN_USE 0x004
/* One slot of the file io queue. */
typedef struct {
/* Id of the file to read; ProcessRead resolves it to a path with
 * vyGetFilePath. */
vy_file_id fid;
/* Result of the read, filled in by ProcessRead. */
vy_file_buffer buffer;
/* Combination of the FLAGS_* bits. vyEnqueueRead treats a value of 0, or a
 * slot that is both finished and retrieved, as free for reuse. */
unsigned int flags;
} vy_file_op;
/* Ring buffer of file io operations, shared between the enqueueing callers
 * and the FIO thread. */
typedef struct {
/* Array of `size` slots, allocated by InitFIOQueue. */
vy_file_op *ops;
/* Number of slots in `ops`. */
unsigned int size;
/* Index of the slot the next enqueued operation is written to. */
unsigned int write_pos;
/* Index of the slot the FIO thread processes next. The thread treats
 * read_pos == write_pos as "nothing pending". */
unsigned int read_pos;
#ifdef __linux__
/* Held around every access to the queue in this file. */
pthread_mutex_t mutex;
/* Signalled by vyEnqueueRead; the FIO thread waits on it while the queue
 * looks empty. */
pthread_cond_t pending_cond;
#elif defined(_WIN32)
/* Held around every access to the queue in this file. */
CRITICAL_SECTION critical_section;
/* Woken by vyEnqueueRead and vyShutdownFIO; the FIO thread sleeps on it
 * while the queue looks empty. */
CONDITION_VARIABLE pending_cond;
#endif
} vy_fio_queue;
/* Size in bytes of the path storage for a file table with `cap` entries
 * (128 bytes per entry). */
#define NAME_CAP(cap) ((cap)*128)
/* Table that maps file ids to the paths they were registered with.
 * Lookups and inserts probe linearly, starting at (fid % capacity). */
typedef struct {
/* Hash table slots; a value of 0 marks an empty slot. */
vy_file_id *ids;
/* For each slot, the offset into `names` at which its path starts. */
unsigned int *name_offsets;
/* Storage of NAME_CAP(capacity) bytes that holds all registered paths back
 * to back. */
char *names;
/* Number of slots in `ids` and `name_offsets`. */
unsigned int capacity;
/* Offset of the first unused byte in `names`. */
unsigned int name_head;
#ifdef __linux__
/* Held while the table is searched or modified. */
pthread_mutex_t mutex;
#elif defined(_WIN32)
/* Held while the table is searched or modified. */
HANDLE mutex;
#endif
} vy_file_tab;
/* The queue of file operations, set up by vyInitFIO. */
static vy_fio_queue _queue;
/* The table of registered files, set up by vyInitFIO. */
static vy_file_tab _file_tab;
#ifdef __linux__
/* The FIO worker thread created by vyInitFIO. */
static pthread_t _thread;
#elif defined(_WIN32)
/* The FIO worker thread created by vyInitFIO. */
static HANDLE _fio_thread = NULL;
/* Event that vyShutdownFIO signals to ask the worker thread to exit. */
static HANDLE _fio_term_event = NULL;
#endif
/*
 * Allocates the ring buffer with `size` zeroed slots and creates the
 * synchronization primitives that protect it.
 *
 * Returns true on success. On failure everything that was set up so far is
 * released again, the queue is left empty (ops == NULL, size == 0) and false
 * is returned.
 */
static bool InitFIOQueue(unsigned int size) {
    _queue.ops = calloc(size, sizeof(vy_file_op));
    if (!_queue.ops)
        return false;
    _queue.write_pos = 0;
    _queue.read_pos = 0;
    _queue.size = size;

    bool ok = true;
#ifdef __linux__
    if (pthread_cond_init(&_queue.pending_cond, NULL) != 0) {
        ok = false;
    } else if (pthread_mutex_init(&_queue.mutex, NULL) != 0) {
        /* Do not leave the condition variable behind half-initialized. */
        pthread_cond_destroy(&_queue.pending_cond);
        ok = false;
    }
#elif defined(_WIN32)
    if (!InitializeCriticalSectionAndSpinCount(&_queue.critical_section, 4000))
        ok = false;
    else
        InitializeConditionVariable(&_queue.pending_cond);
#endif

    if (!ok) {
        /* The slots would otherwise leak, because callers bail out without
         * running ShutdownFIOQueue. */
        free(_queue.ops);
        _queue.ops = NULL;
        _queue.size = 0;
    }
    return ok;
}
static void ShutdownFIOQueue(void) {
for (unsigned int i = 0; i < _queue.size; ++i) {
free(_queue.ops[i].buffer.data);
}
free(_queue.ops);
#ifdef __linux__
pthread_cond_destroy(&_queue.pending_cond);
pthread_mutex_destroy(&_queue.mutex);
#elif defined(_WIN32)
DeleteCriticalSection(&_queue.critical_section);
#endif
}
/*
 * Allocates the file table for up to `max_files` entries together with its
 * path storage and creates the mutex that protects it.
 *
 * Returns true on success. On failure every allocation made so far is
 * released, the table is left empty and false is returned.
 */
static bool InitFileTab(unsigned int max_files) {
    _file_tab.ids = calloc(max_files, sizeof(vy_file_id));
    _file_tab.name_offsets = calloc(max_files, sizeof(unsigned int));
    _file_tab.names = malloc(NAME_CAP(max_files));

    bool ok = _file_tab.ids && _file_tab.name_offsets && _file_tab.names;
    if (ok) {
        _file_tab.capacity = max_files;
        _file_tab.name_head = 0;
#ifdef __linux__
        ok = pthread_mutex_init(&_file_tab.mutex, NULL) == 0;
#elif defined(_WIN32)
        _file_tab.mutex = CreateMutex(NULL, FALSE, NULL);
        ok = _file_tab.mutex != NULL;
#endif
    }

    if (!ok) {
        /* Callers bail out without running ShutdownFileTab, so the partial
         * allocations have to be undone here. */
        free(_file_tab.ids);
        free(_file_tab.name_offsets);
        free(_file_tab.names);
        _file_tab.ids = NULL;
        _file_tab.name_offsets = NULL;
        _file_tab.names = NULL;
        _file_tab.capacity = 0;
        _file_tab.name_head = 0;
    }
    return ok;
}
/* Frees the three arrays of the file table and destroys its mutex. */
static void ShutdownFileTab(void) {
    free(_file_tab.name_offsets);
    free(_file_tab.names);
    free(_file_tab.ids);
#ifdef __linux__
    pthread_mutex_destroy(&_file_tab.mutex);
#elif defined(_WIN32)
    CloseHandle(_file_tab.mutex);
#endif
}
/* Forward declaration of the platform specific entry point of the FIO thread,
 * so that vyInitFIO can start it before its definition. */
#ifdef __linux__
static void *linuxFIOThreadProc(void *);
#elif defined(_WIN32)
static DWORD WINAPI win32FIOThreadProc(_In_ LPVOID);
#endif
/*
 * Initializes the file io subsystem: the operation queue, the file table and
 * the worker thread that processes queued reads.
 *
 * config: optional settings. A NULL pointer or a field set to 0 selects the
 *         default of 512 queue slots / 512 files.
 *
 * Returns true on success. On failure everything that was already set up is
 * torn down again and false is returned.
 */
bool vyInitFIO(const vy_fio_config *config) {
    unsigned int queue_size =
        (config && config->queue_size) ? config->queue_size : 512;
    unsigned int max_file_count =
        (config && config->max_file_count) ? config->max_file_count : 512;

    if (!InitFIOQueue(queue_size))
        return false;
    if (!InitFileTab(max_file_count)) {
        ShutdownFIOQueue();
        return false;
    }

#ifdef __linux__
    if (pthread_create(&_thread, NULL, linuxFIOThreadProc, NULL) != 0) {
        ShutdownFileTab();
        ShutdownFIOQueue();
        return false;
    }
#elif defined(_WIN32)
    _fio_term_event = CreateEventW(NULL, FALSE, FALSE, NULL);
    if (!_fio_term_event) {
        ShutdownFileTab();
        ShutdownFIOQueue();
        return false;
    }
    _fio_thread = CreateThread(NULL, 0, win32FIOThreadProc, NULL, 0, NULL);
    if (!_fio_thread) {
        CloseHandle(_fio_term_event);
        _fio_term_event = NULL;
        ShutdownFileTab();
        ShutdownFIOQueue();
        return false;
    }
    SetThreadDescription(_fio_thread, L"FIO Thread");
#endif
    return true;
}
/*
 * Stops the FIO thread and then releases the queue and the file table.
 *
 * Linux: the thread is cancelled and joined.
 * Win32: the termination event is signalled, the queue's condition variable
 * is woken and the thread is waited for before both handles are closed.
 *
 * NOTE(review): on Linux the worker holds _queue.mutex both while it waits
 * and while it processes a read, and it installs no cancellation cleanup
 * handler. The cancelled thread therefore presumably still owns the mutex
 * when ShutdownFIOQueue destroys it - confirm.
 * NOTE(review): on Win32, when SetEvent fails the thread is neither waited
 * for nor are the handles closed, yet the queue and file table are still
 * released below while the thread may be running - confirm this is intended.
 */
void vyShutdownFIO(void) {
#ifdef __linux__
pthread_cancel(_thread);
pthread_join(_thread, NULL);
#elif defined(_WIN32)
if (SetEvent(_fio_term_event)) {
/* Wake the worker in case it sleeps on an empty queue, so that it can
 * notice the termination event. */
WakeAllConditionVariable(&_queue.pending_cond);
WaitForSingleObject(_fio_thread, INFINITE);
CloseHandle(_fio_thread);
CloseHandle(_fio_term_event);
} else {
vyReportError("FIO", "Failed to signal the termination event.");
}
#endif
ShutdownFIOQueue();
ShutdownFileTab();
}
/*
 * Computes the file id of a NUL-terminated path by wrapping it in a text span
 * and forwarding to vyGetFileIdFromSpan.
 */
vy_file_id vyGetFileId(const char *path) {
    vy_text_span whole_path;
    whole_path.length = (unsigned int)strlen(path);
    whole_path.start = path;
    return vyGetFileIdFromSpan(whole_path);
}
/*
 * Computes the file id of a path given as a text span: the XXH64 hash of its
 * bytes with a fixed seed. 0 is never returned; a hash of 0 is replaced by
 * its bitwise complement.
 */
vy_file_id vyGetFileIdFromSpan(vy_text_span path) {
    /* Arbitrary constant; it only has to stay the same between calls. */
    const XXH64_hash_t hash_seed = 15340978;
    const vy_file_id hashed =
        (vy_file_id)XXH64(path.start, path.length, hash_seed);
    return (hashed != 0) ? hashed : ~hashed;
}
/*
 * Registers a path, given as a text span, in the file table.
 *
 * The span does not have to be NUL-terminated: exactly path.length bytes are
 * copied and the terminator is appended to the stored copy.
 *
 * Returns the file id of the path. If an entry with the same id already
 * exists, the table is left unchanged. Returns 0 if the table or its path
 * storage is full, or (Win32) if the table mutex could not be acquired.
 */
vy_file_id vyAddFileFromSpan(vy_text_span path) {
    vy_file_id fid = vyGetFileIdFromSpan(path);
#ifdef __linux__
    pthread_mutex_lock(&_file_tab.mutex);
#elif defined(_WIN32)
    if (WaitForSingleObject(_file_tab.mutex, INFINITE) == WAIT_FAILED) {
        vyReportError("fio", "WaitForSingleObject failed with WAIT_FAILED!");
        return 0;
    }
#endif
    /* Hash insert with linear probing */
    unsigned int i = 0;
    unsigned int base = (unsigned int)(fid % _file_tab.capacity);
    while (i < _file_tab.capacity) {
        unsigned int at = (base + i) % _file_tab.capacity;
        if (_file_tab.ids[at] == 0) {
            /* Free slot: store the path including its terminator. */
            unsigned int slen = (unsigned int)path.length + 1;
            /* Same limit as (name_head + slen) >= capacity of the storage,
             * written so that the addition cannot wrap around. */
            if (slen >= NAME_CAP(_file_tab.capacity) - _file_tab.name_head) {
                /* Out of name storage */
                fid = 0;
                break;
            }
            char *dst = _file_tab.names + _file_tab.name_head;
            /* Copy only the bytes that belong to the span; reading one byte
             * more would run past the span and is not guaranteed to yield a
             * terminator. */
            memcpy(dst, path.start, path.length);
            dst[path.length] = '\0';
            _file_tab.name_offsets[at] = _file_tab.name_head;
            _file_tab.ids[at] = fid;
            _file_tab.name_head += slen;
            break;
        } else if (_file_tab.ids[at] == fid) {
            /* Already registered */
            break;
        }
        ++i;
    }
    /* Every slot is taken by another id: out of space */
    if (i == _file_tab.capacity)
        fid = 0;
#ifdef __linux__
    pthread_mutex_unlock(&_file_tab.mutex);
#elif defined(_WIN32)
    ReleaseMutex(_file_tab.mutex);
#endif
    return fid;
}
/*
 * Registers a NUL-terminated path in the file table by wrapping it in a text
 * span and forwarding to vyAddFileFromSpan.
 */
vy_file_id vyAddFile(const char *path) {
    vy_text_span whole_path;
    whole_path.length = (unsigned int)strlen(path);
    whole_path.start = path;
    return vyAddFileFromSpan(whole_path);
}
/*
 * Looks up the path that was registered for a file id.
 *
 * Returns a pointer into the table's path storage, or NULL if the id is 0,
 * is not registered, or (Win32) the table mutex could not be acquired.
 */
const char *vyGetFilePath(vy_file_id fid) {
    if (fid == 0)
        return NULL;
#ifdef __linux__
    pthread_mutex_lock(&_file_tab.mutex);
#elif defined(_WIN32)
    if (WaitForSingleObject(_file_tab.mutex, INFINITE) == WAIT_FAILED) {
        vyReportError("fio", "WaitForSingleObject failed with WAIT_FAILED!");
        return NULL;
    }
#endif
    const unsigned int slot_count = _file_tab.capacity;
    const unsigned int home = (unsigned int)(fid % slot_count);
    const char *found = NULL;
    /* Probe linearly from the home slot until the id or an empty slot shows
     * up, or until every slot was visited. */
    for (unsigned int probe = 0; probe < slot_count; ++probe) {
        const unsigned int slot = (home + probe) % slot_count;
        const vy_file_id occupant = _file_tab.ids[slot];
        if (occupant == 0)
            break;
        if (occupant == fid) {
            found = _file_tab.names + _file_tab.name_offsets[slot];
            break;
        }
    }
#ifdef __linux__
    pthread_mutex_unlock(&_file_tab.mutex);
#elif defined(_WIN32)
    ReleaseMutex(_file_tab.mutex);
#endif
    return found;
}
/*
 * Queues a read of the file with the given id and wakes the FIO thread.
 *
 * Returns the handle of the operation, which is the slot index plus one, so
 * a valid handle is never 0. If no slot is available the call spins,
 * releasing the queue lock between attempts, until one becomes free.
 *
 * One slot of the ring is always kept unused: the FIO thread treats
 * read_pos == write_pos as "nothing pending", so letting write_pos catch up
 * with read_pos would make a completely filled queue look empty and stall
 * every queued operation. The ring therefore needs at least two slots.
 */
vy_fio_handle vyEnqueueRead(vy_file_id fid) {
    vy_fio_handle handle = 0;
    while (handle == 0) {
#ifdef __linux__
        pthread_mutex_lock(&_queue.mutex);
#elif defined(_WIN32)
        EnterCriticalSection(&_queue.critical_section);
#endif
        vy_file_op *op = &_queue.ops[_queue.write_pos];
        unsigned int next_pos = (_queue.write_pos + 1) % _queue.size;
        /* A slot can be reused if it was never used (or aborted), or if its
         * previous result was already handed to the caller. */
        bool slot_free = op->flags == 0 ||
                         ((op->flags & FLAGS_FINISHED) != 0 &&
                          (op->flags & FLAGS_RETRIEVED) != 0);
        if (slot_free && next_pos != _queue.read_pos) {
            op->fid = fid;
            op->flags = FLAGS_IN_USE;
            handle = _queue.write_pos + 1;
            _queue.write_pos = next_pos;
#ifdef __linux__
            pthread_cond_signal(&_queue.pending_cond);
#endif
        }
#ifdef __linux__
        pthread_mutex_unlock(&_queue.mutex);
#elif defined(_WIN32)
        LeaveCriticalSection(&_queue.critical_section);
        if (handle != 0)
            WakeAllConditionVariable(&_queue.pending_cond);
#endif
    }
    return handle;
}
/*
 * Gives up an operation: marks its slot as free and releases the result
 * buffer attached to it. A handle of 0 is ignored.
 *
 * The buffer pointer is cleared after it is freed. The slot stays part of the
 * queue, and both ShutdownFIOQueue and a repeated abort would otherwise free
 * the same pointer a second time.
 *
 * NOTE(review): an operation that is still pending is not removed from the
 * ring; the FIO thread will presumably still process the slot later and mark
 * it finished - confirm how aborted pending operations should be handled.
 */
void vyAbortFIO(vy_fio_handle fio) {
    if (fio == 0)
        return;
#ifdef __linux__
    pthread_mutex_lock(&_queue.mutex);
#elif defined(_WIN32)
    EnterCriticalSection(&_queue.critical_section);
#endif
    vy_file_op *op = &_queue.ops[fio - 1];
    op->flags = 0;
    free(op->buffer.data);
    op->buffer.data = NULL;
    op->buffer.size = 0;
#ifdef __linux__
    pthread_mutex_unlock(&_queue.mutex);
#elif defined(_WIN32)
    LeaveCriticalSection(&_queue.critical_section);
#endif
}
/*
 * Reports whether the operation behind a handle has been processed by the
 * FIO thread. A handle of 0 is never finished.
 */
bool vyIsFIOFinished(vy_fio_handle fio) {
    bool finished = false;
    if (fio != 0) {
#ifdef __linux__
        pthread_mutex_lock(&_queue.mutex);
#elif defined(_WIN32)
        EnterCriticalSection(&_queue.critical_section);
#endif
        const vy_file_op *op = &_queue.ops[fio - 1];
        finished = (op->flags & FLAGS_FINISHED) != 0;
#ifdef __linux__
        pthread_mutex_unlock(&_queue.mutex);
#elif defined(_WIN32)
        LeaveCriticalSection(&_queue.critical_section);
#endif
    }
    return finished;
}
/*
 * Copies the result of a finished operation into `buffer` and marks the
 * operation as retrieved.
 *
 * fio:    handle returned by vyEnqueueRead; 0 is rejected.
 * buffer: receives a private copy of the data together with its size and
 *         flags. The copy is released with vyFreeFileBuffer.
 *
 * Returns true if the operation was finished and its result was copied.
 * Returns false if the operation is not finished yet (buffer is untouched) or
 * if the copy could not be allocated (buffer->data is NULL, buffer->size 0).
 *
 * The queue lock is released on every path, including allocation failure.
 */
bool vyRetrieveReadBuffer(vy_fio_handle fio, vy_file_buffer *buffer) {
    if (fio == 0)
        return false;
#ifdef __linux__
    pthread_mutex_lock(&_queue.mutex);
#elif defined(_WIN32)
    EnterCriticalSection(&_queue.critical_section);
#endif
    vy_file_op *op = &_queue.ops[fio - 1];
    bool retrieved = false;
    if ((op->flags & FLAGS_FINISHED) != 0) {
        /* A failed read has no data attached; treat it as an empty result. */
        size_t sz = op->buffer.data ? op->buffer.size : 0;
        void *copy = malloc(sz);
        if (copy || sz == 0) {
            /* malloc(0) may legitimately return NULL, and memcpy must not be
             * handed a NULL pointer even for a length of 0. */
            if (sz > 0)
                memcpy(copy, op->buffer.data, sz);
            buffer->data = copy;
            buffer->size = sz;
            buffer->flags = op->buffer.flags;
            op->flags |= FLAGS_RETRIEVED;
            retrieved = true;
        } else {
            buffer->data = NULL;
            buffer->size = 0;
        }
    }
#ifdef __linux__
    pthread_mutex_unlock(&_queue.mutex);
#elif defined(_WIN32)
    LeaveCriticalSection(&_queue.critical_section);
#endif
    return retrieved;
}
/* Releases the data of a file buffer, e.g. one filled in by
 * vyRetrieveReadBuffer. */
void vyFreeFileBuffer(vy_file_buffer buffer) {
    void *payload = buffer.data;
    free(payload);
}
/*
 * Executes one queued read: resolves the file id to a path, reads the whole
 * file into a newly allocated buffer and marks the operation as finished.
 *
 * The outcome is reported through op->buffer:
 *   - success:             data/size hold the contents, flags is 0. An empty
 *                          file yields data == NULL and size == 0.
 *   - unknown id or the
 *     file cannot be
 *     opened:              flags is VY_FILE_BUFFER_FLAG_FILE_NOT_FOUND.
 *   - size query, memory
 *     allocation or read
 *     failed:              flags is VY_FILE_BUFFER_FLAG_READ_FAILED.
 * In both failure cases data is NULL and size is 0.
 *
 * NOTE(review): the buffer fields are overwritten without freeing a buffer
 * that may still be attached from an earlier use of the slot - confirm who
 * owns that memory.
 */
static void ProcessRead(vy_file_op *op) {
    /* Start from a clean result so that no path reports stale contents. */
    op->buffer.data = NULL;
    op->buffer.size = 0;
    op->buffer.flags = 0;

    const char *path = vyGetFilePath(op->fid);
    FILE *file = path ? fopen(path, "rb") : NULL;
    if (!file) {
        op->buffer.flags = VY_FILE_BUFFER_FLAG_FILE_NOT_FOUND;
        op->flags |= FLAGS_FINISHED;
        return;
    }

    /* Determine the file size; every step can fail and ftell reports failure
     * as -1, which must not reach malloc. */
    long fsz = -1;
    if (fseek(file, 0, SEEK_END) == 0)
        fsz = ftell(file);
    if (fsz < 0 || fseek(file, 0, SEEK_SET) != 0) {
        op->buffer.flags = VY_FILE_BUFFER_FLAG_READ_FAILED;
    } else if (fsz > 0) {
        /* fread with an element size of 0 returns 0, so an empty file is
         * handled by not reading at all. */
        void *contents = malloc((size_t)fsz);
        if (contents && fread(contents, (size_t)fsz, 1, file) == 1) {
            op->buffer.data = contents;
            op->buffer.size = (size_t)fsz;
        } else {
            free(contents);
            op->buffer.flags = VY_FILE_BUFFER_FLAG_READ_FAILED;
        }
    }

    fclose(file);
    op->flags |= FLAGS_FINISHED;
}
#ifdef __linux__
static void *linuxFIOThreadProc(void *_param) {
while (true) {
pthread_mutex_lock(&_queue.mutex);
while (_queue.write_pos == _queue.read_pos) {
pthread_cond_wait(&_queue.pending_cond, &_queue.mutex);
}
ProcessRead(&_queue.ops[_queue.read_pos]);
_queue.read_pos = (_queue.read_pos + 1) % _queue.size;
pthread_mutex_unlock(&_queue.mutex);
}
return NULL;
}
#elif defined(_WIN32)
/*
 * Entry point of the FIO thread on Win32. Processes queued reads until the
 * termination event is signalled, or until polling that event fails.
 *
 * The termination event is polled before every sleep and before every
 * operation, not only after a wake-up. A shutdown that is requested while the
 * thread is busy, or before it went to sleep for the first time, would
 * otherwise only be noticed after another wake-up that never arrives.
 *
 * NOTE(review): a wake-up sent between the poll and the sleep below can still
 * be missed unless the signalling side holds the critical section - confirm.
 */
static DWORD WINAPI win32FIOThreadProc(_In_ LPVOID lpParam) {
    VY_UNUSED(lpParam);
    bool keep_running = true;
    while (keep_running) {
        EnterCriticalSection(&_queue.critical_section);
        while (keep_running) {
            DWORD wfs = WaitForSingleObject(_fio_term_event, 0);
            if (wfs == WAIT_OBJECT_0) {
                keep_running = false;
            } else if (wfs == WAIT_FAILED) {
                /* GetLastError returns a DWORD; convert it to match %d. */
                vyLog("FIO", "ThreadProc wait error: %d", (int)GetLastError());
                keep_running = false;
            } else if (_queue.read_pos == _queue.write_pos) {
                /* Nothing pending: sleep until an enqueue or the shutdown
                 * wakes the condition variable. */
                SleepConditionVariableCS(&_queue.pending_cond,
                                         &_queue.critical_section,
                                         INFINITE);
            } else {
                break;
            }
        }
        /* It's possible that we were awoken during application shutdown, in
         * which case there may be nothing to process. */
        if (_queue.write_pos != _queue.read_pos) {
            ProcessRead(&_queue.ops[_queue.read_pos]);
            _queue.read_pos = (_queue.read_pos + 1) % _queue.size;
        }
        LeaveCriticalSection(&_queue.critical_section);
    }
    vyLog("FIO", "Exit FIO thread");
    return 0;
}
#endif