veecle_os_runtime/datastore/
initialized_reader.rs

1use core::cell::Ref;
2use core::marker::PhantomData;
3
4use crate::datastore::{Storable, slot};
5
6/// Reader for a [`Storable`] type.
7///
8/// Allows [`Actor`]s to read a value of a type written by another actor.
9/// The generic type `T` from the reader specifies the type of the value that is being read.
10///
11/// This reader can be requested directly as an actor input in simple cases, this will mean your actor does not start
12/// running until all `InitializedReader`s it takes have been initialized by their writers.
13/// If you need to do something more complex (e.g. you have interdependencies between actors so one must write an
14/// initial value earlier) then you can take a `Reader` and convert via [`Reader::wait_init`][super::Reader::wait_init]
15/// when ready.
16/// By ensuring the presence of a value for `T` has been written at least once, this reader avoids `Option` when
17/// reading.
18///
19/// # Example
20///
21/// ```rust
22/// # use std::fmt::Debug;
23/// #
24/// # use veecle_os_runtime::{Storable, Reader, InitializedReader};
25/// #
26/// # #[derive(Debug, Default, Storable)]
27/// # pub struct Foo;
28/// #
29/// #[veecle_os_runtime::actor]
30/// async fn foo_reader(mut reader: InitializedReader<'_, Foo>) -> std::convert::Infallible {
31///     loop {
32///         let processed_value = reader.wait_for_update().await.read(|value: &Foo| {
33///             // Do something with the value.
34///         });
35///     }
36/// }
37///
38/// #[veecle_os_runtime::actor]
39/// async fn foo_reader_complex(mut reader: Reader<'_, Foo>) -> std::convert::Infallible {
40///     // Do some initialization that must be completed before waiting for the reader to have an initial value.
41///     let mut reader = reader.wait_init().await;
42///     loop {
43///         let processed_value = reader.wait_for_update().await.read(|value: &Foo| {
44///             // Do something with the value.
45///         });
46///     }
47/// }
48/// ```
49///
50/// [`Actor`]: crate::actor::Actor
51#[derive(Debug)]
52pub struct InitializedReader<'a, T>
53where
54    T: Storable + 'static,
55{
56    waiter: slot::Waiter<'a, T>,
57
58    marker: PhantomData<fn(T)>,
59}
60
61impl<T> InitializedReader<'_, T>
62where
63    T: Storable + 'static,
64{
65    /// Reads the current value of a type.
66    ///
67    /// Can be combined with [`Self::wait_for_update`] to wait for the value to be updated before reading it.
68    ///
69    /// This method takes a closure to ensure the reference is not held across await points.
70    #[cfg_attr(feature = "veecle-telemetry", veecle_telemetry::instrument)]
71    pub fn read<U>(&self, f: impl FnOnce(&T::DataType) -> U) -> U {
72        self.waiter.read(|value| {
73            let value = value
74                .as_ref()
75                .expect("initialized reader should only access initialized values");
76
77            // TODO(DEV-532): add debug format
78            #[cfg(feature = "veecle-telemetry")]
79            veecle_telemetry::trace!("Slot read.", type_name = self.waiter.inner_type_name());
80            f(value)
81        })
82    }
83
84    /// Reads and clones the current value of a type.
85    ///
86    /// This is a wrapper around [`Self::read`] that additionally clones the value.
87    /// You can use it instead of `reader.read(|c| c.clone())`.
88    pub fn read_cloned(&self) -> T::DataType
89    where
90        T::DataType: Clone,
91    {
92        self.read(|t| t.clone())
93    }
94
95    /// Waits for any write to occur.
96    ///
97    /// This future resolving does not imply that `previous_value != new_value`, just that a
98    /// [`Writer`][super::Writer] has written a value of `T` since the last time this future resolved.
99    ///
100    /// This returns `&mut Self` to allow chaining a call to methods accessing the value, for example
101    /// [`read`][Self::read`].
102    #[cfg_attr(feature = "veecle-telemetry", veecle_telemetry::instrument)]
103    pub async fn wait_for_update(&mut self) -> &mut Self {
104        self.waiter.wait().await;
105        self.waiter.update_generation();
106        self
107    }
108}
109
110impl<'a, T> InitializedReader<'a, T>
111where
112    T: Storable + 'static,
113{
114    /// Creates a new `InitializedReader` from a [`Waiter`][slot::Waiter].
115    pub(crate) fn new(waiter: slot::Waiter<'a, T>) -> Self {
116        Self {
117            waiter,
118            marker: Default::default(),
119        }
120    }
121}
122
123impl<T> super::combined_readers::Sealed for InitializedReader<'_, T> where T: Storable {}
124
125impl<T> super::combined_readers::CombinableReader for InitializedReader<'_, T>
126where
127    T: Storable,
128{
129    type ToBeRead = T::DataType;
130
131    fn borrow(&self) -> Ref<'_, Self::ToBeRead> {
132        Ref::map(self.waiter.borrow(), |t| t.as_ref().unwrap())
133    }
134
135    async fn wait_for_update(&mut self) {
136        self.wait_for_update().await;
137    }
138}
139
#[cfg(test)]
mod tests {
    use core::pin::pin;
    use futures::FutureExt;

