1use std::fmt;
49use std::sync::atomic::{AtomicU64, Ordering};
50use std::sync::Mutex;
51use std::time::{SystemTime, UNIX_EPOCH};
52
53use super::resource::{
54 Access, AllocTag, BlockId, DeviceBlock, DeviceMemoryResource, Generation, ResourceError,
55 ResourceResult, StreamId,
56};
57
/// Which resource operation a [`LogRecord`] describes.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LogAction {
    /// A call to `allocate`.
    Allocate,
    /// A call to `deallocate`.
    Deallocate,
    /// A call to `reap_pending`.
    ReapPending,
}

impl fmt::Display for LogAction {
    /// Writes the lowercase, snake_case name of the action (e.g. `reap_pending`).
    ///
    /// Uses `write_str`, so width/fill flags in the format spec are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match *self {
            LogAction::Allocate => "allocate",
            LogAction::Deallocate => "deallocate",
            LogAction::ReapPending => "reap_pending",
        };
        f.write_str(name)
    }
}
77
78#[derive(Clone, Debug, Eq, PartialEq)]
83pub enum LogResult {
84 Ok,
85 Err { kind: &'static str, message: String },
86}
87
88impl LogResult {
89 pub fn from_result<T>(r: &ResourceResult<T>) -> Self {
92 match r {
93 Ok(_) => LogResult::Ok,
94 Err(e) => LogResult::Err {
95 kind: classify_error(e),
96 message: truncate(format!("{}", e), 256),
97 },
98 }
99 }
100}
101
102fn classify_error(e: &ResourceError) -> &'static str {
103 match e {
104 ResourceError::OutOfBudget { .. } => "OutOfBudget",
105 ResourceError::Driver(_) => "Driver",
106 ResourceError::StreamMisuse(_) => "StreamMisuse",
107 ResourceError::UseAfterFree { .. } => "UseAfterFree",
108 ResourceError::OutOfBounds { .. } => "OutOfBounds",
109 }
110}
111
/// Shortens `s` to at most `cap` bytes and appends `…` if anything was removed.
///
/// Strings of `cap` bytes or fewer are returned unchanged. Otherwise the cut is moved
/// back to the nearest UTF-8 char boundary at or below `cap`, so multi-byte characters
/// are never split. The ellipsis is appended after the cut, so a shortened result can
/// be up to 3 bytes longer than `cap`.
fn truncate(mut s: String, cap: usize) -> String {
    if s.len() <= cap {
        return s;
    }
    // Index 0 is always a char boundary, so the search cannot come up empty.
    let cut = (0..=cap)
        .rev()
        .find(|&i| s.is_char_boundary(i))
        .unwrap_or(0);
    s.truncate(cut);
    s.push('…');
    s
}
129
130#[derive(Clone, Debug)]
133pub struct LogRecord {
134 pub action: LogAction,
135 pub device_ordinal: u32,
136 pub stream_id: Option<StreamId>,
137 pub ptr: Option<u64>,
138 pub bytes: Option<usize>,
139 pub tag: Option<AllocTag>,
140 pub generation: Option<Generation>,
141 pub thread_id: u64,
142 pub order_counter: u64,
143 pub timestamp_nanos: u128,
144 pub result: LogResult,
145}
146
/// Process-wide source of [`LogRecord::order_counter`] values, starting at 1.
static ORDER_COUNTER: AtomicU64 = AtomicU64::new(1);

