veecle_os_runtime/memory_pool.rs
//! An interrupt/thread-safe memory pool.
//!
//! The memory pool allows using static, stack or heap memory to store `SIZE` instances of `T`.
//! [`MemoryPool::chunk`] provides [`Chunk`]s to interact with instances of `T`.
//! [`Chunk`] is a pointer type, which means it is cheap to move.
//! This makes the memory pool well suited for moving data between actors without copying.
//! The memory pool is especially useful for large chunks of data or data that is expensive to move.
//!
//! [`Chunk`]s are automatically made available for re-use on drop.
//!
//! [`Chunk`]s can be created by:
//! - [`MemoryPool::reserve`] and [`MemoryPoolToken::init`], which uses the provided value of `T` to initialize the
//!   chunk. [`MemoryPool::chunk`] combines both into a single method call.
//! - [`MemoryPool::reserve`] and [`MemoryPoolToken::init_in_place`] to initialize `T` in place.
//!
//! # Example
//!
//! ```
//! use veecle_os_runtime::{ExclusiveReader, Writer};
//! use veecle_os_runtime::memory_pool::{Chunk, MemoryPool};
//! use core::convert::Infallible;
//! use veecle_os_runtime::Storable;
//!
//! #[derive(Debug, Storable)]
//! #[storable(data_type = "Chunk<'static, u8>")]
//! pub struct Data;
//!
//! #[veecle_os_runtime::actor]
//! async fn exclusive_read_actor(mut reader: ExclusiveReader<'_, Data>) -> Infallible {
//!     loop {
//!         if let Some(chunk) = reader.take() {
//!             println!("Chunk received: {:?}", chunk);
//!             println!("Chunk content: {:?}", *chunk);
//!         } else {
//!             reader.wait_for_update().await;
//!         }
//!     }
//! }
//!
//! #[veecle_os_runtime::actor]
//! async fn write_actor(
//!     mut writer: Writer<'_, Data>,
//!     #[init_context] pool: &'static MemoryPool<u8, 5>,
//! ) -> Infallible {
//!     for index in 0..10 {
//!         writer.write(pool.chunk(index).unwrap()).await;
//!     }
//! # // Exit the application to allow doc-tests to complete.
//! # std::process::exit(0);
//! }
//!
//! static POOL: MemoryPool<u8, 5> = MemoryPool::new();
//!
//! # futures::executor::block_on(
//! #
//! veecle_os_runtime::execute! {
//!     store: [Data],
//!     actors: [
//!         ExclusiveReadActor,
//!         WriteActor: &POOL,
//!     ]
//! }
//! # );
//! ```

use core::cell::UnsafeCell;
use core::fmt;
use core::fmt::{Debug, Formatter};
use core::mem::MaybeUninit;
use core::ops::{Deref, DerefMut};
use core::sync::atomic::{AtomicBool, Ordering};

/// Interrupt- and thread-safe memory pool.
///
/// See [module-level documentation][self] for more information.
#[derive(Debug)]
pub struct MemoryPool<T, const SIZE: usize> {
    chunks: [MemoryPoolInner<T>; SIZE],
}

impl<T, const SIZE: usize> Default for MemoryPool<T, SIZE> {
    fn default() -> Self {
        Self::new()
    }
}

impl<T, const SIZE: usize> MemoryPool<T, SIZE> {
    /// Creates a new [`MemoryPool`].
    ///
    /// `SIZE` is required to be larger than 0.
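    ///
    /// # Example
    ///
    /// A minimal sketch; `new` is `const`, so pools can be placed in statics:
    ///
    /// ```
    /// use veecle_os_runtime::memory_pool::MemoryPool;
    ///
    /// static POOL: MemoryPool<u8, 4> = MemoryPool::new();
    ///
    /// assert_eq!(POOL.chunks_available(), 4);
    /// ```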
    pub const fn new() -> Self {
        const {
            assert!(SIZE > 0, "empty MemoryPool");
        }

        Self {
            chunks: [const { MemoryPoolInner::new() }; SIZE],
        }
    }

    /// Reserves an element in the [`MemoryPool`].
    ///
    /// Returns `None` if no element is available.
    ///
    /// The returned token has to be initialized via [`MemoryPoolToken::init`] before use.
    /// See [`MemoryPool::chunk`] for a convenience wrapper combining reserving and initializing a [`Chunk`].
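    ///
    /// # Example
    ///
    /// A minimal sketch of the reserve-then-initialize flow:
    ///
    /// ```
    /// use veecle_os_runtime::memory_pool::MemoryPool;
    ///
    /// let pool = MemoryPool::<usize, 1>::new();
    ///
    /// let token = pool.reserve().expect("one element is available");
    /// let chunk = token.init(42);
    /// assert_eq!(*chunk, 42);
    ///
    /// // The only element is reserved, so a second `reserve` fails.
    /// assert!(pool.reserve().is_none());
    /// ```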
    pub fn reserve(&self) -> Option<MemoryPoolToken<'_, T>> {
        self.chunks.iter().find_map(|chunk| chunk.reserve())
    }

    /// Retrieves a [`Chunk`] from the [`MemoryPool`] and initializes it with `init_value`.
    ///
    /// Returns `Err(init_value)` if no more [`Chunk`]s are available.
    ///
    /// Convenience wrapper combining [`MemoryPool::reserve`] and [`MemoryPoolToken::init`].
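    ///
    /// # Example
    ///
    /// A minimal sketch showing both the success and the exhaustion case:
    ///
    /// ```
    /// use veecle_os_runtime::memory_pool::MemoryPool;
    ///
    /// let pool = MemoryPool::<u8, 1>::new();
    ///
    /// let chunk = pool.chunk(7).expect("one chunk is available");
    /// assert_eq!(*chunk, 7);
    ///
    /// // While `chunk` is alive the pool is exhausted and the value is handed back.
    /// assert_eq!(pool.chunk(8).unwrap_err(), 8);
    /// ```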
    pub fn chunk(&self, init_value: T) -> Result<Chunk<'_, T>, T> {
        // Reserving and initializing the `Chunk` are split because `init_value` can only be moved once and cannot
        // be copied into every `reserve` call.
        let token = self.reserve();

        if let Some(token) = token {
            Ok(token.init(init_value))
        } else {
            Err(init_value)
        }
    }

    /// Calculates the number of chunks currently available.
    ///
    /// Due to concurrent accesses from interrupts and/or other threads, the returned value may already be stale.
    /// Only intended for metrics.
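    ///
    /// # Example
    ///
    /// Chunks count as available again once they are dropped:
    ///
    /// ```
    /// use veecle_os_runtime::memory_pool::MemoryPool;
    ///
    /// let pool = MemoryPool::<u8, 2>::new();
    /// assert_eq!(pool.chunks_available(), 2);
    ///
    /// let chunk = pool.chunk(0);
    /// assert_eq!(pool.chunks_available(), 1);
    ///
    /// drop(chunk);
    /// assert_eq!(pool.chunks_available(), 2);
    /// ```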
    pub fn chunks_available(&self) -> usize {
        self.chunks
            .iter()
            .map(|chunk| usize::from(chunk.is_available()))
            .sum()
    }
}

// SAFETY: All accesses to the elements of the `MemoryPool` go through `MemoryPool::reserve` (or wrappers around it
// like `MemoryPool::chunk`) and are synchronized via the `available` atomics.
unsafe impl<T, const SIZE: usize> Sync for MemoryPool<T, SIZE> {}

/// Container for the `T` instance and synchronization atomic for the [`MemoryPool`].
#[derive(Debug)]
struct MemoryPoolInner<T> {
    data: UnsafeCell<MaybeUninit<T>>,
    available: AtomicBool,
}

impl<T> MemoryPoolInner<T> {
    /// Creates a new `MemoryPoolInner`.
    ///
    /// Marked available and uninitialized.
    const fn new() -> Self {
        Self {
            data: UnsafeCell::new(MaybeUninit::uninit()),
            available: AtomicBool::new(true),
        }
    }

    /// Reserves this [`MemoryPoolInner`].
    fn reserve(&self) -> Option<MemoryPoolToken<'_, T>> {
        if self.available.swap(false, Ordering::AcqRel) {
            Some(MemoryPoolToken { inner: Some(self) })
        } else {
            None
        }
    }

    /// Returns `true` if the [`MemoryPoolInner`] is currently available.
    fn is_available(&self) -> bool {
        self.available.load(Ordering::Acquire)
    }
}

/// A token reserving an element in a [`MemoryPool`] which can be initialized to create a [`Chunk`].
#[derive(Debug)]
pub struct MemoryPoolToken<'a, T> {
    inner: Option<&'a MemoryPoolInner<T>>,
}

impl<'a, T> MemoryPoolToken<'a, T> {
    /// Consumes the [`MemoryPoolToken.inner`][field@MemoryPoolToken::inner] to prevent [`MemoryPoolToken`]'s drop
    /// implementation from making the element available.
    fn consume(&mut self) -> (&'a mut MaybeUninit<T>, &'a AtomicBool) {
        let Some(inner) = self.inner.take() else {
            unreachable!("`MemoryPoolToken` should only be consumed once");
        };

        let inner_data = {
            let inner_data_ptr = inner.data.get();
            // SAFETY:
            // - `UnsafeCell` has the same layout as its content, thus `inner_data_ptr` points to an aligned and
            //   valid value of `MaybeUninit<T>`.
            // - The `available` atomic guarantees this element is reserved exclusively, so only this single
            //   mutable reference to the content of the `UnsafeCell` exists.
            unsafe { inner_data_ptr.as_mut() }
                .expect("pointer to the contents of an `UnsafeCell` should not be null")
        };

        (inner_data, &inner.available)
    }

    /// Consumes and turns the [`MemoryPoolToken`] into an initialized [`Chunk`].
    pub fn init(mut self, init_value: T) -> Chunk<'a, T> {
        let (inner_data, available) = self.consume();

        inner_data.write(init_value);

        // SAFETY:
        // `inner_data` has been initialized by writing the `init_value`.
        unsafe { Chunk::new(inner_data, available) }
    }

    /// Initializes a [`Chunk`] in place via `init_function`.
    ///
    /// # Safety
    ///
    /// `init_function` must initialize the passed parameter to a valid `T` before the function returns.
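    ///
    /// # Example
    ///
    /// A minimal sketch of in-place initialization, writing the value through the provided `MaybeUninit<T>`:
    ///
    /// ```
    /// use veecle_os_runtime::memory_pool::MemoryPool;
    ///
    /// let pool = MemoryPool::<[u8; 4], 1>::new();
    /// let token = pool.reserve().expect("one element is available");
    ///
    /// // SAFETY: The closure fully initializes the element before returning.
    /// let chunk = unsafe {
    ///     token.init_in_place(|memory| {
    ///         memory.write([0, 1, 2, 3]);
    ///     })
    /// };
    /// assert_eq!(*chunk, [0, 1, 2, 3]);
    /// ```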
    pub unsafe fn init_in_place(
        mut self,
        init_function: impl FnOnce(&mut MaybeUninit<T>),
    ) -> Chunk<'a, T> {
        let (inner_data, available) = self.consume();

        init_function(inner_data);

        // SAFETY:
        // `inner_data` has been initialized by `init_function`.
        unsafe { Chunk::new(inner_data, available) }
    }
}

impl<T> Drop for MemoryPoolToken<'_, T> {
    fn drop(&mut self) {
        if let Some(inner) = self.inner.take() {
            inner.available.store(true, Ordering::Release);
        }
    }
}

/// A pointer type pointing to an instance of `T` in a [`MemoryPool`].
///
/// See [module-level documentation][self] for more information.
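///
/// # Example
///
/// A [`Chunk`] dereferences to the stored `T`, immutably and mutably:
///
/// ```
/// use veecle_os_runtime::memory_pool::MemoryPool;
///
/// let pool = MemoryPool::<u8, 1>::new();
/// let mut chunk = pool.chunk(1).expect("one chunk is available");
///
/// *chunk += 1;
/// assert_eq!(*chunk, 2);
/// ```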
pub struct Chunk<'a, T> {
    // We use `&mut MaybeUninit<T>` instead of `&mut T` so the `Drop` implementation can drop the `T` in place:
    // dropping the contents behind a plain `&mut T` would leave a dangling reference, while `MaybeUninit<T>`
    // allows dropping via `assume_init_drop` while only holding a reference.
    inner: &'a mut MaybeUninit<T>,
    // Only held to ensure the chunk is made available on drop.
    token: &'a AtomicBool,
}

impl<T> Debug for Chunk<'_, T>
where
    T: Debug,
{
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        Debug::fmt(&**self, f)
    }
}

impl<'a, T> Chunk<'a, T> {
    /// Creates a new [`Chunk`].
    ///
    /// # Safety
    ///
    /// The `chunk` must be initialized.
    unsafe fn new(chunk: &'a mut MaybeUninit<T>, token: &'a AtomicBool) -> Self {
        Self {
            inner: chunk,
            token,
        }
    }
}

impl<T> Deref for Chunk<'_, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        // SAFETY: The `Self::new` safety documentation requires the chunk to be initialized.
        // It is only dropped in the drop implementation and cannot be un-initialized by any `Chunk` method, thus it is
        // initialized here.
        unsafe { self.inner.assume_init_ref() }
    }
}

impl<T> DerefMut for Chunk<'_, T> {
    fn deref_mut(&mut self) -> &mut <Self as Deref>::Target {
        // SAFETY: The `Self::new` safety documentation requires the chunk to be initialized.
        // It is only dropped in the drop implementation and cannot be un-initialized by any `Chunk` method, thus it is
        // initialized here.
        unsafe { self.inner.assume_init_mut() }
    }
}

impl<T> Drop for Chunk<'_, T> {
    fn drop(&mut self) {
        // SAFETY: The `Self::new` safety documentation requires the chunk to be initialized.
        // It is only dropped in the drop implementation and cannot be un-initialized by any `Chunk` method, thus it is
        // initialized here.
        unsafe { self.inner.assume_init_drop() };
        debug_assert!(
            !self.token.swap(true, Ordering::AcqRel),
            "chunk was made available a second time"
        );
    }
}

#[cfg(test)]
mod test {
    use std::format;
    use std::sync::atomic::AtomicUsize;

    use super::*;

    #[test]
    fn pool() {
        static POOL: MemoryPool<[u8; 10], 2> = MemoryPool::new();

        let mut chunk = POOL.chunk([0; 10]).unwrap();
        let chunk1 = POOL.chunk([0; 10]).unwrap();
        assert!(POOL.chunk([0; 10]).is_err());
        assert_eq!(chunk[0], 0);
        chunk[0] += 1;
        assert_eq!(chunk[0], 1);
        assert_eq!(chunk1[0], 0);
    }

    #[test]
    fn drop_test() {
        #[derive(Debug)]
        pub struct Dropper {}
        impl Drop for Dropper {
            fn drop(&mut self) {
                COUNTER.fetch_add(1, Ordering::Relaxed);
            }
        }

        static COUNTER: AtomicUsize = AtomicUsize::new(0);

        {
            let pool: MemoryPool<Dropper, 2> = MemoryPool::new();

            let _ = pool.chunk(Dropper {});
            assert_eq!(COUNTER.load(Ordering::Relaxed), 1);

            {
                let _dropper1 = pool.chunk(Dropper {}).unwrap();
                let _dropper2 = pool.chunk(Dropper {}).unwrap();
                assert!(pool.chunk(Dropper {}).is_err());
            }
            assert_eq!(COUNTER.load(Ordering::Relaxed), 4);
            let _ = pool.chunk(Dropper {});
            assert_eq!(COUNTER.load(Ordering::Relaxed), 5);
        }

        // After dropping `pool`, there were no additional drops of the contained type.
        assert_eq!(COUNTER.load(Ordering::Relaxed), 5);
    }

    #[test]
    fn drop_memory_pool_token() {
        let pool = MemoryPool::<usize, 1>::new();
        assert_eq!(pool.chunks_available(), 1);
        {
            let _token = pool.reserve().unwrap();
            assert_eq!(pool.chunks_available(), 0);
        }
        assert_eq!(pool.chunks_available(), 1);
    }

    #[test]
    fn chunks_available() {
        let pool = MemoryPool::<usize, 2>::new();
        assert_eq!(pool.chunks_available(), 2);
        {
            let _chunk = pool.chunk(0);
            assert_eq!(pool.chunks_available(), 1);
            let _chunk = pool.chunk(0);
            assert_eq!(pool.chunks_available(), 0);
        }
        assert_eq!(pool.chunks_available(), 2);
    }

    #[test]
    fn reserve_init() {
        let pool = MemoryPool::<usize, 2>::new();
        let token = pool.reserve().unwrap();
        let chunk = token.init(2);
        assert_eq!(*chunk, 2);
    }

    #[test]
    fn reserve_init_in_place() {
        let pool = MemoryPool::<usize, 2>::new();
        let token = pool.reserve().unwrap();
        // SAFETY: The passed closure initializes the chunk correctly.
        let chunk = unsafe {
            token.init_in_place(|m| {
                m.write(2);
            })
        };
        assert_eq!(*chunk, 2);
    }

    #[test]
    #[should_panic(expected = "`MemoryPoolToken` should only be consumed once")]
    fn consume_none() {
        let pool = MemoryPool::<usize, 2>::new();
        let mut token = pool.reserve().unwrap();
        let _ = token.consume();
        let _ = token.consume();
    }

    /// Ensures the `MemoryPool` and `Chunk` don't lose their `Send` & `Sync` auto trait implementations when
    /// refactoring.
    #[test]
    fn send_sync() {
        fn send<T>()
        where
            T: Send,
        {
        }
        fn sync<T>()
        where
            T: Sync,
        {
        }
        send::<MemoryPool<[u8; 10], 2>>();
        sync::<MemoryPool<[u8; 10], 2>>();

        send::<Chunk<[u8; 10]>>();
        sync::<Chunk<[u8; 10]>>();
    }

    #[test]
    fn debug_chunk() {
        let pool = MemoryPool::<usize, 2>::new();
        let chunk = pool.chunk(0).unwrap();
        assert_eq!(format!("{chunk:?}"), "0");
    }

    #[test]
    fn default_memory_pool() {
        let pool: MemoryPool<usize, 2> = MemoryPool::default();
        assert_eq!(pool.chunks_available(), 2);
    }
}