rtengine/src/asset_compiler/asset_compiler.c
2024-04-18 17:06:11 +02:00

438 lines
16 KiB
C

#include "asset_compiler.h"
#include "processor.h"
#include "runtime/aio.h"
#include "runtime/buffer_manager.h"
#include "runtime/config.h"
#include "runtime/file_tab.h"
#include "runtime/fsutils.h"
#include "runtime/mem_arena.h"
#include "runtime/resources.h"
#include "runtime/runtime.h"
#include "runtime/threading.h"
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/* Per-asset bookkeeping stored in the asset database, one record per slot. */
typedef struct {
/* Timestamp taken from rtGetCurrentTimestamp() when a processor thread last
 * finished this asset; 0 for a freshly discovered asset. Compared against the
 * file's modification timestamp to decide whether to (re)process it. */
uint64_t last_processed;
/* Resource ids written back by the asset processor on its last run. */
rt_resource_id resources[RT_MAX_RESOURCES_PER_ASSET];
/* Resource count reported by the asset processor on its last run. */
unsigned int resource_count;
/* True while a processor thread is working on this asset. */
bool in_processing;
} rt_asset_data;
/* The asset database: two parallel arrays with rt_AssetDBSize entries each,
 * carved out of a single allocation (files first, data directly behind it).
 * Slots are found by linear probing starting at (file id % size); a file id
 * of 0 marks an empty slot. */
typedef struct {
/* File id per slot; also the base pointer of the shared allocation. */
rt_file_id *files;
/* Bookkeeping record per slot, index-aligned with files. */
rt_asset_data *data;
/* Reader/writer lock taken around accesses to files and data. */
rt_rwlock lock;
} rt_asset_db;
/* Associates a file extension with the function that processes such files. */
typedef struct {
/* Extension including the leading dot (e.g. ".effect"); matched with strcmp. */
const char *file_ext;
/* Processing function invoked by the processor threads. */
rt_asset_processor_fn *proc;
} rt_asset_processor;
/* One unit of work for a processor thread. */
typedef struct {
/* Index into _processors selecting the processor to run. */
unsigned int processor_index;
/* Slot of the asset inside _asset_db. */
unsigned int db_index;
/* File id of the asset to process. */
rt_file_id fid;
} rt_processing_queue_entry;
/* Fixed-size ring buffer of pending work. The queue is empty when
 * head == tail and treated as full when advancing tail would make it equal
 * to head, so it holds at most one entry less than the array size. */
typedef struct {
rt_processing_queue_entry entries[1024];
/* Index of the next entry to dequeue. */
unsigned int head;
/* Index of the next free slot to enqueue into. */
unsigned int tail;
/* Condition variable that is locked around every access to head, tail and
 * entries; processor threads wait on it while the queue is empty. */
rt_condition_var *lock;
} rt_processing_queue;
/* Configuration variables of the asset compiler. They are registered with the
 * runtime by rtRegisterAssetCompilerCVars(). */
/* Root directory that is scanned recursively for assets. */
RT_CVAR_S(rt_AssetDirectory, "Name of the asset directory. Default: assets", "assets");
/* Number of slots in the asset database. */
RT_CVAR_I(rt_AssetDBSize, "Size of the asset database. Default: 1024", 1024);
/* Number of processor threads; clamped to MAX_PROCESSING_THREADS during
 * rtInitAssetCompiler(). */
RT_CVAR_I(rt_AssetProcessingThreads, "Number of asset processing threads. Default: 4", 4);
/* Size passed to rtCreateArena() by each processor thread. */
RT_CVAR_I(rt_AssetProcessorArenaSize,
"Size of the per-thread asset processor arena. Default: 128 MiB",
(int)RT_MB(128));
/* Upper bound for rt_AssetProcessingThreads and size of _processing_threads. */
#define MAX_PROCESSING_THREADS 8
/* Thread running CompilerThreadEntry (asset discovery and change detection). */
static rt_thread *_compiler_thread;
/* Threads running ProcessorThreadEntry; only the first
 * rt_AssetProcessingThreads entries are used. */