/// Hands out the next sequence number.
///
/// `fetch_add` is a single atomic read-modify-write, so every caller receives a distinct
/// value, and successive calls on one thread see increasing values. Nothing else is
/// ordered against the counter, so `Relaxed` is enough.
fn next_order_counter() -> u64 {
    AtomicU64::fetch_add(&ORDER_COUNTER, 1, Ordering::Relaxed)
}
156
/// Current wall-clock time in nanoseconds since the UNIX epoch.
///
/// Returns 0 if the system clock reports a time before the epoch. This is wall-clock
/// time and may jump, so record ordering should come from the order counter instead.
fn now_nanos() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos(),
        Err(_) => 0,
    }
}
163
/// Returns a per-thread `u64` identifier for [`LogRecord::thread_id`].
///
/// `std::thread::ThreadId` has no stable integer accessor. The value is therefore the
/// 64-bit FNV-1a hash of the id's `Debug` text, which std currently formats like
/// `ThreadId(7)`. It is only meaningful within this process and is not an OS thread id.
///
/// This runs on every logged `allocate`/`deallocate`. The hash is computed once per
/// thread and cached in a thread-local, rather than formatting a `String` and re-hashing
/// it on each call. The returned value is exactly the same as before.
fn current_thread_id_u64() -> u64 {
    /// FNV-1a (64-bit) over the `Debug` text of the current thread's `ThreadId`.
    fn compute() -> u64 {
        let text = format!("{:?}", std::thread::current().id());
        text.bytes().fold(0xcbf2_9ce4_8422_2325_u64, |h, b| {
            (h ^ u64::from(b)).wrapping_mul(0x100_0000_01b3)
        })
    }

    thread_local! {
        static CACHED: u64 = compute();
    }

    // `try_with` rather than `with`: if this is reached while the thread's locals are
    // being torn down (e.g. a block freed from a TLS destructor), recompute instead of
    // panicking.
    CACHED.try_with(|id| *id).unwrap_or_else(|_| compute())
}
176
/// Reason a [`LoggingSink`] could not accept a record.
#[derive(Debug)]
pub enum SinkError {
    /// The sink declined the record; the payload explains why.
    Refused(String),
    /// The sink failed while writing; the payload describes the failure.
    Io(String),
}

impl fmt::Display for SinkError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let (headline, detail) = match self {
            SinkError::Refused(m) => ("log sink refused record", m),
            SinkError::Io(m) => ("log sink io error", m),
        };
        write!(f, "{}: {}", headline, detail)
    }
}

