veecle_os_runtime/
actor.rs

1//! Smallest unit of work within a runtime instance.
2use core::convert::Infallible;
3use core::pin::Pin;
4
5/// Generates an [`Actor`] from a function.
6///
7///
8/// ```rust
9/// use veecle_os_runtime::{Reader, Writer};
10/// # use std::convert::Infallible;
11/// # use veecle_os_runtime::Storable;
12/// #
13/// # #[derive(Debug, PartialEq, Clone, Default, Storable)]
14/// # pub struct Sensor(pub u8);
15///
16/// #[veecle_os_runtime::actor]
17/// async fn macro_test_actor(
18///     _sensor_reader: Reader<'_, Sensor>,
19///     _sensor_writer: Writer<'_, Sensor>,
20///     #[init_context] _my_init_context: u32,
21/// ) -> Infallible {
22///     loop {
23///         // Do things.
24///     }
25/// }
26/// ```
27///
28/// # Attribute Arguments
29///
30/// ## `crate`
31///
/// If necessary, the path to [`veecle-os-runtime`][crate] can be overridden by passing a `crate = ::some::path` argument.
33///
34/// ```rust
35/// extern crate veecle_os_runtime as my_veecle_os_runtime;
36///
37/// use my_veecle_os_runtime::{Reader, Writer};
38/// # use std::convert::Infallible;
39/// # use my_veecle_os_runtime::Storable;
40/// #
41/// # #[derive(Debug, PartialEq, Clone, Default, Storable)]
42/// # pub struct Sensor(pub u8);
43///
44/// #[my_veecle_os_runtime::actor(crate = my_veecle_os_runtime)]
45/// async fn macro_test_actor(
46///     _sensor_reader: Reader<'_, Sensor>,
47///     _sensor_writer: Writer<'_, Sensor>,
48///     #[init_context] _my_init_context: u32,
49/// ) -> Infallible {
50///     loop {
51///         // Do things.
52///     }
53/// }
54/// ```
55#[doc(inline)]
56pub use veecle_os_runtime_macros::actor;
57
58use crate::datastore::{ExclusiveReader, InitializedReader, Reader, Storable, Writer};
59use crate::datastore::{Slot, generational};
60
/// Private module holding the marker trait used to seal this file's public traits.
mod sealed {
    /// Marker supertrait of `StoreRequest` and `IsActorResult`.
    ///
    /// The trait itself is `pub` so it can appear in public bounds, but the enclosing module is private, so only
    /// code that can name `sealed::Sealed` is able to add implementations.
    pub trait Sealed {}
}
64
65/// Actor interface.
66///
67/// The [`Actor`] trait allows writing actors that communicate within a runtime.
/// It allows defining an initial context, which will be available for the whole life of the actor;
69/// a constructor method, with all the [`StoreRequest`] types it needs to communicate with other actors;
70/// and also the [`Actor::run`] method.
71///
72/// # Usage
73///
74/// Add the `Actor` implementing types to the actor list in [`veecle_os::runtime::execute!`](crate::execute!) when
75/// constructing a runtime instance.
76///
77/// The [`Actor::run`] method implements the actor's event loop.
78/// To yield back to the executor, every event loop must contain at least one `await`.
79/// Otherwise, the endless loop of the actor will block the executor and other actors.
80///
81/// ## Macros
82///
83/// The [`actor`][macro@crate::actor::actor] attribute macro can be used to implement actors.
84/// The function the macro is applied to is converted into the event loop.
85/// See its documentation for more details.
86///
87/// ### Example
88///
89/// ```rust
90/// # use std::convert::Infallible;
91/// # use std::fmt::Debug;
92/// #
93/// # use veecle_os_runtime::{Storable, Reader, Writer};
94/// #
95/// # #[derive(Debug, Default, Storable)]
96/// # pub struct Foo;
97/// #
98/// # #[derive(Debug, Default, Storable)]
99/// # pub struct Bar;
100/// #
101/// # pub struct Ctx;
102///
103/// #[veecle_os_runtime::actor]
104/// async fn my_actor(
105///     reader: Reader<'_, Foo>,
106///     writer: Writer<'_, Bar>,
107///     #[init_context] ctx: Ctx,
108/// ) -> Infallible {
109///     loop {
110///         // Do something here.
111///     }
112/// }
113/// ```
114///
115/// This will create a new struct called `MyActor` which implements [`Actor`], letting you register it into a runtime.
116///
117/// ## Manual
118///
119/// For cases where the macro is not sufficient, the [`Actor`] trait can also be implemented manually:
120///
121/// ```rust
122/// # use std::convert::Infallible;
123/// # use std::fmt::Debug;
124/// #
125/// # use veecle_os_runtime::{Storable, Reader, Writer, Actor};
126/// #
127/// # #[derive(Debug, Default, Storable)]
128/// # pub struct Foo;
129/// #
130/// # #[derive(Debug, Default, Storable)]
131/// # pub struct Bar;
132/// #
133/// # pub struct Ctx;
134///
135/// struct MyActor<'a> {
136///     reader: Reader<'a, Foo>,
137///     writer: Writer<'a, Bar>,
138///     context: Ctx,
139/// }
140///
141/// impl<'a> Actor<'a> for MyActor<'a> {
142///     type StoreRequest = (Reader<'a, Foo>, Writer<'a, Bar>);
143///     type InitContext = Ctx;
144///     type Error = Infallible;
145///
146///     fn new((reader, writer): Self::StoreRequest, context: Self::InitContext) -> Self {
147///         Self {
148///             reader,
149///             writer,
150///             context,
151///         }
152///     }
153///
154///     async fn run(mut self) -> Result<Infallible, Self::Error> {
155///         loop {
156///             // Do something here.
157///         }
158///     }
159/// }
160/// ```
pub trait Actor<'a> {
    /// [`Reader`]s and [`Writer`]s this actor requires.
    ///
    /// A value of this type is passed to [`Actor::new`] as its first argument.
    type StoreRequest: StoreRequest<'a>;

    /// Context that needs to be passed to the actor at initialisation.
    ///
    /// A value of this type is passed to [`Actor::new`] as its second argument.
    type InitContext;

    /// Error that this actor might return while running.
    ///
    /// This error is treated as fatal: if any actor returns an error, the whole runtime will shut down.
    type Error: core::error::Error;

    /// Creates a new instance of the struct implementing [`Actor`].
    ///
    /// `input` carries the requested store handles and `init_context` the initialisation context.
    ///
    /// See the [crate documentation][crate] for examples.
    fn new(input: Self::StoreRequest, init_context: Self::InitContext) -> Self;

    /// Runs the [`Actor`] event loop.
    ///
    /// The returned future can only ever complete with an `Err`, because the `Ok` variant holds
    /// [`Infallible`](core::convert::Infallible), which has no values.
    ///
    /// See the [crate documentation][crate] for examples.
    fn run(
        self,
    ) -> impl core::future::Future<Output = Result<core::convert::Infallible, Self::Error>>;
}
185
/// Allows requesting a (nearly) arbitrary amount of [`Reader`]s and [`Writer`]s in an [`Actor`].
///
/// This trait is sealed (it requires the private `sealed::Sealed` marker) and is not intended for direct usage by
/// users.
// Developer notes: This works by using type inference via `DatastoreExt::reader` etc. to request `Reader`s etc. from
// the `Datastore`.
pub trait StoreRequest<'a>: sealed::Sealed {
    /// Requests an instance of `Self` from the [`Datastore`].
    ///
    /// The returned future is not guaranteed to be immediately ready: the [`InitializedReader`] implementation, for
    /// example, awaits `wait_init` before resolving.
    #[doc(hidden)]
    #[allow(async_fn_in_trait)] // It's actually private so it's fine.
    async fn request(datastore: Pin<&'a impl Datastore>) -> Self;
}
197
// The unit type is a valid (empty) `StoreRequest`, so it needs the sealing marker as well.
impl sealed::Sealed for () {}
199
/// Internal trait to abstract out type-erased and concrete data stores.
///
/// Both methods take `self` as `Pin<&Self>` and hand out pinned references into the store.
pub trait Datastore {
    /// Returns a generational source tracking the global datastore generation.
    ///
    /// This is used to ensure that every reader has had (or will have) a chance to read a value before a writer may
    /// overwrite it.
    fn source(self: Pin<&Self>) -> Pin<&generational::Source>;

