From 275f3b65f8558b9797872e5c3ad080d8a7a30220 Mon Sep 17 00:00:00 2001 From: Kevin Trogant Date: Sun, 31 Dec 2023 02:16:29 +0100 Subject: [PATCH] Untested asset cache - Added read-write lock implementation --- meson.build | 3 + src/runtime/asset_cache.c | 357 +++++++++++++++++++++++++++++++++ src/runtime/asset_loading.c | 66 +++--- src/runtime/assets.h | 9 + src/runtime/init.c | 10 + src/runtime/threading.h | 23 +++ src/runtime/threading_rwlock.c | 43 ++++ 7 files changed, 484 insertions(+), 27 deletions(-) create mode 100644 src/runtime/asset_cache.c create mode 100644 src/runtime/threading_rwlock.c diff --git a/meson.build b/meson.build index 8f8294c..d495701 100644 --- a/meson.build +++ b/meson.build @@ -67,6 +67,7 @@ runtime_lib = library('vyrt', 'src/runtime/aio.c', 'src/runtime/app.c', + 'src/runtime/asset_cache.c', 'src/runtime/asset_dependencies.c', 'src/runtime/asset_loading.c', 'src/runtime/buffer_manager.c', @@ -80,11 +81,13 @@ runtime_lib = library('vyrt', 'src/runtime/text.c', 'src/runtime/threading_cond.c', 'src/runtime/threading_mutex.c', + 'src/runtime/threading_rwlock.c', 'src/runtime/threading_thread.c', 'src/runtime/uidtab.c', # Contrib Sources 'contrib/xxhash/xxhash.c', + 'contrib/lz4/lz4.c', dependencies : [thread_dep, m_dep, windowing_dep], include_directories : incdir, c_pch : 'pch/rt_pch.h') diff --git a/src/runtime/asset_cache.c b/src/runtime/asset_cache.c new file mode 100644 index 0000000..7dd3a78 --- /dev/null +++ b/src/runtime/asset_cache.c @@ -0,0 +1,357 @@ +#include "assets.h" +#include "asset_dependencies.h" +#include "aio.h" +#include "buffer_manager.h" +#include "config.h" +#include "threading.h" +#include "uidtab.h" + +#include +#include + +VY_CVAR_I(rt_AssetCacheSize, "Number of asset cache entries. Default: 1024.", 1024); + +/* asset_loading.c */ +extern vy_result DecompressAsset(void *compressed_buffer, + size_t compressed_buffer_size, + void **p_decompressed, + size_t *p_decompressed_size); + +typedef enum { + CACHE_ENTRY_STATE_FREE, + CACHE_ENTRY_STATE_LOADING, + CACHE_ENTRY_STATE_LOADED, +} vy_asset_cache_entry_state; + +typedef struct vy_asset_cache_entry_s { + vy_asset_cache_entry_state state; + vy_aio_handle load; + void *buffer; + size_t size; + int refcount; + + /* Reclaim list */ + struct vy_asset_cache_entry_s *prev_reclaim; + struct vy_asset_cache_entry_s *next_reclaim; +} vy_asset_cache_entry; + +static vy_uid *_uids; +static vy_asset_cache_entry *_entries; +static vy_asset_cache_entry *_first_reclaim; +static vy_asset_cache_entry *_last_reclaim; + +/* Locked as writer when modifiying entries, as reader when searching */ +static vy_rwlock _lock; + +vy_result InitAssetCache(void) { + _entries = calloc((size_t)rt_AssetCacheSize.i, sizeof(vy_asset_cache_entry)); + if (!_entries) { + return VY_BUFFER_ALLOC_FAILED; + } + _uids = calloc((size_t)rt_AssetCacheSize.i, sizeof(vy_uid)); + if (!_uids) { + free(_entries); + return VY_BUFFER_ALLOC_FAILED; + } + vy_create_rwlock_result lock_res = vyCreateRWLock(); + if (!lock_res.ok) { + free(_entries); + free(_uids); + return VY_UNKNOWN_ERROR; + } + _lock = lock_res.lock; + return VY_SUCCESS; +} + +void ShutdownAssetCache(void) { + free(_entries); + free(_uids); + vyDestroyRWLock(&_lock); + _first_reclaim = NULL; + _last_reclaim = NULL; +} + +static void ReleaseEntry(vy_asset_cache_entry *entry) { + if (entry->load != VY_AIO_INVALID_HANDLE) { + vyWaitForAIOCompletion(entry->load); + vyReleaseAIO(entry->load); + entry->load = VY_AIO_INVALID_HANDLE; + } + vyReleaseBuffer(entry->buffer, entry->size); + entry->buffer = NULL; + entry->size = 0; + entry->next_reclaim = NULL; + entry->prev_reclaim = NULL; +} + +static void GarbageCollect(void) { + vyLockWrite(&_lock); + vy_asset_cache_entry *entry = _first_reclaim; + while (entry) { + assert(entry->refcount == 0); + vy_asset_cache_entry *next = entry->next_reclaim; + if (entry->state == CACHE_ENTRY_STATE_LOADED) { + ReleaseEntry(entry); + _first_reclaim = next; + } + entry = next; + } + vyUnlockWrite(&_lock); +} + +static vy_asset_cache_entry *GetEntry(vy_uid uid) { + /* Hash lookup */ + unsigned int mod = (unsigned int)rt_AssetCacheSize.i - 1; + for (unsigned int i = 0; i < (unsigned int)rt_AssetCacheSize.i; ++i) { + unsigned int slot = (uid + i) & mod; + if (_uids[slot] == uid) { + return &_entries[slot]; + } else if (_uids[slot] == VY_INVALID_UID) { + break; + } + } + return NULL; +} + + +static bool IsAssetLoaded(vy_uid uid) { + const vy_asset_cache_entry *entry = GetEntry(uid); + if (entry) + return entry->state == CACHE_ENTRY_STATE_LOADED || + entry->state == CACHE_ENTRY_STATE_LOADING; + else + return false; +} + +static int InsertEntry(vy_uid uid) { + unsigned int mod = (unsigned int)rt_AssetCacheSize.i - 1; + for (unsigned int i = 0; i < (unsigned int)rt_AssetCacheSize.i; ++i) { + unsigned int slot = (uid + i) & mod; + if (_uids[slot] == 0) { + return (int)slot; + } + } + return -1; +} +static vy_result InsertAndLoadAssets(const vy_uid *uids, size_t count) { + vy_load_batch batch = {.num_loads = 0}; + + vy_result res = VY_SUCCESS; + + count = (count < VY_LOAD_BATCH_MAX_SIZE) ? count : VY_LOAD_BATCH_MAX_SIZE; + vy_asset_cache_entry *load_entries[VY_LOAD_BATCH_MAX_SIZE]; + + for (size_t i = 0; i < count; ++i) { + vyLockRead(&_lock); + bool needs_load = !IsAssetLoaded(uids[i]); + vyUnlockRead(&_lock); + + if (!needs_load) + continue; + + vyLockWrite(&_lock); + /* It's possible that another thread loaded the asset in the meantime */ + if (!IsAssetLoaded(uids[i])) { + const vy_uid_data *data = vyGetUIDData(uids[i]); + if (!data) { + vyUnlockWrite(&_lock); + vyLog("ASSET_CACHE", "Failed to get uid data for uid %u", uids[i]); + res = VY_UNKNOWN_ASSET; + continue; + } + + void *compressed_data = vyAllocBuffer(data->size); + if (!compressed_data) { + /* Try again after garbage collection */ + vyUnlockWrite(&_lock); + GarbageCollect(); + compressed_data = vyAllocBuffer(data->size); + if (!compressed_data) { + vyLog("ASSET_CACHE", + "Failed to allocate intermediate buffer for uid %u", + uids[i]); + res = VY_BUFFER_ALLOC_FAILED; + continue; + } + vyLockWrite(&_lock); + } + + int slot = InsertEntry(uids[i]); + if (slot == -1) { + vyUnlockWrite(&_lock); + vyLog("ASSET_CACHE", "Failed to insert new entry for uid %u", uids[i]); + res = VY_ASSET_CACHE_FULL; + break; + } + + vy_asset_cache_entry *entry = &_entries[slot]; + load_entries[batch.num_loads] = entry; + + /* We set the refcount to 0, but don't insert the entry + * into the reclaim list, to ensure that its buffer does not get freed + * while the load still executes. Setting the refcount to 0 ensures + * that the count is correct, once the asset is accessed the first time. */ + entry->state = CACHE_ENTRY_STATE_LOADING; + entry->refcount = 0; + entry->buffer = compressed_data; + entry->size = data->size; + entry->next_reclaim = NULL; + entry->prev_reclaim = NULL; + entry->load = VY_AIO_INVALID_HANDLE; + + batch.loads[batch.num_loads].file = data->pkg_file; + batch.loads[batch.num_loads].num_bytes = data->size; + batch.loads[batch.num_loads].offset = data->offset; + batch.loads[batch.num_loads].dest = compressed_data; + ++batch.num_loads; + } + } + vyUnlockWrite(&_lock); + + /* Dispatch the load */ + vy_aio_handle handles[VY_LOAD_BATCH_MAX_SIZE]; + if ((res = vySubmitLoadBatch(&batch, handles)) != VY_SUCCESS) { + vyLog("ASSET_CACHE", "Failed to submit %u asset loads.", batch.num_loads); + return res; + } + + /* Set the aio handles of the inserted entries */ + vyLockWrite(&_lock); + for (unsigned int i = 0; i < batch.num_loads; ++i) { + load_entries[batch.num_loads]->load = handles[i]; + } + vyUnlockWrite(&_lock); + + return res; +} + +static bool DecompressEntry(vy_uid uid, vy_asset_cache_entry *entry) { + vyReleaseAIO(entry->load); + entry->load = VY_AIO_INVALID_HANDLE; + + void *decompressed_buffer; + size_t decompressed_size; + vy_result dec_res = + DecompressAsset(entry->buffer, entry->size, &decompressed_buffer, &decompressed_size); + if (dec_res == VY_SUCCESS) { + vyReleaseBuffer(entry->buffer, entry->size); + entry->buffer = decompressed_buffer; + entry->size = decompressed_size; + entry->state = CACHE_ENTRY_STATE_LOADED; + return true; + } else if (dec_res == VY_BUFFER_ALLOC_FAILED) { + GarbageCollect(); + /* Try again */ + if (DecompressAsset(entry->buffer, entry->size, &decompressed_buffer, &decompressed_size) == + VY_SUCCESS) { + vyReleaseBuffer(entry->buffer, entry->size); + entry->buffer = decompressed_buffer; + entry->size = decompressed_size; + entry->state = CACHE_ENTRY_STATE_LOADED; + return true; + } + /* Don't do anything yet. We might be able to to do this later, once some + * buffers become free. */ + vyLog("ASSET_CACHE", "Failed to decompress asset %u", uid); + return false; + } else { + vyLog("ASSET_CACHE", "Failed to decompress asset %u", uid); + ReleaseEntry(entry); + + ptrdiff_t idx = entry - _entries; + _uids[idx] = VY_INVALID_UID; + return false; + } +} + +static void CheckCompletedLoads(const vy_uid *uids, size_t count) { + for (size_t i = 0; i < count; ++i) { + vyLockRead(&_lock); + volatile vy_asset_cache_entry *entry = (volatile vy_asset_cache_entry *)GetEntry(uids[i]); + if (!entry) { + vyUnlockRead(&_lock); + vyLog("ASSET_CACHE", "Passed unknown uid %u to CheckCompletedLoads()", uids[i]); + continue; + } + + if (entry->state != CACHE_ENTRY_STATE_LOADING) { + vyUnlockRead(&_lock); + continue; + } + bool load_finished = vyGetAIOState(entry->load) == VY_AIO_STATE_FINISHED; + vyUnlockRead(&_lock); + + if (load_finished) { + vyLockWrite(&_lock); + /* Ensure that no-one else handled this */ + if (entry->state == CACHE_ENTRY_STATE_LOADING) { + DecompressEntry(uids[i], (vy_asset_cache_entry *)entry); + } + vyUnlockWrite(&_lock); + } + } +} + +VY_DLLEXPORT vy_get_asset_result vyGetAsset(vy_uid uid) { + vy_get_asset_result result = { + .result = VY_SUCCESS, + }; + + vyLockRead(&_lock); + bool needs_load = !IsAssetLoaded(uid); + vyUnlockRead(&_lock); + + if (needs_load) { + vy_uid load_uids[VY_LOAD_BATCH_MAX_SIZE]; + size_t load_count = 1; + load_uids[0] = uid; + + vy_asset_dependency_list deps = vyGetAssetDependencies(uid); + for (size_t i = 0; i < deps.count && i < VY_LOAD_BATCH_MAX_SIZE - 1; ++i) { + load_uids[i + 1] = deps.dependencies[i]; + ++load_count; + } + + result.result = InsertAndLoadAssets(load_uids, load_count); + if (result.result == VY_SUCCESS) { + CheckCompletedLoads(load_uids, load_count); + } + } + + vyLockRead(&_lock); + volatile vy_asset_cache_entry *entry = GetEntry(uid); + if (entry) { + if (entry->state == CACHE_ENTRY_STATE_LOADED) { + ++entry->refcount; + result.data = entry->buffer; + result.size = entry->size; + } else if (entry->state == CACHE_ENTRY_STATE_LOADING) { + /* Promote to write lock */ + vyUnlockRead(&_lock); + vyLockWrite(&_lock); + if (entry->state == CACHE_ENTRY_STATE_LOADING) { + assert(entry->load != VY_AIO_INVALID_HANDLE); + ++entry->refcount; + if (vyWaitForAIOCompletion(entry->load) == VY_AIO_STATE_FINISHED) { + if (DecompressEntry(uid, (vy_asset_cache_entry *)entry)) { + result.data = entry->buffer; + result.size = entry->size; + } else { + result.result = VY_LOAD_FAILED; + } + } else { + ReleaseEntry((vy_asset_cache_entry *)entry); + vyLog("ASSET_CACHE", "Failed to load asset %u", uid); + result.result = VY_LOAD_FAILED; + } + } + vyUnlockWrite(&_lock); + + /* To match the unlock below */ + vyLockRead(&_lock); + } + } + vyUnlockRead(&_lock); + + return result; +} \ No newline at end of file diff --git a/src/runtime/asset_loading.c b/src/runtime/asset_loading.c index 6d2e157..bc42b63 100644 --- a/src/runtime/asset_loading.c +++ b/src/runtime/asset_loading.c @@ -8,6 +8,40 @@ #include "lz4/lz4.h" +vy_result DecompressAsset(void *compressed_buffer, + size_t compressed_buffer_size, + void **p_decompressed, + size_t *p_decompressed_size) { + + const vy_package_asset_header *header = compressed_buffer; + + size_t compressed_size = (compressed_buffer_size) - sizeof(*header); + XXH64_hash_t calculated_hash = XXH3_64bits((header + 1), compressed_size); + XXH64_hash_t file_hash = XXH64_hashFromCanonical(&header->checksum); + if (calculated_hash != file_hash) { + vyLog("core", "Checksum mismatch for asset"); + return VY_LOAD_FAILED; + } + + size_t size = (size_t)header->decompressed_size; + void *decompressed_buffer = vyAllocBuffer(size); + if (!decompressed_buffer) { + return VY_BUFFER_ALLOC_FAILED; + } + + if (LZ4_decompress_safe((const char *)(header + 1), + (char *)decompressed_buffer, + (int)compressed_size, + (int)size) < 0) { + return VY_UNKNOWN_ERROR; + } + + *p_decompressed = decompressed_buffer; + *p_decompressed_size = size; + return VY_SUCCESS; +} + + VY_DLLEXPORT vy_result vyLoadAssetDirect(vy_uid uid, void **p_buffer, size_t *p_size) { const vy_uid_data *data = vyGetUIDData(uid); if (!data) @@ -28,35 +62,13 @@ VY_DLLEXPORT vy_result vyLoadAssetDirect(vy_uid uid, void **p_buffer, size_t *p_ return VY_LOAD_FAILED; } - const vy_package_asset_header *header = compressed_buffer; - - size_t compressed_size = (data->size) - sizeof(*header); - XXH64_hash_t calculated_hash = XXH3_64bits((header + 1), compressed_size); - XXH64_hash_t file_hash = XXH64_hashFromCanonical(&header->checksum); - if (calculated_hash != file_hash) { - vyLog("core", "Checksum mismatch for asset %u", uid); - vyReleaseBuffer(compressed_buffer, data->size); - return VY_LOAD_FAILED; - } + void *decompressed_buffer; + size_t decompressed_size; + vy_result res = DecompressAsset(compressed_buffer, data->size, &decompressed_buffer, &decompressed_size); - size_t size = (size_t)header->decompressed_size; - void *decompressed_buffer = vyAllocBuffer(size); - if (!decompressed_buffer) { - vyReleaseBuffer(compressed_buffer, data->size); - return VY_BUFFER_ALLOC_FAILED; - } - - if (LZ4_decompress_safe((const char *)(header + 1), - (char *)decompressed_buffer, - (int)compressed_size, - (int)size) < 0) { - vyReleaseBuffer(compressed_buffer, data->size); - return VY_LOAD_FAILED; - } - vyReleaseBuffer(compressed_buffer, data->size); *p_buffer = decompressed_buffer; - *p_size = size; + *p_size = decompressed_size; - return VY_SUCCESS; + return res; } \ No newline at end of file diff --git a/src/runtime/assets.h b/src/runtime/assets.h index 6c7e901..b954cd4 100644 --- a/src/runtime/assets.h +++ b/src/runtime/assets.h @@ -23,9 +23,18 @@ enum { VY_UNKNOWN_ASSET = VY_SUCCESS + 1, VY_BUFFER_ALLOC_FAILED, VY_LOAD_FAILED, + VY_ASSET_CACHE_FULL, }; /* Load an asset without using the cache */ VY_DLLEXPORT vy_result vyLoadAssetDirect(vy_uid uid, void **buffer, size_t *size); +typedef struct { + void *data; + size_t size; + vy_result result; +} vy_get_asset_result; + +VY_DLLEXPORT vy_get_asset_result vyGetAsset(vy_uid uid); + #endif \ No newline at end of file diff --git a/src/runtime/init.c b/src/runtime/init.c index c9682af..93ae830 100644 --- a/src/runtime/init.c +++ b/src/runtime/init.c @@ -12,6 +12,7 @@ extern vy_cvar rt_WindowHeight; extern vy_cvar rt_BufferManagerMemory; extern vy_cvar rt_FileTabCapacity; extern vy_cvar rt_MaxConcurrentAsyncIO; +extern vy_cvar rt_AssetCacheSize; void RegisterRuntimeCVars(void) { vyRegisterCVAR(&rt_Renderer); @@ -21,6 +22,7 @@ void RegisterRuntimeCVars(void) { vyRegisterCVAR(&rt_BufferManagerMemory); vyRegisterCVAR(&rt_FileTabCapacity); vyRegisterCVAR(&rt_MaxConcurrentAsyncIO); + vyRegisterCVAR(&rt_AssetCacheSize); } extern void SetMainThreadId(void); @@ -31,6 +33,8 @@ extern vy_result InitFileTab(void); extern void ShutdownFileTab(void); extern vy_result InitAIO(void); extern void ShutdownAIO(void); +extern vy_result InitAssetCache(void); +extern void ShutdownAssetCache(void); extern vy_result LoadUIDTable(void); extern void ReleaseUIDTable(void); @@ -58,6 +62,11 @@ VY_DLLEXPORT vy_result vyInitRuntime(void) { return res; } + if ((res = InitAssetCache()) != VY_SUCCESS) { + vyReportError("ASSETCACHE", "Init failed."); + return res; + } + if ((res = LoadUIDTable()) != VY_SUCCESS) { vyLog("CORE", "LoadUIDTable returned result: %u", res); } @@ -77,6 +86,7 @@ VY_DLLEXPORT vy_result vyInitRuntime(void) { VY_DLLEXPORT void vyShutdownRuntime(void) { ReleaseAssetDependencies(); ReleaseUIDTable(); + ShutdownAssetCache(); ShutdownAIO(); ShutdownFileTab(); ShutdownBufferManager(); diff --git a/src/runtime/threading.h b/src/runtime/threading.h index 687719f..d0279b3 100644 --- a/src/runtime/threading.h +++ b/src/runtime/threading.h @@ -34,6 +34,29 @@ VY_DLLEXPORT void vyUnlockConditionVar(vy_condition_var *var, bool signal); /* The condition variable must be locked by the thread! */ VY_DLLEXPORT void vyWaitOnConditionVar(vy_condition_var *var); +/* Read-Write Lock */ +typedef struct { + volatile int reader_count; + vy_condition_var *cond; +} vy_rwlock; + +typedef struct { + bool ok; + vy_rwlock lock; +} vy_create_rwlock_result; + +VY_DLLEXPORT vy_create_rwlock_result vyCreateRWLock(void); + +VY_DLLEXPORT void vyDestroyRWLock(vy_rwlock *lock); + +VY_DLLEXPORT void vyLockRead(vy_rwlock *lock); + +VY_DLLEXPORT void vyUnlockRead(vy_rwlock *lock); + +VY_DLLEXPORT void vyLockWrite(vy_rwlock *lock); + +VY_DLLEXPORT void vyUnlockWrite(vy_rwlock *lock); + /* Threads */ typedef struct vy_thread_s vy_thread; diff --git a/src/runtime/threading_rwlock.c b/src/runtime/threading_rwlock.c new file mode 100644 index 0000000..7400a49 --- /dev/null +++ b/src/runtime/threading_rwlock.c @@ -0,0 +1,43 @@ +#include "threading.h" + +#include + +/* Based on: https://eli.thegreenplace.net/2019/implementing-reader-writer-locks/ */ + +VY_DLLEXPORT vy_create_rwlock_result vyCreateRWLock(void) { + vy_create_rwlock_result res; + res.lock.reader_count = 0; + res.lock.cond = vyCreateConditionVar(); + res.ok = res.lock.cond != NULL; + return res; +} + +VY_DLLEXPORT void vyDestroyRWLock(vy_rwlock *lock) { + vyDestroyConditionVar(lock->cond); + lock->cond = NULL; + lock->reader_count = 0; +} + +VY_DLLEXPORT void vyLockRead(vy_rwlock *lock) { + vyLockConditionVar(lock->cond); + ++lock->reader_count; + vyUnlockConditionVar(lock->cond, false); +} + +VY_DLLEXPORT void vyUnlockRead(vy_rwlock *lock) { + vyLockConditionVar(lock->cond); + assert(lock->reader_count > 0); + --lock->reader_count; + bool signal = lock->reader_count == 0; + vyUnlockConditionVar(lock->cond, signal); +} + +VY_DLLEXPORT void vyLockWrite(vy_rwlock *lock) { + vyLockConditionVar(lock->cond); + while (lock->reader_count > 0) + vyWaitOnConditionVar(lock->cond); +} + +VY_DLLEXPORT void vyUnlockWrite(vy_rwlock *lock) { + vyUnlockConditionVar(lock->cond, true); +}