#include "assets.h"
#include "asset_dependencies.h"
#include "aio.h"
#include "buffer_manager.h"
#include "config.h"
#include "threading.h"
#include "uidtab.h"

#include <assert.h>
#include <stdlib.h>

/* NOTE(review): the lookup code in this file masks slot indices with
 * (rt_AssetCacheSize - 1), which only reaches every slot if the value is a
 * power of two. Confirm that configurations respect this. */
RT_CVAR_I(rt_AssetCacheSize, "Number of asset cache entries. Default: 1024.", 1024);

/* asset_loading.c */
extern rt_result DecompressAsset(void *compressed_buffer,
                                 size_t compressed_buffer_size,
                                 void **p_decompressed,
                                 size_t *p_decompressed_size);

/* Lifecycle of a cache slot. FREE is the first enumerator and therefore the
 * state of every slot after the zero-initializing allocation in
 * InitAssetCache(). */
typedef enum {
    CACHE_ENTRY_STATE_FREE,
    CACHE_ENTRY_STATE_LOADING,
    CACHE_ENTRY_STATE_LOADED,
} rt_asset_cache_entry_state;

typedef struct rt_asset_cache_entry_s {
    rt_asset_cache_entry_state state;
    /* Handle of the pending read, or RT_AIO_INVALID_HANDLE */
    rt_aio_handle load;
    /* Compressed data while LOADING, decompressed data once LOADED */
    void *buffer;
    size_t size;
    int refcount;

    /* Reclaim list */
    struct rt_asset_cache_entry_s *prev_reclaim;
    struct rt_asset_cache_entry_s *next_reclaim;
} rt_asset_cache_entry;

/* _uids[i] is the key of _entries[i]; both arrays have rt_AssetCacheSize
 * elements. */
static rt_uid *_uids;
static rt_asset_cache_entry *_entries;

/* Doubly linked list of entries whose refcount dropped to zero. */
static rt_asset_cache_entry *_first_reclaim;
static rt_asset_cache_entry *_last_reclaim;

/* Locked as writer when modifying entries, as reader when searching */
static rt_rwlock _lock;

/*
 * Allocates the (zero-initialized) uid and entry tables and creates the lock.
 *
 * Returns RT_SUCCESS, RT_BUFFER_ALLOC_FAILED if a table could not be
 * allocated, or RT_UNKNOWN_ERROR if the lock could not be created. On failure
 * no memory stays allocated and the table pointers are reset to NULL, so a
 * subsequent ShutdownAssetCache() cannot free them a second time.
 */
rt_result InitAssetCache(void) {
    size_t capacity = (size_t)rt_AssetCacheSize.i;

    _first_reclaim = NULL;
    _last_reclaim = NULL;

    _entries = calloc(capacity, sizeof *_entries);
    if (!_entries) {
        return RT_BUFFER_ALLOC_FAILED;
    }

    _uids = calloc(capacity, sizeof *_uids);
    if (!_uids) {
        free(_entries);
        _entries = NULL;
        return RT_BUFFER_ALLOC_FAILED;
    }

    rt_create_rwlock_result lock_res = rtCreateRWLock();
    if (!lock_res.ok) {
        free(_entries);
        free(_uids);
        _entries = NULL;
        _uids = NULL;
        return RT_UNKNOWN_ERROR;
    }
    _lock = lock_res.lock;

    return RT_SUCCESS;
}

/*
 * Frees the tables, destroys the lock and empties the reclaim list.
 *
 * The table pointers are reset so that stale pointers cannot be freed twice.
 *
 * NOTE(review): buffers and AIO handles still owned by live entries are not
 * released here; confirm that the shutdown order makes this acceptable.
 */
void ShutdownAssetCache(void) {
    free(_entries);
    free(_uids);
    _entries = NULL;
    _uids = NULL;
    rtDestroyRWLock(&_lock);
    _first_reclaim = NULL;
    _last_reclaim = NULL;
}

