1use crate::harness::{CategoryResult, TestContext, TestResult};
7use std::collections::HashSet;
8use std::time::Instant;
9use xlog_core::{ScalarType, Schema};
10
11pub fn run_all(ctx: &TestContext) -> CategoryResult {
13 let mut results = CategoryResult::new("c12_atomics");
14 let start = Instant::now();
15
16 results.add_result(test_hash_join_atomic_correctness(ctx));
17 results.add_result(test_dedup_atomic_correctness(ctx));
18 results.add_result(test_high_contention_join(ctx));
19 results.add_result(test_atomic_counting(ctx));
20 results.add_result(test_concurrent_atomic_updates(ctx));
21
22 results.set_duration(start.elapsed());
23 results
24}
25
26fn test_hash_join_atomic_correctness(ctx: &TestContext) -> TestResult {
31 let start = Instant::now();
32
33 let left_schema = Schema::new(vec![
34 ("key".to_string(), ScalarType::U32),
35 ("lval".to_string(), ScalarType::U32),
36 ]);
37 let right_schema = Schema::new(vec![
38 ("key".to_string(), ScalarType::U32),
39 ("rval".to_string(), ScalarType::U32),
40 ]);
41
42 let test_cases: Vec<(usize, usize, f64)> = vec![
44 (1000, 500, 0.5), (5000, 2500, 0.5), (10000, 1000, 1.0), (10000, 5000, 0.8), ];
49
50 for (left_size, right_size, match_rate) in test_cases {
51 let left_keys: Vec<u32> = (0..left_size as u32).collect();
53 let left_vals: Vec<u32> = left_keys.iter().map(|&k| k * 10).collect();
54
55 let matching_right = (right_size as f64 * match_rate) as usize;
57 let mut right_keys: Vec<u32> = Vec::with_capacity(right_size);
58 let mut right_vals: Vec<u32> = Vec::with_capacity(right_size);
59
60 for i in 0..matching_right {
62 let key = (i * left_size / matching_right.max(1)) as u32;
63 right_keys.push(key);
64 right_vals.push(key * 100);
65 }
66 for i in matching_right..right_size {
68 let key = (left_size as u32) + (i as u32);
69 right_keys.push(key);
70 right_vals.push(key * 100);
71 }
72
73 let left_buffer = match ctx
74 .provider
75 .create_buffer_from_u32_columns(&[&left_keys, &left_vals], left_schema.clone())
76 {
77 Ok(buf) => buf,
78 Err(e) => {
79 return TestResult::error(
80 "test_hash_join_atomic_correctness",
81 start.elapsed(),
82 format!("Left {}: failed to create buffer: {}", left_size, e),
83 )
84 }
85 };
86
87 let right_buffer = match ctx
88 .provider
89 .create_buffer_from_u32_columns(&[&right_keys, &right_vals], right_schema.clone())
90 {
91 Ok(buf) => buf,
92 Err(e) => {
93 return TestResult::error(
94 "test_hash_join_atomic_correctness",
95 start.elapsed(),
96 format!("Right {}: failed to create buffer: {}", right_size, e),
97 )
98 }
99 };
100
101 let joined = match ctx
103 .provider
104 .hash_join(&left_buffer, &right_buffer, &[0], &[0])
105 {
106 Ok(j) => j,
107 Err(e) => {
108 return TestResult::error(
109 "test_hash_join_atomic_correctness",
110 start.elapsed(),
111 format!("Hash join failed ({}x{}): {}", left_size, right_size, e),
112 )
113 }
114 };
115
116 if ctx.device_row_count(&joined) != matching_right as u64 {
118 return TestResult::error(
119 "test_hash_join_atomic_correctness",
120 start.elapsed(),
121 format!(
122 "Join ({}x{}) returned {} rows, expected {}",
123 left_size,
124 right_size,
125 ctx.device_row_count(&joined),
126 matching_right
127 ),
128 );
129 }
130
131 let joined_keys = match ctx.provider.download_column::<u32>(&joined, 0) {
133 Ok(d) => d,
134 Err(e) => {
135 return TestResult::error(
136 "test_hash_join_atomic_correctness",
137 start.elapsed(),
138 format!("Failed to download joined keys: {}", e),
139 )
140 }
141 };
142
143 let joined_lvals = match ctx.provider.download_column::<u32>(&joined, 1) {
144 Ok(d) => d,
145 Err(e) => {
146 return TestResult::error(
147 "test_hash_join_atomic_correctness",
148 start.elapsed(),
149 format!("Failed to download joined lvals: {}", e),
150 )
151 }
152 };
153
154 let joined_rvals = match ctx.provider.download_column::<u32>(&joined, 2) {
155 Ok(d) => d,
156 Err(e) => {
157 return TestResult::error(
158 "test_hash_join_atomic_correctness",
159 start.elapsed(),
160 format!("Failed to download joined rvals: {}", e),
161 )
162 }
163 };
164
165 for i in 0..ctx.device_row_count(&joined) as usize {
167 let key = joined_keys[i];
168 let lval = joined_lvals[i];
169 let rval = joined_rvals[i];
170
171 if key >= left_size as u32 {
173 return TestResult::error(
174 "test_hash_join_atomic_correctness",
175 start.elapsed(),
176 format!(
177 "Row {}: key {} is outside left table range [0, {})",
178 i, key, left_size
179 ),
180 );
181 }
182
183 if lval != key * 10 {
185 return TestResult::error(
186 "test_hash_join_atomic_correctness",
187 start.elapsed(),
188 format!(
189 "Row {}: lval {} doesn't match expected {} for key {}",
190 i,
191 lval,
192 key * 10,
193 key
194 ),
195 );
196 }
197
198 if rval != key * 100 {
200 return TestResult::error(
201 "test_hash_join_atomic_correctness",
202 start.elapsed(),
203 format!(
204 "Row {}: rval {} doesn't match expected {} for key {}",
205 i,
206 rval,
207 key * 100,
208 key
209 ),
210 );
211 }
212 }
213 }
214
215 if let Err(e) = ctx.sync_and_check() {
216 return TestResult::error(
217 "test_hash_join_atomic_correctness",
218 start.elapsed(),
219 format!("Sync failed: {}", e),
220 );
221 }
222
223 TestResult::passed("test_hash_join_atomic_correctness", start.elapsed())
224}
225
226fn test_dedup_atomic_correctness(ctx: &TestContext) -> TestResult {
231 let start = Instant::now();
232 let schema = Schema::new(vec![
233 ("key".to_string(), ScalarType::U32),
234 ("val".to_string(), ScalarType::U32),
235 ]);
236
237 let test_cases: Vec<(&str, Vec<u32>, Vec<u32>)> = vec![
239 ("all_same", vec![42; 10000], (0..10000u32).collect()),
241 (
242 "pairs",
243 (0..5000u32).flat_map(|i| vec![i, i]).collect(),
244 (0..10000u32).collect(),
245 ),
246 (
247 "random_dups",
248 (0..10000usize)
249 .map(|i| ((i * 1103515245 + 12345) % 1000) as u32)
250 .collect(),
251 (0..10000u32).collect(),
252 ),
253 (
254 "clustered_dups",
255 (0..10000usize).map(|i| (i / 10) as u32).collect(),
256 (0..10000u32).collect(),
257 ),
258 (
259 "sparse_dups",
260 (0..10000usize)
261 .map(|i| if i % 100 == 0 { 0 } else { i as u32 })
262 .collect(),
263 (0..10000u32).collect(),
264 ),
265 ];
266
267 for (name, keys, vals) in test_cases {
268 let buffer = match ctx
269 .provider
270 .create_buffer_from_u32_columns(&[&keys, &vals], schema.clone())
271 {
272 Ok(buf) => buf,
273 Err(e) => {
274 return TestResult::error(
275 "test_dedup_atomic_correctness",
276 start.elapsed(),
277 format!("Pattern {}: failed to create buffer: {}", name, e),
278 )
279 }
280 };
281
282 let deduped = match ctx.provider.dedup(&buffer, &[0]) {
284 Ok(d) => d,
285 Err(e) => {
286 return TestResult::error(
287 "test_dedup_atomic_correctness",
288 start.elapsed(),
289 format!("Pattern {}: dedup failed: {}", name, e),
290 )
291 }
292 };
293
294 let unique_keys: HashSet<u32> = keys.iter().cloned().collect();
296 let expected_unique = unique_keys.len();
297
298 if ctx.device_row_count(&deduped) != expected_unique as u64 {
299 return TestResult::error(
300 "test_dedup_atomic_correctness",
301 start.elapsed(),
302 format!(
303 "Pattern {}: dedup returned {} rows, expected {}",
304 name,
305 ctx.device_row_count(&deduped),
306 expected_unique
307 ),
308 );
309 }
310
311 let deduped_keys = match ctx.provider.download_column::<u32>(&deduped, 0) {
313 Ok(d) => d,
314 Err(e) => {
315 return TestResult::error(
316 "test_dedup_atomic_correctness",
317 start.elapsed(),
318 format!("Pattern {}: failed to download deduped keys: {}", name, e),
319 )
320 }
321 };
322
323 let mut seen_keys: HashSet<u32> = HashSet::new();
325 for &k in &deduped_keys {
326 if !seen_keys.insert(k) {
327 return TestResult::error(
328 "test_dedup_atomic_correctness",
329 start.elapsed(),
330 format!("Pattern {}: duplicate key {} in dedup result", name, k),
331 );
332 }
333 }
334
335 if seen_keys != unique_keys {
337 return TestResult::error(
338 "test_dedup_atomic_correctness",
339 start.elapsed(),
340 format!("Pattern {}: dedup result missing some unique keys", name),
341 );
342 }
343 }
344
345 if let Err(e) = ctx.sync_and_check() {
346 return TestResult::error(
347 "test_dedup_atomic_correctness",
348 start.elapsed(),
349 format!("Sync failed: {}", e),
350 );
351 }
352
353 TestResult::passed("test_dedup_atomic_correctness", start.elapsed())
354}
355
356fn test_high_contention_join(ctx: &TestContext) -> TestResult {
362 let start = Instant::now();
363
364 let left_schema = Schema::new(vec![
365 ("key".to_string(), ScalarType::U32),
366 ("lval".to_string(), ScalarType::U32),
367 ]);
368 let right_schema = Schema::new(vec![
369 ("key".to_string(), ScalarType::U32),
370 ("rval".to_string(), ScalarType::U32),
371 ]);
372
373 const SIZE: usize = 5000;
376
377 let collision_keys: Vec<u32> = (0..SIZE).map(|i| (i * 256) as u32).collect();
378 let left_vals: Vec<u32> = collision_keys.iter().map(|&k| k + 1).collect();
379 let right_vals: Vec<u32> = collision_keys.iter().map(|&k| k + 2).collect();
380
381 let left_buffer = match ctx
382 .provider
383 .create_buffer_from_u32_columns(&[&collision_keys, &left_vals], left_schema.clone())
384 {
385 Ok(buf) => buf,
386 Err(e) => {
387 return TestResult::error(
388 "test_high_contention_join",
389 start.elapsed(),
390 format!("Failed to create left buffer: {}", e),
391 )
392 }
393 };
394
395 let right_buffer = match ctx
396 .provider
397 .create_buffer_from_u32_columns(&[&collision_keys, &right_vals], right_schema.clone())
398 {
399 Ok(buf) => buf,
400 Err(e) => {
401 return TestResult::error(
402 "test_high_contention_join",
403 start.elapsed(),
404 format!("Failed to create right buffer: {}", e),
405 )
406 }
407 };
408
409 let joined = match ctx
411 .provider
412 .hash_join(&left_buffer, &right_buffer, &[0], &[0])
413 {
414 Ok(j) => j,
415 Err(e) => {
416 return TestResult::error(
417 "test_high_contention_join",
418 start.elapsed(),
419 format!("Hash join with collision keys failed: {}", e),
420 )
421 }
422 };
423
424 if ctx.device_row_count(&joined) != SIZE as u64 {
426 return TestResult::error(
427 "test_high_contention_join",
428 start.elapsed(),
429 format!(
430 "Collision join returned {} rows, expected {}",
431 ctx.device_row_count(&joined),
432 SIZE
433 ),
434 );
435 }
436
437 let joined_keys = match ctx.provider.download_column::<u32>(&joined, 0) {
439 Ok(d) => d,
440 Err(e) => {
441 return TestResult::error(
442 "test_high_contention_join",
443 start.elapsed(),
444 format!("Failed to download collision join keys: {}", e),
445 )
446 }
447 };
448
449 let joined_lvals = match ctx.provider.download_column::<u32>(&joined, 1) {
450 Ok(d) => d,
451 Err(e) => {
452 return TestResult::error(
453 "test_high_contention_join",
454 start.elapsed(),
455 format!("Failed to download collision join lvals: {}", e),
456 )
457 }
458 };
459
460 let joined_rvals = match ctx.provider.download_column::<u32>(&joined, 2) {
461 Ok(d) => d,
462 Err(e) => {
463 return TestResult::error(
464 "test_high_contention_join",
465 start.elapsed(),
466 format!("Failed to download collision join rvals: {}", e),
467 )
468 }
469 };
470
471 for i in 0..ctx.device_row_count(&joined) as usize {
473 let key = joined_keys[i];
474 let lval = joined_lvals[i];
475 let rval = joined_rvals[i];
476
477 if lval != key + 1 {
478 return TestResult::error(
479 "test_high_contention_join",
480 start.elapsed(),
481 format!(
482 "Collision row {}: lval {} != key + 1 = {}",
483 i,
484 lval,
485 key + 1
486 ),
487 );
488 }
489
490 if rval != key + 2 {
491 return TestResult::error(
492 "test_high_contention_join",
493 start.elapsed(),
494 format!(
495 "Collision row {}: rval {} != key + 2 = {}",
496 i,
497 rval,
498 key + 2
499 ),
500 );
501 }
502 }
503
504 let seen_keys: HashSet<u32> = joined_keys.iter().cloned().collect();
506 let expected_keys: HashSet<u32> = collision_keys.iter().cloned().collect();
507 if seen_keys != expected_keys {
508 return TestResult::error(
509 "test_high_contention_join",
510 start.elapsed(),
511 format!(
512 "Missing keys in collision join: expected {}, got {}",
513 expected_keys.len(),
514 seen_keys.len()
515 ),
516 );
517 }
518
519 let same_key_left: Vec<u32> = vec![12345; 1000];
521 let same_key_left_vals: Vec<u32> = (0..1000u32).collect();
522 let same_key_right: Vec<u32> = vec![12345; 100];
523 let same_key_right_vals: Vec<u32> = (0..100u32).map(|i| i * 1000).collect();
524
525 let left_same = match ctx
526 .provider
527 .create_buffer_from_u32_columns(&[&same_key_left, &same_key_left_vals], left_schema.clone())
528 {
529 Ok(buf) => buf,
530 Err(e) => {
531 return TestResult::error(
532 "test_high_contention_join",
533 start.elapsed(),
534 format!("Failed to create same-key left buffer: {}", e),
535 )
536 }
537 };
538
539 let right_same = match ctx.provider.create_buffer_from_u32_columns(
540 &[&same_key_right, &same_key_right_vals],
541 right_schema.clone(),
542 ) {
543 Ok(buf) => buf,
544 Err(e) => {
545 return TestResult::error(
546 "test_high_contention_join",
547 start.elapsed(),
548 format!("Failed to create same-key right buffer: {}", e),
549 )
550 }
551 };
552
553 let same_joined = match ctx.provider.hash_join(&left_same, &right_same, &[0], &[0]) {
555 Ok(j) => j,
556 Err(e) => {
557 return TestResult::error(
558 "test_high_contention_join",
559 start.elapsed(),
560 format!("Same-key hash join failed: {}", e),
561 )
562 }
563 };
564
565 let expected_same = 1000 * 100;
566 if ctx.device_row_count(&same_joined) != expected_same as u64 {
567 return TestResult::error(
568 "test_high_contention_join",
569 start.elapsed(),
570 format!(
571 "Same-key join returned {} rows, expected {} (Cartesian)",
572 ctx.device_row_count(&same_joined),
573 expected_same
574 ),
575 );
576 }
577
578 let same_joined_keys = match ctx.provider.download_column::<u32>(&same_joined, 0) {
580 Ok(d) => d,
581 Err(e) => {
582 return TestResult::error(
583 "test_high_contention_join",
584 start.elapsed(),
585 format!("Failed to download same-key join keys: {}", e),
586 )
587 }
588 };
589
590 for (i, &key) in same_joined_keys.iter().enumerate() {
591 if key != 12345 {
592 return TestResult::error(
593 "test_high_contention_join",
594 start.elapsed(),
595 format!("Same-key row {}: key {} != 12345", i, key),
596 );
597 }
598 }
599
600 if let Err(e) = ctx.sync_and_check() {
601 return TestResult::error(
602 "test_high_contention_join",
603 start.elapsed(),
604 format!("Sync failed: {}", e),
605 );
606 }
607
608 TestResult::passed("test_high_contention_join", start.elapsed())
609}
610
611fn test_atomic_counting(ctx: &TestContext) -> TestResult {
616 let start = Instant::now();
617 let schema = Schema::new(vec![("val".to_string(), ScalarType::U32)]);
618
619 let test_cases: Vec<(usize, Box<dyn Fn(usize) -> bool>)> = vec![
621 (1000, Box::new(|i| i % 2 == 0)), (10000, Box::new(|i| i % 10 == 0)), (50000, Box::new(|i| i < 10000)), (100000, Box::new(|i| i % 7 == 0)), ];
627
628 for (size, predicate) in test_cases {
629 let mask: Vec<u8> = (0..size)
631 .map(|i| if predicate(i) { 1 } else { 0 })
632 .collect();
633 let expected_count: usize = mask.iter().map(|&m| m as usize).sum();
634
635 let (prefix_sums, total) = match ctx.provider.prefix_sum_mask(&mask) {
637 Ok(result) => result,
638 Err(e) => {
639 return TestResult::error(
640 "test_atomic_counting",
641 start.elapsed(),
642 format!("Size {}: prefix_sum_mask failed: {}", size, e),
643 )
644 }
645 };
646
647 if total != expected_count as u32 {
649 return TestResult::error(
650 "test_atomic_counting",
651 start.elapsed(),
652 format!(
653 "Size {}: prefix_sum total {} != expected {}",
654 size, total, expected_count
655 ),
656 );
657 }
658
659 let mut running_sum = 0u32;
661 for (i, &m) in mask.iter().enumerate() {
662 if prefix_sums[i] != running_sum {
663 return TestResult::error(
664 "test_atomic_counting",
665 start.elapsed(),
666 format!(
667 "Size {}: prefix_sums[{}] = {}, expected {}",
668 size, i, prefix_sums[i], running_sum
669 ),
670 );
671 }
672 running_sum += m as u32;
673 }
674
675 if running_sum != total {
677 return TestResult::error(
678 "test_atomic_counting",
679 start.elapsed(),
680 format!(
681 "Size {}: final sum {} != total {}",
682 size, running_sum, total
683 ),
684 );
685 }
686
687 let data: Vec<u32> = (0..size as u32).collect();
689
690 let buffer = match ctx
691 .provider
692 .create_buffer_from_slice::<u32>(&data, schema.clone())
693 {
694 Ok(buf) => buf,
695 Err(e) => {
696 return TestResult::error(
697 "test_atomic_counting",
698 start.elapsed(),
699 format!("Size {}: failed to create buffer: {}", size, e),
700 )
701 }
702 };
703
704 let filtered = match ctx.provider.filter_by_mask(&buffer, &mask) {
705 Ok(f) => f,
706 Err(e) => {
707 return TestResult::error(
708 "test_atomic_counting",
709 start.elapsed(),
710 format!("Size {}: filter failed: {}", size, e),
711 )
712 }
713 };
714
715 if ctx.device_row_count(&filtered) != expected_count as u64 {
716 return TestResult::error(
717 "test_atomic_counting",
718 start.elapsed(),
719 format!(
720 "Size {}: filter returned {} rows, expected {}",
721 size,
722 ctx.device_row_count(&filtered),
723 expected_count
724 ),
725 );
726 }
727
728 let filtered_data = match ctx.provider.download_column::<u32>(&filtered, 0) {
730 Ok(d) => d,
731 Err(e) => {
732 return TestResult::error(
733 "test_atomic_counting",
734 start.elapsed(),
735 format!("Size {}: failed to download filtered: {}", size, e),
736 )
737 }
738 };
739
740 let mut expected_idx = 0;
741 for (i, &m) in mask.iter().enumerate() {
742 if m == 1 {
743 if expected_idx >= filtered_data.len() {
744 return TestResult::error(
745 "test_atomic_counting",
746 start.elapsed(),
747 format!(
748 "Size {}: filtered data too short at expected index {}",
749 size, expected_idx
750 ),
751 );
752 }
753 if filtered_data[expected_idx] != i as u32 {
754 return TestResult::error(
755 "test_atomic_counting",
756 start.elapsed(),
757 format!(
758 "Size {}: filtered[{}] = {}, expected {}",
759 size, expected_idx, filtered_data[expected_idx], i
760 ),
761 );
762 }
763 expected_idx += 1;
764 }
765 }
766 }
767
768 if let Err(e) = ctx.sync_and_check() {
769 return TestResult::error(
770 "test_atomic_counting",
771 start.elapsed(),
772 format!("Sync failed: {}", e),
773 );
774 }
775
776 TestResult::passed("test_atomic_counting", start.elapsed())
777}
778
779fn test_concurrent_atomic_updates(ctx: &TestContext) -> TestResult {
784 let start = Instant::now();
785
786 let schema = Schema::new(vec![
787 ("key".to_string(), ScalarType::U32),
788 ("val".to_string(), ScalarType::U32),
789 ]);
790
791 const NUM_ITERATIONS: usize = 5;
793 const SIZE: usize = 10000;
794
795 for iteration in 0..NUM_ITERATIONS {
796 let keys: Vec<u32> = (0..SIZE).map(|i| (i + iteration * 1000) as u32).collect();
798 let vals: Vec<u32> = keys.iter().map(|&k| k * 10).collect();
799
800 let buffer = match ctx
801 .provider
802 .create_buffer_from_u32_columns(&[&keys, &vals], schema.clone())
803 {
804 Ok(buf) => buf,
805 Err(e) => {
806 return TestResult::error(
807 "test_concurrent_atomic_updates",
808 start.elapsed(),
809 format!("Iteration {}: failed to create buffer: {}", iteration, e),
810 )
811 }
812 };
813
814 let deduped = match ctx.provider.dedup(&buffer, &[0]) {
816 Ok(d) => d,
817 Err(e) => {
818 return TestResult::error(
819 "test_concurrent_atomic_updates",
820 start.elapsed(),
821 format!("Iteration {}: dedup failed: {}", iteration, e),
822 )
823 }
824 };
825
826 if ctx.device_row_count(&deduped) != SIZE as u64 {
828 return TestResult::error(
829 "test_concurrent_atomic_updates",
830 start.elapsed(),
831 format!(
832 "Iteration {}: dedup returned {} rows, expected {}",
833 iteration,
834 ctx.device_row_count(&deduped),
835 SIZE
836 ),
837 );
838 }
839
840 let mask: Vec<u8> = (0..SIZE).map(|i| if i % 3 == 0 { 1 } else { 0 }).collect();
842 let expected_filter = (SIZE + 2) / 3;
843
844 let filtered = match ctx.provider.filter_by_mask(&buffer, &mask) {
845 Ok(f) => f,
846 Err(e) => {
847 return TestResult::error(
848 "test_concurrent_atomic_updates",
849 start.elapsed(),
850 format!("Iteration {}: filter failed: {}", iteration, e),
851 )
852 }
853 };
854
855 if ctx.device_row_count(&filtered) != expected_filter as u64 {
856 return TestResult::error(
857 "test_concurrent_atomic_updates",
858 start.elapsed(),
859 format!(
860 "Iteration {}: filter returned {} rows, expected {}",
861 iteration,
862 ctx.device_row_count(&filtered),
863 expected_filter
864 ),
865 );
866 }
867
868 let joined = match ctx.provider.hash_join(&buffer, &buffer, &[0], &[0]) {
871 Ok(j) => j,
872 Err(e) => {
873 return TestResult::error(
874 "test_concurrent_atomic_updates",
875 start.elapsed(),
876 format!("Iteration {}: self-join failed: {}", iteration, e),
877 )
878 }
879 };
880
881 if ctx.device_row_count(&joined) != SIZE as u64 {
883 return TestResult::error(
884 "test_concurrent_atomic_updates",
885 start.elapsed(),
886 format!(
887 "Iteration {}: self-join returned {} rows, expected {}",
888 iteration,
889 ctx.device_row_count(&joined),
890 SIZE
891 ),
892 );
893 }
894
895 let joined_keys = match ctx.provider.download_column::<u32>(&joined, 0) {
897 Ok(d) => d,
898 Err(e) => {
899 return TestResult::error(
900 "test_concurrent_atomic_updates",
901 start.elapsed(),
902 format!(
903 "Iteration {}: failed to download join keys: {}",
904 iteration, e
905 ),
906 )
907 }
908 };
909
910 let joined_key_set: HashSet<u32> = joined_keys.iter().cloned().collect();
912 let original_key_set: HashSet<u32> = keys.iter().cloned().collect();
913
914 if joined_key_set != original_key_set {
915 return TestResult::error(
916 "test_concurrent_atomic_updates",
917 start.elapsed(),
918 format!(
919 "Iteration {}: self-join keys don't match original ({} vs {})",
920 iteration,
921 joined_key_set.len(),
922 original_key_set.len()
923 ),
924 );
925 }
926
927 let deduped2 = match ctx.provider.dedup(&buffer, &[0]) {
929 Ok(d) => d,
930 Err(e) => {
931 return TestResult::error(
932 "test_concurrent_atomic_updates",
933 start.elapsed(),
934 format!("Iteration {}: second dedup failed: {}", iteration, e),
935 )
936 }
937 };
938
939 if ctx.device_row_count(&deduped2) != SIZE as u64 {
940 return TestResult::error(
941 "test_concurrent_atomic_updates",
942 start.elapsed(),
943 format!(
944 "Iteration {}: second dedup returned {} rows, expected {} (atomics may not have reset)",
945 iteration, ctx.device_row_count(&deduped2), SIZE
946 ),
947 );
948 }
949 }
950
951 if let Err(e) = ctx.sync_and_check() {
952 return TestResult::error(
953 "test_concurrent_atomic_updates",
954 start.elapsed(),
955 format!("Sync failed: {}", e),
956 );
957 }
958
959 TestResult::passed("test_concurrent_atomic_updates", start.elapsed())
960}