Skip to content

Commit 31b5548

Browse files
authored
feat: add stream helper types (paradigmxyz#10163)
1 parent 9ad7ebb commit 31b5548

File tree

3 files changed

+78
-1
lines changed

3 files changed

+78
-1
lines changed

Diff for: Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: crates/payload/builder/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ reth-ethereum-engine-primitives.workspace = true
2525
tokio = { workspace = true, features = ["sync"] }
2626
tokio-stream.workspace = true
2727
futures-util.workspace = true
28+
pin-project.workspace = true
2829

2930
# metrics
3031
reth-metrics.workspace = true

Diff for: crates/payload/builder/src/events.rs

+76-1
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
use reth_payload_primitives::PayloadTypes;
2+
use std::{
3+
pin::Pin,
4+
task::{ready, Context, Poll},
5+
};
26
use tokio::sync::broadcast;
37
use tokio_stream::{
48
wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
5-
StreamExt,
9+
Stream, StreamExt,
610
};
11+
use tracing::debug;
712

813
/// Payload builder events.
914
#[derive(Clone, Debug)]
@@ -34,4 +39,74 @@ impl<Engine: PayloadTypes + 'static> PayloadEvents<Engine> {
3439
let mut event_stream = self.into_stream();
3540
event_stream.next().await
3641
}
42+
43+
/// Returns a new stream that yields all built payloads.
44+
pub fn into_built_payload_stream(self) -> BuiltPayloadStream<Engine> {
45+
BuiltPayloadStream { st: self.into_stream() }
46+
}
47+
48+
/// Returns a new stream that yields received payload attributes
49+
pub fn into_attributes_stream(self) -> PayloadAttributeStream<Engine> {
50+
PayloadAttributeStream { st: self.into_stream() }
51+
}
52+
}
53+
54+
/// A stream that yields built payloads.
55+
#[derive(Debug)]
56+
#[pin_project::pin_project]
57+
pub struct BuiltPayloadStream<T: PayloadTypes> {
58+
/// The stream of events.
59+
#[pin]
60+
st: BroadcastStream<Events<T>>,
61+
}
62+
63+
impl<T: PayloadTypes + 'static> Stream for BuiltPayloadStream<T> {
64+
type Item = T::BuiltPayload;
65+
66+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
67+
loop {
68+
return match ready!(self.as_mut().project().st.poll_next(cx)) {
69+
Some(Ok(Events::BuiltPayload(payload))) => Poll::Ready(Some(payload)),
70+
Some(Ok(Events::Attributes(_))) => {
71+
// ignoring attributes
72+
continue
73+
}
74+
Some(Err(err)) => {
75+
debug!(%err, "payload event stream stream lagging behind");
76+
continue
77+
}
78+
None => Poll::Ready(None),
79+
}
80+
}
81+
}
82+
}
83+
84+
/// A stream that yields received payload attributes
85+
#[derive(Debug)]
86+
#[pin_project::pin_project]
87+
pub struct PayloadAttributeStream<T: PayloadTypes> {
88+
/// The stream of events.
89+
#[pin]
90+
st: BroadcastStream<Events<T>>,
91+
}
92+
93+
impl<T: PayloadTypes + 'static> Stream for PayloadAttributeStream<T> {
94+
type Item = T::PayloadBuilderAttributes;
95+
96+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
97+
loop {
98+
return match ready!(self.as_mut().project().st.poll_next(cx)) {
99+
Some(Ok(Events::Attributes(attr))) => Poll::Ready(Some(attr)),
100+
Some(Ok(Events::BuiltPayload(_))) => {
101+
// ignoring payloads
102+
continue
103+
}
104+
Some(Err(err)) => {
105+
debug!(%err, "payload event stream stream lagging behind");
106+
continue
107+
}
108+
None => Poll::Ready(None),
109+
}
110+
}
111+
}
37112
}

0 commit comments

Comments
 (0)