/*
 * Asset compiler.
 *
 * A background "compiler" thread repeatedly scans the asset directory,
 * registers every file it finds in a fixed-size asset database and queues
 * files whose modification timestamp is newer than the last processing run.
 * A pool of "processor" threads pops entries from that queue and runs the
 * processor function registered for the file extension.
 */
#include "asset_compiler.h"
#include "processor.h"

#include "runtime/aio.h"
#include "runtime/buffer_manager.h"
#include "runtime/config.h"
#include "runtime/file_tab.h"
#include "runtime/fsutils.h"
#include "runtime/mem_arena.h"
#include "runtime/resources.h"
#include "runtime/runtime.h"
#include "runtime/threading.h"

/* NOTE(review): the names of these four standard headers were missing from
 * the file; they are reconstructed from what the code uses (assert, bool,
 * malloc/calloc/free, memcpy/strlen/strcmp/strrchr). Confirm against history. */
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Per-asset bookkeeping stored in the asset database. */
typedef struct {
    uint64_t last_processed; /* Timestamp of the last processing run, 0 = never */
    rt_resource_id resources[RT_MAX_RESOURCES_PER_ASSET];
    unsigned int resource_count;
    bool in_processing; /* Set while a processor thread works on the asset */
} rt_asset_data;

/* Open-addressing table: files[slot] == 0 marks a free slot and data[slot]
 * belongs to files[slot]. Both arrays have rt_AssetDBSize entries. */
typedef struct {
    rt_file_id *files;
    rt_asset_data *data;
    rt_rwlock lock;
} rt_asset_db;

/* Associates a file extension (including the dot) with a processor. */
typedef struct {
    const char *file_ext;
    rt_asset_processor_fn *proc;
} rt_asset_processor;

typedef struct {
    unsigned int processor_index; /* Index into _processors */
    unsigned int db_index;        /* Slot in the asset database */
    rt_file_id fid;
} rt_processing_queue_entry;

/* Ring buffer; head == tail means empty, so one slot always stays unused. */
typedef struct {
    rt_processing_queue_entry entries[1024];
    unsigned int head;
    unsigned int tail;
    rt_condition_var *lock;
} rt_processing_queue;

RT_CVAR_S(rt_AssetDirectory, "Name of the asset directory. Default: assets", "assets");
RT_CVAR_I(rt_AssetDBSize, "Size of the asset database. Default: 1024", 1024);
/* NOTE(review): the description says "Default: 4" but the actual default is 1.
 * One of the two is presumably wrong - confirm which one is intended. */
RT_CVAR_I(rt_AssetProcessingThreads, "Number of asset processing threads. Default: 4", 1);
RT_CVAR_I(rt_AssetProcessorArenaSize, "Size of the per-thread asset processor arena. Default: 128 MiB", (int)RT_MB(128));

#define MAX_PROCESSING_THREADS 8

static rt_thread *_compiler_thread;
static rt_thread *_processing_threads[MAX_PROCESSING_THREADS];
/* NOTE(review): read by the worker threads without synchronization in their
 * outer loops; consider making this an atomic flag. */
static bool _keep_running = true;

static rt_asset_db _asset_db;
static rt_processing_queue _processing_queue;

extern RT_ASSET_PROCESSOR_FN(PipelineProcessor);
extern RT_ASSET_PROCESSOR_FN(FramegraphProcessor);

static rt_asset_processor _processors[] = {
    {.file_ext = ".pipeline",   .proc = PipelineProcessor  },
    {.file_ext = ".framegraph", .proc = FramegraphProcessor},
};

static void ProcessorThreadEntry(void *);
static void CompilerThreadEntry(void *);

/* Registers the configuration variables of the asset compiler. */
RT_DLLEXPORT void rtRegisterAssetCompilerCVars(void) {
    rtRegisterCVAR(&rt_AssetDirectory);
    rtRegisterCVAR(&rt_AssetDBSize);
    rtRegisterCVAR(&rt_AssetProcessingThreads);
    rtRegisterCVAR(&rt_AssetProcessorArenaSize);
}

/* Releases both asset database arrays and resets the pointers. */
static void FreeAssetDBStorage(void) {
    free(_asset_db.files);
    free(_asset_db.data);
    _asset_db.files = NULL;
    _asset_db.data  = NULL;
}

/* Asks all worker threads to stop and joins the compiler thread plus the
 * first processing_thread_count processor threads. */
