From 8926ae0f5c3ea76793935fa77b4ccfe2052ba8c6 Mon Sep 17 00:00:00 2001 From: Kevin Trogant Date: Tue, 22 Apr 2025 10:21:22 +0200 Subject: [PATCH] First draft of the task system --- main.odin | 25 ++++++ task.odin | 248 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 273 insertions(+) create mode 100644 main.odin create mode 100644 task.odin diff --git a/main.odin b/main.odin new file mode 100644 index 0000000..1766019 --- /dev/null +++ b/main.odin @@ -0,0 +1,25 @@ +package rune + +import "core:fmt" + +test_task :: proc(subtask: int, user_data: rawptr) { + fmt.printfln("test task %v", subtask) +} + +test_finished :: proc(user_data: rawptr) { + fmt.println("test finished!") +} + +main :: proc() { + init_scheduler(100, 1000) + add_worker({.General, .Streaming, .Physics, .Rendering}) + add_worker({.General, .Streaming, .Physics, .Rendering}) + add_worker({.General, .Streaming, .Physics, .Rendering}) + add_worker({.General, .Streaming, .Physics, .Rendering}) + + queue_task(test_task, test_finished, nil, 10) + + for true { + + } +} diff --git a/task.odin b/task.odin new file mode 100644 index 0000000..be84596 --- /dev/null +++ b/task.odin @@ -0,0 +1,248 @@ +package rune + +import "core:container/queue" +import "core:mem" +import "core:sync" +import "core:thread" + +Task_Error :: enum { + None, + Too_Many_Tasks, + Allocation_Failed, +} + +Task_Type :: enum { + General, + Streaming, + Rendering, + Physics, + Audio, +} + +// Proc called to execute a subtask. +Task_Proc :: proc(subtask_idx: int, user_data: rawptr) + +// Called when a task finished executing. +// +// The idea is that this is the place where subsequent tasks are enqueued. +// This way, the user can control dependencies, but the scheduler does not +// need to worry about un-runnable tasks, simplifying it (and hopefully improving performance) +Task_Finished_Proc :: proc(user_data: rawptr) + +@(private = "file") +Task :: struct { + entry: Task_Proc, + finished: Task_Finished_Proc, + user_data: rawptr, + remaining_subtasks: int, + type: Task_Type, + next_free: int, +} + +@(private = "file") +Sub_Task :: struct { + task: int, // index into task list + idx: int, // this subtasks index + next_free: int, +} + +@(private = "file") +Worker :: struct { + run: bool, + task_types: bit_set[Task_Type], + thread: ^thread.Thread, +} + +@(private = "file") +Scheduler :: struct { + // Order of subtasks to execute. Highest priority is at index 0 + schedule_storage: []int, + schedule: queue.Queue(int), + + // List of tasks. We don't move these to avoid expensive + // copies when the schedule changes. + tasks: []Task, + subtasks: []Sub_Task, + first_free_task: int, + first_free_subtask: int, + free_subtasks: int, + + // Used when adding or removing tasks from the list, + // otherwise tasks are read-only (except for the remaining subtask counter, which is atomic) + task_list_mutex: sync.Mutex, + subtask_list_mutex: sync.Mutex, + schedule_mutex: sync.Mutex, + + // List of workers. We allow the user to dynamically add or remove workers. + workers: [dynamic]^Worker, + workers_mutex: sync.Mutex, + allocator: mem.Allocator, +} + +@(private = "file") +_scheduler: Scheduler + +init_scheduler :: proc( + max_tasks: int, + max_subtasks_per_task: int, + allocator := context.allocator, +) { + _scheduler.schedule_storage = make([]int, max_tasks * max_subtasks_per_task, allocator) + queue.init_from_slice(&_scheduler.schedule, _scheduler.schedule_storage) + _scheduler.tasks = make([]Task, max_tasks, allocator) + _scheduler.subtasks = make([]Sub_Task, max_tasks * max_subtasks_per_task, allocator) + _scheduler.first_free_task = 0 + for i := 0; i < max_tasks; i += 1 { + _scheduler.tasks[i].next_free = i + 1 + } + _scheduler.first_free_subtask = 0 + for i := 0; i < max_subtasks_per_task * max_tasks; i += 1 { + _scheduler.subtasks[i].next_free = i + 1 + } + _scheduler.free_subtasks = max_subtasks_per_task * max_tasks + _scheduler.workers = make([dynamic]^Worker, allocator) + _scheduler.allocator = allocator +} + +add_worker :: proc(task_types: bit_set[Task_Type]) -> (int, Task_Error) { + sync.lock(&_scheduler.workers_mutex) + defer sync.unlock(&_scheduler.workers_mutex) + if d := new(Worker, _scheduler.allocator); d != nil { + d.task_types = task_types + d.run = true + if d.thread = thread.create(worker_proc); d == nil { + free(d) + return -1, .Allocation_Failed + } + d.thread.data = d + _, err := append(&_scheduler.workers, d) + if err != nil { + thread.destroy(d.thread) + free(d) + return -1, .Allocation_Failed + } + thread.start(d.thread) + return len(_scheduler.workers) - 1, .None + } + return -1, .Allocation_Failed +} + +remove_worker :: proc(idx: int) { + sync.lock(&_scheduler.workers_mutex) + defer sync.unlock(&_scheduler.workers_mutex) + if idx >= len(_scheduler.workers) { + return + } + _scheduler.workers[idx].run = false + thread.destroy(_scheduler.workers[idx].thread) + free(_scheduler.workers[idx]) + unordered_remove(&_scheduler.workers, idx) +} + +queue_task :: proc( + entry: Task_Proc, + finished: Task_Finished_Proc, + user_data: rawptr, + subtask_count: int, + type := Task_Type.General, +) -> Task_Error { + // Use the first free slot to store the task + sync.lock(&_scheduler.task_list_mutex) + slot := _scheduler.first_free_task + if slot >= len(_scheduler.tasks) { + sync.unlock(&_scheduler.task_list_mutex) + return .Too_Many_Tasks + } + + task := &_scheduler.tasks[slot] + _scheduler.first_free_task = task.next_free + task.entry = entry + task.finished = finished + task.user_data = user_data + task.next_free = -1 + task.remaining_subtasks = subtask_count + task.type = type + sync.unlock(&_scheduler.task_list_mutex) + + // Create the subtasks + sync.lock(&_scheduler.subtask_list_mutex) + if _scheduler.free_subtasks < subtask_count { + sync.unlock(&_scheduler.subtask_list_mutex) + + sync.lock(&_scheduler.task_list_mutex) + // Nevermind, release the task again + task.next_free = _scheduler.first_free_task + _scheduler.first_free_task = slot + sync.unlock(&_scheduler.task_list_mutex) + return .Too_Many_Tasks + } + _scheduler.free_subtasks -= subtask_count + for i := 0; i < subtask_count; i += 1 { + subtask_slot := _scheduler.first_free_subtask + assert(subtask_slot < len(_scheduler.subtasks)) + subtask := &_scheduler.subtasks[subtask_slot] + _scheduler.first_free_subtask = subtask.next_free + subtask.next_free = -1 + subtask.task = slot + subtask.idx = i + + sync.lock(&_scheduler.schedule_mutex) + // Add to schedule. This is FIFO. We could be more clever (for example use shortest time to finish) + queue.push_back(&_scheduler.schedule, subtask_slot) + sync.unlock(&_scheduler.schedule_mutex) + } + sync.unlock(&_scheduler.subtask_list_mutex) + + + return .None +} + +@(private = "file") +worker_proc :: proc(t: ^thread.Thread) { + worker := (^Worker)(t.data) + task_types := worker.task_types + for worker.run { + sync.lock(&_scheduler.schedule_mutex) + subtask_idx := -1 + if queue.len(_scheduler.schedule) > 0 { + subtask_idx = queue.pop_front(&_scheduler.schedule) + } + sync.unlock(&_scheduler.schedule_mutex) + if subtask_idx == -1 { + // NO tasks available + thread.yield() + continue + } + + subtask := &_scheduler.subtasks[subtask_idx] + taskidx := subtask.task + task := &_scheduler.tasks[taskidx] + + if task.type in task_types { + task.entry(subtask.idx, task.user_data) + + sync.lock(&_scheduler.subtask_list_mutex) + subtask.next_free = _scheduler.first_free_subtask + _scheduler.first_free_subtask = subtask_idx + _scheduler.free_subtasks += 1 + sync.unlock(&_scheduler.subtask_list_mutex) + + prev_cnt := sync.atomic_sub(&task.remaining_subtasks, 1) + if prev_cnt == 1 { + // Finished the task, + task.finished(task.user_data) + + sync.lock(&_scheduler.task_list_mutex) + task.next_free = _scheduler.first_free_task + _scheduler.first_free_task = taskidx + sync.unlock(&_scheduler.task_list_mutex) + } + } else { + // Push back in front, let someone else pick the task up + sync.lock(&_scheduler.schedule_mutex) + queue.push_front(&_scheduler.schedule, subtask_idx) + sync.unlock(&_scheduler.schedule_mutex) + thread.yield() + } + } +}