hydro_lang/live_collections/keyed_singleton.rs
1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::hash::Hash;
4
5use stageleft::{IntoQuotedMut, QuotedWithContext, q};
6
7use super::boundedness::{Bounded, Boundedness, Unbounded};
8use super::keyed_stream::KeyedStream;
9use super::optional::Optional;
10use super::singleton::Singleton;
11use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
12use crate::forward_handle::ForwardRef;
13#[cfg(stageleft_runtime)]
14use crate::forward_handle::{CycleCollection, ReceiverComplete};
15use crate::live_collections::stream::{Ordering, Retries};
16use crate::location::dynamic::LocationId;
17use crate::location::{Atomic, Location, NoTick, Tick};
18use crate::manual_expr::ManualExpr;
19use crate::nondet::{NonDet, nondet};
20
21/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
22///
23/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
24/// removed / changed), this also includes an additional variant [`BoundedValue`], which indicates
25/// that entries may be added over time, but once an entry is added it will never be removed and
26/// its value will never change.
27pub trait KeyedSingletonBound {
28 /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
29 type UnderlyingBound: Boundedness;
30 /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
31 type ValueBound: Boundedness;
32
33 /// The type of the keyed singleton if the value for each key is immutable.
34 type WithBoundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Bounded>;
35
36 /// The type of the keyed singleton if the value for each key may change asynchronously.
37 type WithUnboundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Unbounded>;
38}
39
40impl KeyedSingletonBound for Unbounded {
41 type UnderlyingBound = Unbounded;
42 type ValueBound = Unbounded;
43 type WithBoundedValue = BoundedValue;
44 type WithUnboundedValue = Unbounded;
45}
46
47impl KeyedSingletonBound for Bounded {
48 type UnderlyingBound = Bounded;
49 type ValueBound = Bounded;
50 type WithBoundedValue = Bounded;
51 type WithUnboundedValue = UnreachableBound;
52}
53
/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key
/// appears, its value is bounded and will never change. The set of keys itself is still
/// unbounded (the underlying stream is [`Unbounded`]), so new entries may appear asynchronously.
pub struct BoundedValue;
58
59impl KeyedSingletonBound for BoundedValue {
60 type UnderlyingBound = Unbounded;
61 type ValueBound = Bounded;
62 type WithBoundedValue = BoundedValue;
63 type WithUnboundedValue = Unbounded;
64}
65
// Marker used as `<Bounded as KeyedSingletonBound>::WithUnboundedValue`: it pairs a `Bounded`
// underlying stream with an `Unbounded` value bound.
// NOTE(review): the name suggests this combination is never expected to occur in practice and
// only exists to satisfy the trait's associated-type bounds — confirm.
#[doc(hidden)]
pub struct UnreachableBound;
68
69impl KeyedSingletonBound for UnreachableBound {
70 type UnderlyingBound = Bounded;
71 type ValueBound = Unbounded;
72
73 type WithBoundedValue = Bounded;
74 type WithUnboundedValue = UnreachableBound;
75}
76
/// Mapping from keys of type `K` to values of type `V`.
///
/// Keyed Singletons capture an asynchronously updated mapping from keys of type `K` to values of
/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
/// keys cannot be removed and the value for each key is immutable.
///
/// Type Parameters:
/// - `K`: the type of the key for each entry
/// - `V`: the type of the value for each entry
/// - `Loc`: the [`Location`] where the keyed singleton is materialized
/// - `Bound`: tracks whether the entries are:
///   - [`Bounded`] (local and finite)
///   - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
///   - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
    // The backing stream of `(key, value)` pairs: unordered, each element exactly once.
    pub(crate) underlying: Stream<(K, V), Loc, Bound::UnderlyingBound, NoOrder, ExactlyOnce>,
}
96
97impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
98 for KeyedSingleton<K, V, Loc, Bound>
99{
100 fn clone(&self) -> Self {
101 KeyedSingleton {
102 underlying: self.underlying.clone(),
103 }
104 }
105}
106
107impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
108 for KeyedSingleton<K, V, L, B>
109where
110 L: Location<'a> + NoTick,
111{
112 type Location = L;
113
114 fn create_source(ident: syn::Ident, location: L) -> Self {
115 KeyedSingleton {
116 underlying: Stream::create_source(ident, location),
117 }
118 }
119}
120
121impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
122 for KeyedSingleton<K, V, L, B>
123where
124 L: Location<'a> + NoTick,
125{
126 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
127 self.underlying.complete(ident, expected_location);
128 }
129}
130
131impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
132 KeyedSingleton<K, V, L, B>
133{
134 /// Flattens the keyed singleton into an unordered stream of key-value pairs.
135 ///
136 /// The value for each key must be bounded, otherwise the resulting stream elements would be
137 /// non-determinstic. As new entries are added to the keyed singleton, they will be streamed
138 /// into the output.
139 ///
140 /// # Example
141 /// ```rust
142 /// # use hydro_lang::prelude::*;
143 /// # use futures::StreamExt;
144 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
145 /// let keyed_singleton = // { 1: 2, 2: 4 }
146 /// # process
147 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
148 /// # .into_keyed()
149 /// # .first();
150 /// keyed_singleton.entries()
151 /// # }, |mut stream| async move {
152 /// // (1, 2), (2, 4) in any order
153 /// # let mut results = Vec::new();
154 /// # for _ in 0..2 {
155 /// # results.push(stream.next().await.unwrap());
156 /// # }
157 /// # results.sort();
158 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
159 /// # }));
160 /// ```
161 pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
162 self.underlying
163 }
164
165 /// Flattens the keyed singleton into an unordered stream of just the values.
166 ///
167 /// The value for each key must be bounded, otherwise the resulting stream elements would be
168 /// non-determinstic. As new entries are added to the keyed singleton, they will be streamed
169 /// into the output.
170 ///
171 /// # Example
172 /// ```rust
173 /// # use hydro_lang::prelude::*;
174 /// # use futures::StreamExt;
175 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
176 /// let keyed_singleton = // { 1: 2, 2: 4 }
177 /// # process
178 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
179 /// # .into_keyed()
180 /// # .first();
181 /// keyed_singleton.values()
182 /// # }, |mut stream| async move {
183 /// // 2, 4 in any order
184 /// # let mut results = Vec::new();
185 /// # for _ in 0..2 {
186 /// # results.push(stream.next().await.unwrap());
187 /// # }
188 /// # results.sort();
189 /// # assert_eq!(results, vec![2, 4]);
190 /// # }));
191 /// ```
192 pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
193 self.entries().map(q!(|(_, v)| v))
194 }
195
196 /// Flattens the keyed singleton into an unordered stream of just the keys.
197 ///
198 /// The value for each key must be bounded, otherwise the removal of keys would result in
199 /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
200 /// into the output.
201 ///
202 /// # Example
203 /// ```rust
204 /// # use hydro_lang::prelude::*;
205 /// # use futures::StreamExt;
206 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
207 /// let keyed_singleton = // { 1: 2, 2: 4 }
208 /// # process
209 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
210 /// # .into_keyed()
211 /// # .first();
212 /// keyed_singleton.keys()
213 /// # }, |mut stream| async move {
214 /// // 1, 2 in any order
215 /// # let mut results = Vec::new();
216 /// # for _ in 0..2 {
217 /// # results.push(stream.next().await.unwrap());
218 /// # }
219 /// # results.sort();
220 /// # assert_eq!(results, vec![1, 2]);
221 /// # }));
222 /// ```
223 pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
224 self.entries().map(q!(|(k, _)| k))
225 }
226
227 /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
228 /// entries whose keys are not in the provided stream.
229 ///
230 /// # Example
231 /// ```rust
232 /// # use hydro_lang::prelude::*;
233 /// # use futures::StreamExt;
234 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
235 /// let tick = process.tick();
236 /// let keyed_singleton = // { 1: 2, 2: 4 }
237 /// # process
238 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
239 /// # .into_keyed()
240 /// # .first()
241 /// # .batch(&tick, nondet!(/** test */));
242 /// let keys_to_remove = process
243 /// .source_iter(q!(vec![1]))
244 /// .batch(&tick, nondet!(/** test */));
245 /// keyed_singleton.filter_key_not_in(keys_to_remove)
246 /// # .entries().all_ticks()
247 /// # }, |mut stream| async move {
248 /// // { 2: 4 }
249 /// # for w in vec![(2, 4)] {
250 /// # assert_eq!(stream.next().await.unwrap(), w);
251 /// # }
252 /// # }));
253 /// ```
254 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
255 self,
256 other: Stream<K, L, Bounded, O2, R2>,
257 ) -> Self
258 where
259 K: Hash + Eq,
260 {
261 KeyedSingleton {
262 underlying: self.entries().anti_join(other),
263 }
264 }
265
266 /// An operator which allows you to "inspect" each value of a keyed singleton without
267 /// modifying it. The closure `f` is called on a reference to each value. This is
268 /// mainly useful for debugging, and should not be used to generate side-effects.
269 ///
270 /// # Example
271 /// ```rust
272 /// # use hydro_lang::prelude::*;
273 /// # use futures::StreamExt;
274 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
275 /// let keyed_singleton = // { 1: 2, 2: 4 }
276 /// # process
277 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
278 /// # .into_keyed()
279 /// # .first();
280 /// keyed_singleton
281 /// .inspect(q!(|v| println!("{}", v)))
282 /// # .entries()
283 /// # }, |mut stream| async move {
284 /// // { 1: 2, 2: 4 }
285 /// # for w in vec![(1, 2), (2, 4)] {
286 /// # assert_eq!(stream.next().await.unwrap(), w);
287 /// # }
288 /// # }));
289 /// ```
290 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
291 where
292 F: Fn(&V) + 'a,
293 {
294 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
295 KeyedSingleton {
296 underlying: self.underlying.inspect(q!({
297 let orig = f;
298 move |(_k, v)| orig(v)
299 })),
300 }
301 }
302
303 /// An operator which allows you to "inspect" each entry of a keyed singleton without
304 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
305 /// mainly useful for debugging, and should not be used to generate side-effects.
306 ///
307 /// # Example
308 /// ```rust
309 /// # use hydro_lang::prelude::*;
310 /// # use futures::StreamExt;
311 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
312 /// let keyed_singleton = // { 1: 2, 2: 4 }
313 /// # process
314 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
315 /// # .into_keyed()
316 /// # .first();
317 /// keyed_singleton
318 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
319 /// # .entries()
320 /// # }, |mut stream| async move {
321 /// // { 1: 2, 2: 4 }
322 /// # for w in vec![(1, 2), (2, 4)] {
323 /// # assert_eq!(stream.next().await.unwrap(), w);
324 /// # }
325 /// # }));
326 /// ```
327 pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> KeyedSingleton<K, V, L, B>
328 where
329 F: Fn(&(K, V)) + 'a,
330 {
331 KeyedSingleton {
332 underlying: self.underlying.inspect(f),
333 }
334 }
335
336 /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
337 /// element, the value.
338 ///
339 /// This is the equivalent of [`Singleton::into_stream`] but keyed.
340 ///
341 /// # Example
342 /// ```rust
343 /// # use hydro_lang::prelude::*;
344 /// # use futures::StreamExt;
345 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
346 /// let keyed_singleton = // { 1: 2, 2: 4 }
347 /// # process
348 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
349 /// # .into_keyed()
350 /// # .first();
351 /// keyed_singleton
352 /// .clone()
353 /// .into_keyed_stream()
354 /// .interleave(
355 /// keyed_singleton.into_keyed_stream()
356 /// )
357 /// # .entries()
358 /// # }, |mut stream| async move {
359 /// /// // { 1: [2, 2], 2: [4, 4] }
360 /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
361 /// # assert_eq!(stream.next().await.unwrap(), w);
362 /// # }
363 /// # }));
364 /// ```
365 pub fn into_keyed_stream(
366 self,
367 ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
368 self.underlying
369 .into_keyed()
370 .assume_ordering(nondet!(/** only one element per key */))
371 }
372}
373
/// Counts the entries of a bounded keyed singleton by counting the elements of its underlying
/// stream. Used by [`KeyedSingleton::key_count`] on a per-tick snapshot.
#[cfg(stageleft_runtime)]
fn key_count_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<usize, L, Bounded> {
    let KeyedSingleton { underlying } = me;
    underlying.count()
}
380
381impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
382 /// Transforms each value by invoking `f` on each element, with keys staying the same
383 /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
384 ///
385 /// If you do not want to modify the stream and instead only want to view
386 /// each item use [`KeyedStream::inspect`] instead.
387 ///
388 /// # Example
389 /// ```rust
390 /// # use hydro_lang::prelude::*;
391 /// # use futures::StreamExt;
392 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
393 /// let keyed_singleton = // { 1: 2, 2: 4 }
394 /// # process
395 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
396 /// # .into_keyed()
397 /// # .first();
398 /// keyed_singleton.map(q!(|v| v + 1))
399 /// # .entries()
400 /// # }, |mut stream| async move {
401 /// // { 1: 3, 2: 5 }
402 /// # let mut results = Vec::new();
403 /// # for _ in 0..2 {
404 /// # results.push(stream.next().await.unwrap());
405 /// # }
406 /// # results.sort();
407 /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
408 /// # }));
409 /// ```
410 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, U, L, B>
411 where
412 F: Fn(V) -> U + 'a,
413 {
414 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
415 KeyedSingleton {
416 underlying: self.underlying.map(q!({
417 let orig = f;
418 move |(k, v)| (k, orig(v))
419 })),
420 }
421 }
422
423 /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
424 /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
425 ///
426 /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
427 /// the new value `U`. The key remains unchanged in the output.
428 ///
429 /// # Example
430 /// ```rust
431 /// # use hydro_lang::prelude::*;
432 /// # use futures::StreamExt;
433 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
434 /// let keyed_singleton = // { 1: 2, 2: 4 }
435 /// # process
436 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
437 /// # .into_keyed()
438 /// # .first();
439 /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
440 /// # .entries()
441 /// # }, |mut stream| async move {
442 /// // { 1: 3, 2: 6 }
443 /// # let mut results = Vec::new();
444 /// # for _ in 0..2 {
445 /// # results.push(stream.next().await.unwrap());
446 /// # }
447 /// # results.sort();
448 /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
449 /// # }));
450 /// ```
451 pub fn map_with_key<U, F>(
452 self,
453 f: impl IntoQuotedMut<'a, F, L> + Copy,
454 ) -> KeyedSingleton<K, U, L, B>
455 where
456 F: Fn((K, V)) -> U + 'a,
457 K: Clone,
458 {
459 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
460 KeyedSingleton {
461 underlying: self.underlying.map(q!({
462 let orig = f;
463 move |(k, v)| {
464 let out = orig((k.clone(), v));
465 (k, out)
466 }
467 })),
468 }
469 }
470
471 /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
472 ///
473 /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
474 /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
475 /// is filtered out.
476 ///
477 /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
478 /// not modify or take ownership of the values. If you need to modify the values while filtering
479 /// use [`KeyedSingleton::filter_map`] instead.
480 ///
481 /// # Example
482 /// ```rust
483 /// # use hydro_lang::prelude::*;
484 /// # use futures::StreamExt;
485 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
486 /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
487 /// # process
488 /// # .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
489 /// # .into_keyed()
490 /// # .first();
491 /// keyed_singleton.filter(q!(|&v| v > 1))
492 /// # .entries()
493 /// # }, |mut stream| async move {
494 /// // { 1: 2, 2: 4 }
495 /// # let mut results = Vec::new();
496 /// # for _ in 0..2 {
497 /// # results.push(stream.next().await.unwrap());
498 /// # }
499 /// # results.sort();
500 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
501 /// # }));
502 /// ```
503 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
504 where
505 F: Fn(&V) -> bool + 'a,
506 {
507 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
508 KeyedSingleton {
509 underlying: self.underlying.filter(q!({
510 let orig = f;
511 move |(_k, v)| orig(v)
512 })),
513 }
514 }
515
516 /// An operator that both filters and maps values. It yields only the key-value pairs where
517 /// the supplied closure `f` returns `Some(value)`.
518 ///
519 /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
520 /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
521 /// If it returns `None`, the key-value pair is filtered out.
522 ///
523 /// # Example
524 /// ```rust
525 /// # use hydro_lang::prelude::*;
526 /// # use futures::StreamExt;
527 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
528 /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
529 /// # process
530 /// # .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
531 /// # .into_keyed()
532 /// # .first();
533 /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
534 /// # .entries()
535 /// # }, |mut stream| async move {
536 /// // { 1: 42, 3: 100 }
537 /// # let mut results = Vec::new();
538 /// # for _ in 0..2 {
539 /// # results.push(stream.next().await.unwrap());
540 /// # }
541 /// # results.sort();
542 /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
543 /// # }));
544 /// ```
545 pub fn filter_map<F, U>(
546 self,
547 f: impl IntoQuotedMut<'a, F, L> + Copy,
548 ) -> KeyedSingleton<K, U, L, B>
549 where
550 F: Fn(V) -> Option<U> + 'a,
551 {
552 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
553 KeyedSingleton {
554 underlying: self.underlying.filter_map(q!({
555 let orig = f;
556 move |(k, v)| orig(v).map(|v| (k, v))
557 })),
558 }
559 }
560
561 /// Gets the number of keys in the keyed singleton.
562 ///
563 /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
564 /// since keys may be added / removed over time. When the set of keys changes, the count will
565 /// be asynchronously updated.
566 ///
567 /// # Example
568 /// ```rust
569 /// # use hydro_lang::prelude::*;
570 /// # use futures::StreamExt;
571 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
572 /// # let tick = process.tick();
573 /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
574 /// # process
575 /// # .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
576 /// # .into_keyed()
577 /// # .batch(&tick, nondet!(/** test */))
578 /// # .first();
579 /// keyed_singleton.key_count()
580 /// # .all_ticks()
581 /// # }, |mut stream| async move {
582 /// // 3
583 /// # assert_eq!(stream.next().await.unwrap(), 3);
584 /// # }));
585 /// ```
586 pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
587 if L::is_top_level()
588 && let Some(tick) = self.underlying.location.try_tick()
589 {
590 if B::ValueBound::is_bounded() {
591 let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
592 underlying: self.underlying,
593 };
594
595 me.entries().count()
596 } else {
597 let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
598 underlying: self.underlying,
599 };
600
601 let out = key_count_inside_tick(
602 me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
603 )
604 .latest();
605 Singleton::new(out.location, out.ir_node.into_inner())
606 }
607 } else {
608 self.underlying.count()
609 }
610 }
611
612 /// An operator which allows you to "name" a `HydroNode`.
613 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
614 pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
615 {
616 let mut node = self.underlying.ir_node.borrow_mut();
617 let metadata = node.metadata_mut();
618 metadata.tag = Some(name.to_string());
619 }
620 self
621 }
622}
623
624impl<'a, K: Hash + Eq, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
625 /// Gets the value associated with a specific key from the keyed singleton.
626 ///
627 /// # Example
628 /// ```rust
629 /// # use hydro_lang::prelude::*;
630 /// # use futures::StreamExt;
631 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
632 /// let tick = process.tick();
633 /// let keyed_data = process
634 /// .source_iter(q!(vec![(1, 2), (2, 3)]))
635 /// .into_keyed()
636 /// .batch(&tick, nondet!(/** test */))
637 /// .first();
638 /// let key = tick.singleton(q!(1));
639 /// keyed_data.get(key).all_ticks()
640 /// # }, |mut stream| async move {
641 /// // 2
642 /// # assert_eq!(stream.next().await.unwrap(), 2);
643 /// # }));
644 /// ```
645 pub fn get(self, key: Singleton<K, Tick<L>, Bounded>) -> Optional<V, Tick<L>, Bounded> {
646 self.entries()
647 .join(key.into_stream().map(q!(|k| (k, ()))))
648 .map(q!(|(_, (v, _))| v))
649 .assume_ordering::<TotalOrder>(nondet!(/** only a single key, so totally ordered */))
650 .first()
651 }
652
653 /// Given a keyed stream of lookup requests, where the key is the lookup and the value
654 /// is some additional metadata, emits a keyed stream of lookup results where the key
655 /// is the same as before, but the value is a tuple of the lookup result and the metadata
656 /// of the request. If the key is not found, no output will be produced.
657 ///
658 /// # Example
659 /// ```rust
660 /// # use hydro_lang::prelude::*;
661 /// # use futures::StreamExt;
662 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
663 /// let tick = process.tick();
664 /// let keyed_data = process
665 /// .source_iter(q!(vec![(1, 10), (2, 20)]))
666 /// .into_keyed()
667 /// .batch(&tick, nondet!(/** test */))
668 /// .first();
669 /// let other_data = process
670 /// .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
671 /// .into_keyed()
672 /// .batch(&tick, nondet!(/** test */));
673 /// keyed_data.get_many_if_present(other_data).entries().all_ticks()
674 /// # }, |mut stream| async move {
675 /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
676 /// # let mut results = vec![];
677 /// # for _ in 0..3 {
678 /// # results.push(stream.next().await.unwrap());
679 /// # }
680 /// # results.sort();
681 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
682 /// # }));
683 /// ```
684 pub fn get_many_if_present<O2: Ordering, R2: Retries, V2>(
685 self,
686 requests: KeyedStream<K, V2, Tick<L>, Bounded, O2, R2>,
687 ) -> KeyedStream<K, (V, V2), Tick<L>, Bounded, NoOrder, R2> {
688 self.entries()
689 .weaker_retries::<R2>()
690 .join(requests.entries())
691 .into_keyed()
692 }
693
694 /// For each entry in `self`, looks up the entry in the `from` with a key that matches the
695 /// **value** of the entry in `self`. The output is a keyed singleton with tuple values
696 /// containing the value from `self` and an option of the value from `from`. If the key is not
697 /// present in `from`, the option will be [`None`].
698 ///
699 /// # Example
700 /// ```rust
701 /// # use hydro_lang::prelude::*;
702 /// # use futures::StreamExt;
703 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
704 /// # let tick = process.tick();
705 /// let requests = // { 1: 10, 2: 20 }
706 /// # process
707 /// # .source_iter(q!(vec![(1, 10), (2, 20)]))
708 /// # .into_keyed()
709 /// # .batch(&tick, nondet!(/** test */))
710 /// # .first();
711 /// let other_data = // { 10: 100, 11: 101 }
712 /// # process
713 /// # .source_iter(q!(vec![(10, 100), (11, 101)]))
714 /// # .into_keyed()
715 /// # .batch(&tick, nondet!(/** test */))
716 /// # .first();
717 /// requests.get_from(other_data)
718 /// # .entries().all_ticks()
719 /// # }, |mut stream| async move {
720 /// // { 1: (10, Some(100)), 2: (20, None) }
721 /// # let mut results = vec![];
722 /// # for _ in 0..2 {
723 /// # results.push(stream.next().await.unwrap());
724 /// # }
725 /// # results.sort();
726 /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
727 /// # }));
728 /// ```
729 pub fn get_from<V2: Clone>(
730 self,
731 from: KeyedSingleton<V, V2, Tick<L>, Bounded>,
732 ) -> KeyedSingleton<K, (V, Option<V2>), Tick<L>, Bounded>
733 where
734 K: Clone,
735 V: Hash + Eq + Clone,
736 {
737 let to_lookup = self.entries().map(q!(|(k, v)| (v, k))).into_keyed();
738 let lookup_result = from.get_many_if_present(to_lookup.clone());
739 let missing_values =
740 to_lookup.filter_key_not_in(lookup_result.clone().entries().map(q!(|t| t.0)));
741 KeyedSingleton {
742 underlying: lookup_result
743 .entries()
744 .map(q!(|(v, (v2, k))| (k, (v, Some(v2)))))
745 .chain(missing_values.entries().map(q!(|(v, k)| (k, (v, None))))),
746 }
747 }
748}
749
750impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
751where
752 L: Location<'a>,
753{
754 /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
755 /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
756 ///
757 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
758 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
759 /// argument that declares where the keyed singleton will be atomically processed. Batching a
760 /// keyed singleton into the _same_ [`Tick`] will preserve the synchronous execution, while
761 /// batching into a different [`Tick`] will introduce asynchrony.
762 pub fn atomic(self, tick: &Tick<L>) -> KeyedSingleton<K, V, Atomic<L>, B> {
763 KeyedSingleton {
764 underlying: self.underlying.atomic(tick),
765 }
766 }
767}
768
769impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
770where
771 L: Location<'a> + NoTick,
772{
773 /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
774 /// See [`KeyedSingleton::atomic`] for more details.
775 pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
776 KeyedSingleton {
777 underlying: self.underlying.end_atomic(),
778 }
779 }
780}
781
782impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
783 /// Asynchronously yields this keyed singleton outside the tick, which will
784 /// be asynchronously updated with the latest set of entries inside the tick.
785 ///
786 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
787 /// tick that tracks the inner value. This is useful for getting the value as of the
788 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
789 ///
790 /// The entire set of entries are propagated on each tick, which means that if a tick
791 /// does not have a key "XYZ" that was present in the previous tick, the entry for "XYZ" will
792 /// also be removed from the output.
793 ///
794 /// # Example
795 /// ```rust
796 /// # use hydro_lang::prelude::*;
797 /// # use futures::StreamExt;
798 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
799 /// let tick = process.tick();
800 /// # // ticks are lazy by default, forces the second tick to run
801 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
802 /// # let batch_first_tick = process
803 /// # .source_iter(q!(vec![(1, 2), (2, 3)]))
804 /// # .batch(&tick, nondet!(/** test */))
805 /// # .into_keyed();
806 /// # let batch_second_tick = process
807 /// # .source_iter(q!(vec![(2, 4), (3, 5)]))
808 /// # .batch(&tick, nondet!(/** test */))
809 /// # .into_keyed()
810 /// # .defer_tick(); // appears on the second tick
811 /// # let input_batch = batch_first_tick.chain(batch_second_tick).first();
812 /// input_batch // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
813 /// .latest()
814 /// # .snapshot(&tick, nondet!(/** test */))
815 /// # .entries()
816 /// # .all_ticks()
817 /// # }, |mut stream| async move {
818 /// // asynchronously changes from { 1: 2, 2: 3 } ~> { 2: 4, 3: 5 }
819 /// # for w in vec![(1, 2), (2, 3), (2, 4), (3, 5)] {
820 /// # assert_eq!(stream.next().await.unwrap(), w);
821 /// # }
822 /// # }));
823 /// ```
824 pub fn latest(self) -> KeyedSingleton<K, V, L, Unbounded> {
825 KeyedSingleton {
826 underlying: self.underlying.all_ticks(),
827 }
828 }
829
830 /// Synchronously yields this keyed singleton outside the tick as an unbounded keyed singleton,
831 /// which will be updated with the latest set of entries inside the tick.
832 ///
833 /// Unlike [`KeyedSingleton::latest`], this preserves synchronous execution, as the output
834 /// keyed singleton is emitted in an [`Atomic`] context that will process elements synchronously
835 /// with the input keyed singleton's [`Tick`] context.
836 pub fn latest_atomic(self) -> KeyedSingleton<K, V, Atomic<L>, Unbounded> {
837 KeyedSingleton {
838 underlying: self.underlying.all_ticks_atomic(),
839 }
840 }
841
842 #[expect(missing_docs, reason = "TODO")]
843 pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
844 KeyedSingleton {
845 underlying: self.underlying.defer_tick(),
846 }
847 }
848}
849
850impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
851where
852 L: Location<'a>,
853{
854 /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
855 /// point in time.
856 ///
857 /// # Non-Determinism
858 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
859 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
860 pub fn snapshot(
861 self,
862 tick: &Tick<L>,
863 nondet: NonDet,
864 ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
865 KeyedSingleton {
866 underlying: self.underlying.batch(tick, nondet),
867 }
868 }
869}
870
871impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
872where
873 L: Location<'a> + NoTick,
874{
875 /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
876 /// state of the keyed singleton being atomically processed.
877 ///
878 /// # Non-Determinism
879 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
880 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
881 pub fn snapshot_atomic(self, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
882 KeyedSingleton {
883 underlying: self.underlying.batch_atomic(nondet),
884 }
885 }
886}
887
888impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
889where
890 L: Location<'a> + NoTick,
891{
892 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
893 /// arrived since the previous batch was released.
894 ///
895 /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
896 /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
897 ///
898 /// # Non-Determinism
899 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
900 /// has a non-deterministic set of key-value pairs.
901 pub fn batch(self, tick: &Tick<L>, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
902 self.atomic(tick).batch_atomic(nondet)
903 }
904}
905
906impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
907where
908 L: Location<'a> + NoTick,
909{
910 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
911 /// atomically processed.
912 ///
913 /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
914 /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
915 ///
916 /// # Non-Determinism
917 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
918 /// has a non-deterministic set of key-value pairs.
919 pub fn batch_atomic(self, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
920 KeyedSingleton {
921 underlying: self.underlying.batch_atomic(nondet),
922 }
923 }
924}
925
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    use crate::location::Location;
    use crate::nondet::nondet;

    /// Keys whose value is fixed by `first()`: the sampled key count starts at zero and grows
    /// by one for each new key that is sent.
    #[tokio::test]
    async fn key_count_bounded_value() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let key_count = input.into_keyed().first().key_count();
        let out = key_count
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect_sink_bincode(input_port).await;
        let mut external_out = nodes.connect_source_bincode(out).await;

        deployment.start().await.unwrap();

        // Nothing has been sent yet, so no keys are present.
        assert_eq!(external_out.next().await.unwrap(), 0);

        // Each step sends one entry and checks the key count observed afterwards.
        for (entry, expected_count) in [((1, 1), 1), ((2, 2), 2)] {
            external_in.send(entry).await.unwrap();
            assert_eq!(external_out.next().await.unwrap(), expected_count);
        }
    }

    /// Keys whose value keeps changing through `fold`: the sampled key count only grows when a
    /// previously unseen key is sent, and stays put when an existing key receives another value.
    #[tokio::test]
    async fn key_count_unbounded_value() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let key_count = input
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .key_count();
        let out = key_count
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect_sink_bincode(input_port).await;
        let mut external_out = nodes.connect_source_bincode(out).await;

        deployment.start().await.unwrap();

        // Nothing has been sent yet, so no keys are present.
        assert_eq!(external_out.next().await.unwrap(), 0);

        // Each step sends one entry and checks the key count observed afterwards; repeated
        // keys (1 twice more) must not change the count.
        let steps = [
            ((1, 1), 1),
            ((1, 2), 1),
            ((2, 2), 2),
            ((1, 1), 2),
            ((3, 1), 3),
        ];
        for (entry, expected_count) in steps {
            external_in.send(entry).await.unwrap();
            assert_eq!(external_out.next().await.unwrap(), expected_count);
        }
    }
}