impl std::error::Error for SinkError {}
200
201pub trait LoggingSink: Send + Sync {
205 fn emit(&self, record: LogRecord) -> Result<(), SinkError>;
208}
209
210pub struct InMemorySink {
214 records: Mutex<Vec<LogRecord>>,
215}
216
217impl InMemorySink {
218 pub fn new() -> Self {
219 Self {
220 records: Mutex::new(Vec::new()),
221 }
222 }
223
224 pub fn snapshot(&self) -> Vec<LogRecord> {
226 self.records.lock().expect("InMemorySink poisoned").clone()
227 }
228
229 pub fn clear(&self) {
232 self.records.lock().expect("InMemorySink poisoned").clear();
233 }
234
235 pub fn len(&self) -> usize {
237 self.records.lock().expect("InMemorySink poisoned").len()
238 }
239
240 pub fn is_empty(&self) -> bool {
241 self.len() == 0
242 }
243}
244
245impl Default for InMemorySink {
246 fn default() -> Self {
247 Self::new()
248 }
249}
250
251impl LoggingSink for InMemorySink {
252 fn emit(&self, record: LogRecord) -> Result<(), SinkError> {
253 self.records
254 .lock()
255 .expect("InMemorySink poisoned")
256 .push(record);
257 Ok(())
258 }
259}
260
261pub struct NullSink;
277
278impl NullSink {
279 pub fn new() -> Self {
280 Self
281 }
282}
283
284impl Default for NullSink {
285 fn default() -> Self {
286 Self
287 }
288}
289
290impl LoggingSink for NullSink {
291 fn emit(&self, _record: LogRecord) -> Result<(), SinkError> {
292 Ok(())
293 }
294}
295
296pub struct LoggingResource {
298 inner: Box<dyn DeviceMemoryResource + Send + Sync>,
299 sink: std::sync::Arc<dyn LoggingSink>,
300 dropped_records: AtomicU64,
304}
305
306impl LoggingResource {
307 pub fn new(
310 inner: Box<dyn DeviceMemoryResource + Send + Sync>,
311 sink: std::sync::Arc<dyn LoggingSink>,
312 ) -> Self {
313 Self {
314 inner,
315 sink,
316 dropped_records: AtomicU64::new(0),
317 }
318 }
319
320 pub fn dropped_records(&self) -> u64 {
323 self.dropped_records.load(Ordering::Relaxed)
324 }
325
326 fn emit(&self, record: LogRecord) {
327 if self.sink.emit(record).is_err() {
328 self.dropped_records.fetch_add(1, Ordering::Relaxed);
329 }
330 }
331}
332
333impl DeviceMemoryResource for LoggingResource {
334 fn allocate(
335 &self,
336 bytes: usize,
337 stream: StreamId,
338 tag: AllocTag,
339 ) -> ResourceResult<DeviceBlock> {
340 let result = self.inner.allocate(bytes, stream, tag);
341 let (ptr, gen, recorded_bytes) = match &result {
342 Ok(b) => (Some(b.ptr), Some(b.generation), Some(b.bytes)),
343 Err(_) => (None, None, Some(bytes)),
344 };
345 self.emit(LogRecord {
346 action: LogAction::Allocate,
347 device_ordinal: self.inner.device_ordinal(),
348 stream_id: Some(stream),
349 ptr,
350 bytes: recorded_bytes,
351 tag: Some(tag),
352 generation: gen,
353 thread_id: current_thread_id_u64(),
354 order_counter: next_order_counter(),
355 timestamp_nanos: now_nanos(),
356 result: LogResult::from_result(&result),
357 });
358 result
359 }
360
361 fn deallocate(&self, block: DeviceBlock) -> ResourceResult<()> {
362 let ptr = block.ptr;
366 let bytes = block.bytes;
367 let tag = block.tag;
368 let gen = block.generation;
369 let stream = block.alloc_stream;
370 let dev = block.device_ordinal;
371
372 let result = self.inner.deallocate(block);
373 self.emit(LogRecord {
374 action: LogAction::Deallocate,
375 device_ordinal: dev,
376 stream_id: Some(stream),
377 ptr: Some(ptr),
378 bytes: Some(bytes),
379 tag: Some(tag),
380 generation: Some(gen),
381 thread_id: current_thread_id_u64(),
382 order_counter: next_order_counter(),
383 timestamp_nanos: now_nanos(),
384 result: LogResult::from_result(&result),
385 });
386 result
387 }
388
389 fn device_ordinal(&self) -> u32 {
390 self.inner.device_ordinal()
391 }
392
393 fn bytes_outstanding(&self) -> usize {
394 self.inner.bytes_outstanding()
395 }
396
397 fn reap_pending(&self) -> ResourceResult<()> {
398 let result = self.inner.reap_pending();
399 self.emit(LogRecord {
400 action: LogAction::ReapPending,
401 device_ordinal: self.inner.device_ordinal(),
402 stream_id: None,
403 ptr: None,
404 bytes: None,
405 tag: None,
406 generation: None,
407 thread_id: current_thread_id_u64(),
408 order_counter: next_order_counter(),
409 timestamp_nanos: now_nanos(),
410 result: LogResult::from_result(&result),
411 });
412 result
413 }
414
415 fn record_block_use(&self, block: &DeviceBlock, use_stream: StreamId) -> ResourceResult<()> {
416 self.inner.record_block_use(block, use_stream)
425 }
426
427 fn supports_block_use_tracking(&self) -> bool {
428 self.inner.supports_block_use_tracking()
429 }
430
431 fn prepare_block_use(
432 &self,
433 block: BlockId,
434 use_stream: StreamId,
435 access: Access,
436 ) -> ResourceResult<()> {
437 self.inner.prepare_block_use(block, use_stream, access)
440 }
441
442 fn finish_block_use(
443 &self,
444 block: BlockId,
445 use_stream: StreamId,
446 access: Access,
447 ) -> ResourceResult<()> {
448 self.inner.finish_block_use(block, use_stream, access)
450 }
451}
452
#[cfg(test)]
mod tests {
    //! Tests that drive a real `DirectCudaResource`. Each one returns early (passing
    //! vacuously) when `CudaDevice::new(0)` fails, e.g. on a machine without a GPU.

    use super::super::direct::DirectCudaResource;
    use super::super::resource::BlockState;
    use super::*;
    use std::sync::Arc;

    use crate::CudaDevice;

    /// Opens device 0, or `None` if it cannot be opened.
    fn try_device() -> Option<Arc<CudaDevice>> {
        CudaDevice::new(0).ok().map(Arc::new)
    }

