phased-executor (#4)

Co-authored-by: Zachary Levy <zachary@sunforge.is>
Reviewed-on: #4
This commit was merged in pull request #4.
This commit is contained in:
2026-04-03 01:53:23 +00:00
parent 25fca052f5
commit 59c600d630
5 changed files with 315 additions and 11 deletions

View File

@@ -5,27 +5,37 @@
{
"label": "Test many_bits",
"command": "odin test many_bits -out=out/debug/test_many_bits",
"cwd": "$ZED_WORKTREE_ROOT"
"cwd": "$ZED_WORKTREE_ROOT",
},
{
"label": "Test ring",
"command": "odin test ring -out=out/debug/test_ring",
"cwd": "$ZED_WORKTREE_ROOT"
"cwd": "$ZED_WORKTREE_ROOT",
},
{
"label": "Test levsort",
"command": "odin test levsort -out=out/debug/test_levsort",
"cwd": "$ZED_WORKTREE_ROOT"
"cwd": "$ZED_WORKTREE_ROOT",
},
{
"label": "Test levsync",
"command": "odin test levsync -out=out/debug/test_levsync",
"cwd": "$ZED_WORKTREE_ROOT"
"cwd": "$ZED_WORKTREE_ROOT",
},
{
"label": "Test levmath",
"command": "odin test levmath -out=out/debug/test_levmath",
"cwd": "$ZED_WORKTREE_ROOT"
"cwd": "$ZED_WORKTREE_ROOT",
},
{
"label": "Test phased_executor",
"command": "odin test phased_executor -out=out/debug/test_phased_executor",
"cwd": "$ZED_WORKTREE_ROOT",
},
{
"label": "Test all",
"command": "odin test many_bits -out=out/debug/test_many_bits && odin test ring -out=out/debug/test_ring && odin test levsort -out=out/debug/test_levsort && odin test levsync -out=out/debug/test_levsync && odin test levmath -out=out/debug/test_levmath && odin test phased_executor -out=out/debug/test_phased_executor",
"cwd": "$ZED_WORKTREE_ROOT",
},
// ---------------------------------------------------------------------------------------------------------------------
// ----- LMDB Examples ------------------------
@@ -33,7 +43,7 @@
{
"label": "Run lmdb example",
"command": "odin run vendor/lmdb/examples -debug -out=out/debug/lmdb-examples",
"cwd": "$ZED_WORKTREE_ROOT"
"cwd": "$ZED_WORKTREE_ROOT",
},
// ---------------------------------------------------------------------------------------------------------------------
// ----- Other ------------------------
@@ -41,6 +51,6 @@
{
"label": "Run debug",
"command": "odin run debug -debug -out=out/debug/debug",
"cwd": "$ZED_WORKTREE_ROOT"
}
"cwd": "$ZED_WORKTREE_ROOT",
},
]

View File

@@ -2,7 +2,6 @@ package levmath
import "base:intrinsics"
import "core:math"
import "core:testing"
// ---------------------------------------------------------------------------------------------------------------------
// ----- Fast Exp (Schraudolph IEEE 754 bit trick) ---------------------------------------------------------------------
@@ -77,6 +76,7 @@ fast_exp :: #force_inline proc "contextless" (x: $FLOAT) -> FLOAT where intrinsi
// ---------------------------------------------------------------------------------------------------------------------
// ----- Testing -------------------------------------------------------------------------------------------------------
// ---------------------------------------------------------------------------------------------------------------------
import "core:testing"
@(test)
test_fast_exp_identity :: proc(t: ^testing.T) {

View File

@@ -124,6 +124,14 @@ spinlock_unlock :: #force_inline proc "contextless" (lock: ^Spinlock) {
intrinsics.atomic_store_explicit(lock, false, .Release)
}
// Overload group: generic try_lock currently dispatches only to the spinlock implementation.
try_lock :: proc {
	spinlock_try_lock,
}
// Overload group: generic unlock currently dispatches only to the spinlock implementation.
unlock :: proc {
	spinlock_unlock,
}
// ---------------------------------------------------------------------------------------------------------------------
// ----- Tests ------------------------
// ---------------------------------------------------------------------------------------------------------------------

View File

