package rune

import "core:container/queue"
import "core:hash"
import "core:mem"
import "core:slice"
import "core:sync"
import "core:thread"
import "core:time"

Task_Error :: enum {
    None,
    Too_Many_Tasks,
    Allocation_Failed,
}

Task_Type :: enum {
    General,
    Streaming,
    Rendering,
    Physics,
    Audio,
}

// Proc called to execute a subtask.
Task_Proc :: proc(subtask_idx: int, user_data: rawptr)

// Called when a task has finished executing.
//
// The idea is that this is the place where subsequent tasks are enqueued.
// This way, the user controls dependencies, but the scheduler never needs to
// worry about un-runnable tasks, which simplifies it (and hopefully improves
// performance).
Task_Finished_Proc :: proc(user_data: rawptr)
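// A sketch of how the chaining described above is meant to be used
// (illustration only; `generate_chunks`, `upload_chunks`, and the finished
// procs are hypothetical user code, not part of this package):
//
//    generate_chunks :: proc(subtask_idx: int, user_data: rawptr) {
//        // ... produce data for chunk `subtask_idx` ...
//    }
//
//    upload_chunks :: proc(subtask_idx: int, user_data: rawptr) {
//        // ... upload chunk `subtask_idx` ...
//    }
//
//    upload_finished :: proc(user_data: rawptr) {}
//
//    // Runs once every generate_chunks subtask is done. Because the
//    // dependent task is enqueued here, the scheduler itself never has to
//    // track dependencies.
//    generate_finished :: proc(user_data: rawptr) {
//        queue_task(upload_chunks, upload_finished, user_data, 4, .Streaming)
//    }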
@(private = "file")
Task :: struct {
    entry:              Task_Proc,
    finished:           Task_Finished_Proc,
    user_data:          rawptr,
    remaining_subtasks: int,
    type:               Task_Type,
    next_free:          int,
}

@(private = "file")
Sub_Task :: struct {
    task:      int, // index into the task list
    idx:       int, // this subtask's index within its task
    next_free: int,
}

@(private = "file")
Worker :: struct {
    run:        bool,
    task_types: bit_set[Task_Type],
    thread:     ^thread.Thread,
}

@(private = "file")
Scheduler :: struct {
    // Order of subtasks to execute. Highest priority is at index 0.
    schedule_storage:    []int,
    schedule:            queue.Queue(int),
    // List of tasks. We don't move these to avoid expensive
    // copies when the schedule changes.
    tasks:               []Task,
    subtasks:            []Sub_Task,
    first_free_task:     int,
    first_free_subtask:  int,
    free_subtasks:       int,
    // Used when adding or removing tasks from the list; otherwise tasks are
    // read-only (except for the remaining-subtask counter, which is updated
    // atomically).
    task_list_mutex:     sync.Mutex,
    subtask_list_mutex:  sync.Mutex,
    schedule_mutex:      sync.Mutex,
    subtask_times_mutex: sync.Mutex,
    // Keeps track of how long subtasks (keyed by a hash of function +
    // subtask idx) ran in the past. We keep the last runtime and use it
    // when scheduling subtasks.
    subtask_times:       map[u64]f64,
    // List of workers. The user can dynamically add or remove workers.
    workers:             [dynamic]^Worker,
    workers_mutex:       sync.Mutex,
    allocator:           mem.Allocator,
}

@(private = "file")
_scheduler: Scheduler

init_scheduler :: proc(
    max_tasks: int,
    max_subtasks_per_task: int,
    allocator := context.allocator,
) {
    _scheduler.schedule_storage = make([]int, max_tasks * max_subtasks_per_task, allocator)
    queue.init_from_slice(&_scheduler.schedule, _scheduler.schedule_storage)
    _scheduler.tasks = make([]Task, max_tasks, allocator)
    _scheduler.subtasks = make([]Sub_Task, max_tasks * max_subtasks_per_task, allocator)
    _scheduler.first_free_task = 0
    for i := 0; i < max_tasks; i += 1 {
        _scheduler.tasks[i].next_free = i + 1
    }
    _scheduler.first_free_subtask = 0
    for i := 0; i < max_subtasks_per_task * max_tasks; i += 1 {
        _scheduler.subtasks[i].next_free = i + 1
    }
    _scheduler.free_subtasks = max_subtasks_per_task * max_tasks
    _scheduler.subtask_times = make(map[u64]f64, allocator)
    _scheduler.workers = make([dynamic]^Worker, allocator)
    _scheduler.allocator = allocator
}

shutdown_scheduler :: proc() {
    for &worker in _scheduler.workers {
        worker.run = false
        thread.destroy(worker.thread)
        free(worker, _scheduler.allocator)
    }
    delete(_scheduler.workers)
    delete(_scheduler.subtask_times)
    delete(_scheduler.schedule_storage, _scheduler.allocator)
    delete(_scheduler.tasks, _scheduler.allocator)
    delete(_scheduler.subtasks, _scheduler.allocator)
}

add_worker :: proc(task_types: bit_set[Task_Type]) -> (int, Task_Error) {
    sync.lock(&_scheduler.workers_mutex)
    defer sync.unlock(&_scheduler.workers_mutex)
    if d := new(Worker, _scheduler.allocator); d != nil {
        d.task_types = task_types
        d.run = true
        if d.thread = thread.create(worker_proc); d.thread == nil {
            free(d, _scheduler.allocator)
            return -1, .Allocation_Failed
        }
        d.thread.data = d
        _, err := append(&_scheduler.workers, d)
        if err != nil {
            thread.destroy(d.thread)
            free(d, _scheduler.allocator)
            return -1, .Allocation_Failed
        }
        thread.start(d.thread)
        return len(_scheduler.workers) - 1, .None
    }
    return -1, .Allocation_Failed
}

remove_worker :: proc(idx: int) {
    sync.lock(&_scheduler.workers_mutex)
    defer sync.unlock(&_scheduler.workers_mutex)
    if idx < 0 || idx >= len(_scheduler.workers) {
        return
    }
    _scheduler.workers[idx].run = false
    thread.destroy(_scheduler.workers[idx].thread)
    free(_scheduler.workers[idx], _scheduler.allocator)
    unordered_remove(&_scheduler.workers, idx)
}

@(private = "file")
Subtask_Timing :: struct {
    slot: int,
    time: f64,
}

@(private = "file")
sort_subtask_timings :: proc(lhs: Subtask_Timing, rhs: Subtask_Timing) -> bool {
    return lhs.time < rhs.time
}

queue_task :: proc(
    entry: Task_Proc,
    finished: Task_Finished_Proc,
    user_data: rawptr,
    subtask_count: int,
    type := Task_Type.General,
) -> Task_Error {
    // Use the first free slot to store the task.
    sync.lock(&_scheduler.task_list_mutex)
    slot := _scheduler.first_free_task
    if slot >= len(_scheduler.tasks) {
        sync.unlock(&_scheduler.task_list_mutex)
        return .Too_Many_Tasks
    }
    task := &_scheduler.tasks[slot]
    _scheduler.first_free_task = task.next_free
    task.entry = entry
    task.finished = finished
    task.user_data = user_data
    task.next_free = -1
    task.remaining_subtasks = subtask_count
    task.type = type
    sync.unlock(&_scheduler.task_list_mutex)

    // Create the subtasks.
    sync.lock(&_scheduler.subtask_list_mutex)
    if _scheduler.free_subtasks < subtask_count {
        sync.unlock(&_scheduler.subtask_list_mutex)
        sync.lock(&_scheduler.task_list_mutex)
        // Never mind, release the task again.
        task.next_free = _scheduler.first_free_task
        _scheduler.first_free_task = slot
        sync.unlock(&_scheduler.task_list_mutex)
        return .Too_Many_Tasks
    }
    _scheduler.free_subtasks -= subtask_count
    subtask_timings := make([]Subtask_Timing, subtask_count)
    defer delete(subtask_timings)
    // Hash the entry procedure's address, so timings identify the function
    // itself rather than whichever task slot it happens to occupy.
    task_hash := u64(uintptr(rawptr(entry)))
    for i := 0; i < subtask_count; i += 1 {
        subtask_slot := _scheduler.first_free_subtask
        assert(subtask_slot < len(_scheduler.subtasks))
        subtask := &_scheduler.subtasks[subtask_slot]
        _scheduler.first_free_subtask = subtask.next_free
        subtask.next_free = -1
        subtask.task = slot
        subtask.idx = i
        subtask_id: [1]int = {i}
        subtask_hash := hash.fnv64a(slice.to_bytes(subtask_id[:]))
        final_hash := task_hash ~ subtask_hash
        // The schedule stores indices into the subtask pool, so remember the
        // pool slot, not the loop index.
        subtask_timings[i].slot = subtask_slot
        sync.lock(&_scheduler.subtask_times_mutex)
        // A missing key yields the zero value, i.e. 0.0 for unseen subtasks.
        subtask_timings[i].time = _scheduler.subtask_times[final_hash]
        sync.unlock(&_scheduler.subtask_times_mutex)
    }
    sync.unlock(&_scheduler.subtask_list_mutex)

    // Enqueue the subtasks, shortest known runtime first.
    slice.sort_by(subtask_timings, sort_subtask_timings)
    sync.lock(&_scheduler.schedule_mutex)
    for i := 0; i < subtask_count; i += 1 {
        queue.push_back(&_scheduler.schedule, subtask_timings[i].slot)
    }
    sync.unlock(&_scheduler.schedule_mutex)
    return .None
}
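// How the timing-based ordering in queue_task plays out (illustrative
// numbers): if the recorded runtimes for three subtasks are
// {0: 250.0, 1: 40.0, 2: 90.0} microseconds, they are pushed onto the
// schedule in the order 1, 2, 0, so the subtask with the shortest last-known
// runtime is popped first. Subtasks that have never run (time 0.0) sort
// before all measured ones.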
@(private = "file")
worker_proc :: proc(t: ^thread.Thread) {
    worker := (^Worker)(t.data)
    task_types := worker.task_types
    for worker.run {
        sync.lock(&_scheduler.schedule_mutex)
        subtask_idx := -1
        if queue.len(_scheduler.schedule) > 0 {
            subtask_idx = queue.pop_front(&_scheduler.schedule)
        }
        sync.unlock(&_scheduler.schedule_mutex)
        if subtask_idx == -1 {
            // No subtasks available.
            thread.yield()
            continue
        }
        subtask := &_scheduler.subtasks[subtask_idx]
        task_idx := subtask.task
        task := &_scheduler.tasks[task_idx]
        if task.type in task_types {
            start := time.tick_now()
            task.entry(subtask.idx, task.user_data)
            end := time.tick_now()
            duration := time.tick_diff(start, end)

            // Compute a hash that identifies this subtask.
            // We just treat the (64-bit) entry address as a hash.
            task_hash := u64(uintptr(rawptr(task.entry)))
            subtask_id: [1]int = {subtask.idx}
            subtask_hash := hash.fnv64a(slice.to_bytes(subtask_id[:]))
            final_hash := task_hash ~ subtask_hash
            sync.lock(&_scheduler.subtask_times_mutex)
            _scheduler.subtask_times[final_hash] = time.duration_microseconds(duration)
            sync.unlock(&_scheduler.subtask_times_mutex)

            // Return the subtask to the free list.
            sync.lock(&_scheduler.subtask_list_mutex)
            subtask.next_free = _scheduler.first_free_subtask
            _scheduler.first_free_subtask = subtask_idx
            _scheduler.free_subtasks += 1
            sync.unlock(&_scheduler.subtask_list_mutex)

            prev_cnt := sync.atomic_sub(&task.remaining_subtasks, 1)
            if prev_cnt == 1 {
                // That was the last subtask; the task is finished.
                task.finished(task.user_data)
                sync.lock(&_scheduler.task_list_mutex)
                task.next_free = _scheduler.first_free_task
                _scheduler.first_free_task = task_idx
                sync.unlock(&_scheduler.task_list_mutex)
            }
        } else {
            // Push it back onto the front of the schedule and let another
            // worker pick it up.
            sync.lock(&_scheduler.schedule_mutex)
            queue.push_front(&_scheduler.schedule, subtask_idx)
            sync.unlock(&_scheduler.schedule_mutex)
            thread.yield()
        }
    }
}
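// End-to-end usage sketch (illustration only; `my_subtask`, `my_finished`,
// and `main` are hypothetical user code, and a real program would wait for
// outstanding tasks to finish before shutting down):
//
//    my_subtask :: proc(subtask_idx: int, user_data: rawptr) {
//        // work for one subtask
//    }
//
//    my_finished :: proc(user_data: rawptr) {
//        // enqueue dependent tasks here, if any
//    }
//
//    main :: proc() {
//        init_scheduler(max_tasks = 32, max_subtasks_per_task = 8)
//        defer shutdown_scheduler()
//
//        // One worker that accepts every task type.
//        add_worker({.General, .Streaming, .Rendering, .Physics, .Audio})
//
//        queue_task(my_subtask, my_finished, nil, 8)
//    }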