veecle_os_runtime/datastore/
combined_readers.rs

1use core::future::{Future, poll_fn};
2use core::pin::pin;
3use core::task::Poll;
4
5/// Allows combining (nearly) arbitrary amounts of [`Reader`]s, [`ExclusiveReader`]s or [`InitializedReader`]s.
6///
7/// [`ExclusiveReader`]: super::ExclusiveReader
8/// [`InitializedReader`]: super::InitializedReader
9/// [`Reader`]: super::Reader
10pub trait CombineReaders {
11    /// The (tuple) value that will be read from the combined readers.
12    type ToBeRead<'b>;
13
14    /// Reads a tuple of values from all combined readers in the provided function.
15    fn read<U>(&self, f: impl FnOnce(Self::ToBeRead<'_>) -> U) -> U;
16
17    /// Observes the combined readers for updates.
18    ///
19    /// Will return if **any** of the readers is updated.
20    ///
21    /// This returns `&mut Self` to allow chaining a call to [`read`][Self::read`].
22    #[allow(async_fn_in_trait)]
23    async fn wait_for_update(&mut self) -> &mut Self;
24}
25
26pub(super) trait Sealed {}
27
28#[allow(private_bounds)]
29/// A marker trait for types that can be used with [`CombineReaders`], see that for more details.
30pub trait CombinableReader: Sealed {
31    /// The (owned) type that this type reads, will be exposed as a reference in the [`CombineReaders::read`] callback.
32    type ToBeRead: 'static;
33
34    /// Internal implementation details.
35    ///
36    /// Borrows the value of the reader from the slot's internal [`RefCell`][core::cell::RefCell].
37    #[doc(hidden)]
38    fn borrow(&self) -> core::cell::Ref<'_, Self::ToBeRead>;
39
40    /// Internal implementation details.
41    ///
42    /// See [`Reader::wait_for_update`] for more.
43    ///
44    /// [`Reader::wait_for_update`]: super::Reader::wait_for_update
45    #[doc(hidden)]
46    #[allow(async_fn_in_trait)]
47    async fn wait_for_update(&mut self);
48}
49
50/// Implements [`CombineReaders`] for provided types for the various reader types.
51macro_rules! impl_combined_reader_helper {
52    (
53        tuples: [
54            $(($($generic_type:ident)*),)*
55        ],
56    ) => {
57        $(
58            impl<$($generic_type,)*> CombineReaders for ( $( &mut $generic_type, )* )
59            where
60                $($generic_type: CombinableReader,)*
61            {
62                type ToBeRead<'x> = (
63                    $(&'x <$generic_type as CombinableReader>::ToBeRead,)*
64                );
65
66                #[allow(non_snake_case)]
67                #[cfg_attr(feature = "veecle-telemetry", veecle_telemetry::instrument)]
68                fn read<A>(&self, f: impl FnOnce(Self::ToBeRead<'_>) -> A) -> A {
69                    let ($($generic_type,)*) = self;
70                    let ($($generic_type,)*) = ($({
71                        $generic_type.borrow()
72                    },)*);
73                    f(($(&*$generic_type,)*))
74                }
75
76                #[allow(non_snake_case)]
77                #[cfg_attr(feature = "veecle-telemetry", veecle_telemetry::instrument)]
78                async fn wait_for_update(&mut self) -> &mut Self {
79                    {
80                        let ($($generic_type,)*) = self;
81                        let ($(mut $generic_type,)*) = ($(pin!($generic_type.wait_for_update()),)*);
82                        poll_fn(move |cx| {
83                            // We check every reader to increment the generation for every reader.
84                            let mut update_available = false;
85                            $(
86                                if $generic_type.as_mut().poll(cx).is_ready() {
87                                    update_available = true;
88                                }
89                            )*
90                            if update_available {
91                                Poll::Ready(())
92                            } else {
93                                Poll::Pending
94                            }
95                        }).await;
96                    }
97                    self
98                }
99            }
100        )*
101    };
102}
103
104impl_combined_reader_helper!(
105    tuples: [
106        // We don't implement this for a tuple with only one type, as that is just a reader.
107        (T U),
108        (T U V),
109        (T U V W),
110        (T U V W X),
111        (T U V W X Y),
112        (T U V W X Y Z),
113    ],
114);
115
116#[cfg(test)]
117mod tests {
118    use core::pin::pin;
119    use futures::FutureExt;
120
121    use crate::datastore::{
122        CombineReaders, ExclusiveReader, Reader, Slot, Storable, Writer, generational,
123    };
124
125    #[test]
126    fn read_exclusive_reader() {
127        #[derive(Eq, PartialEq, Debug, Clone, Storable)]
128        #[storable(crate = crate)]
129        struct Sensor0(u8);
130        #[derive(Eq, PartialEq, Debug, Clone, Storable)]
131        #[storable(crate = crate)]
132        struct Sensor1(u8);
133
134        let slot0 = pin!(Slot::<Sensor0>::new());
135        let slot1 = pin!(Slot::<Sensor1>::new());
136
137        let mut reader0 = ExclusiveReader::from_slot(slot0.as_ref());
138        let mut reader1 = ExclusiveReader::from_slot(slot1.as_ref());
139
140        (&mut reader0, &mut reader1).read(|(a, b)| assert_eq!(a.is_none(), b.is_none()));
141    }
142
143    #[test]
144    fn wait_for_update_exclusive_reader() {
145        #[derive(Eq, PartialEq, Debug, Clone, Storable)]
146        #[storable(crate = crate)]
147        struct Sensor0(u8);
148        #[derive(Eq, PartialEq, Debug, Clone, Storable)]
149        #[storable(crate = crate)]
150        struct Sensor1(u8);
151
152        let source = pin!(generational::Source::new());
153        let slot0 = pin!(Slot::<Sensor0>::new());
154        let slot1 = pin!(Slot::<Sensor1>::new());
155
156        let mut writer0 = Writer::new(source.as_ref().waiter(), slot0.as_ref());
157        let mut writer1 = Writer::new(source.as_ref().waiter(), slot1.as_ref());
158        let mut reader0 = ExclusiveReader::from_slot(slot0.as_ref());
159        let mut reader1 = ExclusiveReader::from_slot(slot1.as_ref());
160
161        assert!(
162            (&mut reader0, &mut reader1)
163                .wait_for_update()
164                .now_or_never()
165                .is_none()
166        );
167
168        source.as_ref().increment_generation();
169        writer0.write(Sensor0(2)).now_or_never().unwrap();
170        writer1.write(Sensor1(2)).now_or_never().unwrap();
171
172        assert!(
173            (&mut reader0, &mut reader1)
174                .wait_for_update()
175                .now_or_never()
176                .is_some()
177        );
178        assert!(
179            (&mut reader0, &mut reader1)
180                .wait_for_update()
181                .now_or_never()
182                .is_none()
183        );
184    }
185
186    #[test]
187    fn read() {
188        #[derive(Eq, PartialEq, Debug, Clone, Storable)]
189        #[storable(crate = crate)]
190        struct Sensor0(u8);
191        #[derive(Eq, PartialEq, Debug, Clone, Storable)]
192        #[storable(crate = crate)]
193        struct Sensor1(u8);
194
195        let source = pin!(generational::Source::new());
196        let slot0 = pin!(Slot::<Sensor0>::new());
197        let slot1 = pin!(Slot::<Sensor1>::new());
198
199        let mut reader0 = Reader::from_slot(slot0.as_ref());
200        let mut reader1 = Reader::from_slot(slot1.as_ref());
201
202        (&mut reader0, &mut reader1).read(|(a, b)| assert_eq!(a.is_none(), b.is_none()));
203
204        let mut writer0 = Writer::new(source.as_ref().waiter(), slot0.as_ref());
205        let mut writer1 = Writer::new(source.as_ref().waiter(), slot1.as_ref());
206        source.as_ref().increment_generation();
207        writer0.write(Sensor0(2)).now_or_never().unwrap();
208        writer1.write(Sensor1(2)).now_or_never().unwrap();
209
210        let mut reader0 = reader0.wait_init().now_or_never().unwrap();
211        let mut reader1 = reader1.wait_init().now_or_never().unwrap();
212
213        (&mut reader0, &mut reader1).read(|(a, b)| assert_eq!(a.0, b.0));
214    }
215
216    #[test]
217    fn wait_for_update() {
218        #[derive(Eq, PartialEq, Debug, Clone, Storable)]
219        #[storable(crate = crate)]
220        struct Sensor0(u8);
221        #[derive(Eq, PartialEq, Debug, Clone, Storable)]
222        #[storable(crate = crate)]
223        struct Sensor1(u8);
224
225        let source = pin!(generational::Source::new());
226        let slot0 = pin!(Slot::<Sensor0>::new());
227        let slot1 = pin!(Slot::<Sensor1>::new());
228
229        let mut writer0 = Writer::new(source.as_ref().waiter(), slot0.as_ref());
230        let mut writer1 = Writer::new(source.as_ref().waiter(), slot1.as_ref());
231        let mut reader0 = Reader::from_slot(slot0.as_ref());
232        let mut reader1 = Reader::from_slot(slot1.as_ref());
233
234        assert!(
235            (&mut reader0, &mut reader1)
236                .wait_for_update()
237                .now_or_never()
238                .is_none()
239        );
240
241        source.as_ref().increment_generation();
242        writer0.write(Sensor0(2)).now_or_never().unwrap();
243        writer1.write(Sensor1(2)).now_or_never().unwrap();
244
245        assert!(
246            (&mut reader0, &mut reader1)
247                .wait_for_update()
248                .now_or_never()
249                .is_some()
250        );
251        assert!(
252            (&mut reader0, &mut reader1)
253                .wait_for_update()
254                .now_or_never()
255                .is_none()
256        );
257
258        let mut reader0 = reader0.wait_init().now_or_never().unwrap();
259        let mut reader1 = reader1.wait_init().now_or_never().unwrap();
260
261        assert!(
262            (&mut reader0, &mut reader1)
263                .wait_for_update()
264                .now_or_never()
265                .is_none()
266        );
267
268        source.as_ref().increment_generation();
269        writer0.write(Sensor0(3)).now_or_never().unwrap();
270        writer1.write(Sensor1(3)).now_or_never().unwrap();
271
272        (&mut reader0, &mut reader1)
273            .wait_for_update()
274            .now_or_never()
275            .unwrap()
276            .read(|(a, b)| assert_eq!(a.0, b.0));
277        assert!(
278            (&mut reader0, &mut reader1)
279                .wait_for_update()
280                .now_or_never()
281                .is_none()
282        );
283    }
284
285    #[test]
286    fn read_mixed() {
287        #[derive(Eq, PartialEq, Debug, Clone, Storable)]
288        #[storable(crate = crate)]
289        struct Sensor0(u8);
290        #[derive(Eq, PartialEq, Debug, Clone, Storable)]
291        #[storable(crate = crate)]
292        struct Sensor1(u8);
293
294        let source = pin!(generational::Source::new());
295        let slot0 = pin!(Slot::<Sensor0>::new());
296        let slot1 = pin!(Slot::<Sensor1>::new());
297
298        let mut reader0 = Reader::from_slot(slot0.as_ref());
299        let mut reader1 = ExclusiveReader::from_slot(slot1.as_ref());
300
301        (&mut reader0, &mut reader1).read(|(a, b)| assert_eq!(a.is_none(), b.is_none()));
302
303        let mut writer0 = Writer::new(source.as_ref().waiter(), slot0.as_ref());
304        let mut writer1 = Writer::new(source.as_ref().waiter(), slot1.as_ref());
305        source.as_ref().increment_generation();
306        writer0.write(Sensor0(2)).now_or_never().unwrap();
307        writer1.write(Sensor1(2)).now_or_never().unwrap();
308
309        let mut reader0 = reader0.wait_init().now_or_never().unwrap();
310
311        (&mut reader0, &mut reader1)
312            .read(|(a, b): (&Sensor0, &Option<Sensor1>)| assert_eq!(a.0, b.as_ref().unwrap().0));
313    }
314}