veecle_os_runtime/
memory_pool.rs

1//! An interrupt/thread-safe memory pool.
2//!
3//! The memory pool allows using static, stack or heap memory to store `SIZE` instances of `T`.
4//! [`MemoryPool::chunk`] provides [`Chunk`]s to interact with instances of `T`.
5//! [`Chunk`] is a pointer type, which means it is cheap to move.
6//! This makes the memory pool well suited for moving data between actors without copying.
7//! The memory pool is especially useful for large chunks of data or data that is expensive to move.
8//!
9//! [`Chunk`]s are automatically made available for re-use on drop.
10//!
11//! [`Chunk`]s can be created by:
12//! - [`MemoryPool::reserve`] and [`MemoryPoolToken::init`], which uses the provided value of `T` to initialize the
13//!   chunk. [`MemoryPool::chunk`] combines both into a single method call.
14//! - [`MemoryPool::reserve`] and [`MemoryPoolToken::init_in_place`] to initialize `T` in place.
15//!
16//! # Example
17//!
18//! ```
19//! use veecle_os_runtime::{ExclusiveReader, Writer};
20//! use veecle_os_runtime::memory_pool::{Chunk, MemoryPool};
21//! use core::convert::Infallible;
22//! use veecle_os_runtime::Storable;
23//!
24//! #[derive(Debug, Storable)]
25//! #[storable(data_type = "Chunk<'static, u8>")]
26//! pub struct Data;
27//!
28//! #[veecle_os_runtime::actor]
29//! async fn exclusive_read_actor(mut reader: ExclusiveReader<'_, Data>) -> Infallible {
30//!     loop {
31//!         if let Some(chunk) = reader.take() {
32//!             println!("Chunk received: {:?}", chunk);
33//!             println!("Chunk content: {:?}", *chunk);
34//!         } else {
35//!             reader.wait_for_update().await;
36//!         }
37//!     }
38//! }
39//!
40//! #[veecle_os_runtime::actor]
41//! async fn write_actor(
42//!     mut writer: Writer<'_, Data>,
43//!     #[init_context] pool: &'static MemoryPool<u8, 5>,
44//! ) -> Infallible {
45//!     for index in 0..10 {
46//!         writer.write(pool.chunk(index).unwrap()).await;
47//!     }
48//! #       // Exit the application to allow doc-tests to complete.
49//! #       std::process::exit(0);
50//! }
51//!
52//! static POOL: MemoryPool<u8, 5> = MemoryPool::new();
53//!
54//! # futures::executor::block_on(
55//! #
56//! veecle_os_runtime::execute! {
57//!    store: [Data],
58//!    actors: [
59//!        ExclusiveReadActor,
60//!        WriteActor: &POOL,
61//!    ]
62//! }
63//! # );
64//!  ```
65
66use core::cell::UnsafeCell;
67use core::fmt;
68use core::fmt::{Debug, Formatter};
69use core::mem::MaybeUninit;
70use core::ops::{Deref, DerefMut};
71use core::sync::atomic::{AtomicBool, Ordering};
72
73/// Interrupt- and thread-safe memory pool.
74///
75/// See [module-level documentation][self] for more information.
76#[derive(Debug)]
77pub struct MemoryPool<T, const SIZE: usize> {
78    chunks: [MemoryPoolInner<T>; SIZE],
79}
80
81impl<T, const SIZE: usize> Default for MemoryPool<T, SIZE> {
82    fn default() -> Self {
83        Self::new()
84    }
85}
86
87impl<T, const SIZE: usize> MemoryPool<T, SIZE> {
88    /// Creates a new [`MemoryPool`].
89    ///
90    /// `SIZE` is required to be larger than 0.
91    pub const fn new() -> Self {
92        const {
93            assert!(SIZE > 0, "empty ObjectPool");
94        }
95
96        Self {
97            chunks: [const { MemoryPoolInner::new() }; SIZE],
98        }
99    }
100
101    /// Reserves an element in the [`MemoryPool`].
102    ///
103    /// Returns `None` if no element is available.
104    ///
105    /// The returned token has to be initialized via [`MemoryPoolToken::init`] before use.
106    /// See [`MemoryPool::chunk`] for a convenience wrapper combining reserving and initializing a [`Chunk`].
107    pub fn reserve(&self) -> Option<MemoryPoolToken<'_, T>> {
108        self.chunks.iter().find_map(|chunk| chunk.reserve())
109    }
110
111    /// Retrieves a [`Chunk`] from the [`MemoryPool`] and initializes it with `init_value`.
112    ///
113    /// Returns `Err(init_value)` if no more [`Chunk`]s are available.
114    ///
115    /// Convenience wrapper combining [`MemoryPool::reserve`] and [`MemoryPoolToken::init].
116    pub fn chunk(&self, init_value: T) -> Result<Chunk<'_, T>, T> {
117        // We need to split reserving and initializing of the `Chunk` because we cannot copy the `init_value` into
118        // every `reserve` call.
119        let token = self.reserve();
120
121        if let Some(token) = token {
122            Ok(token.init(init_value))
123        } else {
124            Err(init_value)
125        }
126    }
127
128    /// Calculates the amount of chunks currently available.
129    ///
130    /// Due to accesses from interrupts and/or other threads, this value might not be correct.
131    /// Only intended for metrics.
132    pub fn chunks_available(&self) -> usize {
133        self.chunks
134            .iter()
135            .map(|chunk| usize::from(chunk.is_available()))
136            .sum()
137    }
138}
139
140// SAFETY: All accesses to the `MemoryPool` are done through the `MemoryPool::chunk` method which is synchronized by
141// atomics.
142unsafe impl<T, const N: usize> Sync for MemoryPool<T, N> {}
143
/// A single element of a [`MemoryPool`]: storage for one `T` plus the atomic used for synchronization.
#[derive(Debug)]
struct MemoryPoolInner<T> {
    /// Storage for the `T` instance.
    data: UnsafeCell<MaybeUninit<T>>,
    /// `true` while the element can be reserved.
    available: AtomicBool,
}
150
151impl<T> MemoryPoolInner<T> {
152    /// Creates a new `MemoryPoolInner`.
153    ///
154    /// Marked available and uninitialized.
155    const fn new() -> Self {
156        Self {
157            data: UnsafeCell::new(MaybeUninit::uninit()),
158            available: AtomicBool::new(true),
159        }
160    }
161
162    /// Reserves this [`MemoryPoolInner`].
163    fn reserve(&self) -> Option<MemoryPoolToken<'_, T>> {
164        if self.available.swap(false, Ordering::AcqRel) {
165            Some(MemoryPoolToken { inner: Some(self) })
166        } else {
167            None
168        }
169    }
170
171    /// Returns `true` if the [`MemoryPoolInner`] is currently available.
172    fn is_available(&self) -> bool {
173        self.available.load(Ordering::Acquire)
174    }
175}
176
177/// A token reserving an element in a [`MemoryPool`] which can be initialized to create a [`Chunk`].
178#[derive(Debug)]
179pub struct MemoryPoolToken<'a, T> {
180    inner: Option<&'a MemoryPoolInner<T>>,
181}
182
183impl<'a, T> MemoryPoolToken<'a, T> {
184    /// Consumes the [`MemoryPoolToken.inner`][field@MemoryPoolToken::inner] to prevent [`MemoryPoolToken`]'s drop
185    /// implementation from making the element available.
186    fn consume(&mut self) -> (&'a mut MaybeUninit<T>, &'a AtomicBool) {
187        let Some(inner) = self.inner.take() else {
188            unreachable!("`MemoryPoolToken` should only be consumed once");
189        };
190
191        let inner_data = {
192            let inner_data_ptr = inner.data.get();
193            // SAFETY:
194            // - `UnsafeCell` has the same layout as its content, thus the `chunk_ptr` points to an aligned and valid
195            //   value of `MaybeUninit<T>`.
196            // - We ensure via the `ChunkMetadata` that only this single mutable reference to the content of the
197            //   `UnsafeCell` exists.
198            unsafe { inner_data_ptr.as_mut() }
199                .expect("pointer to the contents of an `UnsafeCell` should not be null")
200        };
201
202        (inner_data, &inner.available)
203    }
204
205    /// Consumes and turns the [`MemoryPoolToken`] into an initialized [`Chunk`].
206    pub fn init(mut self, init_value: T) -> Chunk<'a, T> {
207        let (inner_data, available) = self.consume();
208
209        inner_data.write(init_value);
210
211        // SAFETY:
212        // `inner_data` has be initialized by writing the `init_value`.
213        unsafe { Chunk::new(inner_data, available) }
214    }
215
216    /// Initializes a [`Chunk`] in place via `init_function`.
217    ///
218    /// # Safety
219    ///
220    /// `init_function` must initialize the passed parameter to a valid `T` before the function returns.
221    pub unsafe fn init_in_place(
222        mut self,
223        init_function: impl FnOnce(&mut MaybeUninit<T>),
224    ) -> Chunk<'a, T> {
225        let (inner_data, available) = self.consume();
226
227        init_function(inner_data);
228
229        // SAFETY:
230        // `inner_data` has be initialized by `init_function`.
231        unsafe { Chunk::new(inner_data, available) }
232    }
233}
234
235impl<T> Drop for MemoryPoolToken<'_, T> {
236    fn drop(&mut self) {
237        if let Some(inner) = self.inner.take() {
238            inner.available.store(true, Ordering::Release);
239        }
240    }
241}
242
/// A pointer type pointing to an instance of `T` in a [`MemoryPool`].
///
/// Dropping a [`Chunk`] drops the contained `T` and makes the element available for re-use.
///
/// See [module-level documentation][self] for more information.
pub struct Chunk<'a, T> {
    // We're using `&mut MaybeUninit<T>` instead of `&mut T` to be able to drop `T` without going through a pointer
    // while only having a reference.
    // We cannot drop the contents of a reference without creating a dangling reference in the `Drop` implementation.
    inner: &'a mut MaybeUninit<T>,
    // Only held to ensure the chunk is made available on drop.
    token: &'a AtomicBool,
}

