hydro_lang/live_collections/keyed_stream/
mod.rs

1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::collections::HashMap;
4use std::hash::Hash;
5use std::marker::PhantomData;
6
7use stageleft::{IntoQuotedMut, QuotedWithContext, q};
8
9use super::boundedness::{Bounded, Boundedness, Unbounded};
10use super::keyed_singleton::KeyedSingleton;
11use super::optional::Optional;
12use super::stream::{ExactlyOnce, MinOrder, MinRetries, NoOrder, Stream, TotalOrder};
13use crate::compile::ir::HydroNode;
14use crate::forward_handle::ForwardRef;
15#[cfg(stageleft_runtime)]
16use crate::forward_handle::{CycleCollection, ReceiverComplete};
17use crate::live_collections::stream::{Ordering, Retries};
18use crate::location::dynamic::LocationId;
19use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
20use crate::manual_expr::ManualExpr;
21use crate::nondet::{NonDet, nondet};
22
23pub mod networking;
24
/// Streaming elements of type `V` grouped by a key of type `K`.
///
/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
/// order of keys is non-deterministic but the order *within* each group may be deterministic.
///
/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
///
/// Type Parameters:
/// - `K`: the type of the key for each group
/// - `V`: the type of the elements inside each group
/// - `Loc`: the [`Location`] where the keyed stream is materialized
/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
/// - `Order`: tracks whether the elements within each group have deterministic order
///   ([`TotalOrder`]) or not ([`NoOrder`]); defaults to [`TotalOrder`]
/// - `Retry`: tracks whether the elements within each group have deterministic cardinality
///   ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`]);
///   defaults to [`ExactlyOnce`]
pub struct KeyedStream<
    K,
    V,
    Loc,
    Bound: Boundedness,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    // The flat stream of `(key, value)` entries backing this collection. It is always typed
    // `NoOrder`; the per-group ordering guarantee lives only in `_phantom_order`.
    pub(crate) underlying: Stream<(K, V), Loc, Bound, NoOrder, Retry>,
    // Zero-sized marker that records the per-group `Order` guarantee at the type level.
    pub(crate) _phantom_order: PhantomData<Order>,
}
54
55impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
56    for KeyedStream<K, V, L, B, NoOrder, R>
57where
58    L: Location<'a>,
59{
60    fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
61        KeyedStream {
62            underlying: stream.underlying,
63            _phantom_order: Default::default(),
64        }
65    }
66}
67
68impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
69    Clone for KeyedStream<K, V, Loc, Bound, Order, R>
70{
71    fn clone(&self) -> Self {
72        KeyedStream {
73            underlying: self.underlying.clone(),
74            _phantom_order: PhantomData,
75        }
76    }
77}
78
79impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
80    for KeyedStream<K, V, L, B, O, R>
81where
82    L: Location<'a> + NoTick,
83{
84    type Location = L;
85
86    fn create_source(ident: syn::Ident, location: L) -> Self {
87        Stream::create_source(ident, location).into_keyed()
88    }
89}
90
91impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
92    for KeyedStream<K, V, L, B, O, R>
93where
94    L: Location<'a> + NoTick,
95{
96    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
97        self.underlying.complete(ident, expected_location);
98    }
99}
100
101impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
102    KeyedStream<K, V, L, B, O, R>
103{
104    /// Explicitly "casts" the keyed stream to a type with a different ordering
105    /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
106    /// by the type-system.
107    ///
108    /// # Non-Determinism
109    /// This function is used as an escape hatch, and any mistakes in the
110    /// provided ordering guarantee will propagate into the guarantees
111    /// for the rest of the program.
112    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
113        KeyedStream {
114            underlying: self.underlying,
115            _phantom_order: PhantomData,
116        }
117    }
118
119    /// Explicitly "casts" the keyed stream to a type with a different retries
120    /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
121    /// be proven by the type-system.
122    ///
123    /// # Non-Determinism
124    /// This function is used as an escape hatch, and any mistakes in the
125    /// provided retries guarantee will propagate into the guarantees
126    /// for the rest of the program.
127    pub fn assume_retries<R2: Retries>(self, nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
128        KeyedStream {
129            underlying: self.underlying.assume_retries::<R2>(nondet),
130            _phantom_order: PhantomData,
131        }
132    }
133
134    /// Flattens the keyed stream into an unordered stream of key-value pairs.
135    ///
136    /// # Example
137    /// ```rust
138    /// # use hydro_lang::prelude::*;
139    /// # use futures::StreamExt;
140    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
141    /// process
142    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
143    ///     .into_keyed()
144    ///     .entries()
145    /// # }, |mut stream| async move {
146    /// // (1, 2), (1, 3), (2, 4) in any order
147    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
148    /// #     assert_eq!(stream.next().await.unwrap(), w);
149    /// # }
150    /// # }));
151    /// ```
152    pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
153        self.underlying
154    }
155
156    /// Flattens the keyed stream into an unordered stream of only the values.
157    ///
158    /// # Example
159    /// ```rust
160    /// # use hydro_lang::prelude::*;
161    /// # use futures::StreamExt;
162    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
163    /// process
164    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
165    ///     .into_keyed()
166    ///     .values()
167    /// # }, |mut stream| async move {
168    /// // 2, 3, 4 in any order
169    /// # for w in vec![2, 3, 4] {
170    /// #     assert_eq!(stream.next().await.unwrap(), w);
171    /// # }
172    /// # }));
173    /// ```
174    pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
175        self.underlying.map(q!(|(_, v)| v))
176    }
177
178    /// Transforms each value by invoking `f` on each element, with keys staying the same
179    /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
180    ///
181    /// If you do not want to modify the stream and instead only want to view
182    /// each item use [`KeyedStream::inspect`] instead.
183    ///
184    /// # Example
185    /// ```rust
186    /// # use hydro_lang::prelude::*;
187    /// # use futures::StreamExt;
188    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
189    /// process
190    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
191    ///     .into_keyed()
192    ///     .map(q!(|v| v + 1))
193    /// #   .entries()
194    /// # }, |mut stream| async move {
195    /// // { 1: [3, 4], 2: [5] }
196    /// # for w in vec![(1, 3), (1, 4), (2, 5)] {
197    /// #     assert_eq!(stream.next().await.unwrap(), w);
198    /// # }
199    /// # }));
200    /// ```
201    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
202    where
203        F: Fn(V) -> U + 'a,
204    {
205        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
206        KeyedStream {
207            underlying: self.underlying.map(q!({
208                let orig = f;
209                move |(k, v)| (k, orig(v))
210            })),
211            _phantom_order: Default::default(),
212        }
213    }
214
215    /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
216    /// re-grouped even they are tuples; instead they will be grouped under the original key.
217    ///
218    /// If you do not want to modify the stream and instead only want to view
219    /// each item use [`KeyedStream::inspect_with_key`] instead.
220    ///
221    /// # Example
222    /// ```rust
223    /// # use hydro_lang::prelude::*;
224    /// # use futures::StreamExt;
225    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
226    /// process
227    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
228    ///     .into_keyed()
229    ///     .map_with_key(q!(|(k, v)| k + v))
230    /// #   .entries()
231    /// # }, |mut stream| async move {
232    /// // { 1: [3, 4], 2: [6] }
233    /// # for w in vec![(1, 3), (1, 4), (2, 6)] {
234    /// #     assert_eq!(stream.next().await.unwrap(), w);
235    /// # }
236    /// # }));
237    /// ```
238    pub fn map_with_key<U, F>(
239        self,
240        f: impl IntoQuotedMut<'a, F, L> + Copy,
241    ) -> KeyedStream<K, U, L, B, O, R>
242    where
243        F: Fn((K, V)) -> U + 'a,
244        K: Clone,
245    {
246        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
247        KeyedStream {
248            underlying: self.underlying.map(q!({
249                let orig = f;
250                move |(k, v)| {
251                    let out = orig((k.clone(), v));
252                    (k, out)
253                }
254            })),
255            _phantom_order: Default::default(),
256        }
257    }
258
259    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
260    /// `f`, preserving the order of the elements within the group.
261    ///
262    /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
263    /// not modify or take ownership of the values. If you need to modify the values while filtering
264    /// use [`KeyedStream::filter_map`] instead.
265    ///
266    /// # Example
267    /// ```rust
268    /// # use hydro_lang::prelude::*;
269    /// # use futures::StreamExt;
270    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
271    /// process
272    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
273    ///     .into_keyed()
274    ///     .filter(q!(|&x| x > 2))
275    /// #   .entries()
276    /// # }, |mut stream| async move {
277    /// // { 1: [3], 2: [4] }
278    /// # for w in vec![(1, 3), (2, 4)] {
279    /// #     assert_eq!(stream.next().await.unwrap(), w);
280    /// # }
281    /// # }));
282    /// ```
283    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
284    where
285        F: Fn(&V) -> bool + 'a,
286    {
287        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
288        KeyedStream {
289            underlying: self.underlying.filter(q!({
290                let orig = f;
291                move |(_k, v)| orig(v)
292            })),
293            _phantom_order: Default::default(),
294        }
295    }
296
297    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
298    /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
299    ///
300    /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
301    /// not modify or take ownership of the values. If you need to modify the values while filtering
302    /// use [`KeyedStream::filter_map_with_key`] instead.
303    ///
304    /// # Example
305    /// ```rust
306    /// # use hydro_lang::prelude::*;
307    /// # use futures::StreamExt;
308    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
309    /// process
310    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
311    ///     .into_keyed()
312    ///     .filter_with_key(q!(|&(k, v)| v - k == 2))
313    /// #   .entries()
314    /// # }, |mut stream| async move {
315    /// // { 1: [3], 2: [4] }
316    /// # for w in vec![(1, 3), (2, 4)] {
317    /// #     assert_eq!(stream.next().await.unwrap(), w);
318    /// # }
319    /// # }));
320    /// ```
321    pub fn filter_with_key<F>(
322        self,
323        f: impl IntoQuotedMut<'a, F, L> + Copy,
324    ) -> KeyedStream<K, V, L, B, O, R>
325    where
326        F: Fn(&(K, V)) -> bool + 'a,
327    {
328        KeyedStream {
329            underlying: self.underlying.filter(f),
330            _phantom_order: Default::default(),
331        }
332    }
333
334    /// An operator that both filters and maps each value, with keys staying the same.
335    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
336    /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
337    ///
338    /// # Example
339    /// ```rust
340    /// # use hydro_lang::prelude::*;
341    /// # use futures::StreamExt;
342    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
343    /// process
344    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
345    ///     .into_keyed()
346    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
347    /// #   .entries()
348    /// # }, |mut stream| async move {
349    /// // { 1: [2], 2: [4] }
350    /// # for w in vec![(1, 2), (2, 4)] {
351    /// #     assert_eq!(stream.next().await.unwrap(), w);
352    /// # }
353    /// # }));
354    /// ```
355    pub fn filter_map<U, F>(
356        self,
357        f: impl IntoQuotedMut<'a, F, L> + Copy,
358    ) -> KeyedStream<K, U, L, B, O, R>
359    where
360        F: Fn(V) -> Option<U> + 'a,
361    {
362        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
363        KeyedStream {
364            underlying: self.underlying.filter_map(q!({
365                let orig = f;
366                move |(k, v)| orig(v).map(|o| (k, o))
367            })),
368            _phantom_order: Default::default(),
369        }
370    }
371
372    /// An operator that both filters and maps each key-value pair. The resulting values are **not**
373    /// re-grouped even they are tuples; instead they will be grouped under the original key.
374    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
375    ///
376    /// # Example
377    /// ```rust
378    /// # use hydro_lang::prelude::*;
379    /// # use futures::StreamExt;
380    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
381    /// process
382    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
383    ///     .into_keyed()
384    ///     .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
385    /// #   .entries()
386    /// # }, |mut stream| async move {
387    /// // { 2: [2] }
388    /// # for w in vec![(2, 2)] {
389    /// #     assert_eq!(stream.next().await.unwrap(), w);
390    /// # }
391    /// # }));
392    /// ```
393    pub fn filter_map_with_key<U, F>(
394        self,
395        f: impl IntoQuotedMut<'a, F, L> + Copy,
396    ) -> KeyedStream<K, U, L, B, O, R>
397    where
398        F: Fn((K, V)) -> Option<U> + 'a,
399        K: Clone,
400    {
401        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
402        KeyedStream {
403            underlying: self.underlying.filter_map(q!({
404                let orig = f;
405                move |(k, v)| {
406                    let out = orig((k.clone(), v));
407                    out.map(|o| (k, o))
408                }
409            })),
410            _phantom_order: Default::default(),
411        }
412    }
413
414    /// For each value `v` in each group, transform `v` using `f` and then treat the
415    /// result as an [`Iterator`] to produce values one by one within the same group.
416    /// The implementation for [`Iterator`] for the output type `I` must produce items
417    /// in a **deterministic** order.
418    ///
419    /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
420    /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
421    ///
422    /// # Example
423    /// ```rust
424    /// # use hydro_lang::prelude::*;
425    /// # use futures::StreamExt;
426    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
427    /// process
428    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
429    ///     .into_keyed()
430    ///     .flat_map_ordered(q!(|x| x))
431    /// #   .entries()
432    /// # }, |mut stream| async move {
433    /// // { 1: [2, 3, 4], 2: [5, 6] }
434    /// # for w in vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)] {
435    /// #     assert_eq!(stream.next().await.unwrap(), w);
436    /// # }
437    /// # }));
438    /// ```
439    pub fn flat_map_ordered<U, I, F>(
440        self,
441        f: impl IntoQuotedMut<'a, F, L> + Copy,
442    ) -> KeyedStream<K, U, L, B, O, R>
443    where
444        I: IntoIterator<Item = U>,
445        F: Fn(V) -> I + 'a,
446        K: Clone,
447    {
448        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
449        KeyedStream {
450            underlying: self.underlying.flat_map_ordered(q!({
451                let orig = f;
452                move |(k, v)| orig(v).into_iter().map(move |u| (k.clone(), u))
453            })),
454            _phantom_order: Default::default(),
455        }
456    }
457
458    /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
459    /// for the output type `I` to produce items in any order.
460    ///
461    /// # Example
462    /// ```rust
463    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
464    /// # use futures::StreamExt;
465    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
466    /// process
467    ///     .source_iter(q!(vec![
468    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
469    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
470    ///     ]))
471    ///     .into_keyed()
472    ///     .flat_map_unordered(q!(|x| x))
473    /// #   .entries()
474    /// # }, |mut stream| async move {
475    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
476    /// # let mut results = Vec::new();
477    /// # for _ in 0..4 {
478    /// #     results.push(stream.next().await.unwrap());
479    /// # }
480    /// # results.sort();
481    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
482    /// # }));
483    /// ```
484    pub fn flat_map_unordered<U, I, F>(
485        self,
486        f: impl IntoQuotedMut<'a, F, L> + Copy,
487    ) -> KeyedStream<K, U, L, B, NoOrder, R>
488    where
489        I: IntoIterator<Item = U>,
490        F: Fn(V) -> I + 'a,
491        K: Clone,
492    {
493        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
494        KeyedStream {
495            underlying: self.underlying.flat_map_unordered(q!({
496                let orig = f;
497                move |(k, v)| orig(v).into_iter().map(move |u| (k.clone(), u))
498            })),
499            _phantom_order: Default::default(),
500        }
501    }
502
503    /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
504    /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
505    /// items in a **deterministic** order.
506    ///
507    /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
508    /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
509    ///
510    /// # Example
511    /// ```rust
512    /// # use hydro_lang::prelude::*;
513    /// # use futures::StreamExt;
514    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
515    /// process
516    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
517    ///     .into_keyed()
518    ///     .flatten_ordered()
519    /// #   .entries()
520    /// # }, |mut stream| async move {
521    /// // { 1: [2, 3, 4], 2: [5, 6] }
522    /// # for w in vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)] {
523    /// #     assert_eq!(stream.next().await.unwrap(), w);
524    /// # }
525    /// # }));
526    /// ```
527    pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
528    where
529        V: IntoIterator<Item = U>,
530        K: Clone,
531    {
532        self.flat_map_ordered(q!(|d| d))
533    }
534
535    /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
536    /// for the value type `V` to produce items in any order.
537    ///
538    /// # Example
539    /// ```rust
540    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
541    /// # use futures::StreamExt;
542    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
543    /// process
544    ///     .source_iter(q!(vec![
545    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
546    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
547    ///     ]))
548    ///     .into_keyed()
549    ///     .flatten_unordered()
550    /// #   .entries()
551    /// # }, |mut stream| async move {
552    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
553    /// # let mut results = Vec::new();
554    /// # for _ in 0..4 {
555    /// #     results.push(stream.next().await.unwrap());
556    /// # }
557    /// # results.sort();
558    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
559    /// # }));
560    /// ```
561    pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
562    where
563        V: IntoIterator<Item = U>,
564        K: Clone,
565    {
566        self.flat_map_unordered(q!(|d| d))
567    }
568
569    /// An operator which allows you to "inspect" each element of a stream without
570    /// modifying it. The closure `f` is called on a reference to each value. This is
571    /// mainly useful for debugging, and should not be used to generate side-effects.
572    ///
573    /// # Example
574    /// ```rust
575    /// # use hydro_lang::prelude::*;
576    /// # use futures::StreamExt;
577    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
578    /// process
579    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
580    ///     .into_keyed()
581    ///     .inspect(q!(|v| println!("{}", v)))
582    /// #   .entries()
583    /// # }, |mut stream| async move {
584    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
585    /// #     assert_eq!(stream.next().await.unwrap(), w);
586    /// # }
587    /// # }));
588    /// ```
589    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
590    where
591        F: Fn(&V) + 'a,
592    {
593        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
594        KeyedStream {
595            underlying: self.underlying.inspect(q!({
596                let orig = f;
597                move |(_k, v)| orig(v)
598            })),
599            _phantom_order: Default::default(),
600        }
601    }
602
603    /// An operator which allows you to "inspect" each element of a stream without
604    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
605    /// mainly useful for debugging, and should not be used to generate side-effects.
606    ///
607    /// # Example
608    /// ```rust
609    /// # use hydro_lang::prelude::*;
610    /// # use futures::StreamExt;
611    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
612    /// process
613    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
614    ///     .into_keyed()
615    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
616    /// #   .entries()
617    /// # }, |mut stream| async move {
618    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
619    /// #     assert_eq!(stream.next().await.unwrap(), w);
620    /// # }
621    /// # }));
622    /// ```
623    pub fn inspect_with_key<F>(
624        self,
625        f: impl IntoQuotedMut<'a, F, L>,
626    ) -> KeyedStream<K, V, L, B, O, R>
627    where
628        F: Fn(&(K, V)) + 'a,
629    {
630        KeyedStream {
631            underlying: self.underlying.inspect(f),
632            _phantom_order: Default::default(),
633        }
634    }
635
636    /// An operator which allows you to "name" a `HydroNode`.
637    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
638    pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
639        {
640            let mut node = self.underlying.ir_node.borrow_mut();
641            let metadata = node.metadata_mut();
642            metadata.tag = Some(name.to_string());
643        }
644        self
645    }
646}
647
648impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries>
649    KeyedStream<K, V, L, Unbounded, O, R>
650{
651    /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
652    /// of any overlapping groups. The result has [`NoOrder`] on each group because the
653    /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
654    /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
655    ///
656    /// Currently, both input streams must be [`Unbounded`].
657    ///
658    /// # Example
659    /// ```rust
660    /// # use hydro_lang::prelude::*;
661    /// # use futures::StreamExt;
662    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
663    /// let numbers1 = process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed();
664    /// let numbers2 = process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed();
665    /// numbers1.interleave(numbers2)
666    /// #   .entries()
667    /// # }, |mut stream| async move {
668    /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
669    /// # for w in vec![(1, 2), (3, 4), (1, 3), (3, 5)] {
670    /// #     assert_eq!(stream.next().await.unwrap(), w);
671    /// # }
672    /// # }));
673    /// ```
674    pub fn interleave<O2: Ordering, R2: Retries>(
675        self,
676        other: KeyedStream<K, V, L, Unbounded, O2, R2>,
677    ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
678    where
679        R: MinRetries<R2>,
680    {
681        self.entries().interleave(other.entries()).into_keyed()
682    }
683}
684
/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield elements and
/// control the processing of future elements.
///
/// The generator's processing closure returns one of these variants for every input value it
/// is given, deciding both what (if anything) is emitted and whether later inputs are still
/// processed.
pub enum Generate<T> {
    /// Emit the provided element, and keep processing future inputs.
    Yield(T),
    /// Emit the provided element as the _final_ element, do not process future inputs.
    Return(T),
    /// Do not emit anything, but continue processing future inputs.
    Continue,
    /// Do not emit anything, and do not process further inputs.
    Break,
}
697
698impl<'a, K, V, L, B: Boundedness> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
699where
700    K: Eq + Hash,
701    L: Location<'a>,
702{
703    /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
704    ///
705    /// Unlike [`Stream::fold_keyed`] which only returns the final accumulated value, `scan` produces a new stream
706    /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
707    /// early by returning `None`.
708    ///
709    /// The function takes a mutable reference to the accumulator and the current element, and returns
710    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
711    /// If the function returns `None`, the stream is terminated and no more elements are processed.
712    ///
713    /// # Example
714    /// ```rust
715    /// # use hydro_lang::prelude::*;
716    /// # use futures::StreamExt;
717    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
718    /// process
719    ///     .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
720    ///     .into_keyed()
721    ///     .scan(
722    ///         q!(|| 0),
723    ///         q!(|acc, x| {
724    ///             *acc += x;
725    ///             if *acc % 2 == 0 { None } else { Some(*acc) }
726    ///         }),
727    ///     )
728    /// #   .entries()
729    /// # }, |mut stream| async move {
730    /// // Output: { 0: [1], 1: [3, 7] }
731    /// # for w in vec![(0, 1), (1, 3), (1, 7)] {
732    /// #     assert_eq!(stream.next().await.unwrap(), w);
733    /// # }
734    /// # }));
735    /// ```
736    pub fn scan<A, U, I, F>(
737        self,
738        init: impl IntoQuotedMut<'a, I, L> + Copy,
739        f: impl IntoQuotedMut<'a, F, L> + Copy,
740    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
741    where
742        K: Clone,
743        I: Fn() -> A + 'a,
744        F: Fn(&mut A, V) -> Option<U> + 'a,
745    {
746        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
747        self.generator(
748            init,
749            q!({
750                let orig = f;
751                move |state, v| {
752                    if let Some(out) = orig(state, v) {
753                        Generate::Yield(out)
754                    } else {
755                        Generate::Break
756                    }
757                }
758            }),
759        )
760    }
761
762    /// Iteratively processes the elements in each group using a state machine that can yield
763    /// elements as it processes its inputs. This is designed to mirror the unstable generator
764    /// syntax in Rust, without requiring special syntax.
765    ///
766    /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
767    /// state for each group. The second argument defines the processing logic, taking in a
768    /// mutable reference to the group's state and the value to be processed. It emits a
769    /// [`Generate`] value, whose variants define what is emitted and whether further inputs
770    /// should be processed.
771    ///
772    /// # Example
773    /// ```rust
774    /// # use hydro_lang::prelude::*;
775    /// # use futures::StreamExt;
776    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
777    /// process
778    ///     .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
779    ///     .into_keyed()
780    ///     .generator(
781    ///         q!(|| 0),
782    ///         q!(|acc, x| {
783    ///             *acc += x;
784    ///             if *acc > 100 {
785    ///                 hydro_lang::live_collections::keyed_stream::Generate::Return(
786    ///                     "done!".to_string()
787    ///                 )
788    ///             } else if *acc % 2 == 0 {
789    ///                 hydro_lang::live_collections::keyed_stream::Generate::Yield(
790    ///                     "even".to_string()
791    ///                 )
792    ///             } else {
793    ///                 hydro_lang::live_collections::keyed_stream::Generate::Continue
794    ///             }
795    ///         }),
796    ///     )
797    /// #   .entries()
798    /// # }, |mut stream| async move {
799    /// // Output: { 0: ["even", "done!"], 1: ["even"] }
800    /// # for w in vec![(0, "even".to_string()), (0, "done!".to_string()), (1, "even".to_string())] {
801    /// #     assert_eq!(stream.next().await.unwrap(), w);
802    /// # }
803    /// # }));
804    /// ```
805    pub fn generator<A, U, I, F>(
806        self,
807        init: impl IntoQuotedMut<'a, I, L> + Copy,
808        f: impl IntoQuotedMut<'a, F, L> + Copy,
809    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
810    where
811        K: Clone,
812        I: Fn() -> A + 'a,
813        F: Fn(&mut A, V) -> Generate<U> + 'a,
814    {
815        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
816        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
817        let underlying_scanned = self
818            .underlying
819            .assume_ordering(nondet!(
820                /** we do not rely on the order of keys */
821            ))
822            .scan(
823                q!(|| HashMap::new()),
824                q!(move |acc, (k, v)| {
825                    let existing_state = acc.entry(k.clone()).or_insert_with(|| Some(init()));
826                    if let Some(existing_state_value) = existing_state {
827                        match f(existing_state_value, v) {
828                            Generate::Yield(out) => Some(Some((k, out))),
829                            Generate::Return(out) => {
830                                let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
831                                Some(Some((k, out)))
832                            }
833                            Generate::Break => {
834                                let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
835                                Some(None)
836                            }
837                            Generate::Continue => Some(None),
838                        }
839                    } else {
840                        Some(None)
841                    }
842                }),
843            )
844            .flatten_ordered();
845
846        KeyedStream {
847            underlying: underlying_scanned.into(),
848            _phantom_order: Default::default(),
849        }
850    }
851
852    /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
853    /// in-order across the values in each group. But the aggregation function returns a boolean,
854    /// which when true indicates that the aggregated result is complete and can be released to
855    /// downstream computation. Unlike [`Stream::fold_keyed`], this means that even if the input
856    /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
857    /// normal stream elements.
858    ///
859    /// # Example
860    /// ```rust
861    /// # use hydro_lang::prelude::*;
862    /// # use futures::StreamExt;
863    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
864    /// process
865    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
866    ///     .into_keyed()
867    ///     .fold_early_stop(
868    ///         q!(|| 0),
869    ///         q!(|acc, x| {
870    ///             *acc += x;
871    ///             x % 2 == 0
872    ///         }),
873    ///     )
874    /// #   .entries()
875    /// # }, |mut stream| async move {
876    /// // Output: { 0: 2, 1: 9 }
877    /// # for w in vec![(0, 2), (1, 9)] {
878    /// #     assert_eq!(stream.next().await.unwrap(), w);
879    /// # }
880    /// # }));
881    /// ```
882    pub fn fold_early_stop<A, I, F>(
883        self,
884        init: impl IntoQuotedMut<'a, I, L> + Copy,
885        f: impl IntoQuotedMut<'a, F, L> + Copy,
886    ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
887    where
888        K: Clone,
889        I: Fn() -> A + 'a,
890        F: Fn(&mut A, V) -> bool + 'a,
891    {
892        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
893        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
894        let out_without_bound_cast = self
895            .generator(
896                q!(move || Some(init())),
897                q!(move |key_state, v| {
898                    if let Some(key_state_value) = key_state.as_mut() {
899                        if f(key_state_value, v) {
900                            Generate::Return(key_state.take().unwrap())
901                        } else {
902                            Generate::Continue
903                        }
904                    } else {
905                        unreachable!()
906                    }
907                }),
908            )
909            .underlying;
910
911        KeyedSingleton {
912            underlying: out_without_bound_cast,
913        }
914    }
915
916    /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
917    /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
918    /// otherwise the first element would be non-deterministic.
919    ///
920    /// # Example
921    /// ```rust
922    /// # use hydro_lang::prelude::*;
923    /// # use futures::StreamExt;
924    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
925    /// process
926    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
927    ///     .into_keyed()
928    ///     .first()
929    /// #   .entries()
930    /// # }, |mut stream| async move {
931    /// // Output: { 0: 2, 1: 3 }
932    /// # for w in vec![(0, 2), (1, 3)] {
933    /// #     assert_eq!(stream.next().await.unwrap(), w);
934    /// # }
935    /// # }));
936    /// ```
937    pub fn first(self) -> KeyedSingleton<K, V, L, B::WhenValueBounded>
938    where
939        K: Clone,
940    {
941        self.fold_early_stop(
942            q!(|| None),
943            q!(|acc, v| {
944                *acc = Some(v);
945                true
946            }),
947        )
948        .map(q!(|v| v.unwrap()))
949    }
950
951    /// Like [`Stream::fold`], aggregates the values in each group via the `comb` closure.
952    ///
953    /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
954    /// to depend on the order of elements in the group.
955    ///
956    /// If the input and output value types are the same and do not require initialization then use
957    /// [`KeyedStream::reduce`].
958    ///
959    /// # Example
960    /// ```rust
961    /// # use hydro_lang::prelude::*;
962    /// # use futures::StreamExt;
963    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
964    /// let tick = process.tick();
965    /// let numbers = process
966    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
967    ///     .into_keyed();
968    /// let batch = numbers.batch(&tick, nondet!(/** test */));
969    /// batch
970    ///     .fold(q!(|| 0), q!(|acc, x| *acc += x))
971    ///     .entries()
972    ///     .all_ticks()
973    /// # }, |mut stream| async move {
974    /// // (1, 5), (2, 7)
975    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
976    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
977    /// # }));
978    /// ```
979    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
980        self,
981        init: impl IntoQuotedMut<'a, I, L>,
982        comb: impl IntoQuotedMut<'a, F, L>,
983    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
984        let init = init.splice_fn0_ctx(&self.underlying.location).into();
985        let comb = comb
986            .splice_fn2_borrow_mut_ctx(&self.underlying.location)
987            .into();
988
989        let out_ir = HydroNode::FoldKeyed {
990            init,
991            acc: comb,
992            input: Box::new(self.underlying.ir_node.into_inner()),
993            metadata: self.underlying.location.new_node_metadata::<(K, A)>(),
994        };
995
996        KeyedSingleton {
997            underlying: Stream::new(self.underlying.location, out_ir),
998        }
999    }
1000
1001    /// Like [`Stream::reduce`], aggregates the values in each group via the `comb` closure.
1002    ///
1003    /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1004    /// to depend on the order of elements in the stream.
1005    ///
1006    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
1007    ///
1008    /// # Example
1009    /// ```rust
1010    /// # use hydro_lang::prelude::*;
1011    /// # use futures::StreamExt;
1012    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1013    /// let tick = process.tick();
1014    /// let numbers = process
1015    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1016    ///     .into_keyed();
1017    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1018    /// batch.reduce(q!(|acc, x| *acc += x)).entries().all_ticks()
1019    /// # }, |mut stream| async move {
1020    /// // (1, 5), (2, 7)
1021    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1022    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1023    /// # }));
1024    /// ```
1025    pub fn reduce<F: Fn(&mut V, V) + 'a>(
1026        self,
1027        comb: impl IntoQuotedMut<'a, F, L>,
1028    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1029        let f = comb
1030            .splice_fn2_borrow_mut_ctx(&self.underlying.location)
1031            .into();
1032
1033        let out_ir = HydroNode::ReduceKeyed {
1034            f,
1035            input: Box::new(self.underlying.ir_node.into_inner()),
1036            metadata: self.underlying.location.new_node_metadata::<(K, V)>(),
1037        };
1038
1039        KeyedSingleton {
1040            underlying: Stream::new(self.underlying.location, out_ir),
1041        }
1042    }
1043
1044    /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark are automatically deleted.
1045    ///
1046    /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1047    /// to depend on the order of elements in the stream.
1048    ///
1049    /// # Example
1050    /// ```rust
1051    /// # use hydro_lang::prelude::*;
1052    /// # use futures::StreamExt;
1053    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1054    /// let tick = process.tick();
1055    /// let watermark = tick.singleton(q!(1));
1056    /// let numbers = process
1057    ///     .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1058    ///     .into_keyed();
1059    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1060    /// batch
1061    ///     .reduce_watermark(watermark, q!(|acc, x| *acc += x))
1062    ///     .entries()
1063    ///     .all_ticks()
1064    /// # }, |mut stream| async move {
1065    /// // (2, 204)
1066    /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
1067    /// # }));
1068    /// ```
1069    pub fn reduce_watermark<O, F>(
1070        self,
1071        other: impl Into<Optional<O, Tick<L::Root>, Bounded>>,
1072        comb: impl IntoQuotedMut<'a, F, L>,
1073    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1074    where
1075        O: Clone,
1076        F: Fn(&mut V, V) + 'a,
1077    {
1078        let other: Optional<O, Tick<L::Root>, Bounded> = other.into();
1079        check_matching_location(&self.underlying.location.root(), other.location.outer());
1080        let f = comb
1081            .splice_fn2_borrow_mut_ctx(&self.underlying.location)
1082            .into();
1083
1084        let out_ir = Stream::new(
1085            self.underlying.location.clone(),
1086            HydroNode::ReduceKeyedWatermark {
1087                f,
1088                input: Box::new(self.underlying.ir_node.into_inner()),
1089                watermark: Box::new(other.ir_node.into_inner()),
1090                metadata: self.underlying.location.new_node_metadata::<(K, V)>(),
1091            },
1092        );
1093
1094        KeyedSingleton { underlying: out_ir }
1095    }
1096}
1097
1098impl<'a, K, V, L, B: Boundedness, O: Ordering> KeyedStream<K, V, L, B, O, ExactlyOnce>
1099where
1100    K: Eq + Hash,
1101    L: Location<'a>,
1102{
1103    /// Like [`Stream::fold_commutative`], aggregates the values in each group via the `comb` closure.
1104    ///
1105    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1106    ///
1107    /// If the input and output value types are the same and do not require initialization then use
1108    /// [`KeyedStream::reduce_commutative`].
1109    ///
1110    /// # Example
1111    /// ```rust
1112    /// # use hydro_lang::prelude::*;
1113    /// # use futures::StreamExt;
1114    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1115    /// let tick = process.tick();
1116    /// let numbers = process
1117    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1118    ///     .into_keyed();
1119    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1120    /// batch
1121    ///     .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1122    ///     .entries()
1123    ///     .all_ticks()
1124    /// # }, |mut stream| async move {
1125    /// // (1, 5), (2, 7)
1126    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1127    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1128    /// # }));
1129    /// ```
1130    pub fn fold_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1131        self,
1132        init: impl IntoQuotedMut<'a, I, L>,
1133        comb: impl IntoQuotedMut<'a, F, L>,
1134    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1135        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1136            .fold(init, comb)
1137    }
1138
1139    /// Like [`Stream::reduce_commutative`], aggregates the values in each group via the `comb` closure.
1140    ///
1141    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1142    ///
1143    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative`].
1144    ///
1145    /// # Example
1146    /// ```rust
1147    /// # use hydro_lang::prelude::*;
1148    /// # use futures::StreamExt;
1149    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1150    /// let tick = process.tick();
1151    /// let numbers = process
1152    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
1153    ///     .into_keyed();
1154    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1155    /// batch
1156    ///     .reduce_commutative(q!(|acc, x| *acc += x))
1157    ///     .entries()
1158    ///     .all_ticks()
1159    /// # }, |mut stream| async move {
1160    /// // (1, 5), (2, 7)
1161    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1162    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1163    /// # }));
1164    /// ```
1165    pub fn reduce_commutative<F: Fn(&mut V, V) + 'a>(
1166        self,
1167        comb: impl IntoQuotedMut<'a, F, L>,
1168    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1169        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1170            .reduce(comb)
1171    }
1172
1173    /// A special case of [`KeyedStream::reduce_commutative`] where tuples with keys less than the watermark are automatically deleted.
1174    ///
1175    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1176    ///
1177    /// # Example
1178    /// ```rust
1179    /// # use hydro_lang::prelude::*;
1180    /// # use futures::StreamExt;
1181    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1182    /// let tick = process.tick();
1183    /// let watermark = tick.singleton(q!(1));
1184    /// let numbers = process
1185    ///     .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1186    ///     .into_keyed();
1187    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1188    /// batch
1189    ///     .reduce_watermark_commutative(watermark, q!(|acc, x| *acc += x))
1190    ///     .entries()
1191    ///     .all_ticks()
1192    /// # }, |mut stream| async move {
1193    /// // (2, 204)
1194    /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
1195    /// # }));
1196    /// ```
1197    pub fn reduce_watermark_commutative<O2, F>(
1198        self,
1199        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1200        comb: impl IntoQuotedMut<'a, F, L>,
1201    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1202    where
1203        O2: Clone,
1204        F: Fn(&mut V, V) + 'a,
1205    {
1206        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1207            .reduce_watermark(other, comb)
1208    }
1209}
1210
1211impl<'a, K, V, L, B: Boundedness, R: Retries> KeyedStream<K, V, L, B, TotalOrder, R>
1212where
1213    K: Eq + Hash,
1214    L: Location<'a>,
1215{
1216    /// Like [`Stream::fold_idempotent`], aggregates the values in each group via the `comb` closure.
1217    ///
1218    /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
1219    ///
1220    /// If the input and output value types are the same and do not require initialization then use
1221    /// [`KeyedStream::reduce_idempotent`].
1222    ///
1223    /// # Example
1224    /// ```rust
1225    /// # use hydro_lang::prelude::*;
1226    /// # use futures::StreamExt;
1227    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1228    /// let tick = process.tick();
1229    /// let numbers = process
1230    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1231    ///     .into_keyed();
1232    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1233    /// batch
1234    ///     .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1235    ///     .entries()
1236    ///     .all_ticks()
1237    /// # }, |mut stream| async move {
1238    /// // (1, false), (2, true)
1239    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1240    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1241    /// # }));
1242    /// ```
1243    pub fn fold_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1244        self,
1245        init: impl IntoQuotedMut<'a, I, L>,
1246        comb: impl IntoQuotedMut<'a, F, L>,
1247    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1248        self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1249            .fold(init, comb)
1250    }
1251
1252    /// Like [`Stream::reduce_idempotent`], aggregates the values in each group via the `comb` closure.
1253    ///
1254    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1255    ///
1256    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_idempotent`].
1257    ///
1258    /// # Example
1259    /// ```rust
1260    /// # use hydro_lang::prelude::*;
1261    /// # use futures::StreamExt;
1262    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1263    /// let tick = process.tick();
1264    /// let numbers = process
1265    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1266    ///     .into_keyed();
1267    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1268    /// batch
1269    ///     .reduce_idempotent(q!(|acc, x| *acc |= x))
1270    ///     .entries()
1271    ///     .all_ticks()
1272    /// # }, |mut stream| async move {
1273    /// // (1, false), (2, true)
1274    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1275    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1276    /// # }));
1277    /// ```
1278    pub fn reduce_idempotent<F: Fn(&mut V, V) + 'a>(
1279        self,
1280        comb: impl IntoQuotedMut<'a, F, L>,
1281    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1282        self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1283            .reduce(comb)
1284    }
1285
1286    /// A special case of [`KeyedStream::reduce_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1287    ///
1288    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1289    ///
1290    /// # Example
1291    /// ```rust
1292    /// # use hydro_lang::prelude::*;
1293    /// # use futures::StreamExt;
1294    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1295    /// let tick = process.tick();
1296    /// let watermark = tick.singleton(q!(1));
1297    /// let numbers = process
1298    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1299    ///     .into_keyed();
1300    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1301    /// batch
1302    ///     .reduce_watermark_idempotent(watermark, q!(|acc, x| *acc |= x))
1303    ///     .entries()
1304    ///     .all_ticks()
1305    /// # }, |mut stream| async move {
1306    /// // (2, true)
1307    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1308    /// # }));
1309    /// ```
1310    pub fn reduce_watermark_idempotent<O2, F>(
1311        self,
1312        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1313        comb: impl IntoQuotedMut<'a, F, L>,
1314    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1315    where
1316        O2: Clone,
1317        F: Fn(&mut V, V) + 'a,
1318    {
1319        self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1320            .reduce_watermark(other, comb)
1321    }
1322}
1323
1324impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1325where
1326    K: Eq + Hash,
1327    L: Location<'a>,
1328{
1329    /// Like [`Stream::fold_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1330    ///
1331    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1332    /// as there may be non-deterministic duplicates.
1333    ///
1334    /// If the input and output value types are the same and do not require initialization then use
1335    /// [`KeyedStream::reduce_commutative_idempotent`].
1336    ///
1337    /// # Example
1338    /// ```rust
1339    /// # use hydro_lang::prelude::*;
1340    /// # use futures::StreamExt;
1341    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1342    /// let tick = process.tick();
1343    /// let numbers = process
1344    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1345    ///     .into_keyed();
1346    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1347    /// batch
1348    ///     .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1349    ///     .entries()
1350    ///     .all_ticks()
1351    /// # }, |mut stream| async move {
1352    /// // (1, false), (2, true)
1353    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1354    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1355    /// # }));
1356    /// ```
1357    pub fn fold_commutative_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1358        self,
1359        init: impl IntoQuotedMut<'a, I, L>,
1360        comb: impl IntoQuotedMut<'a, F, L>,
1361    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1362        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1363            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1364            .fold(init, comb)
1365    }
1366
1367    /// Like [`Stream::reduce_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1368    ///
1369    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1370    /// as there may be non-deterministic duplicates.
1371    ///
1372    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative_idempotent`].
1373    ///
1374    /// # Example
1375    /// ```rust
1376    /// # use hydro_lang::prelude::*;
1377    /// # use futures::StreamExt;
1378    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1379    /// let tick = process.tick();
1380    /// let numbers = process
1381    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1382    ///     .into_keyed();
1383    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1384    /// batch
1385    ///     .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1386    ///     .entries()
1387    ///     .all_ticks()
1388    /// # }, |mut stream| async move {
1389    /// // (1, false), (2, true)
1390    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1391    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1392    /// # }));
1393    /// ```
1394    pub fn reduce_commutative_idempotent<F: Fn(&mut V, V) + 'a>(
1395        self,
1396        comb: impl IntoQuotedMut<'a, F, L>,
1397    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1398        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1399            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1400            .reduce(comb)
1401    }
1402
1403    /// A special case of [`Stream::reduce_keyed_commutative_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1404    ///
1405    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1406    /// as there may be non-deterministic duplicates.
1407    ///
1408    /// # Example
1409    /// ```rust
1410    /// # use hydro_lang::prelude::*;
1411    /// # use futures::StreamExt;
1412    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1413    /// let tick = process.tick();
1414    /// let watermark = tick.singleton(q!(1));
1415    /// let numbers = process
1416    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1417    ///     .into_keyed();
1418    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1419    /// batch
1420    ///     .reduce_watermark_commutative_idempotent(watermark, q!(|acc, x| *acc |= x))
1421    ///     .entries()
1422    ///     .all_ticks()
1423    /// # }, |mut stream| async move {
1424    /// // (2, true)
1425    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1426    /// # }));
1427    /// ```
1428    pub fn reduce_watermark_commutative_idempotent<O2, F>(
1429        self,
1430        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1431        comb: impl IntoQuotedMut<'a, F, L>,
1432    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1433    where
1434        O2: Clone,
1435        F: Fn(&mut V, V) + 'a,
1436    {
1437        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1438            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1439            .reduce_watermark(other, comb)
1440    }
1441
1442    /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1443    /// whose keys are not in the bounded stream.
1444    ///
1445    /// # Example
1446    /// ```rust
1447    /// # use hydro_lang::prelude::*;
1448    /// # use futures::StreamExt;
1449    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1450    /// let tick = process.tick();
1451    /// let keyed_stream = process
1452    ///     .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1453    ///     .batch(&tick, nondet!(/** test */))
1454    ///     .into_keyed();
1455    /// let keys_to_remove = process
1456    ///     .source_iter(q!(vec![1, 2]))
1457    ///     .batch(&tick, nondet!(/** test */));
1458    /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1459    /// #   .entries()
1460    /// # }, |mut stream| async move {
1461    /// // { 3: ['c'], 4: ['d'] }
1462    /// # for w in vec![(3, 'c'), (4, 'd')] {
1463    /// #     assert_eq!(stream.next().await.unwrap(), w);
1464    /// # }
1465    /// # }));
1466    /// ```
1467    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1468        self,
1469        other: Stream<K, L, Bounded, O2, R2>,
1470    ) -> Self {
1471        KeyedStream {
1472            underlying: self.entries().anti_join(other),
1473            _phantom_order: Default::default(),
1474        }
1475    }
1476}
1477
1478impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1479where
1480    L: Location<'a> + NoTick,
1481{
1482    /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
1483    /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
1484    ///
1485    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1486    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1487    /// argument that declares where the stream will be atomically processed. Batching a stream into
1488    /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
1489    /// [`Tick`] will introduce asynchrony.
1490    pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
1491        KeyedStream {
1492            underlying: self.underlying.atomic(tick),
1493            _phantom_order: Default::default(),
1494        }
1495    }
1496
1497    /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
1498    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1499    /// the order of the input.
1500    ///
1501    /// # Non-Determinism
1502    /// The batch boundaries are non-deterministic and may change across executions.
1503    pub fn batch(
1504        self,
1505        tick: &Tick<L>,
1506        nondet: NonDet,
1507    ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1508        KeyedStream {
1509            underlying: self.underlying.batch(tick, nondet),
1510            _phantom_order: Default::default(),
1511        }
1512    }
1513}
1514
1515impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
1516where
1517    L: Location<'a> + NoTick,
1518{
1519    /// Returns a keyed stream corresponding to the latest batch of elements being atomically
1520    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
1521    /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
1522    /// used to create the atomic section.
1523    ///
1524    /// # Non-Determinism
1525    /// The batch boundaries are non-deterministic and may change across executions.
1526    pub fn batch_atomic(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1527        KeyedStream {
1528            underlying: self.underlying.batch_atomic(nondet),
1529            _phantom_order: Default::default(),
1530        }
1531    }
1532
1533    /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
1534    /// See [`KeyedStream::atomic`] for more details.
1535    pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
1536        KeyedStream {
1537            underlying: self.underlying.end_atomic(),
1538            _phantom_order: Default::default(),
1539        }
1540    }
1541}
1542
1543impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, L, Bounded, O, R>
1544where
1545    L: Location<'a>,
1546{
1547    /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
1548    /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
1549    /// is only present in one of the inputs, its values are passed through as-is). The output has
1550    /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
1551    ///
1552    /// Currently, both input streams must be [`Bounded`]. This operator will block
1553    /// on the first stream until all its elements are available. In a future version,
1554    /// we will relax the requirement on the `other` stream.
1555    ///
1556    /// # Example
1557    /// ```rust
1558    /// # use hydro_lang::prelude::*;
1559    /// # use futures::StreamExt;
1560    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1561    /// let tick = process.tick();
1562    /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
1563    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1564    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1565    /// # .entries()
1566    /// # }, |mut stream| async move {
1567    /// // { 0: [2, 1], 1: [4, 3] }
1568    /// # for w in vec![(0, 2), (1, 4), (0, 1), (1, 3)] {
1569    /// #     assert_eq!(stream.next().await.unwrap(), w);
1570    /// # }
1571    /// # }));
1572    /// ```
1573    pub fn chain<O2: Ordering>(
1574        self,
1575        other: KeyedStream<K, V, L, Bounded, O2, R>,
1576    ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, R>
1577    where
1578        O: MinOrder<O2>,
1579    {
1580        KeyedStream {
1581            underlying: self.underlying.chain(other.underlying),
1582            _phantom_order: Default::default(),
1583        }
1584    }
1585}
1586
1587impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
1588where
1589    L: Location<'a>,
1590{
1591    /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
1592    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
1593    /// each key.
1594    pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
1595        KeyedStream {
1596            underlying: self.underlying.all_ticks(),
1597            _phantom_order: Default::default(),
1598        }
1599    }
1600
1601    /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
1602    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
1603    /// each key.
1604    ///
1605    /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
1606    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1607    /// stream's [`Tick`] context.
1608    pub fn all_ticks_atomic(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
1609        KeyedStream {
1610            underlying: self.underlying.all_ticks(),
1611            _phantom_order: Default::default(),
1612        }
1613    }
1614
1615    #[expect(missing_docs, reason = "TODO")]
1616    pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1617        KeyedStream {
1618            underlying: self.underlying.defer_tick(),
1619            _phantom_order: Default::default(),
1620        }
1621    }
1622}
1623
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    use crate::location::Location;
    use crate::nondet::nondet;

    /// Runs `reduce_watermark` over a fixed keyed input with a constant watermark of `1` and
    /// checks that the first entry received by the external process is `(2, 204)`.
    #[tokio::test]
    async fn reduce_watermark_filter() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        // Constant watermark value used for every tick.
        let watermark = node_tick.singleton(q!(1));

        // Sum the values per key, snapshot the result each tick, and ship the entries to the
        // external process.
        let sum = node
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .into_keyed()
            .reduce_watermark(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .snapshot(&node_tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // The output is connected before the deployment is started.
        let mut out = nodes.connect_source_bincode(sum).await;

        deployment.start().await.unwrap();

        // Key 2 has two values of 102, so its sum is 204. Presumably keys 0 and 1 are excluded
        // because they do not exceed the watermark of 1 — confirm against `reduce_watermark`.
        assert_eq!(out.next().await.unwrap(), (2, 204));
    }

    /// Runs `reduce_watermark_commutative` with a watermark that is fed back through a tick
    /// cycle, and checks that `(2, 204)` is received first and `(3, 103)` is received after an
    /// external trigger message is sent.
    #[tokio::test]
    async fn reduce_watermark_garbage_collect() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();
        // External input used by the test to trigger the late `(3, 103)` entry.
        let (tick_send, tick_trigger) = node.source_external_bincode(&external);

        let node_tick = node.tick();
        // Watermark cycle: starts at 1, and the value completed for the next tick is the
        // current value plus one.
        let (watermark_complete_cycle, watermark) =
            node_tick.cycle_with_initial(node_tick.singleton(q!(1)));
        let next_watermark = watermark.clone().map(q!(|v| v + 1));
        watermark_complete_cycle.complete_next_tick(next_watermark);

        // `(3, 103)` gated on the first trigger message of the tick's batch; presumably it only
        // passes through in ticks where a trigger is present — confirm against `filter_if_some`.
        let tick_triggered_input = node
            .source_iter(q!([(3, 103)]))
            .batch(&node_tick, nondet!(/** test */))
            .filter_if_some(
                tick_trigger
                    .clone()
                    .batch(&node_tick, nondet!(/** test */))
                    .first(),
            )
            .all_ticks();

        // Same pipeline shape as `reduce_watermark_filter`, but with the gated input interleaved
        // in and the commutative variant of the reduction.
        let sum = node
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .interleave(tick_triggered_input)
            .into_keyed()
            .reduce_watermark_commutative(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .snapshot(&node_tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_default_optimize()
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Both external endpoints are connected before the deployment is started.
        let mut tick_send = nodes.connect_sink_bincode(tick_send).await;
        let mut out_recv = nodes.connect_source_bincode(sum).await;

        deployment.start().await.unwrap();

        // Before any trigger is sent, the first entry received is the sum for key 2.
        assert_eq!(out_recv.next().await.unwrap(), (2, 204));

        tick_send.send(()).await.unwrap();

        // After the trigger, the next entry received is `(3, 103)`, not `(2, 204)` again.
        assert_eq!(out_recv.next().await.unwrap(), (3, 103));
    }
}