phased-executor #4
@@ -27,6 +27,11 @@
|
||||
"command": "odin test levmath -out=out/debug/test_levmath",
|
||||
"cwd": "$ZED_WORKTREE_ROOT"
|
||||
},
|
||||
{
|
||||
"label": "Test phased_executor",
|
||||
"command": "odin test phased_executor -out=out/debug/test_phased_executor",
|
||||
"cwd": "$ZED_WORKTREE_ROOT"
|
||||
},
|
||||
// ---------------------------------------------------------------------------------------------------------------------
|
||||
// ----- LMDB Examples ------------------------
|
||||
// ---------------------------------------------------------------------------------------------------------------------
|
||||
|
||||
@@ -2,7 +2,6 @@ package levmath
|
||||
|
||||
import "base:intrinsics"
|
||||
import "core:math"
|
||||
import "core:testing"
|
||||
|
||||
// ---------------------------------------------------------------------------------------------------------------------
|
||||
// ----- Fast Exp (Schraudolph IEEE 754 bit trick) ---------------------------------------------------------------------
|
||||
@@ -77,6 +76,7 @@ fast_exp :: #force_inline proc "contextless" (x: $FLOAT) -> FLOAT where intrinsi
|
||||
// ---------------------------------------------------------------------------------------------------------------------
|
||||
// ----- Testing -------------------------------------------------------------------------------------------------------
|
||||
// ---------------------------------------------------------------------------------------------------------------------
|
||||
import "core:testing"
|
||||
|
||||
@(test)
|
||||
test_fast_exp_identity :: proc(t: ^testing.T) {
|
||||
|
||||
305
phased_executor/phased_executor.odin
Normal file
305
phased_executor/phased_executor.odin
Normal file
@@ -0,0 +1,305 @@
|
||||
// Executor for fan out/in phases where each phase must entirely finish before the next begins.
|
||||
// This executor does not guarantee strict ordering of commands within a phase so it is only suited
|
||||
// to tasks where order within a phase is not critical.
|
||||
package phased_executor
|
||||
|
||||
import "base:intrinsics"
|
||||
import q "core:container/queue"
|
||||
import "core:prof/spall"
|
||||
import "core:sync"
|
||||
import "core:thread"
|
||||
|
||||
import b "../basic"
|
||||
|
||||
DEFT_BATCH_SIZE :: 1024 // Number of nodes in each batch
|
||||
DEFT_SPIN_LIMIT :: 2_500_000
|
||||
|
||||
// Per-slave-thread mailbox shared between the master and exactly one worker thread.
// `locked` acts as a hand-rolled spinlock (see try_lock_harness) guarding `cmd_queue`
// and `spin`; `mutex`/`condition` are only used for the park/wake path once a slave
// has exhausted its spin budget.
Harness :: struct($T: typeid) where intrinsics.type_has_nil(T) {
	mutex: sync.Mutex, // Held by the slave for its lifetime; released only inside cond_wait
	condition: sync.Cond, // Slave parks here after spin_limit idle iterations
	cmd_queue: q.Queue(T), // Pending commands; `nil` is the reserved shutdown sentinel
	spin, locked: bool, // `spin`: slave is in its spin loop; `locked`: the spinlock flag itself
	_pad: [64 - size_of(uint)]u8, // We want join_count to have its own cache line
	join_count: uint, // Number of commands completed since last exec_join
}
|
||||
|
||||
// `nil` for type `T` is reserved for executor shutdown. If you need `nil` for something else wrap `T`
|
||||
// in a union even if there is only a single type.
|
||||
//
|
||||
// Executor is not thread safe and can only be used from a single thread at a time.
|
||||
// Executor can only handle 1 graph at a time.
|
||||
// To execute multiple graphs at the same time, use multiple executors.
|
||||
Executor :: struct($T: typeid) where intrinsics.type_has_nil(T) {
	harnesses: []Harness(T), // Accessed from slave threads
	spin_limit: uint, // Accessed from slave threads
	num_cmds_in_round: uint, // Number of commands submitted without join being called
	harness_index: int, // Round-robin cursor into `harnesses`; master thread only
	cmd_queue_floor: int, // Backlog threshold used by exec_command's load spreading; master thread only
	thread_pool: thread.Pool, // Owns the slave threads; started in init_executor, joined in destroy_executor
	initialized: bool, // Flipped via strong CAS in init_executor / destroy_executor
}
|
||||
|
||||
//TODO: Provide a way to set some aspects of context for the executor threads. Namely a logger.
|
||||
// Initializes `executor` and starts `num_threads` slave threads, each bound to its
// own Harness (via the task's user_index). `on_command_received` is the compile-time
// handler every slave runs for each popped command. `spin_limit` is the number of
// idle spin iterations a slave performs before parking on its condition variable.
// Asserts if the executor was already initialized (checked with a strong CAS).
init_executor :: proc(
	executor: ^Executor($T),
	#any_int num_threads: int,
	$on_command_received: proc(command: T),
	#any_int spin_limit: uint = DEFT_SPIN_LIMIT,
	allocator := context.allocator,
) {
	// Atomically flip `initialized` false -> true; fails the assert below on double init.
	was_initialized, _ := intrinsics.atomic_compare_exchange_strong_explicit(
		&executor.initialized,
		false,
		true,
		.Seq_Cst,
		.Seq_Cst,
	)
	assert(!was_initialized, "Executor already initialized.")

	slave_task := build_task(on_command_received)
	executor.spin_limit = spin_limit
	executor.harnesses = make([]Harness(T), num_threads, allocator)
	for &harness in executor.harnesses {
		q.init(&harness.cmd_queue, allocator = allocator)
		harness.spin = true // Slaves start in their spin loop, not parked
	}

	thread.pool_init(&executor.thread_pool, allocator, num_threads)
	for i in 0 ..< num_threads {
		// user_index tells each slave which harness in `harnesses` is its own.
		thread.pool_add_task(&executor.thread_pool, allocator, slave_task, data = executor, user_index = i)
	}
	thread.pool_start(&executor.thread_pool)

	return
}
|
||||
|
||||
// Cleanly shuts down all executor tasks then destroys the executor.
// Pushes the `nil` shutdown sentinel onto every harness queue (waking parked slaves),
// joins and destroys the thread pool, then frees the queues and the harness slice.
// NOTE(review): assumes no commands are outstanding (exec_join was called) — not
// enforced here; confirm callers join before destroying.
destroy_executor :: proc(executor: ^Executor($T), allocator := context.allocator) {
	// Atomically flip `initialized` true -> false; asserts on double destroy / never-init.
	was_initialized, _ := intrinsics.atomic_compare_exchange_strong_explicit(
		&executor.initialized,
		true,
		false,
		.Seq_Cst,
		.Seq_Cst,
	)
	assert(was_initialized, "Executor not initialized.")

	// Exit thread loops
	for &harness in executor.harnesses {
		for {
			if try_lock_harness(&harness.locked) {
				// `nil` is the reserved sentinel; the slave returns when it pops it.
				q.push_back(&harness.cmd_queue, nil)
				if !harness.spin {
					// Slave is parked on its condition variable — wake it so it sees the sentinel.
					sync.mutex_lock(&harness.mutex)
					sync.cond_signal(&harness.condition)
					sync.mutex_unlock(&harness.mutex)
				}
				intrinsics.atomic_store_explicit(&harness.locked, false, .Release)
				break
			}
		}
	}

	thread.pool_join(&executor.thread_pool)
	thread.pool_destroy(&executor.thread_pool)
	for &harness in executor.harnesses {
		q.destroy(&harness.cmd_queue)
	}
	delete(executor.harnesses, allocator)
}
|
||||
|
||||
// Returns true if lock successfully acquired, false otherwise
|
||||
try_lock_harness :: #force_inline proc "contextless" (locked: ^bool) -> bool {
	// Weak CAS: may fail spuriously even when unlocked, which every caller
	// tolerates by retrying in a loop. Acquire/release semantics on success.
	_, acquired := intrinsics.atomic_compare_exchange_weak_explicit(locked, false, true, .Acq_Rel, .Relaxed)
	return acquired
}
|
||||
|
||||
// Builds the slave-thread body for the given compile-time command handler.
// The returned proc is what each pool thread runs: it drains its own harness,
// spinning for up to `executor.spin_limit` idle iterations, then parking on the
// harness condition variable until the master signals new work. Returns (exits
// the thread) when it pops the `nil` shutdown sentinel.
build_task :: proc(
	$on_command_received: proc(command: $T),
) -> (
	slave_task: proc(task: thread.Task),
) where intrinsics.type_has_nil(T) {
	slave_task = proc(task: thread.Task) {
		when b.SPALL_TRACE {
			// Per-thread profiling buffer. NOTE(review): `spall_ctx` / `spall_buffer`
			// are not declared in this file — presumably package-level globals; confirm.
			spall_data := make([]u8, spall.BUFFER_DEFAULT_SIZE)
			spall_buffer = spall.buffer_create(spall_data, u32(sync.current_thread_id()))
			defer spall.buffer_destroy(&spall_ctx, &spall_buffer)
		}

		executor := cast(^Executor(T))task.data
		harness := &executor.harnesses[task.user_index] // user_index assigned in init_executor
		// Held for the thread's lifetime; cond_wait releases it while parked.
		sync.mutex_lock(&harness.mutex)
		for {
			defer free_all(context.temp_allocator)
			// Spinning
			spin_count: uint = 0
			spin_loop: for {
				if try_lock_harness(&harness.locked) {
					if q.len(harness.cmd_queue) > 0 {

						// Execute command
						command := q.pop_front(&harness.cmd_queue)
						// Release the harness spinlock before running the (possibly long) handler.
						intrinsics.atomic_store_explicit(&harness.locked, false, .Release)
						if command == nil do return // nil sentinel from destroy_executor: thread exits
						on_command_received(command)

						spin_count = 0
						// Publish completion so exec_join can observe it.
						intrinsics.atomic_add_explicit(&harness.join_count, 1, .Release)
					} else {
						// Queue empty: count an idle spin. Defers run in reverse order,
						// so the lock is released first, then cpu_relax.
						defer intrinsics.cpu_relax()
						defer intrinsics.atomic_store_explicit(&harness.locked, false, .Release)
						spin_count += 1
						if spin_count == executor.spin_limit {
							// Tell the master (which reads `spin` under the spinlock)
							// that this slave is about to park.
							harness.spin = false
							break spin_loop
						}
					}
				} else { // If master locked the command queue there will be a new command soon
					spin_count = 0
					intrinsics.cpu_relax()
				}
			}

			// Sleeping
			cond_loop: for { // We have to loop because cond_wait can return without signal sometimes
				sync.cond_wait(&harness.condition, &harness.mutex)
				for { // Loop to acquire harness lock
					defer intrinsics.cpu_relax()
					if try_lock_harness(&harness.locked) {
						defer intrinsics.atomic_store_explicit(&harness.locked, false, .Release)
						if q.len(harness.cmd_queue) > 0 {
							harness.spin = true // Re-enter the spin loop to drain the queue
							break cond_loop
						} else {
							continue cond_loop // Spurious wakeup, go back to sleep
						}
					}
				}
			}
		}
	}

	return slave_task
}
|
||||
|
||||
// Submits one command to a slave, round-robining across harnesses, and spins
// until some harness accepts it. Master thread only (see Executor doc).
// Each round of exec_command calls should be followed by exec_join.
exec_command :: proc(executor: ^Executor($T), command: T) {
	defer executor.num_cmds_in_round += 1
	for {
		if executor.num_cmds_in_round > 0 { // Avoid spinning multiple locks if we're only using 1 thread
			// Advance the round-robin cursor, wrapping at the end of the slice.
			if executor.harness_index == len(executor.harnesses) - 1 {
				executor.harness_index = 0
			} else {
				executor.harness_index += 1
			}
		}
		harness := &executor.harnesses[executor.harness_index]
		if try_lock_harness(&harness.locked) {
			// Only enqueue onto a harness whose backlog is at or below the current floor;
			// otherwise release and try the next harness. NOTE(review): cmd_queue_floor
			// only ever ratchets upward and is never reset — confirm that is intended.
			if q.len(harness.cmd_queue) <= executor.cmd_queue_floor {
				q.push_back(&harness.cmd_queue, command)
				executor.cmd_queue_floor = q.len(harness.cmd_queue)
				slave_sleeping := !harness.spin
				// Must release lock before signalling to avoid race from slave spurious wakeup
				intrinsics.atomic_store_explicit(&harness.locked, false, .Release)
				if slave_sleeping {
					sync.mutex_lock(&harness.mutex)
					sync.cond_signal(&harness.condition)
					sync.mutex_unlock(&harness.mutex)
				}
				break
			}
			intrinsics.atomic_store_explicit(&harness.locked, false, .Release)
		}
	}
}
|
||||
|
||||
// Spin check until issued executor commands finish.
// Busy-waits until the sum of all harness join_counts equals the number of
// commands submitted since the previous join, then zeroes the counters and
// (via defer) the round command count. Master thread only.
exec_join :: proc(executor: ^Executor($T)) {
	defer executor.num_cmds_in_round = 0
	for {
		completed_commands: uint = 0
		for &harness in executor.harnesses {
			completed_commands += intrinsics.atomic_load_explicit(&harness.join_count, .Acquire)
		}
		if completed_commands == executor.num_cmds_in_round {
			for &harness in executor.harnesses {
				// We know the slave will never access join_count at this time so we don't need to synchronize
				harness.join_count = 0
			}
			return
		}
		intrinsics.cpu_relax()
	}
}
|
||||
|
||||
// ---------------------------------------------------------------------------------------------------------------------
|
||||
// ----- Tests ---------------------------------------------------------------------
|
||||
// ---------------------------------------------------------------------------------------------------------------------
|
||||
import "core:fmt"
|
||||
import "core:testing"
|
||||
|
||||
when ODIN_TEST {
	// Stress parameters: STRESS_NUM_ROUNDS rounds of STRESS_CMDS_PER_ROUND
	// commands spread across STRESS_NUM_THREADS slave threads.
	@(private = "file")
	STRESS_TOTAL_CMDS :: 200_000
	@(private = "file")
	STRESS_NUM_THREADS :: 8
	@(private = "file")
	STRESS_NUM_ROUNDS :: 100
	@(private = "file")
	STRESS_CMDS_PER_ROUND :: STRESS_TOTAL_CMDS / STRESS_NUM_ROUNDS

	// Wrapped in a union because plain `nil` is reserved by the executor
	// as the shutdown sentinel (see the Executor doc above).
	@(private = "file")
	Stress_Cmd :: union {
		Stress_Payload,
	}

	// One command: bump the execution counter for `id`, so the test can
	// detect missed or duplicated commands afterwards.
	@(private = "file")
	Stress_Payload :: struct {
		exec_counts: ^[STRESS_TOTAL_CMDS]uint,
		id: int,
	}

	@(private = "file")
	stress_handler :: proc(command: Stress_Cmd) {
		payload := command.(Stress_Payload)
		// Atomic: multiple slave threads write into the shared counter array.
		intrinsics.atomic_add_explicit(&payload.exec_counts[payload.id], 1, .Release)
	}

	// End-to-end stress test: every submitted command must run exactly once,
	// and exec_join must not return before its round has fully completed.
	@(test)
	stress_test_executor :: proc(t: ^testing.T) {
		exec_counts := new([STRESS_TOTAL_CMDS]uint)
		defer free(exec_counts)

		executor: Executor(Stress_Cmd)
		// Low spin_limit forces the park/wake path to be exercised as well.
		init_executor(&executor, STRESS_NUM_THREADS, stress_handler, spin_limit = 500)

		for round in 0 ..< STRESS_NUM_ROUNDS {
			base := round * STRESS_CMDS_PER_ROUND
			for i in 0 ..< STRESS_CMDS_PER_ROUND {
				exec_command(&executor, Stress_Payload{exec_counts = exec_counts, id = base + i})
			}
			exec_join(&executor)
		}

		// Tally commands that never ran (missed) or ran more than once (duped).
		missed, duped: int
		for i in 0 ..< STRESS_TOTAL_CMDS {
			count := exec_counts[i]
			if count == 0 do missed += 1
			else if count > 1 do duped += 1
		}

		testing.expect(t, missed == 0, fmt.tprintf("Missed %d / %d commands", missed, STRESS_TOTAL_CMDS))
		testing.expect(t, duped == 0, fmt.tprintf("Duplicated %d / %d commands", duped, STRESS_TOTAL_CMDS))

		// Explicitly destroy to verify clean shutdown.
		// If destroy_executor returns, all threads received the nil sentinel and exited,
		// and thread.pool_join completed without deadlock.
		destroy_executor(&executor)
		testing.expect(t, !executor.initialized, "Executor still marked initialized after destroy")
	}
}
|
||||
Reference in New Issue
Block a user