Files
levlib/levsync/levsync.odin
2026-03-08 19:00:41 -07:00

392 lines
10 KiB
Odin

package levsync
import "base:intrinsics"
@(private)
Flop :: enum {
Add,
Subtract,
Multiply,
Divide,
}
// Returns the value at `dst` that was atomically replaced by the result of the operation.
@(private)
atomic_float_op_cas :: #force_inline proc "contextless" (
dst: ^$FLOAT,
val: FLOAT,
$OP: Flop,
$ORDER: intrinsics.Atomic_Memory_Order,
) -> FLOAT where intrinsics.type_is_float(FLOAT) {
when FLOAT == f16 {
dst_i := cast(^u16)(dst)
} else when FLOAT == f32 {
dst_i := cast(^u32)(dst)
} else when FLOAT == f64 {
dst_i := cast(^u64)(dst)
} else {
#panic("atomic_float_op only supports f16, f32, and f64")
}
for {
old_f := intrinsics.atomic_load_explicit(dst, .Relaxed)
when OP == .Add {
new_f := old_f + val
} else when OP == .Subtract {
new_f := old_f - val
} else when OP == .Multiply {
new_f := old_f * val
} else when OP == .Divide {
new_f := old_f / val
} else {
#panic("Flop support not yet added for operation. This should never happen.")
}
when FLOAT == f16 {
old_i := transmute(u16)old_f
new_i := transmute(u16)new_f
} else when FLOAT == f32 {
old_i := transmute(u32)old_f
new_i := transmute(u32)new_f
} else when FLOAT == f64 {
old_i := transmute(u64)old_f
new_i := transmute(u64)new_f
}
// Setting order of compare exchange success alone guarentees overall order of the flop.
_, ok := intrinsics.atomic_compare_exchange_weak_explicit(dst_i, old_i, new_i, ORDER, .Relaxed)
if ok do return old_f
}
}
// Returns the value at `dst` that was atomically replaced by the result of the operation.
atomic_add_float :: #force_inline proc "contextless" (
dst: ^$FLOAT,
val: FLOAT,
$ORDER: intrinsics.Atomic_Memory_Order,
) -> FLOAT where intrinsics.type_is_float(FLOAT) {
return atomic_float_op_cas(dst, val, .Add, ORDER)
}
// Returns the value at `dst` that was atomically replaced by the result of the operation.
atomic_sub_float :: #force_inline proc "contextless" (
dst: ^$FLOAT,
val: FLOAT,
$ORDER: intrinsics.Atomic_Memory_Order,
) -> FLOAT where intrinsics.type_is_float(FLOAT) {
return atomic_float_op_cas(dst, val, .Subtract, ORDER)
}
// Returns the value at `dst` that was atomically replaced by the result of the operation.
atomic_mul_float :: #force_inline proc "contextless" (
dst: ^$FLOAT,
val: FLOAT,
$ORDER: intrinsics.Atomic_Memory_Order,
) -> FLOAT where intrinsics.type_is_float(FLOAT) {
return atomic_float_op_cas(dst, val, .Multiply, ORDER)
}
// Returns the value at `dst` that was atomically replaced by the result of the operation.
atomic_div_float :: #force_inline proc "contextless" (
dst: ^$FLOAT,
val: FLOAT,
$ORDER: intrinsics.Atomic_Memory_Order,
) -> FLOAT where intrinsics.type_is_float(FLOAT) {
return atomic_float_op_cas(dst, val, .Divide, ORDER)
}
// ---------------------------------------------------------------------------------------------------------------------
// ----- Tests ------------------------
// ---------------------------------------------------------------------------------------------------------------------
import "core:sync"
import "core:testing"
import "core:thread"
@(test)
test_concurrent_atomic_add_no_lost_updates :: proc(t: ^testing.T) {
// Multiple threads will each add 1.0 this many times.
// If any updates are lost due to race conditions, the final sum will be wrong.
NUM_THREADS :: 8
ITERATIONS_PER_THREAD :: 10_000
shared_value: f64 = 0.0
barrier: sync.Barrier
sync.barrier_init(&barrier, NUM_THREADS)
Thread_Data :: struct {
value: ^f64,
barrier: ^sync.Barrier,
}
thread_proc :: proc(th: ^thread.Thread) {
ctx := cast(^Thread_Data)th.data
// Wait for all threads to be ready before starting
sync.barrier_wait(ctx.barrier)
for _ in 0 ..< ITERATIONS_PER_THREAD {
atomic_add_float(ctx.value, 1.0, .Relaxed)
}
}
thread_data := Thread_Data{&shared_value, &barrier}
threads: [NUM_THREADS]^thread.Thread
for &th in threads {
th = thread.create(thread_proc)
th.data = &thread_data
}
for th in threads {
thread.start(th)
}
for th in threads {
thread.join(th)
thread.destroy(th)
}
expected := f64(NUM_THREADS * ITERATIONS_PER_THREAD)
testing.expect_value(t, shared_value, expected)
}
@(test)
test_concurrent_atomic_sub_no_lost_updates :: proc(t: ^testing.T) {
// Start with a known value, multiple threads subtract.
// If any updates are lost due to race conditions, the final result will be wrong.
NUM_THREADS :: 8
ITERATIONS_PER_THREAD :: 10_000
shared_value: f64 = f64(NUM_THREADS * ITERATIONS_PER_THREAD)
barrier: sync.Barrier
sync.barrier_init(&barrier, NUM_THREADS)
Thread_Data :: struct {
value: ^f64,
barrier: ^sync.Barrier,
}
thread_proc :: proc(th: ^thread.Thread) {
ctx := cast(^Thread_Data)th.data
// Wait for all threads to be ready before starting
sync.barrier_wait(ctx.barrier)
for _ in 0 ..< ITERATIONS_PER_THREAD {
atomic_sub_float(ctx.value, 1.0, .Relaxed)
}
}
thread_data := Thread_Data{&shared_value, &barrier}
threads: [NUM_THREADS]^thread.Thread
for &th in threads {
th = thread.create(thread_proc)
th.data = &thread_data
}
for th in threads {
thread.start(th)
}
for th in threads {
thread.join(th)
thread.destroy(th)
}
testing.expect_value(t, shared_value, 0.0)
}
@(test)
test_concurrent_atomic_mul_div_round_trip :: proc(t: ^testing.T) {
// Each thread multiplies by 2.0 then divides by 2.0.
// Since these are inverses, the final value should equal the starting value
// regardless of how operations interleave.
NUM_THREADS :: 8
ITERATIONS_PER_THREAD :: 10_000
shared_value: f64 = 1000.0 // Start with a value that won't underflow/overflow
barrier: sync.Barrier
sync.barrier_init(&barrier, NUM_THREADS)
Thread_Data :: struct {
value: ^f64,
barrier: ^sync.Barrier,
}
thread_proc :: proc(th: ^thread.Thread) {
ctx := cast(^Thread_Data)th.data
// Wait for all threads to be ready before starting
sync.barrier_wait(ctx.barrier)
for _ in 0 ..< ITERATIONS_PER_THREAD {
atomic_mul_float(ctx.value, 2.0, .Relaxed)
atomic_div_float(ctx.value, 2.0, .Relaxed)
}
}
thread_data := Thread_Data{&shared_value, &barrier}
threads: [NUM_THREADS]^thread.Thread
for &th in threads {
th = thread.create(thread_proc)
th.data = &thread_data
}
for th in threads {
thread.start(th)
}
for th in threads {
thread.join(th)
thread.destroy(th)
}
testing.expect_value(t, shared_value, 1000.0)
}
@(test)
test_atomic_add_with_f32 :: proc(t: ^testing.T) {
// Verify the f32 type dispatch works correctly under contention.
// Same approach as the f64 add test but with f32.
NUM_THREADS :: 8
ITERATIONS_PER_THREAD :: 10_000
shared_value: f32 = 0.0
barrier: sync.Barrier
sync.barrier_init(&barrier, NUM_THREADS)
Thread_Data :: struct {
value: ^f32,
barrier: ^sync.Barrier,
}
thread_proc :: proc(th: ^thread.Thread) {
ctx := cast(^Thread_Data)th.data
// Wait for all threads to be ready before starting
sync.barrier_wait(ctx.barrier)
for _ in 0 ..< ITERATIONS_PER_THREAD {
atomic_add_float(ctx.value, 1.0, .Relaxed)
}
}
thread_data := Thread_Data{&shared_value, &barrier}
threads: [NUM_THREADS]^thread.Thread
for &th in threads {
th = thread.create(thread_proc)
th.data = &thread_data
}
for th in threads {
thread.start(th)
}
for th in threads {
thread.join(th)
thread.destroy(th)
}
expected := f32(NUM_THREADS * ITERATIONS_PER_THREAD)
testing.expect_value(t, shared_value, expected)
}
@(test)
test_atomic_release_acquire_publish_visibility :: proc(t: ^testing.T) {
// Tests that the memory order passed to atomic_float_op's CAS success condition
// provides full ordering guarantees for the entire float operation.
//
// Both sides use atomic_add_float (not raw intrinsics) to verify:
// - Release on CAS success publishes prior non-atomic writes
// - Acquire on CAS success makes those writes visible to the reader
//
// NOTE: This test may pass even with Relaxed ordering on x86 due to its strong memory model.
// On ARM or other weak-memory architectures, using Relaxed here would likely cause failures.
NUM_READERS :: 4
Shared_State :: struct {
flag: f64,
// Padding to avoid false sharing between flag and data
_padding: [64]u8,
published_data: [4]int,
}
shared: Shared_State
barrier: sync.Barrier
sync.barrier_init(&barrier, NUM_READERS + 1) // +1 for writer
Reader_Data :: struct {
shared: ^Shared_State,
barrier: ^sync.Barrier,
saw_data: bool,
data_valid: bool,
}
Writer_Data :: struct {
shared: ^Shared_State,
barrier: ^sync.Barrier,
}
writer_proc :: proc(th: ^thread.Thread) {
ctx := cast(^Writer_Data)th.data
sync.barrier_wait(ctx.barrier)
// Write data that readers will verify
ctx.shared.published_data[0] = 42
ctx.shared.published_data[1] = 43
ctx.shared.published_data[2] = 44
ctx.shared.published_data[3] = 45
// Release via the float op: CAS success ordering must publish all writes above
atomic_add_float(&ctx.shared.flag, 1.0, .Release)
}
reader_proc :: proc(th: ^thread.Thread) {
ctx := cast(^Reader_Data)th.data
sync.barrier_wait(ctx.barrier)
// Spin using the float op with Acquire ordering.
// Adding 0.0 is a no-op on the value but exercises the full CAS loop.
// When the CAS succeeds with Acquire, all writes before the writer's Release must be visible.
for {
old := atomic_add_float(&ctx.shared.flag, 0.0, .Acquire)
if old > 0.0 do break
intrinsics.cpu_relax()
}
// If the CAS success ordering provides full guarantees, we MUST see all published data
ctx.saw_data = true
d0 := ctx.shared.published_data[0]
d1 := ctx.shared.published_data[1]
d2 := ctx.shared.published_data[2]
d3 := ctx.shared.published_data[3]
ctx.data_valid = (d0 == 42 && d1 == 43 && d2 == 44 && d3 == 45)
}
writer_data := Writer_Data{&shared, &barrier}
reader_data: [NUM_READERS]Reader_Data
for &rd in reader_data {
rd = Reader_Data{&shared, &barrier, false, false}
}
writer_thread := thread.create(writer_proc)
writer_thread.data = &writer_data
reader_threads: [NUM_READERS]^thread.Thread
for &th, i in reader_threads {
th = thread.create(reader_proc)
th.data = &reader_data[i]
}
thread.start(writer_thread)
for th in reader_threads {
thread.start(th)
}
thread.join(writer_thread)
thread.destroy(writer_thread)
for th in reader_threads {
thread.join(th)
thread.destroy(th)
}
// Verify all readers saw the data correctly
for rd, i in reader_data {
testing.expectf(t, rd.saw_data, "Reader %d didn't observe the flag", i)
testing.expectf(t, rd.data_valid, "Reader %d saw flag but data was not visible (memory ordering bug)", i)
}
}