#include "processing.h" #include "utils.h" #include "description_parser.h" #include "packages.h" #include "assetmeta.h" #include "runtime/aio.h" #include "runtime/buffer_manager.h" #include "runtime/file_tab.h" #include "runtime/threading.h" #include #include #include #include typedef struct { vy_file_id fid; uint32_t flags; /* How many times has this file been added? */ unsigned int turn; } vy_file_processing_queue_entry; #define QUEUE_LENGTH 1024 typedef struct { vy_file_processing_queue_entry entries[QUEUE_LENGTH]; unsigned int head; unsigned int tail; } vy_file_processing_queue; static vy_file_processing_queue _queues[2]; static vy_file_processing_queue *_processing_queue; static vy_file_processing_queue *_retry_queue; static vy_mutex *_guard; static bool _keep_running; /* A single file could have a lot of dependencies. */ #define MAX_TURNS 100 #define MAX_PROCESSING_THREADS 16 #define FORCE_SINGLE_THREAD 1 static unsigned int _num_processing_threads = 0; static vy_thread *_processing_threads[MAX_PROCESSING_THREADS]; static unsigned int _processing_thread_count = 0; typedef struct { unsigned int package; } vy_asset_settings; static vy_result ParseAssetSettings(const char *text, size_t length, const char *file_path, vy_asset_settings *settings) { unsigned int root_list; vy_parse_state state; vy_result res = vyParseDescription(text, length, file_path, &root_list, &state); if (res != VY_SUCCESS) { vyReportError("ASSETC", "Failed to parse asset settings: %s", file_path); return res; } settings->package = 0; const vy_parsed_stmt *package_stmt = vyFindStatement(&state, root_list, "package"); if (package_stmt) { if (package_stmt->form != VY_STMT_FORM_VALUE) { vyReportError("ASSETC", "Expected a package name as the value of 'package' in %s.", file_path); res = VY_UNKNOWN_ERROR; goto out; } settings->package = vyAddPackageFile(package_stmt->value); } out: vyReleaseParseState(&state); return res; } static vy_result vyAddFileToProcessingQueueImpl(vy_file_processing_queue *queue, vy_file_id file, uint32_t flags, unsigned int turn) { vy_result result = VY_SUCCESS; vyLog("ASSETC", "Adding %s to processing queue.", vyGetFilePath(file)); vy_file_processing_queue_entry entry = { .fid = file, .flags = flags, .turn = turn, }; if ((queue->head + 1) % QUEUE_LENGTH != queue->tail) { unsigned int slot = queue->head; queue->entries[slot] = entry; queue->head = (queue->head + 1) % QUEUE_LENGTH; } else { vyReportError("ASSETC", "The processing queue is full!"); result = 1; } return result; } vy_result vyAddFileToProcessingQueue(vy_file_id file, uint32_t flags) { assert(_guard != NULL); vyLockMutex(_guard); vy_result res = vyAddFileToProcessingQueueImpl(_processing_queue, file, flags, 1); vyUnlockMutex(_guard); return res; } #define MAX_PROCESSORS 256 static vy_processor_fn *_processor_fns[MAX_PROCESSORS]; static const char *_processor_exts[MAX_PROCESSORS]; static unsigned int _processor_count; vy_result vyAddAssetProcessor(const char *file_extension, vy_processor_fn fn) { /* Should only be called from main thread */ if (_processor_count == MAX_PROCESSORS) { vyReportError("ASSETC", "Too many asset processor functions!"); return 1; } _processor_fns[_processor_count] = fn; _processor_exts[_processor_count] = file_extension; ++_processor_count; return VY_SUCCESS; } static void PopAndSwapSubmittedData(unsigned int at, unsigned int *count, vy_file_processing_queue_entry *queue_entries, vy_aio_handle *handles, void **buffers, size_t *sizes) { if (at < *count - 1) { queue_entries[at] = queue_entries[*count - 1]; buffers[at] = buffers[*count - 1]; handles[at] = handles[*count - 1]; sizes[at] = sizes[*count - 1]; } *count = *count - 1; } static vy_result ProcessLoadedFile(vy_file_processing_queue_entry entry, void *buffer, size_t size) { /* Search for a matching processor function */ const char *path = vyGetFilePath(entry.fid); size_t path_len = strlen(path); for (unsigned int i = 0; i < _processor_count; ++i) { size_t ext_len = strlen(_processor_exts[i]); if (ext_len > path_len) continue; const char *path_end = &path[path_len - ext_len]; if (memcmp(path_end, _processor_exts[i], ext_len) == 0) { /* Load the corresponding .as file. * TODO: Using malloc here is probably relatively slow. */ vy_asset_settings settings; { char *as_path = malloc(path_len + 3); if (!as_path) { return VY_UNKNOWN_ERROR; } memcpy(as_path, path, path_len); strcpy(&as_path[path_len - ext_len], ".as"); size_t as_size = vyGetFileSize(as_path); if (as_size == 0) { vyReportError("ASSETC", "Failed to retrieve size of setting file %s", as_path); free(as_path); return VY_UNKNOWN_ERROR; } void *as_buffer = vyAllocBuffer(as_size); vy_load_batch as_load; as_load.loads[0].file = vyAddFile(as_path); as_load.loads[0].num_bytes = as_size; as_load.loads[0].dest = as_buffer; if (!as_load.loads[0].dest) { vyReportError("ASSETC", "Failed to allocate buffer for setting file %s", as_path); free(as_path); return VY_UNKNOWN_ERROR; } as_load.loads[0].offset = 0; as_load.num_loads = 1; vy_aio_handle as_handle; if (vySubmitLoadBatch(&as_load, &as_handle) != VY_SUCCESS) { vyReportError("ASSETC", "Failed to submit load of setting file %s", as_path); free(as_path); return VY_UNKNOWN_ERROR; } if (vyWaitForAIOCompletion(as_handle) != VY_AIO_STATE_FINISHED) { vyReportError("ASSETC", "Failed to load setting file %s", as_path); free(as_path); return VY_UNKNOWN_ERROR; } vyReleaseAIO(as_handle); if (ParseAssetSettings(as_buffer, as_size, as_path, &settings) != VY_SUCCESS) { free(as_path); return VY_UNKNOWN_ERROR; } free(as_path); } /* Process the asset */ vy_processor_output out; vy_result res = _processor_fns[i](entry.fid, buffer, size, entry.flags, &out); if (res == VY_SUCCESS) { /* Add the output to the appropriate package file */ vy_assetmeta meta; meta.compiled_ts = vyGetCurrentTimestamp(); meta.last_changed = vyGetFileModificationTimestamp(entry.fid); vyAddUIDMapping(entry.fid, out.asset_uid, &meta); vyAddAssetToPackage(settings.package, out.asset_uid, out.data, out.size); } else if (res == VY_PROCESSING_TRY_AGAIN) { if (entry.turn < MAX_TURNS) { vyLockMutex(_guard); vyAddFileToProcessingQueueImpl(_retry_queue, entry.fid, entry.flags, entry.turn + 1); vyUnlockMutex(_guard); } else { vyLog("ASSETC", "File '%s' took too many turns to process: %u", path, entry.turn); } } else { vyLog("ASSETC", "Failed to process file: %s (Result %u)", path, res); } return res; } } vyLog("ASSETC", "No asset processor for file: %s", path); return VY_UNKNOWN_ERROR; } static void ProcessingThread(void *_param) { VY_UNUSED(_param); vy_file_processing_queue_entry submitted_entries[VY_LOAD_BATCH_MAX_SIZE]; vy_aio_handle submitted_handles[VY_LOAD_BATCH_MAX_SIZE]; void *submitted_buffers[VY_LOAD_BATCH_MAX_SIZE]; size_t submitted_sizes[VY_LOAD_BATCH_MAX_SIZE]; unsigned int submitted_outstanding = 0; while (_keep_running) { vy_load_batch load_batch; vy_file_processing_queue_entry load_entries[VY_LOAD_BATCH_MAX_SIZE]; void *load_buffers[VY_LOAD_BATCH_MAX_SIZE]; size_t load_sizes[VY_LOAD_BATCH_MAX_SIZE]; load_batch.num_loads = 0; bool got_entry = false; do { got_entry = false; vy_file_processing_queue_entry entry = {0}; vyLockMutex(_guard); if (_processing_queue->head != _processing_queue->tail) { entry = _processing_queue->entries[_processing_queue->tail]; _processing_queue->tail = (_processing_queue->tail + 1) % QUEUE_LENGTH; got_entry = true; } else if (load_batch.num_loads == 0) { /* Switch the queues -> Retry all the entries that returned VY_PROCESSING_TRY_AGAIN */ if (_retry_queue->head != _retry_queue->tail) { vy_file_processing_queue *tmp = _retry_queue; _retry_queue = _processing_queue; _processing_queue = tmp; } } vyUnlockMutex(_guard); /* Retry, if we did not get an entry */ if (!got_entry) continue; const char *path = vyGetFilePath(entry.fid); if (!path) { vyLog("ASSETC", "Invalid file id: %#x", entry.fid); continue; } vyLog("ASSETC", "Processing %s", path); size_t fsz = vyGetFileSize(path); void *dest = vyAllocBuffer(fsz); if (!dest) { vyLog("ASSETC", "Ran out of memory for loading the file: %s", path); continue; } memset(dest, 0, fsz); load_sizes[load_batch.num_loads] = fsz; load_buffers[load_batch.num_loads] = dest; load_batch.loads[load_batch.num_loads].file = entry.fid; load_batch.loads[load_batch.num_loads].dest = dest; load_batch.loads[load_batch.num_loads].num_bytes = fsz; load_batch.loads[load_batch.num_loads].offset = 0; load_entries[load_batch.num_loads] = entry; ++load_batch.num_loads; } while (got_entry && load_batch.num_loads < VY_LOAD_BATCH_MAX_SIZE); vy_aio_handle load_handles[VY_LOAD_BATCH_MAX_SIZE]; if (load_batch.num_loads > 0) { vy_result submit_result = vySubmitLoadBatch(&load_batch, load_handles); if (submit_result != VY_SUCCESS) { vyLog("ASSETC", "SubmitLoadBatch failed: %u", submit_result); continue; } } /* Process the previously submitted loads */ while (submitted_outstanding > 0) { vyLockMutex(_guard); _processing_thread_count += 1; vyUnlockMutex(_guard); for (unsigned int i = 0; i < submitted_outstanding; ++i) { vy_aio_state state = vyGetAIOState(submitted_handles[i]); switch (state) { case VY_AIO_STATE_PENDING: continue; case VY_AIO_STATE_FAILED: vyLog("ASSETC", "Loading file %s failed.", vyGetFilePath(submitted_entries[i].fid)); vyReleaseAIO(submitted_handles[i]); vyReleaseBuffer(submitted_buffers[i], submitted_sizes[i]); PopAndSwapSubmittedData(i, &submitted_outstanding, submitted_entries, submitted_handles, submitted_buffers, submitted_sizes); --i; break; case VY_AIO_STATE_INVALID: vyLog("ASSETC", "Got invalid AIO handle for file: %s", vyGetFilePath(submitted_entries[i].fid)); vyReleaseBuffer(submitted_buffers[i], submitted_sizes[i]); PopAndSwapSubmittedData(i, &submitted_outstanding, submitted_entries, submitted_handles, submitted_buffers, submitted_sizes); --i; break; case VY_AIO_STATE_FINISHED: ProcessLoadedFile(submitted_entries[i], submitted_buffers[i], submitted_sizes[i]); vyReleaseAIO(submitted_handles[i]); vyReleaseBuffer(submitted_buffers[i], submitted_sizes[i]); PopAndSwapSubmittedData(i, &submitted_outstanding, submitted_entries, submitted_handles, submitted_buffers, submitted_sizes); --i; } } vyLockMutex(_guard); _processing_thread_count -= 1; vyUnlockMutex(_guard); } /* Start new round */ assert(sizeof(submitted_entries) == sizeof(load_entries)); assert(sizeof(submitted_handles) == sizeof(load_handles)); assert(sizeof(submitted_buffers) == sizeof(load_buffers)); assert(sizeof(submitted_sizes) == sizeof(load_sizes)); memcpy(submitted_entries, load_entries, sizeof(submitted_entries)); memcpy(submitted_handles, load_handles, sizeof(submitted_handles)); memcpy(submitted_buffers, load_buffers, sizeof(submitted_buffers)); memcpy(submitted_sizes, load_sizes, sizeof(submitted_sizes)); submitted_outstanding = load_batch.num_loads; } } vy_result vyStartProcessing(void) { if (!_guard) _guard = vyCreateMutex(); #if !FORCE_SINGLE_THREAD _num_processing_threads = vyGetCPUCoreCount(); if (_num_processing_threads > MAX_PROCESSING_THREADS) _num_processing_threads = MAX_PROCESSING_THREADS; #else _num_processing_threads = 1; #endif _processing_queue = &_queues[0]; _retry_queue = &_queues[1]; _keep_running = true; for (unsigned int i = 0; i < _num_processing_threads; ++i) { _processing_threads[i] = vySpawnThread(ProcessingThread, NULL, "Processor"); if (!_processing_threads[i]) { vyReportError("ASSETC", "Failed to spawn processing thread %u!", i); _keep_running = false; return 1; } } return VY_SUCCESS; } void vyStopProcessing(void) { _keep_running = false; for (unsigned int i = 0; i < _num_processing_threads; ++i) vyJoinThread(_processing_threads[i]); } #ifdef _WIN32 #define WIN32_LEAN_AND_MEAN #include #else #include #endif void vyWaitUntilProcessingIsFinished(void) { unsigned int done_counter = 0; while (done_counter < 3) { #ifdef _WIN32 Sleep(1000); #else sleep(1); #endif vyLockMutex(_guard); volatile bool done = _processing_queue->head == _processing_queue->tail && _retry_queue->head == _retry_queue->tail && _processing_thread_count == 0; vyUnlockMutex(_guard); if (done) ++done_counter; else done_counter = 0; } }