From 5559ed2e0b63103e042a1366c21e24f6aa756d63 Mon Sep 17 00:00:00 2001 From: Zachary Levy Date: Thu, 2 Apr 2026 17:42:36 -0700 Subject: [PATCH] Added phased executor --- .zed/tasks.json | 5 + levmath/levmath.odin | 2 +- phased_executor/phased_executor.odin | 305 +++++++++++++++++++++++++++ 3 files changed, 311 insertions(+), 1 deletion(-) create mode 100644 phased_executor/phased_executor.odin diff --git a/.zed/tasks.json b/.zed/tasks.json index d49b77d..c24534c 100644 --- a/.zed/tasks.json +++ b/.zed/tasks.json @@ -27,6 +27,11 @@ "command": "odin test levmath -out=out/debug/test_levmath", "cwd": "$ZED_WORKTREE_ROOT" }, + { + "label": "Test phased_executor", + "command": "odin test phased_executor -out=out/debug/test_phased_executor", + "cwd": "$ZED_WORKTREE_ROOT" + }, // --------------------------------------------------------------------------------------------------------------------- // ----- LMDB Examples ------------------------ // --------------------------------------------------------------------------------------------------------------------- diff --git a/levmath/levmath.odin b/levmath/levmath.odin index b9ddaa6..86ee0f9 100644 --- a/levmath/levmath.odin +++ b/levmath/levmath.odin @@ -2,7 +2,6 @@ package levmath import "base:intrinsics" import "core:math" -import "core:testing" // --------------------------------------------------------------------------------------------------------------------- // ----- Fast Exp (Schraudolph IEEE 754 bit trick) --------------------------------------------------------------------- @@ -77,6 +76,7 @@ fast_exp :: #force_inline proc "contextless" (x: $FLOAT) -> FLOAT where intrinsi // --------------------------------------------------------------------------------------------------------------------- // ----- Testing ------------------------------------------------------------------------------------------------------- // 
--------------------------------------------------------------------------------------------------------------------- +import "core:testing" @(test) test_fast_exp_identity :: proc(t: ^testing.T) { diff --git a/phased_executor/phased_executor.odin b/phased_executor/phased_executor.odin new file mode 100644 index 0000000..0c88c09 --- /dev/null +++ b/phased_executor/phased_executor.odin @@ -0,0 +1,305 @@ +// Executor for fan out/in phases where each phase must entirely finish before the next begins. +// This executor does not guarantee strict ordering of commands within a phase so it is only suited +// to tasks where order within a phase is not critical. +package phased_executor + +import "base:intrinsics" +import q "core:container/queue" +import "core:prof/spall" +import "core:sync" +import "core:thread" + +import b "../basic" + +DEFT_BATCH_SIZE :: 1024 // Number of nodes in each batch +DEFT_SPIN_LIMIT :: 2_500_000 + +Harness :: struct($T: typeid) where intrinsics.type_has_nil(T) { + mutex: sync.Mutex, + condition: sync.Cond, + cmd_queue: q.Queue(T), + spin, locked: bool, + _pad: [64 - size_of(uint)]u8, // We want join_count to have its own cache line + join_count: uint, // Number of commands completed since last exec_join +} + +// `nil` for type `T` is reserved for executor shutdown. If you need `nil` for something else wrap `T` +// in a union even if there is only a single type. +// +// Executor is not thread safe and can only be used from a single thread at a time. +// Executor can only handle 1 graph at a time. +// To execute multiple graphs at the same time, use multiple executors. 
+Executor :: struct($T: typeid) where intrinsics.type_has_nil(T) { + harnesses: []Harness(T), // Accessed from slave threads + spin_limit: uint, // Accessed from slave threads + num_cmds_in_round: uint, // Number of commands submitted without join being called + harness_index: int, + cmd_queue_floor: int, + thread_pool: thread.Pool, + initialized: bool, +} + +//TODO: Provide a way to set some aspects of context for the executor threads. Namely a logger. +init_executor :: proc( + executor: ^Executor($T), + #any_int num_threads: int, + $on_command_received: proc(command: T), + #any_int spin_limit: uint = DEFT_SPIN_LIMIT, + allocator := context.allocator, +) { + was_initialized, _ := intrinsics.atomic_compare_exchange_strong_explicit( + &executor.initialized, + false, + true, + .Seq_Cst, + .Seq_Cst, + ) + assert(!was_initialized, "Executor already initialized.") + + slave_task := build_task(on_command_received) + executor.spin_limit = spin_limit + executor.harnesses = make([]Harness(T), num_threads, allocator) + for &harness in executor.harnesses { + q.init(&harness.cmd_queue, allocator = allocator) + harness.spin = true + } + + thread.pool_init(&executor.thread_pool, allocator, num_threads) + for i in 0 ..< num_threads { + thread.pool_add_task(&executor.thread_pool, allocator, slave_task, data = executor, user_index = i) + } + thread.pool_start(&executor.thread_pool) + + return +} + +// Cleanly shuts down all executor tasks then destroys the executor +destroy_executor :: proc(executor: ^Executor($T), allocator := context.allocator) { + was_initialized, _ := intrinsics.atomic_compare_exchange_strong_explicit( + &executor.initialized, + true, + false, + .Seq_Cst, + .Seq_Cst, + ) + assert(was_initialized, "Executor not initialized.") + + // Exit thread loops + for &harness in executor.harnesses { + for { + if try_lock_harness(&harness.locked) { + q.push_back(&harness.cmd_queue, nil) + if !harness.spin { + sync.mutex_lock(&harness.mutex) + 
sync.cond_signal(&harness.condition) + sync.mutex_unlock(&harness.mutex) + } + intrinsics.atomic_store_explicit(&harness.locked, false, .Release) + break + } + } + } + + thread.pool_join(&executor.thread_pool) + thread.pool_destroy(&executor.thread_pool) + for &harness in executor.harnesses { + q.destroy(&harness.cmd_queue) + } + delete(executor.harnesses, allocator) +} + +// Returns true if lock successfully acquired, false otherwise +try_lock_harness :: #force_inline proc "contextless" (locked: ^bool) -> bool { + was_locked, lock_acquired := intrinsics.atomic_compare_exchange_weak_explicit( + locked, + false, + true, + .Acq_Rel, + .Relaxed, + ) + return lock_acquired +} + +build_task :: proc( + $on_command_received: proc(command: $T), +) -> ( + slave_task: proc(task: thread.Task), +) where intrinsics.type_has_nil(T) { + slave_task = proc(task: thread.Task) { + when b.SPALL_TRACE { + spall_data := make([]u8, spall.BUFFER_DEFAULT_SIZE) + spall_buffer = spall.buffer_create(spall_data, u32(sync.current_thread_id())) + defer spall.buffer_destroy(&spall_ctx, &spall_buffer) + } + + executor := cast(^Executor(T))task.data + harness := &executor.harnesses[task.user_index] + sync.mutex_lock(&harness.mutex) + for { + defer free_all(context.temp_allocator) + // Spinning + spin_count: uint = 0 + spin_loop: for { + if try_lock_harness(&harness.locked) { + if q.len(harness.cmd_queue) > 0 { + + // Execute command + command := q.pop_front(&harness.cmd_queue) + intrinsics.atomic_store_explicit(&harness.locked, false, .Release) + if command == nil do return + on_command_received(command) + + spin_count = 0 + intrinsics.atomic_add_explicit(&harness.join_count, 1, .Release) + } else { + defer intrinsics.cpu_relax() + defer intrinsics.atomic_store_explicit(&harness.locked, false, .Release) + spin_count += 1 + if spin_count == executor.spin_limit { + harness.spin = false + break spin_loop + } + } + } else { // If master locked the command queue there will be a new command soon + 
spin_count = 0 + intrinsics.cpu_relax() + } + } + + // Sleeping + cond_loop: for { // We have to loop because cond_wait can return without signal sometimes + sync.cond_wait(&harness.condition, &harness.mutex) + for { // Loop to acquire harness lock + defer intrinsics.cpu_relax() + if try_lock_harness(&harness.locked) { + defer intrinsics.atomic_store_explicit(&harness.locked, false, .Release) + if q.len(harness.cmd_queue) > 0 { + harness.spin = true + break cond_loop + } else { + continue cond_loop // Spurious wakeup, go back to sleep + } + } + } + } + } + } + + return slave_task +} + +exec_command :: proc(executor: ^Executor($T), command: T) { + defer executor.num_cmds_in_round += 1 + for { + if executor.num_cmds_in_round > 0 { // Avoid spinning multiple locks if we're only using 1 thread + if executor.harness_index == len(executor.harnesses) - 1 { + executor.harness_index = 0 + } else { + executor.harness_index += 1 + } + } + harness := &executor.harnesses[executor.harness_index] + if try_lock_harness(&harness.locked) { + if q.len(harness.cmd_queue) <= executor.cmd_queue_floor { + q.push_back(&harness.cmd_queue, command) + executor.cmd_queue_floor = q.len(harness.cmd_queue) + slave_sleeping := !harness.spin + // Must release lock before signalling to avoid race from slave spurious wakeup + intrinsics.atomic_store_explicit(&harness.locked, false, .Release) + if slave_sleeping { + sync.mutex_lock(&harness.mutex) + sync.cond_signal(&harness.condition) + sync.mutex_unlock(&harness.mutex) + } + break + } + intrinsics.atomic_store_explicit(&harness.locked, false, .Release) + } + } +} + +// Spin check until issued executor commands finish +exec_join :: proc(executor: ^Executor($T)) { + defer executor.num_cmds_in_round = 0 + for { + completed_commands: uint = 0 + for &harness in executor.harnesses { + completed_commands += intrinsics.atomic_load_explicit(&harness.join_count, .Acquire) + } + if completed_commands == executor.num_cmds_in_round { + for &harness in 
executor.harnesses { + // We know the slave will never access join_count at this time so we don't need to synchronize + harness.join_count = 0 + } + return + } + intrinsics.cpu_relax() + } +} + +// --------------------------------------------------------------------------------------------------------------------- +// ----- Tests --------------------------------------------------------------------- +// --------------------------------------------------------------------------------------------------------------------- +import "core:fmt" +import "core:testing" + +when ODIN_TEST { + @(private = "file") + STRESS_TOTAL_CMDS :: 200_000 + @(private = "file") + STRESS_NUM_THREADS :: 8 + @(private = "file") + STRESS_NUM_ROUNDS :: 100 + @(private = "file") + STRESS_CMDS_PER_ROUND :: STRESS_TOTAL_CMDS / STRESS_NUM_ROUNDS + + @(private = "file") + Stress_Cmd :: union { + Stress_Payload, + } + + @(private = "file") + Stress_Payload :: struct { + exec_counts: ^[STRESS_TOTAL_CMDS]uint, + id: int, + } + + @(private = "file") + stress_handler :: proc(command: Stress_Cmd) { + payload := command.(Stress_Payload) + intrinsics.atomic_add_explicit(&payload.exec_counts[payload.id], 1, .Release) + } + + @(test) + stress_test_executor :: proc(t: ^testing.T) { + exec_counts := new([STRESS_TOTAL_CMDS]uint) + defer free(exec_counts) + + executor: Executor(Stress_Cmd) + init_executor(&executor, STRESS_NUM_THREADS, stress_handler, spin_limit = 500) + + for round in 0 ..< STRESS_NUM_ROUNDS { + base := round * STRESS_CMDS_PER_ROUND + for i in 0 ..< STRESS_CMDS_PER_ROUND { + exec_command(&executor, Stress_Payload{exec_counts = exec_counts, id = base + i}) + } + exec_join(&executor) + } + + missed, duped: int + for i in 0 ..< STRESS_TOTAL_CMDS { + count := exec_counts[i] + if count == 0 do missed += 1 + else if count > 1 do duped += 1 + } + + testing.expect(t, missed == 0, fmt.tprintf("Missed %d / %d commands", missed, STRESS_TOTAL_CMDS)) + testing.expect(t, duped == 0, fmt.tprintf("Duplicated %d 
/ %d commands", duped, STRESS_TOTAL_CMDS)) + + // Explicitly destroy to verify clean shutdown. + // If destroy_executor returns, all threads received the nil sentinel and exited, + // and thread.pool_join completed without deadlock. + destroy_executor(&executor) + testing.expect(t, !executor.initialized, "Executor still marked initialized after destroy") + } +}