Skip to content

Commit 4d382b9

Browse files
authored
Merge pull request #2477 from input-output-hk/jpraynaud/2470-signature-processor-dmq
Feat: implement a signature processor for DMQ
2 parents 7dd379c + d138a2b commit 4d382b9

File tree

22 files changed

+459
-42
lines changed

22 files changed

+459
-42
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ As a minor extension, we have adopted a slightly different versioning convention
99

1010
## Mithril Distribution [XXXX] - UNRELEASED
1111

12+
- Support for DMQ signature consumer and processor in the aggregator.
13+
1214
- Crates versions:
1315

1416
| Crate | Version |

Cargo.lock

+3-3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/mithril-metric/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "mithril-metric"
3-
version = "0.1.13"
3+
version = "0.1.14"
44
description = "Common tools to expose metrics."
55
authors = { workspace = true }
66
edition = { workspace = true }

internal/mithril-metric/src/server.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use axum::{
88
};
99
use slog::{error, info, warn, Logger};
1010
use std::sync::Arc;
11-
use tokio::sync::oneshot::Receiver;
11+
use tokio::sync::watch::Receiver;
1212

1313
use mithril_common::logging::LoggerExtensions;
1414
use mithril_common::StdResult;
@@ -93,9 +93,10 @@ impl<T: MetricsServiceExporter + 'static> MetricsServer<T> {
9393
.await?;
9494

