veecle_os_runtime/datastore/
exclusive_reader.rs

1use core::cell::Ref;
2use core::fmt::Debug;
3use core::marker::PhantomData;
4use core::pin::Pin;
5
6use crate::datastore::Storable;
7use crate::datastore::slot::{self, Slot};
8
9/// Exclusive reader for a [`Storable`] type.
10///
11/// By being the sole reader for a [`Storable`] type, this reader can move the read value out.
12/// The generic type `T` from the reader specifies the type of the value that is being read.
13///
14/// The reader allows reading the current value.
15/// If no value for type `T` has been written yet, [`ExclusiveReader::read`] and
16/// [`ExclusiveReader::take`] will return `None`.
17///
18/// # Usage
19///
20/// [`ExclusiveReader::wait_for_update`] allows waiting until the type is written to.
21/// It will return immediately if an unseen value is available.
22/// Unseen does not imply the value actually changed, just that an [`Actor`] has written a value.
23/// A write of the same value still triggers [`ExclusiveReader::wait_for_update`] to resolve.
24///
25/// To illustrate:
26/// ```text
27/// - Writer writes 5
28/// - Reader is woken and reads 5.
29///   Reader waits for updates.
30/// ...
31/// - Writer writes 5 once again.
32/// - Reader is woken and reads 5.
33/// ...
34/// ```
35///
36/// The reader is woken, even if the new value equals the old one. The [`ExclusiveReader`] is only aware of the act of
37/// writing.
38///
39/// # Example
40///
41/// ```rust
42/// # use std::fmt::Debug;
43/// #
44/// # use veecle_os_runtime::{Storable,  ExclusiveReader};
45/// #
46/// # #[derive(Debug, Default, Storable)]
47/// # pub struct Foo;
48/// #
49/// #[veecle_os_runtime::actor]
50/// async fn foo_reader(mut reader: ExclusiveReader<'_, Foo>) -> std::convert::Infallible {
51///     loop {
52///         let value = reader.wait_for_update().await.take();
53///     }
54/// }
55/// ```
56///
57/// [`Actor`]: crate::actor::Actor
58#[derive(Debug)]
59pub struct ExclusiveReader<'a, T>
60where
61    T: Storable + 'static,
62{
63    waiter: slot::Waiter<'a, T>,
64
65    marker: PhantomData<fn(T)>,
66}
67
68impl<T> ExclusiveReader<'_, T>
69where
70    T: Storable + 'static,
71{
72    /// Reads the current value of a type.
73    ///
74    /// Can be combined with [`Self::wait_for_update`] to wait for the value to be updated before reading it.
75    ///
76    /// This method takes a closure to ensure the reference is not held across await points.
77    #[cfg_attr(feature = "veecle-telemetry", veecle_telemetry::instrument)]
78    pub fn read<U>(&self, f: impl FnOnce(Option<&T::DataType>) -> U) -> U {
79        self.waiter.read(|value| {
80            let value = value.as_ref();
81
82            // TODO(DEV-532): add debug format
83            #[cfg(feature = "veecle-telemetry")]
84            veecle_telemetry::trace!("Slot read.", type_name = self.waiter.inner_type_name());
85            f(value)
86        })
87    }
88
89    /// Takes the current value of the type, leaving behind `None`.
90    #[cfg_attr(feature = "veecle-telemetry", veecle_telemetry::instrument)]
91    pub fn take(&mut self) -> Option<T::DataType> {
92        let value = self.waiter.take();
93
94        // TODO(DEV-532): add debug format
95        #[cfg(feature = "veecle-telemetry")]
96        veecle_telemetry::trace!(
97            "Slot value taken.",
98            type_name = self.waiter.inner_type_name()
99        );
100
101        value
102    }
103
104    /// Reads and clones the current value.
105    ///
106    /// This is a wrapper around [`Self::read`] that additionally clones the value.
107    /// You can use it instead of `reader.read(|c| c.clone())`.
108    pub fn read_cloned(&self) -> Option<T::DataType>
109    where
110        T::DataType: Clone,
111    {
112        self.read(|t| t.cloned())
113    }
114
115    /// Waits for any write to occur.
116    ///
117    /// This future resolving does not imply that `previous_value != new_value`, just that a
118    /// [`Writer`][super::Writer] has written a value of `T` since the last time this future resolved.
119    ///
120    /// This returns `&mut Self` to allow chaining a call to methods accessing the value, for example
121    /// [`read`][Self::read`].
122    #[cfg_attr(feature = "veecle-telemetry", veecle_telemetry::instrument)]
123    pub async fn wait_for_update(&mut self) -> &mut Self {
124        self.waiter.wait().await;
125        self.waiter.update_generation();
126        self
127    }
128}
129
130impl<'a, T> ExclusiveReader<'a, T>
131where
132    T: Storable + 'static,
133{
134    /// Creates a new `ExclusiveReader` from a `slot`.
135    pub(crate) fn from_slot(slot: Pin<&'a Slot<T>>) -> Self {
136        ExclusiveReader {
137            waiter: slot.waiter(),
138            marker: PhantomData,
139        }
140    }
141}
142
143impl<T> super::combined_readers::Sealed for ExclusiveReader<'_, T> where T: Storable {}
144
145impl<T> super::combined_readers::CombinableReader for ExclusiveReader<'_, T>
146where
147    T: Storable,
148{
149    type ToBeRead = Option<T::DataType>;
150
151    fn borrow(&self) -> Ref<'_, Self::ToBeRead> {
152        self.waiter.borrow()
153    }
154
155    async fn wait_for_update(&mut self) {
156        self.wait_for_update().await;
157    }
158}
159
160#[cfg(test)]
161mod tests {
162    use core::pin::pin;
163    use futures::FutureExt;
164
165    use crate::datastore::{ExclusiveReader, Slot, Storable, Writer, generational};
166
167    #[test]
168    fn read() {
169        #[derive(Eq, PartialEq, Debug, Clone, Storable)]
170        #[storable(crate = crate)]
171        struct Sensor(u8);
172
173        let source = pin!(generational::Source::new());
174        let slot = pin!(Slot::<Sensor>::new());
175
176        let reader = ExclusiveReader::from_slot(slot.as_ref());
177        let mut writer = Writer::new(source.as_ref().waiter(), slot.as_ref());
178
179        assert_eq!(reader.read(|x| x.cloned()), None);
180        assert_eq!(reader.read_cloned(), None);
181
182        source.as_ref().increment_generation();
183        writer.write(Sensor(1)).now_or_never().unwrap();
184
185        assert_eq!(
186            reader.read(|x: Option<&Sensor>| x.cloned()),
187            Some(Sensor(1))
188        );
189        assert_eq!(reader.read_cloned(), Some(Sensor(1)));
190    }
191
192    #[test]
193    fn take() {
194        #[derive(Eq, PartialEq, Debug, Clone, Storable)]
195        #[storable(crate = crate)]
196        struct Sensor(u8);
197
198        let source = pin!(generational::Source::new());
199        let slot = pin!(Slot::<Sensor>::new());
200
201        let mut reader = ExclusiveReader::from_slot(slot.as_ref());
202        let mut writer = Writer::new(source.as_ref().waiter(), slot.as_ref());
203
204        assert_eq!(reader.take(), None);
205        source.as_ref().increment_generation();
206        writer.write(Sensor(10)).now_or_never().unwrap();
207        assert_eq!(reader.take(), Some(Sensor(10)));
208        assert_eq!(reader.take(), None);
209    }
210
211    #[test]
212    fn wait_for_update() {
213        #[derive(Eq, PartialEq, Debug, Clone, Storable)]
214        #[storable(crate = crate)]
215        struct Sensor(u8);
216
217        let source = pin!(generational::Source::new());
218        let slot = pin!(Slot::<Sensor>::new());
219
220        let mut reader = ExclusiveReader::from_slot(slot.as_ref());
221        let mut writer = Writer::new(source.as_ref().waiter(), slot.as_ref());
222
223        assert!(reader.wait_for_update().now_or_never().is_none());
224
225        source.as_ref().increment_generation();
226        writer.write(Sensor(1)).now_or_never().unwrap();
227
228        reader
229            .wait_for_update()
230            .now_or_never()
231            .unwrap()
232            .read(|x| assert_eq!(x, Some(&Sensor(1))));
233    }
234}