static rt_thread *_processing_threads[MAX_PROCESSING_THREADS];
/* Cleared to ask all asset compiler threads to leave their main loops. */
static bool _keep_running = true;
/* The asset database shared by the compiler and processor threads. */
static rt_asset_db _asset_db;
/* Work queue filled by the compiler thread and drained by processor threads. */
static rt_processing_queue _processing_queue;
/* Asset processors are defined in other translation units. */
extern RT_ASSET_PROCESSOR_FN(EffectProcessor);
/* extern RT_ASSET_PROCESSOR_FN(FramegraphProcessor);*/
/* Table of known asset processors. CheckUpdatedAssets() compares a file's
 * extension against every entry; queue entries refer to a processor by its
 * index in this table. The framegraph processor is currently disabled. */
static rt_asset_processor _processors[] = {
{.file_ext = ".effect", .proc = EffectProcessor},
/*
{.file_ext = ".framegraph", .proc = FramegraphProcessor},*/
};
/* Thread entry points, defined further down; declared here because
 * rtInitAssetCompiler() spawns them. */
static void ProcessorThreadEntry(void *);
static void CompilerThreadEntry(void *);
/* Registers the asset compiler's configuration variables (asset directory,
 * database size, processing thread count and per-thread arena size) with
 * rtRegisterCVAR(). */
RT_DLLEXPORT void rtRegisterAssetCompilerCVars(void) {
rtRegisterCVAR(&rt_AssetDirectory);
rtRegisterCVAR(&rt_AssetDBSize);
rtRegisterCVAR(&rt_AssetProcessingThreads);
rtRegisterCVAR(&rt_AssetProcessorArenaSize);
}
/* Initializes the asset compiler: allocates and zeroes the asset database,
 * creates the database lock and the queue condition variable, and spawns the
 * compiler thread plus rt_AssetProcessingThreads processor threads (clamped
 * to MAX_PROCESSING_THREADS).
 *
 * Returns RT_SUCCESS on success, RT_OUT_OF_MEMORY if the database could not
 * be allocated and RT_UNKNOWN_ERROR if a lock, condition variable or thread
 * could not be created. On failure, every thread that was already started is
 * joined and every resource acquired so far is released again. */
RT_DLLEXPORT rt_result rtInitAssetCompiler(void) {
    unsigned int db_size = (unsigned int)rt_AssetDBSize.i;
    size_t db_bytes = (sizeof(rt_file_id) + sizeof(rt_asset_data)) * db_size;

    /* A single allocation holds the file id array followed by the data array. */
    void *mem = malloc(db_bytes);
    if (!mem)
        return RT_OUT_OF_MEMORY;
    memset(mem, 0, db_bytes);
    _asset_db.files = mem;
    _asset_db.data = (rt_asset_data *)(_asset_db.files + db_size);

    /* No thread is running yet, so the queue can be reset without locking.
     * This discards stale entries left over from a previous init/shutdown. */
    _processing_queue.head = 0;
    _processing_queue.tail = 0;

    rt_create_rwlock_result lock_create = rtCreateRWLock();
    if (!lock_create.ok)
        goto fail_free_db;
    _asset_db.lock = lock_create.lock;

    _processing_queue.lock = rtCreateConditionVar();
    if (!_processing_queue.lock)
        goto fail_destroy_rwlock;

    _keep_running = true;
    _compiler_thread = rtSpawnThread(CompilerThreadEntry, NULL, "AssetCompilerThread");
    if (!_compiler_thread)
        goto fail_destroy_condvar;

    if (rt_AssetProcessingThreads.i > MAX_PROCESSING_THREADS)
        rt_AssetProcessingThreads.i = MAX_PROCESSING_THREADS;
    for (int i = 0; i < rt_AssetProcessingThreads.i; ++i) {
        char name[64];
        rtSPrint(name, 64, "AssetProcessorThread %d", i);
        _processing_threads[i] = rtSpawnThread(ProcessorThreadEntry, NULL, name);
        if (!_processing_threads[i]) {
            /* Clear the flag BEFORE waking the processor threads. A thread
             * that wakes up while the flag is still set would simply go back
             * to waiting and the joins below would never return. */
            _keep_running = false;
            rtLockConditionVar(_processing_queue.lock);
            rtUnlockConditionVar(_processing_queue.lock, true);
            for (int j = 0; j < i; ++j)
                rtJoinThread(_processing_threads[j]);
            rtJoinThread(_compiler_thread);
            goto fail_destroy_condvar;
        }
    }
    return RT_SUCCESS;

    /* Failure paths: release in reverse order of acquisition. */
fail_destroy_condvar:
    rtDestroyConditionVar(_processing_queue.lock);
fail_destroy_rwlock:
    rtDestroyRWLock(&_asset_db.lock);
fail_free_db:
    free(mem);
    _asset_db.files = NULL;
    _asset_db.data = NULL;
    return RT_UNKNOWN_ERROR;
}
/* Stops the asset compiler: asks all threads to exit, waits for them and
 * then releases the asset database and the synchronization primitives. */
