// hydro_lang/live_collections/stream/networking.rs
1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::stream::Retries;
16#[cfg(stageleft_runtime)]
17use crate::location::dynamic::DynLocation;
18use crate::location::external_process::ExternalBincodeStream;
19use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
20use crate::nondet::NonDet;
21use crate::staging_util::get_this_crate;
22
23// same as the one in `hydro_std`, but internal use only
24fn track_membership<'a, C, L: Location<'a> + NoTick>(
25 membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
26) -> KeyedSingleton<MemberId<C>, (), L, Unbounded> {
27 membership
28 .fold(
29 q!(|| false),
30 q!(|present, event| {
31 match event {
32 MembershipEvent::Joined => *present = true,
33 MembershipEvent::Left => *present = false,
34 }
35 }),
36 )
37 .filter_map(q!(|v| if v { Some(()) } else { None }))
38}
39
40fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
41 let root = get_this_crate();
42
43 if is_demux {
44 parse_quote! {
45 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::location::MemberId<_>, #t_type), _>(
46 |(id, data)| {
47 (id.raw_id, #root::runtime_support::bincode::serialize(&data).unwrap().into())
48 }
49 )
50 }
51 } else {
52 parse_quote! {
53 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
54 |data| {
55 #root::runtime_support::bincode::serialize(&data).unwrap().into()
56 }
57 )
58 }
59 }
60}
61
62pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
63 serialize_bincode_with_type(is_demux, "e_type::<T>())
64}
65
66fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
67 let root = get_this_crate();
68
69 if let Some(c_type) = tagged {
70 parse_quote! {
71 |res| {
72 let (id, b) = res.unwrap();
73 (#root::location::MemberId::<#c_type>::from_raw(id), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
74 }
75 }
76 } else {
77 parse_quote! {
78 |res| {
79 #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
80 }
81 }
82 }
83}
84
85pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
86 deserialize_bincode_with_type(tagged, "e_type::<T>())
87}
88
89impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
90 /// "Moves" elements of this stream to a new distributed location by sending them over the network,
91 /// using [`bincode`] to serialize/deserialize messages.
92 ///
93 /// The returned stream captures the elements received at the destination, where values will
94 /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
95 /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
96 /// recipient is guaranteed to receive a _prefix_ or the sent messages; if the TCP connection is
97 /// dropped no further messages will be sent.
98 ///
99 /// # Example
100 /// ```rust
101 /// # use hydro_lang::prelude::*;
102 /// # use futures::StreamExt;
103 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
104 /// let p1 = flow.process::<()>();
105 /// let numbers: Stream<_, Process<_>, Unbounded> = p1.source_iter(q!(vec![1, 2, 3]));
106 /// let p2 = flow.process::<()>();
107 /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
108 /// // 1, 2, 3
109 /// # on_p2.send_bincode(&p_out)
110 /// # }, |mut stream| async move {
111 /// # for w in 1..=3 {
112 /// # assert_eq!(stream.next().await, Some(w));
113 /// # }
114 /// # }));
115 /// ```
116 pub fn send_bincode<L2>(
117 self,
118 other: &Process<'a, L2>,
119 ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
120 where
121 T: Serialize + DeserializeOwned,
122 {
123 let serialize_pipeline = Some(serialize_bincode::<T>(false));
124
125 let deserialize_pipeline = Some(deserialize_bincode::<T>(None));
126
127 Stream::new(
128 other.clone(),
129 HydroNode::Network {
130 serialize_fn: serialize_pipeline.map(|e| e.into()),
131 instantiate_fn: DebugInstantiate::Building,
132 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
133 input: Box::new(self.ir_node.into_inner()),
134 metadata: other.new_node_metadata::<T>(),
135 },
136 )
137 }
138
139 /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
140 /// using [`bincode`] to serialize/deserialize messages.
141 ///
142 /// Each element in the stream will be sent to **every** member of the cluster based on the latest
143 /// membership information. This is a common pattern in distributed systems for broadcasting data to
144 /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
145 /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
146 /// each element to all cluster members.
147 ///
148 /// # Non-Determinism
149 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
150 /// to the current cluster members _at that point in time_. Depending on when we are notified of
151 /// membership changes, we will broadcast each element to different members.
152 ///
153 /// # Example
154 /// ```rust
155 /// # use hydro_lang::prelude::*;
156 /// # use futures::StreamExt;
157 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
158 /// let p1 = flow.process::<()>();
159 /// let workers: Cluster<()> = flow.cluster::<()>();
160 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
161 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
162 /// # on_worker.send_bincode(&p2).entries()
163 /// // if there are 4 members in the cluster, each receives one element
164 /// // - MemberId::<()>(0): [123]
165 /// // - MemberId::<()>(1): [123]
166 /// // - MemberId::<()>(2): [123]
167 /// // - MemberId::<()>(3): [123]
168 /// # }, |mut stream| async move {
169 /// # let mut results = Vec::new();
170 /// # for w in 0..4 {
171 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
172 /// # }
173 /// # results.sort();
174 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
175 /// # }));
176 /// ```
177 pub fn broadcast_bincode<L2: 'a>(
178 self,
179 other: &Cluster<'a, L2>,
180 nondet_membership: NonDet,
181 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
182 where
183 T: Clone + Serialize + DeserializeOwned,
184 {
185 let ids = track_membership(self.location.source_cluster_members(other));
186 let join_tick = self.location.tick();
187 let current_members = ids.snapshot(&join_tick, nondet_membership);
188
189 self.batch(&join_tick, nondet_membership)
190 .repeat_with_keys(current_members)
191 .all_ticks()
192 .demux_bincode(other)
193 }
194
195 /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
196 /// serialization. The external process can receive these elements by establishing a TCP
197 /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
198 ///
199 /// # Example
200 /// ```rust
201 /// # use hydro_lang::prelude::*;
202 /// # use futures::StreamExt;
203 /// # tokio_test::block_on(async move {
204 /// let flow = FlowBuilder::new();
205 /// let process = flow.process::<()>();
206 /// let numbers: Stream<_, Process<_>, Unbounded> = process.source_iter(q!(vec![1, 2, 3]));
207 /// let external = flow.external::<()>();
208 /// let external_handle = numbers.send_bincode_external(&external);
209 ///
210 /// let mut deployment = hydro_deploy::Deployment::new();
211 /// let nodes = flow
212 /// .with_process(&process, deployment.Localhost())
213 /// .with_external(&external, deployment.Localhost())
214 /// .deploy(&mut deployment);
215 ///
216 /// deployment.deploy().await.unwrap();
217 /// // establish the TCP connection
218 /// let mut external_recv_stream = nodes.connect_source_bincode(external_handle).await;
219 /// deployment.start().await.unwrap();
220 ///
221 /// for w in 1..=3 {
222 /// assert_eq!(external_recv_stream.next().await, Some(w));
223 /// }
224 /// # });
225 /// ```
226 pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T>
227 where
228 T: Serialize + DeserializeOwned,
229 {
230 let serialize_pipeline = Some(serialize_bincode::<T>(false));
231
232 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
233
234 let external_key = flow_state_borrow.next_external_out;
235 flow_state_borrow.next_external_out += 1;
236
237 flow_state_borrow.push_root(HydroRoot::SendExternal {
238 to_external_id: other.id,
239 to_key: external_key,
240 to_many: false,
241 serialize_fn: serialize_pipeline.map(|e| e.into()),
242 instantiate_fn: DebugInstantiate::Building,
243 input: Box::new(self.ir_node.into_inner()),
244 op_metadata: HydroIrOpMetadata::new(),
245 });
246
247 ExternalBincodeStream {
248 process_id: other.id,
249 port_id: external_key,
250 _phantom: PhantomData,
251 }
252 }
253}
254
255impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
256 Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
257{
258 /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
259 /// using [`bincode`] to serialize/deserialize messages.
260 ///
261 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
262 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
263 /// this API allows precise targeting of specific cluster members rather than broadcasting to
264 /// all members.
265 ///
266 /// # Example
267 /// ```rust
268 /// # use hydro_lang::prelude::*;
269 /// # use futures::StreamExt;
270 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
271 /// let p1 = flow.process::<()>();
272 /// let workers: Cluster<()> = flow.cluster::<()>();
273 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
274 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
275 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)))
276 /// .demux_bincode(&workers);
277 /// # on_worker.send_bincode(&p2).entries()
278 /// // if there are 4 members in the cluster, each receives one element
279 /// // - MemberId::<()>(0): [0]
280 /// // - MemberId::<()>(1): [1]
281 /// // - MemberId::<()>(2): [2]
282 /// // - MemberId::<()>(3): [3]
283 /// # }, |mut stream| async move {
284 /// # let mut results = Vec::new();
285 /// # for w in 0..4 {
286 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
287 /// # }
288 /// # results.sort();
289 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
290 /// # }));
291 /// ```
292 pub fn demux_bincode(
293 self,
294 other: &Cluster<'a, L2>,
295 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
296 where
297 T: Serialize + DeserializeOwned,
298 {
299 self.into_keyed().demux_bincode(other)
300 }
301}
302
303impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
304 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
305 /// [`bincode`] to serialize/deserialize messages.
306 ///
307 /// This provides load balancing by evenly distributing work across cluster members. The
308 /// distribution is deterministic based on element order - the first element goes to member 0,
309 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
310 ///
311 /// # Non-Determinism
312 /// The set of cluster members may asynchronously change over time. Each element is distributed
313 /// based on the current cluster membership _at that point in time_. Depending on when cluster
314 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
315 /// membership is stable, the order of members in the round-robin pattern may change across runs.
316 ///
317 /// # Ordering Requirements
318 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
319 /// order of messages and retries affects the round-robin pattern.
320 ///
321 /// # Example
322 /// ```rust
323 /// # use hydro_lang::prelude::*;
324 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
325 /// # use futures::StreamExt;
326 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
327 /// let p1 = flow.process::<()>();
328 /// let workers: Cluster<()> = flow.cluster::<()>();
329 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
330 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
331 /// on_worker.send_bincode(&p2)
332 /// # .first().values() // we use first to assert that each member gets one element
333 /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
334 /// // - MemberId::<()>(?): [1]
335 /// // - MemberId::<()>(?): [2]
336 /// // - MemberId::<()>(?): [3]
337 /// // - MemberId::<()>(?): [4]
338 /// # }, |mut stream| async move {
339 /// # let mut results = Vec::new();
340 /// # for w in 0..4 {
341 /// # results.push(stream.next().await.unwrap());
342 /// # }
343 /// # results.sort();
344 /// # assert_eq!(results, vec![1, 2, 3, 4]);
345 /// # }));
346 /// ```
347 pub fn round_robin_bincode<L2: 'a>(
348 self,
349 other: &Cluster<'a, L2>,
350 nondet_membership: NonDet,
351 ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
352 where
353 T: Serialize + DeserializeOwned,
354 {
355 let ids = track_membership(self.location.source_cluster_members(other));
356 let join_tick = self.location.tick();
357 let current_members = ids
358 .snapshot(&join_tick, nondet_membership)
359 .keys()
360 .assume_ordering(nondet_membership)
361 .collect_vec();
362
363 self.enumerate()
364 .batch(&join_tick, nondet_membership)
365 .cross_singleton(current_members)
366 .map(q!(|(data, members)| (
367 members[data.0 % members.len()],
368 data.1
369 )))
370 .all_ticks()
371 .demux_bincode(other)
372 }
373}
374
375impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
376 /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
377 /// using [`bincode`] to serialize/deserialize messages.
378 ///
379 /// Each cluster member sends its local stream elements, and they are collected at the destination
380 /// as a [`KeyedStream`] where keys identify the source cluster member.
381 ///
382 /// # Example
383 /// ```rust
384 /// # use hydro_lang::prelude::*;
385 /// # use futures::StreamExt;
386 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
387 /// let workers: Cluster<()> = flow.cluster::<()>();
388 /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
389 /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
390 /// # all_received.entries()
391 /// # }, |mut stream| async move {
392 /// // if there are 4 members in the cluster, we should receive 4 elements
393 /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
394 /// # let mut results = Vec::new();
395 /// # for w in 0..4 {
396 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
397 /// # }
398 /// # results.sort();
399 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
400 /// # }));
401 /// ```
402 ///
403 /// If you don't need to know the source for each element, you can use `.values()`
404 /// to get just the data:
405 /// ```rust
406 /// # use hydro_lang::prelude::*;
407 /// # use hydro_lang::live_collections::stream::NoOrder;
408 /// # use futures::StreamExt;
409 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
410 /// # let workers: Cluster<()> = flow.cluster::<()>();
411 /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
412 /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
413 /// # values
414 /// # }, |mut stream| async move {
415 /// # let mut results = Vec::new();
416 /// # for w in 0..4 {
417 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
418 /// # }
419 /// # results.sort();
420 /// // if there are 4 members in the cluster, we should receive 4 elements
421 /// // 1, 1, 1, 1
422 /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
423 /// # }));
424 /// ```
425 pub fn send_bincode<L2>(
426 self,
427 other: &Process<'a, L2>,
428 ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
429 where
430 T: Serialize + DeserializeOwned,
431 {
432 let serialize_pipeline = Some(serialize_bincode::<T>(false));
433
434 let deserialize_pipeline = Some(deserialize_bincode::<T>(Some("e_type::<L>())));
435
436 let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
437 other.clone(),
438 HydroNode::Network {
439 serialize_fn: serialize_pipeline.map(|e| e.into()),
440 instantiate_fn: DebugInstantiate::Building,
441 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
442 input: Box::new(self.ir_node.into_inner()),
443 metadata: other.new_node_metadata::<(MemberId<L>, T)>(),
444 },
445 );
446
447 raw_stream.into_keyed()
448 }
449
450 /// Broadcasts elements of this stream at each source member to all members of a destination
451 /// cluster, using [`bincode`] to serialize/deserialize messages.
452 ///
453 /// Each source member sends each of its stream elements to **every** member of the cluster
454 /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
455 /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
456 /// **only data elements** and sends each element to all cluster members.
457 ///
458 /// # Non-Determinism
459 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
460 /// to the current cluster members known _at that point in time_ at the source member. Depending
461 /// on when each source member is notified of membership changes, it will broadcast each element
462 /// to different members.
463 ///
464 /// # Example
465 /// ```rust
466 /// # use hydro_lang::prelude::*;
467 /// # use hydro_lang::location::MemberId;
468 /// # use futures::StreamExt;
469 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
470 /// # type Source = ();
471 /// # type Destination = ();
472 /// let source: Cluster<Source> = flow.cluster::<Source>();
473 /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
474 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
475 /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
476 /// # on_destination.entries().send_bincode(&p2).entries()
477 /// // if there are 4 members in the desination, each receives one element from each source member
478 /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
479 /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
480 /// // - ...
481 /// # }, |mut stream| async move {
482 /// # let mut results = Vec::new();
483 /// # for w in 0..16 {
484 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
485 /// # }
486 /// # results.sort();
487 /// # assert_eq!(results, vec![
488 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
489 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
490 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
491 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
492 /// # ]);
493 /// # }));
494 /// ```
495 pub fn broadcast_bincode<L2: 'a>(
496 self,
497 other: &Cluster<'a, L2>,
498 nondet_membership: NonDet,
499 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
500 where
501 T: Clone + Serialize + DeserializeOwned,
502 {
503 let ids = track_membership(self.location.source_cluster_members(other));
504 let join_tick = self.location.tick();
505 let current_members = ids.snapshot(&join_tick, nondet_membership);
506
507 self.batch(&join_tick, nondet_membership)
508 .repeat_with_keys(current_members)
509 .all_ticks()
510 .demux_bincode(other)
511 }
512}
513
514impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
515 Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
516{
517 /// Sends elements of this stream at each source member to specific members of a destination
518 /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
519 ///
520 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
521 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
522 /// this API allows precise targeting of specific cluster members rather than broadcasting to
523 /// all members.
524 ///
525 /// Each cluster member sends its local stream elements, and they are collected at each
526 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
527 ///
528 /// # Example
529 /// ```rust
530 /// # use hydro_lang::prelude::*;
531 /// # use futures::StreamExt;
532 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
533 /// # type Source = ();
534 /// # type Destination = ();
535 /// let source: Cluster<Source> = flow.cluster::<Source>();
536 /// let to_send: Stream<_, Cluster<_>, _> = source
537 /// .source_iter(q!(vec![0, 1, 2, 3]))
538 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)));
539 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
540 /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
541 /// # all_received.entries().send_bincode(&p2).entries()
542 /// # }, |mut stream| async move {
543 /// // if there are 4 members in the destination cluster, each receives one message from each source member
544 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
545 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
546 /// // - ...
547 /// # let mut results = Vec::new();
548 /// # for w in 0..16 {
549 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
550 /// # }
551 /// # results.sort();
552 /// # assert_eq!(results, vec![
553 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
554 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
555 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
556 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
557 /// # ]);
558 /// # }));
559 /// ```
560 pub fn demux_bincode(
561 self,
562 other: &Cluster<'a, L2>,
563 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
564 where
565 T: Serialize + DeserializeOwned,
566 {
567 self.into_keyed().demux_bincode(other)
568 }
569}