veecle_os_runtime/datastore/
writer.rs

1use core::fmt::Debug;
2use core::marker::PhantomData;
3use core::pin::Pin;
4
5use super::slot::Slot;
6use super::{Storable, generational};
7
8/// Writer for a [`Storable`] type.
9///
10/// Allows [`Actor`]s to write a particular type read by another actor.
11/// The generic type `T` from the writer specifies the type of the value that is being written.
12///
13/// # Usage
14///
15/// All [`Reader`]s are guaranteed to be able to observe every write.
16/// For this reason, [`Writer::write`] is an async method.
17/// It will resolve once all [`Actor`]s awaiting a [`Reader`] for the same type had the chance to read the value.
18/// Typically, this only occurs when trying to write two values back to back.
19/// If all [`Reader`]s already had the chance to read the value, [`Writer::write`] will resolve immediately.
20/// The same is true for [`Writer::modify`].
21///
22/// # Examples
23///
24/// ```rust
25/// // Writing a value.
26/// # use std::fmt::Debug;
27/// #
28/// # use veecle_os_runtime::{Storable, Writer};
29/// #
30/// # #[derive(Debug, Default, Storable)]
31/// # pub struct Foo;
32/// #
33/// #[veecle_os_runtime::actor]
34/// async fn foo_writer(mut writer: Writer<'_, Foo>) -> std::convert::Infallible {
35///     loop {
36///         // This call will yield to any readers needing to read the last value.
37///         writer.write(Foo::default()).await;
38///     }
39/// }
40/// ```
41///
42/// ```rust
43/// // Modifying a value.
44/// # use std::fmt::Debug;
45/// #
46/// # use veecle_os_runtime::{Storable, Writer};
47/// #
48/// # #[derive(Debug, Default, Storable)]
49/// # pub struct Foo;
50/// #
51/// #[veecle_os_runtime::actor]
52/// async fn foo_writer(
53///     mut writer: Writer<'_, Foo>,
54/// ) -> std::convert::Infallible {
55///     loop {
56///         // This call will yield to any readers needing to read the last value.
57///         // The closure will run after yielding and right before continuing to the rest of the function.
58///         writer.modify(|previous_value: &mut Option<Foo>| {
59///             // mutate the previous value
60///         }).await;
61///     }
62/// }
63/// ```
64///
65/// [`Writer::ready`] allows separating the "waiting" from the "writing",
66/// After [`Writer::ready`] returns, the next write or modification will happen immediately.
67///
68/// ```rust
69/// # use std::fmt::Debug;
70/// #
71/// # use veecle_os_runtime::{Storable, Reader, Writer};
72/// #
73/// # #[derive(Debug, Default, Storable)]
74/// # pub struct Foo;
75/// #
76/// #[veecle_os_runtime::actor]
77/// async fn foo_writer(mut writer: Writer<'_, Foo>) -> std::convert::Infallible {
78///     loop {
79///         // This call may yield to any readers needing to read the last value.
80///         writer.ready().await;
81///
82///         // This call will return immediately.
83///         writer.write(Foo::default()).await;
84///         // This call will yield to any readers needing to read the last value.
85///         writer.write(Foo::default()).await;
86///     }
87/// }
88/// ```
89///
90/// [`Actor`]: crate::Actor
91/// [`Reader`]: crate::Reader
92#[derive(Debug)]
93pub struct Writer<'a, T>
94where
95    T: Storable + 'static,
96{
97    slot: Pin<&'a Slot<T>>,
98    waiter: generational::Waiter<'a>,
99    marker: PhantomData<fn(T)>,
100}
101
102impl<T> Writer<'_, T>
103where
104    T: Storable + 'static,
105{
106    /// Writes a new value and notifies readers.
107    #[cfg_attr(feature = "veecle-telemetry", veecle_telemetry::instrument)]
108    pub async fn write(&mut self, item: T::DataType) {
109        self.modify(|slot| {
110            let _ = slot.insert(item);
111        })
112        .await;
113    }
114
115    /// Waits for the writer to be ready to perform a write operation.
116    ///
117    /// After awaiting this method, the next call to [`Writer::write()`]
118    /// or [`Writer::modify()`] is guaranteed to resolve immediately.
119    pub async fn ready(&mut self) {
120        let _ = self.waiter.wait().await;
121    }
122
123    /// Updates the value in-place and notifies readers.
124    #[cfg_attr(feature = "veecle-telemetry", veecle_telemetry::instrument)]
125    pub async fn modify(&mut self, f: impl FnOnce(&mut Option<T::DataType>)) {
126        self.ready().await;
127        self.waiter.update_generation();
128
129        #[cfg(feature = "veecle-telemetry")]
130        let type_name = self.slot.inner_type_name();
131
132        self.slot.modify(|value| {
133            f(value);
134
135            // TODO(DEV-532): add debug format
136            #[cfg(feature = "veecle-telemetry")]
137            veecle_telemetry::trace!("Type update.", type_name);
138        });
139        self.slot.increment_generation();
140    }
141
142    /// Reads the current value of a type.
143    ///
144    /// This method takes a closure to ensure the reference is not held across await points.
145    #[cfg_attr(feature = "veecle-telemetry", veecle_telemetry::instrument)]
146    pub fn read<U>(&self, f: impl FnOnce(Option<&T::DataType>) -> U) -> U {
147        #[cfg(feature = "veecle-telemetry")]
148        let type_name = self.slot.inner_type_name();
149        self.slot.read(|value| {
150            let value = value.as_ref();
151            // TODO(DEV-532): add debug format
152            #[cfg(feature = "veecle-telemetry")]
153            veecle_telemetry::trace!("Slot read.", type_name);
154            f(value)
155        })
156    }
157}
158
159impl<'a, T> Writer<'a, T>
160where
161    T: Storable + 'static,
162{
163    pub(crate) fn new(waiter: generational::Waiter<'a>, slot: Pin<&'a Slot<T>>) -> Self {
164        slot.take_writer();
165        Self {
166            slot,
167            waiter,
168            marker: PhantomData,
169        }
170    }
171}
172
173#[cfg(test)]
174mod tests {
175    use crate::datastore::{Slot, Storable, Writer, generational};
176    use core::pin::pin;
177
178    #[test]
179    fn ready_waits_for_increment() {
180        use futures::FutureExt;
181        #[derive(Debug)]
182        pub struct Data();
183        impl Storable for Data {
184            type DataType = Self;
185        }
186
187        let source = pin!(generational::Source::new());
188        let slot = pin!(Slot::<Data>::new());
189        let mut writer = Writer::new(source.as_ref().waiter(), slot.as_ref());
190
191        // Part 1. Initially, the writer is not ready. Calls to
192        // ready() will not resolve immediately in a single Future::poll() call,
193        // indicating that the writer needs more time. Additionally we check that
194        // calls to write() are also not resolving immediately, demonstrating that
195        // ready() actually was correct.
196        assert!(writer.ready().now_or_never().is_none());
197        assert!(writer.write(Data {}).now_or_never().is_none());
198
199        // Part 2. Increment the generation, which signals that the writer
200        // should be ready again. After the increment, ready() and write()
201        // are expected to resolve in a single Future::poll() call.
202        source.as_ref().increment_generation();
203        assert!(writer.ready().now_or_never().is_some());
204        assert!(writer.write(Data {}).now_or_never().is_some());
205
206        // Part 3. Trying to write again before the generation increments should be blocked.
207        assert!(writer.ready().now_or_never().is_none());
208        assert!(writer.write(Data {}).now_or_never().is_none());
209    }
210
211    #[test]
212    fn read_reads_latest_written_value() {
213        use futures::FutureExt;
214        #[derive(Debug, Clone, Copy, PartialEq, Eq)]
215        pub struct Data(usize);
216        impl Storable for Data {
217            type DataType = Self;
218        }
219
220        let source = pin!(generational::Source::new());
221        let slot = pin!(Slot::<Data>::new());
222        let mut writer = Writer::new(source.as_ref().waiter(), slot.as_ref());
223
224        writer.read(|current_data| assert!(current_data.is_none()));
225
226        source.as_ref().increment_generation();
227
228        let want = Data(1);
229        writer.write(want).now_or_never().unwrap();
230        writer.read(|got| assert_eq!(got, Some(&want)));
231
232        source.as_ref().increment_generation();
233
234        let want = Data(2);
235        writer.write(want).now_or_never().unwrap();
236        writer.read(|got| assert_eq!(got, Some(&want)));
237    }
238}