RT_DLLEXPORT void rtShutdownAssetCompiler(void) {
    /* The flag has to be cleared before the wake-up so that woken processor
     * threads observe it and leave their wait loop. */
    _keep_running = false;
    rtLockConditionVar(_processing_queue.lock);
    rtUnlockConditionVar(_processing_queue.lock, true);

    rtJoinThread(_compiler_thread);
    int worker = 0;
    while (worker < rt_AssetProcessingThreads.i) {
        rtJoinThread(_processing_threads[worker]);
        ++worker;
    }

    /* files is the base pointer of the allocation shared with data. */
    free(_asset_db.files);
    rtDestroyConditionVar(_processing_queue.lock);
    rtDestroyRWLock(&_asset_db.lock);
}
/* Walks the asset directory (rt_AssetDirectory) and all of its
 * subdirectories and adds every file that is not yet known to the asset
 * database. Entries whose name starts with '.' are skipped.
 *
 * Returns the number of files that were newly added to the database. */
static int DiscoverAssets(void) {
/* Capacity of the stack of directories that still have to be scanned. */
#define MAX_DISCOVERY_DEPTH 64
#define MAX_FILENAME_LEN 260
    /* Static to keep 64 * 260 bytes off the thread's stack. Because of that
     * the slots keep their contents between calls and every string written
     * into them must be terminated explicitly. */
    static char directory_stack[MAX_DISCOVERY_DEPTH][MAX_FILENAME_LEN];
    unsigned int top = 0;

    /* Push the root directory, truncated to the maximum path length. The
     * terminator is written right behind the copied characters; otherwise a
     * longer path left in slot 0 by a previous traversal would show through
     * and the root would not be scanned again. */
    size_t root_len = strlen(rt_AssetDirectory.s);
    if (root_len > MAX_FILENAME_LEN - 1)
        root_len = MAX_FILENAME_LEN - 1;
    memcpy(directory_stack[0], rt_AssetDirectory.s, root_len);
    directory_stack[0][root_len] = '\0';
    ++top;

    int discovery_count = 0;
    while (top > 0) {
        /* Pop the next directory. It is copied because its stack slot may be
         * reused for subdirectories while it is being scanned. */
        char dir[MAX_FILENAME_LEN];
        memcpy(dir, directory_stack[top - 1], MAX_FILENAME_LEN);
        --top;

        size_t dir_len = strlen(dir);
        rt_scandir_handle *scan = rtScanDirectory(dir);
        if (!scan)
            continue;

        rt_dirent entry;
        do {
            entry = rtNextDirectoryEntry(scan);
            if (entry.name[0] == '.')
                continue;

            /* dir + '/' + name + terminator has to fit into a path buffer. */
            size_t entry_name_len = strlen(entry.name);
            if (dir_len + entry_name_len + 1 >= MAX_FILENAME_LEN) {
                rtLog("AC",
                      "Could not process %s\\%s because the name exceeds the maximum "
                      "allowed path length %d.",
                      dir,
                      entry.name,
                      MAX_FILENAME_LEN);
                continue;
            }

            if (entry.type == RT_DIRENT_TYPE_DIRECTORY) {
                /* Never write past the end of the directory stack. */
                if (top >= MAX_DISCOVERY_DEPTH) {
                    rtLog("AC",
                          "Could not descend into %s/%s because more than %d "
                          "directories are waiting to be scanned.",
                          dir,
                          entry.name,
                          MAX_DISCOVERY_DEPTH);
                    continue;
                }
                /* Push "<dir>/<name>" on the stack. */
                assert(MAX_FILENAME_LEN == RT_ARRAY_COUNT(entry.name));
                char *pushed = directory_stack[top];
                memcpy(pushed, dir, dir_len);
                pushed[dir_len] = '/';
                memcpy(&pushed[dir_len + 1], entry.name, entry_name_len);
                pushed[dir_len + entry_name_len + 1] = '\0';
                ++top;
            } else {
                /* Build "<dir>/<name>" and look the file up in the database. */
                char file[MAX_FILENAME_LEN];
                memcpy(file, dir, dir_len);
                file[dir_len] = '/';
                memcpy(&file[dir_len + 1], entry.name, entry_name_len);
                file[dir_len + entry_name_len + 1] = '\0';

                rt_file_id fid = rtAddFile(file);

                /* Linear probing starting at (fid % size): stop at the slot
                 * that already holds fid or claim the first empty slot. */
                unsigned int i = 0;
                rtLockWrite(&_asset_db.lock);
                while (i < (unsigned int)rt_AssetDBSize.i) {
                    unsigned int slot = ((unsigned int)fid + i) % (unsigned int)rt_AssetDBSize.i;
                    if (_asset_db.files[slot] == fid) {
                        break;
                    } else if (_asset_db.files[slot] == 0) {
                        _asset_db.files[slot] = fid;
                        _asset_db.data[slot].last_processed = 0;
                        memset(&_asset_db.data[slot].resources,
                               0,
                               sizeof(_asset_db.data[slot].resources));
                        _asset_db.data[slot].resource_count = 0;
                        _asset_db.data[slot].in_processing = false;
                        ++discovery_count;
                        break;
                    }
                    ++i;
                }
                rtUnlockWrite(&_asset_db.lock);

                /* Every slot was probed without finding fid or a free slot. */
                if (i == (unsigned int)rt_AssetDBSize.i) {
                    rtLog("AC",
                          "Failed to add %s to AssetDB, because no free slots are left.",
                          file);
                }
            }
        } while (!entry.is_last);
        rtCloseDirectory(scan);
    }
    return discovery_count;
#undef MAX_DISCOVERY_DEPTH
#undef MAX_FILENAME_LEN
}
/* Scans the asset database for assets whose file changed after they were
 * last processed and puts them on the processing queue.
 *
 * An asset is queued if it is not currently being processed, is not already
 * waiting in the queue and an entry of _processors matches its extension.
 *
 * Returns the number of assets that were queued. */
