package rune

import "core:container/queue"
import "core:mem"
import "core:sync"
import "core:thread"

// Errors reported by the scheduler's public procedures.
Task_Error :: enum {
	None,
	Too_Many_Tasks,
	Allocation_Failed,
}

// Category of a task. A worker only executes subtasks whose task type is
// contained in the `task_types` set the worker was created with.
Task_Type :: enum {
	General,
	Streaming,
	Rendering,
	Physics,
	Audio,
}

// Proc called to execute a subtask.
Task_Proc :: proc(subtask_idx: int, user_data: rawptr)

// Called when a task finished executing.
//
// The idea is that this is the place where subsequent tasks are enqueued.
// This way, the user can control dependencies, but the scheduler does not
// need to worry about un-runnable tasks, simplifying it (and hopefully improving performance)
Task_Finished_Proc :: proc(user_data: rawptr)

// One queued task. Slots live in `Scheduler.tasks`; unused slots are chained
// through `next_free`.
@(private = "file")
Task :: struct {
	entry:              Task_Proc,
	finished:           Task_Finished_Proc,
	user_data:          rawptr,
	remaining_subtasks: int, // decremented atomically by workers
	type:               Task_Type,
	next_free:          int, // next free slot index, -1 while the slot is in use
}

// One unit of work belonging to a task. Slots live in `Scheduler.subtasks`;
// unused slots are chained through `next_free`.
@(private = "file")
Sub_Task :: struct {
	task:      int, // index into task list
	idx:       int, // this subtasks index
	next_free: int, // next free slot index, -1 while the slot is in use
}

// State of a single worker thread.
@(private = "file")
Worker :: struct {
	run:        bool, // the worker loop keeps going while this is true
	task_types: bit_set[Task_Type],
	thread:     ^thread.Thread,
}

@(private = "file")
Scheduler :: struct {
	// Order of subtasks to execute. Highest priority is at index 0
	schedule_storage:   []int,
	schedule:           queue.Queue(int),

	// List of tasks. We don't move these to avoid expensive
	// copies when the schedule changes.
	tasks:              []Task,
	subtasks:           []Sub_Task,
	first_free_task:    int,
	first_free_subtask: int,
	free_subtasks:      int,

	// Used when adding or removing tasks from the list,
	// otherwise tasks are read-only (except for the remaining subtask counter, which is atomic)
	task_list_mutex:    sync.Mutex,
	subtask_list_mutex: sync.Mutex,
	schedule_mutex:     sync.Mutex,

	// List of workers. We allow the user to dynamically add or remove workers.
	workers:            [dynamic]^Worker,
	workers_mutex:      sync.Mutex,

	// Allocator used for all scheduler-owned memory (slices, workers).
	allocator:          mem.Allocator,
}

// The single, file-private scheduler instance all procedures operate on.
@(private = "file")
_scheduler: Scheduler

// Allocates the scheduler's storage and builds the free lists.
//
// Inputs:
// - max_tasks: number of task slots
// - max_subtasks_per_task: the subtask pool (and the schedule queue) holds
//   `max_tasks * max_subtasks_per_task` entries in total
// - allocator: used for every allocation made by the scheduler
//
// NOTE(review): allocation failures of the `make` calls are not checked here.
init_scheduler :: proc(
	max_tasks: int,
	max_subtasks_per_task: int,
	allocator := context.allocator,
) {
	_scheduler.schedule_storage = make([]int, max_tasks * max_subtasks_per_task, allocator)
	queue.init_from_slice(&_scheduler.schedule, _scheduler.schedule_storage)

	_scheduler.tasks = make([]Task, max_tasks, allocator)
	_scheduler.subtasks = make([]Sub_Task, max_tasks * max_subtasks_per_task, allocator)

	// Chain every slot to its successor. The last slot points one past the
	// end, which is what the "list is exhausted" checks compare against.
	_scheduler.first_free_task = 0
	for i := 0; i < max_tasks; i += 1 {
		_scheduler.tasks[i].next_free = i + 1
	}
	_scheduler.first_free_subtask = 0
	for i := 0; i < max_subtasks_per_task * max_tasks; i += 1 {
		_scheduler.subtasks[i].next_free = i + 1
	}
	_scheduler.free_subtasks = max_subtasks_per_task * max_tasks

	_scheduler.workers = make([dynamic]^Worker, allocator)
	_scheduler.allocator = allocator
}

