From 5559ed2e0b63103e042a1366c21e24f6aa756d63 Mon Sep 17 00:00:00 2001 From: Zachary Levy Date: Thu, 2 Apr 2026 17:42:36 -0700 Subject: [PATCH 1/5] Added phased executor --- .zed/tasks.json | 5 + levmath/levmath.odin | 2 +- phased_executor/phased_executor.odin | 305 +++++++++++++++++++++++++++ 3 files changed, 311 insertions(+), 1 deletion(-) create mode 100644 phased_executor/phased_executor.odin diff --git a/.zed/tasks.json b/.zed/tasks.json index d49b77d..c24534c 100644 --- a/.zed/tasks.json +++ b/.zed/tasks.json @@ -27,6 +27,11 @@ "command": "odin test levmath -out=out/debug/test_levmath", "cwd": "$ZED_WORKTREE_ROOT" }, + { + "label": "Test phased_executor", + "command": "odin test phased_executor -out=out/debug/test_phased_executor", + "cwd": "$ZED_WORKTREE_ROOT" + }, // --------------------------------------------------------------------------------------------------------------------- // ----- LMDB Examples ------------------------ // --------------------------------------------------------------------------------------------------------------------- diff --git a/levmath/levmath.odin b/levmath/levmath.odin index b9ddaa6..86ee0f9 100644 --- a/levmath/levmath.odin +++ b/levmath/levmath.odin @@ -2,7 +2,6 @@ package levmath import "base:intrinsics" import "core:math" -import "core:testing" // --------------------------------------------------------------------------------------------------------------------- // ----- Fast Exp (Schraudolph IEEE 754 bit trick) --------------------------------------------------------------------- @@ -77,6 +76,7 @@ fast_exp :: #force_inline proc "contextless" (x: $FLOAT) -> FLOAT where intrinsi // --------------------------------------------------------------------------------------------------------------------- // ----- Testing ------------------------------------------------------------------------------------------------------- // 
--------------------------------------------------------------------------------------------------------------------- +import "core:testing" @(test) test_fast_exp_identity :: proc(t: ^testing.T) { diff --git a/phased_executor/phased_executor.odin b/phased_executor/phased_executor.odin new file mode 100644 index 0000000..0c88c09 --- /dev/null +++ b/phased_executor/phased_executor.odin @@ -0,0 +1,305 @@ +// Executor for fan out/in phases where each phase must entirely finish before the next begins. +// This executor does not guarantee strict ordering of commands within a phase so it is only suited +// to tasks where order within a phase is not critical. +package phased_executor + +import "base:intrinsics" +import q "core:container/queue" +import "core:prof/spall" +import "core:sync" +import "core:thread" + +import b "../basic" + +DEFT_BATCH_SIZE :: 1024 // Number of nodes in each batch +DEFT_SPIN_LIMIT :: 2_500_000 + +Harness :: struct($T: typeid) where intrinsics.type_has_nil(T) { + mutex: sync.Mutex, + condition: sync.Cond, + cmd_queue: q.Queue(T), + spin, locked: bool, + _pad: [64 - size_of(uint)]u8, // We want join_count to have its own cache line + join_count: uint, // Number of commands completed since last exec_join +} + +// `nil` for type `T` is reserved for executor shutdown. If you need `nil` for something else wrap `T` +// in a union even if there is only a single type. +// +// Executor is not thread safe and can only be used from a single thread at a time. +// Executor can only handle 1 graph at a time. +// To execute multiple graphs at the same time, use multiple executors. 
+Executor :: struct($T: typeid) where intrinsics.type_has_nil(T) { + harnesses: []Harness(T), // Accessed from slave threads + spin_limit: uint, // Accessed from slave threads + num_cmds_in_round: uint, // Number of commands submitted without join being called + harness_index: int, + cmd_queue_floor: int, + thread_pool: thread.Pool, + initialized: bool, +} + +//TODO: Provide a way to set some aspects of context for the executor threads. Namely a logger. +init_executor :: proc( + executor: ^Executor($T), + #any_int num_threads: int, + $on_command_received: proc(command: T), + #any_int spin_limit: uint = DEFT_SPIN_LIMIT, + allocator := context.allocator, +) { + was_initialized, _ := intrinsics.atomic_compare_exchange_strong_explicit( + &executor.initialized, + false, + true, + .Seq_Cst, + .Seq_Cst, + ) + assert(!was_initialized, "Executor already initialized.") + + slave_task := build_task(on_command_received) + executor.spin_limit = spin_limit + executor.harnesses = make([]Harness(T), num_threads, allocator) + for &harness in executor.harnesses { + q.init(&harness.cmd_queue, allocator = allocator) + harness.spin = true + } + + thread.pool_init(&executor.thread_pool, allocator, num_threads) + for i in 0 ..< num_threads { + thread.pool_add_task(&executor.thread_pool, allocator, slave_task, data = executor, user_index = i) + } + thread.pool_start(&executor.thread_pool) + + return +} + +// Cleanly shuts down all executor tasks then destroys the executor +destroy_executor :: proc(executor: ^Executor($T), allocator := context.allocator) { + was_initialized, _ := intrinsics.atomic_compare_exchange_strong_explicit( + &executor.initialized, + true, + false, + .Seq_Cst, + .Seq_Cst, + ) + assert(was_initialized, "Executor not initialized.") + + // Exit thread loops + for &harness in executor.harnesses { + for { + if try_lock_harness(&harness.locked) { + q.push_back(&harness.cmd_queue, nil) + if !harness.spin { + sync.mutex_lock(&harness.mutex) + 
sync.cond_signal(&harness.condition) + sync.mutex_unlock(&harness.mutex) + } + intrinsics.atomic_store_explicit(&harness.locked, false, .Release) + break + } + } + } + + thread.pool_join(&executor.thread_pool) + thread.pool_destroy(&executor.thread_pool) + for &harness in executor.harnesses { + q.destroy(&harness.cmd_queue) + } + delete(executor.harnesses, allocator) +} + +// Returns true if lock successfuly acquired, false otherwise +try_lock_harness :: #force_inline proc "contextless" (locked: ^bool) -> bool { + was_locked, lock_acquired := intrinsics.atomic_compare_exchange_weak_explicit( + locked, + false, + true, + .Acq_Rel, + .Relaxed, + ) + return lock_acquired +} + +build_task :: proc( + $on_command_received: proc(command: $T), +) -> ( + slave_task: proc(task: thread.Task), +) where intrinsics.type_has_nil(T) { + slave_task = proc(task: thread.Task) { + when b.SPALL_TRACE { + spall_data := make([]u8, spall.BUFFER_DEFAULT_SIZE) + spall_buffer = spall.buffer_create(spall_data, u32(sync.current_thread_id())) + defer spall.buffer_destroy(&spall_ctx, &spall_buffer) + } + + executor := cast(^Executor(T))task.data + harness := &executor.harnesses[task.user_index] + sync.mutex_lock(&harness.mutex) + for { + defer free_all(context.temp_allocator) + // Spinning + spin_count: uint = 0 + spin_loop: for { + if try_lock_harness(&harness.locked) { + if q.len(harness.cmd_queue) > 0 { + + // Execute command + command := q.pop_front(&harness.cmd_queue) + intrinsics.atomic_store_explicit(&harness.locked, false, .Release) + if command == nil do return + on_command_received(command) + + spin_count = 0 + intrinsics.atomic_add_explicit(&harness.join_count, 1, .Release) + } else { + defer intrinsics.cpu_relax() + defer intrinsics.atomic_store_explicit(&harness.locked, false, .Release) + spin_count += 1 + if spin_count == executor.spin_limit { + harness.spin = false + break spin_loop + } + } + } else { // If master locked the command queue there will be a new command soon + 
spin_count = 0 + intrinsics.cpu_relax() + } + } + + // Sleeping + cond_loop: for { // We have to loop because cond_wait can return without signal sometimes + sync.cond_wait(&harness.condition, &harness.mutex) + for { // Loop to acquire harness lock + defer intrinsics.cpu_relax() + if try_lock_harness(&harness.locked) { + defer intrinsics.atomic_store_explicit(&harness.locked, false, .Release) + if q.len(harness.cmd_queue) > 0 { + harness.spin = true + break cond_loop + } else { + continue cond_loop // Spurious wakeup, go back to sleep + } + } + } + } + } + } + + return slave_task +} + +exec_command :: proc(executor: ^Executor($T), command: T) { + defer executor.num_cmds_in_round += 1 + for { + if executor.num_cmds_in_round > 0 { // Avoid spinning multiple locks if we're only using 1 thread + if executor.harness_index == len(executor.harnesses) - 1 { + executor.harness_index = 0 + } else { + executor.harness_index += 1 + } + } + harness := &executor.harnesses[executor.harness_index] + if try_lock_harness(&harness.locked) { + if q.len(harness.cmd_queue) <= executor.cmd_queue_floor { + q.push_back(&harness.cmd_queue, command) + executor.cmd_queue_floor = q.len(harness.cmd_queue) + slave_sleeping := !harness.spin + // Must release lock before signalling to avoid race from slave spurious wakeup + intrinsics.atomic_store_explicit(&harness.locked, false, .Release) + if slave_sleeping { + sync.mutex_lock(&harness.mutex) + sync.cond_signal(&harness.condition) + sync.mutex_unlock(&harness.mutex) + } + break + } + intrinsics.atomic_store_explicit(&harness.locked, false, .Release) + } + } +} + +// Spin check until issued executor commands finish +exec_join :: proc(executor: ^Executor($T)) { + defer executor.num_cmds_in_round = 0 + for { + completed_commands: uint = 0 + for &harness in executor.harnesses { + completed_commands += intrinsics.atomic_load_explicit(&harness.join_count, .Acquire) + } + if completed_commands == executor.num_cmds_in_round { + for &harness in 
executor.harnesses { + // We know the slave will never access join_count at this time so we don't need to synchronize + harness.join_count = 0 + } + return + } + intrinsics.cpu_relax() + } +} + +// --------------------------------------------------------------------------------------------------------------------- +// ----- Tests --------------------------------------------------------------------- +// --------------------------------------------------------------------------------------------------------------------- +import "core:fmt" +import "core:testing" + +when ODIN_TEST { + @(private = "file") + STRESS_TOTAL_CMDS :: 200_000 + @(private = "file") + STRESS_NUM_THREADS :: 8 + @(private = "file") + STRESS_NUM_ROUNDS :: 100 + @(private = "file") + STRESS_CMDS_PER_ROUND :: STRESS_TOTAL_CMDS / STRESS_NUM_ROUNDS + + @(private = "file") + Stress_Cmd :: union { + Stress_Payload, + } + + @(private = "file") + Stress_Payload :: struct { + exec_counts: ^[STRESS_TOTAL_CMDS]uint, + id: int, + } + + @(private = "file") + stress_handler :: proc(command: Stress_Cmd) { + payload := command.(Stress_Payload) + intrinsics.atomic_add_explicit(&payload.exec_counts[payload.id], 1, .Release) + } + + @(test) + stress_test_executor :: proc(t: ^testing.T) { + exec_counts := new([STRESS_TOTAL_CMDS]uint) + defer free(exec_counts) + + executor: Executor(Stress_Cmd) + init_executor(&executor, STRESS_NUM_THREADS, stress_handler, spin_limit = 500) + + for round in 0 ..< STRESS_NUM_ROUNDS { + base := round * STRESS_CMDS_PER_ROUND + for i in 0 ..< STRESS_CMDS_PER_ROUND { + exec_command(&executor, Stress_Payload{exec_counts = exec_counts, id = base + i}) + } + exec_join(&executor) + } + + missed, duped: int + for i in 0 ..< STRESS_TOTAL_CMDS { + count := exec_counts[i] + if count == 0 do missed += 1 + else if count > 1 do duped += 1 + } + + testing.expect(t, missed == 0, fmt.tprintf("Missed %d / %d commands", missed, STRESS_TOTAL_CMDS)) + testing.expect(t, duped == 0, fmt.tprintf("Duplicated %d 
/ %d commands", duped, STRESS_TOTAL_CMDS)) + + // Explicitly destroy to verify clean shutdown. + // If destroy_executor returns, all threads received the nil sentinel and exited, + // and thread.pool_join completed without deadlock. + destroy_executor(&executor) + testing.expect(t, !executor.initialized, "Executor still marked initialized after destroy") + } +} -- 2.43.0 From fa3fee52f695ea6d0ff7f3c36601b616635a646e Mon Sep 17 00:00:00 2001 From: Zachary Levy Date: Thu, 2 Apr 2026 18:24:38 -0700 Subject: [PATCH 2/5] Added test all task --- .zed/tasks.json | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/.zed/tasks.json b/.zed/tasks.json index c24534c..8fc5867 100644 --- a/.zed/tasks.json +++ b/.zed/tasks.json @@ -5,32 +5,37 @@ { "label": "Test many_bits", "command": "odin test many_bits -out=out/debug/test_many_bits", - "cwd": "$ZED_WORKTREE_ROOT" + "cwd": "$ZED_WORKTREE_ROOT", }, { "label": "Test ring", "command": "odin test ring -out=out/debug/test_ring", - "cwd": "$ZED_WORKTREE_ROOT" + "cwd": "$ZED_WORKTREE_ROOT", }, { "label": "Test levsort", "command": "odin test levsort -out=out/debug/test_levsort", - "cwd": "$ZED_WORKTREE_ROOT" + "cwd": "$ZED_WORKTREE_ROOT", }, { "label": "Test levsync", "command": "odin test levsync -out=out/debug/test_levsync", - "cwd": "$ZED_WORKTREE_ROOT" + "cwd": "$ZED_WORKTREE_ROOT", }, { "label": "Test levmath", "command": "odin test levmath -out=out/debug/test_levmath", - "cwd": "$ZED_WORKTREE_ROOT" + "cwd": "$ZED_WORKTREE_ROOT", }, { "label": "Test phased_executor", "command": "odin test phased_executor -out=out/debug/test_phased_executor", - "cwd": "$ZED_WORKTREE_ROOT" + "cwd": "$ZED_WORKTREE_ROOT", + }, + { + "label": "Test all", + "command": "odin test many_bits -out=out/debug/test_many_bits && odin test ring -out=out/debug/test_ring && odin test levsort -out=out/debug/test_levsort && odin test levsync -out=out/debug/test_levsync && odin test levmath -out=out/debug/test_levmath && odin 
test phased_executor -out=out/debug/test_phased_executor", + "cwd": "$ZED_WORKTREE_ROOT", }, // --------------------------------------------------------------------------------------------------------------------- // ----- LMDB Examples ------------------------ @@ -38,7 +43,7 @@ { "label": "Run lmdb example", "command": "odin run vendor/lmdb/examples -debug -out=out/debug/lmdb-examples", - "cwd": "$ZED_WORKTREE_ROOT" + "cwd": "$ZED_WORKTREE_ROOT", }, // --------------------------------------------------------------------------------------------------------------------- // ----- Other ------------------------ @@ -46,6 +51,6 @@ { "label": "Run debug", "command": "odin run debug -debug -out=out/debug/debug", - "cwd": "$ZED_WORKTREE_ROOT" - } + "cwd": "$ZED_WORKTREE_ROOT", + }, ] -- 2.43.0 From ec54afebb2874adb06eb2a9e3b52cdde8c063de7 Mon Sep 17 00:00:00 2001 From: Zachary Levy Date: Thu, 2 Apr 2026 18:26:06 -0700 Subject: [PATCH 3/5] Removed using statement from many_bits --- many_bits/many_bits.odin | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/many_bits/many_bits.odin b/many_bits/many_bits.odin index 0ad1e0d..659cb03 100644 --- a/many_bits/many_bits.odin +++ b/many_bits/many_bits.odin @@ -25,8 +25,8 @@ Bits :: struct { length: int, // Total number of bits being stored } -delete :: proc(using bits: Bits, allocator := context.allocator) { - delete_slice(int_array, allocator) +delete :: proc(bits: Bits, allocator := context.allocator) { + delete_slice(bits.int_array, allocator) } make :: proc(#any_int length: int, allocator := context.allocator) -> Bits { -- 2.43.0 From fd3cd1b6e6e58f2dcac8efdd22e6c19f20780323 Mon Sep 17 00:00:00 2001 From: Zachary Levy Date: Thu, 2 Apr 2026 18:30:12 -0700 Subject: [PATCH 4/5] Cleaned up phased_executor test --- phased_executor/phased_executor.odin | 63 ++++++++++++---------------- 1 file changed, 27 insertions(+), 36 deletions(-) diff --git a/phased_executor/phased_executor.odin 
b/phased_executor/phased_executor.odin index 0c88c09..f52b148 100644 --- a/phased_executor/phased_executor.odin +++ b/phased_executor/phased_executor.odin @@ -243,63 +243,54 @@ exec_join :: proc(executor: ^Executor($T)) { import "core:fmt" import "core:testing" -when ODIN_TEST { - @(private = "file") +@(test) +stress_test_executor :: proc(t: ^testing.T) { STRESS_TOTAL_CMDS :: 200_000 - @(private = "file") STRESS_NUM_THREADS :: 8 - @(private = "file") STRESS_NUM_ROUNDS :: 100 - @(private = "file") STRESS_CMDS_PER_ROUND :: STRESS_TOTAL_CMDS / STRESS_NUM_ROUNDS - @(private = "file") Stress_Cmd :: union { Stress_Payload, } - @(private = "file") Stress_Payload :: struct { exec_counts: ^[STRESS_TOTAL_CMDS]uint, id: int, } - @(private = "file") stress_handler :: proc(command: Stress_Cmd) { payload := command.(Stress_Payload) intrinsics.atomic_add_explicit(&payload.exec_counts[payload.id], 1, .Release) } - @(test) - stress_test_executor :: proc(t: ^testing.T) { - exec_counts := new([STRESS_TOTAL_CMDS]uint) - defer free(exec_counts) + exec_counts := new([STRESS_TOTAL_CMDS]uint) + defer free(exec_counts) - executor: Executor(Stress_Cmd) - init_executor(&executor, STRESS_NUM_THREADS, stress_handler, spin_limit = 500) + executor: Executor(Stress_Cmd) + init_executor(&executor, STRESS_NUM_THREADS, stress_handler, spin_limit = 500) - for round in 0 ..< STRESS_NUM_ROUNDS { - base := round * STRESS_CMDS_PER_ROUND - for i in 0 ..< STRESS_CMDS_PER_ROUND { - exec_command(&executor, Stress_Payload{exec_counts = exec_counts, id = base + i}) - } - exec_join(&executor) + for round in 0 ..< STRESS_NUM_ROUNDS { + base := round * STRESS_CMDS_PER_ROUND + for i in 0 ..< STRESS_CMDS_PER_ROUND { + exec_command(&executor, Stress_Payload{exec_counts = exec_counts, id = base + i}) } - - missed, duped: int - for i in 0 ..< STRESS_TOTAL_CMDS { - count := exec_counts[i] - if count == 0 do missed += 1 - else if count > 1 do duped += 1 - } - - testing.expect(t, missed == 0, fmt.tprintf("Missed %d / %d 
commands", missed, STRESS_TOTAL_CMDS)) - testing.expect(t, duped == 0, fmt.tprintf("Duplicated %d / %d commands", duped, STRESS_TOTAL_CMDS)) - - // Explicitly destroy to verify clean shutdown. - // If destroy_executor returns, all threads received the nil sentinel and exited, - // and thread.pool_join completed without deadlock. - destroy_executor(&executor) - testing.expect(t, !executor.initialized, "Executor still marked initialized after destroy") + exec_join(&executor) } + + missed, duped: int + for i in 0 ..< STRESS_TOTAL_CMDS { + count := exec_counts[i] + if count == 0 do missed += 1 + else if count > 1 do duped += 1 + } + + testing.expect(t, missed == 0, fmt.tprintf("Missed %d / %d commands", missed, STRESS_TOTAL_CMDS)) + testing.expect(t, duped == 0, fmt.tprintf("Duplicated %d / %d commands", duped, STRESS_TOTAL_CMDS)) + + // Explicitly destroy to verify clean shutdown. + // If destroy_executor returns, all threads received the nil sentinel and exited, + // and thread.pool_join completed without deadlock. 
+ destroy_executor(&executor) + testing.expect(t, !executor.initialized, "Executor still marked initialized after destroy") } -- 2.43.0 From 3fde4bebf922b1f509bc43f404314f8031662c56 Mon Sep 17 00:00:00 2001 From: Zachary Levy Date: Thu, 2 Apr 2026 18:51:52 -0700 Subject: [PATCH 5/5] Switch phased_executor to spinlock type instead of raw bool --- levsync/levsync.odin | 8 +++++ phased_executor/phased_executor.odin | 46 +++++++++++----------------- 2 files changed, 26 insertions(+), 28 deletions(-) diff --git a/levsync/levsync.odin b/levsync/levsync.odin index 318b10c..e9bbd94 100644 --- a/levsync/levsync.odin +++ b/levsync/levsync.odin @@ -124,6 +124,14 @@ spinlock_unlock :: #force_inline proc "contextless" (lock: ^Spinlock) { intrinsics.atomic_store_explicit(lock, false, .Release) } +try_lock :: proc { + spinlock_try_lock, +} + +unlock :: proc { + spinlock_unlock, +} + // --------------------------------------------------------------------------------------------------------------------- // ----- Tests ------------------------ // --------------------------------------------------------------------------------------------------------------------- diff --git a/phased_executor/phased_executor.odin b/phased_executor/phased_executor.odin index f52b148..cfed63b 100644 --- a/phased_executor/phased_executor.odin +++ b/phased_executor/phased_executor.odin @@ -10,17 +10,19 @@ import "core:sync" import "core:thread" import b "../basic" +import "../levsync" DEFT_BATCH_SIZE :: 1024 // Number of nodes in each batch DEFT_SPIN_LIMIT :: 2_500_000 Harness :: struct($T: typeid) where intrinsics.type_has_nil(T) { - mutex: sync.Mutex, - condition: sync.Cond, - cmd_queue: q.Queue(T), - spin, locked: bool, - _pad: [64 - size_of(uint)]u8, // We want join_count to have its own cache line - join_count: uint, // Number of commands completed since last exec_join + mutex: sync.Mutex, + condition: sync.Cond, + cmd_queue: q.Queue(T), + spin: bool, + lock: levsync.Spinlock, + _pad: [64 - 
size_of(uint)]u8, // We want join_count to have its own cache line + join_count: uint, // Number of commands completed since last exec_join } // `nil` for type `T` is reserved for executor shutdown. If you need `nil` for something else wrap `T` @@ -87,14 +89,14 @@ destroy_executor :: proc(executor: ^Executor($T), allocator := context.allocator // Exit thread loops for &harness in executor.harnesses { for { - if try_lock_harness(&harness.locked) { + if levsync.try_lock(&harness.lock) { q.push_back(&harness.cmd_queue, nil) if !harness.spin { sync.mutex_lock(&harness.mutex) sync.cond_signal(&harness.condition) sync.mutex_unlock(&harness.mutex) } - intrinsics.atomic_store_explicit(&harness.locked, false, .Release) + levsync.unlock(&harness.lock) break } } @@ -108,18 +110,6 @@ destroy_executor :: proc(executor: ^Executor($T), allocator := context.allocator delete(executor.harnesses, allocator) } -// Returns true if lock successfuly acquired, false otherwise -try_lock_harness :: #force_inline proc "contextless" (locked: ^bool) -> bool { - was_locked, lock_acquired := intrinsics.atomic_compare_exchange_weak_explicit( - locked, - false, - true, - .Acq_Rel, - .Relaxed, - ) - return lock_acquired -} - build_task :: proc( $on_command_received: proc(command: $T), ) -> ( @@ -140,12 +130,12 @@ build_task :: proc( // Spinning spin_count: uint = 0 spin_loop: for { - if try_lock_harness(&harness.locked) { + if levsync.try_lock(&harness.lock) { if q.len(harness.cmd_queue) > 0 { // Execute command command := q.pop_front(&harness.cmd_queue) - intrinsics.atomic_store_explicit(&harness.locked, false, .Release) + levsync.unlock(&harness.lock) if command == nil do return on_command_received(command) @@ -153,7 +143,7 @@ build_task :: proc( intrinsics.atomic_add_explicit(&harness.join_count, 1, .Release) } else { defer intrinsics.cpu_relax() - defer intrinsics.atomic_store_explicit(&harness.locked, false, .Release) + defer levsync.unlock(&harness.lock) spin_count += 1 if spin_count == 
executor.spin_limit { harness.spin = false @@ -171,8 +161,8 @@ build_task :: proc( sync.cond_wait(&harness.condition, &harness.mutex) for { // Loop to acquire harness lock defer intrinsics.cpu_relax() - if try_lock_harness(&harness.locked) { - defer intrinsics.atomic_store_explicit(&harness.locked, false, .Release) + if levsync.try_lock(&harness.lock) { + defer levsync.unlock(&harness.lock) if q.len(harness.cmd_queue) > 0 { harness.spin = true break cond_loop @@ -199,13 +189,13 @@ exec_command :: proc(executor: ^Executor($T), command: T) { } } harness := &executor.harnesses[executor.harness_index] - if try_lock_harness(&harness.locked) { + if levsync.try_lock(&harness.lock) { if q.len(harness.cmd_queue) <= executor.cmd_queue_floor { q.push_back(&harness.cmd_queue, command) executor.cmd_queue_floor = q.len(harness.cmd_queue) slave_sleeping := !harness.spin // Must release lock before signalling to avoid race from slave spurious wakeup - intrinsics.atomic_store_explicit(&harness.locked, false, .Release) + levsync.unlock(&harness.lock) if slave_sleeping { sync.mutex_lock(&harness.mutex) sync.cond_signal(&harness.condition) @@ -213,7 +203,7 @@ exec_command :: proc(executor: ^Executor($T), command: T) { } break } - intrinsics.atomic_store_explicit(&harness.locked, false, .Release) + levsync.unlock(&harness.lock) } } } -- 2.43.0