hydro_lang/live_collections/stream/mod.rs
1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11use syn::parse_quote;
12use tokio::time::Instant;
13
14use super::boundedness::{Bounded, Boundedness, Unbounded};
15use super::keyed_singleton::KeyedSingleton;
16use super::keyed_stream::KeyedStream;
17use super::optional::Optional;
18use super::singleton::Singleton;
19use crate::compile::ir::{HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
20#[cfg(stageleft_runtime)]
21use crate::forward_handle::{CycleCollection, ReceiverComplete};
22use crate::forward_handle::{ForwardRef, TickCycle};
23#[cfg(stageleft_runtime)]
24use crate::location::dynamic::{DynLocation, LocationId};
25use crate::location::tick::{Atomic, DeferTick, NoAtomic};
26use crate::location::{Location, NoTick, Tick, check_matching_location};
27use crate::nondet::{NonDet, nondet};
28
29pub mod networking;
30
31/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
32#[sealed::sealed]
33pub trait Ordering:
34 MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
35{
36}
37
38/// Marks the stream as being totally ordered, which means that there are
39/// no sources of non-determinism (other than intentional ones) that will
40/// affect the order of elements.
41pub enum TotalOrder {}
42
43#[sealed::sealed]
44impl Ordering for TotalOrder {}
45
46/// Marks the stream as having no order, which means that the order of
47/// elements may be affected by non-determinism.
48///
49/// This restricts certain operators, such as `fold` and `reduce`, to only
50/// be used with commutative aggregation functions.
51pub enum NoOrder {}
52
53#[sealed::sealed]
54impl Ordering for NoOrder {}
55
56/// Helper trait for determining the weakest of two orderings.
57#[sealed::sealed]
58pub trait MinOrder<Other: ?Sized> {
59 /// The weaker of the two orderings.
60 type Min: Ordering;
61}
62
63#[sealed::sealed]
64impl MinOrder<NoOrder> for TotalOrder {
65 type Min = NoOrder;
66}
67
68#[sealed::sealed]
69impl MinOrder<TotalOrder> for TotalOrder {
70 type Min = TotalOrder;
71}
72
73#[sealed::sealed]
74impl MinOrder<TotalOrder> for NoOrder {
75 type Min = NoOrder;
76}
77
78#[sealed::sealed]
79impl MinOrder<NoOrder> for NoOrder {
80 type Min = NoOrder;
81}
82
83/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
84#[sealed::sealed]
85pub trait Retries:
86 MinRetries<Self, Min = Self>
87 + MinRetries<ExactlyOnce, Min = Self>
88 + MinRetries<AtLeastOnce, Min = AtLeastOnce>
89{
90}
91
92/// Marks the stream as having deterministic message cardinality, with no
93/// possibility of duplicates.
94pub enum ExactlyOnce {}
95
96#[sealed::sealed]
97impl Retries for ExactlyOnce {}
98
99/// Marks the stream as having non-deterministic message cardinality, which
100/// means that duplicates may occur, but messages will not be dropped.
101pub enum AtLeastOnce {}
102
103#[sealed::sealed]
104impl Retries for AtLeastOnce {}
105
106/// Helper trait for determining the weakest of two retry guarantees.
107#[sealed::sealed]
108pub trait MinRetries<Other: ?Sized> {
109 /// The weaker of the two retry guarantees.
110 type Min: Retries;
111}
112
113#[sealed::sealed]
114impl MinRetries<AtLeastOnce> for ExactlyOnce {
115 type Min = AtLeastOnce;
116}
117
118#[sealed::sealed]
119impl MinRetries<ExactlyOnce> for ExactlyOnce {
120 type Min = ExactlyOnce;
121}
122
123#[sealed::sealed]
124impl MinRetries<ExactlyOnce> for AtLeastOnce {
125 type Min = AtLeastOnce;
126}
127
128#[sealed::sealed]
129impl MinRetries<AtLeastOnce> for AtLeastOnce {
130 type Min = AtLeastOnce;
131}
132
133/// Streaming sequence of elements with type `Type`.
134///
135/// This live collection represents a growing sequence of elements, with new elements being
136/// asynchronously appended to the end of the sequence. This can be used to model the arrival
137/// of network input, such as API requests, or streaming ingestion.
138///
139/// By default, all streams have deterministic ordering and each element is materialized exactly
140/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
141/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
142/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
143///
144/// Type Parameters:
145/// - `Type`: the type of elements in the stream
146/// - `Loc`: the location where the stream is being materialized
147/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
148/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
149/// (default is [`TotalOrder`])
150/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
151/// [`AtLeastOnce`] (default is [`ExactlyOnce`])
152pub struct Stream<
153 Type,
154 Loc,
155 Bound: Boundedness,
156 Order: Ordering = TotalOrder,
157 Retry: Retries = ExactlyOnce,
158> {
159 pub(crate) location: Loc,
160 pub(crate) ir_node: RefCell<HydroNode>,
161
162 _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
163}
164
165impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
166 for Stream<T, L, Unbounded, O, R>
167where
168 L: Location<'a>,
169{
170 fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
171 Stream {
172 location: stream.location,
173 ir_node: stream.ir_node,
174 _phantom: PhantomData,
175 }
176 }
177}
178
179impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
180 for Stream<T, L, B, NoOrder, R>
181where
182 L: Location<'a>,
183{
184 fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
185 Stream {
186 location: stream.location,
187 ir_node: stream.ir_node,
188 _phantom: PhantomData,
189 }
190 }
191}
192
193impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
194 for Stream<T, L, B, O, AtLeastOnce>
195where
196 L: Location<'a>,
197{
198 fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
199 Stream {
200 location: stream.location,
201 ir_node: stream.ir_node,
202 _phantom: PhantomData,
203 }
204 }
205}
206
207impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
208where
209 L: Location<'a>,
210{
211 fn defer_tick(self) -> Self {
212 Stream::defer_tick(self)
213 }
214}
215
216impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
217 for Stream<T, Tick<L>, Bounded, O, R>
218where
219 L: Location<'a>,
220{
221 type Location = Tick<L>;
222
223 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
224 Stream::new(
225 location.clone(),
226 HydroNode::CycleSource {
227 ident,
228 metadata: location.new_node_metadata::<T>(),
229 },
230 )
231 }
232}
233
234impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
235 for Stream<T, Tick<L>, Bounded, O, R>
236where
237 L: Location<'a>,
238{
239 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
240 assert_eq!(
241 Location::id(&self.location),
242 expected_location,
243 "locations do not match"
244 );
245 self.location
246 .flow_state()
247 .borrow_mut()
248 .push_root(HydroRoot::CycleSink {
249 ident,
250 input: Box::new(self.ir_node.into_inner()),
251 out_location: Location::id(&self.location),
252 op_metadata: HydroIrOpMetadata::new(),
253 });
254 }
255}
256
257impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
258 for Stream<T, L, B, O, R>
259where
260 L: Location<'a> + NoTick,
261{
262 type Location = L;
263
264 fn create_source(ident: syn::Ident, location: L) -> Self {
265 Stream::new(
266 location.clone(),
267 HydroNode::CycleSource {
268 ident,
269 metadata: location.new_node_metadata::<T>(),
270 },
271 )
272 }
273}
274
275impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
276 for Stream<T, L, B, O, R>
277where
278 L: Location<'a> + NoTick,
279{
280 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
281 assert_eq!(
282 Location::id(&self.location),
283 expected_location,
284 "locations do not match"
285 );
286 self.location
287 .flow_state()
288 .borrow_mut()
289 .push_root(HydroRoot::CycleSink {
290 ident,
291 input: Box::new(self.ir_node.into_inner()),
292 out_location: Location::id(&self.location),
293 op_metadata: HydroIrOpMetadata::new(),
294 });
295 }
296}
297
298impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
299where
300 T: Clone,
301 L: Location<'a>,
302{
303 fn clone(&self) -> Self {
304 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
305 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
306 *self.ir_node.borrow_mut() = HydroNode::Tee {
307 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
308 metadata: self.location.new_node_metadata::<T>(),
309 };
310 }
311
312 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
313 Stream {
314 location: self.location.clone(),
315 ir_node: HydroNode::Tee {
316 inner: TeeNode(inner.0.clone()),
317 metadata: metadata.clone(),
318 }
319 .into(),
320 _phantom: PhantomData,
321 }
322 } else {
323 unreachable!()
324 }
325 }
326}
327
328impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
329where
330 L: Location<'a>,
331{
332 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
333 debug_assert_eq!(&Location::id(&location), &ir_node.metadata().location_kind);
334 Stream {
335 location,
336 ir_node: RefCell::new(ir_node),
337 _phantom: PhantomData,
338 }
339 }
340
341 /// Produces a stream based on invoking `f` on each element.
342 /// If you do not want to modify the stream and instead only want to view
343 /// each item use [`Stream::inspect`] instead.
344 ///
345 /// # Example
346 /// ```rust
347 /// # use hydro_lang::prelude::*;
348 /// # use futures::StreamExt;
349 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
350 /// let words = process.source_iter(q!(vec!["hello", "world"]));
351 /// words.map(q!(|x| x.to_uppercase()))
352 /// # }, |mut stream| async move {
353 /// # for w in vec!["HELLO", "WORLD"] {
354 /// # assert_eq!(stream.next().await.unwrap(), w);
355 /// # }
356 /// # }));
357 /// ```
358 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
359 where
360 F: Fn(T) -> U + 'a,
361 {
362 let f = f.splice_fn1_ctx(&self.location).into();
363 Stream::new(
364 self.location.clone(),
365 HydroNode::Map {
366 f,
367 input: Box::new(self.ir_node.into_inner()),
368 metadata: self.location.new_node_metadata::<U>(),
369 },
370 )
371 }
372
373 /// For each item `i` in the input stream, transform `i` using `f` and then treat the
374 /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
375 /// for the output type `U` must produce items in a **deterministic** order.
376 ///
377 /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
378 /// not deterministic, use [`Stream::flat_map_unordered`] instead.
379 ///
380 /// # Example
381 /// ```rust
382 /// # use hydro_lang::prelude::*;
383 /// # use futures::StreamExt;
384 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
385 /// process
386 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
387 /// .flat_map_ordered(q!(|x| x))
388 /// # }, |mut stream| async move {
389 /// // 1, 2, 3, 4
390 /// # for w in (1..5) {
391 /// # assert_eq!(stream.next().await.unwrap(), w);
392 /// # }
393 /// # }));
394 /// ```
395 pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
396 where
397 I: IntoIterator<Item = U>,
398 F: Fn(T) -> I + 'a,
399 {
400 let f = f.splice_fn1_ctx(&self.location).into();
401 Stream::new(
402 self.location.clone(),
403 HydroNode::FlatMap {
404 f,
405 input: Box::new(self.ir_node.into_inner()),
406 metadata: self.location.new_node_metadata::<U>(),
407 },
408 )
409 }
410
411 /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
412 /// for the output type `U` to produce items in any order.
413 ///
414 /// # Example
415 /// ```rust
416 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
417 /// # use futures::StreamExt;
418 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
419 /// process
420 /// .source_iter(q!(vec![
421 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
422 /// std::collections::HashSet::from_iter(vec![3, 4]),
423 /// ]))
424 /// .flat_map_unordered(q!(|x| x))
425 /// # }, |mut stream| async move {
426 /// // 1, 2, 3, 4, but in no particular order
427 /// # let mut results = Vec::new();
428 /// # for w in (1..5) {
429 /// # results.push(stream.next().await.unwrap());
430 /// # }
431 /// # results.sort();
432 /// # assert_eq!(results, vec![1, 2, 3, 4]);
433 /// # }));
434 /// ```
435 pub fn flat_map_unordered<U, I, F>(
436 self,
437 f: impl IntoQuotedMut<'a, F, L>,
438 ) -> Stream<U, L, B, NoOrder, R>
439 where
440 I: IntoIterator<Item = U>,
441 F: Fn(T) -> I + 'a,
442 {
443 let f = f.splice_fn1_ctx(&self.location).into();
444 Stream::new(
445 self.location.clone(),
446 HydroNode::FlatMap {
447 f,
448 input: Box::new(self.ir_node.into_inner()),
449 metadata: self.location.new_node_metadata::<U>(),
450 },
451 )
452 }
453
454 /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
455 /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
456 ///
457 /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
458 /// not deterministic, use [`Stream::flatten_unordered`] instead.
459 ///
460 /// ```rust
461 /// # use hydro_lang::prelude::*;
462 /// # use futures::StreamExt;
463 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
464 /// process
465 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
466 /// .flatten_ordered()
467 /// # }, |mut stream| async move {
468 /// // 1, 2, 3, 4
469 /// # for w in (1..5) {
470 /// # assert_eq!(stream.next().await.unwrap(), w);
471 /// # }
472 /// # }));
473 /// ```
474 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
475 where
476 T: IntoIterator<Item = U>,
477 {
478 self.flat_map_ordered(q!(|d| d))
479 }
480
481 /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
482 /// for the element type `T` to produce items in any order.
483 ///
484 /// # Example
485 /// ```rust
486 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
487 /// # use futures::StreamExt;
488 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
489 /// process
490 /// .source_iter(q!(vec![
491 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
492 /// std::collections::HashSet::from_iter(vec![3, 4]),
493 /// ]))
494 /// .flatten_unordered()
495 /// # }, |mut stream| async move {
496 /// // 1, 2, 3, 4, but in no particular order
497 /// # let mut results = Vec::new();
498 /// # for w in (1..5) {
499 /// # results.push(stream.next().await.unwrap());
500 /// # }
501 /// # results.sort();
502 /// # assert_eq!(results, vec![1, 2, 3, 4]);
503 /// # }));
504 /// ```
505 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
506 where
507 T: IntoIterator<Item = U>,
508 {
509 self.flat_map_unordered(q!(|d| d))
510 }
511
512 /// Creates a stream containing only the elements of the input stream that satisfy a predicate
513 /// `f`, preserving the order of the elements.
514 ///
515 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
516 /// not modify or take ownership of the values. If you need to modify the values while filtering
517 /// use [`Stream::filter_map`] instead.
518 ///
519 /// # Example
520 /// ```rust
521 /// # use hydro_lang::prelude::*;
522 /// # use futures::StreamExt;
523 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
524 /// process
525 /// .source_iter(q!(vec![1, 2, 3, 4]))
526 /// .filter(q!(|&x| x > 2))
527 /// # }, |mut stream| async move {
528 /// // 3, 4
529 /// # for w in (3..5) {
530 /// # assert_eq!(stream.next().await.unwrap(), w);
531 /// # }
532 /// # }));
533 /// ```
534 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<T, L, B, O, R>
535 where
536 F: Fn(&T) -> bool + 'a,
537 {
538 let f = f.splice_fn1_borrow_ctx(&self.location).into();
539 Stream::new(
540 self.location.clone(),
541 HydroNode::Filter {
542 f,
543 input: Box::new(self.ir_node.into_inner()),
544 metadata: self.location.new_node_metadata::<T>(),
545 },
546 )
547 }
548
549 /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
550 ///
551 /// # Example
552 /// ```rust
553 /// # use hydro_lang::prelude::*;
554 /// # use futures::StreamExt;
555 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
556 /// process
557 /// .source_iter(q!(vec!["1", "hello", "world", "2"]))
558 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
559 /// # }, |mut stream| async move {
560 /// // 1, 2
561 /// # for w in (1..3) {
562 /// # assert_eq!(stream.next().await.unwrap(), w);
563 /// # }
564 /// # }));
565 /// ```
566 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
567 where
568 F: Fn(T) -> Option<U> + 'a,
569 {
570 let f = f.splice_fn1_ctx(&self.location).into();
571 Stream::new(
572 self.location.clone(),
573 HydroNode::FilterMap {
574 f,
575 input: Box::new(self.ir_node.into_inner()),
576 metadata: self.location.new_node_metadata::<U>(),
577 },
578 )
579 }
580
581 /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
582 /// where `x` is the final value of `other`, a bounded [`Singleton`].
583 ///
584 /// # Example
585 /// ```rust
586 /// # use hydro_lang::prelude::*;
587 /// # use futures::StreamExt;
588 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
589 /// let tick = process.tick();
590 /// let batch = process
591 /// .source_iter(q!(vec![1, 2, 3, 4]))
592 /// .batch(&tick, nondet!(/** test */));
593 /// let count = batch.clone().count(); // `count()` returns a singleton
594 /// batch.cross_singleton(count).all_ticks()
595 /// # }, |mut stream| async move {
596 /// // (1, 4), (2, 4), (3, 4), (4, 4)
597 /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
598 /// # assert_eq!(stream.next().await.unwrap(), w);
599 /// # }
600 /// # }));
601 /// ```
602 pub fn cross_singleton<O2>(
603 self,
604 other: impl Into<Optional<O2, L, Bounded>>,
605 ) -> Stream<(T, O2), L, B, O, R>
606 where
607 O2: Clone,
608 {
609 let other: Optional<O2, L, Bounded> = other.into();
610 check_matching_location(&self.location, &other.location);
611
612 Stream::new(
613 self.location.clone(),
614 HydroNode::CrossSingleton {
615 left: Box::new(self.ir_node.into_inner()),
616 right: Box::new(other.ir_node.into_inner()),
617 metadata: self.location.new_node_metadata::<(T, O2)>(),
618 },
619 )
620 }
621
622 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
623 ///
624 /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
625 /// leader of a cluster.
626 ///
627 /// # Example
628 /// ```rust
629 /// # use hydro_lang::prelude::*;
630 /// # use futures::StreamExt;
631 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
632 /// let tick = process.tick();
633 /// // ticks are lazy by default, forces the second tick to run
634 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
635 ///
636 /// let batch_first_tick = process
637 /// .source_iter(q!(vec![1, 2, 3, 4]))
638 /// .batch(&tick, nondet!(/** test */));
639 /// let batch_second_tick = process
640 /// .source_iter(q!(vec![5, 6, 7, 8]))
641 /// .batch(&tick, nondet!(/** test */))
642 /// .defer_tick(); // appears on the second tick
643 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
644 /// batch_first_tick.chain(batch_second_tick)
645 /// .filter_if_some(some_on_first_tick)
646 /// .all_ticks()
647 /// # }, |mut stream| async move {
648 /// // [1, 2, 3, 4]
649 /// # for w in vec![1, 2, 3, 4] {
650 /// # assert_eq!(stream.next().await.unwrap(), w);
651 /// # }
652 /// # }));
653 /// ```
654 pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
655 self.cross_singleton(signal.map(q!(|_u| ())))
656 .map(q!(|(d, _signal)| d))
657 }
658
659 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
660 ///
661 /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
662 /// some local state.
663 ///
664 /// # Example
665 /// ```rust
666 /// # use hydro_lang::prelude::*;
667 /// # use futures::StreamExt;
668 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
669 /// let tick = process.tick();
670 /// // ticks are lazy by default, forces the second tick to run
671 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
672 ///
673 /// let batch_first_tick = process
674 /// .source_iter(q!(vec![1, 2, 3, 4]))
675 /// .batch(&tick, nondet!(/** test */));
676 /// let batch_second_tick = process
677 /// .source_iter(q!(vec![5, 6, 7, 8]))
678 /// .batch(&tick, nondet!(/** test */))
679 /// .defer_tick(); // appears on the second tick
680 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
681 /// batch_first_tick.chain(batch_second_tick)
682 /// .filter_if_none(some_on_first_tick)
683 /// .all_ticks()
684 /// # }, |mut stream| async move {
685 /// // [5, 6, 7, 8]
686 /// # for w in vec![5, 6, 7, 8] {
687 /// # assert_eq!(stream.next().await.unwrap(), w);
688 /// # }
689 /// # }));
690 /// ```
691 pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
692 self.filter_if_some(
693 other
694 .map(q!(|_| ()))
695 .into_singleton()
696 .filter(q!(|o| o.is_none())),
697 )
698 }
699
700 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
701 /// tupled pairs in a non-deterministic order.
702 ///
703 /// # Example
704 /// ```rust
705 /// # use hydro_lang::prelude::*;
706 /// # use std::collections::HashSet;
707 /// # use futures::StreamExt;
708 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
709 /// let tick = process.tick();
710 /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
711 /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
712 /// stream1.cross_product(stream2)
713 /// # }, |mut stream| async move {
714 /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
715 /// # stream.map(|i| assert!(expected.contains(&i)));
716 /// # }));
717 /// ```
718 pub fn cross_product<T2, O2: Ordering>(
719 self,
720 other: Stream<T2, L, B, O2, R>,
721 ) -> Stream<(T, T2), L, B, NoOrder, R>
722 where
723 T: Clone,
724 T2: Clone,
725 {
726 check_matching_location(&self.location, &other.location);
727
728 Stream::new(
729 self.location.clone(),
730 HydroNode::CrossProduct {
731 left: Box::new(self.ir_node.into_inner()),
732 right: Box::new(other.ir_node.into_inner()),
733 metadata: self.location.new_node_metadata::<(T, T2)>(),
734 },
735 )
736 }
737
738 /// Takes one stream as input and filters out any duplicate occurrences. The output
739 /// contains all unique values from the input.
740 ///
741 /// # Example
742 /// ```rust
743 /// # use hydro_lang::prelude::*;
744 /// # use futures::StreamExt;
745 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
746 /// let tick = process.tick();
747 /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
748 /// # }, |mut stream| async move {
749 /// # for w in vec![1, 2, 3, 4] {
750 /// # assert_eq!(stream.next().await.unwrap(), w);
751 /// # }
752 /// # }));
753 /// ```
754 pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
755 where
756 T: Eq + Hash,
757 {
758 Stream::new(
759 self.location.clone(),
760 HydroNode::Unique {
761 input: Box::new(self.ir_node.into_inner()),
762 metadata: self.location.new_node_metadata::<T>(),
763 },
764 )
765 }
766
767 /// Outputs everything in this stream that is *not* contained in the `other` stream.
768 ///
769 /// The `other` stream must be [`Bounded`], since this function will wait until
770 /// all its elements are available before producing any output.
771 /// # Example
772 /// ```rust
773 /// # use hydro_lang::prelude::*;
774 /// # use futures::StreamExt;
775 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
776 /// let tick = process.tick();
777 /// let stream = process
778 /// .source_iter(q!(vec![ 1, 2, 3, 4 ]))
779 /// .batch(&tick, nondet!(/** test */));
780 /// let batch = process
781 /// .source_iter(q!(vec![1, 2]))
782 /// .batch(&tick, nondet!(/** test */));
783 /// stream.filter_not_in(batch).all_ticks()
784 /// # }, |mut stream| async move {
785 /// # for w in vec![3, 4] {
786 /// # assert_eq!(stream.next().await.unwrap(), w);
787 /// # }
788 /// # }));
789 /// ```
790 pub fn filter_not_in<O2: Ordering>(
791 self,
792 other: Stream<T, L, Bounded, O2, R>,
793 ) -> Stream<T, L, Bounded, O, R>
794 where
795 T: Eq + Hash,
796 {
797 check_matching_location(&self.location, &other.location);
798
799 Stream::new(
800 self.location.clone(),
801 HydroNode::Difference {
802 pos: Box::new(self.ir_node.into_inner()),
803 neg: Box::new(other.ir_node.into_inner()),
804 metadata: self.location.new_node_metadata::<T>(),
805 },
806 )
807 }
808
809 /// An operator which allows you to "inspect" each element of a stream without
810 /// modifying it. The closure `f` is called on a reference to each item. This is
811 /// mainly useful for debugging, and should not be used to generate side-effects.
812 ///
813 /// # Example
814 /// ```rust
815 /// # use hydro_lang::prelude::*;
816 /// # use futures::StreamExt;
817 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
818 /// let nums = process.source_iter(q!(vec![1, 2]));
819 /// // prints "1 * 10 = 10" and "2 * 10 = 20"
820 /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
821 /// # }, |mut stream| async move {
822 /// # for w in vec![1, 2] {
823 /// # assert_eq!(stream.next().await.unwrap(), w);
824 /// # }
825 /// # }));
826 /// ```
827 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<T, L, B, O, R>
828 where
829 F: Fn(&T) + 'a,
830 {
831 let f = f.splice_fn1_borrow_ctx(&self.location).into();
832
833 Stream::new(
834 self.location.clone(),
835 HydroNode::Inspect {
836 f,
837 input: Box::new(self.ir_node.into_inner()),
838 metadata: self.location.new_node_metadata::<T>(),
839 },
840 )
841 }
842
843 /// An operator which allows you to "name" a `HydroNode`.
844 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
845 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
846 {
847 let mut node = self.ir_node.borrow_mut();
848 let metadata = node.metadata_mut();
849 metadata.tag = Some(name.to_string());
850 }
851 self
852 }
853
854 /// Explicitly "casts" the stream to a type with a different ordering
855 /// guarantee. Useful in unsafe code where the ordering cannot be proven
856 /// by the type-system.
857 ///
858 /// # Non-Determinism
859 /// This function is used as an escape hatch, and any mistakes in the
860 /// provided ordering guarantee will propagate into the guarantees
861 /// for the rest of the program.
862 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
863 Stream::new(self.location, self.ir_node.into_inner())
864 }
865
866 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
867 /// which is always safe because that is the weakest possible guarantee.
868 pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
869 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
870 self.assume_ordering::<NoOrder>(nondet)
871 }
872
873 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
874 /// enforcing that `O2` is weaker than the input ordering guarantee.
875 pub fn weaken_ordering<O2: Ordering + MinOrder<O, Min = O2>>(self) -> Stream<T, L, B, O2, R> {
876 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
877 self.assume_ordering::<O2>(nondet)
878 }
879
880 /// Explicitly "casts" the stream to a type with a different retries
881 /// guarantee. Useful in unsafe code where the lack of retries cannot
882 /// be proven by the type-system.
883 ///
884 /// # Non-Determinism
885 /// This function is used as an escape hatch, and any mistakes in the
886 /// provided retries guarantee will propagate into the guarantees
887 /// for the rest of the program.
888 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
889 Stream::new(self.location, self.ir_node.into_inner())
890 }
891
892 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
893 /// which is always safe because that is the weakest possible guarantee.
894 pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
895 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
896 self.assume_retries::<AtLeastOnce>(nondet)
897 }
898
899 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
900 /// enforcing that `R2` is weaker than the input retries guarantee.
901 pub fn weaken_retries<R2: Retries + MinRetries<R, Min = R2>>(self) -> Stream<T, L, B, O, R2> {
902 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
903 self.assume_retries::<R2>(nondet)
904 }
905}
906
907impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
908where
909 L: Location<'a>,
910{
911 /// Given a stream with [`ExactlyOnce`] retry guarantees, weakens it to an arbitrary guarantee
912 /// `R2`, which is safe because all guarantees are equal to or weaker than [`ExactlyOnce`]
913 pub fn weaker_retries<R2: Retries>(self) -> Stream<T, L, B, O, R2> {
914 self.assume_retries(
915 nondet!(/** any retry ordering is the same or weaker than ExactlyOnce */),
916 )
917 }
918}
919
920impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
921where
922 L: Location<'a>,
923{
924 /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
925 ///
926 /// # Example
927 /// ```rust
928 /// # use hydro_lang::prelude::*;
929 /// # use futures::StreamExt;
930 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
931 /// process.source_iter(q!(&[1, 2, 3])).cloned()
932 /// # }, |mut stream| async move {
933 /// // 1, 2, 3
934 /// # for w in vec![1, 2, 3] {
935 /// # assert_eq!(stream.next().await.unwrap(), w);
936 /// # }
937 /// # }));
938 /// ```
939 pub fn cloned(self) -> Stream<T, L, B, O, R>
940 where
941 T: Clone,
942 {
943 self.map(q!(|d| d.clone()))
944 }
945}
946
947impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
948where
949 L: Location<'a>,
950{
951 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
952 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
953 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
954 ///
955 /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
956 /// and there may be duplicates.
957 ///
958 /// # Example
959 /// ```rust
960 /// # use hydro_lang::prelude::*;
961 /// # use futures::StreamExt;
962 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
963 /// let tick = process.tick();
964 /// let bools = process.source_iter(q!(vec![false, true, false]));
965 /// let batch = bools.batch(&tick, nondet!(/** test */));
966 /// batch
967 /// .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
968 /// .all_ticks()
969 /// # }, |mut stream| async move {
970 /// // true
971 /// # assert_eq!(stream.next().await.unwrap(), true);
972 /// # }));
973 /// ```
974 pub fn fold_commutative_idempotent<A, I, F>(
975 self,
976 init: impl IntoQuotedMut<'a, I, L>,
977 comb: impl IntoQuotedMut<'a, F, L>,
978 ) -> Singleton<A, L, B>
979 where
980 I: Fn() -> A + 'a,
981 F: Fn(&mut A, T),
982 {
983 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
984 self.assume_ordering(nondet)
985 .assume_retries(nondet)
986 .fold(init, comb)
987 }
988
989 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
990 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
991 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
992 /// reference, so that it can be modified in place.
993 ///
994 /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
995 /// and there may be duplicates.
996 ///
997 /// # Example
998 /// ```rust
999 /// # use hydro_lang::prelude::*;
1000 /// # use futures::StreamExt;
1001 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1002 /// let tick = process.tick();
1003 /// let bools = process.source_iter(q!(vec![false, true, false]));
1004 /// let batch = bools.batch(&tick, nondet!(/** test */));
1005 /// batch
1006 /// .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1007 /// .all_ticks()
1008 /// # }, |mut stream| async move {
1009 /// // true
1010 /// # assert_eq!(stream.next().await.unwrap(), true);
1011 /// # }));
1012 /// ```
1013 pub fn reduce_commutative_idempotent<F>(
1014 self,
1015 comb: impl IntoQuotedMut<'a, F, L>,
1016 ) -> Optional<T, L, B>
1017 where
1018 F: Fn(&mut T, T) + 'a,
1019 {
1020 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1021 self.assume_ordering(nondet)
1022 .assume_retries(nondet)
1023 .reduce(comb)
1024 }
1025
1026 /// Computes the maximum element in the stream as an [`Optional`], which
1027 /// will be empty until the first element in the input arrives.
1028 ///
1029 /// # Example
1030 /// ```rust
1031 /// # use hydro_lang::prelude::*;
1032 /// # use futures::StreamExt;
1033 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1034 /// let tick = process.tick();
1035 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1036 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1037 /// batch.max().all_ticks()
1038 /// # }, |mut stream| async move {
1039 /// // 4
1040 /// # assert_eq!(stream.next().await.unwrap(), 4);
1041 /// # }));
1042 /// ```
1043 pub fn max(self) -> Optional<T, L, B>
1044 where
1045 T: Ord,
1046 {
1047 self.reduce_commutative_idempotent(q!(|curr, new| {
1048 if new > *curr {
1049 *curr = new;
1050 }
1051 }))
1052 }
1053
1054 /// Computes the maximum element in the stream as an [`Optional`], where the
1055 /// maximum is determined according to the `key` function. The [`Optional`] will
1056 /// be empty until the first element in the input arrives.
1057 ///
1058 /// # Example
1059 /// ```rust
1060 /// # use hydro_lang::prelude::*;
1061 /// # use futures::StreamExt;
1062 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1063 /// let tick = process.tick();
1064 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1065 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1066 /// batch.max_by_key(q!(|x| -x)).all_ticks()
1067 /// # }, |mut stream| async move {
1068 /// // 1
1069 /// # assert_eq!(stream.next().await.unwrap(), 1);
1070 /// # }));
1071 /// ```
1072 pub fn max_by_key<K, F>(self, key: impl IntoQuotedMut<'a, F, L> + Copy) -> Optional<T, L, B>
1073 where
1074 K: Ord,
1075 F: Fn(&T) -> K + 'a,
1076 {
1077 let f = key.splice_fn1_borrow_ctx(&self.location);
1078
1079 let wrapped: syn::Expr = parse_quote!({
1080 let key_fn = #f;
1081 move |curr, new| {
1082 if key_fn(&new) > key_fn(&*curr) {
1083 *curr = new;
1084 }
1085 }
1086 });
1087
1088 Optional::new(
1089 self.location.clone(),
1090 HydroNode::Reduce {
1091 f: wrapped.into(),
1092 input: Box::new(self.ir_node.into_inner()),
1093 metadata: self.location.new_node_metadata::<T>(),
1094 },
1095 )
1096 }
1097
1098 /// Computes the minimum element in the stream as an [`Optional`], which
1099 /// will be empty until the first element in the input arrives.
1100 ///
1101 /// # Example
1102 /// ```rust
1103 /// # use hydro_lang::prelude::*;
1104 /// # use futures::StreamExt;
1105 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1106 /// let tick = process.tick();
1107 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1108 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1109 /// batch.min().all_ticks()
1110 /// # }, |mut stream| async move {
1111 /// // 1
1112 /// # assert_eq!(stream.next().await.unwrap(), 1);
1113 /// # }));
1114 /// ```
1115 pub fn min(self) -> Optional<T, L, B>
1116 where
1117 T: Ord,
1118 {
1119 self.reduce_commutative_idempotent(q!(|curr, new| {
1120 if new < *curr {
1121 *curr = new;
1122 }
1123 }))
1124 }
1125}
1126
1127impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1128where
1129 L: Location<'a>,
1130{
1131 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1132 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1133 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1134 ///
1135 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1136 ///
1137 /// # Example
1138 /// ```rust
1139 /// # use hydro_lang::prelude::*;
1140 /// # use futures::StreamExt;
1141 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1142 /// let tick = process.tick();
1143 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1144 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1145 /// batch
1146 /// .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1147 /// .all_ticks()
1148 /// # }, |mut stream| async move {
1149 /// // 10
1150 /// # assert_eq!(stream.next().await.unwrap(), 10);
1151 /// # }));
1152 /// ```
1153 pub fn fold_commutative<A, I, F>(
1154 self,
1155 init: impl IntoQuotedMut<'a, I, L>,
1156 comb: impl IntoQuotedMut<'a, F, L>,
1157 ) -> Singleton<A, L, B>
1158 where
1159 I: Fn() -> A + 'a,
1160 F: Fn(&mut A, T),
1161 {
1162 let nondet = nondet!(/** the combinator function is commutative */);
1163 self.assume_ordering(nondet).fold(init, comb)
1164 }
1165
1166 /// Combines elements of the stream into a [`Optional`], by starting with the first element in the stream,
1167 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1168 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1169 /// reference, so that it can be modified in place.
1170 ///
1171 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1172 ///
1173 /// # Example
1174 /// ```rust
1175 /// # use hydro_lang::prelude::*;
1176 /// # use futures::StreamExt;
1177 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1178 /// let tick = process.tick();
1179 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1180 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1181 /// batch
1182 /// .reduce_commutative(q!(|curr, new| *curr += new))
1183 /// .all_ticks()
1184 /// # }, |mut stream| async move {
1185 /// // 10
1186 /// # assert_eq!(stream.next().await.unwrap(), 10);
1187 /// # }));
1188 /// ```
1189 pub fn reduce_commutative<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1190 where
1191 F: Fn(&mut T, T) + 'a,
1192 {
1193 let nondet = nondet!(/** the combinator function is commutative */);
1194 self.assume_ordering(nondet).reduce(comb)
1195 }
1196
1197 /// Computes the number of elements in the stream as a [`Singleton`].
1198 ///
1199 /// # Example
1200 /// ```rust
1201 /// # use hydro_lang::prelude::*;
1202 /// # use futures::StreamExt;
1203 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1204 /// let tick = process.tick();
1205 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1206 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1207 /// batch.count().all_ticks()
1208 /// # }, |mut stream| async move {
1209 /// // 4
1210 /// # assert_eq!(stream.next().await.unwrap(), 4);
1211 /// # }));
1212 /// ```
1213 pub fn count(self) -> Singleton<usize, L, B> {
1214 self.fold_commutative(q!(|| 0usize), q!(|count, _| *count += 1))
1215 }
1216}
1217
1218impl<'a, T, L, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R>
1219where
1220 L: Location<'a>,
1221{
1222 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1223 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1224 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1225 ///
1226 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1227 ///
1228 /// # Example
1229 /// ```rust
1230 /// # use hydro_lang::prelude::*;
1231 /// # use futures::StreamExt;
1232 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1233 /// let tick = process.tick();
1234 /// let bools = process.source_iter(q!(vec![false, true, false]));
1235 /// let batch = bools.batch(&tick, nondet!(/** test */));
1236 /// batch
1237 /// .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1238 /// .all_ticks()
1239 /// # }, |mut stream| async move {
1240 /// // true
1241 /// # assert_eq!(stream.next().await.unwrap(), true);
1242 /// # }));
1243 /// ```
1244 pub fn fold_idempotent<A, I, F>(
1245 self,
1246 init: impl IntoQuotedMut<'a, I, L>,
1247 comb: impl IntoQuotedMut<'a, F, L>,
1248 ) -> Singleton<A, L, B>
1249 where
1250 I: Fn() -> A + 'a,
1251 F: Fn(&mut A, T),
1252 {
1253 let nondet = nondet!(/** the combinator function is idempotent */);
1254 self.assume_retries(nondet).fold(init, comb)
1255 }
1256
1257 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1258 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1259 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1260 /// reference, so that it can be modified in place.
1261 ///
1262 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1263 ///
1264 /// # Example
1265 /// ```rust
1266 /// # use hydro_lang::prelude::*;
1267 /// # use futures::StreamExt;
1268 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1269 /// let tick = process.tick();
1270 /// let bools = process.source_iter(q!(vec![false, true, false]));
1271 /// let batch = bools.batch(&tick, nondet!(/** test */));
1272 /// batch.reduce_idempotent(q!(|acc, x| *acc |= x)).all_ticks()
1273 /// # }, |mut stream| async move {
1274 /// // true
1275 /// # assert_eq!(stream.next().await.unwrap(), true);
1276 /// # }));
1277 /// ```
1278 pub fn reduce_idempotent<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1279 where
1280 F: Fn(&mut T, T) + 'a,
1281 {
1282 let nondet = nondet!(/** the combinator function is idempotent */);
1283 self.assume_retries(nondet).reduce(comb)
1284 }
1285
1286 /// Computes the first element in the stream as an [`Optional`], which
1287 /// will be empty until the first element in the input arrives.
1288 ///
1289 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1290 /// re-ordering of elements may cause the first element to change.
1291 ///
1292 /// # Example
1293 /// ```rust
1294 /// # use hydro_lang::prelude::*;
1295 /// # use futures::StreamExt;
1296 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1297 /// let tick = process.tick();
1298 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1299 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1300 /// batch.first().all_ticks()
1301 /// # }, |mut stream| async move {
1302 /// // 1
1303 /// # assert_eq!(stream.next().await.unwrap(), 1);
1304 /// # }));
1305 /// ```
1306 pub fn first(self) -> Optional<T, L, B> {
1307 self.reduce_idempotent(q!(|_, _| {}))
1308 }
1309
1310 /// Computes the last element in the stream as an [`Optional`], which
1311 /// will be empty until an element in the input arrives.
1312 ///
1313 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1314 /// re-ordering of elements may cause the last element to change.
1315 ///
1316 /// # Example
1317 /// ```rust
1318 /// # use hydro_lang::prelude::*;
1319 /// # use futures::StreamExt;
1320 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1321 /// let tick = process.tick();
1322 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1323 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1324 /// batch.last().all_ticks()
1325 /// # }, |mut stream| async move {
1326 /// // 4
1327 /// # assert_eq!(stream.next().await.unwrap(), 4);
1328 /// # }));
1329 /// ```
1330 pub fn last(self) -> Optional<T, L, B> {
1331 self.reduce_idempotent(q!(|curr, new| *curr = new))
1332 }
1333}
1334
1335impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
1336where
1337 L: Location<'a>,
1338{
1339 /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1340 ///
1341 /// # Example
1342 /// ```rust
1343 /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1344 /// # use futures::StreamExt;
1345 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, TotalOrder, ExactlyOnce>(|process| {
1346 /// let tick = process.tick();
1347 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1348 /// numbers.enumerate()
1349 /// # }, |mut stream| async move {
1350 /// // (0, 1), (1, 2), (2, 3), (3, 4)
1351 /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1352 /// # assert_eq!(stream.next().await.unwrap(), w);
1353 /// # }
1354 /// # }));
1355 /// ```
1356 pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder, ExactlyOnce> {
1357 Stream::new(
1358 self.location.clone(),
1359 HydroNode::Enumerate {
1360 input: Box::new(self.ir_node.into_inner()),
1361 metadata: self.location.new_node_metadata::<(usize, T)>(),
1362 },
1363 )
1364 }
1365
1366 /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1367 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1368 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1369 ///
1370 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1371 /// to depend on the order of elements in the stream.
1372 ///
1373 /// # Example
1374 /// ```rust
1375 /// # use hydro_lang::prelude::*;
1376 /// # use futures::StreamExt;
1377 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1378 /// let tick = process.tick();
1379 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1380 /// let batch = words.batch(&tick, nondet!(/** test */));
1381 /// batch
1382 /// .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1383 /// .all_ticks()
1384 /// # }, |mut stream| async move {
1385 /// // "HELLOWORLD"
1386 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1387 /// # }));
1388 /// ```
1389 pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
1390 self,
1391 init: impl IntoQuotedMut<'a, I, L>,
1392 comb: impl IntoQuotedMut<'a, F, L>,
1393 ) -> Singleton<A, L, B> {
1394 let init = init.splice_fn0_ctx(&self.location).into();
1395 let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1396
1397 let core = HydroNode::Fold {
1398 init,
1399 acc: comb,
1400 input: Box::new(self.ir_node.into_inner()),
1401 metadata: self.location.new_node_metadata::<A>(),
1402 };
1403
1404 Singleton::new(self.location, core)
1405 }
1406
1407 /// Collects all the elements of this stream into a single [`Vec`] element.
1408 ///
1409 /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1410 /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1411 /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1412 /// the vector at an arbitrary point in time.
1413 ///
1414 /// # Example
1415 /// ```rust
1416 /// # use hydro_lang::prelude::*;
1417 /// # use futures::StreamExt;
1418 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1419 /// let tick = process.tick();
1420 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1421 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1422 /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1423 /// # }, |mut stream| async move {
1424 /// // [ vec![1, 2, 3, 4] ]
1425 /// # for w in vec![vec![1, 2, 3, 4]] {
1426 /// # assert_eq!(stream.next().await.unwrap(), w);
1427 /// # }
1428 /// # }));
1429 /// ```
1430 pub fn collect_vec(self) -> Singleton<Vec<T>, L, B> {
1431 self.fold(
1432 q!(|| vec![]),
1433 q!(|acc, v| {
1434 acc.push(v);
1435 }),
1436 )
1437 }
1438
1439 /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1440 /// and emitting each intermediate result.
1441 ///
1442 /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1443 /// containing all intermediate accumulated values. The scan operation can also terminate early
1444 /// by returning `None`.
1445 ///
1446 /// The function takes a mutable reference to the accumulator and the current element, and returns
1447 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1448 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1449 ///
1450 /// # Examples
1451 ///
1452 /// Basic usage - running sum:
1453 /// ```rust
1454 /// # use hydro_lang::prelude::*;
1455 /// # use futures::StreamExt;
1456 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1457 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1458 /// q!(|| 0),
1459 /// q!(|acc, x| {
1460 /// *acc += x;
1461 /// Some(*acc)
1462 /// }),
1463 /// )
1464 /// # }, |mut stream| async move {
1465 /// // Output: 1, 3, 6, 10
1466 /// # for w in vec![1, 3, 6, 10] {
1467 /// # assert_eq!(stream.next().await.unwrap(), w);
1468 /// # }
1469 /// # }));
1470 /// ```
1471 ///
1472 /// Early termination example:
1473 /// ```rust
1474 /// # use hydro_lang::prelude::*;
1475 /// # use futures::StreamExt;
1476 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1477 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1478 /// q!(|| 1),
1479 /// q!(|state, x| {
1480 /// *state = *state * x;
1481 /// if *state > 6 {
1482 /// None // Terminate the stream
1483 /// } else {
1484 /// Some(-*state)
1485 /// }
1486 /// }),
1487 /// )
1488 /// # }, |mut stream| async move {
1489 /// // Output: -1, -2, -6
1490 /// # for w in vec![-1, -2, -6] {
1491 /// # assert_eq!(stream.next().await.unwrap(), w);
1492 /// # }
1493 /// # }));
1494 /// ```
1495 pub fn scan<A, U, I, F>(
1496 self,
1497 init: impl IntoQuotedMut<'a, I, L>,
1498 f: impl IntoQuotedMut<'a, F, L>,
1499 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1500 where
1501 I: Fn() -> A + 'a,
1502 F: Fn(&mut A, T) -> Option<U> + 'a,
1503 {
1504 let init = init.splice_fn0_ctx(&self.location).into();
1505 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1506
1507 Stream::new(
1508 self.location.clone(),
1509 HydroNode::Scan {
1510 init,
1511 acc: f,
1512 input: Box::new(self.ir_node.into_inner()),
1513 metadata: self.location.new_node_metadata::<U>(),
1514 },
1515 )
1516 }
1517
1518 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1519 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1520 /// until the first element in the input arrives.
1521 ///
1522 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1523 /// to depend on the order of elements in the stream.
1524 ///
1525 /// # Example
1526 /// ```rust
1527 /// # use hydro_lang::prelude::*;
1528 /// # use futures::StreamExt;
1529 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1530 /// let tick = process.tick();
1531 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1532 /// let batch = words.batch(&tick, nondet!(/** test */));
1533 /// batch
1534 /// .map(q!(|x| x.to_string()))
1535 /// .reduce(q!(|curr, new| curr.push_str(&new)))
1536 /// .all_ticks()
1537 /// # }, |mut stream| async move {
1538 /// // "HELLOWORLD"
1539 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1540 /// # }));
1541 /// ```
1542 pub fn reduce<F: Fn(&mut T, T) + 'a>(
1543 self,
1544 comb: impl IntoQuotedMut<'a, F, L>,
1545 ) -> Optional<T, L, B> {
1546 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1547 let core = HydroNode::Reduce {
1548 f,
1549 input: Box::new(self.ir_node.into_inner()),
1550 metadata: self.location.new_node_metadata::<T>(),
1551 };
1552
1553 Optional::new(self.location, core)
1554 }
1555}
1556
1557impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
1558 /// Produces a new stream that interleaves the elements of the two input streams.
1559 /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1560 ///
1561 /// Currently, both input streams must be [`Unbounded`]. When the streams are
1562 /// [`Bounded`], you can use [`Stream::chain`] instead.
1563 ///
1564 /// # Example
1565 /// ```rust
1566 /// # use hydro_lang::prelude::*;
1567 /// # use futures::StreamExt;
1568 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1569 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1570 /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1571 /// # }, |mut stream| async move {
1572 /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1573 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1574 /// # assert_eq!(stream.next().await.unwrap(), w);
1575 /// # }
1576 /// # }));
1577 /// ```
1578 pub fn interleave<O2: Ordering, R2: Retries>(
1579 self,
1580 other: Stream<T, L, Unbounded, O2, R2>,
1581 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1582 where
1583 R: MinRetries<R2>,
1584 {
1585 let tick = self.location.tick();
1586 // Because the outputs are unordered, we can interleave batches from both streams.
1587 let nondet_batch_interleaving = nondet!(/** output stream is NoOrder, can interleave */);
1588 self.batch(&tick, nondet_batch_interleaving)
1589 .weakest_ordering()
1590 .chain(
1591 other
1592 .batch(&tick, nondet_batch_interleaving)
1593 .weakest_ordering(),
1594 )
1595 .all_ticks()
1596 }
1597}
1598
1599impl<'a, T, L, O: Ordering, R: Retries> Stream<T, L, Bounded, O, R>
1600where
1601 L: Location<'a>,
1602{
1603 /// Produces a new stream that emits the input elements in sorted order.
1604 ///
1605 /// The input stream can have any ordering guarantee, but the output stream
1606 /// will have a [`TotalOrder`] guarantee. This operator will block until all
1607 /// elements in the input stream are available, so it requires the input stream
1608 /// to be [`Bounded`].
1609 ///
1610 /// # Example
1611 /// ```rust
1612 /// # use hydro_lang::prelude::*;
1613 /// # use futures::StreamExt;
1614 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1615 /// let tick = process.tick();
1616 /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1617 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1618 /// batch.sort().all_ticks()
1619 /// # }, |mut stream| async move {
1620 /// // 1, 2, 3, 4
1621 /// # for w in (1..5) {
1622 /// # assert_eq!(stream.next().await.unwrap(), w);
1623 /// # }
1624 /// # }));
1625 /// ```
1626 pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1627 where
1628 T: Ord,
1629 {
1630 Stream::new(
1631 self.location.clone(),
1632 HydroNode::Sort {
1633 input: Box::new(self.ir_node.into_inner()),
1634 metadata: self.location.new_node_metadata::<T>(),
1635 },
1636 )
1637 }
1638
1639 /// Produces a new stream that first emits the elements of the `self` stream,
1640 /// and then emits the elements of the `other` stream. The output stream has
1641 /// a [`TotalOrder`] guarantee if and only if both input streams have a
1642 /// [`TotalOrder`] guarantee.
1643 ///
1644 /// Currently, both input streams must be [`Bounded`]. This operator will block
1645 /// on the first stream until all its elements are available. In a future version,
1646 /// we will relax the requirement on the `other` stream.
1647 ///
1648 /// # Example
1649 /// ```rust
1650 /// # use hydro_lang::prelude::*;
1651 /// # use futures::StreamExt;
1652 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1653 /// let tick = process.tick();
1654 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1655 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1656 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1657 /// # }, |mut stream| async move {
1658 /// // 2, 3, 4, 5, 1, 2, 3, 4
1659 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1660 /// # assert_eq!(stream.next().await.unwrap(), w);
1661 /// # }
1662 /// # }));
1663 /// ```
1664 pub fn chain<O2: Ordering, R2: Retries>(
1665 self,
1666 other: Stream<T, L, Bounded, O2, R2>,
1667 ) -> Stream<T, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1668 where
1669 O: MinOrder<O2>,
1670 R: MinRetries<R2>,
1671 {
1672 check_matching_location(&self.location, &other.location);
1673
1674 Stream::new(
1675 self.location.clone(),
1676 HydroNode::Chain {
1677 first: Box::new(self.ir_node.into_inner()),
1678 second: Box::new(other.ir_node.into_inner()),
1679 metadata: self.location.new_node_metadata::<T>(),
1680 },
1681 )
1682 }
1683
1684 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
1685 /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
1686 /// because this is compiled into a nested loop.
1687 pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
1688 self,
1689 other: Stream<T2, L, Bounded, O2, R>,
1690 ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
1691 where
1692 T: Clone,
1693 T2: Clone,
1694 {
1695 check_matching_location(&self.location, &other.location);
1696
1697 Stream::new(
1698 self.location.clone(),
1699 HydroNode::CrossProduct {
1700 left: Box::new(self.ir_node.into_inner()),
1701 right: Box::new(other.ir_node.into_inner()),
1702 metadata: self.location.new_node_metadata::<(T, T2)>(),
1703 },
1704 )
1705 }
1706
1707 /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
1708 /// `self` used as the values for *each* key.
1709 ///
1710 /// This is helpful when "broadcasting" a set of values so that all the keys have the same
1711 /// values. For example, it can be used to send the same set of elements to several cluster
1712 /// members, if the membership information is available as a [`KeyedSingleton`].
1713 ///
1714 /// # Example
1715 /// ```rust
1716 /// # use hydro_lang::prelude::*;
1717 /// # use futures::StreamExt;
1718 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1719 /// # let tick = process.tick();
1720 /// let keyed_singleton = // { 1: (), 2: () }
1721 /// # process
1722 /// # .source_iter(q!(vec![(1, ()), (2, ())]))
1723 /// # .into_keyed()
1724 /// # .batch(&tick, nondet!(/** test */))
1725 /// # .first();
1726 /// let stream = // [ "a", "b" ]
1727 /// # process
1728 /// # .source_iter(q!(vec!["a".to_string(), "b".to_string()]))
1729 /// # .batch(&tick, nondet!(/** test */));
1730 /// stream.repeat_with_keys(keyed_singleton)
1731 /// # .entries().all_ticks()
1732 /// # }, |mut stream| async move {
1733 /// // { 1: ["a", "b" ], 2: ["a", "b"] }
1734 /// # let mut results = Vec::new();
1735 /// # for _ in 0..4 {
1736 /// # results.push(stream.next().await.unwrap());
1737 /// # }
1738 /// # results.sort();
1739 /// # assert_eq!(results, vec![(1, "a".to_string()), (1, "b".to_string()), (2, "a".to_string()), (2, "b".to_string())]);
1740 /// # }));
1741 /// ```
1742 pub fn repeat_with_keys<K, V2>(
1743 self,
1744 keys: KeyedSingleton<K, V2, L, Bounded>,
1745 ) -> KeyedStream<K, T, L, Bounded, O, R>
1746 where
1747 K: Clone,
1748 T: Clone,
1749 {
1750 keys.keys().weaken_retries().cross_product_nested_loop(self).into_keyed().assume_ordering(
1751 nondet!(/** keyed stream does not depend on ordering of keys, cross_product_nested_loop preserves order of values */)
1752 )
1753 }
1754}
1755
1756impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
1757where
1758 L: Location<'a>,
1759{
1760 #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
1761 /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
1762 /// by equi-joining the two streams on the key attribute `K`.
1763 ///
1764 /// # Example
1765 /// ```rust
1766 /// # use hydro_lang::prelude::*;
1767 /// # use std::collections::HashSet;
1768 /// # use futures::StreamExt;
1769 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1770 /// let tick = process.tick();
1771 /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
1772 /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
1773 /// stream1.join(stream2)
1774 /// # }, |mut stream| async move {
1775 /// // (1, ('a', 'x')), (2, ('b', 'y'))
1776 /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
1777 /// # stream.map(|i| assert!(expected.contains(&i)));
1778 /// # }));
1779 pub fn join<V2, O2: Ordering, R2: Retries>(
1780 self,
1781 n: Stream<(K, V2), L, B, O2, R2>,
1782 ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
1783 where
1784 K: Eq + Hash,
1785 R: MinRetries<R2>,
1786 {
1787 check_matching_location(&self.location, &n.location);
1788
1789 Stream::new(
1790 self.location.clone(),
1791 HydroNode::Join {
1792 left: Box::new(self.ir_node.into_inner()),
1793 right: Box::new(n.ir_node.into_inner()),
1794 metadata: self.location.new_node_metadata::<(K, (V1, V2))>(),
1795 },
1796 )
1797 }
1798
1799 /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
1800 /// computes the anti-join of the items in the input -- i.e. returns
1801 /// unique items in the first input that do not have a matching key
1802 /// in the second input.
1803 ///
1804 /// # Example
1805 /// ```rust
1806 /// # use hydro_lang::prelude::*;
1807 /// # use futures::StreamExt;
1808 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1809 /// let tick = process.tick();
1810 /// let stream = process
1811 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1812 /// .batch(&tick, nondet!(/** test */));
1813 /// let batch = process
1814 /// .source_iter(q!(vec![1, 2]))
1815 /// .batch(&tick, nondet!(/** test */));
1816 /// stream.anti_join(batch).all_ticks()
1817 /// # }, |mut stream| async move {
1818 /// # for w in vec![(3, 'c'), (4, 'd')] {
1819 /// # assert_eq!(stream.next().await.unwrap(), w);
1820 /// # }
1821 /// # }));
1822 pub fn anti_join<O2: Ordering, R2: Retries>(
1823 self,
1824 n: Stream<K, L, Bounded, O2, R2>,
1825 ) -> Stream<(K, V1), L, B, O, R>
1826 where
1827 K: Eq + Hash,
1828 {
1829 check_matching_location(&self.location, &n.location);
1830
1831 Stream::new(
1832 self.location.clone(),
1833 HydroNode::AntiJoin {
1834 pos: Box::new(self.ir_node.into_inner()),
1835 neg: Box::new(n.ir_node.into_inner()),
1836 metadata: self.location.new_node_metadata::<(K, V1)>(),
1837 },
1838 )
1839 }
1840}
1841
1842impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
1843 Stream<(K, V), L, B, O, R>
1844{
1845 /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
1846 /// is used as the key and the second element is added to the entries associated with that key.
1847 ///
1848 /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
1849 /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
1850 /// performing grouped aggregations, but also for more precise ordering guarantees such as
1851 /// total ordering _within_ each group but no ordering _across_ groups.
1852 ///
1853 /// # Example
1854 /// ```rust
1855 /// # use hydro_lang::prelude::*;
1856 /// # use futures::StreamExt;
1857 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1858 /// process
1859 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1860 /// .into_keyed()
1861 /// # .entries()
1862 /// # }, |mut stream| async move {
1863 /// // { 1: [2, 3], 2: [4] }
1864 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
1865 /// # assert_eq!(stream.next().await.unwrap(), w);
1866 /// # }
1867 /// # }));
1868 /// ```
1869 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
1870 KeyedStream {
1871 underlying: self.weakest_ordering(),
1872 _phantom_order: Default::default(),
1873 }
1874 }
1875}
1876
1877impl<'a, K, V, L> Stream<(K, V), Tick<L>, Bounded, TotalOrder, ExactlyOnce>
1878where
1879 K: Eq + Hash,
1880 L: Location<'a>,
1881{
1882 #[deprecated = "use .into_keyed().fold(...) instead"]
1883 /// A special case of [`Stream::fold`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1884 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1885 /// in the second element are accumulated via the `comb` closure.
1886 ///
1887 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1888 /// to depend on the order of elements in the stream.
1889 ///
1890 /// If the input and output value types are the same and do not require initialization then use
1891 /// [`Stream::reduce_keyed`].
1892 ///
1893 /// # Example
1894 /// ```rust
1895 /// # use hydro_lang::prelude::*;
1896 /// # use futures::StreamExt;
1897 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1898 /// let tick = process.tick();
1899 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1900 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1901 /// batch
1902 /// .fold_keyed(q!(|| 0), q!(|acc, x| *acc += x))
1903 /// .all_ticks()
1904 /// # }, |mut stream| async move {
1905 /// // (1, 5), (2, 7)
1906 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1907 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1908 /// # }));
1909 /// ```
1910 pub fn fold_keyed<A, I, F>(
1911 self,
1912 init: impl IntoQuotedMut<'a, I, Tick<L>>,
1913 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1914 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
1915 where
1916 I: Fn() -> A + 'a,
1917 F: Fn(&mut A, V) + 'a,
1918 {
1919 self.into_keyed().fold(init, comb).entries()
1920 }
1921
1922 #[deprecated = "use .into_keyed().reduce(...) instead"]
1923 /// A special case of [`Stream::reduce`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1924 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1925 /// in the second element are accumulated via the `comb` closure.
1926 ///
1927 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1928 /// to depend on the order of elements in the stream.
1929 ///
1930 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed`].
1931 ///
1932 /// # Example
1933 /// ```rust
1934 /// # use hydro_lang::prelude::*;
1935 /// # use futures::StreamExt;
1936 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1937 /// let tick = process.tick();
1938 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1939 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1940 /// batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks()
1941 /// # }, |mut stream| async move {
1942 /// // (1, 5), (2, 7)
1943 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1944 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1945 /// # }));
1946 /// ```
1947 pub fn reduce_keyed<F>(
1948 self,
1949 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1950 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
1951 where
1952 F: Fn(&mut V, V) + 'a,
1953 {
1954 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1955
1956 Stream::new(
1957 self.location.clone(),
1958 HydroNode::ReduceKeyed {
1959 f,
1960 input: Box::new(self.ir_node.into_inner()),
1961 metadata: self.location.new_node_metadata::<(K, V)>(),
1962 },
1963 )
1964 }
1965}
1966
1967impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
1968where
1969 K: Eq + Hash,
1970 L: Location<'a>,
1971{
1972 #[deprecated = "use .into_keyed().fold_commutative_idempotent(...) instead"]
1973 /// A special case of [`Stream::fold_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
1974 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
1975 /// in the second element are accumulated via the `comb` closure.
1976 ///
1977 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1978 /// as there may be non-deterministic duplicates.
1979 ///
1980 /// If the input and output value types are the same and do not require initialization then use
1981 /// [`Stream::reduce_keyed_commutative_idempotent`].
1982 ///
1983 /// # Example
1984 /// ```rust
1985 /// # use hydro_lang::prelude::*;
1986 /// # use futures::StreamExt;
1987 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1988 /// let tick = process.tick();
1989 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
1990 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1991 /// batch
1992 /// .fold_keyed_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1993 /// .all_ticks()
1994 /// # }, |mut stream| async move {
1995 /// // (1, false), (2, true)
1996 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1997 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1998 /// # }));
1999 /// ```
2000 pub fn fold_keyed_commutative_idempotent<A, I, F>(
2001 self,
2002 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2003 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2004 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2005 where
2006 I: Fn() -> A + 'a,
2007 F: Fn(&mut A, V) + 'a,
2008 {
2009 self.into_keyed()
2010 .fold_commutative_idempotent(init, comb)
2011 .entries()
2012 }
2013
2014 /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2015 /// # Example
2016 /// ```rust
2017 /// # use hydro_lang::prelude::*;
2018 /// # use futures::StreamExt;
2019 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2020 /// let tick = process.tick();
2021 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2022 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2023 /// batch.keys().all_ticks()
2024 /// # }, |mut stream| async move {
2025 /// // 1, 2
2026 /// # assert_eq!(stream.next().await.unwrap(), 1);
2027 /// # assert_eq!(stream.next().await.unwrap(), 2);
2028 /// # }));
2029 /// ```
2030 pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2031 self.into_keyed()
2032 .fold_commutative_idempotent(q!(|| ()), q!(|_, _| {}))
2033 .keys()
2034 }
2035
2036 #[deprecated = "use .into_keyed().reduce_commutative_idempotent(...) instead"]
2037 /// A special case of [`Stream::reduce_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2038 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2039 /// in the second element are accumulated via the `comb` closure.
2040 ///
2041 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2042 /// as there may be non-deterministic duplicates.
2043 ///
2044 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative_idempotent`].
2045 ///
2046 /// # Example
2047 /// ```rust
2048 /// # use hydro_lang::prelude::*;
2049 /// # use futures::StreamExt;
2050 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2051 /// let tick = process.tick();
2052 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2053 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2054 /// batch
2055 /// .reduce_keyed_commutative_idempotent(q!(|acc, x| *acc |= x))
2056 /// .all_ticks()
2057 /// # }, |mut stream| async move {
2058 /// // (1, false), (2, true)
2059 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2060 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2061 /// # }));
2062 /// ```
2063 pub fn reduce_keyed_commutative_idempotent<F>(
2064 self,
2065 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2066 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2067 where
2068 F: Fn(&mut V, V) + 'a,
2069 {
2070 self.into_keyed()
2071 .reduce_commutative_idempotent(comb)
2072 .entries()
2073 }
2074}
2075
2076impl<'a, K, V, L, O: Ordering> Stream<(K, V), Tick<L>, Bounded, O, ExactlyOnce>
2077where
2078 K: Eq + Hash,
2079 L: Location<'a>,
2080{
2081 #[deprecated = "use .into_keyed().fold_commutative(...) instead"]
2082 /// A special case of [`Stream::fold_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2083 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2084 /// in the second element are accumulated via the `comb` closure.
2085 ///
2086 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2087 ///
2088 /// If the input and output value types are the same and do not require initialization then use
2089 /// [`Stream::reduce_keyed_commutative`].
2090 ///
2091 /// # Example
2092 /// ```rust
2093 /// # use hydro_lang::prelude::*;
2094 /// # use futures::StreamExt;
2095 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2096 /// let tick = process.tick();
2097 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2098 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2099 /// batch
2100 /// .fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x))
2101 /// .all_ticks()
2102 /// # }, |mut stream| async move {
2103 /// // (1, 5), (2, 7)
2104 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2105 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2106 /// # }));
2107 /// ```
2108 pub fn fold_keyed_commutative<A, I, F>(
2109 self,
2110 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2111 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2112 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2113 where
2114 I: Fn() -> A + 'a,
2115 F: Fn(&mut A, V) + 'a,
2116 {
2117 self.into_keyed().fold_commutative(init, comb).entries()
2118 }
2119
2120 #[deprecated = "use .into_keyed().reduce_commutative(...) instead"]
2121 /// A special case of [`Stream::reduce_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2122 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2123 /// in the second element are accumulated via the `comb` closure.
2124 ///
2125 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2126 ///
2127 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative`].
2128 ///
2129 /// # Example
2130 /// ```rust
2131 /// # use hydro_lang::prelude::*;
2132 /// # use futures::StreamExt;
2133 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2134 /// let tick = process.tick();
2135 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2136 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2137 /// batch
2138 /// .reduce_keyed_commutative(q!(|acc, x| *acc += x))
2139 /// .all_ticks()
2140 /// # }, |mut stream| async move {
2141 /// // (1, 5), (2, 7)
2142 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2143 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2144 /// # }));
2145 /// ```
2146 pub fn reduce_keyed_commutative<F>(
2147 self,
2148 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2149 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2150 where
2151 F: Fn(&mut V, V) + 'a,
2152 {
2153 self.into_keyed().reduce_commutative(comb).entries()
2154 }
2155}
2156
2157impl<'a, K, V, L, R: Retries> Stream<(K, V), Tick<L>, Bounded, TotalOrder, R>
2158where
2159 K: Eq + Hash,
2160 L: Location<'a>,
2161{
2162 #[deprecated = "use .into_keyed().fold_idempotent(...) instead"]
2163 /// A special case of [`Stream::fold_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2164 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2165 /// in the second element are accumulated via the `comb` closure.
2166 ///
2167 /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
2168 ///
2169 /// If the input and output value types are the same and do not require initialization then use
2170 /// [`Stream::reduce_keyed_idempotent`].
2171 ///
2172 /// # Example
2173 /// ```rust
2174 /// # use hydro_lang::prelude::*;
2175 /// # use futures::StreamExt;
2176 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2177 /// let tick = process.tick();
2178 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2179 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2180 /// batch
2181 /// .fold_keyed_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2182 /// .all_ticks()
2183 /// # }, |mut stream| async move {
2184 /// // (1, false), (2, true)
2185 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2186 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2187 /// # }));
2188 /// ```
2189 pub fn fold_keyed_idempotent<A, I, F>(
2190 self,
2191 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2192 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2193 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2194 where
2195 I: Fn() -> A + 'a,
2196 F: Fn(&mut A, V) + 'a,
2197 {
2198 self.into_keyed().fold_idempotent(init, comb).entries()
2199 }
2200
2201 #[deprecated = "use .into_keyed().reduce_idempotent(...) instead"]
2202 /// A special case of [`Stream::reduce_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2203 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2204 /// in the second element are accumulated via the `comb` closure.
2205 ///
2206 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
2207 ///
2208 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_idempotent`].
2209 ///
2210 /// # Example
2211 /// ```rust
2212 /// # use hydro_lang::prelude::*;
2213 /// # use futures::StreamExt;
2214 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2215 /// let tick = process.tick();
2216 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2217 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2218 /// batch
2219 /// .reduce_keyed_idempotent(q!(|acc, x| *acc |= x))
2220 /// .all_ticks()
2221 /// # }, |mut stream| async move {
2222 /// // (1, false), (2, true)
2223 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2224 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2225 /// # }));
2226 /// ```
2227 pub fn reduce_keyed_idempotent<F>(
2228 self,
2229 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2230 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2231 where
2232 F: Fn(&mut V, V) + 'a,
2233 {
2234 self.into_keyed().reduce_idempotent(comb).entries()
2235 }
2236}
2237
2238impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2239where
2240 L: Location<'a> + NoTick,
2241{
2242 /// Returns a stream corresponding to the latest batch of elements being atomically
2243 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2244 /// the order of the input.
2245 ///
2246 /// # Non-Determinism
2247 /// The batch boundaries are non-deterministic and may change across executions.
2248 pub fn batch_atomic(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2249 Stream::new(
2250 self.location.clone().tick,
2251 HydroNode::Batch {
2252 inner: Box::new(self.ir_node.into_inner()),
2253 metadata: self.location.tick.new_node_metadata::<T>(),
2254 },
2255 )
2256 }
2257
2258 /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2259 /// See [`Stream::atomic`] for more details.
2260 pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2261 Stream::new(
2262 self.location.tick.l.clone(),
2263 HydroNode::EndAtomic {
2264 inner: Box::new(self.ir_node.into_inner()),
2265 metadata: self.location.tick.l.new_node_metadata::<T>(),
2266 },
2267 )
2268 }
2269
2270 /// Gets the [`Tick`] inside which this stream is synchronously processed. See [`Stream::atomic`].
2271 pub fn atomic_source(&self) -> Tick<L> {
2272 self.location.tick.clone()
2273 }
2274}
2275
2276impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2277where
2278 L: Location<'a>,
2279{
2280 /// Shifts this stream into an atomic context, which guarantees that any downstream logic
2281 /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
2282 ///
2283 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2284 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
2285 /// argument that declares where the stream will be atomically processed. Batching a stream into
2286 /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
2287 /// [`Tick`] will introduce asynchrony.
2288 pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
2289 let out_location = Atomic { tick: tick.clone() };
2290 Stream::new(
2291 out_location.clone(),
2292 HydroNode::BeginAtomic {
2293 inner: Box::new(self.ir_node.into_inner()),
2294 metadata: out_location.new_node_metadata::<T>(),
2295 },
2296 )
2297 }
2298
2299 /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2300 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2301 /// the order of the input. The output stream will execute in the [`Tick`] that was
2302 /// used to create the atomic section.
2303 ///
2304 /// # Non-Determinism
2305 /// The batch boundaries are non-deterministic and may change across executions.
2306 pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2307 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2308 Stream::new(
2309 tick.clone(),
2310 HydroNode::Batch {
2311 inner: Box::new(self.ir_node.into_inner()),
2312 metadata: tick.new_node_metadata::<T>(),
2313 },
2314 )
2315 }
2316
2317 /// Given a time interval, returns a stream corresponding to samples taken from the
2318 /// stream roughly at that interval. The output will have elements in the same order
2319 /// as the input, but with arbitrary elements skipped between samples. There is also
2320 /// no guarantee on the exact timing of the samples.
2321 ///
2322 /// # Non-Determinism
2323 /// The output stream is non-deterministic in which elements are sampled, since this
2324 /// is controlled by a clock.
2325 pub fn sample_every(
2326 self,
2327 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
2328 nondet: NonDet,
2329 ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
2330 where
2331 L: NoTick + NoAtomic,
2332 {
2333 let samples = self.location.source_interval(interval, nondet);
2334
2335 let tick = self.location.tick();
2336 self.batch(&tick, nondet)
2337 .filter_if_some(samples.batch(&tick, nondet).first())
2338 .all_ticks()
2339 .weakest_retries()
2340 }
2341
2342 /// Given a timeout duration, returns an [`Optional`] which will have a value if the
2343 /// stream has not emitted a value since that duration.
2344 ///
2345 /// # Non-Determinism
2346 /// Timeout relies on non-deterministic sampling of the stream, so depending on when
2347 /// samples take place, timeouts may be non-deterministically generated or missed,
2348 /// and the notification of the timeout may be delayed as well. There is also no
2349 /// guarantee on how long the [`Optional`] will have a value after the timeout is
2350 /// detected based on when the next sample is taken.
2351 pub fn timeout(
2352 self,
2353 duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
2354 nondet: NonDet,
2355 ) -> Optional<(), L, Unbounded>
2356 where
2357 L: NoTick + NoAtomic,
2358 {
2359 let tick = self.location.tick();
2360
2361 let latest_received = self.assume_retries(nondet).fold_commutative(
2362 q!(|| None),
2363 q!(|latest, _| {
2364 *latest = Some(Instant::now());
2365 }),
2366 );
2367
2368 latest_received
2369 .snapshot(&tick, nondet)
2370 .filter_map(q!(move |latest_received| {
2371 if let Some(latest_received) = latest_received {
2372 if Instant::now().duration_since(latest_received) > duration {
2373 Some(())
2374 } else {
2375 None
2376 }
2377 } else {
2378 Some(())
2379 }
2380 }))
2381 .latest()
2382 }
2383}
2384
2385impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2386where
2387 L: Location<'a> + NoTick + NoAtomic,
2388 F: Future<Output = T>,
2389{
2390 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2391 /// Future outputs are produced as available, regardless of input arrival order.
2392 ///
2393 /// # Example
2394 /// ```rust
2395 /// # use std::collections::HashSet;
2396 /// # use futures::StreamExt;
2397 /// # use hydro_lang::prelude::*;
2398 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2399 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2400 /// .map(q!(|x| async move {
2401 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2402 /// x
2403 /// }))
2404 /// .resolve_futures()
2405 /// # },
2406 /// # |mut stream| async move {
2407 /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2408 /// # let mut output = HashSet::new();
2409 /// # for _ in 1..10 {
2410 /// # output.insert(stream.next().await.unwrap());
2411 /// # }
2412 /// # assert_eq!(
2413 /// # output,
2414 /// # HashSet::<i32>::from_iter(1..10)
2415 /// # );
2416 /// # },
2417 /// # ));
2418 pub fn resolve_futures(self) -> Stream<T, L, B, NoOrder, R> {
2419 Stream::new(
2420 self.location.clone(),
2421 HydroNode::ResolveFutures {
2422 input: Box::new(self.ir_node.into_inner()),
2423 metadata: self.location.new_node_metadata::<T>(),
2424 },
2425 )
2426 }
2427
2428 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2429 /// Future outputs are produced in the same order as the input stream.
2430 ///
2431 /// # Example
2432 /// ```rust
2433 /// # use std::collections::HashSet;
2434 /// # use futures::StreamExt;
2435 /// # use hydro_lang::prelude::*;
2436 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2437 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2438 /// .map(q!(|x| async move {
2439 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2440 /// x
2441 /// }))
2442 /// .resolve_futures_ordered()
2443 /// # },
2444 /// # |mut stream| async move {
2445 /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2446 /// # let mut output = Vec::new();
2447 /// # for _ in 1..10 {
2448 /// # output.push(stream.next().await.unwrap());
2449 /// # }
2450 /// # assert_eq!(
2451 /// # output,
2452 /// # vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2453 /// # );
2454 /// # },
2455 /// # ));
2456 pub fn resolve_futures_ordered(self) -> Stream<T, L, B, O, R> {
2457 Stream::new(
2458 self.location.clone(),
2459 HydroNode::ResolveFuturesOrdered {
2460 input: Box::new(self.ir_node.into_inner()),
2461 metadata: self.location.new_node_metadata::<T>(),
2462 },
2463 )
2464 }
2465}
2466
2467impl<'a, T, L, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce>
2468where
2469 L: Location<'a> + NoTick,
2470{
2471 /// Executes the provided closure for every element in this stream.
2472 ///
2473 /// Because the closure may have side effects, the stream must have deterministic order
2474 /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
2475 /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
2476 /// [`Stream::assume_retries`] with an explanation for why this is the case.
2477 pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
2478 let f = f.splice_fn1_ctx(&self.location).into();
2479 self.location
2480 .flow_state()
2481 .borrow_mut()
2482 .push_root(HydroRoot::ForEach {
2483 input: Box::new(self.ir_node.into_inner()),
2484 f,
2485 op_metadata: HydroIrOpMetadata::new(),
2486 });
2487 }
2488
2489 /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
2490 /// TCP socket to some other server. You should _not_ use this API for interacting with
2491 /// external clients, instead see [`Location::bidi_external_many_bytes`] and
2492 /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
2493 /// interaction with asynchronous sinks.
2494 pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
2495 where
2496 S: 'a + futures::Sink<T> + Unpin,
2497 {
2498 self.location
2499 .flow_state()
2500 .borrow_mut()
2501 .push_root(HydroRoot::DestSink {
2502 sink: sink.splice_typed_ctx(&self.location).into(),
2503 input: Box::new(self.ir_node.into_inner()),
2504 op_metadata: HydroIrOpMetadata::new(),
2505 });
2506 }
2507}
2508
2509impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2510where
2511 L: Location<'a>,
2512{
2513 /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2514 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2515 pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2516 Stream::new(
2517 self.location.outer().clone(),
2518 HydroNode::YieldConcat {
2519 inner: Box::new(self.ir_node.into_inner()),
2520 metadata: self.location.outer().new_node_metadata::<T>(),
2521 },
2522 )
2523 }
2524
2525 /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2526 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2527 ///
2528 /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2529 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2530 /// stream's [`Tick`] context.
2531 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2532 let out_location = Atomic {
2533 tick: self.location.clone(),
2534 };
2535
2536 Stream::new(
2537 out_location.clone(),
2538 HydroNode::YieldConcat {
2539 inner: Box::new(self.ir_node.into_inner()),
2540 metadata: out_location.new_node_metadata::<T>(),
2541 },
2542 )
2543 }
2544
2545 /// Accumulates the elements of this stream **across ticks** by concatenating them together.
2546 ///
2547 /// The output stream in tick T will contain the elements of the input at tick 0, 1, ..., up to
2548 /// and including tick T. This is useful for accumulating streaming inputs across ticks, but be
2549 /// careful when using this operator, as its memory usage will grow linearly over time since it
2550 /// must store its inputs indefinitely.
2551 ///
2552 /// # Example
2553 /// ```rust
2554 /// # use hydro_lang::prelude::*;
2555 /// # use futures::StreamExt;
2556 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2557 /// let tick = process.tick();
2558 /// // ticks are lazy by default, forces the second tick to run
2559 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2560 ///
2561 /// let batch_first_tick = process
2562 /// .source_iter(q!(vec![1, 2, 3, 4]))
2563 /// .batch(&tick, nondet!(/** test */));
2564 /// let batch_second_tick = process
2565 /// .source_iter(q!(vec![5, 6, 7, 8]))
2566 /// .batch(&tick, nondet!(/** test */))
2567 /// .defer_tick(); // appears on the second tick
2568 /// batch_first_tick.chain(batch_second_tick)
2569 /// .persist()
2570 /// .all_ticks()
2571 /// # }, |mut stream| async move {
2572 /// // [1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, ...]
2573 /// # for w in vec![1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8] {
2574 /// # assert_eq!(stream.next().await.unwrap(), w);
2575 /// # }
2576 /// # }));
2577 /// ```
2578 pub fn persist(self) -> Stream<T, Tick<L>, Bounded, O, R>
2579 where
2580 T: Clone,
2581 {
2582 Stream::new(
2583 self.location.clone(),
2584 HydroNode::Persist {
2585 inner: Box::new(self.ir_node.into_inner()),
2586 metadata: self.location.new_node_metadata::<T>(),
2587 },
2588 )
2589 }
2590
2591 #[expect(missing_docs, reason = "TODO")]
2592 pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2593 Stream::new(
2594 self.location.clone(),
2595 HydroNode::DeferTick {
2596 input: Box::new(self.ir_node.into_inner()),
2597 metadata: self.location.new_node_metadata::<T>(),
2598 },
2599 )
2600 }
2601}
2602
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use serde::{Deserialize, Serialize};
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    use crate::location::Location;
    use crate::nondet::nondet;

    mod backtrace_chained_ops;

    /// Type tag for the process that generates the numbers.
    struct P1 {}
    /// Type tag for the receiving process (also reused as the external client's tag).
    struct P2 {}

    /// Message wrapper sent between processes to exercise bincode (de)serialization.
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }

    /// Sends `0..10` from one process to another and then out to an external client,
    /// checking that every value arrives in order.
    #[tokio::test]
    async fn first_ten_distributed() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let producer = builder.process::<P1>();
        let consumer = builder.process::<P2>();
        let client = builder.external::<P2>();

        let client_port = producer
            .source_iter(q!(0..10))
            .map(q!(|n| SendOverNetwork { n }))
            .send_bincode(&consumer)
            .send_bincode_external(&client);

        let deployed = builder
            .with_process(&producer, deployment.Localhost())
            .with_process(&consumer, deployment.Localhost())
            .with_external(&client, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut received = deployed.connect_source_bincode(client_port).await;

        deployment.start().await.unwrap();

        for expected in 0..10 {
            let message = received.next().await.unwrap();
            assert_eq!(message.n, expected);
        }
    }

    /// Taking `first()` of a flattened batch and counting the result must yield exactly 1.
    #[tokio::test]
    async fn first_cardinality() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let worker = builder.process::<()>();
        let client = builder.external::<()>();

        let worker_tick = worker.tick();
        let count_port = worker_tick
            .singleton(q!([1, 2, 3]))
            .into_stream()
            .flatten_ordered()
            .first()
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&client);

        let deployed = builder
            .with_process(&worker, deployment.Localhost())
            .with_external(&client, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut counts = deployed.connect_source_bincode(count_port).await;

        deployment.start().await.unwrap();

        let first_count = counts.next().await.unwrap();
        assert_eq!(first_count, 1);
    }

    /// An unbounded `reduce` keeps its accumulator between inputs, so each sampled
    /// value is the running sum of everything sent so far.
    #[tokio::test]
    async fn unbounded_reduce_remembers_state() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let worker = builder.process::<()>();
        let client = builder.external::<()>();

        let (in_port, numbers) = worker.source_external_bincode(&client);
        let out_port = numbers
            .reduce(q!(|acc, v| *acc += v))
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&client);

        let deployed = builder
            .with_process(&worker, deployment.Localhost())
            .with_external(&client, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut to_worker = deployed.connect_sink_bincode(in_port).await;
        let mut from_worker = deployed.connect_source_bincode(out_port).await;

        deployment.start().await.unwrap();

        // Each input is sent only after the previous result has been observed.
        for (value, running_sum) in [(1, 1), (2, 3)] {
            to_worker.send(value).await.unwrap();
            assert_eq!(from_worker.next().await.unwrap(), running_sum);
        }
    }

    /// An atomic `fold` over `[1, 2, 3]` is re-observed in every tick: each input batch
    /// is paired with the same total `6`.
    #[tokio::test]
    async fn atomic_fold_replays_each_tick() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let worker = builder.process::<()>();
        let client = builder.external::<()>();

        let (in_port, incoming) = worker.source_external_bincode(&client);
        let tick = worker.tick();

        let out_port = incoming
            .batch(&tick, nondet!(/** test */))
            .cross_singleton(
                worker
                    .source_iter(q!(vec![1, 2, 3]))
                    .atomic(&tick)
                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
                    .snapshot_atomic(nondet!(/** test */)),
            )
            .all_ticks()
            .send_bincode_external(&client);

        let deployed = builder
            .with_process(&worker, deployment.Localhost())
            .with_external(&client, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut to_worker = deployed.connect_sink_bincode(in_port).await;
        let mut from_worker = deployed.connect_source_bincode(out_port).await;

        deployment.start().await.unwrap();

        for (value, expected_pair) in [(1, (1, 6)), (2, (2, 6))] {
            to_worker.send(value).await.unwrap();
            assert_eq!(from_worker.next().await.unwrap(), expected_pair);
        }
    }

    /// An unbounded `scan` keeps its state between inputs and emits the running sum.
    #[tokio::test]
    async fn unbounded_scan_remembers_state() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let worker = builder.process::<()>();
        let client = builder.external::<()>();

        let (in_port, numbers) = worker.source_external_bincode(&client);
        let out_port = numbers
            .scan(
                q!(|| 0),
                q!(|acc, v| {
                    *acc += v;
                    Some(*acc)
                }),
            )
            .send_bincode_external(&client);

        let deployed = builder
            .with_process(&worker, deployment.Localhost())
            .with_external(&client, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut to_worker = deployed.connect_sink_bincode(in_port).await;
        let mut from_worker = deployed.connect_source_bincode(out_port).await;

        deployment.start().await.unwrap();

        for (value, running_sum) in [(1, 1), (2, 3)] {
            to_worker.send(value).await.unwrap();
            assert_eq!(from_worker.next().await.unwrap(), running_sum);
        }
    }

    /// An unbounded `enumerate` keeps counting across inputs instead of restarting at 0.
    #[tokio::test]
    async fn unbounded_enumerate_remembers_state() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let worker = builder.process::<()>();
        let client = builder.external::<()>();

        let (in_port, numbers) = worker.source_external_bincode(&client);
        let out_port = numbers.enumerate().send_bincode_external(&client);

        let deployed = builder
            .with_process(&worker, deployment.Localhost())
            .with_external(&client, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut to_worker = deployed.connect_sink_bincode(in_port).await;
        let mut from_worker = deployed.connect_source_bincode(out_port).await;

        deployment.start().await.unwrap();

        for (value, indexed) in [(1, (0, 1)), (2, (1, 2))] {
            to_worker.send(value).await.unwrap();
            assert_eq!(from_worker.next().await.unwrap(), indexed);
        }
    }
}