// Creates and starts a worker thread that executes tasks of the given types.
//
// Returns the index of the new worker in the worker list and `.None`, or
// `-1` and `.Allocation_Failed` if the worker, its thread, or its list entry
// could not be allocated.
//
// NOTE(review): `remove_worker` uses `unordered_remove`, which moves the last
// worker into the removed slot, so previously returned indices are not stable.
add_worker :: proc(task_types: bit_set[Task_Type]) -> (int, Task_Error) {
	sync.lock(&_scheduler.workers_mutex)
	defer sync.unlock(&_scheduler.workers_mutex)

	d := new(Worker, _scheduler.allocator)
	if d == nil {
		return -1, .Allocation_Failed
	}
	d.task_types = task_types
	d.run = true

	// Check the thread handle itself: `d` is known to be non-nil here.
	d.thread = thread.create(worker_proc)
	if d.thread == nil {
		// Release with the allocator the worker was allocated from.
		free(d, _scheduler.allocator)
		return -1, .Allocation_Failed
	}
	d.thread.data = d

	_, err := append(&_scheduler.workers, d)
	if err != nil {
		// The thread was never started. Clear `run` first so the worker loop
		// exits immediately should destroying the thread end up running it.
		d.run = false
		thread.destroy(d.thread)
		free(d, _scheduler.allocator)
		return -1, .Allocation_Failed
	}

	thread.start(d.thread)
	return len(_scheduler.workers) - 1, .None
}

// Stops and destroys the worker at `idx`. Out-of-range indices (including
// negative ones) are ignored.
//
// The last worker in the list is moved into the freed position.
remove_worker :: proc(idx: int) {
	sync.lock(&_scheduler.workers_mutex)
	defer sync.unlock(&_scheduler.workers_mutex)

	if idx < 0 || idx >= len(_scheduler.workers) {
		return
	}

	worker := _scheduler.workers[idx]
	// Ask the worker loop to exit.
	// NOTE(review): this flag is written and read without synchronization.
	worker.run = false
	// NOTE(review): relies on thread.destroy waiting for the thread to exit
	// before the worker memory is released below - confirm.
	thread.destroy(worker.thread)
	// Release with the allocator the worker was allocated from.
	free(worker, _scheduler.allocator)
	unordered_remove(&_scheduler.workers, idx)
}