impl<T> Debug for Chunk<'_, T>
where
    T: Debug,
{
    /// Formats the contained `T`; the [`Chunk`] itself adds nothing to the output.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        Debug::fmt(&**self, f)
    }
}

impl<'a, T> Chunk<'a, T> {
    /// Creates a new [`Chunk`].
    ///
    /// `token` is the availability flag that is set again when the [`Chunk`] is dropped.
    ///
    /// # Safety
    ///
    /// The `chunk` must be initialized.
    unsafe fn new(chunk: &'a mut MaybeUninit<T>, token: &'a AtomicBool) -> Self {
        Self {
            inner: chunk,
            token,
        }
    }
}

impl<T> Deref for Chunk<'_, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        // SAFETY: The `Self::new` safety documentation requires the chunk to be initialized.
        // It is only dropped in the drop implementation and cannot be un-initialized by any `Chunk` method, thus it is
        // initialized here.
        unsafe { self.inner.assume_init_ref() }
    }
}

impl<T> DerefMut for Chunk<'_, T> {
    fn deref_mut(&mut self) -> &mut <Self as Deref>::Target {
        // SAFETY: The `Self::new` safety documentation requires the chunk to be initialized.
        // It is only dropped in the drop implementation and cannot be un-initialized by any `Chunk` method, thus it is
        // initialized here.
        unsafe { self.inner.assume_init_mut() }
    }
}