static void StopAndJoinThreads(int processing_thread_count) {
    /* Clear the flag while holding the queue lock and *before* waking the
     * waiters. A processor thread that is woken while the flag is still set
     * would just go back to waiting and the join below would never return. */
    rtLockConditionVar(_processing_queue.lock);
    _keep_running = false;
    /* Passing true presumably requests the wake-up on unlock (the same call is
     * used with the "inserted" flag when queueing work) - TODO confirm that it
     * wakes every waiter, not just one. */
    rtUnlockConditionVar(_processing_queue.lock, true);

    rtJoinThread(_compiler_thread);
    for (int i = 0; i < processing_thread_count; ++i)
        rtJoinThread(_processing_threads[i]);
}

/*
 * Allocates the asset database, creates the synchronization primitives and
 * starts the compiler thread and the processor threads.
 *
 * Returns RT_SUCCESS on success, RT_OUT_OF_MEMORY if the database could not be
 * allocated and RT_UNKNOWN_ERROR for every other failure. On failure,
 * everything that was created up to that point is released again.
 */
RT_DLLEXPORT rt_result rtInitAssetCompiler(void) {
    /* A size of zero would cause a modulo by zero during discovery. */
    if (rt_AssetDBSize.i <= 0) {
        rtLog("AC", "Invalid asset database size %d.", rt_AssetDBSize.i);
        return RT_UNKNOWN_ERROR;
    }
    size_t db_size = (size_t)rt_AssetDBSize.i;

    /* Two separate zero-initialized arrays, so that each one is suitably
     * aligned regardless of the size of rt_file_id. */
    _asset_db.files = calloc(db_size, sizeof(*_asset_db.files));
    _asset_db.data  = calloc(db_size, sizeof(*_asset_db.data));
    if (!_asset_db.files || !_asset_db.data) {
        FreeAssetDBStorage();
        return RT_OUT_OF_MEMORY;
    }

    rt_create_rwlock_result lock_create = rtCreateRWLock();
    if (!lock_create.ok) {
        FreeAssetDBStorage();
        return RT_UNKNOWN_ERROR;
    }
    _asset_db.lock = lock_create.lock;

    _processing_queue.lock = rtCreateConditionVar();
    if (!_processing_queue.lock) {
        rtDestroyRWLock(&_asset_db.lock);
        FreeAssetDBStorage();
        return RT_UNKNOWN_ERROR;
    }
    /* Drop entries that may be left over from a previous init/shutdown cycle. */
    _processing_queue.head = 0;
    _processing_queue.tail = 0;

    _keep_running = true;

    _compiler_thread = rtSpawnThread(CompilerThreadEntry, NULL, "AssetCompilerThread");
    if (!_compiler_thread) {
        rtDestroyConditionVar(_processing_queue.lock);
        rtDestroyRWLock(&_asset_db.lock);
        FreeAssetDBStorage();
        return RT_UNKNOWN_ERROR;
    }

    if (rt_AssetProcessingThreads.i > MAX_PROCESSING_THREADS)
        rt_AssetProcessingThreads.i = MAX_PROCESSING_THREADS;
    for (int i = 0; i < rt_AssetProcessingThreads.i; ++i) {
        char name[64];
        rtSPrint(name, 64, "AssetProcessorThread %d", i);
        _processing_threads[i] = rtSpawnThread(ProcessorThreadEntry, NULL, name);
        if (!_processing_threads[i]) {
            /* Only the threads [0, i) exist at this point. */
            StopAndJoinThreads(i);
            rtDestroyConditionVar(_processing_queue.lock);
            rtDestroyRWLock(&_asset_db.lock);
            FreeAssetDBStorage();
            return RT_UNKNOWN_ERROR;
        }
    }

    return RT_SUCCESS;
}

/* Stops all threads started by rtInitAssetCompiler and releases the asset
 * database and the synchronization primitives. */
RT_DLLEXPORT void rtShutdownAssetCompiler(void) {
    StopAndJoinThreads(rt_AssetProcessingThreads.i);
    FreeAssetDBStorage();
    rtDestroyConditionVar(_processing_queue.lock);
    rtDestroyRWLock(&_asset_db.lock);
}

/*
 * Makes sure that fid has a slot in the asset database (linear probing,
 * starting at fid modulo the database size).
 *
 * Returns 1 if a new slot was claimed, 0 if the file was already present and
 * -1 if the database has no free slot left.
 */
