phased-executor #4

Merged
zack merged 5 commits from phased-executor into master 2026-04-03 01:53:24 +00:00
2 changed files with 26 additions and 28 deletions
Showing only changes of commit 3fde4bebf9 - Show all commits

View File

@@ -124,6 +124,14 @@ spinlock_unlock :: #force_inline proc "contextless" (lock: ^Spinlock) {
intrinsics.atomic_store_explicit(lock, false, .Release) intrinsics.atomic_store_explicit(lock, false, .Release)
} }
try_lock :: proc {
spinlock_try_lock,
}
unlock :: proc {
spinlock_unlock,
}
// --------------------------------------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------------------------------------
// ----- Tests ------------------------ // ----- Tests ------------------------
// --------------------------------------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------------------------------------

View File

@@ -10,17 +10,19 @@ import "core:sync"
import "core:thread" import "core:thread"
import b "../basic" import b "../basic"
import "../levsync"
DEFT_BATCH_SIZE :: 1024 // Number of nodes in each batch DEFT_BATCH_SIZE :: 1024 // Number of nodes in each batch
DEFT_SPIN_LIMIT :: 2_500_000 DEFT_SPIN_LIMIT :: 2_500_000
Harness :: struct($T: typeid) where intrinsics.type_has_nil(T) { Harness :: struct($T: typeid) where intrinsics.type_has_nil(T) {
mutex: sync.Mutex, mutex: sync.Mutex,
condition: sync.Cond, condition: sync.Cond,
cmd_queue: q.Queue(T), cmd_queue: q.Queue(T),
spin, locked: bool, spin: bool,
_pad: [64 - size_of(uint)]u8, // We want join_count to have its own cache line lock: levsync.Spinlock,
join_count: uint, // Number of commands completed since last exec_join _pad: [64 - size_of(uint)]u8, // We want join_count to have its own cache line
join_count: uint, // Number of commands completed since last exec_join
} }
// `nil` for type `T` is reserved for executor shutdown. If you need `nil` for something else wrap `T` // `nil` for type `T` is reserved for executor shutdown. If you need `nil` for something else wrap `T`
@@ -87,14 +89,14 @@ destroy_executor :: proc(executor: ^Executor($T), allocator := context.allocator
// Exit thread loops // Exit thread loops
for &harness in executor.harnesses { for &harness in executor.harnesses {
for { for {
if try_lock_harness(&harness.locked) { if levsync.try_lock(&harness.lock) {
q.push_back(&harness.cmd_queue, nil) q.push_back(&harness.cmd_queue, nil)
if !harness.spin { if !harness.spin {
sync.mutex_lock(&harness.mutex) sync.mutex_lock(&harness.mutex)
sync.cond_signal(&harness.condition) sync.cond_signal(&harness.condition)
sync.mutex_unlock(&harness.mutex) sync.mutex_unlock(&harness.mutex)
} }
intrinsics.atomic_store_explicit(&harness.locked, false, .Release) levsync.unlock(&harness.lock)
break break
} }
} }
@@ -108,18 +110,6 @@ destroy_executor :: proc(executor: ^Executor($T), allocator := context.allocator
delete(executor.harnesses, allocator) delete(executor.harnesses, allocator)
} }
// Returns true if lock successfuly acquired, false otherwise
try_lock_harness :: #force_inline proc "contextless" (locked: ^bool) -> bool {
was_locked, lock_acquired := intrinsics.atomic_compare_exchange_weak_explicit(
locked,
false,
true,
.Acq_Rel,
.Relaxed,
)
return lock_acquired
}
build_task :: proc( build_task :: proc(
$on_command_received: proc(command: $T), $on_command_received: proc(command: $T),
) -> ( ) -> (
@@ -140,12 +130,12 @@ build_task :: proc(
// Spinning // Spinning
spin_count: uint = 0 spin_count: uint = 0
spin_loop: for { spin_loop: for {
if try_lock_harness(&harness.locked) { if levsync.try_lock(&harness.lock) {
if q.len(harness.cmd_queue) > 0 { if q.len(harness.cmd_queue) > 0 {
// Execute command // Execute command
command := q.pop_front(&harness.cmd_queue) command := q.pop_front(&harness.cmd_queue)
intrinsics.atomic_store_explicit(&harness.locked, false, .Release) levsync.unlock(&harness.lock)
if command == nil do return if command == nil do return
on_command_received(command) on_command_received(command)
@@ -153,7 +143,7 @@ build_task :: proc(
intrinsics.atomic_add_explicit(&harness.join_count, 1, .Release) intrinsics.atomic_add_explicit(&harness.join_count, 1, .Release)
} else { } else {
defer intrinsics.cpu_relax() defer intrinsics.cpu_relax()
defer intrinsics.atomic_store_explicit(&harness.locked, false, .Release) defer levsync.unlock(&harness.lock)
spin_count += 1 spin_count += 1
if spin_count == executor.spin_limit { if spin_count == executor.spin_limit {
harness.spin = false harness.spin = false
@@ -171,8 +161,8 @@ build_task :: proc(
sync.cond_wait(&harness.condition, &harness.mutex) sync.cond_wait(&harness.condition, &harness.mutex)
for { // Loop to acquire harness lock for { // Loop to acquire harness lock
defer intrinsics.cpu_relax() defer intrinsics.cpu_relax()
if try_lock_harness(&harness.locked) { if levsync.try_lock(&harness.lock) {
defer intrinsics.atomic_store_explicit(&harness.locked, false, .Release) defer levsync.unlock(&harness.lock)
if q.len(harness.cmd_queue) > 0 { if q.len(harness.cmd_queue) > 0 {
harness.spin = true harness.spin = true
break cond_loop break cond_loop
@@ -199,13 +189,13 @@ exec_command :: proc(executor: ^Executor($T), command: T) {
} }
} }
harness := &executor.harnesses[executor.harness_index] harness := &executor.harnesses[executor.harness_index]
if try_lock_harness(&harness.locked) { if levsync.try_lock(&harness.lock) {
if q.len(harness.cmd_queue) <= executor.cmd_queue_floor { if q.len(harness.cmd_queue) <= executor.cmd_queue_floor {
q.push_back(&harness.cmd_queue, command) q.push_back(&harness.cmd_queue, command)
executor.cmd_queue_floor = q.len(harness.cmd_queue) executor.cmd_queue_floor = q.len(harness.cmd_queue)
slave_sleeping := !harness.spin slave_sleeping := !harness.spin
// Must release lock before signalling to avoid race from slave spurious wakeup // Must release lock before signalling to avoid race from slave spurious wakeup
intrinsics.atomic_store_explicit(&harness.locked, false, .Release) levsync.unlock(&harness.lock)
if slave_sleeping { if slave_sleeping {
sync.mutex_lock(&harness.mutex) sync.mutex_lock(&harness.mutex)
sync.cond_signal(&harness.condition) sync.cond_signal(&harness.condition)
@@ -213,7 +203,7 @@ exec_command :: proc(executor: ^Executor($T), command: T) {
} }
break break
} }
intrinsics.atomic_store_explicit(&harness.locked, false, .Release) levsync.unlock(&harness.lock)
} }
} }
} }