package levsync

import "base:intrinsics"

// The floating-point operation to perform atomically.
@(private)
Flop :: enum {
	Add,
	Subtract,
	Multiply,
	Divide,
}

// Atomically applies `OP` with operand `val` to the float at `dst` via a CAS loop,
// since hardware generally has no native float read-modify-write atomics.
//
// Returns the value at `dst` that was atomically replaced by the result of the operation.
//
// The loop works on the raw bit pattern (u16/u32/u64): the bits compared by the CAS are
// exactly the bits that were loaded, so values whose float round-trip is not bit-stable
// (e.g. signaling-NaN payloads on targets that canonicalize float loads) cannot cause the
// compare to mismatch forever and livelock.
@(private)
atomic_float_op_cas :: #force_inline proc "contextless" (
	dst: ^$FLOAT,
	val: FLOAT,
	$OP: Flop,
	$ORDER: intrinsics.Atomic_Memory_Order,
) -> FLOAT where intrinsics.type_is_float(FLOAT) {
	// View the destination as its same-width unsigned integer type.
	when FLOAT == f16 {
		dst_i := cast(^u16)(dst)
	} else when FLOAT == f32 {
		dst_i := cast(^u32)(dst)
	} else when FLOAT == f64 {
		dst_i := cast(^u64)(dst)
	} else {
		#panic("atomic_float_op only supports f16, f32, and f64")
	}

	// Load the current bit pattern once; on CAS failure the intrinsic hands back the
	// freshly observed value, so no separate re-load is needed inside the loop.
	old_i := intrinsics.atomic_load_explicit(dst_i, .Relaxed)
	for {
		old_f := transmute(FLOAT)old_i

		// `OP` is a compile-time parameter, so exactly one branch is instantiated.
		when OP == .Add {
			new_f := old_f + val
		} else when OP == .Subtract {
			new_f := old_f - val
		} else when OP == .Multiply {
			new_f := old_f * val
		} else when OP == .Divide {
			new_f := old_f / val
		} else {
			#panic("Flop support not yet added for operation. This should never happen.")
		}

		when FLOAT == f16 {
			new_i := transmute(u16)new_f
		} else when FLOAT == f32 {
			new_i := transmute(u32)new_f
		} else when FLOAT == f64 {
			new_i := transmute(u64)new_f
		}

		// Setting order of compare exchange success alone guarantees overall order of the flop.
		cur_i, ok := intrinsics.atomic_compare_exchange_weak_explicit(dst_i, old_i, new_i, ORDER, .Relaxed)
		if ok do return old_f
		// CAS failed (contention or spurious weak failure): retry with the value it observed.
		old_i = cur_i
	}
}

// Returns the value at `dst` that was atomically replaced by the result of the operation.
atomic_add_float :: #force_inline proc "contextless" (
	dst: ^$FLOAT,
	val: FLOAT,
	$ORDER: intrinsics.Atomic_Memory_Order,
) -> FLOAT where intrinsics.type_is_float(FLOAT) {
	return atomic_float_op_cas(dst, val, .Add, ORDER)
}

// Returns the value at `dst` that was atomically replaced by the result of the operation.
atomic_sub_float :: #force_inline proc "contextless" (
	dst: ^$FLOAT,
	val: FLOAT,
	$ORDER: intrinsics.Atomic_Memory_Order,
) -> FLOAT where intrinsics.type_is_float(FLOAT) {
	return atomic_float_op_cas(dst, val, .Subtract, ORDER)
}

// Returns the value at `dst` that was atomically replaced by the result of the operation.
atomic_mul_float :: #force_inline proc "contextless" (
	dst: ^$FLOAT,
	val: FLOAT,
	$ORDER: intrinsics.Atomic_Memory_Order,
) -> FLOAT where intrinsics.type_is_float(FLOAT) {
	return atomic_float_op_cas(dst, val, .Multiply, ORDER)
}

// Returns the value at `dst` that was atomically replaced by the result of the operation.
atomic_div_float :: #force_inline proc "contextless" (
	dst: ^$FLOAT,
	val: FLOAT,
	$ORDER: intrinsics.Atomic_Memory_Order,
) -> FLOAT where intrinsics.type_is_float(FLOAT) {
	return atomic_float_op_cas(dst, val, .Divide, ORDER)
}

// ---------------------------------------------------------------------------------------------------------------------
// ----- Tests ------------------------
// ---------------------------------------------------------------------------------------------------------------------

import "core:sync"
import "core:testing"
import "core:thread"