static int RegisterAsset(rt_file_id fid) {
    unsigned int db_size = (unsigned int)rt_AssetDBSize.i;
    int result = -1;

    rtLockWrite(&_asset_db.lock);
    for (unsigned int i = 0; i < db_size; ++i) {
        unsigned int slot = (fid + i) % db_size;
        if (_asset_db.files[slot] == fid) {
            result = 0;
            break;
        }
        if (_asset_db.files[slot] == 0) {
            _asset_db.files[slot] = fid;
            _asset_db.data[slot].last_processed = 0;
            memset(&_asset_db.data[slot].resources, 0, sizeof(_asset_db.data[slot].resources));
            _asset_db.data[slot].resource_count = 0;
            _asset_db.data[slot].in_processing  = false;
            result = 1;
            break;
        }
    }
    rtUnlockWrite(&_asset_db.lock);
    return result;
}

/*
 * Walks the asset directory tree and registers every file in the asset
 * database. Entries whose name starts with '.' are skipped.
 *
 * Returns the number of files that were not in the database before.
 */
static int DiscoverAssets(void) {
    /* Iterative descent into the asset directory, using an explicit stack of
     * directories that still need to be scanned. */
#define MAX_DISCOVERY_DEPTH 64
#define MAX_FILENAME_LEN    260
    static char directory_stack[MAX_DISCOVERY_DEPTH][MAX_FILENAME_LEN];
    unsigned int top = 0;

    /* The stack is static and still holds paths from the previous pass, so
     * the root path must be terminated right behind the copied bytes. */
    size_t root_len = strlen(rt_AssetDirectory.s);
    if (root_len > MAX_FILENAME_LEN - 1)
        root_len = MAX_FILENAME_LEN - 1;
    memcpy(directory_stack[0], rt_AssetDirectory.s, root_len);
    directory_stack[0][root_len] = '\0';
    ++top;

    int discovery_count = 0;
    while (top > 0) {
        /* Pop the next directory. A private copy is needed, because the stack
         * slot may be overwritten while this directory is scanned. */
        char dir[MAX_FILENAME_LEN];
        --top;
        memcpy(dir, directory_stack[top], MAX_FILENAME_LEN);
        size_t dir_len = strlen(dir);

        rt_scandir_handle *scan = rtScanDirectory(dir);
        if (!scan)
            continue;

        rt_dirent entry;
        do {
            /* NOTE: "continue" inside do/while jumps to the loop condition, so
             * entry.is_last is still evaluated for skipped entries. */
            entry = rtNextDirectoryEntry(scan);
            /* Skip ".", ".." and hidden entries. An empty name cannot refer
             * to a file either, so it is skipped defensively. */
            if (entry.name[0] == '.' || entry.name[0] == '\0')
                continue;

            size_t entry_name_len = strlen(entry.name);
            /* dir + '/' + name + terminator must fit into MAX_FILENAME_LEN. */
            if (dir_len + entry_name_len + 1 >= MAX_FILENAME_LEN) {
                rtLog("AC",
                      "Could not process %s\\%s because the name exceeds the maximum "
                      "allowed path length %d.",
                      dir,
                      entry.name,
                      MAX_FILENAME_LEN);
                continue;
            }

            /* Build "<dir>/<name>" */
            char path[MAX_FILENAME_LEN];
            memcpy(path, dir, dir_len);
            path[dir_len] = '/';
            memcpy(&path[dir_len + 1], entry.name, entry_name_len);
            path[dir_len + entry_name_len + 1] = '\0';

            if (entry.type == RT_DIRENT_TYPE_DIRECTORY) {
                assert(MAX_FILENAME_LEN == RT_ARRAY_COUNT(entry.name));
                if (top >= MAX_DISCOVERY_DEPTH) {
                    rtLog("AC",
                          "Could not descend into %s because the directory stack is full "
                          "(%d entries).",
                          path,
                          MAX_DISCOVERY_DEPTH);
                    continue;
                }
                memcpy(directory_stack[top], path, dir_len + entry_name_len + 2);
                ++top;
            } else {
                rt_file_id fid = rtAddFile(path);
                int registered = RegisterAsset(fid);
                if (registered > 0) {
                    ++discovery_count;
                } else if (registered < 0) {
                    rtLog("AC", "Failed to add %s to AssetDB, because no free slots are left.", path);
                }
            }
        } while (!entry.is_last);
        rtCloseDirectory(scan);
    }

    return discovery_count;

#undef MAX_DISCOVERY_DEPTH
#undef MAX_FILENAME_LEN
}

/* Returns true if the processing queue currently contains an entry for fid. */
static bool IsFileQueued(rt_file_id fid) {
    bool queued = false;
    rtLockConditionVar(_processing_queue.lock);
    for (size_t idx = _processing_queue.head; idx != _processing_queue.tail;
         idx = (idx + 1) % RT_ARRAY_COUNT(_processing_queue.entries)) {
        if (_processing_queue.entries[idx].fid == fid) {
            queued = true;
            break;
        }
    }
    rtUnlockConditionVar(_processing_queue.lock, false);
    return queued;
}

