Skip to main content

xlog_cuda_tests/categories/
c08_synchronization.rs

1//! Category 8: Synchronization and Memory Ordering
2//!
3//! Tests synchronization primitives and memory ordering including atomics,
4//! block synchronization barriers, and concurrent operations.
5
6use crate::harness::{CategoryResult, TestContext, TestResult};
7use std::time::Instant;
8use xlog_core::{ScalarType, Schema};
9
10/// Run all tests in this category.
11pub fn run_all(ctx: &TestContext) -> CategoryResult {
12    let mut results = CategoryResult::new("c08_synchronization");
13    let start = Instant::now();
14
15    results.add_result(test_hash_join_atomics(ctx));
16    results.add_result(test_filter_scan_sync(ctx));
17    results.add_result(test_sort_barrier_correctness(ctx));
18    results.add_result(test_dedup_atomic_marking(ctx));
19    results.add_result(test_concurrent_operations(ctx));
20
21    results.set_duration(start.elapsed());
22    results
23}
24
25/// Test 1: Hash join uses atomics for hash table - verify correctness.
26///
27/// Hash join operations use atomic operations for building the hash table.
28/// This test verifies that concurrent atomic operations produce correct results.
29fn test_hash_join_atomics(ctx: &TestContext) -> TestResult {
30    let start = Instant::now();
31
32    let left_schema = Schema::new(vec![
33        ("key".to_string(), ScalarType::U32),
34        ("lval".to_string(), ScalarType::U32),
35    ]);
36    let right_schema = Schema::new(vec![
37        ("key".to_string(), ScalarType::U32),
38        ("rval".to_string(), ScalarType::U32),
39    ]);
40
41    // Create left table with sequential keys
42    const LEFT_SIZE: usize = 10000;
43    let left_keys: Vec<u32> = (0..LEFT_SIZE as u32).collect();
44    let left_vals: Vec<u32> = (0..LEFT_SIZE as u32).map(|i| i * 10).collect();
45
46    // Create right table with overlapping keys (every 2nd key)
47    const RIGHT_SIZE: usize = 5000;
48    let right_keys: Vec<u32> = (0..RIGHT_SIZE as u32).map(|i| i * 2).collect();
49    let right_vals: Vec<u32> = (0..RIGHT_SIZE as u32).map(|i| i * 100).collect();
50
51    let left_buffer = match ctx
52        .provider
53        .create_buffer_from_u32_columns(&[&left_keys, &left_vals], left_schema.clone())
54    {
55        Ok(buf) => buf,
56        Err(e) => {
57            return TestResult::error(
58                "test_hash_join_atomics",
59                start.elapsed(),
60                format!("Failed to create left buffer: {}", e),
61            )
62        }
63    };
64
65    let right_buffer = match ctx
66        .provider
67        .create_buffer_from_u32_columns(&[&right_keys, &right_vals], right_schema.clone())
68    {
69        Ok(buf) => buf,
70        Err(e) => {
71            return TestResult::error(
72                "test_hash_join_atomics",
73                start.elapsed(),
74                format!("Failed to create right buffer: {}", e),
75            )
76        }
77    };
78
79    // Perform hash join
80    let joined = match ctx
81        .provider
82        .hash_join(&left_buffer, &right_buffer, &[0], &[0])
83    {
84        Ok(j) => j,
85        Err(e) => {
86            return TestResult::error(
87                "test_hash_join_atomics",
88                start.elapsed(),
89                format!("Hash join failed: {}", e),
90            )
91        }
92    };
93
94    // Expected: matches for keys 0, 2, 4, ..., 9998 = 5000 matches
95    let expected_matches = RIGHT_SIZE;
96    if ctx.device_row_count(&joined) != expected_matches as u64 {
97        return TestResult::error(
98            "test_hash_join_atomics",
99            start.elapsed(),
100            format!(
101                "Join returned {} rows, expected {}",
102                ctx.device_row_count(&joined),
103                expected_matches
104            ),
105        );
106    }
107
108    // Download and verify join results
109    let joined_keys = match ctx.provider.download_column::<u32>(&joined, 0) {
110        Ok(d) => d,
111        Err(e) => {
112            return TestResult::error(
113                "test_hash_join_atomics",
114                start.elapsed(),
115                format!("Failed to download joined keys: {}", e),
116            )
117        }
118    };
119
120    let joined_lvals = match ctx.provider.download_column::<u32>(&joined, 1) {
121        Ok(d) => d,
122        Err(e) => {
123            return TestResult::error(
124                "test_hash_join_atomics",
125                start.elapsed(),
126                format!("Failed to download joined lvals: {}", e),
127            )
128        }
129    };
130
131    let joined_rvals = match ctx.provider.download_column::<u32>(&joined, 2) {
132        Ok(d) => d,
133        Err(e) => {
134            return TestResult::error(
135                "test_hash_join_atomics",
136                start.elapsed(),
137                format!("Failed to download joined rvals: {}", e),
138            )
139        }
140    };
141
142    // Verify that all joined rows have matching keys and correct values
143    for i in 0..ctx.device_row_count(&joined) as usize {
144        let key = joined_keys[i];
145        let lval = joined_lvals[i];
146        let rval = joined_rvals[i];
147
148        // Key should be even (matching pattern)
149        if key % 2 != 0 {
150            return TestResult::error(
151                "test_hash_join_atomics",
152                start.elapsed(),
153                format!("Row {}: key {} should be even", i, key),
154            );
155        }
156
157        // lval should be key * 10
158        let expected_lval = key * 10;
159        if lval != expected_lval {
160            return TestResult::error(
161                "test_hash_join_atomics",
162                start.elapsed(),
163                format!(
164                    "Row {}: lval {} doesn't match expected {} for key {}",
165                    i, lval, expected_lval, key
166                ),
167            );
168        }
169
170        // rval should be (key/2) * 100
171        let expected_rval = (key / 2) * 100;
172        if rval != expected_rval {
173            return TestResult::error(
174                "test_hash_join_atomics",
175                start.elapsed(),
176                format!(
177                    "Row {}: rval {} doesn't match expected {} for key {}",
178                    i, rval, expected_rval, key
179                ),
180            );
181        }
182    }
183
184    if let Err(e) = ctx.sync_and_check() {
185        return TestResult::error(
186            "test_hash_join_atomics",
187            start.elapsed(),
188            format!("Sync failed: {}", e),
189        );
190    }
191
192    TestResult::passed("test_hash_join_atomics", start.elapsed())
193}
194
195/// Test 2: Filter uses scan which requires block synchronization.
196///
197/// Filter operations use prefix scan to compute output positions. Prefix scan
198/// requires __syncthreads() for correctness within blocks.
199fn test_filter_scan_sync(ctx: &TestContext) -> TestResult {
200    let start = Instant::now();
201    let schema = Schema::new(vec![("val".to_string(), ScalarType::U32)]);
202
203    // Test various sizes that exercise different scan patterns
204    let test_cases: Vec<(usize, Box<dyn Fn(usize) -> bool>)> = vec![
205        // (size, predicate)
206        (1000, Box::new(|i| i % 2 == 0)),          // Keep even
207        (10000, Box::new(|i| i % 3 == 0)),         // Keep multiples of 3
208        (50000, Box::new(|i| i < 25000)),          // Keep first half
209        (100000, Box::new(|i| i % 7 == 0)),        // Keep multiples of 7
210        (65536, Box::new(|i| (i / 256) % 2 == 0)), // Keep alternating chunks
211    ];
212
213    for (size, predicate) in test_cases {
214        // Create sequential data
215        let data: Vec<u32> = (0..size as u32).collect();
216
217        let buffer = match ctx
218            .provider
219            .create_buffer_from_slice::<u32>(&data, schema.clone())
220        {
221            Ok(buf) => buf,
222            Err(e) => {
223                return TestResult::error(
224                    "test_filter_scan_sync",
225                    start.elapsed(),
226                    format!("Size {}: failed to create buffer: {}", size, e),
227                )
228            }
229        };
230
231        // Create mask based on predicate
232        let mask: Vec<u8> = (0..size)
233            .map(|i| if predicate(i) { 1 } else { 0 })
234            .collect();
235        let expected_count: usize = mask.iter().map(|&m| m as usize).sum();
236
237        let filtered = match ctx.provider.filter_by_mask(&buffer, &mask) {
238            Ok(f) => f,
239            Err(e) => {
240                return TestResult::error(
241                    "test_filter_scan_sync",
242                    start.elapsed(),
243                    format!("Size {}: filter failed: {}", size, e),
244                )
245            }
246        };
247
248        // Verify count
249        if ctx.device_row_count(&filtered) != expected_count as u64 {
250            return TestResult::error(
251                "test_filter_scan_sync",
252                start.elapsed(),
253                format!(
254                    "Size {}: filter returned {} rows, expected {}",
255                    size,
256                    ctx.device_row_count(&filtered),
257                    expected_count
258                ),
259            );
260        }
261
262        // Download and verify values
263        let filtered_data = match ctx.provider.download_column::<u32>(&filtered, 0) {
264            Ok(d) => d,
265            Err(e) => {
266                return TestResult::error(
267                    "test_filter_scan_sync",
268                    start.elapsed(),
269                    format!("Size {}: failed to download: {}", size, e),
270                )
271            }
272        };
273
274        // Verify each filtered value matches predicate
275        let mut expected_idx = 0;
276        for i in 0..size {
277            if predicate(i) {
278                if expected_idx >= filtered_data.len() {
279                    return TestResult::error(
280                        "test_filter_scan_sync",
281                        start.elapsed(),
282                        format!(
283                            "Size {}: filtered data too short at expected index {}",
284                            size, expected_idx
285                        ),
286                    );
287                }
288                if filtered_data[expected_idx] != i as u32 {
289                    return TestResult::error(
290                        "test_filter_scan_sync",
291                        start.elapsed(),
292                        format!(
293                            "Size {}: filtered[{}] = {}, expected {}",
294                            size, expected_idx, filtered_data[expected_idx], i
295                        ),
296                    );
297                }
298                expected_idx += 1;
299            }
300        }
301    }
302
303    if let Err(e) = ctx.sync_and_check() {
304        return TestResult::error(
305            "test_filter_scan_sync",
306            start.elapsed(),
307            format!("Sync failed: {}", e),
308        );
309    }
310
311    TestResult::passed("test_filter_scan_sync", start.elapsed())
312}
313
314/// Test 3: Sort uses __syncthreads() - verify correct ordering.
315///
316/// Sort algorithms like radix sort use block synchronization barriers
317/// (__syncthreads) for correctness. This test verifies that synchronization
318/// works correctly across various data patterns.
319fn test_sort_barrier_correctness(ctx: &TestContext) -> TestResult {
320    let start = Instant::now();
321    let schema = Schema::new(vec![
322        ("key".to_string(), ScalarType::U32),
323        ("val".to_string(), ScalarType::U32),
324    ]);
325
326    // Test patterns that stress synchronization
327    let test_patterns: Vec<(&str, Vec<u32>)> = vec![
328        ("reverse", (0..10000u32).rev().collect()),
329        (
330            "alternating",
331            (0..10000u32)
332                .map(|i| if i % 2 == 0 { i } else { 10000 - i })
333                .collect(),
334        ),
335        ("sawtooth", (0..10000u32).map(|i| i % 100).collect()),
336        (
337            "random_lcg",
338            (0..10000usize)
339                .map(|i| ((i * 1103515245 + 12345) % 10000) as u32)
340                .collect(),
341        ),
342        (
343            "blocks_reversed",
344            (0..10000u32)
345                .map(|i| {
346                    let block = i / 256;
347                    let offset = i % 256;
348                    block * 256 + (255 - offset)
349                })
350                .collect(),
351        ),
352    ];
353
354    for (name, keys) in test_patterns {
355        let size = keys.len();
356        let vals: Vec<u32> = (0..size as u32).collect();
357
358        let buffer = match ctx
359            .provider
360            .create_buffer_from_u32_columns(&[&keys, &vals], schema.clone())
361        {
362            Ok(buf) => buf,
363            Err(e) => {
364                return TestResult::error(
365                    "test_sort_barrier_correctness",
366                    start.elapsed(),
367                    format!("Pattern {}: failed to create buffer: {}", name, e),
368                )
369            }
370        };
371
372        // Sort by key
373        let sorted = match ctx.provider.sort(&buffer, &[0]) {
374            Ok(s) => s,
375            Err(e) => {
376                return TestResult::error(
377                    "test_sort_barrier_correctness",
378                    start.elapsed(),
379                    format!("Pattern {}: sort failed: {}", name, e),
380                )
381            }
382        };
383
384        // Verify row count
385        if ctx.device_row_count(&sorted) != size as u64 {
386            return TestResult::error(
387                "test_sort_barrier_correctness",
388                start.elapsed(),
389                format!(
390                    "Pattern {}: sort returned {} rows, expected {}",
391                    name,
392                    ctx.device_row_count(&sorted),
393                    size
394                ),
395            );
396        }
397
398        // Download sorted keys
399        let sorted_keys = match ctx.provider.download_column::<u32>(&sorted, 0) {
400            Ok(d) => d,
401            Err(e) => {
402                return TestResult::error(
403                    "test_sort_barrier_correctness",
404                    start.elapsed(),
405                    format!("Pattern {}: failed to download sorted keys: {}", name, e),
406                )
407            }
408        };
409
410        // Verify sorted order
411        for i in 1..size {
412            if sorted_keys[i] < sorted_keys[i - 1] {
413                return TestResult::error(
414                    "test_sort_barrier_correctness",
415                    start.elapsed(),
416                    format!(
417                        "Pattern {}: sort order incorrect at {}: {} > {}",
418                        name,
419                        i,
420                        sorted_keys[i - 1],
421                        sorted_keys[i]
422                    ),
423                );
424            }
425        }
426
427        // Verify all original keys are present (same multiset)
428        let mut original_sorted = keys.clone();
429        original_sorted.sort();
430        if sorted_keys != original_sorted {
431            return TestResult::error(
432                "test_sort_barrier_correctness",
433                start.elapsed(),
434                format!("Pattern {}: keys not preserved through sort", name),
435            );
436        }
437    }
438
439    if let Err(e) = ctx.sync_and_check() {
440        return TestResult::error(
441            "test_sort_barrier_correctness",
442            start.elapsed(),
443            format!("Sync failed: {}", e),
444        );
445    }
446
447    TestResult::passed("test_sort_barrier_correctness", start.elapsed())
448}
449
450/// Test 4: Dedup uses atomic operations - verify correctness.
451///
452/// Dedup operations use atomic operations to mark duplicates. This test
453/// verifies that concurrent atomic marking produces correct deduplication.
454fn test_dedup_atomic_marking(ctx: &TestContext) -> TestResult {
455    let start = Instant::now();
456    let schema = Schema::new(vec![
457        ("key".to_string(), ScalarType::U32),
458        ("val".to_string(), ScalarType::U32),
459    ]);
460
461    // Test various duplicate patterns
462    let test_cases: Vec<(&str, Vec<u32>, Vec<u32>)> = vec![
463        // (name, keys, vals)
464        ("all_same", vec![42; 10000], (0..10000u32).collect()),
465        (
466            "pairs",
467            (0..5000u32).flat_map(|i| vec![i, i]).collect(),
468            (0..10000u32).collect(),
469        ),
470        (
471            "triples",
472            (0..3333u32)
473                .flat_map(|i| vec![i, i, i])
474                .take(9999)
475                .collect(),
476            (0..9999u32).collect(),
477        ),
478        ("no_dups", (0..10000u32).collect(), (0..10000u32).collect()),
479        (
480            "many_dups",
481            (0..10000u32).map(|i| i % 100).collect(),
482            (0..10000u32).collect(),
483        ),
484    ];
485
486    for (name, keys, vals) in test_cases {
487        let buffer = match ctx
488            .provider
489            .create_buffer_from_u32_columns(&[&keys, &vals], schema.clone())
490        {
491            Ok(buf) => buf,
492            Err(e) => {
493                return TestResult::error(
494                    "test_dedup_atomic_marking",
495                    start.elapsed(),
496                    format!("Pattern {}: failed to create buffer: {}", name, e),
497                )
498            }
499        };
500
501        // Dedup by key column
502        let deduped = match ctx.provider.dedup(&buffer, &[0]) {
503            Ok(d) => d,
504            Err(e) => {
505                return TestResult::error(
506                    "test_dedup_atomic_marking",
507                    start.elapsed(),
508                    format!("Pattern {}: dedup failed: {}", name, e),
509                )
510            }
511        };
512
513        // Calculate expected unique count
514        let mut unique_keys: std::collections::HashSet<u32> = std::collections::HashSet::new();
515        for &k in &keys {
516            unique_keys.insert(k);
517        }
518        let expected_unique = unique_keys.len();
519
520        if ctx.device_row_count(&deduped) != expected_unique as u64 {
521            return TestResult::error(
522                "test_dedup_atomic_marking",
523                start.elapsed(),
524                format!(
525                    "Pattern {}: dedup returned {} rows, expected {}",
526                    name,
527                    ctx.device_row_count(&deduped),
528                    expected_unique
529                ),
530            );
531        }
532
533        // Download and verify uniqueness
534        let deduped_keys = match ctx.provider.download_column::<u32>(&deduped, 0) {
535            Ok(d) => d,
536            Err(e) => {
537                return TestResult::error(
538                    "test_dedup_atomic_marking",
539                    start.elapsed(),
540                    format!("Pattern {}: failed to download deduped keys: {}", name, e),
541                )
542            }
543        };
544
545        // Verify all keys are unique
546        let mut seen_keys: std::collections::HashSet<u32> = std::collections::HashSet::new();
547        for &k in &deduped_keys {
548            if !seen_keys.insert(k) {
549                return TestResult::error(
550                    "test_dedup_atomic_marking",
551                    start.elapsed(),
552                    format!("Pattern {}: duplicate key {} in dedup result", name, k),
553                );
554            }
555        }
556
557        // Verify all original unique keys are present
558        if seen_keys != unique_keys {
559            return TestResult::error(
560                "test_dedup_atomic_marking",
561                start.elapsed(),
562                format!("Pattern {}: dedup result missing some unique keys", name),
563            );
564        }
565    }
566
567    if let Err(e) = ctx.sync_and_check() {
568        return TestResult::error(
569            "test_dedup_atomic_marking",
570            start.elapsed(),
571            format!("Sync failed: {}", e),
572        );
573    }
574
575    TestResult::passed("test_dedup_atomic_marking", start.elapsed())
576}
577
578/// Test 5: Run multiple independent operations, verify all correct.
579///
580/// Tests that multiple independent operations can run and complete correctly,
581/// verifying that GPU resource management and synchronization work properly.
582fn test_concurrent_operations(ctx: &TestContext) -> TestResult {
583    let start = Instant::now();
584    let schema = Schema::new(vec![("val".to_string(), ScalarType::U32)]);
585
586    // Create multiple buffers
587    const NUM_BUFFERS: usize = 5;
588    const BUFFER_SIZE: usize = 10000;
589
590    let mut buffers = Vec::with_capacity(NUM_BUFFERS);
591    let mut original_data = Vec::with_capacity(NUM_BUFFERS);
592
593    for i in 0..NUM_BUFFERS {
594        // Each buffer has different data pattern
595        let data: Vec<u32> = (0..BUFFER_SIZE)
596            .map(|j| ((j * (i + 1) * 7 + i * 13) % BUFFER_SIZE) as u32)
597            .collect();
598
599        let buffer = match ctx
600            .provider
601            .create_buffer_from_slice::<u32>(&data, schema.clone())
602        {
603            Ok(buf) => buf,
604            Err(e) => {
605                return TestResult::error(
606                    "test_concurrent_operations",
607                    start.elapsed(),
608                    format!("Failed to create buffer {}: {}", i, e),
609                )
610            }
611        };
612
613        original_data.push(data);
614        buffers.push(buffer);
615    }
616
617    // Perform different operations on each buffer
618    let mut results = Vec::with_capacity(NUM_BUFFERS);
619
620    // Buffer 0: Sort
621    let sorted0 = match ctx.provider.sort(&buffers[0], &[0]) {
622        Ok(s) => s,
623        Err(e) => {
624            return TestResult::error(
625                "test_concurrent_operations",
626                start.elapsed(),
627                format!("Sort of buffer 0 failed: {}", e),
628            )
629        }
630    };
631    results.push(("sort", sorted0));
632
633    // Buffer 1: Filter (keep first half)
634    let mask1: Vec<u8> = (0..BUFFER_SIZE)
635        .map(|i| if i < BUFFER_SIZE / 2 { 1 } else { 0 })
636        .collect();
637    let filtered1 = match ctx.provider.filter_by_mask(&buffers[1], &mask1) {
638        Ok(f) => f,
639        Err(e) => {
640            return TestResult::error(
641                "test_concurrent_operations",
642                start.elapsed(),
643                format!("Filter of buffer 1 failed: {}", e),
644            )
645        }
646    };
647    results.push(("filter", filtered1));
648
649    // Buffer 2: Sort
650    let sorted2 = match ctx.provider.sort(&buffers[2], &[0]) {
651        Ok(s) => s,
652        Err(e) => {
653            return TestResult::error(
654                "test_concurrent_operations",
655                start.elapsed(),
656                format!("Sort of buffer 2 failed: {}", e),
657            )
658        }
659    };
660    results.push(("sort", sorted2));
661
662    // Buffer 3: Filter (keep even indices)
663    let mask3: Vec<u8> = (0..BUFFER_SIZE)
664        .map(|i| if i % 2 == 0 { 1 } else { 0 })
665        .collect();
666    let filtered3 = match ctx.provider.filter_by_mask(&buffers[3], &mask3) {
667        Ok(f) => f,
668        Err(e) => {
669            return TestResult::error(
670                "test_concurrent_operations",
671                start.elapsed(),
672                format!("Filter of buffer 3 failed: {}", e),
673            )
674        }
675    };
676    results.push(("filter", filtered3));
677
678    // Buffer 4: Sort
679    let sorted4 = match ctx.provider.sort(&buffers[4], &[0]) {
680        Ok(s) => s,
681        Err(e) => {
682            return TestResult::error(
683                "test_concurrent_operations",
684                start.elapsed(),
685                format!("Sort of buffer 4 failed: {}", e),
686            )
687        }
688    };
689    results.push(("sort", sorted4));
690
691    // Verify all results
692    // Sort results (0, 2, 4)
693    for &idx in &[0usize, 2, 4] {
694        let (op, ref result) = results[idx];
695        assert!(op == "sort");
696
697        if ctx.device_row_count(&result) != BUFFER_SIZE as u64 {
698            return TestResult::error(
699                "test_concurrent_operations",
700                start.elapsed(),
701                format!(
702                    "Buffer {}: {} returned {} rows, expected {}",
703                    idx,
704                    op,
705                    ctx.device_row_count(&result),
706                    BUFFER_SIZE
707                ),
708            );
709        }
710
711        let sorted_data = match ctx.provider.download_column::<u32>(result, 0) {
712            Ok(d) => d,
713            Err(e) => {
714                return TestResult::error(
715                    "test_concurrent_operations",
716                    start.elapsed(),
717                    format!("Buffer {}: failed to download: {}", idx, e),
718                )
719            }
720        };
721
722        // Verify sorted order
723        for i in 1..BUFFER_SIZE {
724            if sorted_data[i] < sorted_data[i - 1] {
725                return TestResult::error(
726                    "test_concurrent_operations",
727                    start.elapsed(),
728                    format!(
729                        "Buffer {}: sort order incorrect at {}: {} > {}",
730                        idx,
731                        i,
732                        sorted_data[i - 1],
733                        sorted_data[i]
734                    ),
735                );
736            }
737        }
738
739        // Verify same values (sorted)
740        let mut expected = original_data[idx].clone();
741        expected.sort();
742        if sorted_data != expected {
743            return TestResult::error(
744                "test_concurrent_operations",
745                start.elapsed(),
746                format!("Buffer {}: sorted data doesn't match expected", idx),
747            );
748        }
749    }
750
751    // Filter results (1, 3)
752    // Buffer 1: first half
753    let (op1, ref result1) = results[1];
754    assert!(op1 == "filter");
755    if ctx.device_row_count(&result1) != (BUFFER_SIZE / 2) as u64 {
756        return TestResult::error(
757            "test_concurrent_operations",
758            start.elapsed(),
759            format!(
760                "Buffer 1: filter returned {} rows, expected {}",
761                ctx.device_row_count(&result1),
762                BUFFER_SIZE / 2
763            ),
764        );
765    }
766
767    // Buffer 3: even indices
768    let (op3, ref result3) = results[3];
769    assert!(op3 == "filter");
770    if ctx.device_row_count(&result3) != ((BUFFER_SIZE + 1) / 2) as u64 {
771        return TestResult::error(
772            "test_concurrent_operations",
773            start.elapsed(),
774            format!(
775                "Buffer 3: filter returned {} rows, expected {}",
776                ctx.device_row_count(&result3),
777                (BUFFER_SIZE + 1) / 2
778            ),
779        );
780    }
781
782    // Verify original buffers are unchanged
783    for i in 0..NUM_BUFFERS {
784        let current_data = match ctx.provider.download_column::<u32>(&buffers[i], 0) {
785            Ok(d) => d,
786            Err(e) => {
787                return TestResult::error(
788                    "test_concurrent_operations",
789                    start.elapsed(),
790                    format!("Failed to verify buffer {}: {}", i, e),
791                )
792            }
793        };
794
795        if current_data != original_data[i] {
796            return TestResult::error(
797                "test_concurrent_operations",
798                start.elapsed(),
799                format!("Buffer {} was modified by operations", i),
800            );
801        }
802    }
803
804    if let Err(e) = ctx.sync_and_check() {
805        return TestResult::error(
806            "test_concurrent_operations",
807            start.elapsed(),
808            format!("Sync failed: {}", e),
809        );
810    }
811
812    TestResult::passed("test_concurrent_operations", start.elapsed())
813}