Skip to main content

xlog_cuda/device_runtime/
budget.rs

1//! [`GlobalDeviceBudget`] — per-runtime byte-limit decorator.
2//!
3//! Wraps a [`DeviceMemoryResource`] and enforces a single byte limit
4//! across all allocations that flow through it. Designed to be the
5//! per-runtime singleton replacement for the v0.5 per-provider
6//! `GpuMemoryManager` (which had no way to enforce a coherent budget
7//! across parallel tests, multiple providers, or Python callers
8//! sharing one physical GPU).
9//!
10//! # Accounting model
11//!
12//! `GlobalDeviceBudget` keeps `reserved_bytes` strictly equal to
13//! `inner.bytes_outstanding()` at every quiescent moment. This is the
14//! "live + retired-but-not-yet-freed" view from the trait — exactly
15//! the bytes the budget should be guarding.
16//!
17//! To keep that invariant under both synchronous and stream-ordered
18//! async inners, every public method is serialized through a single
19//! `Mutex<BudgetState>` and the inner call is invoked **inside** the
20//! lock. The lock window is bounded by the inner's CUDA call, which
21//! is in any case the dominant cost — the budget decorator does not
22//! add hot-path overhead beyond what the inner already imposes.
23//!
24//! ## Allocate
25//!
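//!   1. Lock state.
//!   2. If `reserved + bytes > limit`:
//!      * if `bytes > limit` the request can never fit — return
//!        `ResourceError::OutOfBudget { requested, remaining }`
//!        immediately, without touching the inner;
//!      * otherwise drain completed stream-ordered frees with
//!        `reap_pending` (releasing their bytes from `reserved`),
//!        re-check once, and return `OutOfBudget` if the request
//!        still does not fit.
//!   3. Optimistically reserve: `reserved += bytes`.
//!   4. Call `inner.allocate(bytes, ..)` under the lock. The inner's
//!      own bookkeeping moves `bytes` from "free" to "live".
//!   5. If inner returned `Err`, roll back the reservation:
//!      `reserved -= bytes`. Forward the error.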
34//!
35//! ## Deallocate / Reap
36//!
37//! For both methods we sample `inner.bytes_outstanding()` before and
38//! after the inner call (under the lock), and decrement `reserved`
39//! by the observed delta. The pattern handles both backends without
40//! branching:
41//!
42//!   * Synchronous inner (`DirectCudaResource`): `bytes_outstanding`
43//!     drops by the block's bytes on `deallocate`, so the delta is
44//!     `block.bytes`. `reap_pending` is a no-op (delta zero).
45//!   * Stream-ordered async inner (`AsyncCudaResource`): `deallocate`
46//!     moves bytes from "live" to "pending"; `bytes_outstanding`
47//!     stays the same, so the delta is zero — the budget is *not*
48//!     released yet. `reap_pending` drains the pending bytes whose
49//!     queued `cuMemFreeAsync` has completed; `bytes_outstanding`
50//!     drops by the drained total and the budget releases that
51//!     same total.
52//!
53//! Because the inner call and the before/after samples happen under
54//! the same lock, no concurrent budget op can perturb the inner's
55//! `bytes_outstanding` between our reads — the delta strictly
56//! reflects this call's effect on the inner.
57//!
58//! # Composition
59//!
60//! `GlobalDeviceBudget` is a normal `DeviceMemoryResource`, so it
61//! plugs into [`XlogDeviceRuntime::with_resource`] and stacks under
62//! / over [`LoggingResource`]. Recommended ordering for production:
63//! `GlobalDeviceBudget(LoggingResource(AsyncCudaResource))`. That
64//! gives the budget atomic accounting, the logger sees the
65//! eventually-applied call (so `OutOfBudget` errors do not get
66//! double-logged), and the underlying allocator is reached last.
67//! Tests can stack either way.
68
69use std::sync::Mutex;
70
71use super::resource::{
72    Access, AllocTag, BlockId, DeviceBlock, DeviceMemoryResource, ResourceError, ResourceResult,
73    StreamId,
74};
75
76/// Internal state guarded by the budget mutex. Kept in its own
77/// struct so the lock guard syntactically scopes all updates.
78struct BudgetState {
79    reserved: usize,
80}
81
82/// Per-runtime byte-limit decorator.
83pub struct GlobalDeviceBudget {
84    inner: Box<dyn DeviceMemoryResource + Send + Sync>,
85    limit: usize,
86    state: Mutex<BudgetState>,
87}
88
89impl GlobalDeviceBudget {
90    /// Wrap `inner` with a hard `limit` in bytes. The initial
91    /// reserved tally is sampled from `inner.bytes_outstanding()`
92    /// so callers may compose around an inner that already has live
93    /// allocations — though in practice the decorator is installed
94    /// before any allocation flows through it.
95    pub fn new(inner: Box<dyn DeviceMemoryResource + Send + Sync>, limit: usize) -> Self {
96        let initial = inner.bytes_outstanding();
97        Self {
98            inner,
99            limit,
100            state: Mutex::new(BudgetState { reserved: initial }),
101        }
102    }
103
104    /// Hard byte limit. Set at construction; not adjustable.
105    pub fn limit(&self) -> usize {
106        self.limit
107    }
108
109    /// Bytes currently reserved against the budget (live + pending
110    /// async free). Matches `inner.bytes_outstanding()` at every
111    /// quiescent moment.
112    pub fn reserved_bytes(&self) -> usize {
113        self.state
114            .lock()
115            .expect("GlobalDeviceBudget poisoned")
116            .reserved
117    }
118
119    /// Headroom in bytes for the next allocation. Equal to
120    /// `limit - reserved_bytes`, saturating at zero.
121    pub fn remaining(&self) -> usize {
122        let state = self.state.lock().expect("GlobalDeviceBudget poisoned");
123        self.limit.saturating_sub(state.reserved)
124    }
125}
126
127impl DeviceMemoryResource for GlobalDeviceBudget {
128    fn allocate(
129        &self,
130        bytes: usize,
131        stream: StreamId,
132        tag: AllocTag,
133    ) -> ResourceResult<DeviceBlock> {
134        // First-pass reservation attempt under the budget lock.
135        // If the request fits, reserve and forward to the inner
136        // immediately.
137        {
138            let mut state = self.state.lock().expect("GlobalDeviceBudget poisoned");
139            let remaining = self.limit.saturating_sub(state.reserved);
140            if bytes <= remaining {
141                state.reserved = state.reserved.saturating_add(bytes);
142                drop(state);
143                return match self.inner.allocate(bytes, stream, tag) {
144                    Ok(block) => Ok(block),
145                    Err(e) => {
146                        let mut state = self.state.lock().expect("GlobalDeviceBudget poisoned");
147                        state.reserved = state.reserved.saturating_sub(bytes);
148                        Err(e)
149                    }
150                };
151            }
152            // Genuinely oversized requests can never fit even
153            // after a reap. Short-circuit before touching the
154            // inner stack so the rejection stays cheap and does
155            // not emit a reap log record.
156            if bytes > self.limit {
157                return Err(ResourceError::OutOfBudget {
158                    requested: bytes,
159                    remaining,
160                });
161            }
162        }
163
164        // Second pass: reservation didn't fit. With the
165        // stream-ordered async backend, dropped buffers transit
166        // through `pending_per_stream` until `reap_pending` runs;
167        // their bytes still count against `state.reserved`. Tight
168        // allocate-then-drop loops (cert hardware sustained
169        // tests; recursive Datalog inner loops without explicit
170        // reap) hit this even when the GPU has plenty of free
171        // memory. Drain pending frees once and retry. If the
172        // retry still fails, the budget is genuinely exhausted
173        // and the caller should see `OutOfBudget` as before.
174        //
175        // Reap is performed WITHOUT holding `state` so the inner
176        // resource's own locks can run; reap itself updates
177        // `state.reserved` via `Self::reap_pending` (which takes
178        // the lock) so a concurrent racing allocate sees the
179        // freed bytes.
180        let _ = self.reap_pending();
181
182        let mut state = self.state.lock().expect("GlobalDeviceBudget poisoned");
183        let remaining = self.limit.saturating_sub(state.reserved);
184        if bytes > remaining {
185            return Err(ResourceError::OutOfBudget {
186                requested: bytes,
187                remaining,
188            });
189        }
190        state.reserved = state.reserved.saturating_add(bytes);
191        drop(state);
192
193        match self.inner.allocate(bytes, stream, tag) {
194            Ok(block) => Ok(block),
195            Err(e) => {
196                let mut state = self.state.lock().expect("GlobalDeviceBudget poisoned");
197                state.reserved = state.reserved.saturating_sub(bytes);
198                Err(e)
199            }
200        }
201    }
202
203    fn deallocate(&self, block: DeviceBlock) -> ResourceResult<()> {
204        let mut state = self.state.lock().expect("GlobalDeviceBudget poisoned");
205
206        let before = self.inner.bytes_outstanding();
207        let result = self.inner.deallocate(block);
208        let after = self.inner.bytes_outstanding();
209        let freed = before.saturating_sub(after);
210        if freed > 0 {
211            state.reserved = state.reserved.saturating_sub(freed);
212        }
213        result
214    }
215
216    fn device_ordinal(&self) -> u32 {
217        self.inner.device_ordinal()
218    }
219
220    fn bytes_outstanding(&self) -> usize {
221        // Authoritative view is the inner's. We could return our
222        // own `reserved` instead, but matching the inner sidesteps
223        // any transient skew during error rollback.
224        self.inner.bytes_outstanding()
225    }
226
227    fn reap_pending(&self) -> ResourceResult<()> {
228        let mut state = self.state.lock().expect("GlobalDeviceBudget poisoned");
229
230        let before = self.inner.bytes_outstanding();
231        let result = self.inner.reap_pending();
232        let after = self.inner.bytes_outstanding();
233        let freed = before.saturating_sub(after);
234        if freed > 0 {
235            state.reserved = state.reserved.saturating_sub(freed);
236        }
237        result
238    }
239
240    fn record_block_use(&self, block: &DeviceBlock, use_stream: StreamId) -> ResourceResult<()> {
241        // Pass-through: budget enforcement does not affect
242        // cross-stream lifetime tracking; the inner resource (the
243        // stream-ordered backend) is the only layer that owns
244        // last-use events.
245        self.inner.record_block_use(block, use_stream)
246    }
247
248    fn supports_block_use_tracking(&self) -> bool {
249        self.inner.supports_block_use_tracking()
250    }
251
252    fn prepare_block_use(
253        &self,
254        block: BlockId,
255        use_stream: StreamId,
256        access: Access,
257    ) -> ResourceResult<()> {
258        // Pass-through: cross-stream waits live in the
259        // stream-ordered backend; budget accounting is unaffected.
260        self.inner.prepare_block_use(block, use_stream, access)
261    }
262
263    fn finish_block_use(
264        &self,
265        block: BlockId,
266        use_stream: StreamId,
267        access: Access,
268    ) -> ResourceResult<()> {
269        // Pass-through: see prepare_block_use rationale above.
270        self.inner.finish_block_use(block, use_stream, access)
271    }
272}
273
274#[cfg(test)]
275mod tests {
276    use super::super::async_resource::AsyncCudaResource;
277    use super::super::direct::DirectCudaResource;
278    use super::super::resource::{BlockState, Generation};
279    use super::super::stream_pool::StreamPool;
280    use super::*;
281    use std::sync::Arc;
282
283    use crate::CudaDevice;
284
285    fn try_device() -> Option<Arc<CudaDevice>> {
286        CudaDevice::new(0).ok().map(Arc::new)
287    }
288
289    /// Test fixture that always fails `allocate` so we can exercise
290    /// the rollback path without touching CUDA. `deallocate` and
291    /// `reap_pending` are no-ops; `bytes_outstanding` reflects an
292    /// internally tracked tally so the budget's delta-sampling logic
293    /// is also exercised.
294    struct AlwaysFailAllocResource {
295        ord: u32,
296        outstanding: std::sync::atomic::AtomicUsize,
297    }
298
299    impl AlwaysFailAllocResource {
300        fn new(ord: u32) -> Self {
301            Self {
302                ord,
303                outstanding: std::sync::atomic::AtomicUsize::new(0),
304            }
305        }
306    }
307
308    impl DeviceMemoryResource for AlwaysFailAllocResource {
309        fn allocate(
310            &self,
311            _bytes: usize,
312            _stream: StreamId,
313            _tag: AllocTag,
314        ) -> ResourceResult<DeviceBlock> {
315            Err(ResourceError::Driver("inner always fails".into()))
316        }
317        fn deallocate(&self, _block: DeviceBlock) -> ResourceResult<()> {
318            Ok(())
319        }
320        fn device_ordinal(&self) -> u32 {
321            self.ord
322        }
323        fn bytes_outstanding(&self) -> usize {
324            self.outstanding.load(std::sync::atomic::Ordering::Relaxed)
325        }
326    }
327
328    #[test]
329    fn allocate_within_limit_succeeds_and_updates_reserved() {
330        let Some(device) = try_device() else {
331            return;
332        };
333        let inner = Box::new(DirectCudaResource::new(Arc::clone(&device), 0));
334        let budget = GlobalDeviceBudget::new(inner, 64 * 1024);
335
336        let block = budget
337            .allocate(2048, StreamId::DEFAULT, AllocTag("budget-success"))
338            .expect("alloc within limit");
339        assert_eq!(budget.reserved_bytes(), 2048);
340        assert_eq!(budget.remaining(), 64 * 1024 - 2048);
341        assert_eq!(budget.bytes_outstanding(), 2048);
342
343        budget.deallocate(block).expect("dealloc");
344        assert_eq!(budget.reserved_bytes(), 0);
345        assert_eq!(budget.bytes_outstanding(), 0);
346    }
347
348    #[test]
349    fn allocate_at_exact_limit_succeeds_then_next_byte_rejected() {
350        let Some(device) = try_device() else {
351            return;
352        };
353        let inner = Box::new(DirectCudaResource::new(Arc::clone(&device), 0));
354        let budget = GlobalDeviceBudget::new(inner, 4096);
355
356        let block = budget
357            .allocate(4096, StreamId::DEFAULT, AllocTag::UNTAGGED)
358            .expect("alloc at exact limit");
359        assert_eq!(budget.reserved_bytes(), 4096);
360        assert_eq!(budget.remaining(), 0);
361
362        let err = budget.allocate(1, StreamId::DEFAULT, AllocTag::UNTAGGED);
363        assert!(
364            matches!(
365                err,
366                Err(ResourceError::OutOfBudget {
367                    requested: 1,
368                    remaining: 0
369                })
370            ),
371            "expected OutOfBudget {{1,0}}, got {:?}",
372            err
373        );
374        // Failed alloc must not perturb reserved.
375        assert_eq!(budget.reserved_bytes(), 4096);
376
377        budget.deallocate(block).expect("dealloc");
378        assert_eq!(budget.reserved_bytes(), 0);
379    }
380
381    #[test]
382    fn over_limit_alloc_returns_out_of_budget_with_correct_remaining() {
383        let Some(device) = try_device() else {
384            return;
385        };
386        let inner = Box::new(DirectCudaResource::new(Arc::clone(&device), 0));
387        let budget = GlobalDeviceBudget::new(inner, 1024);
388
389        // First alloc takes 768 bytes → 256 remaining.
390        let block = budget
391            .allocate(768, StreamId::DEFAULT, AllocTag::UNTAGGED)
392            .expect("first alloc");
393        assert_eq!(budget.remaining(), 256);
394
395        let err = budget.allocate(512, StreamId::DEFAULT, AllocTag::UNTAGGED);
396        assert!(
397            matches!(
398                err,
399                Err(ResourceError::OutOfBudget {
400                    requested: 512,
401                    remaining: 256
402                })
403            ),
404            "expected OutOfBudget {{512,256}}, got {:?}",
405            err
406        );
407
408        budget.deallocate(block).expect("dealloc");
409    }
410
411    #[test]
412    fn failed_inner_allocation_rolls_back_reservation() {
413        // No CUDA dependency — the fake inner always errors.
414        let inner = Box::new(AlwaysFailAllocResource::new(0));
415        let budget = GlobalDeviceBudget::new(inner, 1024 * 1024);
416        assert_eq!(budget.reserved_bytes(), 0);
417
418        let err = budget.allocate(2048, StreamId::DEFAULT, AllocTag::UNTAGGED);
419        assert!(matches!(err, Err(ResourceError::Driver(_))));
420        // Reservation must be rolled back: no live or pending bytes
421        // landed on the inner, so reserved stays at the pre-call
422        // value (0).
423        assert_eq!(budget.reserved_bytes(), 0);
424        assert_eq!(budget.remaining(), 1024 * 1024);
425    }
426
427    #[test]
428    fn deallocate_releases_budget_immediately_for_synchronous_inner() {
429        // DirectCudaResource is treated as synchronous from the
430        // budget's perspective: bytes_outstanding drops at
431        // deallocate time, so the delta-based release fires there.
432        let Some(device) = try_device() else {
433            return;
434        };
435        let inner = Box::new(DirectCudaResource::new(Arc::clone(&device), 0));
436        let budget = GlobalDeviceBudget::new(inner, 16 * 1024);
437
438        let block = budget
439            .allocate(8 * 1024, StreamId::DEFAULT, AllocTag::UNTAGGED)
440            .expect("alloc");
441        assert_eq!(budget.reserved_bytes(), 8 * 1024);
442        budget.deallocate(block).expect("dealloc");
443        assert_eq!(
444            budget.reserved_bytes(),
445            0,
446            "synchronous inner releases budget at deallocate"
447        );
448        // reap is a no-op for sync inners; budget unchanged.
449        budget.reap_pending().expect("reap noop");
450        assert_eq!(budget.reserved_bytes(), 0);
451    }
452
453    #[test]
454    fn deallocate_holds_budget_for_async_inner_until_reap_pending() {
455        let Some(device) = try_device() else {
456            return;
457        };
458        let pool = Arc::new(StreamPool::with_defaults(Arc::clone(&device)));
459        let inner = Box::new(AsyncCudaResource::new(
460            Arc::clone(&device),
461            0,
462            Arc::clone(&pool),
463        ));
464        let budget = GlobalDeviceBudget::new(inner, 32 * 1024);
465
466        let block = budget
467            .allocate(4096, StreamId::DEFAULT, AllocTag("budget-async"))
468            .expect("alloc");
469        assert_eq!(budget.reserved_bytes(), 4096);
470
471        // After deallocate the cuMemFreeAsync is queued but not
472        // drained; bytes_outstanding still shows 4096 (live → pending),
473        // so the budget MUST NOT release yet.
474        budget.deallocate(block).expect("dealloc");
475        assert_eq!(
476            budget.reserved_bytes(),
477            4096,
478            "async inner: budget must stay reserved until reap_pending drains pending free"
479        );
480        assert_eq!(budget.bytes_outstanding(), 4096);
481
482        budget.reap_pending().expect("reap");
483        assert_eq!(
484            budget.reserved_bytes(),
485            0,
486            "async inner: reap_pending releases the pending bytes"
487        );
488        assert_eq!(budget.bytes_outstanding(), 0);
489    }
490
491    #[test]
492    fn deallocate_unknown_block_does_not_release_budget() {
493        let Some(device) = try_device() else {
494            return;
495        };
496        let inner = Box::new(DirectCudaResource::new(Arc::clone(&device), 0));
497        let budget = GlobalDeviceBudget::new(inner, 16 * 1024);
498
499        let block = budget
500            .allocate(2048, StreamId::DEFAULT, AllocTag::UNTAGGED)
501            .expect("alloc");
502        assert_eq!(budget.reserved_bytes(), 2048);
503
504        // Bogus block — inner returns UseAfterFree without freeing
505        // anything; budget must not move.
506        let bogus = DeviceBlock {
507            ptr: 0xfeed_face,
508            device_ordinal: 0,
509            alloc_stream: StreamId::DEFAULT,
510            bytes: 1024,
511            align: 1,
512            tag: AllocTag::UNTAGGED,
513            generation: Generation::next(),
514            state: BlockState::Live,
515        };
516        let res = budget.deallocate(bogus);
517        assert!(matches!(res, Err(ResourceError::UseAfterFree { .. })));
518        assert_eq!(
519            budget.reserved_bytes(),
520            2048,
521            "bogus dealloc must not release budget"
522        );
523
524        budget.deallocate(block).expect("real dealloc");
525        assert_eq!(budget.reserved_bytes(), 0);
526    }
527
528    #[test]
529    fn forwards_device_ordinal() {
530        let inner = Box::new(AlwaysFailAllocResource::new(7));
531        let budget = GlobalDeviceBudget::new(inner, 1024);
532        assert_eq!(budget.device_ordinal(), 7);
533    }
534}