@(test)
test_concurrent_atomic_add_no_lost_updates :: proc(t: ^testing.T) {
	// Multiple threads will each add 1.0 this many times.
	// If any updates are lost due to race conditions, the final sum will be wrong.
	// (8 * 10_000 = 80_000 is far below 2^53, so every f64 increment is exact and the
	// expected total can be compared with ==.)
	NUM_THREADS :: 8
	ITERATIONS_PER_THREAD :: 10_000

	shared_value: f64 = 0.0

	barrier: sync.Barrier
	sync.barrier_init(&barrier, NUM_THREADS)

	Thread_Data :: struct {
		value:   ^f64,
		barrier: ^sync.Barrier,
	}

	thread_proc :: proc(th: ^thread.Thread) {
		ctx := cast(^Thread_Data)th.data
		// Wait for all threads to be ready before starting, to maximize contention.
		sync.barrier_wait(ctx.barrier)
		for _ in 0 ..< ITERATIONS_PER_THREAD {
			atomic_add_float(ctx.value, 1.0, .Relaxed)
		}
	}

	// All threads share one Thread_Data; they only read it, so this is safe.
	thread_data := Thread_Data{&shared_value, &barrier}

	threads: [NUM_THREADS]^thread.Thread
	for &th in threads {
		th = thread.create(thread_proc)
		th.data = &thread_data
	}
	for th in threads {
		thread.start(th)
	}
	for th in threads {
		thread.join(th)
		thread.destroy(th)
	}

	expected := f64(NUM_THREADS * ITERATIONS_PER_THREAD)
	testing.expect_value(t, shared_value, expected)
}

@(test)
test_concurrent_atomic_sub_no_lost_updates :: proc(t: ^testing.T) {
	// Start with a known value, multiple threads subtract.
	// If any updates are lost due to race conditions, the final result will be wrong.
	// (All values involved are exact integers in f64, so the == comparison is exact.)
	NUM_THREADS :: 8
	ITERATIONS_PER_THREAD :: 10_000

	shared_value: f64 = f64(NUM_THREADS * ITERATIONS_PER_THREAD)

	barrier: sync.Barrier
	sync.barrier_init(&barrier, NUM_THREADS)

	Thread_Data :: struct {
		value:   ^f64,
		barrier: ^sync.Barrier,
	}

	thread_proc :: proc(th: ^thread.Thread) {
		ctx := cast(^Thread_Data)th.data
		// Wait for all threads to be ready before starting
		sync.barrier_wait(ctx.barrier)
		for _ in 0 ..< ITERATIONS_PER_THREAD {
			atomic_sub_float(ctx.value, 1.0, .Relaxed)
		}
	}

	thread_data := Thread_Data{&shared_value, &barrier}

	threads: [NUM_THREADS]^thread.Thread
	for &th in threads {
		th = thread.create(thread_proc)
		th.data = &thread_data
	}
	for th in threads {
		thread.start(th)
	}
	for th in threads {
		thread.join(th)
		thread.destroy(th)
	}

	testing.expect_value(t, shared_value, 0.0)
}

@(test)
test_concurrent_atomic_mul_div_round_trip :: proc(t: ^testing.T) {
	// Each thread multiplies by 2.0 then divides by 2.0.
	// Since these are inverses, the final value should equal the starting value
	// regardless of how operations interleave.
	// (*2 and /2 only adjust the binary exponent, so each operation is exact; the
	// worst-case transient is one pending doubling per thread, i.e. 1000 * 2^8,
	// nowhere near f64 overflow.)
	NUM_THREADS :: 8
	ITERATIONS_PER_THREAD :: 10_000

	shared_value: f64 = 1000.0 // Start with a value that won't underflow/overflow

	barrier: sync.Barrier
	sync.barrier_init(&barrier, NUM_THREADS)

	Thread_Data :: struct {
		value:   ^f64,
		barrier: ^sync.Barrier,
	}

	thread_proc :: proc(th: ^thread.Thread) {
		ctx := cast(^Thread_Data)th.data
		// Wait for all threads to be ready before starting
		sync.barrier_wait(ctx.barrier)
		for _ in 0 ..< ITERATIONS_PER_THREAD {
			atomic_mul_float(ctx.value, 2.0, .Relaxed)
			atomic_div_float(ctx.value, 2.0, .Relaxed)
		}
	}

	thread_data := Thread_Data{&shared_value, &barrier}

	threads: [NUM_THREADS]^thread.Thread
	for &th in threads {
		th = thread.create(thread_proc)
		th.data = &thread_data
	}
	for th in threads {
		thread.start(th)
	}
	for th in threads {
		thread.join(th)
		thread.destroy(th)
	}

	testing.expect_value(t, shared_value, 1000.0)
}

