diff --git a/src/fio.c b/src/fio.c index 0a54889..55cacd2 100644 --- a/src/fio.c +++ b/src/fio.c @@ -8,6 +8,9 @@ #ifdef __linux__ #include +#elif defined(_WIN32) + #define WIN32_LEAN_AND_MEAN + #include #endif #define FLAGS_FINISHED 0x001 @@ -29,6 +32,9 @@ typedef struct { #ifdef __linux__ pthread_mutex_t mutex; pthread_cond_t pending_cond; +#elif defined(_WIN32) + CRITICAL_SECTION critical_section; + CONDITION_VARIABLE pending_cond; #endif } vy_fio_queue; @@ -42,6 +48,8 @@ typedef struct { #ifdef __linux__ pthread_mutex_t mutex; +#elif defined(_WIN32) + HANDLE mutex; #endif } vy_file_tab; @@ -50,6 +58,9 @@ static vy_file_tab _file_tab; #ifdef __linux__ static pthread_t _thread; +#elif defined(_WIN32) +static HANDLE _fio_thread = NULL; +static HANDLE _fio_term_event = NULL; #endif static bool InitFIOQueue(unsigned int size) { @@ -59,6 +70,16 @@ static bool InitFIOQueue(unsigned int size) { _queue.write_pos = 0; _queue.read_pos = 0; _queue.size = size; +#ifdef __linux__ + if (pthread_cond_init(&_queue.pending_cond, NULL) != 0) + return false; + if (pthread_mutex_init(&_queue.mutex, NULL) != 0) + return false; +#elif defined(_WIN32) + if (!InitializeCriticalSectionAndSpinCount(&_queue.critical_section, 4000)) + return false; + InitializeConditionVariable(&_queue.pending_cond); +#endif return true; } @@ -67,6 +88,12 @@ static void ShutdownFIOQueue(void) { free(_queue.ops[i].buffer.data); } free(_queue.ops); +#ifdef __linux__ + pthread_cond_destroy(&_queue.pending_cond); + pthread_mutex_destroy(&_queue.mutex); +#elif defined(_WIN32) + DeleteCriticalSection(&_queue.critical_section); +#endif } static bool InitFileTab(unsigned int max_files) { @@ -86,6 +113,10 @@ static bool InitFileTab(unsigned int max_files) { #ifdef __linux__ if (pthread_mutex_init(&_file_tab.mutex, NULL) != 0) return false; +#elif defined(_WIN32) + _file_tab.mutex = CreateMutex(NULL, FALSE, NULL); + if (!_file_tab.mutex) + return false; #endif return true; } @@ -97,10 +128,16 @@ static void ShutdownFileTab(void) { #ifdef __linux__ pthread_mutex_destroy(&_file_tab.mutex); +#elif defined(_WIN32) + CloseHandle(_file_tab.mutex); #endif } -static void *FIOThreadProc(void *); +#ifdef __linux__ +static void *linuxFIOThreadProc(void *); +#elif defined(_WIN32) +static DWORD WINAPI win32FIOThreadProc(_In_ LPVOID); +#endif bool vyInitFIO(const vy_fio_config *config) { unsigned int queue_size = (config->queue_size) ? config->queue_size : 512; @@ -109,12 +146,18 @@ bool vyInitFIO(const vy_fio_config *config) { if (!InitFIOQueue(queue_size)) return false; - if (!InitFileTab(max_file_count)) return false; #ifdef __linux__ - if (pthread_create(&_thread, NULL, FIOThreadProc, NULL) != 0) + if (pthread_create(&_thread, NULL, linuxFIOThreadProc, NULL) != 0) + return false; +#elif defined(_WIN32) + _fio_term_event = CreateEvent(NULL, TRUE, FALSE, NULL); + if (!_fio_term_event) + return false; + _fio_thread = CreateThread(NULL, 0, win32FIOThreadProc, NULL, 0, NULL); + if (!_fio_thread) return false; #endif @@ -125,6 +168,9 @@ void vyShutdownFIO(void) { #ifdef __linux__ pthread_cancel(_thread); pthread_join(_thread, NULL); +#elif defined(_WIN32) + WaitForSingleObject(_fio_thread, INFINITE); + CloseHandle(_fio_thread); #endif ShutdownFIOQueue(); ShutdownFileTab(); @@ -151,6 +197,11 @@ vy_file_id vyAddFileFromSpan(vy_text_span path) { #ifdef __linux__ pthread_mutex_lock(&_file_tab.mutex); +#elif defined(_WIN32) + if (WaitForSingleObject(_file_tab.mutex, INFINITE) == WAIT_FAILED) { + vyReportError("fio", "WaitForSingleObject failed with WAIT_FAILED!"); + return 0; + } #endif /* Hash Insert */ unsigned int i = 0; @@ -182,6 +233,8 @@ vy_file_id vyAddFileFromSpan(vy_text_span path) { #ifdef __linux__ pthread_mutex_unlock(&_file_tab.mutex); +#elif defined(_WIN32) + ReleaseMutex(_file_tab.mutex); #endif return fid; } @@ -199,6 +252,11 @@ const char *vyGetFilePath(vy_file_id fid) { return NULL; #ifdef __linux__ pthread_mutex_lock(&_file_tab.mutex); +#elif defined(_WIN32) + if (WaitForSingleObject(_file_tab.mutex, INFINITE) == WAIT_FAILED) { + vyReportError("fio", "WaitForSingleObject failed with WAIT_FAILED!"); + return 0; + } #endif const char *result = NULL; unsigned int i = 0; @@ -215,6 +273,8 @@ const char *vyGetFilePath(vy_file_id fid) { } #ifdef __linux__ pthread_mutex_unlock(&_file_tab.mutex); +#elif defined(_WIN32) + ReleaseMutex(_file_tab.mutex); #endif return result; } @@ -225,6 +285,8 @@ vy_fio_handle vyEnqueueRead(vy_file_id fid) { do { #ifdef __linux__ pthread_mutex_lock(&_queue.mutex); +#elif defined(_WIN32) + EnterCriticalSection(&_queue.critical_section); #endif if (_queue.ops[_queue.write_pos].flags == 0 || @@ -240,12 +302,17 @@ vy_fio_handle vyEnqueueRead(vy_file_id fid) { #ifdef __linux__ pthread_cond_signal(&_queue.pending_cond); pthread_mutex_unlock(&_queue.mutex); +#elif defined(_WIN32) + LeaveCriticalSection(&_queue.critical_section); + WakeAllConditionVariable(&_queue.pending_cond); #endif break; } #ifdef __linux__ pthread_mutex_unlock(&_queue.mutex); +#elif defined(_WIN32) + LeaveCriticalSection(&_queue.critical_section); #endif } while (1); @@ -257,6 +324,8 @@ void vyAbortFIO(vy_fio_handle fio) { return; #ifdef __linux__ pthread_mutex_lock(&_queue.mutex); +#elif defined(_WIN32) + EnterCriticalSection(&_queue.critical_section); #endif _queue.ops[fio - 1].flags = 0; if (_queue.ops[fio - 1].buffer.data) { @@ -265,6 +334,8 @@ void vyAbortFIO(vy_fio_handle fio) { } #ifdef __linux__ pthread_mutex_unlock(&_queue.mutex); +#elif defined(_WIN32) + LeaveCriticalSection(&_queue.critical_section); #endif } @@ -273,10 +344,14 @@ bool vyIsFIOFinished(vy_fio_handle fio) { return false; #ifdef __linux__ pthread_mutex_lock(&_queue.mutex); +#elif defined(_WIN32) + EnterCriticalSection(&_queue.critical_section); #endif bool result = (_queue.ops[fio - 1].flags & FLAGS_FINISHED) != 0; #ifdef __linux__ pthread_mutex_unlock(&_queue.mutex); +#elif defined(_WIN32) + LeaveCriticalSection(&_queue.critical_section); #endif return result; } @@ -287,6 +362,8 @@ bool vyRetrieveReadBuffer(vy_fio_handle fio, vy_file_buffer *buffer) { #ifdef __linux__ pthread_mutex_lock(&_queue.mutex); +#elif defined(_WIN32) + EnterCriticalSection(&_queue.critical_section); #endif bool is_finished = (_queue.ops[fio - 1].flags & FLAGS_FINISHED) != 0; @@ -302,6 +379,8 @@ bool vyRetrieveReadBuffer(vy_fio_handle fio, vy_file_buffer *buffer) { #ifdef __linux__ pthread_mutex_unlock(&_queue.mutex); +#elif defined(_WIN32) + LeaveCriticalSection(&_queue.critical_section); #endif return is_finished; @@ -342,8 +421,8 @@ static void ProcessRead(vy_file_op *op) { op->flags |= FLAGS_FINISHED; } -static void *FIOThreadProc(void *_param) { #ifdef __linux__ +static void *linuxFIOThreadProc(void *_param) { while (true) { pthread_mutex_lock(&_queue.mutex); while (_queue.write_pos == _queue.read_pos) { @@ -353,6 +432,23 @@ static void *FIOThreadProc(void *_param) { _queue.read_pos = (_queue.read_pos + 1) % _queue.size; pthread_mutex_unlock(&_queue.mutex); } -#endif return NULL; } +#elif defined(_WIN32) +static DWORD WINAPI win32FIOThreadProc(_In_ LPVOID lpParam) { + VY_UNUSED(lpParam); + + while (WaitForSingleObject(&_fio_term_event, 0) != WAIT_OBJECT_0) { + EnterCriticalSection(&_queue.critical_section); + while (_queue.write_pos == _queue.read_pos) { + SleepConditionVariableCS(&_queue.pending_cond, + &_queue.critical_section, + INFINITE); + } + ProcessRead(&_queue.ops[_queue.read_pos]); + _queue.read_pos = (_queue.read_pos + 1) % _queue.size; + LeaveCriticalSection(&_queue.critical_section); + } + return 0; +} +#endif \ No newline at end of file