/* Returns the index of the processor registered for the extension of path
 * (the part starting at the last '.'), or -1 if there is none. */
static int FindProcessorForPath(const char *path) {
    const char *ext = strrchr(path, '.');
    if (!ext)
        return -1;
    for (unsigned int j = 0; j < RT_ARRAY_COUNT(_processors); ++j) {
        if (strcmp(ext, _processors[j].file_ext) == 0)
            return (int)j;
    }
    return -1;
}

/*
 * Appends entry to the processing queue and wakes a processor thread. While
 * the queue is full this retries until a slot becomes free.
 *
 * Returns false if the compiler was asked to shut down before the entry could
 * be inserted.
 */
static bool EnqueueForProcessing(const rt_processing_queue_entry *entry) {
    /* Checking _keep_running here keeps a full queue from blocking shutdown. */
    while (_keep_running) {
        bool inserted = false;
        rtLockConditionVar(_processing_queue.lock);
        unsigned int next_tail =
            (_processing_queue.tail + 1) % RT_ARRAY_COUNT(_processing_queue.entries);
        if (next_tail != _processing_queue.head) {
            _processing_queue.entries[_processing_queue.tail] = *entry;
            _processing_queue.tail = next_tail;
            inserted = true;
        }
        rtUnlockConditionVar(_processing_queue.lock, inserted);
        if (inserted)
            return true;
    }
    return false;
}

/*
 * Queues the asset stored in the given database slot if it was modified since
 * it was last processed, is not being processed, is not queued already and
 * has a processor for its file extension.
 *
 * The caller must hold the read lock of the asset database.
 * Returns true if the asset was queued.
 */
static bool QueueAssetIfOutdated(unsigned int slot) {
    rt_file_id fid = _asset_db.files[slot];
    if (fid == 0)
        return false;

    const char *path = rtGetFilePath(fid);
    if (!path)
        return false;

    uint64_t last_changed     = rtGetFileModificationTimestamp(path);
    const rt_asset_data *data = &_asset_db.data[slot];
    if (data->in_processing || data->last_processed >= last_changed)
        return false;

    if (IsFileQueued(fid))
        return false;

    int processor = FindProcessorForPath(path);
    if (processor < 0)
        return false;

    rt_processing_queue_entry entry;
    entry.fid             = fid;
    entry.processor_index = (unsigned int)processor;
    entry.db_index        = slot;
    return EnqueueForProcessing(&entry);
}

/* Checks every asset in the database and queues the outdated ones.
 * Returns the number of assets that were queued. */
static int CheckUpdatedAssets(void) {
    int updated_count = 0;
    for (int i = 0; i < rt_AssetDBSize.i; ++i) {
        /* The lock is taken per slot and released at exactly one place, so no
         * path through the check can leave it locked. */
        rtLockRead(&_asset_db.lock);
        if (QueueAssetIfOutdated((unsigned int)i))
            ++updated_count;
        rtUnlockRead(&_asset_db.lock);
    }
    return updated_count;
}

static void CompilerThreadEntry(void *param) {
    RT_UNUSED(param);

    /* Mainloop. Watch for changed or new assets and hand them to the
     * processor threads. Only sleep if a pass found nothing to do. */
    while (_keep_running) {
        int d = DiscoverAssets();
        int u = CheckUpdatedAssets();
        if (d == 0 && u == 0)
            rtSleep(100);
    }
}

/* Entry point of the processor threads: pops entries from the processing
 * queue and runs the matching processor until shutdown is requested. */