    use crate::datastore::{Reader, Slot, Storable, Writer, generational};

    /// Minimal storable payload shared by every test in this module.
    #[derive(Eq, PartialEq, Debug, Clone, Storable)]
    #[storable(crate = crate)]
    struct Sensor(u8);

    /// A reader only becomes initialized after a write, and then yields the written value.
    #[test]
    fn read() {
        let generation_source = pin!(generational::Source::new());
        let sensor_slot = pin!(Slot::<Sensor>::new());

        let mut sensor_writer = Writer::new(generation_source.as_ref().waiter(), sensor_slot.as_ref());
        let pending_reader = Reader::from_slot(sensor_slot.as_ref());

        // Nothing has been written yet, so initialization cannot complete.
        assert!(pending_reader.wait_init().now_or_never().is_none());

        generation_source.as_ref().increment_generation();
        sensor_writer.write(Sensor(5)).now_or_never().unwrap();

        let initialized = Reader::from_slot(sensor_slot.as_ref())
            .wait_init()
            .now_or_never()
            .unwrap();

        let via_closure = initialized.read(|sensor: &Sensor| sensor.clone());
        assert_eq!(via_closure, Sensor(5));
        assert_eq!(initialized.read_cloned(), Sensor(5));
    }

    /// `wait_for_update` resolves once per write and stays pending in between.
    #[test]
    fn wait_for_update() {
        let generation_source = pin!(generational::Source::new());
        let sensor_slot = pin!(Slot::<Sensor>::new());

        let mut sensor_writer = Writer::new(generation_source.as_ref().waiter(), sensor_slot.as_ref());
        let plain_reader = Reader::from_slot(sensor_slot.as_ref());

        generation_source.as_ref().increment_generation();
        sensor_writer.write(Sensor(1)).now_or_never().unwrap();

        let mut initialized = plain_reader.wait_init().now_or_never().unwrap();

        // The initial write counts as an update exactly once.
        assert!(initialized.wait_for_update().now_or_never().is_some());
        assert!(initialized.wait_for_update().now_or_never().is_none());

        generation_source.as_ref().increment_generation();
        sensor_writer.write(Sensor(1)).now_or_never().unwrap();

        initialized
            .wait_for_update()
            .now_or_never()
            .unwrap()
            .read(|sensor| assert_eq!(sensor, &Sensor(1)));
    }

    /// A `wait_init` future that was polled before the write still hands over a reader that sees the update.
    #[test]
    fn wait_init_wait_for_update() {
        let generation_source = pin!(generational::Source::new());
        let sensor_slot = pin!(Slot::<Sensor>::new());

        let mut sensor_writer = Writer::new(generation_source.as_ref().waiter(), sensor_slot.as_ref());
        let plain_reader = Reader::from_slot(sensor_slot.as_ref());

        let mut init_future = pin!(plain_reader.wait_init());
        assert!(init_future.as_mut().now_or_never().is_none());
        // The writer can only write after the generation has been incremented.
        generation_source.as_ref().increment_generation();
        sensor_writer.write(Sensor(1)).now_or_never().unwrap();

        let mut initialized = init_future.now_or_never().unwrap();

        // As long as `wait_init` leaves the waiter generation untouched, `now_or_never` has to yield `Some`.
        initialized
            .wait_for_update()
            .now_or_never()
            .unwrap()
            .read(|sensor| assert_eq!(sensor, &Sensor(1)));
    }
}
230}