// Executor for fan out/in phases where each phase must entirely finish before the next begins.
// This executor does not guarantee strict ordering of commands within a phase so it is only suited
// to tasks where order within a phase is not critical.
package phased_executor

import "base:intrinsics"
import q "core:container/queue"
import "core:prof/spall"
import "core:sync"
import "core:thread"

import b "../basic"

DEFT_BATCH_SIZE :: 1024 // Number of nodes in each batch
DEFT_SPIN_LIMIT :: 2_500_000 // Default number of empty polls a worker makes before falling back to sleeping on its condition variable

// Per-worker-thread state. Each worker owns exactly one Harness: the master pushes commands into
// `cmd_queue` and the owning worker pops and executes them.
Harness :: struct($T: typeid) where intrinsics.type_has_nil(T) {
	mutex:     sync.Mutex, // Paired with `condition`; the worker holds it except while blocked in cond_wait
	condition: sync.Cond,  // Signalled by the master to wake a sleeping (non-spinning) worker
	cmd_queue: q.Queue(T), // Guarded by the `locked` spin flag, not by `mutex`
	spin, locked: bool,    // `spin`: worker is busy-polling; `locked`: spin-lock flag protecting `cmd_queue`
	_pad: [64 - size_of(uint)]u8, // We want join_count to have its own cache line
	join_count: uint, // Number of commands completed since last exec_join
}

// `nil` for type `T` is reserved for executor shutdown. If you need `nil` for something else wrap `T`
// in a union even if there is only a single type.
//
// Executor is not thread safe and can only be used from a single thread at a time.
// Executor can only handle 1 graph at a time.
// To execute multiple graphs at the same time, use multiple executors.
Executor :: struct($T: typeid) where intrinsics.type_has_nil(T) {
	harnesses:         []Harness(T), // Accessed from slave threads
	spin_limit:        uint,         // Accessed from slave threads
	num_cmds_in_round: uint,         // Number of commands submitted without join being called
	harness_index:     int,          // Round-robin cursor over `harnesses`, advanced by exec_command
	cmd_queue_floor:   int,          // Queue-depth threshold used by exec_command's load-balancing heuristic
	thread_pool:       thread.Pool,
	initialized:       bool,         // Set/cleared atomically by init_executor / destroy_executor
}

//TODO: Provide a way to set some aspects of context for the executor threads. Namely a logger.
// Initializes the executor and starts `num_threads` worker threads, each bound to its own
// harness, which invoke `on_command_received` for every command popped from their queue.
//
// Inputs:
//  - executor:            zero-initialized executor; asserts if already initialized.
//  - num_threads:         number of worker threads (one harness each).
//  - on_command_received: compile-time handler invoked on a worker thread per command.
//  - spin_limit:          consecutive empty polls before an idle worker sleeps on its condition variable.
//  - allocator:           used for the harness slice, the queues, and the thread pool.
init_executor :: proc(
	executor: ^Executor($T),
	#any_int num_threads: int,
	$on_command_received: proc(command: T),
	#any_int spin_limit: uint = DEFT_SPIN_LIMIT,
	allocator := context.allocator,
) {
	// Atomically flip initialized false -> true so a double-init is caught even across threads.
	was_initialized, _ := intrinsics.atomic_compare_exchange_strong_explicit(
		&executor.initialized,
		false,
		true,
		.Seq_Cst,
		.Seq_Cst,
	)
	assert(!was_initialized, "Executor already initialized.")

	slave_task := build_task(on_command_received)
	executor.spin_limit = spin_limit
	executor.harnesses = make([]Harness(T), num_threads, allocator)
	for &harness in executor.harnesses {
		q.init(&harness.cmd_queue, allocator = allocator)
		harness.spin = true // Workers start out in the spinning state
	}
	thread.pool_init(&executor.thread_pool, allocator, num_threads)
	for i in 0 ..< num_threads {
		// user_index tells each worker which harness is theirs
		thread.pool_add_task(&executor.thread_pool, allocator, slave_task, data = executor, user_index = i)
	}
	thread.pool_start(&executor.thread_pool)
}