impl<T> Drop for Chunk<'_, T> {
    /// Drops the contained `T` and makes the element available again.
    fn drop(&mut self) {
        // SAFETY: The `Self::new` safety documentation requires the chunk to be initialized.
        // It is only dropped here and cannot be un-initialized by any `Chunk` method, thus it is still initialized.
        unsafe { self.inner.assume_init_drop() };

        // The swap must not be placed inside `debug_assert!`: the macro does not evaluate its condition when debug
        // assertions are disabled, which would leave the element reserved forever in release builds.
        let was_available = self.token.swap(true, Ordering::AcqRel);
        debug_assert!(!was_available, "chunk was made available a second time");
    }
}
310
#[cfg(test)]
mod test {
    use std::format;
    use std::sync::atomic::AtomicUsize;

    use super::*;

    /// Chunks are handed out until the pool is exhausted and do not share their contents.
    #[test]
    fn pool() {
        static POOL: MemoryPool<[u8; 10], 2> = MemoryPool::new();

        let mut first = POOL.chunk([0; 10]).unwrap();
        let second = POOL.chunk([0; 10]).unwrap();
        assert!(POOL.chunk([0; 10]).is_err());

        assert_eq!(first[0], 0);
        first[0] += 1;
        assert_eq!(first[0], 1);
        assert_eq!(second[0], 0);
    }

