Files
levlib/levsync/levsync.odin
Zachary Levy 59c600d630 phased-executor (#4)
Co-authored-by: Zachary Levy <zachary@sunforge.is>
Reviewed-on: #4
2026-04-03 01:53:23 +00:00

534 lines
15 KiB
Odin

package levsync
import "base:intrinsics"
// ---------------------------------------------------------------------------------------------------------------------
// ----- Atomic Float Ops ------------------------
// ---------------------------------------------------------------------------------------------------------------------
// Compile-time selector for the arithmetic operation that `atomic_float_op_cas` applies
// between the current value at `dst` and `val`.
@(private)
Flop :: enum {
Add, // dst + val
Subtract, // dst - val
Multiply, // dst * val
Divide, // dst / val
}
// Atomically replaces the float at `dst` with `dst^ OP val` using a compare-and-swap retry loop.
// Returns the value that was at `dst` immediately before the successful swap.
//
// The swap is done through a same-width unsigned integer view of `dst`, so the compare is made
// on the bit pattern of the float. `ORDER` is the success order of the compare-exchange; the
// initial load and the failure order are both `.Relaxed`.
@(private)
atomic_float_op_cas :: #force_inline proc "contextless" (
	dst: ^$FLOAT,
	val: FLOAT,
	$OP: Flop,
	$ORDER: intrinsics.Atomic_Memory_Order,
) -> FLOAT where intrinsics.type_is_float(FLOAT) {
	// Integer-typed alias of the destination, chosen to match the float's width.
	when FLOAT == f16 {
		bits_ptr := cast(^u16)(dst)
	} else when FLOAT == f32 {
		bits_ptr := cast(^u32)(dst)
	} else when FLOAT == f64 {
		bits_ptr := cast(^u64)(dst)
	} else {
		#panic("atomic_float_op only supports f16, f32, and f64")
	}

	for {
		// Snapshot the current value; the swap below only succeeds if it is still in place.
		prev := intrinsics.atomic_load_explicit(dst, .Relaxed)

		// Compute the replacement value for the operation selected at compile time.
		when OP == .Add {
			next := prev + val
		} else when OP == .Subtract {
			next := prev - val
		} else when OP == .Multiply {
			next := prev * val
		} else when OP == .Divide {
			next := prev / val
		} else {
			#panic("Flop support not yet added for operation. This should never happen.")
		}

		// Reinterpret both floats as raw bits for the integer compare-exchange.
		when FLOAT == f16 {
			expected := transmute(u16)prev
			desired := transmute(u16)next
		} else when FLOAT == f32 {
			expected := transmute(u32)prev
			desired := transmute(u32)next
		} else when FLOAT == f64 {
			expected := transmute(u64)prev
			desired := transmute(u64)next
		}

		// Setting the success order of the compare-exchange alone guarantees the overall order of the flop.
		_, swapped := intrinsics.atomic_compare_exchange_weak_explicit(bits_ptr, expected, desired, ORDER, .Relaxed)
		if swapped {
			return prev
		}
	}
}
// Atomically adds `val` to the float at `dst`, using `ORDER` as the memory order.
// Returns the value `dst` held just before the addition was applied.
atomic_add_float :: #force_inline proc "contextless" (
	dst: ^$FLOAT,
	val: FLOAT,
	$ORDER: intrinsics.Atomic_Memory_Order,
) -> FLOAT where intrinsics.type_is_float(FLOAT) {
	previous := atomic_float_op_cas(dst, val, .Add, ORDER)
	return previous
}
// Atomically subtracts `val` from the float at `dst`, using `ORDER` as the memory order.
// Returns the value `dst` held just before the subtraction was applied.
atomic_sub_float :: #force_inline proc "contextless" (
	dst: ^$FLOAT,
	val: FLOAT,
	$ORDER: intrinsics.Atomic_Memory_Order,
) -> FLOAT where intrinsics.type_is_float(FLOAT) {
	previous := atomic_float_op_cas(dst, val, .Subtract, ORDER)
	return previous
}
// Atomically multiplies the float at `dst` by `val`, using `ORDER` as the memory order.
// Returns the value `dst` held just before the multiplication was applied.
atomic_mul_float :: #force_inline proc "contextless" (
	dst: ^$FLOAT,
	val: FLOAT,
	$ORDER: intrinsics.Atomic_Memory_Order,
) -> FLOAT where intrinsics.type_is_float(FLOAT) {
	previous := atomic_float_op_cas(dst, val, .Multiply, ORDER)
	return previous
}
// Atomically divides the float at `dst` by `val`, using `ORDER` as the memory order.
// Returns the value `dst` held just before the division was applied.
atomic_div_float :: #force_inline proc "contextless" (
	dst: ^$FLOAT,
	val: FLOAT,
	$ORDER: intrinsics.Atomic_Memory_Order,
) -> FLOAT where intrinsics.type_is_float(FLOAT) {
	previous := atomic_float_op_cas(dst, val, .Divide, ORDER)
	return previous
}
// ---------------------------------------------------------------------------------------------------------------------
// ----- Spinlock ------------------------
// ---------------------------------------------------------------------------------------------------------------------
// Boolean lock word: `false` means unlocked, `true` means locked (see `spinlock_try_lock` and
// `spinlock_unlock`). Intended for when you need manual control over each acquisition attempt.
// If you always loop on try_lock, Odin's `core:sync` `Ticket_Mutex` is probably a better fit.
Spinlock :: distinct bool
// Makes a single, non-blocking attempt to acquire `lock`.
// Returns true if the lock was successfully acquired, false if it was already held.
//
// A strong compare-exchange is used so that one call never reports failure while the lock is
// actually free. The weak form is allowed to fail spuriously, which is only acceptable when the
// caller always retries in a loop.
spinlock_try_lock :: #force_inline proc "contextless" (lock: ^Spinlock) -> bool {
	// The previous value is not needed: the success flag alone says whether we flipped
	// the lock from `false` (unlocked) to `true` (locked).
	_, lock_acquired := intrinsics.atomic_compare_exchange_strong_explicit(
		lock,
		false,
		true,
		.Acq_Rel,
		.Relaxed,
	)
	return lock_acquired
}
// Releases `lock` by atomically storing `false` with `.Release` ordering.
// Performs no check that the lock is currently held, or by whom.
spinlock_unlock :: #force_inline proc "contextless" (lock: ^Spinlock) {
intrinsics.atomic_store_explicit(lock, false, .Release)
}
// Procedure group for non-blocking lock acquisition.
// Currently its only member is `spinlock_try_lock`.
try_lock :: proc {
spinlock_try_lock,
}
// Procedure group for releasing a lock.
// Currently its only member is `spinlock_unlock`.
unlock :: proc {
spinlock_unlock,
}
// ---------------------------------------------------------------------------------------------------------------------
// ----- Tests ------------------------
// ---------------------------------------------------------------------------------------------------------------------
import "core:sync"
import "core:testing"
import "core:thread"
// Several threads each add 1.0 to one shared f64 a fixed number of times via `atomic_add_float`.
// If any update were lost to a race, the final sum would fall short of the expected total.
@(test)
test_concurrent_atomic_add_no_lost_updates :: proc(t: ^testing.T) {
	NUM_THREADS :: 8
	ITERATIONS_PER_THREAD :: 10_000

	// Handed to every worker through `Thread.data`.
	Thread_Data :: struct {
		value:   ^f64,
		barrier: ^sync.Barrier,
	}

	thread_proc :: proc(th: ^thread.Thread) {
		args := cast(^Thread_Data)th.data
		// Hold here until every worker has arrived, so the additions overlap.
		sync.barrier_wait(args.barrier)
		for _ in 0 ..< ITERATIONS_PER_THREAD {
			atomic_add_float(args.value, 1.0, .Relaxed)
		}
	}

	accumulator: f64 = 0.0
	start_gate: sync.Barrier
	sync.barrier_init(&start_gate, NUM_THREADS)
	shared_args := Thread_Data{&accumulator, &start_gate}

	workers: [NUM_THREADS]^thread.Thread
	for i in 0 ..< NUM_THREADS {
		workers[i] = thread.create(thread_proc)
		workers[i].data = &shared_args
	}
	for worker in workers {
		thread.start(worker)
	}
	for worker in workers {
		thread.join(worker)
		thread.destroy(worker)
	}

	want := f64(NUM_THREADS * ITERATIONS_PER_THREAD)
	testing.expect_value(t, accumulator, want)
}
// Starts from a known total and has several threads subtract 1.0 from it via `atomic_sub_float`.
// If any update were lost to a race, the final value would not reach zero.
@(test)
test_concurrent_atomic_sub_no_lost_updates :: proc(t: ^testing.T) {
	NUM_THREADS :: 8
	ITERATIONS_PER_THREAD :: 10_000

	// Handed to every worker through `Thread.data`.
	Thread_Data :: struct {
		value:   ^f64,
		barrier: ^sync.Barrier,
	}

	thread_proc :: proc(th: ^thread.Thread) {
		args := cast(^Thread_Data)th.data
		// Hold here until every worker has arrived, so the subtractions overlap.
		sync.barrier_wait(args.barrier)
		for _ in 0 ..< ITERATIONS_PER_THREAD {
			atomic_sub_float(args.value, 1.0, .Relaxed)
		}
	}

	// Exactly as many units as will be subtracted in total.
	remaining: f64 = f64(NUM_THREADS * ITERATIONS_PER_THREAD)
	start_gate: sync.Barrier
	sync.barrier_init(&start_gate, NUM_THREADS)
	shared_args := Thread_Data{&remaining, &start_gate}

	workers: [NUM_THREADS]^thread.Thread
	for i in 0 ..< NUM_THREADS {
		workers[i] = thread.create(thread_proc)
		workers[i].data = &shared_args
	}
	for worker in workers {
		thread.start(worker)
	}
	for worker in workers {
		thread.join(worker)
		thread.destroy(worker)
	}

	testing.expect_value(t, remaining, 0.0)
}
// Every thread repeatedly multiplies the shared value by 2.0 and then divides it by 2.0.
// The two operations are inverses, so once all threads finish the value should be back at its
// starting point regardless of how the operations interleaved.
@(test)
test_concurrent_atomic_mul_div_round_trip :: proc(t: ^testing.T) {
	NUM_THREADS :: 8
	ITERATIONS_PER_THREAD :: 10_000

	// Handed to every worker through `Thread.data`.
	Thread_Data :: struct {
		value:   ^f64,
		barrier: ^sync.Barrier,
	}

	thread_proc :: proc(th: ^thread.Thread) {
		args := cast(^Thread_Data)th.data
		// Hold here until every worker has arrived, so the operations overlap.
		sync.barrier_wait(args.barrier)
		for _ in 0 ..< ITERATIONS_PER_THREAD {
			atomic_mul_float(args.value, 2.0, .Relaxed)
			atomic_div_float(args.value, 2.0, .Relaxed)
		}
	}

	// Start with a value that won't underflow/overflow.
	scaled: f64 = 1000.0
	start_gate: sync.Barrier
	sync.barrier_init(&start_gate, NUM_THREADS)
	shared_args := Thread_Data{&scaled, &start_gate}

	workers: [NUM_THREADS]^thread.Thread
	for i in 0 ..< NUM_THREADS {
		workers[i] = thread.create(thread_proc)
		workers[i].data = &shared_args
	}
	for worker in workers {
		thread.start(worker)
	}
	for worker in workers {
		thread.join(worker)
		thread.destroy(worker)
	}

	testing.expect_value(t, scaled, 1000.0)
}
// Same contention scenario as the f64 add test, but on an f32, to check that the f32 type
// dispatch of the atomic float op works under concurrent use.
@(test)
test_atomic_add_with_f32 :: proc(t: ^testing.T) {
	NUM_THREADS :: 8
	ITERATIONS_PER_THREAD :: 10_000

	// Handed to every worker through `Thread.data`.
	Thread_Data :: struct {
		value:   ^f32,
		barrier: ^sync.Barrier,
	}

	thread_proc :: proc(th: ^thread.Thread) {
		args := cast(^Thread_Data)th.data
		// Hold here until every worker has arrived, so the additions overlap.
		sync.barrier_wait(args.barrier)
		for _ in 0 ..< ITERATIONS_PER_THREAD {
			atomic_add_float(args.value, 1.0, .Relaxed)
		}
	}

	accumulator: f32 = 0.0
	start_gate: sync.Barrier
	sync.barrier_init(&start_gate, NUM_THREADS)
	shared_args := Thread_Data{&accumulator, &start_gate}

	workers: [NUM_THREADS]^thread.Thread
	for i in 0 ..< NUM_THREADS {
		workers[i] = thread.create(thread_proc)
		workers[i].data = &shared_args
	}
	for worker in workers {
		thread.start(worker)
	}
	for worker in workers {
		thread.join(worker)
		thread.destroy(worker)
	}

	want := f32(NUM_THREADS * ITERATIONS_PER_THREAD)
	testing.expect_value(t, accumulator, want)
}
// Checks that the memory order handed to the atomic float op (applied as the CAS success order)
// is enough to order the whole operation.
//
// Writer and readers both go through `atomic_add_float` rather than raw intrinsics:
//   - the writer fills in plain (non-atomic) data, then bumps the flag with `.Release`
//   - each reader spins on the flag with `.Acquire` and then inspects the plain data
//
// NOTE: This test may pass even with Relaxed ordering on x86 due to its strong memory model.
// On ARM or other weak-memory architectures, using Relaxed here would likely cause failures.
@(test)
test_atomic_release_acquire_publish_visibility :: proc(t: ^testing.T) {
	NUM_READERS :: 4

	Shared_State :: struct {
		flag:           f64,
		// Keeps the flag and the published data apart to avoid false sharing.
		_padding:       [64]u8,
		published_data: [4]int,
	}

	Reader_Data :: struct {
		shared:     ^Shared_State,
		barrier:    ^sync.Barrier,
		saw_data:   bool,
		data_valid: bool,
	}

	Writer_Data :: struct {
		shared:  ^Shared_State,
		barrier: ^sync.Barrier,
	}

	writer_proc :: proc(th: ^thread.Thread) {
		args := cast(^Writer_Data)th.data
		sync.barrier_wait(args.barrier)

		// Plain writes that the readers will later verify.
		payload := &args.shared.published_data
		payload[0] = 42
		payload[1] = 43
		payload[2] = 44
		payload[3] = 45

		// Release through the float op: the CAS success order must publish the writes above.
		atomic_add_float(&args.shared.flag, 1.0, .Release)
	}

	reader_proc :: proc(th: ^thread.Thread) {
		args := cast(^Reader_Data)th.data
		sync.barrier_wait(args.barrier)

		// Spin through the float op with Acquire. Adding 0.0 leaves the value alone but still
		// runs the full CAS loop; leave once the previous value shows the writer's increment.
		for !(atomic_add_float(&args.shared.flag, 0.0, .Acquire) > 0.0) {
			intrinsics.cpu_relax()
		}

		// With correct ordering, everything written before the writer's Release must be visible.
		args.saw_data = true
		first := args.shared.published_data[0]
		second := args.shared.published_data[1]
		third := args.shared.published_data[2]
		fourth := args.shared.published_data[3]
		args.data_valid = (first == 42 && second == 43 && third == 44 && fourth == 45)
	}

	state: Shared_State
	start_gate: sync.Barrier
	sync.barrier_init(&start_gate, NUM_READERS + 1) // +1 for writer

	writer_args := Writer_Data{&state, &start_gate}
	reader_args: [NUM_READERS]Reader_Data
	for i in 0 ..< NUM_READERS {
		reader_args[i] = Reader_Data{&state, &start_gate, false, false}
	}

	writer := thread.create(writer_proc)
	writer.data = &writer_args

	readers: [NUM_READERS]^thread.Thread
	for i in 0 ..< NUM_READERS {
		readers[i] = thread.create(reader_proc)
		readers[i].data = &reader_args[i]
	}

	thread.start(writer)
	for reader in readers {
		thread.start(reader)
	}

	thread.join(writer)
	thread.destroy(writer)
	for reader in readers {
		thread.join(reader)
		thread.destroy(reader)
	}

	// Every reader must have observed the flag and seen the complete payload.
	for i in 0 ..< NUM_READERS {
		result := reader_args[i]
		testing.expectf(t, result.saw_data, "Reader %d didn't observe the flag", i)
		testing.expectf(t, result.data_valid, "Reader %d saw flag but data was not visible (memory ordering bug)", i)
	}
}
// Stress test for `spinlock_try_lock`: several threads spin until they acquire the lock, then do
// a deliberately non-atomic read-modify-write on shared data.
//
// If mutual exclusion holds:
//   - `counter` ends at exactly NUM_THREADS * ITERATIONS_PER_THREAD
//   - `max_holders` (the highest observed `concurrent_holders`) is exactly 1
//
// The read → relax → write sequence widens the critical section so that a failure to exclude is
// very likely to show up as a lost update.
@(test)
test_spinlock_try_lock_mutual_exclusion :: proc(t: ^testing.T) {
	NUM_THREADS :: 8
	ITERATIONS_PER_THREAD :: 50_000

	Shared :: struct {
		lock:               Spinlock,
		// Keeps the lock word apart from the data it protects to avoid false sharing.
		_pad0:              [64]u8,
		counter:            int,
		// How many threads currently believe they hold the lock; must never exceed 1.
		concurrent_holders: int,
		max_holders:        int,
		_pad1:              [64]u8,
	}

	Thread_Data :: struct {
		shared:  ^Shared,
		barrier: ^sync.Barrier,
	}

	thread_proc :: proc(th: ^thread.Thread) {
		args := cast(^Thread_Data)th.data
		state := args.shared

		// Everyone meets here first for maximum contention.
		sync.barrier_wait(args.barrier)

		for _ in 0 ..< ITERATIONS_PER_THREAD {
			// Keep trying until the lock is ours.
			for !spinlock_try_lock(&state.lock) {
				intrinsics.cpu_relax()
			}

			// --- critical section start ---
			// Bump the holder count atomically so overlapping holders can be detected.
			holders_before := intrinsics.atomic_add_explicit(&state.concurrent_holders, 1, .Relaxed)
			holders_now := holders_before + 1

			// Record the largest holder count seen. This is diagnostic only, and the write
			// happens while holding the spinlock.
			if holders_now > state.max_holders {
				state.max_holders = holders_now
			}

			// Non-atomic read-modify-write with a pause in the middle: a second holder
			// in this window would cause a lost update.
			snapshot := state.counter
			intrinsics.cpu_relax()
			intrinsics.cpu_relax()
			state.counter = snapshot + 1

			intrinsics.atomic_sub_explicit(&state.concurrent_holders, 1, .Relaxed)
			// --- critical section end ---

			spinlock_unlock(&state.lock)
		}
	}

	protected: Shared
	start_gate: sync.Barrier
	sync.barrier_init(&start_gate, NUM_THREADS)
	shared_args := Thread_Data{&protected, &start_gate}

	workers: [NUM_THREADS]^thread.Thread
	for i in 0 ..< NUM_THREADS {
		workers[i] = thread.create(thread_proc)
		workers[i].data = &shared_args
	}
	for worker in workers {
		thread.start(worker)
	}
	for worker in workers {
		thread.join(worker)
		thread.destroy(worker)
	}

	want := NUM_THREADS * ITERATIONS_PER_THREAD
	testing.expectf(
		t,
		protected.counter == want,
		"Counter mismatch: got %d, expected %d (mutual exclusion violated — lost updates)",
		protected.counter,
		want,
	)
	testing.expectf(
		t,
		protected.max_holders == 1,
		"Max concurrent lock holders was %d (expected 1 — lock was held by multiple threads simultaneously)",
		protected.max_holders,
	)
}