veecle_os_runtime/datastore/reader.rs
1use core::cell::Ref;
2use core::fmt::Debug;
3use core::marker::PhantomData;
4use core::pin::{Pin, pin};
5
6use pin_project::pin_project;
7
8use crate::datastore::Storable;
9use crate::datastore::initialized_reader::InitializedReader;
10use crate::datastore::slot::{self, Slot};
11
12/// Reader for a [`Storable`] type.
13///
14/// Allows [`Actor`]s to read a value of a type written by another actor.
15/// The generic type `T` from the reader specifies the type of the value that is being read.
16///
17/// The reader allows reading the current value.
18/// If no value for type `T` has been written to yet, [`Reader::read`] will return `None`.
19/// See [`Self::wait_init`] for creating a reader that ensures available values for `T`.
20///
21/// # Usage
22///
23/// [`Reader::wait_for_update`] allows waiting until the type is written to.
24/// It will return immediately if an unseen value is available.
25/// Unseen does not imply the value actually changed, just that an [`Actor`] has written a value.
26/// A write of the same value still triggers [`Reader::wait_for_update`] to resolve.
27///
28/// To illustrate:
29/// ```text
30/// - Writer writes 5
31/// - Reader is woken and reads 5.
32/// Reader waits for updates.
33/// ...
34/// - Writer writes 5 once again.
35/// - Reader is woken and reads 5.
36/// ...
37/// ```
38///
39/// The reader is woken, even if the new value equals the old one. The [`Reader`] is only aware of the act of writing.
40///
41/// # Example
42///
43/// ```rust
44/// # use std::fmt::Debug;
45/// #
46/// # use veecle_os_runtime::{Storable, Reader};
47/// #
48/// # #[derive(Debug, Default, Storable)]
49/// # pub struct Foo;
50/// #
51/// #[veecle_os_runtime::actor]
52/// async fn foo_reader(mut reader: Reader<'_, Foo>) -> std::convert::Infallible {
53/// loop {
54/// let processed_value = reader.wait_for_update().await.read(|value: Option<&Foo>| {
55/// // do something with the value.
56/// });
57/// }
58/// }
59/// ```
60///
61/// [`Actor`]: crate::actor::Actor
62#[derive(Debug)]
63#[pin_project]
64pub struct Reader<'a, T>
65where
66 T: Storable + 'static,
67{
68 #[pin]
69 waiter: slot::Waiter<'a, T>,
70
71 marker: PhantomData<fn(T)>,
72}
73
74impl<T> Reader<'_, T>
75where
76 T: Storable + 'static,
77{
78 /// Reads the current value of a type.
79 ///
80 /// Can be combined with [`Self::wait_for_update`] to wait for the value to be updated before reading it.
81 ///
82 /// This method takes a closure to ensure the reference is not held across await points.
83 #[cfg_attr(feature = "veecle-telemetry", veecle_telemetry::instrument)]
84 pub fn read<U>(&self, f: impl FnOnce(Option<&T::DataType>) -> U) -> U {
85 self.waiter.read(|value| {
86 let value = value.as_ref();
87
88 // TODO(DEV-532): add debug format
89 #[cfg(feature = "veecle-telemetry")]
90 veecle_telemetry::trace!("Slot read.", type_name = self.waiter.inner_type_name());
91 f(value)
92 })
93 }
94
95 /// Reads and clones the current value.
96 ///
97 /// This is a wrapper around [`Self::read`] that additionally clones the value.
98 /// You can use it instead of `reader.read(|c| c.clone())`.
99 pub fn read_cloned(&self) -> Option<T::DataType>
100 where
101 T::DataType: Clone,
102 {
103 self.read(|t| t.cloned())
104 }
105
106 /// Waits for any write to occur.
107 ///
108 /// This future resolving does not imply that `previous_value != new_value`, just that a
109 /// [`Writer`][super::Writer] has written a value of `T` since the last time this future resolved.
110 ///
111 /// This returns `&mut Self` to allow chaining a call to methods accessing the value, for example
112 /// [`read`][Self::read`].
113 #[cfg_attr(feature = "veecle-telemetry", veecle_telemetry::instrument)]
114 pub async fn wait_for_update(&mut self) -> &mut Self {
115 self.waiter.wait().await;
116 self.waiter.update_generation();
117 self
118 }
119}
120
121impl<'a, T> Reader<'a, T>
122where
123 T: Storable + 'static,
124{
125 /// Creates a new `Reader` from a `slot`.
126 pub(crate) fn from_slot(slot: Pin<&'a Slot<T>>) -> Self {
127 Reader {
128 waiter: slot.waiter(),
129 marker: PhantomData,
130 }
131 }
132
133 /// Converts the `Reader` into an [`InitializedReader`].
134 ///
135 /// Pends until a value for `T` is available or resolves immediately if a value is already available.
136 /// This will not mark the value as seen, [`InitializedReader::wait_for_update`] is unaffected by this method.
137 pub async fn wait_init(self) -> InitializedReader<'a, T> {
138 if self.read(|t| t.is_none()) {
139 self.waiter.wait().await;
140 }
141 InitializedReader::new(self.waiter)
142 }
143}
144
145impl<T> super::combined_readers::Sealed for Reader<'_, T> where T: Storable {}
146
147impl<T> super::combined_readers::CombinableReader for Reader<'_, T>
148where
149 T: Storable,
150{
151 type ToBeRead = Option<T::DataType>;
152
153 fn borrow(&self) -> Ref<'_, Self::ToBeRead> {
154 self.waiter.borrow()
155 }
156
157 async fn wait_for_update(&mut self) {
158 self.wait_for_update().await;
159 }
160}
161
162#[cfg(test)]
163mod tests {
164 use core::pin::pin;
165 use futures::FutureExt;
166
167 use crate::datastore::{Reader, Slot, Storable, Writer, generational};
168
169 #[test]
170 fn wait_for_update() {
171 #[derive(Eq, PartialEq, Debug, Clone, Storable)]
172 #[storable(crate = crate)]
173 struct Sensor(u8);
174
175 let source = pin!(generational::Source::new());
176 let slot = pin!(Slot::<Sensor>::new());
177
178 let mut reader = Reader::from_slot(slot.as_ref());
179 let mut writer = Writer::new(source.as_ref().waiter(), slot.as_ref());
180
181 assert!(reader.wait_for_update().now_or_never().is_none());
182
183 source.as_ref().increment_generation();
184 writer.write(Sensor(1)).now_or_never().unwrap();
185
186 reader
187 .wait_for_update()
188 .now_or_never()
189 .unwrap()
190 .read(|x| assert_eq!(x, Some(&Sensor(1))));
191 }
192}