veecle_os_runtime/actor.rs
1//! Smallest unit of work within a runtime instance.
2use core::convert::Infallible;
3use core::pin::Pin;
4
/// Generates an [`Actor`] from a function.
///
/// The function the macro is applied to is converted into the event loop of the generated actor.
///
8/// ```rust
9/// use veecle_os_runtime::{Reader, Writer};
10/// # use std::convert::Infallible;
11/// # use veecle_os_runtime::Storable;
12/// #
13/// # #[derive(Debug, PartialEq, Clone, Default, Storable)]
14/// # pub struct Sensor(pub u8);
15///
16/// #[veecle_os_runtime::actor]
17/// async fn macro_test_actor(
18/// _sensor_reader: Reader<'_, Sensor>,
19/// _sensor_writer: Writer<'_, Sensor>,
20/// #[init_context] _my_init_context: u32,
21/// ) -> Infallible {
22/// loop {
23/// // Do things.
24/// }
25/// }
26/// ```
27///
28/// # Attribute Arguments
29///
30/// ## `crate`
31///
/// If necessary, the path to [`veecle-os-runtime`][crate] can be overridden by passing a `crate = ::some::path` argument.
33///
34/// ```rust
35/// extern crate veecle_os_runtime as my_veecle_os_runtime;
36///
37/// use my_veecle_os_runtime::{Reader, Writer};
38/// # use std::convert::Infallible;
39/// # use my_veecle_os_runtime::Storable;
40/// #
41/// # #[derive(Debug, PartialEq, Clone, Default, Storable)]
42/// # pub struct Sensor(pub u8);
43///
44/// #[my_veecle_os_runtime::actor(crate = my_veecle_os_runtime)]
45/// async fn macro_test_actor(
46/// _sensor_reader: Reader<'_, Sensor>,
47/// _sensor_writer: Writer<'_, Sensor>,
48/// #[init_context] _my_init_context: u32,
49/// ) -> Infallible {
50/// loop {
51/// // Do things.
52/// }
53/// }
54/// ```
55#[doc(inline)]
56pub use veecle_os_runtime_macros::actor;
57
58use crate::datastore::{ExclusiveReader, InitializedReader, Reader, Storable, Writer};
59use crate::datastore::{Slot, generational};
60
/// Private home of the sealing marker trait.
///
/// The module itself is not public, so code outside of the enclosing module cannot name `Sealed` and therefore
/// cannot implement any trait that lists it as a supertrait.
mod sealed {
    /// Marker supertrait used to seal the public traits of the enclosing module.
    pub trait Sealed {}
}
64
65/// Actor interface.
66///
67/// The [`Actor`] trait allows writing actors that communicate within a runtime.
/// An implementation defines three things: an initial context, which is available for the whole life of the actor;
/// a constructor method, which receives all the [`StoreRequest`] types the actor needs to communicate with other
/// actors; and the [`Actor::run`] method.
71///
72/// # Usage
73///
74/// Add the `Actor` implementing types to the actor list in [`veecle_os::runtime::execute!`](crate::execute!) when
75/// constructing a runtime instance.
76///
77/// The [`Actor::run`] method implements the actor's event loop.
78/// To yield back to the executor, every event loop must contain at least one `await`.
79/// Otherwise, the endless loop of the actor will block the executor and other actors.
80///
81/// ## Macros
82///
83/// The [`actor`][macro@crate::actor::actor] attribute macro can be used to implement actors.
84/// The function the macro is applied to is converted into the event loop.
85/// See its documentation for more details.
86///
87/// ### Example
88///
89/// ```rust
90/// # use std::convert::Infallible;
91/// # use std::fmt::Debug;
92/// #
93/// # use veecle_os_runtime::{Storable, Reader, Writer};
94/// #
95/// # #[derive(Debug, Default, Storable)]
96/// # pub struct Foo;
97/// #
98/// # #[derive(Debug, Default, Storable)]
99/// # pub struct Bar;
100/// #
101/// # pub struct Ctx;
102///
103/// #[veecle_os_runtime::actor]
104/// async fn my_actor(
105/// reader: Reader<'_, Foo>,
106/// writer: Writer<'_, Bar>,
107/// #[init_context] ctx: Ctx,
108/// ) -> Infallible {
109/// loop {
110/// // Do something here.
111/// }
112/// }
113/// ```
114///
115/// This will create a new struct called `MyActor` which implements [`Actor`], letting you register it into a runtime.
116///
117/// ## Manual
118///
119/// For cases where the macro is not sufficient, the [`Actor`] trait can also be implemented manually:
120///
121/// ```rust
122/// # use std::convert::Infallible;
123/// # use std::fmt::Debug;
124/// #
125/// # use veecle_os_runtime::{Storable, Reader, Writer, Actor};
126/// #
127/// # #[derive(Debug, Default, Storable)]
128/// # pub struct Foo;
129/// #
130/// # #[derive(Debug, Default, Storable)]
131/// # pub struct Bar;
132/// #
133/// # pub struct Ctx;
134///
135/// struct MyActor<'a> {
136/// reader: Reader<'a, Foo>,
137/// writer: Writer<'a, Bar>,
138/// context: Ctx,
139/// }
140///
141/// impl<'a> Actor<'a> for MyActor<'a> {
142/// type StoreRequest = (Reader<'a, Foo>, Writer<'a, Bar>);
143/// type InitContext = Ctx;
144/// type Error = Infallible;
145///
146/// fn new((reader, writer): Self::StoreRequest, context: Self::InitContext) -> Self {
147/// Self {
148/// reader,
149/// writer,
150/// context,
151/// }
152/// }
153///
154/// async fn run(mut self) -> Result<Infallible, Self::Error> {
155/// loop {
156/// // Do something here.
157/// }
158/// }
159/// }
160/// ```
161pub trait Actor<'a> {
162 /// [`Reader`]s and [`Writer`]s this actor requires.
163 type StoreRequest: StoreRequest<'a>;
164
165 /// Context that needs to be passed to the actor at initialisation.
166 type InitContext;
167
168 /// Error that this actor might return while running.
169 ///
170 /// This error is treated as fatal, if any actor returns an error the whole runtime will shutdown.
171 type Error: core::error::Error;
172
173 /// Creates a new instance of the struct implementing [`Actor`].
174 ///
175 /// See the [crate documentation][crate] for examples.
176 fn new(input: Self::StoreRequest, init_context: Self::InitContext) -> Self;
177
178 /// Runs the [`Actor`] event loop.
179 ///
180 /// See the [crate documentation][crate] for examples.
181 fn run(
182 self,
183 ) -> impl core::future::Future<Output = Result<core::convert::Infallible, Self::Error>>;
184}
185
186/// Allows requesting a (nearly) arbitrary amount of [`Reader`]s and [`Writer`]s in an [`Actor`].
187///
188/// This trait is not intended for direct usage by users.
189// Developer notes: This works by using type inference via `Datastore::reader` etc. to request `Reader`s etc. from the
190// `Datastore`.
191pub trait StoreRequest<'a>: sealed::Sealed {
192 /// Requests an instance of `Self` from the [`Datastore`].
193 #[doc(hidden)]
194 #[allow(async_fn_in_trait)] // It's actually private so it's fine.
195 async fn request(datastore: Pin<&'a impl Datastore>) -> Self;
196}
197
198impl sealed::Sealed for () {}
199
200/// Internal trait to abstract out type-erased and concrete data stores.
201pub trait Datastore {
202 /// Returns a generational source tracking the global datastore generation.
203 ///
204 /// This is used to ensure that every reader has had (or will have) a chance to read a value before a writer may
205 /// overwrite it.
206 fn source(self: Pin<&Self>) -> Pin<&generational::Source>;
207
208 #[expect(rustdoc::private_intra_doc_links)] // `rustdoc` is buggy with links from "pub" but unreachable types.
209 /// Returns a reference to the slot for a specific type.
210 ///
211 /// # Panics
212 ///
213 /// * If there is no [`Slot`] for `T` in the [`Datastore`].
214 #[expect(private_interfaces)] // The methods are internal.
215 fn slot<T>(self: Pin<&Self>) -> Pin<&Slot<T>>
216 where
217 T: Storable + 'static;
218}
219
220impl<S> Datastore for Pin<&S>
221where
222 S: Datastore,
223{
224 fn source(self: Pin<&Self>) -> Pin<&generational::Source> {
225 Pin::into_inner(self).source()
226 }
227
228 #[expect(private_interfaces)] // The methods are internal.
229 fn slot<T>(self: Pin<&Self>) -> Pin<&Slot<T>>
230 where
231 T: Storable + 'static,
232 {
233 Pin::into_inner(self).slot()
234 }
235}
236
237pub(crate) trait DatastoreExt<'a>: Copy {
238 #[cfg(test)]
239 /// Increments the global datastore generation.
240 ///
241 /// Asserts that every reader has had (or will have) a chance to read a value before a writer may overwrite it.
242 fn increment_generation(self);
243
244 /// Returns the [`Reader`] for a specific slot.
245 ///
246 /// # Panics
247 ///
248 /// * If there is no [`Slot`] for `T` in the [`Datastore`].
249 fn reader<T>(self) -> Reader<'a, T>
250 where
251 T: Storable + 'static;
252
253 /// Returns the [`ExclusiveReader`] for a specific slot.
254 ///
255 /// Exclusivity of the reader is not guaranteed by this method and must be ensured via other means (e.g.
256 /// [`crate::execute::validate_actors`]).
257 ///
258 /// # Panics
259 ///
260 /// * If there is no [`Slot`] for `T` in the [`Datastore`].
261 fn exclusive_reader<T>(self) -> ExclusiveReader<'a, T>
262 where
263 T: Storable + 'static;
264
265 /// Returns the [`Writer`] for a specific slot.
266 ///
267 /// # Panics
268 ///
269 /// * If the [`Writer`] for this slot has already been acquired.
270 ///
271 /// * If there is no [`Slot`] for `T` in the [`Datastore`].
272 fn writer<T>(self) -> Writer<'a, T>
273 where
274 T: Storable + 'static;
275}
276
277impl<'a, S> DatastoreExt<'a> for Pin<&'a S>
278where
279 S: Datastore,
280{
281 #[cfg(test)]
282 fn increment_generation(self) {
283 self.source().increment_generation()
284 }
285
286 fn reader<T>(self) -> Reader<'a, T>
287 where
288 T: Storable + 'static,
289 {
290 Reader::from_slot(self.slot::<T>())
291 }
292
293 fn exclusive_reader<T>(self) -> ExclusiveReader<'a, T>
294 where
295 T: Storable + 'static,
296 {
297 ExclusiveReader::from_slot(self.slot::<T>())
298 }
299
300 fn writer<T>(self) -> Writer<'a, T>
301 where
302 T: Storable + 'static,
303 {
304 Writer::new(self.source().waiter(), self.slot::<T>())
305 }
306}
307
308/// Implements a no-op for Actors that do not read or write any values.
309impl<'a> StoreRequest<'a> for () {
310 async fn request(_store: Pin<&'a impl Datastore>) -> Self {}
311}
312
313impl<T> sealed::Sealed for Reader<'_, T> where T: Storable + 'static {}
314
315impl<'a, T> StoreRequest<'a> for Reader<'a, T>
316where
317 T: Storable + 'static,
318{
319 async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
320 datastore.reader()
321 }
322}
323
324impl<T> sealed::Sealed for ExclusiveReader<'_, T> where T: Storable + 'static {}
325
326impl<'a, T> StoreRequest<'a> for ExclusiveReader<'a, T>
327where
328 T: Storable + 'static,
329{
330 async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
331 datastore.exclusive_reader()
332 }
333}
334
335impl<T> sealed::Sealed for InitializedReader<'_, T> where T: Storable + 'static {}
336
337impl<'a, T> StoreRequest<'a> for InitializedReader<'a, T>
338where
339 T: Storable + 'static,
340{
341 async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
342 Reader::from_slot(datastore.slot()).wait_init().await
343 }
344}
345
346impl<T> sealed::Sealed for Writer<'_, T> where T: Storable + 'static {}
347
348impl<'a, T> StoreRequest<'a> for Writer<'a, T>
349where
350 T: Storable + 'static,
351{
352 async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
353 datastore.writer()
354 }
355}
356
357/// Implements [`StoreRequest`] for provided types.
358macro_rules! impl_request_helper {
359 ($t:ident) => {
360 #[cfg_attr(docsrs, doc(fake_variadic))]
361 /// This trait is implemented for tuples up to seven items long.
362 impl<'a, $t> sealed::Sealed for ($t,) { }
363
364 #[cfg_attr(docsrs, doc(fake_variadic))]
365 /// This trait is implemented for tuples up to seven items long.
366 impl<'a, $t> StoreRequest<'a> for ($t,)
367 where
368 $t: StoreRequest<'a>,
369 {
370 async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
371 (<$t as StoreRequest>::request(datastore).await,)
372 }
373 }
374 };
375
376 (@impl $($t:ident)*) => {
377 #[cfg_attr(docsrs, doc(hidden))]
378 impl<'a, $($t),*> sealed::Sealed for ( $( $t, )* )
379 where
380 $($t: sealed::Sealed),*
381 { }
382
383 #[cfg_attr(docsrs, doc(hidden))]
384 impl<'a, $($t),*> StoreRequest<'a> for ( $( $t, )* )
385 where
386 $($t: StoreRequest<'a>),*
387 {
388 async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
389 // join! is necessary here to avoid argument-order-dependence with the #[actor] macro.
390 // This ensures that any `InitializedReaders` in self correctly track the generation at which they were
391 // first ready, so that the first `wait_for_update` sees the value that caused them to become
392 // initialized.
393 // See `multi_request_order_independence` for the verification of this.
394 futures::join!($( <$t as StoreRequest>::request(datastore), )*)
395 }
396 }
397 };
398
399 ($head:ident $($rest:ident)*) => {
400 impl_request_helper!(@impl $head $($rest)*);
401 impl_request_helper!($($rest)*);
402 };
403}
404
405impl_request_helper!(Z Y X W V U T);
406
407/// Macro helper to allow actors to return either a [`Result`] type or [`Infallible`] (and eventually [`!`]).
408#[diagnostic::on_unimplemented(
409 message = "#[veecle_os_runtime::actor] functions should return either a `Result<Infallible, _>` or `Infallible`",
410 label = "not a valid actor return type"
411)]
412pub trait IsActorResult: sealed::Sealed {
413 /// The error type this result converts into.
414 type Error;
415
416 /// Convert the result into an actual [`Result`] value.
417 fn into_result(self) -> Result<Infallible, Self::Error>;
418}
419
420impl<E> sealed::Sealed for Result<Infallible, E> {}
421
422impl<E> IsActorResult for Result<Infallible, E> {
423 type Error = E;
424
425 fn into_result(self) -> Result<Infallible, E> {
426 self
427 }
428}
429
430impl sealed::Sealed for Infallible {}
431
432impl IsActorResult for Infallible {
433 type Error = Infallible;
434
435 fn into_result(self) -> Result<Infallible, Self::Error> {
436 match self {}
437 }
438}
439
#[cfg(test)]
mod tests {
    use core::future::Future;
    use core::pin::pin;
    use core::task::{Context, Poll};

