hydro_lang/location/
mod.rs

1//! Type definitions for distributed locations, which specify where pieces of a Hydro
2//! program will be executed.
3//!
4//! Hydro is a **global**, **distributed** programming model. This means that the data
5//! and computation in a Hydro program can be spread across multiple machines, data
6//! centers, and even continents. To achieve this, Hydro uses the concept of
7//! **locations** to keep track of _where_ data is located and computation is executed.
8//!
9//! Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
10//! which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
11//! and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
12//! to allow live collections to be _moved_ between locations via network send/receive.
13//!
14//! See [the Hydro docs](https://hydro.run/docs/hydro/locations/) for more information.
15
16use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::time::Duration;
19
20use bytes::BytesMut;
21use futures::stream::Stream as FuturesStream;
22use proc_macro2::Span;
23use serde::de::DeserializeOwned;
24use serde::{Deserialize, Serialize};
25use stageleft::{QuotedWithContext, q, quote_type};
26use syn::parse_quote;
27use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
28
29use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource};
30use crate::forward_handle::ForwardRef;
31#[cfg(stageleft_runtime)]
32use crate::forward_handle::{CycleCollection, ForwardHandle};
33use crate::live_collections::boundedness::Unbounded;
34use crate::live_collections::keyed_stream::KeyedStream;
35use crate::live_collections::singleton::Singleton;
36use crate::live_collections::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
37use crate::location::cluster::ClusterIds;
38use crate::location::dynamic::LocationId;
39use crate::location::external_process::{
40    ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many,
41};
42use crate::nondet::{NonDet, nondet};
43use crate::staging_util::get_this_crate;
44
45pub mod dynamic;
46
47#[expect(missing_docs, reason = "TODO")]
48pub mod external_process;
49pub use external_process::External;
50
51#[expect(missing_docs, reason = "TODO")]
52pub mod process;
53pub use process::Process;
54
55#[expect(missing_docs, reason = "TODO")]
56pub mod cluster;
57pub use cluster::Cluster;
58
59#[expect(missing_docs, reason = "TODO")]
60pub mod member_id;
61pub use member_id::MemberId;
62
63#[expect(missing_docs, reason = "TODO")]
64pub mod tick;
65pub use tick::{Atomic, NoTick, Tick};
66
67#[expect(missing_docs, reason = "TODO")]
68#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
69pub enum MembershipEvent {
70    Joined,
71    Left,
72}
73
/// A hint describing how the network port backing an external connection should be chosen.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NetworkHint {
    /// No preference is expressed. This is the hint used by the `source_external_*` and
    /// `bidi_external_many_bincode` helpers on [`Location`].
    Auto,
    /// Asks for a TCP port, optionally naming the port number to use.
    TcpPort(Option<u16>),
}
80
81pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
82    assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
83}
84
85#[expect(missing_docs, reason = "TODO")]
86#[expect(
87    private_bounds,
88    reason = "only internal Hydro code can define location types"
89)]
90pub trait Location<'a>: dynamic::DynLocation {
91    type Root: Location<'a>;
92
93    fn root(&self) -> Self::Root;
94
95    fn try_tick(&self) -> Option<Tick<Self>> {
96        if Self::is_top_level() {
97            let next_id = self.flow_state().borrow_mut().next_clock_id;
98            self.flow_state().borrow_mut().next_clock_id += 1;
99            Some(Tick {
100                id: next_id,
101                l: self.clone(),
102            })
103        } else {
104            None
105        }
106    }
107
108    fn id(&self) -> LocationId {
109        dynamic::DynLocation::id(self)
110    }
111
112    fn tick(&self) -> Tick<Self>
113    where
114        Self: NoTick,
115    {
116        let next_id = self.flow_state().borrow_mut().next_clock_id;
117        self.flow_state().borrow_mut().next_clock_id += 1;
118        Tick {
119            id: next_id,
120            l: self.clone(),
121        }
122    }
123
124    fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
125    where
126        Self: Sized + NoTick,
127    {
128        Stream::new(
129            self.clone(),
130            HydroNode::Source {
131                source: HydroSource::Spin(),
132                metadata: self.new_node_metadata::<()>(),
133            },
134        )
135    }
136
137    fn source_stream<T, E>(
138        &self,
139        e: impl QuotedWithContext<'a, E, Self>,
140    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
141    where
142        E: FuturesStream<Item = T> + Unpin,
143        Self: Sized + NoTick,
144    {
145        let e = e.splice_untyped_ctx(self);
146
147        Stream::new(
148            self.clone(),
149            HydroNode::Source {
150                source: HydroSource::Stream(e.into()),
151                metadata: self.new_node_metadata::<T>(),
152            },
153        )
154    }
155
156    fn source_iter<T, E>(
157        &self,
158        e: impl QuotedWithContext<'a, E, Self>,
159    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
160    where
161        E: IntoIterator<Item = T>,
162        Self: Sized + NoTick,
163    {
164        // TODO(shadaj): we mark this as unbounded because we do not yet have a representation
165        // for bounded top-level streams, and this is the only way to generate one
166        let e = e.splice_untyped_ctx(self);
167
168        Stream::new(
169            self.clone(),
170            HydroNode::Source {
171                source: HydroSource::Iter(e.into()),
172                metadata: self.new_node_metadata::<T>(),
173            },
174        )
175    }
176
177    fn source_cluster_members<C: 'a>(
178        &self,
179        cluster: &Cluster<'a, C>,
180    ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
181    where
182        Self: Sized + NoTick,
183    {
184        let underlying_memberids: ClusterIds<'a, C> = ClusterIds {
185            id: cluster.id,
186            _phantom: PhantomData,
187        };
188
189        self.source_iter(q!(underlying_memberids))
190            .map(q!(|id| (*id, MembershipEvent::Joined)))
191            .into_keyed()
192    }
193
194    fn source_external_bytes<L>(
195        &self,
196        from: &External<L>,
197    ) -> (
198        ExternalBytesPort,
199        Stream<std::io::Result<BytesMut>, Self, Unbounded, TotalOrder, ExactlyOnce>,
200    )
201    where
202        Self: Sized + NoTick,
203    {
204        let next_external_port_id = {
205            let mut flow_state = from.flow_state.borrow_mut();
206            let id = flow_state.next_external_out;
207            flow_state.next_external_out += 1;
208            id
209        };
210
211        (
212            ExternalBytesPort {
213                process_id: from.id,
214                port_id: next_external_port_id,
215                _phantom: Default::default(),
216            },
217            Stream::new(
218                self.clone(),
219                HydroNode::ExternalInput {
220                    from_external_id: from.id,
221                    from_key: next_external_port_id,
222                    from_many: false,
223                    codec_type: quote_type::<LengthDelimitedCodec>().into(),
224                    port_hint: NetworkHint::Auto,
225                    instantiate_fn: DebugInstantiate::Building,
226                    deserialize_fn: None,
227                    metadata: self.new_node_metadata::<std::io::Result<BytesMut>>(),
228                },
229            ),
230        )
231    }
232
233    fn source_external_bincode<L, T>(
234        &self,
235        from: &External<L>,
236    ) -> (
237        ExternalBincodeSink<T>,
238        Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>,
239    )
240    where
241        Self: Sized + NoTick,
242        T: Serialize + DeserializeOwned,
243    {
244        let next_external_port_id = {
245            let mut flow_state = from.flow_state.borrow_mut();
246            let id = flow_state.next_external_out;
247            flow_state.next_external_out += 1;
248            id
249        };
250
251        (
252            ExternalBincodeSink {
253                process_id: from.id,
254                port_id: next_external_port_id,
255                _phantom: PhantomData,
256            },
257            Stream::new(
258                self.clone(),
259                HydroNode::ExternalInput {
260                    from_external_id: from.id,
261                    from_key: next_external_port_id,
262                    from_many: false,
263                    codec_type: quote_type::<LengthDelimitedCodec>().into(),
264                    port_hint: NetworkHint::Auto,
265                    instantiate_fn: DebugInstantiate::Building,
266                    deserialize_fn: Some(
267                        crate::live_collections::stream::networking::deserialize_bincode::<T>(None)
268                            .into(),
269                    ),
270                    metadata: self.new_node_metadata::<T>(),
271                },
272            ),
273        )
274    }
275
276    #[expect(clippy::type_complexity, reason = "stream markers")]
277    fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
278        &self,
279        from: &External<L>,
280        port_hint: NetworkHint,
281    ) -> (
282        ExternalBytesPort<Many>,
283        KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
284        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
285        ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
286    )
287    where
288        Self: Sized + NoTick,
289    {
290        let next_external_port_id = {
291            let mut flow_state = from.flow_state.borrow_mut();
292            let id = flow_state.next_external_out;
293            flow_state.next_external_out += 1;
294            id
295        };
296
297        let (fwd_ref, to_sink) =
298            self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
299        let mut flow_state_borrow = self.flow_state().borrow_mut();
300
301        flow_state_borrow.push_root(HydroRoot::SendExternal {
302            to_external_id: from.id,
303            to_key: next_external_port_id,
304            to_many: true,
305            serialize_fn: None,
306            instantiate_fn: DebugInstantiate::Building,
307            input: Box::new(to_sink.entries().ir_node.into_inner()),
308            op_metadata: HydroIrOpMetadata::new(),
309        });
310
311        let raw_stream: Stream<
312            Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
313            Self,
314            Unbounded,
315            NoOrder,
316            ExactlyOnce,
317        > = Stream::new(
318            self.clone(),
319            HydroNode::ExternalInput {
320                from_external_id: from.id,
321                from_key: next_external_port_id,
322                from_many: true,
323                codec_type: quote_type::<Codec>().into(),
324                port_hint,
325                instantiate_fn: DebugInstantiate::Building,
326                deserialize_fn: None,
327                metadata: self
328                    .new_node_metadata::<std::io::Result<(u64, <Codec as Decoder>::Item)>>(),
329            },
330        );
331
332        let membership_stream_ident = syn::Ident::new(
333            &format!(
334                "__hydro_deploy_many_{}_{}_membership",
335                from.id, next_external_port_id
336            ),
337            Span::call_site(),
338        );
339        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
340        let raw_membership_stream: Stream<(u64, bool), Self, Unbounded, TotalOrder, ExactlyOnce> =
341            Stream::new(
342                self.clone(),
343                HydroNode::Source {
344                    source: HydroSource::Stream(membership_stream_expr.into()),
345                    metadata: self.new_node_metadata::<(u64, bool)>(),
346                },
347            );
348
349        (
350            ExternalBytesPort {
351                process_id: from.id,
352                port_id: next_external_port_id,
353                _phantom: PhantomData,
354            },
355            raw_stream
356                .flatten_ordered() // TODO(shadaj): this silently drops framing errors, decide on right defaults
357                .into_keyed()
358                .assume_ordering::<TotalOrder>(
359                    nondet!(/** order of messages is deterministic within each key due to TCP */)
360                ),
361            raw_membership_stream
362                .into_keyed()
363                .assume_ordering::<TotalOrder>(
364                    nondet!(/** membership events are ordered within each key */),
365                )
366                .map(q!(|join| {
367                    if join {
368                        MembershipEvent::Joined
369                    } else {
370                        MembershipEvent::Left
371                    }
372                })),
373            fwd_ref,
374        )
375    }
376
377    #[expect(clippy::type_complexity, reason = "stream markers")]
378    fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
379        &self,
380        from: &External<L>,
381    ) -> (
382        ExternalBincodeBidi<InT, OutT, Many>,
383        KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
384        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
385        ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
386    )
387    where
388        Self: Sized + NoTick,
389    {
390        let next_external_port_id = {
391            let mut flow_state = from.flow_state.borrow_mut();
392            let id = flow_state.next_external_out;
393            flow_state.next_external_out += 1;
394            id
395        };
396
397        let root = get_this_crate();
398
399        let (fwd_ref, to_sink) =
400            self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
401        let mut flow_state_borrow = self.flow_state().borrow_mut();
402
403        let out_t_type = quote_type::<OutT>();
404        let ser_fn: syn::Expr = syn::parse_quote! {
405            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
406                |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
407            )
408        };
409
410        flow_state_borrow.push_root(HydroRoot::SendExternal {
411            to_external_id: from.id,
412            to_key: next_external_port_id,
413            to_many: true,
414            serialize_fn: Some(ser_fn.into()),
415            instantiate_fn: DebugInstantiate::Building,
416            input: Box::new(to_sink.entries().ir_node.into_inner()),
417            op_metadata: HydroIrOpMetadata::new(),
418        });
419
420        let in_t_type = quote_type::<InT>();
421
422        let deser_fn: syn::Expr = syn::parse_quote! {
423            |res| {
424                let (id, b) = res.unwrap();
425                (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
426            }
427        };
428
429        let raw_stream: Stream<(u64, InT), Self, Unbounded, NoOrder, ExactlyOnce> = Stream::new(
430            self.clone(),
431            HydroNode::ExternalInput {
432                from_external_id: from.id,
433                from_key: next_external_port_id,
434                from_many: true,
435                codec_type: quote_type::<LengthDelimitedCodec>().into(),
436                port_hint: NetworkHint::Auto,
437                instantiate_fn: DebugInstantiate::Building,
438                deserialize_fn: Some(deser_fn.into()),
439                metadata: self.new_node_metadata::<(u64, InT)>(),
440            },
441        );
442
443        let membership_stream_ident = syn::Ident::new(
444            &format!(
445                "__hydro_deploy_many_{}_{}_membership",
446                from.id, next_external_port_id
447            ),
448            Span::call_site(),
449        );
450        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
451        let raw_membership_stream: Stream<(u64, bool), Self, Unbounded, NoOrder, ExactlyOnce> =
452            Stream::new(
453                self.clone(),
454                HydroNode::Source {
455                    source: HydroSource::Stream(membership_stream_expr.into()),
456                    metadata: self.new_node_metadata::<(u64, bool)>(),
457                },
458            );
459
460        (
461            ExternalBincodeBidi {
462                process_id: from.id,
463                port_id: next_external_port_id,
464                _phantom: PhantomData,
465            },
466            raw_stream.into_keyed().assume_ordering::<TotalOrder>(
467                nondet!(/** order of messages is deterministic within each key due to TCP */),
468            ),
469            raw_membership_stream
470                .into_keyed()
471                .assume_ordering::<TotalOrder>(
472                    nondet!(/** membership events are ordered within each key */),
473                )
474                .map(q!(|join| {
475                    if join {
476                        MembershipEvent::Joined
477                    } else {
478                        MembershipEvent::Left
479                    }
480                })),
481            fwd_ref,
482        )
483    }
484
485    /// Constructs a [`Singleton`] materialized at this location with the given static value.
486    ///
487    /// # Example
488    /// ```rust
489    /// # use hydro_lang::prelude::*;
490    /// # use futures::StreamExt;
491    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
492    /// let tick = process.tick();
493    /// let singleton = tick.singleton(q!(5));
494    /// # singleton.all_ticks()
495    /// # }, |mut stream| async move {
496    /// // 5
497    /// # assert_eq!(stream.next().await.unwrap(), 5);
498    /// # }));
499    /// ```
500    fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Unbounded>
501    where
502        T: Clone,
503        Self: Sized,
504    {
505        // TODO(shadaj): we mark this as unbounded because we do not yet have a representation
506        // for bounded top-level singletons, and this is the only way to generate one
507
508        let e_arr = q!([e]);
509        let e = e_arr.splice_untyped_ctx(self);
510
511        if Self::is_top_level() {
512            Singleton::new(
513                self.clone(),
514                HydroNode::Persist {
515                    inner: Box::new(HydroNode::Source {
516                        source: HydroSource::Iter(e.into()),
517                        metadata: self.new_node_metadata::<T>(),
518                    }),
519                    metadata: self.new_node_metadata::<T>(),
520                },
521            )
522        } else {
523            Singleton::new(
524                self.clone(),
525                HydroNode::Source {
526                    source: HydroSource::Iter(e.into()),
527                    metadata: self.new_node_metadata::<T>(),
528                },
529            )
530        }
531    }
532
533    /// Generates a stream with values emitted at a fixed interval, with
534    /// each value being the current time (as an [`tokio::time::Instant`]).
535    ///
536    /// The clock source used is monotonic, so elements will be emitted in
537    /// increasing order.
538    ///
539    /// # Non-Determinism
540    /// Because this stream is generated by an OS timer, it will be
541    /// non-deterministic because each timestamp will be arbitrary.
542    fn source_interval(
543        &self,
544        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
545        _nondet: NonDet,
546    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
547    where
548        Self: Sized + NoTick,
549    {
550        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
551            tokio::time::interval(interval)
552        )))
553    }
554
555    /// Generates a stream with values emitted at a fixed interval (with an
556    /// initial delay), with each value being the current time
557    /// (as an [`tokio::time::Instant`]).
558    ///
559    /// The clock source used is monotonic, so elements will be emitted in
560    /// increasing order.
561    ///
562    /// # Non-Determinism
563    /// Because this stream is generated by an OS timer, it will be
564    /// non-deterministic because each timestamp will be arbitrary.
565    fn source_interval_delayed(
566        &self,
567        delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
568        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
569        _nondet: NonDet,
570    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
571    where
572        Self: Sized + NoTick,
573    {
574        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
575            tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
576        )))
577    }
578
579    fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
580    where
581        S: CycleCollection<'a, ForwardRef, Location = Self>,
582        Self: NoTick,
583    {
584        let next_id = self.flow_state().borrow_mut().next_cycle_id();
585        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
586
587        (
588            ForwardHandle {
589                completed: false,
590                ident: ident.clone(),
591                expected_location: Location::id(self),
592                _phantom: PhantomData,
593            },
594            S::create_source(ident, self.clone()),
595        )
596    }
597}
598
599#[cfg(test)]
600mod tests {
601    use std::collections::HashSet;
602
603    use futures::{SinkExt, StreamExt};
604    use hydro_deploy::Deployment;
605    use stageleft::q;
606    use tokio_util::codec::LengthDelimitedCodec;
607
608    use crate::compile::builder::FlowBuilder;
609    use crate::location::{Location, NetworkHint};
610    use crate::nondet::nondet;
611
612    #[tokio::test]
613    async fn top_level_singleton_replay_cardinality() {
614        let mut deployment = Deployment::new();
615
616        let flow = FlowBuilder::new();
617        let node = flow.process::<()>();
618        let external = flow.external::<()>();
619
620        let (in_port, input) = node.source_external_bincode(&external);
621        let singleton = node.singleton(q!(123));
622        let tick = node.tick();
623        let out = input
624            .batch(&tick, nondet!(/** test */))
625            .cross_singleton(singleton.clone().snapshot(&tick, nondet!(/** test */)))
626            .cross_singleton(
627                singleton
628                    .snapshot(&tick, nondet!(/** test */))
629                    .into_stream()
630                    .count(),
631            )
632            .all_ticks()
633            .send_bincode_external(&external);
634
635        let nodes = flow
636            .with_process(&node, deployment.Localhost())
637            .with_external(&external, deployment.Localhost())
638            .deploy(&mut deployment);
639
640        deployment.deploy().await.unwrap();
641
642        let mut external_in = nodes.connect_sink_bincode(in_port).await;
643        let mut external_out = nodes.connect_source_bincode(out).await;
644
645        deployment.start().await.unwrap();
646
647        external_in.send(1).await.unwrap();
648        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
649
650        external_in.send(2).await.unwrap();
651        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
652    }
653
654    #[tokio::test]
655    async fn tick_singleton_replay_cardinality() {
656        let mut deployment = Deployment::new();
657
658        let flow = FlowBuilder::new();
659        let node = flow.process::<()>();
660        let external = flow.external::<()>();
661
662        let (in_port, input) = node.source_external_bincode(&external);
663        let tick = node.tick();
664        let singleton = tick.singleton(q!(123));
665        let out = input
666            .batch(&tick, nondet!(/** test */))
667            .cross_singleton(singleton.clone())
668            .cross_singleton(singleton.into_stream().count())
669            .all_ticks()
670            .send_bincode_external(&external);
671
672        let nodes = flow
673            .with_process(&node, deployment.Localhost())
674            .with_external(&external, deployment.Localhost())
675            .deploy(&mut deployment);
676
677        deployment.deploy().await.unwrap();
678
679        let mut external_in = nodes.connect_sink_bincode(in_port).await;
680        let mut external_out = nodes.connect_source_bincode(out).await;
681
682        deployment.start().await.unwrap();
683
684        external_in.send(1).await.unwrap();
685        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
686
687        external_in.send(2).await.unwrap();
688        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
689    }
690
691    #[tokio::test]
692    async fn external_bytes() {
693        let mut deployment = Deployment::new();
694
695        let flow = FlowBuilder::new();
696        let first_node = flow.process::<()>();
697        let external = flow.external::<()>();
698
699        let (in_port, input) = first_node.source_external_bytes(&external);
700        let out = input
701            .map(q!(|r| r.unwrap()))
702            .send_bincode_external(&external);
703
704        let nodes = flow
705            .with_process(&first_node, deployment.Localhost())
706            .with_external(&external, deployment.Localhost())
707            .deploy(&mut deployment);
708
709        deployment.deploy().await.unwrap();
710
711        let mut external_in = nodes.connect_sink_bytes(in_port).await;
712        let mut external_out = nodes.connect_source_bincode(out).await;
713
714        deployment.start().await.unwrap();
715
716        external_in.send(vec![1, 2, 3].into()).await.unwrap();
717
718        assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
719    }
720
721    #[tokio::test]
722    async fn multi_external_source() {
723        let mut deployment = Deployment::new();
724
725        let flow = FlowBuilder::new();
726        let first_node = flow.process::<()>();
727        let external = flow.external::<()>();
728
729        let (in_port, input, _membership, complete_sink) =
730            first_node.bidi_external_many_bincode(&external);
731        let out = input.entries().send_bincode_external(&external);
732        complete_sink.complete(first_node.source_iter::<(u64, ()), _>(q!([])).into_keyed());
733
734        let nodes = flow
735            .with_process(&first_node, deployment.Localhost())
736            .with_external(&external, deployment.Localhost())
737            .deploy(&mut deployment);
738
739        deployment.deploy().await.unwrap();
740
741        let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
742        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
743        let external_out = nodes.connect_source_bincode(out).await;
744
745        deployment.start().await.unwrap();
746
747        external_in_1.send(123).await.unwrap();
748        external_in_2.send(456).await.unwrap();
749
750        assert_eq!(
751            external_out.take(2).collect::<HashSet<_>>().await,
752            vec![(0, 123), (1, 456)].into_iter().collect()
753        );
754    }
755
756    #[tokio::test]
757    async fn second_connection_only_multi_source() {
758        let mut deployment = Deployment::new();
759
760        let flow = FlowBuilder::new();
761        let first_node = flow.process::<()>();
762        let external = flow.external::<()>();
763
764        let (in_port, input, _membership, complete_sink) =
765            first_node.bidi_external_many_bincode(&external);
766        let out = input.entries().send_bincode_external(&external);
767        complete_sink.complete(first_node.source_iter::<(u64, ()), _>(q!([])).into_keyed());
768
769        let nodes = flow
770            .with_process(&first_node, deployment.Localhost())
771            .with_external(&external, deployment.Localhost())
772            .deploy(&mut deployment);
773
774        deployment.deploy().await.unwrap();
775
776        // intentionally skipped to test stream waking logic
777        let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
778        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
779        let mut external_out = nodes.connect_source_bincode(out).await;
780
781        deployment.start().await.unwrap();
782
783        external_in_2.send(456).await.unwrap();
784
785        assert_eq!(external_out.next().await.unwrap(), (1, 456));
786    }
787
788    #[tokio::test]
789    async fn multi_external_bytes() {
790        let mut deployment = Deployment::new();
791
792        let flow = FlowBuilder::new();
793        let first_node = flow.process::<()>();
794        let external = flow.external::<()>();
795
796        let (in_port, input, _membership, complete_sink) = first_node
797            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
798        let out = input.entries().send_bincode_external(&external);
799        complete_sink.complete(first_node.source_iter(q!([])).into_keyed());
800
801        let nodes = flow
802            .with_process(&first_node, deployment.Localhost())
803            .with_external(&external, deployment.Localhost())
804            .deploy(&mut deployment);
805
806        deployment.deploy().await.unwrap();
807
808        let mut external_in_1 = nodes.connect_sink_bytes(in_port.clone()).await;
809        let mut external_in_2 = nodes.connect_sink_bytes(in_port).await;
810        let external_out = nodes.connect_source_bincode(out).await;
811
812        deployment.start().await.unwrap();
813
814        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
815        external_in_2.send(vec![4, 5].into()).await.unwrap();
816
817        assert_eq!(
818            external_out.take(2).collect::<HashSet<_>>().await,
819            vec![
820                (0, (&[1u8, 2, 3] as &[u8]).into()),
821                (1, (&[4u8, 5] as &[u8]).into())
822            ]
823            .into_iter()
824            .collect()
825        );
826    }
827
828    #[tokio::test]
829    async fn echo_external_bytes() {
830        let mut deployment = Deployment::new();
831
832        let flow = FlowBuilder::new();
833        let first_node = flow.process::<()>();
834        let external = flow.external::<()>();
835
836        let (port, input, _membership, complete_sink) = first_node
837            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
838        complete_sink
839            .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
840
841        let nodes = flow
842            .with_process(&first_node, deployment.Localhost())
843            .with_external(&external, deployment.Localhost())
844            .deploy(&mut deployment);
845
846        deployment.deploy().await.unwrap();
847
848        let (mut external_out_1, mut external_in_1) = nodes.connect_bytes(port.clone()).await;
849        let (mut external_out_2, mut external_in_2) = nodes.connect_bytes(port).await;
850
851        deployment.start().await.unwrap();
852
853        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
854        external_in_2.send(vec![4, 5].into()).await.unwrap();
855
856        assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
857        assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
858    }
859
860    #[tokio::test]
861    async fn echo_external_bincode() {
862        let mut deployment = Deployment::new();
863
864        let flow = FlowBuilder::new();
865        let first_node = flow.process::<()>();
866        let external = flow.external::<()>();
867
868        let (port, input, _membership, complete_sink) =
869            first_node.bidi_external_many_bincode(&external);
870        complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
871
872        let nodes = flow
873            .with_process(&first_node, deployment.Localhost())
874            .with_external(&external, deployment.Localhost())
875            .deploy(&mut deployment);
876
877        deployment.deploy().await.unwrap();
878
879        let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
880        let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
881
882        deployment.start().await.unwrap();
883
884        external_in_1.send("hi".to_string()).await.unwrap();
885        external_in_2.send("hello".to_string()).await.unwrap();
886
887        assert_eq!(external_out_1.next().await.unwrap(), "HI");
888        assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
889    }
890}