    #[expect(rustdoc::private_intra_doc_links)] // `rustdoc` is buggy with links from "pub" but unreachable types.
    /// Returns a reference to the slot for a specific type.
    ///
    /// The slot is selected purely by the type parameter `T`.
    ///
    /// # Panics
    ///
    /// * If there is no [`Slot`] for `T` in the [`Datastore`].
    #[expect(private_interfaces)] // The methods are internal.
    fn slot<T>(self: Pin<&Self>) -> Pin<&Slot<T>>
    where
        T: Storable + 'static;
}
219
220impl<S> Datastore for Pin<&S>
221where
222    S: Datastore,
223{
224    fn source(self: Pin<&Self>) -> Pin<&generational::Source> {
225        Pin::into_inner(self).source()
226    }
227
228    #[expect(private_interfaces)] // The methods are internal.
229    fn slot<T>(self: Pin<&Self>) -> Pin<&Slot<T>>
230    where
231        T: Storable + 'static,
232    {
233        Pin::into_inner(self).slot()
234    }
235}
236
/// Crate-internal convenience methods for obtaining [`Reader`]s, [`ExclusiveReader`]s and [`Writer`]s from a
/// datastore handle.
///
/// The trait requires `Copy` and every method takes `self` by value; the returned handles carry the lifetime `'a`.
pub(crate) trait DatastoreExt<'a>: Copy {
    #[cfg(test)]
    /// Increments the global datastore generation.
    ///
    /// Asserts that every reader has had (or will have) a chance to read a value before a writer may overwrite it.
    fn increment_generation(self);

    /// Returns the [`Reader`] for a specific slot.
    ///
    /// # Panics
    ///
    /// * If there is no [`Slot`] for `T` in the [`Datastore`].
    fn reader<T>(self) -> Reader<'a, T>
    where
        T: Storable + 'static;

    /// Returns the [`ExclusiveReader`] for a specific slot.
    ///
    /// Exclusivity of the reader is not guaranteed by this method and must be ensured via other means (e.g.
    /// [`crate::execute::validate_actors`]).
    ///
    /// # Panics
    ///
    /// * If there is no [`Slot`] for `T` in the [`Datastore`].
    fn exclusive_reader<T>(self) -> ExclusiveReader<'a, T>
    where
        T: Storable + 'static;

    /// Returns the [`Writer`] for a specific slot.
    ///
    /// # Panics
    ///
    /// * If the [`Writer`] for this slot has already been acquired.
    ///
    /// * If there is no [`Slot`] for `T` in the [`Datastore`].
    fn writer<T>(self) -> Writer<'a, T>
    where
        T: Storable + 'static;
}
276
277impl<'a, S> DatastoreExt<'a> for Pin<&'a S>
278where
279    S: Datastore,
280{
281    #[cfg(test)]
282    fn increment_generation(self) {
283        self.source().increment_generation()
284    }
285
286    fn reader<T>(self) -> Reader<'a, T>
287    where
288        T: Storable + 'static,
289    {
290        Reader::from_slot(self.slot::<T>())
291    }
292
293    fn exclusive_reader<T>(self) -> ExclusiveReader<'a, T>
294    where
295        T: Storable + 'static,
296    {
297        ExclusiveReader::from_slot(self.slot::<T>())
298    }
299
300    fn writer<T>(self) -> Writer<'a, T>
301    where
302        T: Storable + 'static,
303    {
304        Writer::new(self.source().waiter(), self.slot::<T>())
305    }
306}
307
308/// Implements a no-op for Actors that do not read or write any values.
309impl<'a> StoreRequest<'a> for () {
310    async fn request(_store: Pin<&'a impl Datastore>) -> Self {}
311}
312
313impl<T> sealed::Sealed for Reader<'_, T> where T: Storable + 'static {}
314
315impl<'a, T> StoreRequest<'a> for Reader<'a, T>
316where
317    T: Storable + 'static,
318{
319    async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
320        datastore.reader()
321    }
322}
323
324impl<T> sealed::Sealed for ExclusiveReader<'_, T> where T: Storable + 'static {}
325
326impl<'a, T> StoreRequest<'a> for ExclusiveReader<'a, T>
327where
328    T: Storable + 'static,
329{
330    async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
331        datastore.exclusive_reader()
332    }
333}
334
335impl<T> sealed::Sealed for InitializedReader<'_, T> where T: Storable + 'static {}
336
337impl<'a, T> StoreRequest<'a> for InitializedReader<'a, T>
338where
339    T: Storable + 'static,
340{
341    async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
342        Reader::from_slot(datastore.slot()).wait_init().await
343    }
344}
345
346impl<T> sealed::Sealed for Writer<'_, T> where T: Storable + 'static {}
347
348impl<'a, T> StoreRequest<'a> for Writer<'a, T>
349where
350    T: Storable + 'static,
351{
352    async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
353        datastore.writer()
354    }
355}
356
/// Implements [`StoreRequest`] for provided types.
///
/// Given a list of identifiers, this emits `sealed::Sealed` and [`StoreRequest`] implementations for the tuple of all
/// identifiers, then recurses on the list without its first element, down to the one-element tuple.
/// The unit type `()` is not covered here; it has its own hand-written implementations.
macro_rules! impl_request_helper {
    // Base case: a single identifier produces the one-element tuple impls.
    // This arm must stay ahead of the recursive arm, which would otherwise also match a single identifier.
    ($t:ident) => {
        #[cfg_attr(docsrs, doc(fake_variadic))]
        /// This trait is implemented for tuples up to seven items long.
        impl<'a, $t> sealed::Sealed for ($t,) { }

        #[cfg_attr(docsrs, doc(fake_variadic))]
        /// This trait is implemented for tuples up to seven items long.
        impl<'a, $t> StoreRequest<'a> for ($t,)
        where
            $t: StoreRequest<'a>,
        {
            async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
                (<$t as StoreRequest>::request(datastore).await,)
            }
        }
    };

    // Internal arm: emits the impls for one concrete tuple made of all given identifiers.
    (@impl $($t:ident)*) => {
        #[cfg_attr(docsrs, doc(hidden))]
        impl<'a, $($t),*> sealed::Sealed for ( $( $t, )* )
        where
            $($t: sealed::Sealed),*
        { }

        #[cfg_attr(docsrs, doc(hidden))]
        impl<'a, $($t),*> StoreRequest<'a> for ( $( $t, )* )
        where
            $($t: StoreRequest<'a>),*
        {
            async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
                // join! is necessary here to avoid argument-order-dependence with the #[actor] macro.
                // This ensures that any `InitializedReaders` in self correctly track the generation at which they were
                // first ready, so that the first `wait_for_update` sees the value that caused them to become
                // initialized.
                // See `multi_request_order_independence` for the verification of this.
                futures::join!($( <$t as StoreRequest>::request(datastore), )*)
            }
        }
    };

    // Recursive arm: implement for the full list, then repeat with the first identifier dropped.
    ($head:ident $($rest:ident)*) => {
        impl_request_helper!(@impl $head $($rest)*);
        impl_request_helper!($($rest)*);
    };
}

