Port fio to windows

This commit is contained in:
Kevin Trogant 2023-10-12 23:45:44 +02:00
parent 268a9722f5
commit fa50d9fab5

106
src/fio.c
View File

@ -8,6 +8,9 @@
#ifdef __linux__ #ifdef __linux__
#include <pthread.h> #include <pthread.h>
#elif defined(_WIN32)
#define WIN32_LEAN_AND_MEAN
#include <Windows.h>
#endif #endif
#define FLAGS_FINISHED 0x001 #define FLAGS_FINISHED 0x001
@ -29,6 +32,9 @@ typedef struct {
#ifdef __linux__ #ifdef __linux__
pthread_mutex_t mutex; pthread_mutex_t mutex;
pthread_cond_t pending_cond; pthread_cond_t pending_cond;
#elif defined(_WIN32)
CRITICAL_SECTION critical_section;
CONDITION_VARIABLE pending_cond;
#endif #endif
} vy_fio_queue; } vy_fio_queue;
@ -42,6 +48,8 @@ typedef struct {
#ifdef __linux__ #ifdef __linux__
pthread_mutex_t mutex; pthread_mutex_t mutex;
#elif defined(_WIN32)
HANDLE mutex;
#endif #endif
} vy_file_tab; } vy_file_tab;
@ -50,6 +58,9 @@ static vy_file_tab _file_tab;
#ifdef __linux__ #ifdef __linux__
static pthread_t _thread; static pthread_t _thread;
#elif defined(_WIN32)
static HANDLE _fio_thread = NULL;
static HANDLE _fio_term_event = NULL;
#endif #endif
static bool InitFIOQueue(unsigned int size) { static bool InitFIOQueue(unsigned int size) {
@ -59,6 +70,16 @@ static bool InitFIOQueue(unsigned int size) {
_queue.write_pos = 0; _queue.write_pos = 0;
_queue.read_pos = 0; _queue.read_pos = 0;
_queue.size = size; _queue.size = size;
#ifdef __linux__
if (pthread_cond_init(&_queue.pending_cond, NULL) != 0)
return false;
if (pthread_mutex_init(&_queue.mutex, NULL) != 0)
return false;
#elif defined(_WIN32)
if (!InitializeCriticalSectionAndSpinCount(&_queue.critical_section, 4000))
return false;
InitializeConditionVariable(&_queue.pending_cond);
#endif
return true; return true;
} }
@ -67,6 +88,12 @@ static void ShutdownFIOQueue(void) {
free(_queue.ops[i].buffer.data); free(_queue.ops[i].buffer.data);
} }
free(_queue.ops); free(_queue.ops);
#ifdef __linux__
pthread_cond_destroy(&_queue.pending_cond);
pthread_mutex_destroy(&_queue.mutex);
#elif defined(_WIN32)
DeleteCriticalSection(&_queue.critical_section);
#endif
} }
static bool InitFileTab(unsigned int max_files) { static bool InitFileTab(unsigned int max_files) {
@ -86,6 +113,10 @@ static bool InitFileTab(unsigned int max_files) {
#ifdef __linux__ #ifdef __linux__
if (pthread_mutex_init(&_file_tab.mutex, NULL) != 0) if (pthread_mutex_init(&_file_tab.mutex, NULL) != 0)
return false; return false;
#elif defined(_WIN32)
_file_tab.mutex = CreateMutex(NULL, FALSE, NULL);
if (!_file_tab.mutex)
return false;
#endif #endif
return true; return true;
} }
@ -97,10 +128,16 @@ static void ShutdownFileTab(void) {
#ifdef __linux__ #ifdef __linux__
pthread_mutex_destroy(&_file_tab.mutex); pthread_mutex_destroy(&_file_tab.mutex);
#elif defined(_WIN32)
CloseHandle(_file_tab.mutex);
#endif #endif
} }
static void *FIOThreadProc(void *); #ifdef __linux__
static void *linuxFIOThreadProc(void *);
#elif defined(_WIN32)
static DWORD WINAPI win32FIOThreadProc(_In_ LPVOID);
#endif
bool vyInitFIO(const vy_fio_config *config) { bool vyInitFIO(const vy_fio_config *config) {
unsigned int queue_size = (config->queue_size) ? config->queue_size : 512; unsigned int queue_size = (config->queue_size) ? config->queue_size : 512;
@ -109,12 +146,18 @@ bool vyInitFIO(const vy_fio_config *config) {
if (!InitFIOQueue(queue_size)) if (!InitFIOQueue(queue_size))
return false; return false;
if (!InitFileTab(max_file_count)) if (!InitFileTab(max_file_count))
return false; return false;
#ifdef __linux__ #ifdef __linux__
if (pthread_create(&_thread, NULL, FIOThreadProc, NULL) != 0) if (pthread_create(&_thread, NULL, linuxFIOThreadProc, NULL) != 0)
return false;
#elif defined(_WIN32)
_fio_term_event = CreateEvent(NULL, TRUE, FALSE, NULL);
if (!_fio_term_event)
return false;
_fio_thread = CreateThread(NULL, 0, win32FIOThreadProc, NULL, 0, NULL);
if (!_fio_thread)
return false; return false;
#endif #endif
@ -125,6 +168,9 @@ void vyShutdownFIO(void) {
#ifdef __linux__ #ifdef __linux__
pthread_cancel(_thread); pthread_cancel(_thread);
pthread_join(_thread, NULL); pthread_join(_thread, NULL);
#elif defined(_WIN32)
WaitForSingleObject(_fio_thread, INFINITE);
CloseHandle(_fio_thread);
#endif #endif
ShutdownFIOQueue(); ShutdownFIOQueue();
ShutdownFileTab(); ShutdownFileTab();
@ -151,6 +197,11 @@ vy_file_id vyAddFileFromSpan(vy_text_span path) {
#ifdef __linux__ #ifdef __linux__
pthread_mutex_lock(&_file_tab.mutex); pthread_mutex_lock(&_file_tab.mutex);
#elif defined(_WIN32)
if (WaitForSingleObject(_file_tab.mutex, INFINITE) == WAIT_FAILED) {
vyReportError("fio", "WaitForSingleObject failed with WAIT_FAILED!");
return 0;
}
#endif #endif
/* Hash Insert */ /* Hash Insert */
unsigned int i = 0; unsigned int i = 0;
@ -182,6 +233,8 @@ vy_file_id vyAddFileFromSpan(vy_text_span path) {
#ifdef __linux__ #ifdef __linux__
pthread_mutex_unlock(&_file_tab.mutex); pthread_mutex_unlock(&_file_tab.mutex);
#elif defined(_WIN32)
ReleaseMutex(_file_tab.mutex);
#endif #endif
return fid; return fid;
} }
@ -199,6 +252,11 @@ const char *vyGetFilePath(vy_file_id fid) {
return NULL; return NULL;
#ifdef __linux__ #ifdef __linux__
pthread_mutex_lock(&_file_tab.mutex); pthread_mutex_lock(&_file_tab.mutex);
#elif defined(_WIN32)
if (WaitForSingleObject(_file_tab.mutex, INFINITE) == WAIT_FAILED) {
vyReportError("fio", "WaitForSingleObject failed with WAIT_FAILED!");
return 0;
}
#endif #endif
const char *result = NULL; const char *result = NULL;
unsigned int i = 0; unsigned int i = 0;
@ -215,6 +273,8 @@ const char *vyGetFilePath(vy_file_id fid) {
} }
#ifdef __linux__ #ifdef __linux__
pthread_mutex_unlock(&_file_tab.mutex); pthread_mutex_unlock(&_file_tab.mutex);
#elif defined(_WIN32)
ReleaseMutex(_file_tab.mutex);
#endif #endif
return result; return result;
} }
@ -225,6 +285,8 @@ vy_fio_handle vyEnqueueRead(vy_file_id fid) {
do { do {
#ifdef __linux__ #ifdef __linux__
pthread_mutex_lock(&_queue.mutex); pthread_mutex_lock(&_queue.mutex);
#elif defined(_WIN32)
EnterCriticalSection(&_queue.critical_section);
#endif #endif
if (_queue.ops[_queue.write_pos].flags == 0 || if (_queue.ops[_queue.write_pos].flags == 0 ||
@ -240,12 +302,17 @@ vy_fio_handle vyEnqueueRead(vy_file_id fid) {
#ifdef __linux__ #ifdef __linux__
pthread_cond_signal(&_queue.pending_cond); pthread_cond_signal(&_queue.pending_cond);
pthread_mutex_unlock(&_queue.mutex); pthread_mutex_unlock(&_queue.mutex);
#elif defined(_WIN32)
LeaveCriticalSection(&_queue.critical_section);
WakeAllConditionVariable(&_queue.pending_cond);
#endif #endif
break; break;
} }
#ifdef __linux__ #ifdef __linux__
pthread_mutex_unlock(&_queue.mutex); pthread_mutex_unlock(&_queue.mutex);
#elif defined(_WIN32)
LeaveCriticalSection(&_queue.critical_section);
#endif #endif
} while (1); } while (1);
@ -257,6 +324,8 @@ void vyAbortFIO(vy_fio_handle fio) {
return; return;
#ifdef __linux__ #ifdef __linux__
pthread_mutex_lock(&_queue.mutex); pthread_mutex_lock(&_queue.mutex);
#elif defined(_WIN32)
EnterCriticalSection(&_queue.critical_section);
#endif #endif
_queue.ops[fio - 1].flags = 0; _queue.ops[fio - 1].flags = 0;
if (_queue.ops[fio - 1].buffer.data) { if (_queue.ops[fio - 1].buffer.data) {
@ -265,6 +334,8 @@ void vyAbortFIO(vy_fio_handle fio) {
} }
#ifdef __linux__ #ifdef __linux__
pthread_mutex_unlock(&_queue.mutex); pthread_mutex_unlock(&_queue.mutex);
#elif defined(_WIN32)
LeaveCriticalSection(&_queue.critical_section);
#endif #endif
} }
@ -273,10 +344,14 @@ bool vyIsFIOFinished(vy_fio_handle fio) {
return false; return false;
#ifdef __linux__ #ifdef __linux__
pthread_mutex_lock(&_queue.mutex); pthread_mutex_lock(&_queue.mutex);
#elif defined(_WIN32)
EnterCriticalSection(&_queue.critical_section);
#endif #endif
bool result = (_queue.ops[fio - 1].flags & FLAGS_FINISHED) != 0; bool result = (_queue.ops[fio - 1].flags & FLAGS_FINISHED) != 0;
#ifdef __linux__ #ifdef __linux__
pthread_mutex_unlock(&_queue.mutex); pthread_mutex_unlock(&_queue.mutex);
#elif defined(_WIN32)
LeaveCriticalSection(&_queue.critical_section);
#endif #endif
return result; return result;
} }
@ -287,6 +362,8 @@ bool vyRetrieveReadBuffer(vy_fio_handle fio, vy_file_buffer *buffer) {
#ifdef __linux__ #ifdef __linux__
pthread_mutex_lock(&_queue.mutex); pthread_mutex_lock(&_queue.mutex);
#elif defined(_WIN32)
EnterCriticalSection(&_queue.critical_section);
#endif #endif
bool is_finished = (_queue.ops[fio - 1].flags & FLAGS_FINISHED) != 0; bool is_finished = (_queue.ops[fio - 1].flags & FLAGS_FINISHED) != 0;
@ -302,6 +379,8 @@ bool vyRetrieveReadBuffer(vy_fio_handle fio, vy_file_buffer *buffer) {
#ifdef __linux__ #ifdef __linux__
pthread_mutex_unlock(&_queue.mutex); pthread_mutex_unlock(&_queue.mutex);
#elif defined(_WIN32)
LeaveCriticalSection(&_queue.critical_section);
#endif #endif
return is_finished; return is_finished;
@ -342,8 +421,8 @@ static void ProcessRead(vy_file_op *op) {
op->flags |= FLAGS_FINISHED; op->flags |= FLAGS_FINISHED;
} }
static void *FIOThreadProc(void *_param) {
#ifdef __linux__ #ifdef __linux__
static void *linuxFIOThreadProc(void *_param) {
while (true) { while (true) {
pthread_mutex_lock(&_queue.mutex); pthread_mutex_lock(&_queue.mutex);
while (_queue.write_pos == _queue.read_pos) { while (_queue.write_pos == _queue.read_pos) {
@ -353,6 +432,23 @@ static void *FIOThreadProc(void *_param) {
_queue.read_pos = (_queue.read_pos + 1) % _queue.size; _queue.read_pos = (_queue.read_pos + 1) % _queue.size;
pthread_mutex_unlock(&_queue.mutex); pthread_mutex_unlock(&_queue.mutex);
} }
#endif
return NULL; return NULL;
} }
#elif defined(_WIN32)
static DWORD WINAPI win32FIOThreadProc(_In_ LPVOID lpParam) {
VY_UNUSED(lpParam);
while (WaitForSingleObject(&_fio_term_event, 0) != WAIT_OBJECT_0) {
EnterCriticalSection(&_queue.critical_section);
while (_queue.write_pos == _queue.read_pos) {
SleepConditionVariableCS(&_queue.pending_cond,
&_queue.critical_section,
INFINITE);
}
ProcessRead(&_queue.ops[_queue.read_pos]);
_queue.read_pos = (_queue.read_pos + 1) % _queue.size;
LeaveCriticalSection(&_queue.critical_section);
}
return 0;
}
#endif