rune/task.odin

package rune

import "core:container/queue"
import "core:hash"
import "core:mem"
import "core:slice"
import "core:sync"
import "core:thread"
import "core:time"

Task_Error :: enum {
	None,
	Too_Many_Tasks,
	Allocation_Failed,
}

Task_Type :: enum {
	General,
	Streaming,
	Rendering,
	Physics,
	Audio,
}

// Procedure called to execute a subtask.
Task_Proc :: proc(subtask_idx: int, user_data: rawptr)

// Called when a task has finished executing.
//
// The idea is that this is the place where subsequent tasks are enqueued.
// This way, the user controls dependencies, but the scheduler never has to
// track un-runnable tasks, which simplifies it (and hopefully improves performance).
Task_Finished_Proc :: proc(user_data: rawptr)
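
/*
A minimal sketch of the intended pattern (Frame_Data, frame, upload_chunk,
build_draw_list, and draw_lists_done are hypothetical user code): the finished
callback of one task queues its dependents, so the schedule never contains a
subtask that is not yet runnable.

	upload_chunk :: proc(subtask_idx: int, user_data: rawptr) {
		frame := (^Frame_Data)(user_data)
		// ... stream in chunk `subtask_idx` of the frame's data ...
	}

	uploads_done :: proc(user_data: rawptr) {
		// All upload subtasks have run; only now queue the dependent task.
		queue_task(build_draw_list, draw_lists_done, user_data, 4, .Rendering)
	}

	// Kick off the chain with 16 streaming subtasks.
	queue_task(upload_chunk, uploads_done, frame, 16, .Streaming)
*/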

@(private = "file")
Task :: struct {
	entry: Task_Proc,
	finished: Task_Finished_Proc,
	user_data: rawptr,
	remaining_subtasks: int,
	type: Task_Type,
	next_free: int,
}

@(private = "file")
Sub_Task :: struct {
	task: int, // Index into the task list.
	idx: int, // This subtask's index within its task.
	next_free: int,
}

@(private = "file")
Worker :: struct {
	run: bool, // Accessed atomically; set to false to stop the worker loop.
	task_types: bit_set[Task_Type],
	thread: ^thread.Thread,
}

@(private = "file")
Scheduler :: struct {
	// Order of subtasks to execute. Highest priority is at index 0.
	schedule_storage: []int,
	schedule: queue.Queue(int),
	// List of tasks. We don't move these to avoid expensive
	// copies when the schedule changes.
	tasks: []Task,
	subtasks: []Sub_Task,
	first_free_task: int,
	first_free_subtask: int,
	free_subtasks: int,
	// Used when adding or removing tasks from the list;
	// otherwise tasks are read-only (except for the remaining-subtasks counter, which is atomic).
	task_list_mutex: sync.Mutex,
	subtask_list_mutex: sync.Mutex,
	schedule_mutex: sync.Mutex,
	subtask_times_mutex: sync.Mutex,
	// Keeps track of how long each subtask (keyed by a hash of its entry
	// procedure and subtask index, see subtask_time_key below) ran in the past.
	// We keep the last runtime and use it when scheduling subtasks.
	subtask_times: map[u64]f64,
	// List of workers. We allow the user to dynamically add or remove workers.
	workers: [dynamic]^Worker,
	workers_mutex: sync.Mutex,
	allocator: mem.Allocator,
}

@(private = "file")
_scheduler: Scheduler

init_scheduler :: proc(
	max_tasks: int,
	max_subtasks_per_task: int,
	allocator := context.allocator,
) {
	// The schedule is sized so it can hold every subtask at once.
	_scheduler.schedule_storage = make([]int, max_tasks * max_subtasks_per_task, allocator)
	queue.init_from_slice(&_scheduler.schedule, _scheduler.schedule_storage)
	_scheduler.tasks = make([]Task, max_tasks, allocator)
	_scheduler.subtasks = make([]Sub_Task, max_tasks * max_subtasks_per_task, allocator)

	// Thread the free lists through the task and subtask pools.
	_scheduler.first_free_task = 0
	for i := 0; i < max_tasks; i += 1 {
		_scheduler.tasks[i].next_free = i + 1
	}
	_scheduler.first_free_subtask = 0
	for i := 0; i < max_subtasks_per_task * max_tasks; i += 1 {
		_scheduler.subtasks[i].next_free = i + 1
	}
	_scheduler.free_subtasks = max_subtasks_per_task * max_tasks

	_scheduler.subtask_times = make(map[u64]f64, allocator)
	_scheduler.workers = make([dynamic]^Worker, allocator)
	_scheduler.allocator = allocator
}
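
/*
A usage sketch: pool sizes are fixed at init time, so they bound how many tasks
and subtasks can be in flight at once (the capacities below are arbitrary).

	init_scheduler(max_tasks = 64, max_subtasks_per_task = 32)
	defer shutdown_scheduler()
*/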

shutdown_scheduler :: proc() {
	for worker in _scheduler.workers {
		sync.atomic_store(&worker.run, false)
		thread.destroy(worker.thread)
		free(worker, _scheduler.allocator)
	}
	delete(_scheduler.workers)
	delete(_scheduler.subtask_times)
	delete(_scheduler.schedule_storage, _scheduler.allocator)
	delete(_scheduler.tasks, _scheduler.allocator)
	delete(_scheduler.subtasks, _scheduler.allocator)
}

add_worker :: proc(task_types: bit_set[Task_Type]) -> (int, Task_Error) {
	sync.lock(&_scheduler.workers_mutex)
	defer sync.unlock(&_scheduler.workers_mutex)
	if d := new(Worker, _scheduler.allocator); d != nil {
		d.task_types = task_types
		d.run = true
		if d.thread = thread.create(worker_proc); d.thread == nil {
			free(d, _scheduler.allocator)
			return -1, .Allocation_Failed
		}
		d.thread.data = d
		_, err := append(&_scheduler.workers, d)
		if err != nil {
			thread.destroy(d.thread)
			free(d, _scheduler.allocator)
			return -1, .Allocation_Failed
		}
		thread.start(d.thread)
		return len(_scheduler.workers) - 1, .None
	}
	return -1, .Allocation_Failed
}

remove_worker :: proc(idx: int) {
	sync.lock(&_scheduler.workers_mutex)
	defer sync.unlock(&_scheduler.workers_mutex)
	if idx < 0 || idx >= len(_scheduler.workers) {
		return
	}
	sync.atomic_store(&_scheduler.workers[idx].run, false)
	thread.destroy(_scheduler.workers[idx].thread)
	free(_scheduler.workers[idx], _scheduler.allocator)
	// Note: this swaps the last worker into slot `idx`, so indices returned
	// earlier by add_worker are not stable across removals.
	unordered_remove(&_scheduler.workers, idx)
}
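
/*
A sketch of a small worker pool: one general-purpose thread plus a thread
dedicated to rendering work.

	general, err1 := add_worker({.General, .Streaming, .Physics, .Audio})
	render, err2 := add_worker({.Rendering})
*/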

@(private = "file")
Subtask_Timing :: struct {
	slot: int, // Slot of the subtask in the subtask pool.
	time: f64, // Last recorded runtime in microseconds.
}

@(private = "file")
sort_subtask_timings :: proc(lhs: Subtask_Timing, rhs: Subtask_Timing) -> bool {
	return lhs.time < rhs.time
}
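
// Computes the key under which a subtask's runtime is stored: the task is
// identified by its (64-bit) entry procedure address, the subtask by the FNV-1a
// hash of its index, and the two are combined with XOR. Used by both
// queue_task and worker_proc so lookups and stores agree.
@(private = "file")
subtask_time_key :: proc(entry: Task_Proc, subtask_idx: int) -> u64 {
	task_hash := u64(uintptr(rawptr(entry)))
	subtask_id: [1]int = {subtask_idx}
	subtask_hash := hash.fnv64a(slice.to_bytes(subtask_id[:]))
	return task_hash ~ subtask_hash
}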

queue_task :: proc(
	entry: Task_Proc,
	finished: Task_Finished_Proc,
	user_data: rawptr,
	subtask_count: int,
	type := Task_Type.General,
) -> Task_Error {
	// Use the first free slot to store the task.
	sync.lock(&_scheduler.task_list_mutex)
	slot := _scheduler.first_free_task
	if slot >= len(_scheduler.tasks) {
		sync.unlock(&_scheduler.task_list_mutex)
		return .Too_Many_Tasks
	}
	task := &_scheduler.tasks[slot]
	_scheduler.first_free_task = task.next_free
	task.entry = entry
	task.finished = finished
	task.user_data = user_data
	task.next_free = -1
	task.remaining_subtasks = subtask_count
	task.type = type
	sync.unlock(&_scheduler.task_list_mutex)

	// Create the subtasks.
	sync.lock(&_scheduler.subtask_list_mutex)
	if _scheduler.free_subtasks < subtask_count {
		sync.unlock(&_scheduler.subtask_list_mutex)
		sync.lock(&_scheduler.task_list_mutex)
		// Never mind, release the task again.
		task.next_free = _scheduler.first_free_task
		_scheduler.first_free_task = slot
		sync.unlock(&_scheduler.task_list_mutex)
		return .Too_Many_Tasks
	}
	_scheduler.free_subtasks -= subtask_count
	subtask_timings := make([]Subtask_Timing, subtask_count)
	defer delete(subtask_timings)
	for i := 0; i < subtask_count; i += 1 {
		// Pop a subtask slot from the free list.
		subtask_slot := _scheduler.first_free_subtask
		assert(subtask_slot < len(_scheduler.subtasks))
		subtask := &_scheduler.subtasks[subtask_slot]
		_scheduler.first_free_subtask = subtask.next_free
		subtask.next_free = -1
		subtask.task = slot
		subtask.idx = i
		// Look up the last recorded runtime of this subtask; subtasks that
		// have never run fall back to zero.
		subtask_timings[i].slot = subtask_slot
		key := subtask_time_key(entry, i)
		sync.lock(&_scheduler.subtask_times_mutex)
		subtask_timings[i].time = _scheduler.subtask_times[key] or_else 0.0
		sync.unlock(&_scheduler.subtask_times_mutex)
	}
	sync.unlock(&_scheduler.subtask_list_mutex)

	// Sort by last recorded runtime (shortest first) and append the subtasks
	// to the schedule in that order.
	slice.sort_by(subtask_timings, sort_subtask_timings)
	sync.lock(&_scheduler.schedule_mutex)
	for i := 0; i < subtask_count; i += 1 {
		queue.push_back(&_scheduler.schedule, subtask_timings[i].slot)
	}
	sync.unlock(&_scheduler.schedule_mutex)
	return .None
}
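
/*
Queueing can fail when the fixed pools run out, so check the result. A sketch
(generate_audio and audio_done are hypothetical user procedures):

	if err := queue_task(generate_audio, audio_done, nil, 8, .Audio); err != .None {
		// .Too_Many_Tasks: no free task slot, or fewer than 8 subtask slots free.
	}
*/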

@(private = "file")
worker_proc :: proc(t: ^thread.Thread) {
	worker := (^Worker)(t.data)
	task_types := worker.task_types
	for sync.atomic_load(&worker.run) {
		// Pop the highest-priority subtask, if any.
		sync.lock(&_scheduler.schedule_mutex)
		subtask_idx := -1
		if queue.len(_scheduler.schedule) > 0 {
			subtask_idx = queue.pop_front(&_scheduler.schedule)
		}
		sync.unlock(&_scheduler.schedule_mutex)
		if subtask_idx == -1 {
			// No subtasks available.
			thread.yield()
			continue
		}
		subtask := &_scheduler.subtasks[subtask_idx]
		task_idx := subtask.task
		task := &_scheduler.tasks[task_idx]
		if task.type in task_types {
			// Run the subtask and measure how long it takes.
			start := time.tick_now()
			task.entry(subtask.idx, task.user_data)
			end := time.tick_now()
			duration := time.tick_diff(start, end)
			// Record the runtime under this subtask's key so future runs of
			// the same subtask can be scheduled using it.
			key := subtask_time_key(task.entry, subtask.idx)
			sync.lock(&_scheduler.subtask_times_mutex)
			_scheduler.subtask_times[key] = time.duration_microseconds(duration)
			sync.unlock(&_scheduler.subtask_times_mutex)
			// Release the subtask back into the pool.
			sync.lock(&_scheduler.subtask_list_mutex)
			subtask.next_free = _scheduler.first_free_subtask
			_scheduler.first_free_subtask = subtask_idx
			_scheduler.free_subtasks += 1
			sync.unlock(&_scheduler.subtask_list_mutex)
			prev_cnt := sync.atomic_sub(&task.remaining_subtasks, 1)
			if prev_cnt == 1 {
				// That was the last subtask: notify the user and release the task.
				task.finished(task.user_data)
				sync.lock(&_scheduler.task_list_mutex)
				task.next_free = _scheduler.first_free_task
				_scheduler.first_free_task = task_idx
				sync.unlock(&_scheduler.task_list_mutex)
			}
		} else {
			// This worker does not run this task type; push the subtask back
			// to the front so another worker can pick it up.
			sync.lock(&_scheduler.schedule_mutex)
			queue.push_front(&_scheduler.schedule, subtask_idx)
			sync.unlock(&_scheduler.schedule_mutex)
			thread.yield()
		}
	}
}