Skip to content

Commit a0d9bc1

Browse files
committed
merge conflicts
1 parent 3e02947 commit a0d9bc1

File tree

8 files changed

+294
-197
lines changed

8 files changed

+294
-197
lines changed

crates/op-rbuilder/src/generator.rs

+131-79
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1+
use alloy_eips::merge::SLOT_DURATION;
12
use alloy_primitives::B256;
23
use futures_util::Future;
34
use futures_util::FutureExt;
45
use reth::builder::BuilderContext;
56
use reth::providers::BlockReaderIdExt;
67
use reth::{providers::StateProviderFactory, tasks::TaskSpawner};
7-
use reth_basic_payload_builder::BasicPayloadJobGeneratorConfig;
88
use reth_basic_payload_builder::BuildOutcome;
99
use reth_basic_payload_builder::HeaderForPayload;
1010
use reth_basic_payload_builder::PayloadConfig;
@@ -24,6 +24,7 @@ use reth_payload_primitives::BuiltPayload;
2424
use reth_primitives_traits::HeaderTy;
2525
use reth_revm::cached::CachedReads;
2626
use reth_revm::cancelled::CancelOnDrop;
27+
use reth_revm::cancelled::ManualCancel;
2728
use reth_transaction_pool::TransactionPool;
2829
use std::sync::{Arc, Mutex};
2930
use std::time::SystemTime;
@@ -102,6 +103,60 @@ pub trait PayloadBuilder: Send + Sync + Clone {
102103
) -> Result<BuildOutcome<Self::BuiltPayload>, PayloadBuilderError>;
103104
}
104105