static int CheckUpdatedAssets(void) {
    int updated_count = 0;
    for (int i = 0; i < rt_AssetDBSize.i; ++i) {
        /* The read lock is held for the whole inspection of slot i; every
         * path that leaves this iteration has to release it again. */
        rtLockRead(&_asset_db.lock);
        if (_asset_db.files[i] == 0) {
            rtUnlockRead(&_asset_db.lock);
            continue;
        }
        const char *path = rtGetFilePath(_asset_db.files[i]);
        uint64_t last_changed = rtGetFileModificationTimestamp(path);
        if (!_asset_db.data[i].in_processing && _asset_db.data[i].last_processed < last_changed) {
            /* Check that we have not already added this file */
            rtLockConditionVar(_processing_queue.lock);
            bool already_in_queue = false;
            for (size_t entry_idx = _processing_queue.head; entry_idx != _processing_queue.tail;
                 entry_idx = (entry_idx + 1) % RT_ARRAY_COUNT(_processing_queue.entries)) {
                if (_processing_queue.entries[entry_idx].fid == _asset_db.files[i]) {
                    already_in_queue = true;
                    break;
                }
            }
            rtUnlockConditionVar(_processing_queue.lock, false);
            if (already_in_queue) {
                rtUnlockRead(&_asset_db.lock);
                continue;
            }

            /* Find the last '.' in the path; the extension starts there. */
            const char *ext = path + strlen(path);
            while (*ext != '.' && ext != path)
                --ext;
            if (*ext != '.') {
                /* No extension, so no processor can match. Skip just this
                 * asset and release the read lock; leaving the loop here
                 * would keep the lock held forever and ignore all remaining
                 * slots. */
                rtUnlockRead(&_asset_db.lock);
                continue;
            }

            bool found_processor = false;
            for (unsigned int j = 0; j < RT_ARRAY_COUNT(_processors); ++j) {
                if (strcmp(ext, _processors[j].file_ext) == 0) {
                    rt_processing_queue_entry entry;
                    entry.fid = _asset_db.files[i];
                    entry.processor_index = j;
                    entry.db_index = i;
                    found_processor = true;
                    /* Retry until the ring buffer has a free slot. The
                     * condition variable is only signaled when an entry was
                     * actually inserted.
                     * NOTE(review): this spins while the database read lock
                     * is held; processor threads need the write lock to make
                     * progress, so a permanently full queue could stall
                     * here - confirm whether that can happen in practice. */
                    while (true) {
                        bool inserted = false;
                        rtLockConditionVar(_processing_queue.lock);
                        unsigned int next_tail = (_processing_queue.tail + 1) %
                                                 RT_ARRAY_COUNT(_processing_queue.entries);
                        if (next_tail != _processing_queue.head) {
                            _processing_queue.entries[_processing_queue.tail] = entry;
                            _processing_queue.tail = next_tail;
                            inserted = true;
                        }
                        rtUnlockConditionVar(_processing_queue.lock, inserted);
                        if (inserted)
                            break;
                    }
                }
            }
            if (found_processor)
                ++updated_count;
        }
        rtUnlockRead(&_asset_db.lock);
    }
    return updated_count;
}
/* Entry point of the compiler thread. Until shutdown is requested it
 * repeatedly looks for new assets and for assets whose file changed, and
 * sleeps briefly whenever a pass found neither. */