// Queues a task consisting of `subtask_count` subtasks.
//
// `entry` is called once per subtask with the subtask index in
// `[0, subtask_count)` and `user_data`. `finished` is called with `user_data`
// by the worker that completes the last subtask.
//
// Returns `.Too_Many_Tasks` if no task slot is free or fewer than
// `subtask_count` subtask slots are free; nothing is queued in that case.
//
// NOTE(review): with `subtask_count <= 0` the task slot is taken but no
// subtask is scheduled, so `finished` is never called and the slot is never
// released.
queue_task :: proc(
	entry: Task_Proc,
	finished: Task_Finished_Proc,
	user_data: rawptr,
	subtask_count: int,
	type := Task_Type.General,
) -> Task_Error {
	// Use the first free slot to store the task
	sync.lock(&_scheduler.task_list_mutex)
	slot := _scheduler.first_free_task
	if slot >= len(_scheduler.tasks) {
		sync.unlock(&_scheduler.task_list_mutex)
		return .Too_Many_Tasks
	}
	task := &_scheduler.tasks[slot]
	_scheduler.first_free_task = task.next_free
	task.entry = entry
	task.finished = finished
	task.user_data = user_data
	task.next_free = -1
	task.remaining_subtasks = subtask_count
	task.type = type
	sync.unlock(&_scheduler.task_list_mutex)

	// Create the subtasks
	sync.lock(&_scheduler.subtask_list_mutex)
	if _scheduler.free_subtasks < subtask_count {
		sync.unlock(&_scheduler.subtask_list_mutex)

		// Nevermind, release the task again
		sync.lock(&_scheduler.task_list_mutex)
		task.next_free = _scheduler.first_free_task
		_scheduler.first_free_task = slot
		sync.unlock(&_scheduler.task_list_mutex)
		return .Too_Many_Tasks
	}
	_scheduler.free_subtasks -= subtask_count

	for i := 0; i < subtask_count; i += 1 {
		subtask_slot := _scheduler.first_free_subtask
		assert(subtask_slot < len(_scheduler.subtasks))
		subtask := &_scheduler.subtasks[subtask_slot]
		_scheduler.first_free_subtask = subtask.next_free
		subtask.next_free = -1
		subtask.task = slot
		subtask.idx = i

		// Add to schedule. This is FIFO.
		// We could be more clever (for example use shortest time to finish)
		sync.lock(&_scheduler.schedule_mutex)
		queue.push_back(&_scheduler.schedule, subtask_slot)
		sync.unlock(&_scheduler.schedule_mutex)
	}
	sync.unlock(&_scheduler.subtask_list_mutex)
	return .None
}

// Thread entry point of a worker.
//
// Repeatedly pops the subtask at the front of the schedule. If the owning
// task's type is one this worker handles, the subtask is executed, its slot
// is returned to the free list, and - if it was the task's last subtask -
// the task's `finished` proc is called and the task slot is released.
// Otherwise the subtask is pushed back to the front of the schedule for
// another worker. The loop runs until the worker's `run` flag is cleared.
@(private = "file")
worker_proc :: proc(t: ^thread.Thread) {
	worker := (^Worker)(t.data)
	// Read once: the set of handled types is fixed for the worker's lifetime.
	task_types := worker.task_types

	for worker.run {
		sync.lock(&_scheduler.schedule_mutex)
		subtask_idx := -1
		if queue.len(_scheduler.schedule) > 0 {
			subtask_idx = queue.pop_front(&_scheduler.schedule)
		}
		sync.unlock(&_scheduler.schedule_mutex)

		if subtask_idx == -1 {
			// NO tasks available
			thread.yield()
			continue
		}

		subtask := &_scheduler.subtasks[subtask_idx]
		taskidx := subtask.task
		task := &_scheduler.tasks[taskidx]

		if task.type in task_types {
			task.entry(subtask.idx, task.user_data)

			// Return the subtask slot to the free list.
			sync.lock(&_scheduler.subtask_list_mutex)
			subtask.next_free = _scheduler.first_free_subtask
			_scheduler.first_free_subtask = subtask_idx
			_scheduler.free_subtasks += 1
			sync.unlock(&_scheduler.subtask_list_mutex)

			// The result is the counter's value before the decrement, so 1
			// means this was the task's last outstanding subtask.
			prev_cnt := sync.atomic_sub(&task.remaining_subtasks, 1)
			if prev_cnt == 1 {
				// Finished the task,
				task.finished(task.user_data)

				sync.lock(&_scheduler.task_list_mutex)
				task.next_free = _scheduler.first_free_task
				_scheduler.first_free_task = taskidx
				sync.unlock(&_scheduler.task_list_mutex)
			}
		} else {
			// Push back in front, let someone else pick the task up
			sync.lock(&_scheduler.schedule_mutex)
			queue.push_front(&_scheduler.schedule, subtask_idx)
			sync.unlock(&_scheduler.schedule_mutex)
			thread.yield()
		}
	}
}