1use crate::harness::{CategoryResult, TestContext, TestResult};
8use std::time::Instant;
9use xlog_core::{ScalarType, Schema};
10
11pub fn run_all(ctx: &TestContext) -> CategoryResult {
13 let mut results = CategoryResult::new("c21_hardware");
14 let start = Instant::now();
15
16 results.add_result(test_error_detection(ctx));
17 results.add_result(test_recovery_after_error(ctx));
18 results.add_result(test_stress_operations(ctx));
19 results.add_result(test_memory_pressure(ctx));
20 results.add_result(test_sustained_operation(ctx));
21
22 results.set_duration(start.elapsed());
23 results
24}
25
26fn test_error_detection(ctx: &TestContext) -> TestResult {
31 let start = Instant::now();
32 let schema = Schema::new(vec![("val".to_string(), ScalarType::U32)]);
33
34 let data: Vec<u32> = (0..10000u32).collect();
36
37 let buffer = match ctx
38 .provider
39 .create_buffer_from_slice::<u32>(&data, schema.clone())
40 {
41 Ok(buf) => buf,
42 Err(e) => {
43 return TestResult::error(
44 "test_error_detection",
45 start.elapsed(),
46 format!("Buffer creation failed: {}", e),
47 )
48 }
49 };
50
51 let sorted = match ctx.provider.sort(&buffer, &[0]) {
52 Ok(s) => s,
53 Err(e) => {
54 return TestResult::error(
55 "test_error_detection",
56 start.elapsed(),
57 format!("Sort failed: {}", e),
58 )
59 }
60 };
61
62 if let Err(e) = ctx.sync_and_check() {
64 return TestResult::error(
65 "test_error_detection",
66 start.elapsed(),
67 format!("sync_and_check failed for valid operation: {}", e),
68 );
69 }
70
71 let result = match ctx.provider.download_column::<u32>(&sorted, 0) {
73 Ok(d) => d,
74 Err(e) => {
75 return TestResult::error(
76 "test_error_detection",
77 start.elapsed(),
78 format!("Download failed: {}", e),
79 )
80 }
81 };
82
83 for (i, &val) in result.iter().enumerate() {
84 if val != i as u32 {
85 return TestResult::error(
86 "test_error_detection",
87 start.elapsed(),
88 format!(
89 "Result incorrect at index {}: expected {}, got {}",
90 i, i, val
91 ),
92 );
93 }
94 }
95
96 for i in 0..5 {
98 let check_data: Vec<u32> = (0..1000u32).map(|j| j + i * 1000).collect();
99
100 let check_buffer = match ctx
101 .provider
102 .create_buffer_from_slice::<u32>(&check_data, schema.clone())
103 {
104 Ok(buf) => buf,
105 Err(e) => {
106 return TestResult::error(
107 "test_error_detection",
108 start.elapsed(),
109 format!("Check {} buffer creation failed: {}", i, e),
110 )
111 }
112 };
113
114 let _check_sorted = match ctx.provider.sort(&check_buffer, &[0]) {
115 Ok(s) => s,
116 Err(e) => {
117 return TestResult::error(
118 "test_error_detection",
119 start.elapsed(),
120 format!("Check {} sort failed: {}", i, e),
121 )
122 }
123 };
124
125 if let Err(e) = ctx.sync_and_check() {
126 return TestResult::error(
127 "test_error_detection",
128 start.elapsed(),
129 format!("Check {} sync_and_check failed: {}", i, e),
130 );
131 }
132 }
133
134 let empty_data: Vec<u32> = vec![];
136 let empty_buffer = match ctx
137 .provider
138 .create_buffer_from_slice::<u32>(&empty_data, schema.clone())
139 {
140 Ok(buf) => buf,
141 Err(e) => {
142 return TestResult::error(
143 "test_error_detection",
144 start.elapsed(),
145 format!("Empty buffer creation failed: {}", e),
146 )
147 }
148 };
149
150 let _empty_sorted = match ctx.provider.sort(&empty_buffer, &[0]) {
151 Ok(s) => s,
152 Err(e) => {
153 return TestResult::error(
154 "test_error_detection",
155 start.elapsed(),
156 format!("Empty sort failed: {}", e),
157 )
158 }
159 };
160
161 if let Err(e) = ctx.sync_and_check() {
162 return TestResult::error(
163 "test_error_detection",
164 start.elapsed(),
165 format!("sync_and_check failed for empty operation: {}", e),
166 );
167 }
168
169 let single_data: Vec<u32> = vec![42];
171 let single_buffer = match ctx
172 .provider
173 .create_buffer_from_slice::<u32>(&single_data, schema.clone())
174 {
175 Ok(buf) => buf,
176 Err(e) => {
177 return TestResult::error(
178 "test_error_detection",
179 start.elapsed(),
180 format!("Single buffer creation failed: {}", e),
181 )
182 }
183 };
184
185 let single_sorted = match ctx.provider.sort(&single_buffer, &[0]) {
186 Ok(s) => s,
187 Err(e) => {
188 return TestResult::error(
189 "test_error_detection",
190 start.elapsed(),
191 format!("Single sort failed: {}", e),
192 )
193 }
194 };
195
196 if let Err(e) = ctx.sync_and_check() {
197 return TestResult::error(
198 "test_error_detection",
199 start.elapsed(),
200 format!("sync_and_check failed for single element: {}", e),
201 );
202 }
203
204 let single_result = match ctx.provider.download_column::<u32>(&single_sorted, 0) {
205 Ok(d) => d,
206 Err(e) => {
207 return TestResult::error(
208 "test_error_detection",
209 start.elapsed(),
210 format!("Single download failed: {}", e),
211 )
212 }
213 };
214
215 if single_result != vec![42] {
216 return TestResult::error(
217 "test_error_detection",
218 start.elapsed(),
219 format!("Single result incorrect: {:?}", single_result),
220 );
221 }
222
223 if let Err(e) = ctx.sync_and_check() {
224 return TestResult::error(
225 "test_error_detection",
226 start.elapsed(),
227 format!("Final sync failed: {}", e),
228 );
229 }
230
231 TestResult::passed("test_error_detection", start.elapsed())
232}
233
234fn test_recovery_after_error(ctx: &TestContext) -> TestResult {
239 let start = Instant::now();
240 let schema = Schema::new(vec![("val".to_string(), ScalarType::U32)]);
241
242 let data1: Vec<u32> = (0..5000u32).collect();
245 let buffer1 = match ctx
246 .provider
247 .create_buffer_from_slice::<u32>(&data1, schema.clone())
248 {
249 Ok(buf) => buf,
250 Err(e) => {
251 return TestResult::error(
252 "test_recovery_after_error",
253 start.elapsed(),
254 format!("Buffer1 creation failed: {}", e),
255 )
256 }
257 };
258
259 let sorted1 = match ctx.provider.sort(&buffer1, &[0]) {
260 Ok(s) => s,
261 Err(e) => {
262 return TestResult::error(
263 "test_recovery_after_error",
264 start.elapsed(),
265 format!("Sort1 failed: {}", e),
266 )
267 }
268 };
269
270 if let Err(e) = ctx.sync_and_check() {
271 return TestResult::error(
272 "test_recovery_after_error",
273 start.elapsed(),
274 format!("Sync1 failed: {}", e),
275 );
276 }
277
278 let edge_data: Vec<u32> = vec![1, 2, 3, 4, 5];
280 let edge_buffer = match ctx
281 .provider
282 .create_buffer_from_slice::<u32>(&edge_data, schema.clone())
283 {
284 Ok(buf) => buf,
285 Err(e) => {
286 return TestResult::error(
287 "test_recovery_after_error",
288 start.elapsed(),
289 format!("Edge buffer creation failed: {}", e),
290 )
291 }
292 };
293
294 let wrong_mask: Vec<u8> = vec![1, 0, 1]; let _edge_result = ctx.provider.filter_by_mask(&edge_buffer, &wrong_mask);
297
298 let _ = ctx.sync_and_check();
300
301 let data2: Vec<u32> = (0..10000u32).collect();
303 let buffer2 = match ctx
304 .provider
305 .create_buffer_from_slice::<u32>(&data2, schema.clone())
306 {
307 Ok(buf) => buf,
308 Err(e) => {
309 return TestResult::error(
310 "test_recovery_after_error",
311 start.elapsed(),
312 format!("Recovery buffer creation failed: {}", e),
313 )
314 }
315 };
316
317 let sorted2 = match ctx.provider.sort(&buffer2, &[0]) {
318 Ok(s) => s,
319 Err(e) => {
320 return TestResult::error(
321 "test_recovery_after_error",
322 start.elapsed(),
323 format!("Recovery sort failed: {}", e),
324 )
325 }
326 };
327
328 if let Err(e) = ctx.sync_and_check() {
329 return TestResult::error(
330 "test_recovery_after_error",
331 start.elapsed(),
332 format!("Recovery sync failed: {}", e),
333 );
334 }
335
336 let result2 = match ctx.provider.download_column::<u32>(&sorted2, 0) {
338 Ok(d) => d,
339 Err(e) => {
340 return TestResult::error(
341 "test_recovery_after_error",
342 start.elapsed(),
343 format!("Recovery download failed: {}", e),
344 )
345 }
346 };
347
348 for (i, &val) in result2.iter().enumerate() {
349 if val != i as u32 {
350 return TestResult::error(
351 "test_recovery_after_error",
352 start.elapsed(),
353 format!(
354 "Recovery result incorrect at {}: expected {}, got {}",
355 i, i, val
356 ),
357 );
358 }
359 }
360
361 for cycle in 0..3 {
363 let valid_data: Vec<u32> = (0..1000u32).map(|j| j + cycle * 1000).collect();
365 let valid_buffer = match ctx
366 .provider
367 .create_buffer_from_slice::<u32>(&valid_data, schema.clone())
368 {
369 Ok(buf) => buf,
370 Err(e) => {
371 return TestResult::error(
372 "test_recovery_after_error",
373 start.elapsed(),
374 format!("Cycle {} valid buffer failed: {}", cycle, e),
375 )
376 }
377 };
378
379 let valid_sorted = match ctx.provider.sort(&valid_buffer, &[0]) {
380 Ok(s) => s,
381 Err(e) => {
382 return TestResult::error(
383 "test_recovery_after_error",
384 start.elapsed(),
385 format!("Cycle {} valid sort failed: {}", cycle, e),
386 )
387 }
388 };
389
390 if let Err(e) = ctx.sync_and_check() {
391 return TestResult::error(
392 "test_recovery_after_error",
393 start.elapsed(),
394 format!("Cycle {} valid sync failed: {}", cycle, e),
395 );
396 }
397
398 let valid_result = match ctx.provider.download_column::<u32>(&valid_sorted, 0) {
400 Ok(d) => d,
401 Err(e) => {
402 return TestResult::error(
403 "test_recovery_after_error",
404 start.elapsed(),
405 format!("Cycle {} valid download failed: {}", cycle, e),
406 )
407 }
408 };
409
410 if valid_result.len() != 1000 {
411 return TestResult::error(
412 "test_recovery_after_error",
413 start.elapsed(),
414 format!(
415 "Cycle {}: expected 1000 rows, got {}",
416 cycle,
417 valid_result.len()
418 ),
419 );
420 }
421 }
422
423 let result1 = match ctx.provider.download_column::<u32>(&sorted1, 0) {
425 Ok(d) => d,
426 Err(e) => {
427 return TestResult::error(
428 "test_recovery_after_error",
429 start.elapsed(),
430 format!("Previous result download failed: {}", e),
431 )
432 }
433 };
434
435 for (i, &val) in result1.iter().enumerate() {
436 if val != i as u32 {
437 return TestResult::error(
438 "test_recovery_after_error",
439 start.elapsed(),
440 format!(
441 "Previous result corrupted at {}: expected {}, got {}",
442 i, i, val
443 ),
444 );
445 }
446 }
447
448 if let Err(e) = ctx.sync_and_check() {
449 return TestResult::error(
450 "test_recovery_after_error",
451 start.elapsed(),
452 format!("Final sync failed: {}", e),
453 );
454 }
455
456 TestResult::passed("test_recovery_after_error", start.elapsed())
457}
458
459fn test_stress_operations(ctx: &TestContext) -> TestResult {
464 let start = Instant::now();
465 let schema = Schema::new(vec![("val".to_string(), ScalarType::U32)]);
466
467 const NUM_OPERATIONS: usize = 200;
468 const DATA_SIZE: usize = 10000;
469
470 for i in 0..NUM_OPERATIONS {
472 let data: Vec<u32> = (0..DATA_SIZE)
473 .map(|j| ((j * (i + 1) * 1103515245 + 12345) % DATA_SIZE) as u32)
474 .collect();
475
476 let buffer = match ctx
477 .provider
478 .create_buffer_from_slice::<u32>(&data, schema.clone())
479 {
480 Ok(buf) => buf,
481 Err(e) => {
482 return TestResult::error(
483 "test_stress_operations",
484 start.elapsed(),
485 format!("Stress sort {}: buffer creation failed: {}", i, e),
486 )
487 }
488 };
489
490 let sorted = match ctx.provider.sort(&buffer, &[0]) {
491 Ok(s) => s,
492 Err(e) => {
493 return TestResult::error(
494 "test_stress_operations",
495 start.elapsed(),
496 format!("Stress sort {}: sort failed: {}", i, e),
497 )
498 }
499 };
500
501 if i % 20 == 0 {
503 let result = match ctx.provider.download_column::<u32>(&sorted, 0) {
504 Ok(d) => d,
505 Err(e) => {
506 return TestResult::error(
507 "test_stress_operations",
508 start.elapsed(),
509 format!("Stress sort {}: download failed: {}", i, e),
510 )
511 }
512 };
513
514 for j in 1..result.len() {
515 if result[j] < result[j - 1] {
516 return TestResult::error(
517 "test_stress_operations",
518 start.elapsed(),
519 format!("Stress sort {}: incorrect at index {}", i, j),
520 );
521 }
522 }
523
524 if let Err(e) = ctx.sync_and_check() {
525 return TestResult::error(
526 "test_stress_operations",
527 start.elapsed(),
528 format!("Stress sort {}: sync failed: {}", i, e),
529 );
530 }
531 }
532 }
533
534 let filter_data: Vec<u32> = (0..DATA_SIZE as u32).collect();
536 let filter_buffer = match ctx
537 .provider
538 .create_buffer_from_slice::<u32>(&filter_data, schema.clone())
539 {
540 Ok(buf) => buf,
541 Err(e) => {
542 return TestResult::error(
543 "test_stress_operations",
544 start.elapsed(),
545 format!("Filter buffer creation failed: {}", e),
546 )
547 }
548 };
549
550 for i in 0..NUM_OPERATIONS {
551 let selectivity = (i % 10 + 1) * 10; let mask: Vec<u8> = (0..DATA_SIZE)
553 .map(|j| {
554 if (j * 100 / DATA_SIZE) < selectivity {
555 1
556 } else {
557 0
558 }
559 })
560 .collect();
561
562 let filtered = match ctx.provider.filter_by_mask(&filter_buffer, &mask) {
563 Ok(f) => f,
564 Err(e) => {
565 return TestResult::error(
566 "test_stress_operations",
567 start.elapsed(),
568 format!("Stress filter {}: failed: {}", i, e),
569 )
570 }
571 };
572
573 if i % 20 == 0 {
575 let expected_min = (DATA_SIZE * selectivity / 100).saturating_sub(DATA_SIZE / 20);
576 let expected_max = (DATA_SIZE * selectivity / 100) + DATA_SIZE / 20 + 1;
577
578 let count = ctx.device_row_count(&filtered) as usize;
579 if count < expected_min || count > expected_max {
580 return TestResult::error(
581 "test_stress_operations",
582 start.elapsed(),
583 format!(
584 "Stress filter {}: got {} rows, expected ~{}",
585 i,
586 count,
587 DATA_SIZE * selectivity / 100
588 ),
589 );
590 }
591
592 if let Err(e) = ctx.sync_and_check() {
593 return TestResult::error(
594 "test_stress_operations",
595 start.elapsed(),
596 format!("Stress filter {}: sync failed: {}", i, e),
597 );
598 }
599 }
600 }
601
602 let schema2 = Schema::new(vec![
604 ("key".to_string(), ScalarType::U32),
605 ("val".to_string(), ScalarType::U32),
606 ]);
607
608 for i in 0..NUM_OPERATIONS / 2 {
609 let dedup_keys: Vec<u32> = (0..DATA_SIZE).map(|j| (j % 100) as u32).collect();
611 let dedup_vals: Vec<u32> = (0..DATA_SIZE as u32).collect();
612
613 let dedup_buffer = match ctx
614 .provider
615 .create_buffer_from_u32_columns(&[&dedup_keys, &dedup_vals], schema2.clone())
616 {
617 Ok(buf) => buf,
618 Err(e) => {
619 return TestResult::error(
620 "test_stress_operations",
621 start.elapsed(),
622 format!("Stress dedup {}: buffer creation failed: {}", i, e),
623 )
624 }
625 };
626
627 let deduped = match ctx.provider.dedup(&dedup_buffer, &[0]) {
628 Ok(d) => d,
629 Err(e) => {
630 return TestResult::error(
631 "test_stress_operations",
632 start.elapsed(),
633 format!("Stress dedup {}: failed: {}", i, e),
634 )
635 }
636 };
637
638 if ctx.device_row_count(&deduped) != 100 {
639 return TestResult::error(
640 "test_stress_operations",
641 start.elapsed(),
642 format!(
643 "Stress dedup {}: expected 100, got {}",
644 i,
645 ctx.device_row_count(&deduped)
646 ),
647 );
648 }
649
650 if i % 20 == 0 {
651 if let Err(e) = ctx.sync_and_check() {
652 return TestResult::error(
653 "test_stress_operations",
654 start.elapsed(),
655 format!("Stress dedup {}: sync failed: {}", i, e),
656 );
657 }
658 }
659 }
660
661 if let Err(e) = ctx.sync_and_check() {
663 return TestResult::error(
664 "test_stress_operations",
665 start.elapsed(),
666 format!("Final stress sync failed: {}", e),
667 );
668 }
669
670 TestResult::passed("test_stress_operations", start.elapsed())
671}
672
673fn test_memory_pressure(ctx: &TestContext) -> TestResult {
677 let start = Instant::now();
678 let schema = Schema::new(vec![("val".to_string(), ScalarType::U32)]);
679
680 let budget = ctx.memory_budget();
681
682 let buffer_size = (budget / 20 / 4).min(5_000_000);
685
686 if buffer_size < 1000 {
687 return TestResult::skipped(
688 "test_memory_pressure",
689 "Memory budget too small for pressure test",
690 );
691 }
692
693 let mut buffers = Vec::new();
695 let mut successful_allocations = 0;
696
697 for i in 0..5 {
698 let data: Vec<u32> = (0..buffer_size)
699 .map(|j| ((j + i * buffer_size) % buffer_size) as u32)
700 .collect();
701
702 match ctx
703 .provider
704 .create_buffer_from_slice::<u32>(&data, schema.clone())
705 {
706 Ok(buf) => {
707 buffers.push(buf);
708 successful_allocations += 1;
709 }
710 Err(_) => {
711 break;
713 }
714 }
715 }
716
717 if successful_allocations == 0 {
719 return TestResult::error(
720 "test_memory_pressure",
721 start.elapsed(),
722 "Could not allocate any buffers".to_string(),
723 );
724 }
725
726 for (i, buffer) in buffers.iter().enumerate() {
728 let sorted = match ctx.provider.sort(buffer, &[0]) {
729 Ok(s) => s,
730 Err(e) => {
731 return TestResult::error(
732 "test_memory_pressure",
733 start.elapsed(),
734 format!("Buffer {}: sort failed under pressure: {}", i, e),
735 )
736 }
737 };
738
739 let result = match ctx.provider.download_column::<u32>(&sorted, 0) {
741 Ok(d) => d,
742 Err(e) => {
743 return TestResult::error(
744 "test_memory_pressure",
745 start.elapsed(),
746 format!("Buffer {}: download failed under pressure: {}", i, e),
747 )
748 }
749 };
750
751 for j in (1..result.len()).step_by(10000) {
753 if result[j] < result[j - 1] {
754 return TestResult::error(
755 "test_memory_pressure",
756 start.elapsed(),
757 format!("Buffer {}: sort incorrect at index {}", i, j),
758 );
759 }
760 }
761 }
762
763 if let Err(e) = ctx.sync_and_check() {
764 return TestResult::error(
765 "test_memory_pressure",
766 start.elapsed(),
767 format!("Sync under pressure failed: {}", e),
768 );
769 }
770
771 for (i, buffer) in buffers.iter().enumerate() {
773 let mask: Vec<u8> = (0..buffer_size)
774 .map(|j| if j % 2 == 0 { 1 } else { 0 })
775 .collect();
776
777 let filtered = match ctx.provider.filter_by_mask(buffer, &mask) {
778 Ok(f) => f,
779 Err(e) => {
780 return TestResult::error(
781 "test_memory_pressure",
782 start.elapsed(),
783 format!("Buffer {}: filter failed under pressure: {}", i, e),
784 )
785 }
786 };
787
788 let expected = (buffer_size + 1) / 2;
789 if ctx.device_row_count(&filtered) != expected as u64 {
790 return TestResult::error(
791 "test_memory_pressure",
792 start.elapsed(),
793 format!(
794 "Buffer {}: filter expected {} rows, got {}",
795 i,
796 expected,
797 ctx.device_row_count(&filtered)
798 ),
799 );
800 }
801 }
802
803 drop(buffers);
805
806 if let Err(e) = ctx.sync_and_check() {
807 return TestResult::error(
808 "test_memory_pressure",
809 start.elapsed(),
810 format!("Sync after release failed: {}", e),
811 );
812 }
813
814 let fresh_data: Vec<u32> = (0..10000u32).collect();
816 let fresh_buffer = match ctx
817 .provider
818 .create_buffer_from_slice::<u32>(&fresh_data, schema.clone())
819 {
820 Ok(buf) => buf,
821 Err(e) => {
822 return TestResult::error(
823 "test_memory_pressure",
824 start.elapsed(),
825 format!("Fresh buffer after pressure failed: {}", e),
826 )
827 }
828 };
829
830 let fresh_sorted = match ctx.provider.sort(&fresh_buffer, &[0]) {
831 Ok(s) => s,
832 Err(e) => {
833 return TestResult::error(
834 "test_memory_pressure",
835 start.elapsed(),
836 format!("Fresh sort after pressure failed: {}", e),
837 )
838 }
839 };
840
841 if ctx.device_row_count(&fresh_sorted) != 10000 {
842 return TestResult::error(
843 "test_memory_pressure",
844 start.elapsed(),
845 format!(
846 "Fresh result after pressure: expected 10000, got {}",
847 ctx.device_row_count(&fresh_sorted)
848 ),
849 );
850 }
851
852 if let Err(e) = ctx.sync_and_check() {
853 return TestResult::error(
854 "test_memory_pressure",
855 start.elapsed(),
856 format!("Final sync failed: {}", e),
857 );
858 }
859
860 TestResult::passed("test_memory_pressure", start.elapsed())
861}
862
863fn test_sustained_operation(ctx: &TestContext) -> TestResult {
868 let start = Instant::now();
869 let schema = Schema::new(vec![("val".to_string(), ScalarType::U32)]);
870
871 const SIZE: usize = 50000;
872 const DURATION_SECONDS: u64 = 5; let deadline = start + std::time::Duration::from_secs(DURATION_SECONDS);
875 let mut operation_count = 0;
876 let mut error_count = 0;
877
878 let reference_data: Vec<u32> = (0..SIZE)
880 .map(|i| ((i * 1103515245 + 12345) % SIZE) as u32)
881 .collect();
882
883 let mut expected_sorted = reference_data.clone();
884 expected_sorted.sort();
885
886 while Instant::now() < deadline {
888 let data: Vec<u32> = reference_data.clone();
889
890 let buffer = match ctx
891 .provider
892 .create_buffer_from_slice::<u32>(&data, schema.clone())
893 {
894 Ok(buf) => buf,
895 Err(_) => {
896 error_count += 1;
897 continue;
898 }
899 };
900
901 let sorted = match ctx.provider.sort(&buffer, &[0]) {
902 Ok(s) => s,
903 Err(_) => {
904 error_count += 1;
905 continue;
906 }
907 };
908
909 if operation_count % 10 == 0 {
911 let result = match ctx.provider.download_column::<u32>(&sorted, 0) {
912 Ok(d) => d,
913 Err(_) => {
914 error_count += 1;
915 continue;
916 }
917 };
918
919 if result != expected_sorted {
920 return TestResult::error(
921 "test_sustained_operation",
922 start.elapsed(),
923 format!(
924 "Operation {} produced incorrect result after {:?}",
925 operation_count,
926 start.elapsed()
927 ),
928 );
929 }
930
931 if let Err(e) = ctx.sync_and_check() {
932 return TestResult::error(
933 "test_sustained_operation",
934 start.elapsed(),
935 format!("Sync failed at operation {}: {}", operation_count, e),
936 );
937 }
938 }
939
940 operation_count += 1;
941 }
942
943 if operation_count < 10 {
945 return TestResult::error(
946 "test_sustained_operation",
947 start.elapsed(),
948 format!(
949 "Too few operations completed: {} (expected >= 10)",
950 operation_count
951 ),
952 );
953 }
954
955 if error_count > operation_count / 100 {
957 return TestResult::error(
958 "test_sustained_operation",
959 start.elapsed(),
960 format!(
961 "High error rate: {}/{} operations failed",
962 error_count, operation_count
963 ),
964 );
965 }
966
967 let filter_data: Vec<u32> = (0..SIZE as u32).collect();
969 let filter_buffer = match ctx
970 .provider
971 .create_buffer_from_slice::<u32>(&filter_data, schema.clone())
972 {
973 Ok(buf) => buf,
974 Err(e) => {
975 return TestResult::error(
976 "test_sustained_operation",
977 start.elapsed(),
978 format!("Sustained filter buffer failed: {}", e),
979 )
980 }
981 };
982
983 let deadline2 = Instant::now() + std::time::Duration::from_secs(DURATION_SECONDS / 2);
984 let mut filter_count = 0;
985
986 while Instant::now() < deadline2 {
987 let selectivity = (filter_count % 10 + 1) * 10;
988 let mask: Vec<u8> = (0..SIZE)
989 .map(|j| if (j * 100 / SIZE) < selectivity { 1 } else { 0 })
990 .collect();
991
992 let filtered = match ctx.provider.filter_by_mask(&filter_buffer, &mask) {
993 Ok(f) => f,
994 Err(e) => {
995 return TestResult::error(
996 "test_sustained_operation",
997 start.elapsed(),
998 format!("Sustained filter {} failed: {}", filter_count, e),
999 )
1000 }
1001 };
1002
1003 let expected_min = (SIZE * selectivity / 100).saturating_sub(SIZE / 10);
1005 let expected_max = (SIZE * selectivity / 100) + SIZE / 10 + 1;
1006
1007 let count = ctx.device_row_count(&filtered) as usize;
1008 if count < expected_min || count > expected_max {
1009 return TestResult::error(
1010 "test_sustained_operation",
1011 start.elapsed(),
1012 format!(
1013 "Sustained filter {}: got {} rows, expected ~{}",
1014 filter_count,
1015 count,
1016 SIZE * selectivity / 100
1017 ),
1018 );
1019 }
1020
1021 filter_count += 1;
1022 }
1023
1024 let schema2 = Schema::new(vec![
1026 ("key".to_string(), ScalarType::U32),
1027 ("val".to_string(), ScalarType::U32),
1028 ]);
1029
1030 let left_keys: Vec<u32> = (0..1000u32).collect();
1031 let left_vals: Vec<u32> = left_keys.iter().map(|&k| k * 2).collect();
1032
1033 let right_keys: Vec<u32> = (0..500u32).map(|i| i * 2).collect();
1034 let right_vals: Vec<u32> = right_keys.iter().map(|&k| k * 3).collect();
1035
1036 let left_buffer = match ctx
1037 .provider
1038 .create_buffer_from_u32_columns(&[&left_keys, &left_vals], schema2.clone())
1039 {
1040 Ok(buf) => buf,
1041 Err(e) => {
1042 return TestResult::error(
1043 "test_sustained_operation",
1044 start.elapsed(),
1045 format!("Sustained join left buffer failed: {}", e),
1046 )
1047 }
1048 };
1049
1050 let right_buffer = match ctx
1051 .provider
1052 .create_buffer_from_u32_columns(&[&right_keys, &right_vals], schema2)
1053 {
1054 Ok(buf) => buf,
1055 Err(e) => {
1056 return TestResult::error(
1057 "test_sustained_operation",
1058 start.elapsed(),
1059 format!("Sustained join right buffer failed: {}", e),
1060 )
1061 }
1062 };
1063
1064 let deadline3 = Instant::now() + std::time::Duration::from_secs(DURATION_SECONDS / 2);
1065 let mut join_count = 0;
1066
1067 while Instant::now() < deadline3 {
1068 let joined = match ctx
1069 .provider
1070 .hash_join(&left_buffer, &right_buffer, &[0], &[0])
1071 {
1072 Ok(j) => j,
1073 Err(e) => {
1074 return TestResult::error(
1075 "test_sustained_operation",
1076 start.elapsed(),
1077 format!("Sustained join {} failed: {}", join_count, e),
1078 )
1079 }
1080 };
1081
1082 if ctx.device_row_count(&joined) != 500 {
1083 return TestResult::error(
1084 "test_sustained_operation",
1085 start.elapsed(),
1086 format!(
1087 "Sustained join {}: expected 500 rows, got {}",
1088 join_count,
1089 ctx.device_row_count(&joined)
1090 ),
1091 );
1092 }
1093
1094 join_count += 1;
1095 }
1096
1097 if let Err(e) = ctx.sync_and_check() {
1099 return TestResult::error(
1100 "test_sustained_operation",
1101 start.elapsed(),
1102 format!("Final sustained sync failed: {}", e),
1103 );
1104 }
1105
1106 TestResult::passed("test_sustained_operation", start.elapsed())
1107}