static void ProcessorThreadEntry(void *param) {
    RT_UNUSED(param);

    rt_create_arena_result arena_res = rtCreateArena(NULL, (size_t)rt_AssetProcessorArenaSize.i);
    if (!arena_res.ok) {
        rtLog("AC",
              "Failed to allocate %d bytes for the processing arena.",
              rt_AssetProcessorArenaSize.i);
        return;
    }
    /* NOTE(review): the arena is never released when the thread exits. */
    rt_arena arena = arena_res.arena;

    while (_keep_running) {
        /* Wait for work (or shutdown) and pop one entry. */
        rtLockConditionVar(_processing_queue.lock);
        while (_keep_running && (_processing_queue.tail == _processing_queue.head))
            rtWaitOnConditionVar(_processing_queue.lock);

        bool got_entry = false;
        rt_processing_queue_entry entry = {0};
        if (_processing_queue.tail != _processing_queue.head) {
            entry = _processing_queue.entries[_processing_queue.head];
            _processing_queue.head =
                (_processing_queue.head + 1) % RT_ARRAY_COUNT(_processing_queue.entries);
            got_entry = true;
        }
        rtUnlockConditionVar(_processing_queue.lock, false);
        if (!got_entry)
            continue;

        /* Mark the asset as busy and take a snapshot of its current resources. */
        rt_resource_id existing_resources[RT_MAX_RESOURCES_PER_ASSET];
        unsigned int existing_resource_count;
        rtLockWrite(&_asset_db.lock);
        _asset_db.data[entry.db_index].in_processing = true;
        memcpy(existing_resources,
               _asset_db.data[entry.db_index].resources,
               sizeof(existing_resources));
        existing_resource_count = _asset_db.data[entry.db_index].resource_count;
        rtUnlockWrite(&_asset_db.lock);

        const char *path = rtGetFilePath(entry.fid);
        rtLog("AC", "Processing %s", path);

        rtArenaClear(&arena);

        rt_resource_id new_resources[RT_MAX_RESOURCES_PER_ASSET];
        memset(&new_resources, 0, sizeof(new_resources));
        unsigned int new_resource_count = 0;

        rt_result res = _processors[entry.processor_index].proc(entry.fid,
                                                                existing_resource_count,
                                                                existing_resources,
                                                                &new_resource_count,
                                                                new_resources,
                                                                &arena);
        if (res != RT_SUCCESS) {
            rtLog("AC", "Failed to process %s: %u", path, res);
        }
        /* Never record more resources than the database entry can hold. */
        if (new_resource_count > RT_MAX_RESOURCES_PER_ASSET) {
            rtLog("AC", "Processor reported too many resources for %s: %u", path, new_resource_count);
            new_resource_count = RT_MAX_RESOURCES_PER_ASSET;
        }

        /* The result is stored even if processing failed; updating
         * last_processed keeps a broken file from being retried until it is
         * modified again. */
        rtLockWrite(&_asset_db.lock);
        _asset_db.data[entry.db_index].last_processed = rtGetCurrentTimestamp();
        _asset_db.data[entry.db_index].in_processing  = false;
        memcpy(_asset_db.data[entry.db_index].resources, new_resources, sizeof(new_resources));
        _asset_db.data[entry.db_index].resource_count = new_resource_count;
        rtUnlockWrite(&_asset_db.lock);
    }
}

/*
 * Blocks until the asset database was polled 10 times without any asset being
 * marked as in processing. The database is polled every 250 ms, so this takes
 * at least 2.5 seconds.
 */
RT_DLLEXPORT void rtWaitForAssetProcessing(void) {
    int turns = 10;
    while (turns > 0) {
        int in_processing_count = 0;
        rtLockRead(&_asset_db.lock);
        for (int i = 0; i < rt_AssetDBSize.i; ++i) {
            if (_asset_db.files[i] != RT_INVALID_FILE_ID && _asset_db.data[i].in_processing)
                ++in_processing_count;
        }
        rtUnlockRead(&_asset_db.lock);

        if (!in_processing_count)
            --turns;
        /* Sleep on every poll, so that waiting does not spin on the database
         * lock while the processor threads need it. */
        rtSleep(250);
    }
}

/* Utilities for asset processors */

/*
 * Loads the complete contents of file into a buffer obtained from
 * rtAllocBuffer.
 *
 * Returns the buffer and its size. On failure the returned buffer is NULL and
 * the size is 0. The buffer is handed to the caller, which presumably has to
 * return it via rtReleaseBuffer(buffer, size) - TODO confirm.
 */
rt_loaded_asset LoadAsset(rt_file_id file) {
    const char *path = rtGetFilePath(file);
    size_t file_size = rtGetFileSize(path);

    void *buffer = rtAllocBuffer(file_size);
    if (!buffer) {
        rtLog("AC", "Failed to allocate buffer for loading %s.", path);
        return (rt_loaded_asset){.buffer = NULL, .size = 0};
    }

    rt_aio_state load_state = rtSubmitSingleLoadSync((rt_file_load){
        .file      = file,
        .offset    = 0,
        .num_bytes = file_size,
        .dest      = buffer,
    });
    if (load_state != RT_AIO_STATE_FINISHED) {
        rtReleaseBuffer(buffer, file_size);
        rtLog("AC", "Failed to load %s.", path);
        return (rt_loaded_asset){.buffer = NULL, .size = 0};
    }

    return (rt_loaded_asset){.buffer = buffer, .size = file_size};
}