veecle_os_runtime/datastore/
combined_readers.rs1use core::future::{Future, poll_fn};
2use core::pin::pin;
3use core::task::Poll;
4
/// Combines several readers so that they can be read from, and waited on, as a single unit.
///
/// In this file the trait is implemented for tuples of `&mut` references to
/// [`CombinableReader`]s.
pub trait CombineReaders {
    /// The combined value handed to the closure given to [`read`](Self::read).
    ///
    /// `'b` is the lifetime for which the underlying values are borrowed during the read.
    type ToBeRead<'b>;

    /// Calls `f` with the combined current values and returns whatever `f` returns.
    ///
    /// The values are only lent to `f` for the duration of the call.
    fn read<U>(&self, f: impl FnOnce(Self::ToBeRead<'_>) -> U) -> U;

    /// Waits for an update and then returns `self`, so a call such as
    /// [`read`](Self::read) can be chained onto the awaited result.
    // `async fn` in a public trait trips `async_fn_in_trait` because callers cannot name
    // auto-trait bounds (e.g. `Send`) on the returned future; the lint is silenced on purpose.
    #[allow(async_fn_in_trait)]
    async fn wait_for_update(&mut self) -> &mut Self;
}
25
26pub(super) trait Sealed {}
27
28#[allow(private_bounds)]
29pub trait CombinableReader: Sealed {
31 type ToBeRead: 'static;
33
34 #[doc(hidden)]
38 fn borrow(&self) -> core::cell::Ref<'_, Self::ToBeRead>;
39
40 #[doc(hidden)]
46 #[allow(async_fn_in_trait)]
47 async fn wait_for_update(&mut self);
48}
49
/// Implements [`CombineReaders`] for tuples of `&mut` references to [`CombinableReader`]s.
///
/// Every parenthesized group inside `tuples: [...]` lists the generic parameter names for one
/// tuple arity; one `impl` is generated per group.
macro_rules! impl_combined_reader_helper {
    (
        tuples: [
            $(($($generic_type:ident)*),)*
        ],
    ) => {
        $(
            impl<$($generic_type,)*> CombineReaders for ( $( &mut $generic_type, )* )
            where
                $($generic_type: CombinableReader,)*
            {
                type ToBeRead<'x> = (
                    $(&'x <$generic_type as CombinableReader>::ToBeRead,)*
                );

                // The generic parameter names are reused as local variable names, which is why
                // `non_snake_case` has to be allowed.
                #[allow(non_snake_case)]
                #[cfg_attr(feature = "veecle-telemetry", veecle_telemetry::instrument)]
                fn read<A>(&self, f: impl FnOnce(Self::ToBeRead<'_>) -> A) -> A {
                    let ($($generic_type,)*) = self;
                    // Hold every `Ref` guard in a local so the references given to `f` stay valid
                    // for the whole call.
                    let ($($generic_type,)*) = ($($generic_type.borrow(),)*);
                    f(($(&*$generic_type,)*))
                }

                #[allow(non_snake_case)]
                #[cfg_attr(feature = "veecle-telemetry", veecle_telemetry::instrument)]
                async fn wait_for_update(&mut self) -> &mut Self {
                    // The inner scope ends the borrows of the individual readers before `self`
                    // is handed back to the caller.
                    {
                        let ($($generic_type,)*) = self;
                        let ($(mut $generic_type,)*) = ($(pin!($generic_type.wait_for_update()),)*);
                        poll_fn(move |cx| {
                            let mut update_available = false;
                            // Every future is polled on each pass, even after an earlier one
                            // reported readiness; there is deliberately no short-circuit.
                            // NOTE(review): presumably so that each reader registers the waker
                            // and consumes its own pending update — confirm with the readers.
                            $(
                                update_available |= $generic_type.as_mut().poll(cx).is_ready();
                            )*
                            if update_available {
                                Poll::Ready(())
                            } else {
                                Poll::Pending
                            }
                        }).await;
                    }
                    self
                }
            }
        )*
    };
}
103
104impl_combined_reader_helper!(
105 tuples: [
106 (T U),
108 (T U V),
109 (T U V W),
110 (T U V W X),
111 (T U V W X Y),
112 (T U V W X Y Z),
113 ],
114);
115
#[cfg(test)]
mod tests {
    use core::pin::pin;
    use futures::FutureExt;

    use crate::datastore::{
        CombineReaders, ExclusiveReader, Reader, Slot, Storable, Writer, generational,
    };

    /// Two exclusive readers on slots without a writer can be read together.
    #[test]
    fn read_exclusive_reader() {
        #[derive(Eq, PartialEq, Debug, Clone, Storable)]
        #[storable(crate = crate)]
        struct Sensor0(u8);
        #[derive(Eq, PartialEq, Debug, Clone, Storable)]
        #[storable(crate = crate)]
        struct Sensor1(u8);

        let slot0 = pin!(Slot::<Sensor0>::new());
        let slot1 = pin!(Slot::<Sensor1>::new());

        let mut reader0 = ExclusiveReader::from_slot(slot0.as_ref());
        let mut reader1 = ExclusiveReader::from_slot(slot1.as_ref());

        (&mut reader0, &mut reader1).read(|(a, b)| assert_eq!(a.is_none(), b.is_none()));
    }

    /// A combined wait on exclusive readers resolves only after a write, and only once per write.
    #[test]
    fn wait_for_update_exclusive_reader() {
        #[derive(Eq, PartialEq, Debug, Clone, Storable)]
        #[storable(crate = crate)]
        struct Sensor0(u8);
        #[derive(Eq, PartialEq, Debug, Clone, Storable)]
        #[storable(crate = crate)]
        struct Sensor1(u8);

        let source = pin!(generational::Source::new());
        let slot0 = pin!(Slot::<Sensor0>::new());
        let slot1 = pin!(Slot::<Sensor1>::new());

        let mut writer0 = Writer::new(source.as_ref().waiter(), slot0.as_ref());
        let mut writer1 = Writer::new(source.as_ref().waiter(), slot1.as_ref());
        let mut reader0 = ExclusiveReader::from_slot(slot0.as_ref());
        let mut reader1 = ExclusiveReader::from_slot(slot1.as_ref());

        // Nothing has been written yet, so the combined wait must stay pending.
        assert!(
            (&mut reader0, &mut reader1)
                .wait_for_update()
                .now_or_never()
                .is_none()
        );

        source.as_ref().increment_generation();
        writer0.write(Sensor0(2)).now_or_never().unwrap();
        writer1.write(Sensor1(2)).now_or_never().unwrap();

        // The first wait after the writes resolves; a second one without new writes does not.
        assert!(
            (&mut reader0, &mut reader1)
                .wait_for_update()
                .now_or_never()
                .is_some()
        );
        assert!(
            (&mut reader0, &mut reader1)
                .wait_for_update()
                .now_or_never()
                .is_none()
        );
    }

    /// Combined reads work both before and after `wait_init` has been applied to the readers.
    #[test]
    fn read() {
        #[derive(Eq, PartialEq, Debug, Clone, Storable)]
        #[storable(crate = crate)]
        struct Sensor0(u8);
        #[derive(Eq, PartialEq, Debug, Clone, Storable)]
        #[storable(crate = crate)]
        struct Sensor1(u8);

        let source = pin!(generational::Source::new());
        let slot0 = pin!(Slot::<Sensor0>::new());
        let slot1 = pin!(Slot::<Sensor1>::new());

        let mut reader0 = Reader::from_slot(slot0.as_ref());
        let mut reader1 = Reader::from_slot(slot1.as_ref());

        (&mut reader0, &mut reader1).read(|(a, b)| assert_eq!(a.is_none(), b.is_none()));

        let mut writer0 = Writer::new(source.as_ref().waiter(), slot0.as_ref());
        let mut writer1 = Writer::new(source.as_ref().waiter(), slot1.as_ref());
        source.as_ref().increment_generation();
        writer0.write(Sensor0(2)).now_or_never().unwrap();
        writer1.write(Sensor1(2)).now_or_never().unwrap();

        // After `wait_init` the closure receives the sensor values directly.
        let mut reader0 = reader0.wait_init().now_or_never().unwrap();
        let mut reader1 = reader1.wait_init().now_or_never().unwrap();

        (&mut reader0, &mut reader1).read(|(a, b)| assert_eq!(a.0, b.0));
    }

    /// A combined wait resolves once per write, before and after `wait_init`.
    #[test]
    fn wait_for_update() {
        #[derive(Eq, PartialEq, Debug, Clone, Storable)]
        #[storable(crate = crate)]
        struct Sensor0(u8);
        #[derive(Eq, PartialEq, Debug, Clone, Storable)]
        #[storable(crate = crate)]
        struct Sensor1(u8);

        let source = pin!(generational::Source::new());
        let slot0 = pin!(Slot::<Sensor0>::new());
        let slot1 = pin!(Slot::<Sensor1>::new());

        let mut writer0 = Writer::new(source.as_ref().waiter(), slot0.as_ref());
        let mut writer1 = Writer::new(source.as_ref().waiter(), slot1.as_ref());
        let mut reader0 = Reader::from_slot(slot0.as_ref());
        let mut reader1 = Reader::from_slot(slot1.as_ref());

        // Nothing has been written yet, so the combined wait must stay pending.
        assert!(
            (&mut reader0, &mut reader1)
                .wait_for_update()
                .now_or_never()
                .is_none()
        );

        source.as_ref().increment_generation();
        writer0.write(Sensor0(2)).now_or_never().unwrap();
        writer1.write(Sensor1(2)).now_or_never().unwrap();

        // The first wait after the writes resolves; a second one without new writes does not.
        assert!(
            (&mut reader0, &mut reader1)
                .wait_for_update()
                .now_or_never()
                .is_some()
        );
        assert!(
            (&mut reader0, &mut reader1)
                .wait_for_update()
                .now_or_never()
                .is_none()
        );

        let mut reader0 = reader0.wait_init().now_or_never().unwrap();
        let mut reader1 = reader1.wait_init().now_or_never().unwrap();

        // Converting the readers with `wait_init` does not by itself count as an update.
        assert!(
            (&mut reader0, &mut reader1)
                .wait_for_update()
                .now_or_never()
                .is_none()
        );

        source.as_ref().increment_generation();
        writer0.write(Sensor0(3)).now_or_never().unwrap();
        writer1.write(Sensor1(3)).now_or_never().unwrap();

        // `wait_for_update` hands back the tuple, so `read` can be chained onto it.
        (&mut reader0, &mut reader1)
            .wait_for_update()
            .now_or_never()
            .unwrap()
            .read(|(a, b)| assert_eq!(a.0, b.0));
        assert!(
            (&mut reader0, &mut reader1)
                .wait_for_update()
                .now_or_never()
                .is_none()
        );
    }

    /// Different reader kinds can be combined in one tuple.
    #[test]
    fn read_mixed() {
        #[derive(Eq, PartialEq, Debug, Clone, Storable)]
        #[storable(crate = crate)]
        struct Sensor0(u8);
        #[derive(Eq, PartialEq, Debug, Clone, Storable)]
        #[storable(crate = crate)]
        struct Sensor1(u8);

        let source = pin!(generational::Source::new());
        let slot0 = pin!(Slot::<Sensor0>::new());
        let slot1 = pin!(Slot::<Sensor1>::new());

        let mut reader0 = Reader::from_slot(slot0.as_ref());
        let mut reader1 = ExclusiveReader::from_slot(slot1.as_ref());

        (&mut reader0, &mut reader1).read(|(a, b)| assert_eq!(a.is_none(), b.is_none()));

        let mut writer0 = Writer::new(source.as_ref().waiter(), slot0.as_ref());
        let mut writer1 = Writer::new(source.as_ref().waiter(), slot1.as_ref());
        source.as_ref().increment_generation();
        writer0.write(Sensor0(2)).now_or_never().unwrap();
        writer1.write(Sensor1(2)).now_or_never().unwrap();

        // Only `reader0` is converted via `wait_init`; the exclusive reader still yields an option.
        let mut reader0 = reader0.wait_init().now_or_never().unwrap();

        (&mut reader0, &mut reader1)
            .read(|(a, b): (&Sensor0, &Option<Sensor1>)| assert_eq!(a.0, b.as_ref().unwrap().0));
    }
}