    /// A successful allocate/deallocate pair yields two `Ok` records describing the same block.
    #[test]
    fn pass_through_alloc_dealloc_emits_two_ok_records() {
        let Some(device) = try_device() else {
            return;
        };
        let inner = Box::new(DirectCudaResource::new(Arc::clone(&device), 0));
        let sink = Arc::new(InMemorySink::new());
        let r = LoggingResource::new(inner, sink.clone());

        let block = r
            .allocate(1024, StreamId::DEFAULT, AllocTag("logging-test"))
            .expect("alloc");
        assert_eq!(block.bytes, 1024);
        assert_eq!(block.state, BlockState::Live);
        r.deallocate(block).expect("dealloc");

        let recs = sink.snapshot();
        assert_eq!(recs.len(), 2, "expected 2 records, got {:?}", recs);
        assert_eq!(recs[0].action, LogAction::Allocate);
        assert_eq!(recs[0].result, LogResult::Ok);
        assert_eq!(recs[0].bytes, Some(1024));
        assert_eq!(recs[0].stream_id, Some(StreamId::DEFAULT));
        assert!(recs[0].ptr.is_some());

        assert_eq!(recs[1].action, LogAction::Deallocate);
        assert_eq!(recs[1].result, LogResult::Ok);
        assert_eq!(recs[1].ptr, recs[0].ptr);
        assert_eq!(recs[1].generation, recs[0].generation);
    }

    /// Records emitted from one thread carry strictly increasing order counters. The
    /// counter is process-wide, so other tests running in parallel may leave gaps.
    #[test]
    fn order_counter_strictly_increases_across_records() {
        let Some(device) = try_device() else {
            return;
        };
        let inner = Box::new(DirectCudaResource::new(Arc::clone(&device), 0));
        let sink = Arc::new(InMemorySink::new());
        let r = LoggingResource::new(inner, sink.clone());

        for _ in 0..4 {
            let b = r
                .allocate(64, StreamId::DEFAULT, AllocTag::UNTAGGED)
                .expect("alloc");
            r.deallocate(b).expect("dealloc");
        }
        r.reap_pending().expect("reap");

        let recs = sink.snapshot();
        // 4 x (allocate + deallocate) + 1 reap_pending.
        assert_eq!(recs.len(), 9);

        let mut last = 0u64;
        for rec in &recs {
            assert!(
                rec.order_counter > last,
                "order_counter must strictly increase: prev={}, now={}",
                last,
                rec.order_counter
            );
            last = rec.order_counter;
        }
    }

    /// A zero-byte allocation is expected to fail in `DirectCudaResource`; the failure is
    /// logged with the requested size and no pointer or generation.
    #[test]
    fn failed_alloc_records_error_result() {
        let Some(device) = try_device() else {
            return;
        };
        let inner = Box::new(DirectCudaResource::new(Arc::clone(&device), 0));
        let sink = Arc::new(InMemorySink::new());
        let r = LoggingResource::new(inner, sink.clone());

        let _ = r.allocate(0, StreamId::DEFAULT, AllocTag::UNTAGGED);

        let recs = sink.snapshot();
        assert_eq!(recs.len(), 1);
        assert_eq!(recs[0].action, LogAction::Allocate);
        assert!(matches!(recs[0].result, LogResult::Err { kind, .. } if kind == "Driver"));
        assert_eq!(recs[0].bytes, Some(0));
        assert!(recs[0].ptr.is_none());
        assert!(recs[0].generation.is_none());
    }

    /// Deallocating a block the resource never handed out is logged as `UseAfterFree`,
    /// and the record still carries the bogus pointer.
    #[test]
    fn failed_dealloc_records_error_result() {
        let Some(device) = try_device() else {
            return;
        };
        let inner = Box::new(DirectCudaResource::new(Arc::clone(&device), 0));
        let sink = Arc::new(InMemorySink::new());
        let r = LoggingResource::new(inner, sink.clone());

        let bogus = DeviceBlock {
            ptr: 0xdead_beef,
            device_ordinal: 0,
            alloc_stream: StreamId::DEFAULT,
            bytes: 16,
            align: 1,
            tag: AllocTag::UNTAGGED,
            generation: Generation::next(),
            state: BlockState::Live,
        };
        let res = r.deallocate(bogus);
        assert!(res.is_err());

        let recs = sink.snapshot();
        assert_eq!(recs.len(), 1);
        assert_eq!(recs[0].action, LogAction::Deallocate);
        assert!(matches!(recs[0].result, LogResult::Err { kind, .. } if kind == "UseAfterFree"));
        assert_eq!(recs[0].ptr, Some(0xdead_beef));
    }