106+
/// Settings for the [`BasicPayloadJobGenerator`].
107+
#[derive(Debug, Clone)]
108+
pub struct BasicPayloadJobGeneratorConfig {
109+
/// The interval at which the job should build a new payload after the last.
110+
pub interval: Duration,
111+
/// The deadline for when the payload builder job should resolve.
112+
///
113+
/// By default this is [`SLOT_DURATION`]: 12s
114+
pub deadline: Duration,
115+
/// Maximum number of tasks to spawn for building a payload.
116+
pub max_payload_tasks: usize,
117+
}
118+
119+
// === impl BasicPayloadJobGeneratorConfig ===
120+
121+
impl BasicPayloadJobGeneratorConfig {
122+
/// Sets the interval at which the job should build a new payload after the last.
123+
pub const fn interval(mut self, interval: Duration) -> Self {
124+
self.interval = interval;
125+
self
126+
}
127+
128+
/// Sets the deadline when this job should resolve.
129+
pub const fn deadline(mut self, deadline: Duration) -> Self {
130+
self.deadline = deadline;
131+
self
132+
}
133+
134+
/// Sets the maximum number of tasks to spawn for building a payload(s).
135+
///
136+
/// # Panics
137+
///
138+
/// If `max_payload_tasks` is 0.
139+
pub fn max_payload_tasks(mut self, max_payload_tasks: usize) -> Self {
140+
assert!(
141+
max_payload_tasks > 0,
142+
"max_payload_tasks must be greater than 0"
143+
);
144+
self.max_payload_tasks = max_payload_tasks;
145+
self
146+
}
147+
}
148+
149+
impl Default for BasicPayloadJobGeneratorConfig {
150+
fn default() -> Self {
151+
Self {
152+
interval: Duration::from_secs(1),
153+
// 12s slot time
154+
deadline: SLOT_DURATION,
155+
max_payload_tasks: 3,
156+
}
157+
}
158+
}
159+
105160
/// The generator type that creates new jobs that builds empty blocks.
106161
#[derive(Debug)]
107162
pub struct BlockPayloadJobGenerator<Client, Tasks, Builder> {
@@ -110,7 +165,7 @@ pub struct BlockPayloadJobGenerator<Client, Tasks, Builder> {
110165
/// How to spawn building tasks
111166
executor: Tasks,
112167
/// The configuration for the job generator.
113-
_config: BasicPayloadJobGeneratorConfig,
168+
config: BasicPayloadJobGeneratorConfig,
114169
/// Restricts how many generator tasks can be executed at once.
115170
payload_task_guard: PayloadTaskGuard,
116171
/// The type responsible for building payloads.
@@ -135,8 +190,8 @@ impl<Client, Tasks, Builder> BlockPayloadJobGenerator<Client, Tasks, Builder> {
135190
Self {
136191
client,
137192
executor,
138-
payload_task_guard: PayloadTaskGuard::new(3), // TODO: use configured value
139-
_config: config,
193+
payload_task_guard: PayloadTaskGuard::new(config.max_payload_tasks),
194+
config,
140195
builder,
141196
pre_cached: None,
142197
}
@@ -173,7 +228,7 @@ where
173228
type Job = BlockPayloadJob<Tasks, Builder>;
174229

175230
/// This is invoked when the node receives payload attributes from the beacon node via
176-
/// `engine_forkchoiceUpdatedV1`
231+
/// `engine_forkchoiceUpdated`
177232
fn new_payload_job(
178233
&self,
179234
attributes: <Builder as PayloadBuilder>::Attributes,
@@ -213,7 +268,7 @@ where
213268
config,
214269
deadline,
215270
// ticks immediately
216-
interval: tokio::time::interval(Duration::from_secs(1)), // TODO: use configured value
271+
_interval: tokio::time::interval(self.config.interval),
217272
best_payload: PayloadState::Missing,
218273
pending_block: None,
219274
cached_reads,
@@ -234,6 +289,12 @@ use std::{
234289

235290
use crate::metrics::PayloadBuilderMetrics;
236291

292+
#[derive(Debug)]
293+
pub struct PendingBlock<T> {
294+
cancel: ManualCancel,
295+
pending: PendingPayload<T>,
296+
}
297+
237298
/// A [PayloadJob] that builds empty blocks.
238299
pub struct BlockPayloadJob<Tasks, Builder>
239300
where
@@ -246,11 +307,11 @@ where
246307
/// The deadline when this job should resolve.
247308
deadline: Pin<Box<Sleep>>,
248309
/// The interval at which the job should build a new payload after the last.
249-
interval: Interval,
310+
_interval: Interval,
250311
/// The best payload so far and its state.
251312
best_payload: PayloadState<Builder::BuiltPayload>,
252313
/// Receiver for the block that is currently being built.
253-
pending_block: Option<PendingPayload<Builder::BuiltPayload>>,
314+
pending_block: Option<PendingBlock<Builder::BuiltPayload>>,
254315
/// Restricts how many generator tasks can be executed at once.
255316
payload_task_guard: PayloadTaskGuard,
256317
/// Caches all disk reads for the state the new payloads builds on
@@ -306,7 +367,11 @@ where
306367
self.spawn_build_job();
307368
}
308369

309-
let maybe_better = self.pending_block.take();
370+
let maybe_better = self.pending_block.take().map(|pending_block| {
371+
// cancel any pending blocks
372+
pending_block.cancel.cancel();
373+
pending_block.pending
374+
});
310375
let empty_payload = None;
311376

312377
if best_payload.is_none() {
@@ -329,7 +394,7 @@ pub struct BuildArguments<Attributes, Payload: BuiltPayload> {
329394
/// How to configure the payload.
330395
pub config: PayloadConfig<Attributes, HeaderTy<Payload::Primitives>>,
331396
/// A marker that can be used to cancel the job.
332-
pub cancel: CancelOnDrop,
397+
pub cancel: ManualCancel,
333398
/// The best payload achieved so far.
334399
pub best_payload: Option<Payload>,
335400
}
@@ -345,8 +410,8 @@ where
345410
pub fn spawn_build_job(&mut self) {
346411
trace!(target: "payload_builder", id = %self.config.payload_id(), "spawn new payload build task");
347412
let (tx, rx) = oneshot::channel();
348-
let cancel = CancelOnDrop::default();
349-
let _cancel = cancel.clone();
413+
let cancel = ManualCancel::default();
414+
let cancel_clone = cancel.clone();
350415
let guard = self.payload_task_guard.clone();
351416
let payload_config = self.config.clone();
352417
let best_payload = self.best_payload.payload().cloned();
@@ -365,8 +430,10 @@ where
365430
let result = builder.try_build(args);
366431
let _ = tx.send(result);
367432
}));
368-
369-
self.pending_block = Some(PendingPayload::new(_cancel, rx));
433+
self.pending_block = Some(PendingBlock {
434+
cancel: cancel_clone,
435+
pending: PendingPayload::new(CancelOnDrop::default(), rx),
436+
});
370437
}
371438
}
372439

@@ -383,24 +450,27 @@ where
383450
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
384451
tracing::trace!("Polling job");
385452
let this = self.get_mut();
386-
453+
let (cancel, pending) = this
454+
.pending_block
455+
.take()
456+
.map(|pending_block| {
457+
(
458+
Some(pending_block.cancel.clone()),
459+
Some(pending_block.pending),
460+
)
461+
})
462+
.unwrap_or_default();
387463
// check if the deadline is reached
388464
if this.deadline.as_mut().poll(cx).is_ready() {
389465
trace!(target: "payload_builder", "payload building deadline reached");
390-
return Poll::Ready(Ok(()));
391-
}
392-
393-
// check if the interval is reached
394-
while this.interval.poll_tick(cx).is_ready() {
395-
// start a new job if there is no pending block, we haven't reached the deadline,
396-
// and the payload isn't frozen
397-
if this.pending_block.is_none() && !this.best_payload.is_frozen() {
398-
this.spawn_build_job();
466+
if let Some(cancel) = cancel {
467+
cancel.cancel();
399468
}
469+
return Poll::Ready(Ok(()));
400470
}
401471

402472
// poll the pending block
403-
if let Some(mut fut) = this.pending_block.take() {
473+
if let Some(mut fut) = pending {
404474
match fut.poll_unpin(cx) {
405475
Poll::Ready(Ok(outcome)) => match outcome {
406476
BuildOutcome::Better {
@@ -429,7 +499,10 @@ where
429499
this.metrics.inc_failed_payload_builds();
430500
}
431501
Poll::Pending => {
432-
this.pending_block = Some(fut);
502+
this.pending_block = Some(PendingBlock {
503+
cancel: cancel.unwrap(),
504+
pending: fut,
505+
});
433506
}
434507
}
435508
}
@@ -548,14 +621,18 @@ mod tests {
548621
use reth::tasks::TokioTaskExecutor;
549622
use reth_chain_state::ExecutedBlockWithTrieUpdates;
550623
use reth_node_api::NodePrimitives;
624+
use reth_optimism_node::{OpBuiltPayload, OpEngineTypes};
551625
use reth_optimism_payload_builder::payload::OpPayloadBuilderAttributes;
552626
use reth_optimism_payload_builder::OpPayloadPrimitives;
553627
use reth_optimism_primitives::OpPrimitives;
628+
use reth_payload_builder::PayloadBuilderService;
554629
use reth_primitives::SealedBlock;
555630
use reth_provider::test_utils::MockEthProvider;
631+
use reth_provider::CanonStateSubscriptions;
556632
use reth_testing_utils::generators::{random_block_range, BlockRangeParams};
557633
use tokio::task;
558634
use tokio::time::{sleep, Duration};
635+
use tracing::Level;
559636

560637
#[tokio::test]
561638
async fn test_block_cell_wait_for_value() {
@@ -649,32 +726,6 @@ mod tests {
649726
}
650727
}
651728

652-
#[derive(Clone, Debug, Default)]
653-
struct MockPayload;
654-
655-
impl BuiltPayload for MockPayload {
656-
type Primitives = OpPrimitives;
657-
658-
fn block(&self) -> &SealedBlock<<Self::Primitives as NodePrimitives>::Block> {
659-
unimplemented!()
660-
}
661-
662-
/// Returns the fees collected for the built block
663-
fn fees(&self) -> U256 {
664-
unimplemented!()
665-
}
666-
667-
/// Returns the entire execution data for the built block, if available.
668-
fn executed_block(&self) -> Option<ExecutedBlockWithTrieUpdates<Self::Primitives>> {
669-
None
670-
}
671-
672-
/// Returns the EIP-7865 requests for the payload if any.
673-
fn requests(&self) -> Option<Requests> {
674-
unimplemented!()
675-
}
676-
}
677-
678729
#[derive(Debug, PartialEq, Clone)]
679730
enum BlockEvent {
680731
Started,
@@ -686,7 +737,7 @@ mod tests {
686737
N: OpPayloadPrimitives,
687738
{
688739
type Attributes = OpPayloadBuilderAttributes<N::SignedTx>;
689-
type BuiltPayload = MockPayload;
740+
type BuiltPayload = OpBuiltPayload<N>;
690741

691742
fn try_build(
692743
&self,
@@ -718,28 +769,30 @@ mod tests {
718769
// 2 seconds in the future
719770
let future_unix_timestamp = current_unix_time + 2;
720771
let deadline = job_deadline(future_unix_timestamp);
721-
assert!(deadline <= now + Duration::from_secs(2));
772+
assert!(deadline.duration_since(now).as_secs() <= 2);
722773
assert!(deadline > now);
723774

724775
// Test past deadline
725776
let past_unix_timestamp = current_unix_time - 10;
726777
let deadline = job_deadline(past_unix_timestamp);
727778
// Should default to 1 second when timestamp is in the past
728-
assert_eq!(deadline, now + Duration::from_secs(1));
779+
assert_eq!(deadline.duration_since(now).as_secs(), 1);
729780

730781
// Test current timestamp
731782
let deadline = job_deadline(current_unix_time);
732783
// Should use 1 second when timestamp is current
733-
assert_eq!(deadline, now + Duration::from_secs(1));
784+
assert_eq!(deadline.duration_since(now).as_secs(), 1);
734785
}
735786

736787
#[tokio::test]
737788
async fn test_payload_generator() -> eyre::Result<()> {
789+
tracing::subscriber::set_global_default(tracing_subscriber::fmt().finish()).unwrap();
790+
738791
let mut rng = thread_rng();
739792

740793
let client = MockEthProvider::default();
741794
let executor = TokioTaskExecutor::default();
742-
let config = BasicPayloadJobGeneratorConfig::default();
795+
let config = BasicPayloadJobGeneratorConfig::default().max_payload_tasks(1);
743796
let builder = MockBuilder::<OpPrimitives>::new();
744797

745798
let (start, count) = (1, 10);
@@ -761,32 +814,31 @@ mod tests {
761814
builder.clone(),
762815
);
763816

817+
let (payload_service, payload_service_handle) =
818+
PayloadBuilderService::<_, _, OpEngineTypes>::new(
819+
generator,
820+
client.canonical_state_stream(),
821+
);
822+
tokio::spawn(async move {
823+
let _ = payload_service.await;
824+
});
825+
764826
// this is not nice but necessary
765827
let mut attr = OpPayloadBuilderAttributes::default();
766828
attr.payload_attributes.parent = client.latest_header()?.unwrap().hash();
767829

768-
{
769-
let job = generator.new_payload_job(attr.clone())?;
770-
let _ = job.await;
771-
772-
// you need to give one second for the job to be dropped and cancelled the internal job
773-
tokio::time::sleep(Duration::from_secs(1)).await;
774-
775-
let events = builder.get_events();
776-
assert_eq!(events, vec![BlockEvent::Started, BlockEvent::Cancelled]);
777-
}
778-
779-
{
780-
// job resolve triggers cancellations from the build task
781-
let mut job = generator.new_payload_job(attr.clone())?;
782-
let _ = job.resolve();
783-
let _ = job.await;
784-
785-
tokio::time::sleep(Duration::from_secs(1)).await;
786-
787-
let events = builder.get_events();
788-
assert_eq!(events, vec![BlockEvent::Started, BlockEvent::Cancelled]);
789-
}
830+
let _ = payload_service_handle.send_new_payload(attr.clone()).await;
831+
// job resolve triggers cancellations from the build task
832+
let _ = payload_service_handle
833+
.resolve_kind(attr.payload_id(), PayloadKind::Earliest)
834+
.await;
835+
let events = builder.get_events();
836+
assert_eq!(events, vec![BlockEvent::Started, BlockEvent::Cancelled]);
837+
838+
let _ = payload_service_handle.send_new_payload(attr.clone()).await;
839+
tokio::time::sleep(Duration::from_secs(3)).await;
840+
let events: Vec<BlockEvent> = builder.get_events();
841+
assert_eq!(events, vec![BlockEvent::Started, BlockEvent::Cancelled]);
790842

791843
Ok(())
792844
}

0 commit comments

Comments
 (0)