1use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::time::Duration;
19
20use bytes::BytesMut;
21use futures::stream::Stream as FuturesStream;
22use proc_macro2::Span;
23use serde::de::DeserializeOwned;
24use serde::{Deserialize, Serialize};
25use stageleft::{QuotedWithContext, q, quote_type};
26use syn::parse_quote;
27use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
28
29use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource};
30use crate::forward_handle::ForwardRef;
31#[cfg(stageleft_runtime)]
32use crate::forward_handle::{CycleCollection, ForwardHandle};
33use crate::live_collections::boundedness::Unbounded;
34use crate::live_collections::keyed_stream::KeyedStream;
35use crate::live_collections::singleton::Singleton;
36use crate::live_collections::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
37use crate::location::cluster::ClusterIds;
38use crate::location::dynamic::LocationId;
39use crate::location::external_process::{
40 ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many,
41};
42use crate::nondet::{NonDet, nondet};
43use crate::staging_util::get_this_crate;
44
45pub mod dynamic;
46
47#[expect(missing_docs, reason = "TODO")]
48pub mod external_process;
49pub use external_process::External;
50
51#[expect(missing_docs, reason = "TODO")]
52pub mod process;
53pub use process::Process;
54
55#[expect(missing_docs, reason = "TODO")]
56pub mod cluster;
57pub use cluster::Cluster;
58
59#[expect(missing_docs, reason = "TODO")]
60pub mod member_id;
61pub use member_id::MemberId;
62
63#[expect(missing_docs, reason = "TODO")]
64pub mod tick;
65pub use tick::{Atomic, NoTick, Tick};
66
67#[expect(missing_docs, reason = "TODO")]
68#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
69pub enum MembershipEvent {
70 Joined,
71 Left,
72}
73
74#[expect(missing_docs, reason = "TODO")]
75#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
76pub enum NetworkHint {
77 Auto,
78 TcpPort(Option<u16>),
79}
80
81pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
82 assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
83}
84
85#[expect(missing_docs, reason = "TODO")]
86#[expect(
87 private_bounds,
88 reason = "only internal Hydro code can define location types"
89)]
90pub trait Location<'a>: dynamic::DynLocation {
91 type Root: Location<'a>;
92
93 fn root(&self) -> Self::Root;
94
95 fn try_tick(&self) -> Option<Tick<Self>> {
96 if Self::is_top_level() {
97 let next_id = self.flow_state().borrow_mut().next_clock_id;
98 self.flow_state().borrow_mut().next_clock_id += 1;
99 Some(Tick {
100 id: next_id,
101 l: self.clone(),
102 })
103 } else {
104 None
105 }
106 }
107
108 fn id(&self) -> LocationId {
109 dynamic::DynLocation::id(self)
110 }
111
112 fn tick(&self) -> Tick<Self>
113 where
114 Self: NoTick,
115 {
116 let next_id = self.flow_state().borrow_mut().next_clock_id;
117 self.flow_state().borrow_mut().next_clock_id += 1;
118 Tick {
119 id: next_id,
120 l: self.clone(),
121 }
122 }
123
124 fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
125 where
126 Self: Sized + NoTick,
127 {
128 Stream::new(
129 self.clone(),
130 HydroNode::Source {
131 source: HydroSource::Spin(),
132 metadata: self.new_node_metadata::<()>(),
133 },
134 )
135 }
136
137 fn source_stream<T, E>(
138 &self,
139 e: impl QuotedWithContext<'a, E, Self>,
140 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
141 where
142 E: FuturesStream<Item = T> + Unpin,
143 Self: Sized + NoTick,
144 {
145 let e = e.splice_untyped_ctx(self);
146
147 Stream::new(
148 self.clone(),
149 HydroNode::Source {
150 source: HydroSource::Stream(e.into()),
151 metadata: self.new_node_metadata::<T>(),
152 },
153 )
154 }
155
156 fn source_iter<T, E>(
157 &self,
158 e: impl QuotedWithContext<'a, E, Self>,
159 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
160 where
161 E: IntoIterator<Item = T>,
162 Self: Sized + NoTick,
163 {
164 let e = e.splice_untyped_ctx(self);
167
168 Stream::new(
169 self.clone(),
170 HydroNode::Source {
171 source: HydroSource::Iter(e.into()),
172 metadata: self.new_node_metadata::<T>(),
173 },
174 )
175 }
176
177 fn source_cluster_members<C: 'a>(
178 &self,
179 cluster: &Cluster<'a, C>,
180 ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
181 where
182 Self: Sized + NoTick,
183 {
184 let underlying_memberids: ClusterIds<'a, C> = ClusterIds {
185 id: cluster.id,
186 _phantom: PhantomData,
187 };
188
189 self.source_iter(q!(underlying_memberids))
190 .map(q!(|id| (*id, MembershipEvent::Joined)))
191 .into_keyed()
192 }
193
194 fn source_external_bytes<L>(
195 &self,
196 from: &External<L>,
197 ) -> (
198 ExternalBytesPort,
199 Stream<std::io::Result<BytesMut>, Self, Unbounded, TotalOrder, ExactlyOnce>,
200 )
201 where
202 Self: Sized + NoTick,
203 {
204 let next_external_port_id = {
205 let mut flow_state = from.flow_state.borrow_mut();
206 let id = flow_state.next_external_out;
207 flow_state.next_external_out += 1;
208 id
209 };
210
211 (
212 ExternalBytesPort {
213 process_id: from.id,
214 port_id: next_external_port_id,
215 _phantom: Default::default(),
216 },
217 Stream::new(
218 self.clone(),
219 HydroNode::ExternalInput {
220 from_external_id: from.id,
221 from_key: next_external_port_id,
222 from_many: false,
223 codec_type: quote_type::<LengthDelimitedCodec>().into(),
224 port_hint: NetworkHint::Auto,
225 instantiate_fn: DebugInstantiate::Building,
226 deserialize_fn: None,
227 metadata: self.new_node_metadata::<std::io::Result<BytesMut>>(),
228 },
229 ),
230 )
231 }
232
233 fn source_external_bincode<L, T>(
234 &self,
235 from: &External<L>,
236 ) -> (
237 ExternalBincodeSink<T>,
238 Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>,
239 )
240 where
241 Self: Sized + NoTick,
242 T: Serialize + DeserializeOwned,
243 {
244 let next_external_port_id = {
245 let mut flow_state = from.flow_state.borrow_mut();
246 let id = flow_state.next_external_out;
247 flow_state.next_external_out += 1;
248 id
249 };
250
251 (
252 ExternalBincodeSink {
253 process_id: from.id,
254 port_id: next_external_port_id,
255 _phantom: PhantomData,
256 },
257 Stream::new(
258 self.clone(),
259 HydroNode::ExternalInput {
260 from_external_id: from.id,
261 from_key: next_external_port_id,
262 from_many: false,
263 codec_type: quote_type::<LengthDelimitedCodec>().into(),
264 port_hint: NetworkHint::Auto,
265 instantiate_fn: DebugInstantiate::Building,
266 deserialize_fn: Some(
267 crate::live_collections::stream::networking::deserialize_bincode::<T>(None)
268 .into(),
269 ),
270 metadata: self.new_node_metadata::<T>(),
271 },
272 ),
273 )
274 }
275
276 #[expect(clippy::type_complexity, reason = "stream markers")]
277 fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
278 &self,
279 from: &External<L>,
280 port_hint: NetworkHint,
281 ) -> (
282 ExternalBytesPort<Many>,
283 KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
284 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
285 ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
286 )
287 where
288 Self: Sized + NoTick,
289 {
290 let next_external_port_id = {
291 let mut flow_state = from.flow_state.borrow_mut();
292 let id = flow_state.next_external_out;
293 flow_state.next_external_out += 1;
294 id
295 };
296
297 let (fwd_ref, to_sink) =
298 self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
299 let mut flow_state_borrow = self.flow_state().borrow_mut();
300
301 flow_state_borrow.push_root(HydroRoot::SendExternal {
302 to_external_id: from.id,
303 to_key: next_external_port_id,
304 to_many: true,
305 serialize_fn: None,
306 instantiate_fn: DebugInstantiate::Building,
307 input: Box::new(to_sink.entries().ir_node.into_inner()),
308 op_metadata: HydroIrOpMetadata::new(),
309 });
310
311 let raw_stream: Stream<
312 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
313 Self,
314 Unbounded,
315 NoOrder,
316 ExactlyOnce,
317 > = Stream::new(
318 self.clone(),
319 HydroNode::ExternalInput {
320 from_external_id: from.id,
321 from_key: next_external_port_id,
322 from_many: true,
323 codec_type: quote_type::<Codec>().into(),
324 port_hint,
325 instantiate_fn: DebugInstantiate::Building,
326 deserialize_fn: None,
327 metadata: self
328 .new_node_metadata::<std::io::Result<(u64, <Codec as Decoder>::Item)>>(),
329 },
330 );
331
332 let membership_stream_ident = syn::Ident::new(
333 &format!(
334 "__hydro_deploy_many_{}_{}_membership",
335 from.id, next_external_port_id
336 ),
337 Span::call_site(),
338 );
339 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
340 let raw_membership_stream: Stream<(u64, bool), Self, Unbounded, TotalOrder, ExactlyOnce> =
341 Stream::new(
342 self.clone(),
343 HydroNode::Source {
344 source: HydroSource::Stream(membership_stream_expr.into()),
345 metadata: self.new_node_metadata::<(u64, bool)>(),
346 },
347 );
348
349 (
350 ExternalBytesPort {
351 process_id: from.id,
352 port_id: next_external_port_id,
353 _phantom: PhantomData,
354 },
355 raw_stream
356 .flatten_ordered() .into_keyed()
358 .assume_ordering::<TotalOrder>(
359 nondet!()
360 ),
361 raw_membership_stream
362 .into_keyed()
363 .assume_ordering::<TotalOrder>(
364 nondet!(),
365 )
366 .map(q!(|join| {
367 if join {
368 MembershipEvent::Joined
369 } else {
370 MembershipEvent::Left
371 }
372 })),
373 fwd_ref,
374 )
375 }
376
377 #[expect(clippy::type_complexity, reason = "stream markers")]
378 fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
379 &self,
380 from: &External<L>,
381 ) -> (
382 ExternalBincodeBidi<InT, OutT, Many>,
383 KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
384 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
385 ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
386 )
387 where
388 Self: Sized + NoTick,
389 {
390 let next_external_port_id = {
391 let mut flow_state = from.flow_state.borrow_mut();
392 let id = flow_state.next_external_out;
393 flow_state.next_external_out += 1;
394 id
395 };
396
397 let root = get_this_crate();
398
399 let (fwd_ref, to_sink) =
400 self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
401 let mut flow_state_borrow = self.flow_state().borrow_mut();
402
403 let out_t_type = quote_type::<OutT>();
404 let ser_fn: syn::Expr = syn::parse_quote! {
405 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
406 |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
407 )
408 };
409
410 flow_state_borrow.push_root(HydroRoot::SendExternal {
411 to_external_id: from.id,
412 to_key: next_external_port_id,
413 to_many: true,
414 serialize_fn: Some(ser_fn.into()),
415 instantiate_fn: DebugInstantiate::Building,
416 input: Box::new(to_sink.entries().ir_node.into_inner()),
417 op_metadata: HydroIrOpMetadata::new(),
418 });
419
420 let in_t_type = quote_type::<InT>();
421
422 let deser_fn: syn::Expr = syn::parse_quote! {
423 |res| {
424 let (id, b) = res.unwrap();
425 (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
426 }
427 };
428
429 let raw_stream: Stream<(u64, InT), Self, Unbounded, NoOrder, ExactlyOnce> = Stream::new(
430 self.clone(),
431 HydroNode::ExternalInput {
432 from_external_id: from.id,
433 from_key: next_external_port_id,
434 from_many: true,
435 codec_type: quote_type::<LengthDelimitedCodec>().into(),
436 port_hint: NetworkHint::Auto,
437 instantiate_fn: DebugInstantiate::Building,
438 deserialize_fn: Some(deser_fn.into()),
439 metadata: self.new_node_metadata::<(u64, InT)>(),
440 },
441 );
442
443 let membership_stream_ident = syn::Ident::new(
444 &format!(
445 "__hydro_deploy_many_{}_{}_membership",
446 from.id, next_external_port_id
447 ),
448 Span::call_site(),
449 );
450 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
451 let raw_membership_stream: Stream<(u64, bool), Self, Unbounded, NoOrder, ExactlyOnce> =
452 Stream::new(
453 self.clone(),
454 HydroNode::Source {
455 source: HydroSource::Stream(membership_stream_expr.into()),
456 metadata: self.new_node_metadata::<(u64, bool)>(),
457 },
458 );
459
460 (
461 ExternalBincodeBidi {
462 process_id: from.id,
463 port_id: next_external_port_id,
464 _phantom: PhantomData,
465 },
466 raw_stream.into_keyed().assume_ordering::<TotalOrder>(
467 nondet!(),
468 ),
469 raw_membership_stream
470 .into_keyed()
471 .assume_ordering::<TotalOrder>(
472 nondet!(),
473 )
474 .map(q!(|join| {
475 if join {
476 MembershipEvent::Joined
477 } else {
478 MembershipEvent::Left
479 }
480 })),
481 fwd_ref,
482 )
483 }
484
485 fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Unbounded>
501 where
502 T: Clone,
503 Self: Sized,
504 {
505 let e_arr = q!([e]);
509 let e = e_arr.splice_untyped_ctx(self);
510
511 if Self::is_top_level() {
512 Singleton::new(
513 self.clone(),
514 HydroNode::Persist {
515 inner: Box::new(HydroNode::Source {
516 source: HydroSource::Iter(e.into()),
517 metadata: self.new_node_metadata::<T>(),
518 }),
519 metadata: self.new_node_metadata::<T>(),
520 },
521 )
522 } else {
523 Singleton::new(
524 self.clone(),
525 HydroNode::Source {
526 source: HydroSource::Iter(e.into()),
527 metadata: self.new_node_metadata::<T>(),
528 },
529 )
530 }
531 }
532
533 fn source_interval(
543 &self,
544 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
545 _nondet: NonDet,
546 ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
547 where
548 Self: Sized + NoTick,
549 {
550 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
551 tokio::time::interval(interval)
552 )))
553 }
554
555 fn source_interval_delayed(
566 &self,
567 delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
568 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
569 _nondet: NonDet,
570 ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
571 where
572 Self: Sized + NoTick,
573 {
574 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
575 tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
576 )))
577 }
578
579 fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
580 where
581 S: CycleCollection<'a, ForwardRef, Location = Self>,
582 Self: NoTick,
583 {
584 let next_id = self.flow_state().borrow_mut().next_cycle_id();
585 let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
586
587 (
588 ForwardHandle {
589 completed: false,
590 ident: ident.clone(),
591 expected_location: Location::id(self),
592 _phantom: PhantomData,
593 },
594 S::create_source(ident, self.clone()),
595 )
596 }
597}
598
599#[cfg(test)]
600mod tests {
601 use std::collections::HashSet;
602
603 use futures::{SinkExt, StreamExt};
604 use hydro_deploy::Deployment;
605 use stageleft::q;
606 use tokio_util::codec::LengthDelimitedCodec;
607
608 use crate::compile::builder::FlowBuilder;
609 use crate::location::{Location, NetworkHint};
610 use crate::nondet::nondet;
611
612 #[tokio::test]
613 async fn top_level_singleton_replay_cardinality() {
614 let mut deployment = Deployment::new();
615
616 let flow = FlowBuilder::new();
617 let node = flow.process::<()>();
618 let external = flow.external::<()>();
619
620 let (in_port, input) = node.source_external_bincode(&external);
621 let singleton = node.singleton(q!(123));
622 let tick = node.tick();
623 let out = input
624 .batch(&tick, nondet!())
625 .cross_singleton(singleton.clone().snapshot(&tick, nondet!()))
626 .cross_singleton(
627 singleton
628 .snapshot(&tick, nondet!())
629 .into_stream()
630 .count(),
631 )
632 .all_ticks()
633 .send_bincode_external(&external);
634
635 let nodes = flow
636 .with_process(&node, deployment.Localhost())
637 .with_external(&external, deployment.Localhost())
638 .deploy(&mut deployment);
639
640 deployment.deploy().await.unwrap();
641
642 let mut external_in = nodes.connect_sink_bincode(in_port).await;
643 let mut external_out = nodes.connect_source_bincode(out).await;
644
645 deployment.start().await.unwrap();
646
647 external_in.send(1).await.unwrap();
648 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
649
650 external_in.send(2).await.unwrap();
651 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
652 }
653
654 #[tokio::test]
655 async fn tick_singleton_replay_cardinality() {
656 let mut deployment = Deployment::new();
657
658 let flow = FlowBuilder::new();
659 let node = flow.process::<()>();
660 let external = flow.external::<()>();
661
662 let (in_port, input) = node.source_external_bincode(&external);
663 let tick = node.tick();
664 let singleton = tick.singleton(q!(123));
665 let out = input
666 .batch(&tick, nondet!())
667 .cross_singleton(singleton.clone())
668 .cross_singleton(singleton.into_stream().count())
669 .all_ticks()
670 .send_bincode_external(&external);
671
672 let nodes = flow
673 .with_process(&node, deployment.Localhost())
674 .with_external(&external, deployment.Localhost())
675 .deploy(&mut deployment);
676
677 deployment.deploy().await.unwrap();
678
679 let mut external_in = nodes.connect_sink_bincode(in_port).await;
680 let mut external_out = nodes.connect_source_bincode(out).await;
681
682 deployment.start().await.unwrap();
683
684 external_in.send(1).await.unwrap();
685 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
686
687 external_in.send(2).await.unwrap();
688 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
689 }
690
691 #[tokio::test]
692 async fn external_bytes() {
693 let mut deployment = Deployment::new();
694
695 let flow = FlowBuilder::new();
696 let first_node = flow.process::<()>();
697 let external = flow.external::<()>();
698
699 let (in_port, input) = first_node.source_external_bytes(&external);
700 let out = input
701 .map(q!(|r| r.unwrap()))
702 .send_bincode_external(&external);
703
704 let nodes = flow
705 .with_process(&first_node, deployment.Localhost())
706 .with_external(&external, deployment.Localhost())
707 .deploy(&mut deployment);
708
709 deployment.deploy().await.unwrap();
710
711 let mut external_in = nodes.connect_sink_bytes(in_port).await;
712 let mut external_out = nodes.connect_source_bincode(out).await;
713
714 deployment.start().await.unwrap();
715
716 external_in.send(vec![1, 2, 3].into()).await.unwrap();
717
718 assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
719 }
720
721 #[tokio::test]
722 async fn multi_external_source() {
723 let mut deployment = Deployment::new();
724
725 let flow = FlowBuilder::new();
726 let first_node = flow.process::<()>();
727 let external = flow.external::<()>();
728
729 let (in_port, input, _membership, complete_sink) =
730 first_node.bidi_external_many_bincode(&external);
731 let out = input.entries().send_bincode_external(&external);
732 complete_sink.complete(first_node.source_iter::<(u64, ()), _>(q!([])).into_keyed());
733
734 let nodes = flow
735 .with_process(&first_node, deployment.Localhost())
736 .with_external(&external, deployment.Localhost())
737 .deploy(&mut deployment);
738
739 deployment.deploy().await.unwrap();
740
741 let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
742 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
743 let external_out = nodes.connect_source_bincode(out).await;
744
745 deployment.start().await.unwrap();
746
747 external_in_1.send(123).await.unwrap();
748 external_in_2.send(456).await.unwrap();
749
750 assert_eq!(
751 external_out.take(2).collect::<HashSet<_>>().await,
752 vec![(0, 123), (1, 456)].into_iter().collect()
753 );
754 }
755
756 #[tokio::test]
757 async fn second_connection_only_multi_source() {
758 let mut deployment = Deployment::new();
759
760 let flow = FlowBuilder::new();
761 let first_node = flow.process::<()>();
762 let external = flow.external::<()>();
763
764 let (in_port, input, _membership, complete_sink) =
765 first_node.bidi_external_many_bincode(&external);
766 let out = input.entries().send_bincode_external(&external);
767 complete_sink.complete(first_node.source_iter::<(u64, ()), _>(q!([])).into_keyed());
768
769 let nodes = flow
770 .with_process(&first_node, deployment.Localhost())
771 .with_external(&external, deployment.Localhost())
772 .deploy(&mut deployment);
773
774 deployment.deploy().await.unwrap();
775
776 let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
778 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
779 let mut external_out = nodes.connect_source_bincode(out).await;
780
781 deployment.start().await.unwrap();
782
783 external_in_2.send(456).await.unwrap();
784
785 assert_eq!(external_out.next().await.unwrap(), (1, 456));
786 }
787
788 #[tokio::test]
789 async fn multi_external_bytes() {
790 let mut deployment = Deployment::new();
791
792 let flow = FlowBuilder::new();
793 let first_node = flow.process::<()>();
794 let external = flow.external::<()>();
795
796 let (in_port, input, _membership, complete_sink) = first_node
797 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
798 let out = input.entries().send_bincode_external(&external);
799 complete_sink.complete(first_node.source_iter(q!([])).into_keyed());
800
801 let nodes = flow
802 .with_process(&first_node, deployment.Localhost())
803 .with_external(&external, deployment.Localhost())
804 .deploy(&mut deployment);
805
806 deployment.deploy().await.unwrap();
807
808 let mut external_in_1 = nodes.connect_sink_bytes(in_port.clone()).await;
809 let mut external_in_2 = nodes.connect_sink_bytes(in_port).await;
810 let external_out = nodes.connect_source_bincode(out).await;
811
812 deployment.start().await.unwrap();
813
814 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
815 external_in_2.send(vec![4, 5].into()).await.unwrap();
816
817 assert_eq!(
818 external_out.take(2).collect::<HashSet<_>>().await,
819 vec![
820 (0, (&[1u8, 2, 3] as &[u8]).into()),
821 (1, (&[4u8, 5] as &[u8]).into())
822 ]
823 .into_iter()
824 .collect()
825 );
826 }
827
828 #[tokio::test]
829 async fn echo_external_bytes() {
830 let mut deployment = Deployment::new();
831
832 let flow = FlowBuilder::new();
833 let first_node = flow.process::<()>();
834 let external = flow.external::<()>();
835
836 let (port, input, _membership, complete_sink) = first_node
837 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
838 complete_sink
839 .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
840
841 let nodes = flow
842 .with_process(&first_node, deployment.Localhost())
843 .with_external(&external, deployment.Localhost())
844 .deploy(&mut deployment);
845
846 deployment.deploy().await.unwrap();
847
848 let (mut external_out_1, mut external_in_1) = nodes.connect_bytes(port.clone()).await;
849 let (mut external_out_2, mut external_in_2) = nodes.connect_bytes(port).await;
850
851 deployment.start().await.unwrap();
852
853 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
854 external_in_2.send(vec![4, 5].into()).await.unwrap();
855
856 assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
857 assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
858 }
859
860 #[tokio::test]
861 async fn echo_external_bincode() {
862 let mut deployment = Deployment::new();
863
864 let flow = FlowBuilder::new();
865 let first_node = flow.process::<()>();
866 let external = flow.external::<()>();
867
868 let (port, input, _membership, complete_sink) =
869 first_node.bidi_external_many_bincode(&external);
870 complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
871
872 let nodes = flow
873 .with_process(&first_node, deployment.Localhost())
874 .with_external(&external, deployment.Localhost())
875 .deploy(&mut deployment);
876
877 deployment.deploy().await.unwrap();
878
879 let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
880 let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
881
882 deployment.start().await.unwrap();
883
884 external_in_1.send("hi".to_string()).await.unwrap();
885 external_in_2.send("hello".to_string()).await.unwrap();
886
887 assert_eq!(external_out_1.next().await.unwrap(), "HI");
888 assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
889 }
890}