/*
 * Returns all resources held by an entry and marks the slot as free.
 *
 * Waits for a pending read before releasing its handle, releases the buffer,
 * detaches the entry from the reclaim list (if it is linked) and resets state
 * and refcount, so a released entry can no longer be mistaken for a loaded
 * asset with a NULL buffer.
 *
 * The caller is expected to hold the write lock, because entry fields and the
 * reclaim list are modified.
 */
static void ReleaseEntry(rt_asset_cache_entry *entry) {
    if (entry->load != RT_AIO_INVALID_HANDLE) {
        rtWaitForAIOCompletion(entry->load);
        rtReleaseAIO(entry->load);
        entry->load = RT_AIO_INVALID_HANDLE;
    }

    if (entry->buffer) {
        rtReleaseBuffer(entry->buffer, entry->size);
    }
    entry->buffer = NULL;
    entry->size = 0;

    /* Only patch the list if the entry really is a member: its predecessor
     * (or the list head) must point back at it. This keeps stale link values
     * on an already removed entry from corrupting the list. */
    rt_asset_cache_entry *prev = entry->prev_reclaim;
    rt_asset_cache_entry *next = entry->next_reclaim;
    if ((prev ? prev->next_reclaim : _first_reclaim) == entry) {
        if (prev) {
            prev->next_reclaim = next;
        } else {
            _first_reclaim = next;
        }
        if (next) {
            next->prev_reclaim = prev;
        } else {
            _last_reclaim = prev;
        }
    }
    entry->next_reclaim = NULL;
    entry->prev_reclaim = NULL;

    entry->refcount = 0;
    entry->state = CACHE_ENTRY_STATE_FREE;
}
static void GarbageCollect(void) { rtLockWrite(&_lock); rt_asset_cache_entry *entry = _first_reclaim; while (entry) { assert(entry->refcount == 0); rt_asset_cache_entry *next = entry->next_reclaim; if (entry->state == CACHE_ENTRY_STATE_LOADED) { ReleaseEntry(entry); _first_reclaim = next; } entry = next; } rtUnlockWrite(&_lock); } static rt_asset_cache_entry *GetEntry(rt_uid uid) { /* Hash lookup */ unsigned int mod = (unsigned int)rt_AssetCacheSize.i - 1; for (unsigned int i = 0; i < (unsigned int)rt_AssetCacheSize.i; ++i) { unsigned int slot = (uid + i) & mod; if (_uids[slot] == uid) { return &_entries[slot]; } else if (_uids[slot] == RT_INVALID_UID) { break; } } return NULL; } static bool IsAssetLoaded(rt_uid uid) { const rt_asset_cache_entry *entry = GetEntry(uid); if (entry) return entry->state == CACHE_ENTRY_STATE_LOADED || entry->state == CACHE_ENTRY_STATE_LOADING; else return false; } static int InsertEntry(rt_uid uid) { unsigned int mod = (unsigned int)rt_AssetCacheSize.i - 1; for (unsigned int i = 0; i < (unsigned int)rt_AssetCacheSize.i; ++i) { unsigned int slot = (uid + i) & mod; if (_uids[slot] == 0) { return (int)slot; } } return -1; } static rt_result InsertAndLoadAssets(const rt_uid *uids, size_t count) { rt_load_batch batch = {.num_loads = 0}; rt_result res = RT_SUCCESS; count = (count < RT_LOAD_BATCH_MAX_SIZE) ? 
count : RT_LOAD_BATCH_MAX_SIZE; rt_asset_cache_entry *load_entries[RT_LOAD_BATCH_MAX_SIZE]; for (size_t i = 0; i < count; ++i) { rtLockRead(&_lock); bool needs_load = !IsAssetLoaded(uids[i]); rtUnlockRead(&_lock); if (!needs_load) continue; rtLockWrite(&_lock); /* It's possible that another thread loaded the asset in the meantime */ if (!IsAssetLoaded(uids[i])) { const rt_uid_data *data = rtGetUIDData(uids[i]); if (!data) { rtUnlockWrite(&_lock); rtLog("ASSET_CACHE", "Failed to get uid data for uid %u", uids[i]); res = RT_UNKNOWN_ASSET; continue; } void *compressed_data = rtAllocBuffer(data->size); if (!compressed_data) { /* Try again after garbage collection */ rtUnlockWrite(&_lock); GarbageCollect(); compressed_data = rtAllocBuffer(data->size); if (!compressed_data) { rtLog("ASSET_CACHE", "Failed to allocate intermediate buffer for uid %u", uids[i]); res = RT_BUFFER_ALLOC_FAILED; continue; } rtLockWrite(&_lock); } int slot = InsertEntry(uids[i]); if (slot == -1) { rtUnlockWrite(&_lock); rtLog("ASSET_CACHE", "Failed to insert new entry for uid %u", uids[i]); res = RT_ASSET_CACHE_FULL; break; } rt_asset_cache_entry *entry = &_entries[slot]; load_entries[batch.num_loads] = entry; /* We set the refcount to 0, but don't insert the entry * into the reclaim list, to ensure that its buffer does not get freed * while the load still executes. Setting the refcount to 0 ensures * that the count is correct, once the asset is accessed the first time. 
*/ entry->state = CACHE_ENTRY_STATE_LOADING; entry->refcount = 0; entry->buffer = compressed_data; entry->size = data->size; entry->next_reclaim = NULL; entry->prev_reclaim = NULL; entry->load = RT_AIO_INVALID_HANDLE; batch.loads[batch.num_loads].file = data->pkg_file; batch.loads[batch.num_loads].num_bytes = data->size; batch.loads[batch.num_loads].offset = data->offset; batch.loads[batch.num_loads].dest = compressed_data; ++batch.num_loads; } } rtUnlockWrite(&_lock); /* Dispatch the load */ rt_aio_handle handles[RT_LOAD_BATCH_MAX_SIZE]; if ((res = rtSubmitLoadBatch(&batch, handles)) != RT_SUCCESS) { rtLog("ASSET_CACHE", "Failed to submit %u asset loads.", batch.num_loads); return res; } /* Set the aio handles of the inserted entries */ rtLockWrite(&_lock); for (unsigned int i = 0; i < batch.num_loads; ++i) { load_entries[batch.num_loads]->load = handles[i]; } rtUnlockWrite(&_lock); return res; } static bool DecompressEntry(rt_uid uid, rt_asset_cache_entry *entry) { rtReleaseAIO(entry->load); entry->load = RT_AIO_INVALID_HANDLE; void *decompressed_buffer; size_t decompressed_size; rt_result dec_res = DecompressAsset(entry->buffer, entry->size, &decompressed_buffer, &decompressed_size); if (dec_res == RT_SUCCESS) { rtReleaseBuffer(entry->buffer, entry->size); entry->buffer = decompressed_buffer; entry->size = decompressed_size; entry->state = CACHE_ENTRY_STATE_LOADED; return true; } else if (dec_res == RT_BUFFER_ALLOC_FAILED) { GarbageCollect(); /* Try again */ if (DecompressAsset(entry->buffer, entry->size, &decompressed_buffer, &decompressed_size) == RT_SUCCESS) { rtReleaseBuffer(entry->buffer, entry->size); entry->buffer = decompressed_buffer; entry->size = decompressed_size; entry->state = CACHE_ENTRY_STATE_LOADED; return true; } /* Don't do anything yet. We might be able to to do this later, once some * buffers become free. 
*/ rtLog("ASSET_CACHE", "Failed to decompress asset %u", uid); return false; } else { rtLog("ASSET_CACHE", "Failed to decompress asset %u", uid); ReleaseEntry(entry); ptrdiff_t idx = entry - _entries; _uids[idx] = RT_INVALID_UID; return false; } } static void CheckCompletedLoads(const rt_uid *uids, size_t count) { for (size_t i = 0; i < count; ++i) { rtLockRead(&_lock); volatile rt_asset_cache_entry *entry = (volatile rt_asset_cache_entry *)GetEntry(uids[i]); if (!entry) { rtUnlockRead(&_lock); rtLog("ASSET_CACHE", "Passed unknown uid %u to CheckCompletedLoads()", uids[i]); continue; } if (entry->state != CACHE_ENTRY_STATE_LOADING) { rtUnlockRead(&_lock); continue; } bool load_finished = rtGetAIOState(entry->load) == RT_AIO_STATE_FINISHED; rtUnlockRead(&_lock); if (load_finished) { rtLockWrite(&_lock); /* Ensure that no-one else handled this */ if (entry->state == CACHE_ENTRY_STATE_LOADING) { DecompressEntry(uids[i], (rt_asset_cache_entry *)entry); } rtUnlockWrite(&_lock); } } } RT_DLLEXPORT rt_get_asset_result rtGetAsset(rt_uid uid) { rt_get_asset_result result = { .result = RT_SUCCESS, }; rtLockRead(&_lock); bool needs_load = !IsAssetLoaded(uid); rtUnlockRead(&_lock); if (needs_load) { rt_uid load_uids[RT_LOAD_BATCH_MAX_SIZE]; size_t load_count = 1; load_uids[0] = uid; rt_asset_dependency_list deps = rtGetAssetDependencies(uid); for (size_t i = 0; i < deps.count && i < RT_LOAD_BATCH_MAX_SIZE - 1; ++i) { load_uids[i + 1] = deps.dependencies[i]; ++load_count; } result.result = InsertAndLoadAssets(load_uids, load_count); if (result.result == RT_SUCCESS) { CheckCompletedLoads(load_uids, load_count); } } rtLockWrite(&_lock); rt_asset_cache_entry *entry = GetEntry(uid); if (entry) { if (entry->state == CACHE_ENTRY_STATE_LOADED) { ++entry->refcount; result.data = entry->buffer; result.size = entry->size; } else if (entry->state == CACHE_ENTRY_STATE_LOADING) { if (entry->state == CACHE_ENTRY_STATE_LOADING) { assert(entry->load != RT_AIO_INVALID_HANDLE); 
++entry->refcount; if (rtWaitForAIOCompletion(entry->load) == RT_AIO_STATE_FINISHED) { if (DecompressEntry(uid, entry)) { result.data = entry->buffer; result.size = entry->size; } else { result.result = RT_LOAD_FAILED; } } else { ReleaseEntry(entry); rtLog("ASSET_CACHE", "Failed to load asset %u", uid); result.result = RT_LOAD_FAILED; } } } /* Remove from the reclaim list */ if (_first_reclaim == entry) _first_reclaim = entry->next_reclaim; if (_last_reclaim == entry) _last_reclaim = entry->prev_reclaim; if (entry->next_reclaim) entry->next_reclaim->prev_reclaim = entry->prev_reclaim; if (entry->prev_reclaim) entry->prev_reclaim->next_reclaim = entry->next_reclaim; } rtUnlockWrite(&_lock); return result; } RT_DLLEXPORT void rtReleaseAsset(rt_uid uid) { rtLockWrite(&_lock); rt_asset_cache_entry *entry = GetEntry(uid); if (entry && entry->refcount > 0) { --entry->refcount; if (entry->refcount == 0) { /* add to the reclaim list */ if (_last_reclaim) _last_reclaim->next_reclaim = entry; if (!_first_reclaim) _first_reclaim = entry; entry->prev_reclaim = _last_reclaim; entry->next_reclaim = NULL; _last_reclaim = entry; } } rtUnlockWrite(&_lock); }