From a0552febf66f0aba5fa388ab6cee03a45b06bdcc Mon Sep 17 00:00:00 2001
From: Zachary Levy
Date: Fri, 24 Apr 2026 14:30:13 -0700
Subject: [PATCH] Phased executor naming consistency

Rename init_executor/destroy_executor to init/destroy so call sites read
as phased_executor.init and phased_executor.destroy, and rename the
core:container/queue import alias from q to que. Both procedures now
return runtime.Allocator_Error instead of silently dropping allocation
failures, and the stale destroy_executor reference in the stress-test
comment is updated to match the new name.
---
 phased_executor/phased_executor.odin | 43 +++++++++++++++++++++++---------------------
 1 file changed, 23 insertions(+), 20 deletions(-)

diff --git a/phased_executor/phased_executor.odin b/phased_executor/phased_executor.odin
index cfed63b..6061e70 100644
--- a/phased_executor/phased_executor.odin
+++ b/phased_executor/phased_executor.odin
@@ -4,7 +4,8 @@ package phased_executor
 
 import "base:intrinsics"
-import q "core:container/queue"
+import "base:runtime"
+import que "core:container/queue"
 import "core:prof/spall"
 import "core:sync"
 import "core:thread"
 
@@ -18,7 +19,7 @@ DEFT_SPIN_LIMIT :: 2_500_000
 Harness :: struct($T: typeid) where intrinsics.type_has_nil(T) {
 	mutex:     sync.Mutex,
 	condition: sync.Cond,
-	cmd_queue: q.Queue(T),
+	cmd_queue: que.Queue(T),
 	spin:      bool,
 	lock:      levsync.Spinlock,
 	_pad:      [64 - size_of(uint)]u8, // We want join_count to have its own cache line
@@ -42,13 +43,13 @@ Executor :: struct($T: typeid) where intrinsics.type_has_nil(T) {
 }
 
 //TODO: Provide a way to set some aspects of context for the executor threads. Namely a logger.
-init_executor :: proc(
+init :: proc(
 	executor: ^Executor($T),
 	#any_int num_threads: int,
 	$on_command_received: proc(command: T),
 	#any_int spin_limit: uint = DEFT_SPIN_LIMIT,
 	allocator := context.allocator,
-) {
+) -> runtime.Allocator_Error {
 	was_initialized, _ := intrinsics.atomic_compare_exchange_strong_explicit(
 		&executor.initialized,
 		false,
@@ -60,9 +61,9 @@
 	slave_task := build_task(on_command_received)
 
 	executor.spin_limit = spin_limit
-	executor.harnesses = make([]Harness(T), num_threads, allocator)
+	executor.harnesses = make([]Harness(T), num_threads, allocator) or_return
 	for &harness in executor.harnesses {
-		q.init(&harness.cmd_queue, allocator = allocator)
+		que.init(&harness.cmd_queue, allocator = allocator) or_return
 		harness.spin = true
 	}
 
@@ -72,11 +73,11 @@
 	}
 
 	thread.pool_start(&executor.thread_pool)
-	return
+	return nil
 }
 
 // Cleanly shuts down all executor tasks then destroys the executor
-destroy_executor :: proc(executor: ^Executor($T), allocator := context.allocator) {
+destroy :: proc(executor: ^Executor($T), allocator := context.allocator) -> runtime.Allocator_Error {
 	was_initialized, _ := intrinsics.atomic_compare_exchange_strong_explicit(
 		&executor.initialized,
 		true,
@@ -90,7 +91,7 @@
 	for &harness in executor.harnesses {
 		for {
 			if levsync.try_lock(&harness.lock) {
-				q.push_back(&harness.cmd_queue, nil)
+				que.push_back(&harness.cmd_queue, nil)
 				if !harness.spin {
 					sync.mutex_lock(&harness.mutex)
 					sync.cond_signal(&harness.condition)
@@ -105,9 +106,11 @@
 	thread.pool_join(&executor.thread_pool)
 	thread.pool_destroy(&executor.thread_pool)
 	for &harness in executor.harnesses {
-		q.destroy(&harness.cmd_queue)
+		que.destroy(&harness.cmd_queue)
 	}
-	delete(executor.harnesses, allocator)
+	delete(executor.harnesses, allocator) or_return
+
+	return nil
 }
 
 build_task :: proc(
@@ -131,10 +134,10 @@ build_task :: proc(
 		spin_count: uint = 0
 
 		spin_loop: for {
 			if levsync.try_lock(&harness.lock) {
-				if q.len(harness.cmd_queue) > 0 {
+				if que.len(harness.cmd_queue) > 0 {
 					// Execute command
-					command := q.pop_front(&harness.cmd_queue)
+					command := que.pop_front(&harness.cmd_queue)
 					levsync.unlock(&harness.lock)
 					if command == nil do return
 					on_command_received(command)
@@ -163,7 +166,7 @@
 				defer intrinsics.cpu_relax()
 				if levsync.try_lock(&harness.lock) {
 					defer levsync.unlock(&harness.lock)
-					if q.len(harness.cmd_queue) > 0 {
+					if que.len(harness.cmd_queue) > 0 {
 						harness.spin = true
 						break cond_loop
 					} else {
@@ -190,9 +193,9 @@ exec_command :: proc(executor: ^Executor($T), command: T) {
 	}
 	harness := &executor.harnesses[executor.harness_index]
 	if levsync.try_lock(&harness.lock) {
-		if q.len(harness.cmd_queue) <= executor.cmd_queue_floor {
-			q.push_back(&harness.cmd_queue, command)
-			executor.cmd_queue_floor = q.len(harness.cmd_queue)
+		if que.len(harness.cmd_queue) <= executor.cmd_queue_floor {
+			que.push_back(&harness.cmd_queue, command)
+			executor.cmd_queue_floor = que.len(harness.cmd_queue)
 			slave_sleeping := !harness.spin
 			// Must release lock before signalling to avoid race from slave spurious wakeup
 			levsync.unlock(&harness.lock)
@@ -258,7 +261,7 @@ stress_test_executor :: proc(t: ^testing.T) {
 	defer free(exec_counts)
 
 	executor: Executor(Stress_Cmd)
-	init_executor(&executor, STRESS_NUM_THREADS, stress_handler, spin_limit = 500)
+	init(&executor, STRESS_NUM_THREADS, stress_handler, spin_limit = 500)
 
 	for round in 0 ..< STRESS_NUM_ROUNDS {
 		base := round * STRESS_CMDS_PER_ROUND
@@ -281,6 +284,6 @@
 	// Explicitly destroy to verify clean shutdown.
-	// If destroy_executor returns, all threads received the nil sentinel and exited,
+	// If destroy returns, all threads received the nil sentinel and exited,
 	// and thread.pool_join completed without deadlock.
-	destroy_executor(&executor)
+	destroy(&executor)
 	testing.expect(t, !executor.initialized, "Executor still marked initialized after destroy")
 }
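
For reviewers, a minimal caller-side sketch of the API as renamed by this
patch. The sketch is not part of the diff: the "pe" import alias, the import
path, and the Job/handle_job names are hypothetical, and the command type is
a pointer so it satisfies the intrinsics.type_has_nil(T) constraint on
Executor. Error handling assumes the new runtime.Allocator_Error returns
introduced above.

package example

// Hypothetical import path; adjust to wherever the package lives.
import pe "../phased_executor"

Job :: struct {
	id: int,
}

// Runs on an executor thread. The nil sentinel pushed during destroy is
// consumed inside build_task and never reaches this handler.
handle_job :: proc(command: ^Job) {
	_ = command.id
}

main :: proc() {
	executor: pe.Executor(^Job)

	// After this patch, init reports allocation failures instead of
	// returning nothing.
	if err := pe.init(&executor, 4, handle_job); err != .None {
		return
	}
	// destroy pushes the nil sentinel to every harness, joins the pool,
	// and frees the per-harness queues and the harness slice.
	defer pe.destroy(&executor)

	job := Job{id = 1}
	pe.exec_command(&executor, &job)
}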