Improved consistency with naming of init / create / destroy and when to propagate allocation errors and #18
@@ -4,7 +4,8 @@
|
||||
package phased_executor
|
||||
|
||||
import "base:intrinsics"
|
||||
import q "core:container/queue"
|
||||
import "base:runtime"
|
||||
import que "core:container/queue"
|
||||
import "core:prof/spall"
|
||||
import "core:sync"
|
||||
import "core:thread"
|
||||
@@ -18,7 +19,7 @@ DEFT_SPIN_LIMIT :: 2_500_000
|
||||
Harness :: struct($T: typeid) where intrinsics.type_has_nil(T) {
|
||||
mutex: sync.Mutex,
|
||||
condition: sync.Cond,
|
||||
cmd_queue: q.Queue(T),
|
||||
cmd_queue: que.Queue(T),
|
||||
spin: bool,
|
||||
lock: levsync.Spinlock,
|
||||
_pad: [64 - size_of(uint)]u8, // We want join_count to have its own cache line
|
||||
@@ -42,13 +43,13 @@ Executor :: struct($T: typeid) where intrinsics.type_has_nil(T) {
|
||||
}
|
||||
|
||||
//TODO: Provide a way to set some aspects of context for the executor threads. Namely a logger.
|
||||
init_executor :: proc(
|
||||
init :: proc(
|
||||
executor: ^Executor($T),
|
||||
#any_int num_threads: int,
|
||||
$on_command_received: proc(command: T),
|
||||
#any_int spin_limit: uint = DEFT_SPIN_LIMIT,
|
||||
allocator := context.allocator,
|
||||
) {
|
||||
) -> runtime.Allocator_Error {
|
||||
was_initialized, _ := intrinsics.atomic_compare_exchange_strong_explicit(
|
||||
&executor.initialized,
|
||||
false,
|
||||
@@ -60,9 +61,9 @@ init_executor :: proc(
|
||||
|
||||
slave_task := build_task(on_command_received)
|
||||
executor.spin_limit = spin_limit
|
||||
executor.harnesses = make([]Harness(T), num_threads, allocator)
|
||||
executor.harnesses = make([]Harness(T), num_threads, allocator) or_return
|
||||
for &harness in executor.harnesses {
|
||||
q.init(&harness.cmd_queue, allocator = allocator)
|
||||
que.init(&harness.cmd_queue, allocator = allocator) or_return
|
||||
harness.spin = true
|
||||
}
|
||||
|
||||
@@ -72,11 +73,11 @@ init_executor :: proc(
|
||||
}
|
||||
thread.pool_start(&executor.thread_pool)
|
||||
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
// Cleanly shuts down all executor tasks then destroys the executor
|
||||
destroy_executor :: proc(executor: ^Executor($T), allocator := context.allocator) {
|
||||
destroy :: proc(executor: ^Executor($T), allocator := context.allocator) -> runtime.Allocator_Error {
|
||||
was_initialized, _ := intrinsics.atomic_compare_exchange_strong_explicit(
|
||||
&executor.initialized,
|
||||
true,
|
||||
@@ -90,7 +91,7 @@ destroy_executor :: proc(executor: ^Executor($T), allocator := context.allocator
|
||||
for &harness in executor.harnesses {
|
||||
for {
|
||||
if levsync.try_lock(&harness.lock) {
|
||||
q.push_back(&harness.cmd_queue, nil)
|
||||
que.push_back(&harness.cmd_queue, nil)
|
||||
if !harness.spin {
|
||||
sync.mutex_lock(&harness.mutex)
|
||||
sync.cond_signal(&harness.condition)
|
||||
@@ -105,9 +106,11 @@ destroy_executor :: proc(executor: ^Executor($T), allocator := context.allocator
|
||||
thread.pool_join(&executor.thread_pool)
|
||||
thread.pool_destroy(&executor.thread_pool)
|
||||
for &harness in executor.harnesses {
|
||||
q.destroy(&harness.cmd_queue)
|
||||
que.destroy(&harness.cmd_queue)
|
||||
}
|
||||
delete(executor.harnesses, allocator)
|
||||
delete(executor.harnesses, allocator) or_return
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
build_task :: proc(
|
||||
@@ -131,10 +134,10 @@ build_task :: proc(
|
||||
spin_count: uint = 0
|
||||
spin_loop: for {
|
||||
if levsync.try_lock(&harness.lock) {
|
||||
if q.len(harness.cmd_queue) > 0 {
|
||||
if que.len(harness.cmd_queue) > 0 {
|
||||
|
||||
// Execute command
|
||||
command := q.pop_front(&harness.cmd_queue)
|
||||
command := que.pop_front(&harness.cmd_queue)
|
||||
levsync.unlock(&harness.lock)
|
||||
if command == nil do return
|
||||
on_command_received(command)
|
||||
@@ -163,7 +166,7 @@ build_task :: proc(
|
||||
defer intrinsics.cpu_relax()
|
||||
if levsync.try_lock(&harness.lock) {
|
||||
defer levsync.unlock(&harness.lock)
|
||||
if q.len(harness.cmd_queue) > 0 {
|
||||
if que.len(harness.cmd_queue) > 0 {
|
||||
harness.spin = true
|
||||
break cond_loop
|
||||
} else {
|
||||
@@ -190,9 +193,9 @@ exec_command :: proc(executor: ^Executor($T), command: T) {
|
||||
}
|
||||
harness := &executor.harnesses[executor.harness_index]
|
||||
if levsync.try_lock(&harness.lock) {
|
||||
if q.len(harness.cmd_queue) <= executor.cmd_queue_floor {
|
||||
q.push_back(&harness.cmd_queue, command)
|
||||
executor.cmd_queue_floor = q.len(harness.cmd_queue)
|
||||
if que.len(harness.cmd_queue) <= executor.cmd_queue_floor {
|
||||
que.push_back(&harness.cmd_queue, command)
|
||||
executor.cmd_queue_floor = que.len(harness.cmd_queue)
|
||||
slave_sleeping := !harness.spin
|
||||
// Must release lock before signalling to avoid race from slave spurious wakeup
|
||||
levsync.unlock(&harness.lock)
|
||||
@@ -258,7 +261,7 @@ stress_test_executor :: proc(t: ^testing.T) {
|
||||
defer free(exec_counts)
|
||||
|
||||
executor: Executor(Stress_Cmd)
|
||||
init_executor(&executor, STRESS_NUM_THREADS, stress_handler, spin_limit = 500)
|
||||
init(&executor, STRESS_NUM_THREADS, stress_handler, spin_limit = 500)
|
||||
|
||||
for round in 0 ..< STRESS_NUM_ROUNDS {
|
||||
base := round * STRESS_CMDS_PER_ROUND
|
||||
@@ -281,6 +284,6 @@ stress_test_executor :: proc(t: ^testing.T) {
|
||||
// Explicitly destroy to verify clean shutdown.
|
||||
// If destroy_executor returns, all threads received the nil sentinel and exited,
|
||||
// and thread.pool_join completed without deadlock.
|
||||
destroy_executor(&executor)
|
||||
destroy(&executor)
|
||||
testing.expect(t, !executor.initialized, "Executor still marked initialized after destroy")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user