veecle_os_runtime/datastore/
exclusive_reader.rs1use core::cell::Ref;
2use core::fmt::Debug;
3use core::marker::PhantomData;
4use core::pin::Pin;
5
6use crate::datastore::Storable;
7use crate::datastore::slot::{self, Slot};
8
9#[derive(Debug)]
59pub struct ExclusiveReader<'a, T>
60where
61 T: Storable + 'static,
62{
63 waiter: slot::Waiter<'a, T>,
64
65 marker: PhantomData<fn(T)>,
66}
67
68impl<T> ExclusiveReader<'_, T>
69where
70 T: Storable + 'static,
71{
72 #[cfg_attr(feature = "veecle-telemetry", veecle_telemetry::instrument)]
78 pub fn read<U>(&self, f: impl FnOnce(Option<&T::DataType>) -> U) -> U {
79 self.waiter.read(|value| {
80 let value = value.as_ref();
81
82 #[cfg(feature = "veecle-telemetry")]
84 veecle_telemetry::trace!("Slot read.", type_name = self.waiter.inner_type_name());
85 f(value)
86 })
87 }
88
89 #[cfg_attr(feature = "veecle-telemetry", veecle_telemetry::instrument)]
91 pub fn take(&mut self) -> Option<T::DataType> {
92 let value = self.waiter.take();
93
94 #[cfg(feature = "veecle-telemetry")]
96 veecle_telemetry::trace!(
97 "Slot value taken.",
98 type_name = self.waiter.inner_type_name()
99 );
100
101 value
102 }
103
104 pub fn read_cloned(&self) -> Option<T::DataType>
109 where
110 T::DataType: Clone,
111 {
112 self.read(|t| t.cloned())
113 }
114
115 #[cfg_attr(feature = "veecle-telemetry", veecle_telemetry::instrument)]
123 pub async fn wait_for_update(&mut self) -> &mut Self {
124 self.waiter.wait().await;
125 self.waiter.update_generation();
126 self
127 }
128}
129
130impl<'a, T> ExclusiveReader<'a, T>
131where
132 T: Storable + 'static,
133{
134 pub(crate) fn from_slot(slot: Pin<&'a Slot<T>>) -> Self {
136 ExclusiveReader {
137 waiter: slot.waiter(),
138 marker: PhantomData,
139 }
140 }
141}
142
143impl<T> super::combined_readers::Sealed for ExclusiveReader<'_, T> where T: Storable {}
144
145impl<T> super::combined_readers::CombinableReader for ExclusiveReader<'_, T>
146where
147 T: Storable,
148{
149 type ToBeRead = Option<T::DataType>;
150
151 fn borrow(&self) -> Ref<'_, Self::ToBeRead> {
152 self.waiter.borrow()
153 }
154
155 async fn wait_for_update(&mut self) {
156 self.wait_for_update().await;
157 }
158}
159
160#[cfg(test)]
161mod tests {
162 use core::pin::pin;
163 use futures::FutureExt;
164
165 use crate::datastore::{ExclusiveReader, Slot, Storable, Writer, generational};
166
167 #[test]
168 fn read() {
169 #[derive(Eq, PartialEq, Debug, Clone, Storable)]
170 #[storable(crate = crate)]
171 struct Sensor(u8);
172
173 let source = pin!(generational::Source::new());
174 let slot = pin!(Slot::<Sensor>::new());
175
176 let reader = ExclusiveReader::from_slot(slot.as_ref());
177 let mut writer = Writer::new(source.as_ref().waiter(), slot.as_ref());
178
179 assert_eq!(reader.read(|x| x.cloned()), None);
180 assert_eq!(reader.read_cloned(), None);
181
182 source.as_ref().increment_generation();
183 writer.write(Sensor(1)).now_or_never().unwrap();
184
185 assert_eq!(
186 reader.read(|x: Option<&Sensor>| x.cloned()),
187 Some(Sensor(1))
188 );
189 assert_eq!(reader.read_cloned(), Some(Sensor(1)));
190 }
191
192 #[test]
193 fn take() {
194 #[derive(Eq, PartialEq, Debug, Clone, Storable)]
195 #[storable(crate = crate)]
196 struct Sensor(u8);
197
198 let source = pin!(generational::Source::new());
199 let slot = pin!(Slot::<Sensor>::new());
200
201 let mut reader = ExclusiveReader::from_slot(slot.as_ref());
202 let mut writer = Writer::new(source.as_ref().waiter(), slot.as_ref());
203
204 assert_eq!(reader.take(), None);
205 source.as_ref().increment_generation();
206 writer.write(Sensor(10)).now_or_never().unwrap();
207 assert_eq!(reader.take(), Some(Sensor(10)));
208 assert_eq!(reader.take(), None);
209 }
210
211 #[test]
212 fn wait_for_update() {
213 #[derive(Eq, PartialEq, Debug, Clone, Storable)]
214 #[storable(crate = crate)]
215 struct Sensor(u8);
216
217 let source = pin!(generational::Source::new());
218 let slot = pin!(Slot::<Sensor>::new());
219
220 let mut reader = ExclusiveReader::from_slot(slot.as_ref());
221 let mut writer = Writer::new(source.as_ref().waiter(), slot.as_ref());
222
223 assert!(reader.wait_for_update().now_or_never().is_none());
224
225 source.as_ref().increment_generation();
226 writer.write(Sensor(1)).now_or_never().unwrap();
227
228 reader
229 .wait_for_update()
230 .now_or_never()
231 .unwrap()
232 .read(|x| assert_eq!(x, Some(&Sensor(1))));
233 }
234}