diff --git a/main.odin b/main.odin index 1766019..8b3f81e 100644 --- a/main.odin +++ b/main.odin @@ -1,6 +1,7 @@ package rune import "core:fmt" +import "core:time" test_task :: proc(subtask: int, user_data: rawptr) { fmt.printfln("test task %v", subtask) @@ -13,9 +14,13 @@ test_finished :: proc(user_data: rawptr) { main :: proc() { init_scheduler(100, 1000) add_worker({.General, .Streaming, .Physics, .Rendering}) - add_worker({.General, .Streaming, .Physics, .Rendering}) - add_worker({.General, .Streaming, .Physics, .Rendering}) - add_worker({.General, .Streaming, .Physics, .Rendering}) + //add_worker({.General, .Streaming, .Physics, .Rendering}) + //add_worker({.General, .Streaming, .Physics, .Rendering}) + //add_worker({.General, .Streaming, .Physics, .Rendering}) + + queue_task(test_task, test_finished, nil, 10) + + time.sleep(1 * time.Second) queue_task(test_task, test_finished, nil, 10) diff --git a/task.odin b/task.odin index be84596..6a4fd33 100644 --- a/task.odin +++ b/task.odin @@ -1,9 +1,12 @@ package rune import "core:container/queue" +import "core:hash" import "core:mem" +import "core:slice" import "core:sync" import "core:thread" +import "core:time" Task_Error :: enum { None, @@ -56,27 +59,32 @@ Worker :: struct { @(private = "file") Scheduler :: struct { // Order of subtasks to execute. Highest priority is at index 0 - schedule_storage: []int, - schedule: queue.Queue(int), + schedule_storage: []int, + schedule: queue.Queue(int), // List of tasks. We don't move these to avoid expensive // copies when the schedule changes. 
- tasks: []Task, - subtasks: []Sub_Task, - first_free_task: int, - first_free_subtask: int, - free_subtasks: int, + tasks: []Task, + subtasks: []Sub_Task, + first_free_task: int, + first_free_subtask: int, + free_subtasks: int, // Used when adding or removing tasks from the list, // otherwise tasks are read-only (except for the remaining subtask counter, which is atomic) - task_list_mutex: sync.Mutex, - subtask_list_mutex: sync.Mutex, - schedule_mutex: sync.Mutex, + task_list_mutex: sync.Mutex, + subtask_list_mutex: sync.Mutex, + schedule_mutex: sync.Mutex, + subtask_times_mutex: sync.Mutex, + + // Keeps track of how long subtasks (hash of function + subtask idx) ran in the past. + // We keep the last runtime and use it when scheduling subtasks. + subtask_times: map[u64]f64, // List of workers. We allow the user to dynamically add or remove workers. - workers: [dynamic]^Worker, - workers_mutex: sync.Mutex, - allocator: mem.Allocator, + workers: [dynamic]^Worker, + workers_mutex: sync.Mutex, + allocator: mem.Allocator, } @(private = "file") @@ -100,10 +108,24 @@ init_scheduler :: proc( _scheduler.subtasks[i].next_free = i + 1 } _scheduler.free_subtasks = max_subtasks_per_task * max_tasks + _scheduler.subtask_times = make(map[u64]f64, allocator) _scheduler.workers = make([dynamic]^Worker, allocator) _scheduler.allocator = allocator } +shutdown_scheduler :: proc() { + for &worker in _scheduler.workers { + worker.run = false + thread.destroy(worker.thread) + free(worker, _scheduler.allocator) + } + delete(_scheduler.workers) + delete(_scheduler.subtask_times) + delete(_scheduler.schedule_storage, _scheduler.allocator) + delete(_scheduler.tasks, _scheduler.allocator) + delete(_scheduler.subtasks, _scheduler.allocator) +} + add_worker :: proc(task_types: bit_set[Task_Type]) -> (int, Task_Error) { sync.lock(&_scheduler.workers_mutex) defer sync.unlock(&_scheduler.workers_mutex) @@ -139,6 +161,17 @@ remove_worker :: proc(idx: int) { unordered_remove(&_scheduler.workers, 
idx) } + +Subtask_Timing :: struct { + slot: int, + time: f64, +} + +@(private = "file") +sort_subtask_timings :: proc(lhs: Subtask_Timing, rhs: Subtask_Timing) -> bool { + return lhs.time < rhs.time +} + queue_task :: proc( entry: Task_Proc, finished: Task_Finished_Proc, @@ -177,6 +210,10 @@ queue_task :: proc( return .Too_Many_Tasks } _scheduler.free_subtasks -= subtask_count + + subtask_timings := make([]Subtask_Timing, subtask_count); defer delete(subtask_timings) + task_hash := u64(uintptr(rawptr(task.entry))) + for i := 0; i < subtask_count; i += 1 { subtask_slot := _scheduler.first_free_subtask assert(subtask_slot < len(_scheduler.subtasks)) @@ -186,13 +223,36 @@ queue_task :: proc( subtask.task = slot subtask.idx = i + + subtask_id: [1]int = {i} + subtask_hash := hash.fnv64a(slice.to_bytes(subtask_id[:])) + final_hash := task_hash ~ subtask_hash + + subtask_timings[i].slot = subtask_slot + sync.lock(&_scheduler.subtask_times_mutex) + if final_hash in _scheduler.subtask_times { + subtask_timings[i].time = _scheduler.subtask_times[final_hash] + } else { + subtask_timings[i].time = 0.0 + } + sync.unlock(&_scheduler.subtask_times_mutex) + + /* sync.lock(&_scheduler.schedule_mutex) // Add to schedule. This is FIFO. We could be more clever (for example use shortest time to finish) queue.push_back(&_scheduler.schedule, subtask_slot) sync.unlock(&_scheduler.schedule_mutex) + */ } sync.unlock(&_scheduler.subtask_list_mutex) + slice.sort_by(subtask_timings, sort_subtask_timings) + + sync.lock(&_scheduler.schedule_mutex) + for i := 0; i < subtask_count; i += 1 { + queue.push_back(&_scheduler.schedule, subtask_timings[i].slot) + } + sync.unlock(&_scheduler.schedule_mutex) return .None } @@ -219,7 +279,20 @@ worker_proc :: proc(t: ^thread.Thread) { task := &_scheduler.tasks[taskidx] if task.type in task_types { + start := time.tick_now() task.entry(subtask.idx, task.user_data) + end := time.tick_now() + duration := time.tick_diff(start, end) + + // Compute a hash that identifies this subtask. 
+ // We just treat the (64bit) entry address as a hash + task_hash := u64(uintptr(rawptr(task.entry))) + subtask_id: [1]int = {subtask.idx} + subtask_hash := hash.fnv64a(slice.to_bytes(subtask_id[:])) + final_hash := task_hash ~ subtask_hash + sync.lock(&_scheduler.subtask_times_mutex) + _scheduler.subtask_times[final_hash] = time.duration_microseconds(duration) + sync.unlock(&_scheduler.subtask_times_mutex) sync.lock(&_scheduler.subtask_list_mutex) subtask.next_free = _scheduler.first_free_subtask