// Seven identifiers, so tuples of one up to seven elements are covered.
impl_request_helper!(Z Y X W V U T);
406
/// Macro helper to allow actors to return either a [`Result`] type or [`Infallible`] (and eventually [`!`]).
///
/// The trait is sealed through the private `sealed::Sealed` supertrait; this file implements it for
/// `Result<Infallible, E>` and for [`Infallible`].
#[diagnostic::on_unimplemented(
    message = "#[veecle_os_runtime::actor] functions should return either a `Result<Infallible, _>` or `Infallible`",
    label = "not a valid actor return type"
)]
pub trait IsActorResult: sealed::Sealed {
    /// The error type this result converts into.
    type Error;

    /// Convert the result into an actual [`Result`] value.
    ///
    /// The `Ok` variant of the returned value holds [`Infallible`], so only `Err` can actually be produced.
    fn into_result(self) -> Result<Infallible, Self::Error>;
}
419
420impl<E> sealed::Sealed for Result<Infallible, E> {}
421
422impl<E> IsActorResult for Result<Infallible, E> {
423    type Error = E;
424
425    fn into_result(self) -> Result<Infallible, E> {
426        self
427    }
428}
429
impl sealed::Sealed for Infallible {}

/// Lets actor functions declare a bare [`Infallible`] return type; the error type is then [`Infallible`] as well.
impl IsActorResult for Infallible {
    type Error = Infallible;