@(test)
test_atomic_add_with_f32 :: proc(t: ^testing.T) {
	// Verify the f32 type dispatch works correctly under contention.
	// Same approach as the f64 add test but with f32.
	// (80_000 is below 2^24, so every f32 increment is still exact.)
	NUM_THREADS :: 8
	ITERATIONS_PER_THREAD :: 10_000

	shared_value: f32 = 0.0

	barrier: sync.Barrier
	sync.barrier_init(&barrier, NUM_THREADS)

	Thread_Data :: struct {
		value:   ^f32,
		barrier: ^sync.Barrier,
	}

	thread_proc :: proc(th: ^thread.Thread) {
		ctx := cast(^Thread_Data)th.data
		// Wait for all threads to be ready before starting
		sync.barrier_wait(ctx.barrier)
		for _ in 0 ..< ITERATIONS_PER_THREAD {
			atomic_add_float(ctx.value, 1.0, .Relaxed)
		}
	}

	thread_data := Thread_Data{&shared_value, &barrier}

	threads: [NUM_THREADS]^thread.Thread
	for &th in threads {
		th = thread.create(thread_proc)
		th.data = &thread_data
	}
	for th in threads {
		thread.start(th)
	}
	for th in threads {
		thread.join(th)
		thread.destroy(th)
	}

	expected := f32(NUM_THREADS * ITERATIONS_PER_THREAD)
	testing.expect_value(t, shared_value, expected)
}

@(test)
test_atomic_release_acquire_publish_visibility :: proc(t: ^testing.T) {
	// Tests that the memory order passed to atomic_float_op's CAS success condition
	// provides full ordering guarantees for the entire float operation.
	//
	// Both sides use atomic_add_float (not raw intrinsics) to verify:
	// - Release on CAS success publishes prior non-atomic writes
	// - Acquire on CAS success makes those writes visible to the reader
	//
	// NOTE: This test may pass even with Relaxed ordering on x86 due to its strong memory model.
	// On ARM or other weak-memory architectures, using Relaxed here would likely cause failures.
	NUM_READERS :: 4

	Shared_State :: struct {
		flag:           f64,
		// Padding to avoid false sharing between flag and data
		_padding:       [64]u8,
		published_data: [4]int,
	}

	shared: Shared_State

	barrier: sync.Barrier
	sync.barrier_init(&barrier, NUM_READERS + 1) // +1 for writer

	Reader_Data :: struct {
		shared:     ^Shared_State,
		barrier:    ^sync.Barrier,
		saw_data:   bool,
		data_valid: bool,
	}

	Writer_Data :: struct {
		shared:  ^Shared_State,
		barrier: ^sync.Barrier,
	}

	writer_proc :: proc(th: ^thread.Thread) {
		ctx := cast(^Writer_Data)th.data
		sync.barrier_wait(ctx.barrier)
		// Write data that readers will verify.
		// These plain stores MUST happen-before the Release add below.
		ctx.shared.published_data[0] = 42
		ctx.shared.published_data[1] = 43
		ctx.shared.published_data[2] = 44
		ctx.shared.published_data[3] = 45
		// Release via the float op: CAS success ordering must publish all writes above
		atomic_add_float(&ctx.shared.flag, 1.0, .Release)
	}

	reader_proc :: proc(th: ^thread.Thread) {
		ctx := cast(^Reader_Data)th.data
		sync.barrier_wait(ctx.barrier)
		// Spin using the float op with Acquire ordering.
		// Adding 0.0 is a no-op on the value but exercises the full CAS loop.
		// When the CAS succeeds with Acquire, all writes before the writer's Release must be visible.
		for {
			old := atomic_add_float(&ctx.shared.flag, 0.0, .Acquire)
			if old > 0.0 do break
			intrinsics.cpu_relax()
		}
		// If the CAS success ordering provides full guarantees, we MUST see all published data
		ctx.saw_data = true
		d0 := ctx.shared.published_data[0]
		d1 := ctx.shared.published_data[1]
		d2 := ctx.shared.published_data[2]
		d3 := ctx.shared.published_data[3]
		ctx.data_valid = (d0 == 42 && d1 == 43 && d2 == 44 && d3 == 45)
	}

	writer_data := Writer_Data{&shared, &barrier}
	reader_data: [NUM_READERS]Reader_Data
	for &rd in reader_data {
		rd = Reader_Data{&shared, &barrier, false, false}
	}

	writer_thread := thread.create(writer_proc)
	writer_thread.data = &writer_data

	reader_threads: [NUM_READERS]^thread.Thread
	for &th, i in reader_threads {
		th = thread.create(reader_proc)
		th.data = &reader_data[i]
	}

	thread.start(writer_thread)
	for th in reader_threads {
		thread.start(th)
	}

	// Joining all threads before inspecting reader_data makes those reads race-free.
	thread.join(writer_thread)
	thread.destroy(writer_thread)
	for th in reader_threads {
		thread.join(th)
		thread.destroy(th)
	}

	// Verify all readers saw the data correctly
	for rd, i in reader_data {
		testing.expectf(t, rd.saw_data, "Reader %d didn't observe the flag", i)
		testing.expectf(t, rd.data_valid, "Reader %d saw flag but data was not visible (memory ordering bug)", i)
	}
}