static void CompilerThreadEntry(void *param) {
    RT_UNUSED(param);
    while (_keep_running) {
        int discovered = DiscoverAssets();
        int queued = CheckUpdatedAssets();
        if (discovered != 0 || queued != 0)
            continue;
        /* Nothing happened in this pass; avoid polling at full speed. */
        rtSleep(100);
    }
}
static void ProcessorThreadEntry(void *param) {
RT_UNUSED(param);
rt_create_arena_result arena_res = rtCreateArena(NULL, (size_t)rt_AssetProcessorArenaSize.i);
if (!arena_res.ok) {
rtLog("AC",
"Failed to allocate %d bytes for the processing arena.",
rt_AssetProcessorArenaSize.i);
return;
}
rt_arena arena = arena_res.arena;
while (_keep_running) {
rtLockConditionVar(_processing_queue.lock);
while (_keep_running && (_processing_queue.tail == _processing_queue.head))
rtWaitOnConditionVar(_processing_queue.lock);
bool got_entry = false;
rt_processing_queue_entry entry = {0};
if (_processing_queue.tail != _processing_queue.head) {
entry = _processing_queue.entries[_processing_queue.head];
_processing_queue.head =
(_processing_queue.head + 1) % RT_ARRAY_COUNT(_processing_queue.entries);
got_entry = true;
}
rtUnlockConditionVar(_processing_queue.lock, false);
if (!got_entry)
continue;
rtLockWrite(&_asset_db.lock);
_asset_db.data[entry.db_index].in_processing = true;
rtUnlockWrite(&_asset_db.lock);
const char *path = rtGetFilePath(entry.fid);
rtLog("AC", "Processing %s", path);
rtArenaClear(&arena);
rt_resource_id existing_resources[RT_MAX_RESOURCES_PER_ASSET];
unsigned int existing_resource_count;
rtLockRead(&_asset_db.lock);
memcpy(existing_resources,
_asset_db.data[entry.db_index].resources,
sizeof(existing_resources));
existing_resource_count = _asset_db.data[entry.db_index].resource_count;
rtUnlockRead(&_asset_db.lock);
rt_resource_id new_resources[RT_MAX_RESOURCES_PER_ASSET];
memset(&new_resources, 0, sizeof(new_resources));
unsigned int new_resource_count = 0;
rt_result res = _processors[entry.processor_index].proc(entry.fid,
existing_resource_count,
existing_resources,
&new_resource_count,
new_resources,
&arena);
if (res != RT_SUCCESS) {
rtLog("AC", "Failed to process %s: %u", path, res);
}
rtLockWrite(&_asset_db.lock);
_asset_db.data[entry.db_index].last_processed = rtGetCurrentTimestamp();
_asset_db.data[entry.db_index].in_processing = false;
memcpy(_asset_db.data[entry.db_index].resources, new_resources, sizeof(new_resources));
_asset_db.data[entry.db_index].resource_count = new_resource_count;
rtUnlockWrite(&_asset_db.lock);
}
}
/* Blocks until the asset database has been observed ten times with no asset
 * marked as in processing. After every such observation the function sleeps
 * via rtSleep(250); while at least one asset is in processing it polls again
 * immediately. The ten idle observations do not have to be consecutive. */