    fn into_result(self) -> Result<Infallible, Self::Error> {
        // `Infallible` has no values, so the empty match is exhaustive and this body can never actually run.
        match self {}
    }
}
439
#[cfg(test)]
mod tests {
    use core::future::Future;
    use core::pin::pin;
    use core::task::{Context, Poll};

    use futures::future::FutureExt;

    use crate::actor::{DatastoreExt, StoreRequest};
    use crate::cons::{Cons, Nil};
    use crate::datastore::{InitializedReader, Storable};

    /// Requesting the same pair of initialized readers in either tuple order must resolve at the same point.
    #[test]
    fn multi_request_order_independence() {
        #[derive(Debug, Storable)]
        #[storable(crate = crate)]
        struct A;

        #[derive(Debug, Storable)]
        #[storable(crate = crate)]
        struct B;

        let store = pin!(crate::execute::make_store::<Cons<A, Cons<B, Nil>>>());

        let mut writer_a = store.as_ref().writer::<A>();
        let mut writer_b = store.as_ref().writer::<B>();

        // The two requests differ only in the order of their tuple elements. Both are expected to resolve in the
        // generation in which the later-written of the two values first appears.
        let mut ab_request = pin!(<(InitializedReader<A>, InitializedReader<B>)>::request(
            store.as_ref()
        ));
        let mut ba_request = pin!(<(InitializedReader<B>, InitializedReader<A>)>::request(
            store.as_ref()
        ));

        let (ab_waker, ab_wakes) = futures_test::task::new_count_waker();
        let (ba_waker, ba_wakes) = futures_test::task::new_count_waker();

        let mut ab_context = Context::from_waker(&ab_waker);
        let mut ba_context = Context::from_waker(&ba_waker);

        // Nothing has been written yet, so neither request can complete.
        assert!(ab_request.as_mut().poll(&mut ab_context).is_pending());
        assert!(ba_request.as_mut().poll(&mut ba_context).is_pending());

        let ab_wakes_before = ab_wakes.get();
        let ba_wakes_before = ba_wakes.get();

        store.as_ref().increment_generation();

        writer_a.write(A).now_or_never().unwrap();

        // Writing only the first value may or may not wake a request; a woken request has to be polled again and
        // must still be pending because `B` is missing.
        if ab_wakes.get() > ab_wakes_before {
            assert!(ab_request.as_mut().poll(&mut ab_context).is_pending());
        }
        if ba_wakes.get() > ba_wakes_before {
            assert!(ba_request.as_mut().poll(&mut ba_context).is_pending());
        }

        let ab_wakes_before = ab_wakes.get();
        let ba_wakes_before = ba_wakes.get();

        store.as_ref().increment_generation();

        writer_b.write(B).now_or_never().unwrap();

        // With the second value written, both requests _must_ have been woken and must now complete.
        assert!(ab_wakes.get() > ab_wakes_before);
        assert!(ba_wakes.get() > ba_wakes_before);

        let Poll::Ready((mut ab_reader_a, mut ab_reader_b)) =
            ab_request.as_mut().poll(&mut ab_context)
        else {
            panic!("request 1 was not ready")
        };

        // The second request yields its readers in `(B, A)` order.
        let Poll::Ready((mut ba_reader_b, mut ba_reader_a)) =
            ba_request.as_mut().poll(&mut ba_context)
        else {
            panic!("request 2 was not ready")
        };

        // Every reader was just initialized and has not been `wait_for_update`d yet, so each must report an update.
        assert!(ab_reader_a.wait_for_update().now_or_never().is_some());
        assert!(ab_reader_b.wait_for_update().now_or_never().is_some());

        assert!(ba_reader_b.wait_for_update().now_or_never().is_some());
        assert!(ba_reader_a.wait_for_update().now_or_never().is_some());
    }
}