    /// `reap_pending` emits a single block-less record.
    #[test]
    fn reap_pending_emits_record() {
        let Some(device) = try_device() else {
            return;
        };
        let inner = Box::new(DirectCudaResource::new(Arc::clone(&device), 0));
        let sink = Arc::new(InMemorySink::new());
        let r = LoggingResource::new(inner, sink.clone());

        r.reap_pending().expect("reap");

        let recs = sink.snapshot();
        assert_eq!(recs.len(), 1);
        assert_eq!(recs[0].action, LogAction::ReapPending);
        assert_eq!(recs[0].result, LogResult::Ok);
        assert!(recs[0].stream_id.is_none());
        assert!(recs[0].ptr.is_none());
    }

    /// A sink that rejects everything must not affect allocation results; each
    /// rejection is counted instead.
    #[test]
    fn sink_failure_increments_dropped_records_but_does_not_break_alloc() {
        struct RefuseAllSink;

        impl LoggingSink for RefuseAllSink {
            fn emit(&self, _r: LogRecord) -> Result<(), SinkError> {
                Err(SinkError::Refused("test sink refuses all".into()))
            }
        }

        let Some(device) = try_device() else {
            return;
        };
        let inner = Box::new(DirectCudaResource::new(Arc::clone(&device), 0));
        let sink = Arc::new(RefuseAllSink);
        let r = LoggingResource::new(inner, sink);

        let block = r
            .allocate(128, StreamId::DEFAULT, AllocTag("refuse-test"))
            .expect("alloc must succeed even when sink refuses");
        r.deallocate(block).expect("dealloc must succeed too");
        assert_eq!(r.dropped_records(), 2);
    }

    /// Non-logged queries are forwarded to the inner resource unchanged.
    #[test]
    fn forwards_bytes_outstanding_and_device_ordinal() {
        let Some(device) = try_device() else {
            return;
        };
        let inner = Box::new(DirectCudaResource::new(Arc::clone(&device), 3));
        let sink = Arc::new(InMemorySink::new());
        let r = LoggingResource::new(inner, sink);

        assert_eq!(r.device_ordinal(), 3);
        assert_eq!(r.bytes_outstanding(), 0);
    }

    /// `truncate` must back off to a char boundary instead of panicking mid-character.
    #[test]
    fn truncate_handles_non_ascii_at_cap_without_panicking() {
        // 'é' occupies bytes 1..3 of "héllo", so a cap of 2 must cut back to byte 1.
        let s = String::from("héllo");
        let out = truncate(s, 2);
        assert!(out.starts_with('h'));
        assert!(out.ends_with('…'));

        // Within the cap: returned unchanged, no ellipsis.
        let out = truncate(String::from("ok"), 100);
        assert_eq!(out, "ok");

        // Cap of zero: everything removed, only the ellipsis remains.
        let out = truncate(String::from("héllo"), 0);
        assert_eq!(out, "…");

        // Plain ASCII: cut exactly at the cap.
        let out = truncate(String::from("abcdefgh"), 3);
        assert_eq!(out, "abc…");
    }

    /// `NullSink` accepts any record without error.
    #[test]
    fn null_sink_accepts_records_without_retention() {
        let sink = NullSink::new();
        let rec = LogRecord {
            action: LogAction::Allocate,
            device_ordinal: 0,
            stream_id: Some(StreamId::DEFAULT),
            ptr: Some(0xdead_beef),
            bytes: Some(64),
            tag: Some(AllocTag::UNTAGGED),
            generation: Some(Generation::next()),
            thread_id: 0,
            order_counter: 1,
            timestamp_nanos: 0,
            result: LogResult::Ok,
        };
        sink.emit(rec).expect("NullSink never errors");
    }
}