#include "jobs.h" #include "threading.h" #define MAX_WORKERS 32 #define JOB_QUEUE_SIZE 2048 /* A "chunk" of iterations for a particular job */ typedef struct { uint32_t base_iteration; uint32_t iteration_count; rt_job_fn *fn; void *param; } rt_job_chunk; typedef struct { /* Queue */ struct { rt_job_chunk *chunks; unsigned int head; unsigned int tail; rt_condition_var *lock; } job_queue; /* Thread data */ rt_thread *thread; } rt_worker_data; static volatile bool _keep_running = true; static rt_worker_data _worker_data[MAX_WORKERS]; static void ExecOneJobIfAvailable(rt_worker_data *wd) { if (wd->job_queue.head == wd->job_queue.tail) { /* No job available. * TODO: Pick one job queue at random and check if we can steal? */ return; } rt_job_chunk chunk = wd->job_queue.chunks[wd->job_queue.head]; wd->job_queue.head = (wd->job_queue.head + 1) % JOB_QUEUE_SIZE; for (uint32_t i = 0; i < chunk.iteration_count; ++i) { chunk.fn(chunk.param, chunk.base_iteration + i); } } static void WorkerEntry(void *param) { rt_worker_data *wd = param; while (_keep_running) { rtLockConditionVar(wd->job_queue.lock); while (wd->job_queue.head == wd->job_queue.tail && _keep_running) { rtWaitOnConditionVar(wd->job_queue.lock); } ExecOneJobIfAvailable(wd); rtUnlockConditionVar(wd->job_queue.lock, false); } } void rtInitJobSystem(unsigned int worker_count) { if (worker_count > MAX_WORKERS) worker_count = MAX_WORKERS; for (unsigned int i = 0; i < worker_count; ++i) { _worker_data[i].thread = rtSpawnThread(WorkerEntry, &_worker_data[i], "JobWorker"); } }