RT_DLLEXPORT void rtWaitForAssetProcessing(void) {
    int idle_polls_left = 10;
    while (idle_polls_left > 0) {
        int busy = 0;
        for (int slot = 0; slot < rt_AssetDBSize.i; ++slot) {
            rtLockRead(&_asset_db.lock);
            if (_asset_db.files[slot] != RT_INVALID_FILE_ID) {
                if (_asset_db.data[slot].in_processing)
                    ++busy;
            }
            rtUnlockRead(&_asset_db.lock);
        }
        if (busy != 0)
            continue;
        --idle_polls_left;
        rtSleep(250);
    }
}
/* Utilities for asset processors */
/* Loads the complete file identified by `file` into a buffer obtained from
 * rtAllocBuffer().
 *
 * Returns the buffer and its size on success. If the buffer cannot be
 * allocated or the load does not finish successfully, the returned asset has
 * a NULL buffer and size 0; a buffer that was already allocated is released
 * before returning. */
rt_loaded_asset LoadAsset(rt_file_id file) {
    const rt_loaded_asset failed = {.buffer = NULL, .size = 0};

    const char *path = rtGetFilePath(file);
    size_t file_size = rtGetFileSize(path);

    void *buffer = rtAllocBuffer(file_size);
    if (buffer == NULL) {
        rtLog("AC", "Failed to allocate buffer for loading %s.", path);
        return failed;
    }
    rtLog("AC", "Allocated %p for %s", buffer, path);

    /* Read the whole file, starting at offset 0, into the buffer. */
    rt_file_load request = {
        .file = file,
        .offset = 0,
        .num_bytes = file_size,
        .dest = buffer,
    };
    rt_aio_state load_state = rtSubmitSingleLoadSync(request);
    if (load_state == RT_AIO_STATE_FINISHED)
        return (rt_loaded_asset){.buffer = buffer, .size = file_size};

    rtReleaseBuffer(buffer, file_size);
    rtLog("AC", "Failed to load %s.", path);
    return failed;
}