// Cleanly shuts down all executor tasks then destroys the executor
destroy_executor :: proc(executor: ^Executor($T), allocator := context.allocator) {
	was_initialized, _ := intrinsics.atomic_compare_exchange_strong_explicit(
		&executor.initialized,
		true,
		false,
		.Seq_Cst,
		.Seq_Cst,
	)
	assert(was_initialized, "Executor not initialized.")

	// Exit thread loops by sending every worker the `nil` shutdown sentinel.
	for &harness in executor.harnesses {
		for {
			if try_lock_harness(&harness.locked) {
				q.push_back(&harness.cmd_queue, nil)
				slave_sleeping := !harness.spin
				// BUGFIX: the harness lock must be released *before* taking `mutex` to signal.
				// A spuriously woken worker holds `mutex` while it spins on `locked` (see the
				// cond_loop in build_task), so signalling while still holding `locked` could
				// deadlock. This now mirrors the ordering exec_command already uses.
				intrinsics.atomic_store_explicit(&harness.locked, false, .Release)
				if slave_sleeping {
					sync.mutex_lock(&harness.mutex)
					sync.cond_signal(&harness.condition)
					sync.mutex_unlock(&harness.mutex)
				}
				break
			}
			intrinsics.cpu_relax()
		}
	}
	thread.pool_join(&executor.thread_pool)
	thread.pool_destroy(&executor.thread_pool)
	for &harness in executor.harnesses {
		q.destroy(&harness.cmd_queue)
	}
	delete(executor.harnesses, allocator)
}

// Returns true if lock successfully acquired, false otherwise.
// The weak CAS may fail spuriously; every caller retries in a loop so that is safe.
try_lock_harness :: #force_inline proc "contextless" (locked: ^bool) -> bool {
	_, lock_acquired := intrinsics.atomic_compare_exchange_weak_explicit(
		locked,
		false,
		true,
		.Acq_Rel,
		.Relaxed,
	)
	return lock_acquired
}

// Builds the worker-thread procedure for the pool. `on_command_received` is a compile-time
// parameter so the handler call can be baked into the generated task.
build_task :: proc(
	$on_command_received: proc(command: $T),
) -> (
	slave_task: proc(task: thread.Task),
) where intrinsics.type_has_nil(T) {
	slave_task = proc(task: thread.Task) {
		when b.SPALL_TRACE {
			spall_data := make([]u8, spall.BUFFER_DEFAULT_SIZE)
			spall_buffer = spall.buffer_create(spall_data, u32(sync.current_thread_id()))
			defer spall.buffer_destroy(&spall_ctx, &spall_buffer)
		}
		executor := cast(^Executor(T))task.data
		harness := &executor.harnesses[task.user_index] // Each worker owns exactly one harness

		// The worker holds `mutex` for its entire life except while blocked in cond_wait below.
		// The master therefore can only deliver a signal while the worker is actually waiting,
		// so wakeups cannot be lost.
		sync.mutex_lock(&harness.mutex)
		for {
			defer free_all(context.temp_allocator)

			// Spinning phase: busy-poll the queue until `spin_limit` consecutive empty polls.
			spin_count: uint = 0
			spin_loop: for {
				if try_lock_harness(&harness.locked) {
					if q.len(harness.cmd_queue) > 0 {
						// Execute command
						command := q.pop_front(&harness.cmd_queue)
						intrinsics.atomic_store_explicit(&harness.locked, false, .Release)
						if command == nil {
							// BUGFIX: shutdown sentinel — release the harness mutex before the
							// thread exits instead of leaving it locked by a dead thread.
							sync.mutex_unlock(&harness.mutex)
							return
						}
						on_command_received(command)
						spin_count = 0
						intrinsics.atomic_add_explicit(&harness.join_count, 1, .Release)
					} else {
						defer intrinsics.cpu_relax()
						defer intrinsics.atomic_store_explicit(&harness.locked, false, .Release)
						spin_count += 1
						if spin_count == executor.spin_limit {
							// `spin = false` is published while holding `locked`, so the master
							// reads a consistent value when deciding whether to signal.
							harness.spin = false
							break spin_loop
						}
					}
				} else {
					// If master locked the command queue there will be a new command soon
					spin_count = 0
					intrinsics.cpu_relax()
				}
			}

			// Sleeping phase
			cond_loop: for { // We have to loop because cond_wait can return without signal sometimes
				sync.cond_wait(&harness.condition, &harness.mutex)
				for { // Loop to acquire harness lock
					defer intrinsics.cpu_relax()
					if try_lock_harness(&harness.locked) {
						defer intrinsics.atomic_store_explicit(&harness.locked, false, .Release)
						if q.len(harness.cmd_queue) > 0 {
							harness.spin = true
							break cond_loop
						} else {
							continue cond_loop // Spurious wakeup, go back to sleep
						}
					}
				}
			}
		}
	}
	return slave_task
}