    /// Every value handed to the pool is dropped exactly once, whether or not it ended up in a chunk.
    #[test]
    fn drop_test() {
        static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);

        #[derive(Debug)]
        pub struct Dropper {}
        impl Drop for Dropper {
            fn drop(&mut self) {
                DROP_COUNT.fetch_add(1, Ordering::Relaxed);
            }
        }

        let drops = || DROP_COUNT.load(Ordering::Relaxed);

        {
            let pool: MemoryPool<Dropper, 2> = MemoryPool::new();

            // The chunk is discarded immediately, dropping its content.
            let _ = pool.chunk(Dropper {});
            assert_eq!(drops(), 1);

            {
                let _held1 = pool.chunk(Dropper {}).unwrap();
                let _held2 = pool.chunk(Dropper {}).unwrap();
                // The rejected value is returned in the `Err` and dropped with it.
                assert!(pool.chunk(Dropper {}).is_err());
            }
            assert_eq!(drops(), 4);

            let _ = pool.chunk(Dropper {});
            assert_eq!(drops(), 5);
        }

        // After dropping `pool`, there were no additional drops of the contained type.
        assert_eq!(drops(), 5);
    }

    /// Dropping an un-initialized token hands the element back to the pool.
    #[test]
    fn drop_memory_pool_token() {
        let pool = MemoryPool::<usize, 1>::new();
        assert_eq!(pool.chunks_available(), 1);

        {
            let _reservation = pool.reserve().unwrap();
            assert_eq!(pool.chunks_available(), 0);
        }

        assert_eq!(pool.chunks_available(), 1);
    }

    /// The available count follows chunks being created and dropped.
    #[test]
    fn chunks_available() {
        let pool = MemoryPool::<usize, 2>::new();
        assert_eq!(pool.chunks_available(), 2);

        {
            let _first = pool.chunk(0);
            assert_eq!(pool.chunks_available(), 1);
            let _second = pool.chunk(0);
            assert_eq!(pool.chunks_available(), 0);
        }

        assert_eq!(pool.chunks_available(), 2);
    }

    #[test]
    fn reserve_init() {
        let pool = MemoryPool::<usize, 2>::new();

        let chunk = pool.reserve().unwrap().init(2);

        assert_eq!(*chunk, 2);
    }

    #[test]
    fn reserve_init_in_place() {
        let pool = MemoryPool::<usize, 2>::new();
        let reservation = pool.reserve().unwrap();

        // SAFETY: The passed closure initializes the chunk correctly.
        let chunk = unsafe {
            reservation.init_in_place(|storage| {
                storage.write(2);
            })
        };

        assert_eq!(*chunk, 2);
    }

    /// Consuming a token a second time is a bug and must panic.
    #[test]
    #[should_panic(expected = "`MemoryPoolToken` should only be consumed once")]
    fn consume_none() {
        let pool = MemoryPool::<usize, 2>::new();
        let mut reservation = pool.reserve().unwrap();

        let _ = reservation.consume();
        let _ = reservation.consume();
    }

    /// Ensures the `MemoryPool` and `Chunk` don't lose their `Send` & `Sync` auto trait implementations when
    /// refactoring.
    #[test]
    fn send_sync() {
        fn assert_send<T: Send>() {}
        fn assert_sync<T: Sync>() {}

        assert_send::<MemoryPool<[u8; 10], 2>>();
        assert_sync::<MemoryPool<[u8; 10], 2>>();

        assert_send::<Chunk<[u8; 10]>>();
        assert_sync::<Chunk<[u8; 10]>>();
    }

    /// A chunk's debug output is the debug output of its content.
    #[test]
    fn debug_chunk() {
        let pool = MemoryPool::<usize, 2>::new();
        let chunk = pool.chunk(0).unwrap();

        assert_eq!(format!("{chunk:?}"), "0");
    }

    #[test]
    fn default_memory_pool() {
        let pool: MemoryPool<usize, 2> = MemoryPool::default();

        assert_eq!(pool.chunks_available(), 2);
    }
}