    use futures::future::FutureExt;

    use crate::actor::{DatastoreExt, StoreRequest};
    use crate::cons::{Cons, Nil};
    use crate::datastore::{InitializedReader, Storable};

    /// Requests the same two initialized readers in both tuple orders and checks that both requests complete in the
    /// same generation, with every reader still seeing the initializing value as an update.
    #[test]
    fn multi_request_order_independence() {
        #[derive(Debug, Storable)]
        #[storable(crate = crate)]
        struct A;

        #[derive(Debug, Storable)]
        #[storable(crate = crate)]
        struct B;

        let datastore = pin!(crate::execute::make_store::<Cons<A, Cons<B, Nil>>>());
        // A pinned shared reference is `Copy`, so a single handle is reused for every access below.
        let store = datastore.as_ref();

        let mut writer_a = store.writer::<A>();
        let mut writer_b = store.writer::<B>();

        // No matter the order these two request the readers, they should both resolve during the generation where the
        // later of the two is first written.
        let mut first_request = pin!(<(InitializedReader<A>, InitializedReader<B>)>::request(store));
        let mut second_request = pin!(<(InitializedReader<B>, InitializedReader<A>)>::request(store));

        let (first_waker, first_wakes) = futures_test::task::new_count_waker();
        let (second_waker, second_wakes) = futures_test::task::new_count_waker();

        let mut first_context = Context::from_waker(&first_waker);
        let mut second_context = Context::from_waker(&second_waker);

        // Nothing has been written yet, so neither request can complete.
        assert!(first_request.as_mut().poll(&mut first_context).is_pending());
        assert!(second_request.as_mut().poll(&mut second_context).is_pending());

        let first_wakes_before = first_wakes.get();
        let second_wakes_before = second_wakes.get();

        store.increment_generation();

        writer_a.write(A).now_or_never().unwrap();

        // When the first value is written, each future may or may not wake up, but if they do we need to poll them.
        if first_wakes.get() > first_wakes_before {
            assert!(first_request.as_mut().poll(&mut first_context).is_pending());
        }
        if second_wakes.get() > second_wakes_before {
            assert!(second_request.as_mut().poll(&mut second_context).is_pending());
        }

        let first_wakes_before = first_wakes.get();
        let second_wakes_before = second_wakes.get();

        store.increment_generation();

        writer_b.write(B).now_or_never().unwrap();

        // When the second value is written, both futures _must_ wake up and complete.
        assert!(first_wakes.get() > first_wakes_before);
        assert!(second_wakes.get() > second_wakes_before);

        let Poll::Ready((mut first_a, mut first_b)) = first_request.as_mut().poll(&mut first_context) else {
            panic!("request 1 was not ready")
        };

        // The second request asked for `B` first, so its tuple is ordered `(B, A)`.
        let Poll::Ready((mut second_b, mut second_a)) = second_request.as_mut().poll(&mut second_context) else {
            panic!("request 2 was not ready")
        };

        // All readers should see an update, since they've just been initialized but not `wait_for_update`d.
        assert!(first_a.wait_for_update().now_or_never().is_some());
        assert!(first_b.wait_for_update().now_or_never().is_some());

        assert!(second_b.wait_for_update().now_or_never().is_some());
        assert!(second_a.wait_for_update().now_or_never().is_some());
    }
}