shortest subtask time to finish for scheduler

This commit is contained in:
Kevin Trogant 2025-04-22 20:56:20 +02:00
parent 8926ae0f5c
commit ff0a13e761
2 changed files with 94 additions and 16 deletions

View File

@ -1,6 +1,7 @@
package rune package rune
import "core:fmt" import "core:fmt"
import "core:time"
test_task :: proc(subtask: int, user_data: rawptr) { test_task :: proc(subtask: int, user_data: rawptr) {
fmt.printfln("test task %v", subtask) fmt.printfln("test task %v", subtask)
@ -13,9 +14,13 @@ test_finished :: proc(user_data: rawptr) {
main :: proc() { main :: proc() {
init_scheduler(100, 1000) init_scheduler(100, 1000)
add_worker({.General, .Streaming, .Physics, .Rendering}) add_worker({.General, .Streaming, .Physics, .Rendering})
add_worker({.General, .Streaming, .Physics, .Rendering}) //add_worker({.General, .Streaming, .Physics, .Rendering})
add_worker({.General, .Streaming, .Physics, .Rendering}) //add_worker({.General, .Streaming, .Physics, .Rendering})
add_worker({.General, .Streaming, .Physics, .Rendering}) //add_worker({.General, .Streaming, .Physics, .Rendering})
queue_task(test_task, test_finished, nil, 10)
time.sleep(1 * time.Second)
queue_task(test_task, test_finished, nil, 10) queue_task(test_task, test_finished, nil, 10)

View File

@ -1,9 +1,12 @@
package rune package rune
import "core:container/queue" import "core:container/queue"
import "core:hash"
import "core:mem" import "core:mem"
import "core:slice"
import "core:sync" import "core:sync"
import "core:thread" import "core:thread"
import "core:time"
Task_Error :: enum { Task_Error :: enum {
None, None,
@ -56,27 +59,32 @@ Worker :: struct {
@(private = "file") @(private = "file")
Scheduler :: struct { Scheduler :: struct {
// Order of subtasks to execute. Highest priority is at index 0 // Order of subtasks to execute. Highest priority is at index 0
schedule_storage: []int, schedule_storage: []int,
schedule: queue.Queue(int), schedule: queue.Queue(int),
// List of tasks. We don't move these to avoid expensive // List of tasks. We don't move these to avoid expensive
// copies when the schedule changes. // copies when the schedule changes.
tasks: []Task, tasks: []Task,
subtasks: []Sub_Task, subtasks: []Sub_Task,
first_free_task: int, first_free_task: int,
first_free_subtask: int, first_free_subtask: int,
free_subtasks: int, free_subtasks: int,
// Used when adding or removing tasks from the list, // Used when adding or removing tasks from the list,
// otherwise tasks are read-only (except for the remaining subtask counter, which is atomic) // otherwise tasks are read-only (except for the remaining subtask counter, which is atomic)
task_list_mutex: sync.Mutex, task_list_mutex: sync.Mutex,
subtask_list_mutex: sync.Mutex, subtask_list_mutex: sync.Mutex,
schedule_mutex: sync.Mutex, schedule_mutex: sync.Mutex,
subtask_times_mutex: sync.Mutex,
// Keeps track of how long subtasks (hash of function + subtask idx) ran in the past.
// We keep the last runtime and use it when scheduling subtasks.
subtask_times: map[u64]f64,
// List of workers. We allow the user to dynamically add or remove workers. // List of workers. We allow the user to dynamically add or remove workers.
workers: [dynamic]^Worker, workers: [dynamic]^Worker,
workers_mutex: sync.Mutex, workers_mutex: sync.Mutex,
allocator: mem.Allocator, allocator: mem.Allocator,
} }
@(private = "file") @(private = "file")
@ -100,10 +108,24 @@ init_scheduler :: proc(
_scheduler.subtasks[i].next_free = i + 1 _scheduler.subtasks[i].next_free = i + 1
} }
_scheduler.free_subtasks = max_subtasks_per_task * max_tasks _scheduler.free_subtasks = max_subtasks_per_task * max_tasks
_scheduler.subtask_times = make(map[u64]f64, allocator)
_scheduler.workers = make([dynamic]^Worker, allocator) _scheduler.workers = make([dynamic]^Worker, allocator)
_scheduler.allocator = allocator _scheduler.allocator = allocator
} }
shutdown_scheduler :: proc() {
for &worker in _scheduler.workers {
worker.run = false
thread.destroy(worker.thread)
free(worker, _scheduler.allocator)
}
delete(_scheduler.workers)
delete(_scheduler.subtask_times)
delete(_scheduler.schedule_storage, _scheduler.allocator)
delete(_scheduler.tasks, _scheduler.allocator)
delete(_scheduler.subtasks, _scheduler.allocator)
}
add_worker :: proc(task_types: bit_set[Task_Type]) -> (int, Task_Error) { add_worker :: proc(task_types: bit_set[Task_Type]) -> (int, Task_Error) {
sync.lock(&_scheduler.workers_mutex) sync.lock(&_scheduler.workers_mutex)
defer sync.unlock(&_scheduler.workers_mutex) defer sync.unlock(&_scheduler.workers_mutex)
@ -139,6 +161,17 @@ remove_worker :: proc(idx: int) {
unordered_remove(&_scheduler.workers, idx) unordered_remove(&_scheduler.workers, idx)
} }
Subtask_Timing :: struct {
slot: int,
time: f64,
}
@(private = "file")
sort_subtask_timings :: proc(lhs: Subtask_Timing, rhs: Subtask_Timing) -> bool {
return lhs.time < rhs.time
}
queue_task :: proc( queue_task :: proc(
entry: Task_Proc, entry: Task_Proc,
finished: Task_Finished_Proc, finished: Task_Finished_Proc,
@ -177,6 +210,10 @@ queue_task :: proc(
return .Too_Many_Tasks return .Too_Many_Tasks
} }
_scheduler.free_subtasks -= subtask_count _scheduler.free_subtasks -= subtask_count
subtask_timings := make([]Subtask_Timing, subtask_count)
task_hash := u64(uintptr(&task.entry))
for i := 0; i < subtask_count; i += 1 { for i := 0; i < subtask_count; i += 1 {
subtask_slot := _scheduler.first_free_subtask subtask_slot := _scheduler.first_free_subtask
assert(subtask_slot < len(_scheduler.subtasks)) assert(subtask_slot < len(_scheduler.subtasks))
@ -186,13 +223,36 @@ queue_task :: proc(
subtask.task = slot subtask.task = slot
subtask.idx = i subtask.idx = i
subtask_id: [1]int = {i}
subtask_hash := hash.fnv64a(slice.to_bytes(subtask_id[:]))
final_hash := task_hash ~ subtask_hash
subtask_timings[i].slot = i
if final_hash in _scheduler.subtask_times {
sync.lock(&_scheduler.subtask_times_mutex)
subtask_timings[i].time = _scheduler.subtask_times[final_hash]
sync.unlock(&_scheduler.subtask_times_mutex)
} else {
subtask_timings[i].time = 0.0
}
/*
sync.lock(&_scheduler.schedule_mutex) sync.lock(&_scheduler.schedule_mutex)
// Add to schedule. This is FIFO. We could be more clever (for example use shortest time to finish) // Add to schedule. This is FIFO. We could be more clever (for example use shortest time to finish)
queue.push_back(&_scheduler.schedule, subtask_slot) queue.push_back(&_scheduler.schedule, subtask_slot)
sync.unlock(&_scheduler.schedule_mutex) sync.unlock(&_scheduler.schedule_mutex)
*/
} }
sync.unlock(&_scheduler.subtask_list_mutex) sync.unlock(&_scheduler.subtask_list_mutex)
slice.sort_by(subtask_timings, sort_subtask_timings)
sync.lock(&_scheduler.schedule_mutex)
for i := 0; i < subtask_count; i += 1 {
queue.push_back(&_scheduler.schedule, subtask_timings[i].slot)
}
sync.unlock(&_scheduler.schedule_mutex)
return .None return .None
} }
@ -219,7 +279,20 @@ worker_proc :: proc(t: ^thread.Thread) {
task := &_scheduler.tasks[taskidx] task := &_scheduler.tasks[taskidx]
if task.type in task_types { if task.type in task_types {
start := time.tick_now()
task.entry(subtask.idx, task.user_data) task.entry(subtask.idx, task.user_data)
end := time.tick_now()
duration := time.tick_diff(start, end)
// Compute a hash that identifies this subtask.
// We just treat the (64bit) entry address as a hash
task_hash := u64(uintptr(&task.entry))
subtask_id: [1]int = {subtask.idx}
subtask_hash := hash.fnv64a(slice.to_bytes(subtask_id[:]))
final_hash := task_hash ~ subtask_hash
sync.lock(&_scheduler.subtask_times_mutex)
_scheduler.subtask_times[final_hash] = time.duration_microseconds(duration)
sync.unlock(&_scheduler.subtask_times_mutex)
sync.lock(&_scheduler.subtask_list_mutex) sync.lock(&_scheduler.subtask_list_mutex)
subtask.next_free = _scheduler.first_free_subtask subtask.next_free = _scheduler.first_free_subtask