@@ -25,8 +25,8 @@ Bits :: struct {
length: int, // Total number of bits being stored
}
delete :: proc(using bits: Bits, allocator := context.allocator) {
delete_slice(int_array, allocator)
delete :: proc(bits: Bits, allocator := context.allocator) {
delete_slice(bits.int_array, allocator)
}
make :: proc(#any_int length: int, allocator := context.allocator) -> Bits {

View File

@@ -0,0 +1,286 @@
// Executor for fan out/in phases where each phase must entirely finish before the next begins.
// This executor does not guarantee strict ordering of commands within a phase, so it is only suited
// to tasks where order within a phase is not critical.
package phased_executor
import "base:intrinsics"
import q "core:container/queue"
import "core:prof/spall"
import "core:sync"
import "core:thread"
import b "../basic"
import "../levsync"
DEFT_BATCH_SIZE :: 1024 // Number of nodes in each batch
DEFT_SPIN_LIMIT :: 2_500_000
// Per-worker state. Each worker thread owns exactly one Harness, selected by its task user_index.
Harness :: struct($T: typeid) where intrinsics.type_has_nil(T) {
	mutex:      sync.Mutex, // Held by the worker for its whole life; released only inside cond_wait
	condition:  sync.Cond, // Signalled by the master to wake a sleeping worker
	cmd_queue:  q.Queue(T), // Pending commands for this worker; guarded by `lock`
	spin:       bool, // true while the worker busy-polls; set false (under `lock`) before it sleeps
	lock:       levsync.Spinlock, // Guards `cmd_queue` and `spin`
	// NOTE(review): this pad isolates join_count from the fields above, but the next element
	// of a []Harness may still share join_count's cache line — confirm that is acceptable.
	_pad: [64 - size_of(uint)]u8, // We want join_count to have its own cache line
	join_count: uint, // Number of commands completed since last exec_join
}
// `nil` for type `T` is reserved for executor shutdown. If you need `nil` for something else wrap `T`
// in a union even if there is only a single type.
//
// Executor is not thread safe and can only be used from a single thread at a time.
// Executor can only handle 1 graph at a time.
// To execute multiple graphs at the same time, use multiple executors.
Executor :: struct($T: typeid) where intrinsics.type_has_nil(T) {
	harnesses: []Harness(T), // Accessed from slave threads
	spin_limit: uint, // Accessed from slave threads
	num_cmds_in_round: uint, // Number of commands submitted without join being called
	harness_index: int, // Round-robin cursor into `harnesses`; advanced by exec_command
	cmd_queue_floor: int, // Queue-depth threshold used by exec_command's load balancing
	thread_pool: thread.Pool, // Owns the worker threads; one task per harness
	initialized: bool, // Guarded by CAS in init_executor/destroy_executor to catch misuse
}
//TODO: Provide a way to set some aspects of context for the executor threads. Namely a logger.
//
// Initializes `executor` with `num_threads` worker threads, each running a loop that pops
// commands off its own harness queue and hands them to `on_command_received`.
// `spin_limit` is how many empty polls a worker performs before falling back to sleeping
// on its condition variable. Asserts on double initialization or a non-positive thread count.
init_executor :: proc(
	executor: ^Executor($T),
	#any_int num_threads: int,
	$on_command_received: proc(command: T),
	#any_int spin_limit: uint = DEFT_SPIN_LIMIT,
	allocator := context.allocator,
) {
	// A zero/negative thread count would leave `harnesses` empty and make
	// exec_command index out of range, so fail loudly here instead.
	assert(num_threads > 0, "Executor needs at least one thread.")
	// CAS returns the previous value; if it was already true the executor was initialized twice.
	was_initialized, _ := intrinsics.atomic_compare_exchange_strong_explicit(
		&executor.initialized,
		false,
		true,
		.Seq_Cst,
		.Seq_Cst,
	)
	assert(!was_initialized, "Executor already initialized.")
	slave_task := build_task(on_command_received)
	executor.spin_limit = spin_limit
	executor.harnesses = make([]Harness(T), num_threads, allocator)
	for &harness in executor.harnesses {
		q.init(&harness.cmd_queue, allocator = allocator)
		harness.spin = true // Workers start in the spinning state
	}
	thread.pool_init(&executor.thread_pool, allocator, num_threads)
	for i in 0 ..< num_threads {
		// user_index ties each worker to its own harness.
		thread.pool_add_task(&executor.thread_pool, allocator, slave_task, data = executor, user_index = i)
	}
	thread.pool_start(&executor.thread_pool)
}
// Cleanly shuts down all executor tasks then destroys the executor
// Pushes the `nil` sentinel onto every harness queue (waking sleeping workers), joins the
// thread pool, then frees the queues and the harness slice. Asserts if not initialized.
// Must not be called concurrently with exec_command/exec_join.
destroy_executor :: proc(executor: ^Executor($T), allocator := context.allocator) {
	// CAS flips initialized true -> false; the previous value tells us whether init ever ran.
	was_initialized, _ := intrinsics.atomic_compare_exchange_strong_explicit(
		&executor.initialized,
		true,
		false,
		.Seq_Cst,
		.Seq_Cst,
	)
	assert(was_initialized, "Executor not initialized.")
	// Exit thread loops
	for &harness in executor.harnesses {
		for {
			// Spin until we win the harness lock, then enqueue the shutdown sentinel.
			if levsync.try_lock(&harness.lock) {
				q.push_back(&harness.cmd_queue, nil) // nil is the reserved shutdown command
				if !harness.spin {
					// Worker is asleep: take the mutex first so the signal cannot fire
					// before the worker is parked in cond_wait, then wake it.
					sync.mutex_lock(&harness.mutex)
					sync.cond_signal(&harness.condition)
					sync.mutex_unlock(&harness.mutex)
				}
				levsync.unlock(&harness.lock)
				break
			}
		}
	}
	thread.pool_join(&executor.thread_pool) // Blocks until every worker has seen nil and returned
	thread.pool_destroy(&executor.thread_pool)
	for &harness in executor.harnesses {
		q.destroy(&harness.cmd_queue)
	}
	delete(executor.harnesses, allocator)
}
// Builds the worker-thread loop for a compile-time command handler.
// The returned proc is meant for thread.pool_add_task: task.data must be the ^Executor(T)
// and task.user_index selects which harness this worker owns.
build_task :: proc(
	$on_command_received: proc(command: $T),
) -> (
	slave_task: proc(task: thread.Task),
) where intrinsics.type_has_nil(T) {
	slave_task = proc(task: thread.Task) {
		when b.SPALL_TRACE {
			// NOTE(review): `spall_buffer` and `spall_ctx` are not declared anywhere in this
			// file — presumably globals provided elsewhere (b / basic?); confirm they exist
			// when SPALL_TRACE is enabled.
			spall_data := make([]u8, spall.BUFFER_DEFAULT_SIZE)
			spall_buffer = spall.buffer_create(spall_data, u32(sync.current_thread_id()))
			defer spall.buffer_destroy(&spall_ctx, &spall_buffer)
		}
		executor := cast(^Executor(T))task.data
		harness := &executor.harnesses[task.user_index] // Each worker owns exactly one harness
		// Held for the worker's entire life; cond_wait releases it while sleeping.
		sync.mutex_lock(&harness.mutex)
		for {
			defer free_all(context.temp_allocator) // Reset temp allocations after each spin/sleep cycle
			// Spinning
			spin_count: uint = 0
			spin_loop: for {
				if levsync.try_lock(&harness.lock) {
					if q.len(harness.cmd_queue) > 0 {
						// Execute command
						command := q.pop_front(&harness.cmd_queue)
						levsync.unlock(&harness.lock)
						if command == nil do return // nil sentinel => shutdown requested
						on_command_received(command)
						spin_count = 0
						// Publish completion; pairs with exec_join's Acquire load.
						intrinsics.atomic_add_explicit(&harness.join_count, 1, .Release)
					} else {
						// Queue empty: count the idle poll, release the lock, maybe go to sleep.
						defer intrinsics.cpu_relax()
						defer levsync.unlock(&harness.lock)
						spin_count += 1
						if spin_count == executor.spin_limit {
							harness.spin = false // Announce (still under lock) that we will sleep
							break spin_loop
						}
					}
				} else { // If master locked the command queue there will be a new command soon
					spin_count = 0
					intrinsics.cpu_relax()
				}
			}
			// Sleeping
			cond_loop: for { // We have to loop because cond_wait can return without signal sometimes
				sync.cond_wait(&harness.condition, &harness.mutex)
				for { // Loop to acquire harness lock
					defer intrinsics.cpu_relax()
					if levsync.try_lock(&harness.lock) {
						defer levsync.unlock(&harness.lock)
						if q.len(harness.cmd_queue) > 0 {
							harness.spin = true // Back to spinning mode before resuming work
							break cond_loop
						} else {
							continue cond_loop // Spurious wakeup, go back to sleep
						}
					}
				}
			}
		}
	}
	return slave_task
}
// Submits `command` to one of the workers, round-robin with a queue-depth heuristic.
// Master-thread only. Blocks (spinning) until some worker's queue accepts the command.
exec_command :: proc(executor: ^Executor($T), command: T) {
	defer executor.num_cmds_in_round += 1
	for {
		if executor.num_cmds_in_round > 0 { // Avoid spinning multiple locks if we're only using 1 thread
			// Advance the round-robin cursor with wraparound.
			if executor.harness_index == len(executor.harnesses) - 1 {
				executor.harness_index = 0
			} else {
				executor.harness_index += 1
			}
		}
		harness := &executor.harnesses[executor.harness_index]
		if levsync.try_lock(&harness.lock) {
			// Only enqueue if this queue is no deeper than the floor; otherwise try the next
			// harness. NOTE(review): cmd_queue_floor only grows and is never reset (not even
			// in exec_join), so the balancing condition weakens over time — confirm intended.
			if q.len(harness.cmd_queue) <= executor.cmd_queue_floor {
				q.push_back(&harness.cmd_queue, command)
				executor.cmd_queue_floor = q.len(harness.cmd_queue)
				slave_sleeping := !harness.spin
				// Must release lock before signalling to avoid race from slave spurious wakeup
				levsync.unlock(&harness.lock)
				if slave_sleeping {
					// Take the mutex so the signal cannot be lost between the worker's
					// queue check and its cond_wait.
					sync.mutex_lock(&harness.mutex)
					sync.cond_signal(&harness.condition)
					sync.mutex_unlock(&harness.mutex)
				}
				break
			}
			levsync.unlock(&harness.lock)
		}
	}
}
// Spin check until issued executor commands finish
// Blocks the master until the workers' combined join_count equals the number of commands
// submitted since the previous join, then resets the counters and the round count.
exec_join :: proc(executor: ^Executor($T)) {
	defer executor.num_cmds_in_round = 0
	for {
		completed_commands: uint = 0
		for &harness in executor.harnesses {
			// Acquire pairs with the worker's Release add, making command side effects visible here.
			completed_commands += intrinsics.atomic_load_explicit(&harness.join_count, .Acquire)
		}
		if completed_commands == executor.num_cmds_in_round {
			for &harness in executor.harnesses {
				// We know the slave will never access join_count at this time so we don't need to synchronize
				harness.join_count = 0
			}
			return
		}
		intrinsics.cpu_relax()
	}
}
// ---------------------------------------------------------------------------------------------------------------------
// ----- Tests ---------------------------------------------------------------------
// ---------------------------------------------------------------------------------------------------------------------
import "core:fmt"
import "core:testing"
// Stress test: submit many rounds of commands and verify each one executed exactly once,
// then verify the executor shuts down cleanly.
@(test)
stress_test_executor :: proc(t: ^testing.T) {
	STRESS_TOTAL_CMDS :: 200_000
	STRESS_NUM_THREADS :: 8
	STRESS_NUM_ROUNDS :: 100
	STRESS_CMDS_PER_ROUND :: STRESS_TOTAL_CMDS / STRESS_NUM_ROUNDS
	Stress_Cmd :: union {
		Stress_Payload,
	}
	Stress_Payload :: struct {
		exec_counts: ^[STRESS_TOTAL_CMDS]uint, // Shared per-command execution tally
		id:          int, // Index of this command's slot in exec_counts
	}
	// Handler just bumps the counter for the command's id.
	stress_handler :: proc(command: Stress_Cmd) {
		p := command.(Stress_Payload)
		intrinsics.atomic_add_explicit(&p.exec_counts[p.id], 1, .Release)
	}
	counts := new([STRESS_TOTAL_CMDS]uint)
	defer free(counts)
	ex: Executor(Stress_Cmd)
	init_executor(&ex, STRESS_NUM_THREADS, stress_handler, spin_limit = 500)
	// Fan out commands round by round, joining between rounds.
	for r in 0 ..< STRESS_NUM_ROUNDS {
		first_id := r * STRESS_CMDS_PER_ROUND
		for offset in 0 ..< STRESS_CMDS_PER_ROUND {
			exec_command(&ex, Stress_Payload{exec_counts = counts, id = first_id + offset})
		}
		exec_join(&ex)
	}
	// Tally commands that never ran and commands that ran more than once.
	missed, duped: int
	for i in 0 ..< STRESS_TOTAL_CMDS {
		switch c := counts[i]; {
		case c == 0:
			missed += 1
		case c > 1:
			duped += 1
		}
	}
	testing.expect(t, missed == 0, fmt.tprintf("Missed %d / %d commands", missed, STRESS_TOTAL_CMDS))
	testing.expect(t, duped == 0, fmt.tprintf("Duplicated %d / %d commands", duped, STRESS_TOTAL_CMDS))
	// Explicitly destroy to verify clean shutdown.
	// If destroy_executor returns, all threads received the nil sentinel and exited,
	// and thread.pool_join completed without deadlock.
	destroy_executor(&ex)
	testing.expect(t, !ex.initialized, "Executor still marked initialized after destroy")
}