9595
let serve_logger = self.logger.clone();
96+
let mut shutdown_rx = shutdown_rx;
9697
axum::serve(listener, app)
9798
.with_graceful_shutdown(async move {
98-
shutdown_rx.await.ok();
99+
shutdown_rx.changed().await.ok();
99100
warn!(
100101
serve_logger,
101102
"shutting down HTTP server after receiving signal"
@@ -112,7 +113,7 @@ mod tests {
112113
use anyhow::anyhow;
113114
use reqwest::StatusCode;
114115
use std::time::Duration;
115-
use tokio::{sync::oneshot, task::yield_now, time::sleep};
116+
use tokio::{sync::watch, task::yield_now, time::sleep};
116117

117118
use crate::helper::test_tools::TestLogger;
118119

@@ -135,7 +136,7 @@ mod tests {
135136
#[tokio::test]
136137
async fn test_metrics_server() {
137138
let logger = TestLogger::stdout();
138-
let (shutdown_tx, shutdown_rx) = oneshot::channel();
139+
let (shutdown_tx, shutdown_rx) = watch::channel(());
139140
let metrics_service = Arc::new(PseudoMetricsService::new());
140141
let metrics_server = Arc::new(MetricsServer::new(
141142
"0.0.0.0",

mithril-aggregator/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "mithril-aggregator"
3-
version = "0.7.50"
3+
version = "0.7.51"
44
description = "A Mithril Aggregator server"
55
authors = { workspace = true }
66
edition = { workspace = true }

mithril-aggregator/src/commands/serve_command.rs

+37-18
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use clap::Parser;
1313
use config::{builder::DefaultState, ConfigBuilder, Map, Source, Value};
1414

1515
use slog::{crit, debug, info, warn, Logger};
16-
use tokio::{sync::oneshot, task::JoinSet};
16+
use tokio::task::JoinSet;
1717

1818
use mithril_cli_helper::{
1919
register_config_value, register_config_value_bool, register_config_value_option,
@@ -132,18 +132,23 @@ impl ServeCommand {
132132
let mut dependencies_builder =
133133
DependenciesBuilder::new(root_logger.clone(), Arc::new(config.clone()));
134134

135-
// start servers
136135
println!("Starting server...");
137136
println!("Press Ctrl+C to stop");
138137

139-
// start the monitoring thread
138+
// Create the stop signal channel
139+
let (stop_tx, stop_rx) = dependencies_builder
140+
.get_stop_signal_channel()
141+
.await
142+
.with_context(|| "Dependencies Builder can not create stop signal channel")?;
143+
144+
// Start the monitoring thread
140145
let mut event_store = dependencies_builder
141146
.create_event_store()
142147
.await
143148
.with_context(|| "Dependencies Builder can not create event store")?;
144149
let event_store_thread = tokio::spawn(async move { event_store.run().await.unwrap() });
145150

146-
// start the database vacuum operation, if needed
151+
// Start the database vacuum operation, if needed
147152
self.perform_database_vacuum_if_needed(
148153
&config.data_stores_directory,
149154
&mut dependencies_builder,
@@ -152,15 +157,15 @@ impl ServeCommand {
152157
)
153158
.await?;
154159

155-
// start the aggregator runtime
160+
// Start the aggregator runtime
156161
let mut runtime = dependencies_builder
157162
.create_aggregator_runner()
158163
.await
159164
.with_context(|| "Dependencies Builder can not create aggregator runner")?;
160165
let mut join_set = JoinSet::new();
161166
join_set.spawn(async move { runtime.run().await.map_err(|e| e.to_string()) });
162167

163-
// start the cardano transactions preloader
168+
// Start the cardano transactions preloader
164169
let cardano_transactions_preloader = dependencies_builder
165170
.create_cardano_transactions_preloader()
166171
.await
@@ -170,27 +175,41 @@ impl ServeCommand {
170175
let preload_task =
171176
tokio::spawn(async move { cardano_transactions_preloader.preload().await });
172177

173-
// start the HTTP server
174-
let (shutdown_tx, shutdown_rx) = oneshot::channel();
178+
// Start the HTTP server
175179
let routes = dependencies_builder
176180
.create_http_routes()
177181
.await
178182
.with_context(|| "Dependencies Builder can not create http routes")?;
183+
let mut stop_rx_clone = stop_rx.clone();
179184
join_set.spawn(async move {
180185
let (_, server) = warp::serve(routes).bind_with_graceful_shutdown(
181186
(
182187
config.server_ip.clone().parse::<IpAddr>().unwrap(),
183188
config.server_port,
184189
),
185-
async {
186-
shutdown_rx.await.ok();
190+
async move {
191+
stop_rx_clone.changed().await.ok();
187192
},
188193
);
189194
server.await;
190195

191196
Ok(())
192197
});
193198

199+
let signature_processor = dependencies_builder
200+
.create_signature_processor()
201+
.await
202+
.with_context(|| "Dependencies Builder can not create signature processor")?;
203+
let signature_processor_clone = signature_processor.clone();
204+
join_set.spawn(async move {
205+
signature_processor_clone
206+
.run()
207+
.await
208+
.map_err(|e| e.to_string())?;
209+
210+
Ok(())
211+
});
212+
194213
// Create a SignersImporter only if the `cexplorer_pools_url` is provided in the config.
195214
if let Some(cexplorer_pools_url) = config.cexplorer_pools_url {
196215
match dependencies_builder
@@ -236,7 +255,7 @@ impl ServeCommand {
236255
.get_metrics_service()
237256
.await
238257
.with_context(|| "Metrics service initialization error")?;
239-
let (metrics_server_shutdown_tx, metrics_server_shutdown_rx) = oneshot::channel();
258+
let stop_rx_clone = stop_rx.clone();
240259
if config.enable_metrics_server {
241260
let metrics_logger = root_logger.clone();
242261
join_set.spawn(async move {
@@ -246,7 +265,7 @@ impl ServeCommand {
246265
metrics_service,
247266
metrics_logger.clone(),
248267
)
249-
.start(metrics_server_shutdown_rx)
268+
.start(stop_rx_clone)
250269
.await
251270
.map_err(|e| anyhow!(e));
252271

@@ -261,13 +280,13 @@ impl ServeCommand {
261280
crit!(root_logger, "A critical error occurred"; "error" => e);
262281
}
263282

264-
metrics_server_shutdown_tx
265-
.send(())
266-
.map_err(|e| anyhow!("Metrics server shutdown signal could not be sent: {e:?}"))?;
267-
268-
// stop servers
283+
// Stop servers
269284
join_set.shutdown().await;
270-
let _ = shutdown_tx.send(());
285+
286+
// Send the stop signal
287+
stop_tx
288+
.send(())
289+
.map_err(|e| anyhow!("Stop signal could not be sent: {e:?}"))?;
271290

272291
if !preload_task.is_finished() {
273292
preload_task.abort();

mithril-aggregator/src/dependency_injection/builder/enablers/misc.rs

+23-2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use crate::dependency_injection::{DependenciesBuilder, Result};
1414
use crate::get_dependency;
1515
use crate::services::{
1616
AggregatorClient, AggregatorHTTPClient, MessageService, MithrilMessageService,
17+
SequentialSignatureProcessor, SignatureConsumer, SignatureConsumerNoop, SignatureProcessor,
1718
};
1819
impl DependenciesBuilder {
1920
async fn build_signed_entity_type_lock(&mut self) -> Result<Arc<SignedEntityTypeLock>> {
@@ -26,7 +27,7 @@ impl DependenciesBuilder {
2627
get_dependency!(self.signed_entity_type_lock)
2728
}
2829

29-
/// build HTTP message service
30+
/// Builds HTTP message service
3031
pub async fn build_message_service(&mut self) -> Result<Arc<dyn MessageService>> {
3132
let certificate_repository = Arc::new(CertificateRepository::new(
3233
self.get_sqlite_connection().await?,
@@ -49,7 +50,7 @@ impl DependenciesBuilder {
4950
get_dependency!(self.message_service)
5051
}
5152

52-
/// build an [AggregatorClient]
53+
/// Builds an [AggregatorClient]
5354
pub async fn build_leader_aggregator_client(&mut self) -> Result<Arc<dyn AggregatorClient>> {
5455
let leader_aggregator_endpoint = self
5556
.configuration
@@ -70,4 +71,24 @@ impl DependenciesBuilder {
7071
pub async fn get_leader_aggregator_client(&mut self) -> Result<Arc<dyn AggregatorClient>> {
7172
get_dependency!(self.leader_aggregator_client)
7273
}
74+
75+
/// Builds a [SignatureConsumer]
76+
pub async fn build_signature_consumer(&mut self) -> Result<Arc<dyn SignatureConsumer>> {
77+
let signature_consumer = SignatureConsumerNoop;
78+
79+
Ok(Arc::new(signature_consumer))
80+
}
81+
82+
/// Builds a [SignatureProcessor]
83+
pub async fn create_signature_processor(&mut self) -> Result<Arc<dyn SignatureProcessor>> {
84+
let (_stop_tx, stop_rx) = self.get_stop_signal_channel().await?;
85+
let signature_processor = SequentialSignatureProcessor::new(
86+
self.build_signature_consumer().await?,
87+
self.get_certifier_service().await?,
88+
stop_rx,
89+
self.root_logger(),
90+
);
91+
92+
Ok(Arc::new(signature_processor))
93+
}
7394
}

mithril-aggregator/src/dependency_injection/builder/mod.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::{path::PathBuf, sync::Arc};
88
use tokio::{
99
sync::{
1010
mpsc::{UnboundedReceiver, UnboundedSender},
11-
Mutex,
11+
watch, Mutex,
1212
},
1313
time::Duration,
1414
};
@@ -273,6 +273,9 @@ pub struct DependenciesBuilder {
273273

274274
/// Protocol parameters retriever
275275
pub protocol_parameters_retriever: Option<Arc<dyn ProtocolParametersRetriever>>,
276+
277+
/// Stop signal channel
278+
pub stop_signal_channel: Option<(watch::Sender<()>, watch::Receiver<()>)>,
276279
}
277280

278281
impl DependenciesBuilder {
@@ -335,6 +338,7 @@ impl DependenciesBuilder {
335338
metrics_service: None,
336339
leader_aggregator_client: None,
337340
protocol_parameters_retriever: None,
341+
stop_signal_channel: None,
338342
}
339343
}
340344

mithril-aggregator/src/dependency_injection/builder/support/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
55
mod compatibility;
66
mod observability;
7+
mod signal;
78
mod sqlite;
89
mod stores;
910
mod upkeep;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
use tokio::sync::watch;
2+
3+
use crate::dependency_injection::{DependenciesBuilder, Result};
4+
use crate::get_dependency;
5+
6+
impl DependenciesBuilder {
7+
/// Builds a stop signal channel
8+
pub async fn build_stop_signal_channel(
9+
&mut self,
10+
) -> Result<(watch::Sender<()>, watch::Receiver<()>)> {
11+
Ok(watch::channel(()))
12+
}
13+
14+
/// Get the stop signal channel
15+
pub async fn get_stop_signal_channel(
16+
&mut self,
17+
) -> Result<(watch::Sender<()>, watch::Receiver<()>)> {
18+
get_dependency!(self.stop_signal_channel)
19+
}
20+
}

mithril-aggregator/src/services/mod.rs

+4
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ mod epoch_service;
1616
mod message;
1717
mod prover;
1818
mod signable_builder;
19+
mod signature_consumer;
20+
mod signature_processor;
1921
mod signed_entity;
2022
mod signer_registration;
2123
mod snapshotter;
@@ -30,6 +32,8 @@ pub use epoch_service::*;
3032
pub use message::*;
3133
pub use prover::*;
3234
pub use signable_builder::*;
35+
pub use signature_consumer::*;
36+
pub use signature_processor::*;
3337
pub use signed_entity::*;
3438
pub use signer_registration::*;
3539
pub use snapshotter::*;

0 commit comments

Comments
 (0)