// Submits one command, round-robining across harnesses. Blocks until some queue accepts the
// command per the depth heuristic below. Master thread only (Executor is not thread safe).
exec_command :: proc(executor: ^Executor($T), command: T) {
	defer executor.num_cmds_in_round += 1
	for {
		if executor.num_cmds_in_round > 0 { // Avoid spinning multiple locks if we're only using 1 thread
			if executor.harness_index == len(executor.harnesses) - 1 {
				executor.harness_index = 0
			} else {
				executor.harness_index += 1
			}
		}
		harness := &executor.harnesses[executor.harness_index]
		if try_lock_harness(&harness.locked) {
			// Cheap load balancing: only enqueue if this queue is no deeper than the current
			// floor; otherwise release and try the next harness.
			if q.len(harness.cmd_queue) <= executor.cmd_queue_floor {
				q.push_back(&harness.cmd_queue, command)
				executor.cmd_queue_floor = q.len(harness.cmd_queue)
				slave_sleeping := !harness.spin
				// Must release lock before signalling to avoid race from slave spurious wakeup
				intrinsics.atomic_store_explicit(&harness.locked, false, .Release)
				if slave_sleeping {
					sync.mutex_lock(&harness.mutex)
					sync.cond_signal(&harness.condition)
					sync.mutex_unlock(&harness.mutex)
				}
				break
			}
			intrinsics.atomic_store_explicit(&harness.locked, false, .Release)
		}
	}
}

// Spin check until issued executor commands finish
exec_join :: proc(executor: ^Executor($T)) {
	defer executor.num_cmds_in_round = 0
	for {
		completed_commands: uint = 0
		for &harness in executor.harnesses {
			completed_commands += intrinsics.atomic_load_explicit(&harness.join_count, .Acquire)
		}
		if completed_commands == executor.num_cmds_in_round {
			for &harness in executor.harnesses {
				// We know the slave will never access join_count at this time so we don't need to synchronize
				harness.join_count = 0
			}
			return
		}
		intrinsics.cpu_relax()
	}
}

// ---------------------------------------------------------------------------------------------------------------------
// ----- Tests ---------------------------------------------------------------------
// ---------------------------------------------------------------------------------------------------------------------

import "core:fmt"
import "core:testing"

// Stress test: submits commands in rounds, joins between rounds, and verifies every command
// executed exactly once (no drops, no duplicates), then destroys the executor.
@(test)
stress_test_executor :: proc(t: ^testing.T) {
	STRESS_TOTAL_CMDS :: 200_000
	STRESS_NUM_THREADS :: 8
	STRESS_NUM_ROUNDS :: 100
	STRESS_CMDS_PER_ROUND :: STRESS_TOTAL_CMDS / STRESS_NUM_ROUNDS

	// `nil` is reserved for shutdown, so the payload is wrapped in a single-variant union.
	Stress_Cmd :: union {
		Stress_Payload,
	}
	Stress_Payload :: struct {
		exec_counts: ^[STRESS_TOTAL_CMDS]uint,
		id:          int,
	}
	stress_handler :: proc(command: Stress_Cmd) {
		payload := command.(Stress_Payload)
		intrinsics.atomic_add_explicit(&payload.exec_counts[payload.id], 1, .Release)
	}

	exec_counts := new([STRESS_TOTAL_CMDS]uint)
	defer free(exec_counts)

	executor: Executor(Stress_Cmd)
	init_executor(&executor, STRESS_NUM_THREADS, stress_handler, spin_limit = 500)
	for round in 0 ..< STRESS_NUM_ROUNDS {
		base := round * STRESS_CMDS_PER_ROUND
		for i in 0 ..< STRESS_CMDS_PER_ROUND {
			exec_command(&executor, Stress_Payload{exec_counts = exec_counts, id = base + i})
		}
		exec_join(&executor)
	}

	// Every command must have executed exactly once.
	missed, duped: int
	for i in 0 ..< STRESS_TOTAL_CMDS {
		count := exec_counts[i]
		if count == 0 do missed += 1 else if count > 1 do duped += 1
	}
	testing.expect(t, missed == 0, fmt.tprintf("Missed %d / %d commands", missed, STRESS_TOTAL_CMDS))
	testing.expect(t, duped == 0, fmt.tprintf("Duplicated %d / %d commands", duped, STRESS_TOTAL_CMDS))

	// Explicitly destroy to verify clean shutdown.
	// If destroy_executor returns, all threads received the nil sentinel and exited,
	// and thread.pool_join completed without deadlock.
	destroy_executor(&executor)
	testing.expect(t, !executor.initialized, "Executor still marked initialized after destroy")
}