Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change(consensus): Updates transaction verifier to use orchard::bundle::BatchValidator #9308

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6049,6 +6049,7 @@ dependencies = [
"jubjub",
"lazy_static",
"metrics",
"nonempty",
"num-integer",
"once_cell",
"orchard",
Expand Down
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ metrics-exporter-prometheus = { version = "0.16.1", default-features = false }
mset = "0.1.1"
nix = "0.29.0"
num-integer = "0.1.46"
nonempty = "0.7.0"
once_cell = "1.20.2"
ordered-map = "0.4.2"
owo-colors = "4.1.0"
Expand Down Expand Up @@ -152,6 +153,7 @@ x25519-dalek = "2.0.1"
zcash_note_encryption = "0.4.1"
zcash_script = "0.2.0"


[workspace.metadata.release]

# We always do releases from the main branch
Expand Down
4 changes: 3 additions & 1 deletion tower-batch-control/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use std::{fmt, marker::PhantomData};
use tower::layer::Layer;
use tower::Service;

use crate::ItemSize;

use super::{service::Batch, BatchControl};

/// Adds a layer performing batch processing of requests.
Expand Down Expand Up @@ -43,7 +45,7 @@ impl<Request> BatchLayer<Request> {
}
}

impl<S, Request> Layer<S> for BatchLayer<Request>
impl<S, Request: ItemSize> Layer<S> for BatchLayer<Request>
where
S: Service<BatchControl<Request>> + Send + 'static,
S::Future: Send,
Expand Down
17 changes: 15 additions & 2 deletions tower-batch-control/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,18 +105,31 @@ type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// Signaling mechanism for batchable services that allows explicit flushing.
///
/// This request type is a generic wrapper for the inner `Req` type.
pub enum BatchControl<Req> {
pub enum BatchControl<Req: ItemSize> {
/// A new batch item.
Item(Req),
/// The current batch should be flushed.
Flush,
}

impl<Req> From<Req> for BatchControl<Req> {
impl<Req: ItemSize> From<Req> for BatchControl<Req> {
fn from(req: Req) -> BatchControl<Req> {
BatchControl::Item(req)
}
}

pub use self::layer::BatchLayer;
pub use self::service::Batch;

/// A trait for reading the size of a request to [`BatchControl`] services.
pub trait ItemSize {
/// Returns the size of this item relative to the maximum threshold for flushing
/// requests to the underlying service.
fn item_size(&self) -> usize {
1
}
}

// [`ItemSize`] impls for test `Item` types
impl ItemSize for () {}
impl ItemSize for &'static str {}
12 changes: 7 additions & 5 deletions tower-batch-control/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use tokio_util::sync::PollSemaphore;
use tower::Service;
use tracing::{info_span, Instrument};

use crate::ItemSize;

use super::{
future::ResponseFuture,
message::Message,
Expand All @@ -34,7 +36,7 @@ pub const QUEUE_BATCH_LIMIT: usize = 64;
/// Allows batch processing of requests.
///
/// See the crate documentation for more details.
pub struct Batch<T, Request>
pub struct Batch<T, Request: ItemSize>
where
T: Service<BatchControl<Request>>,
{
Expand Down Expand Up @@ -72,7 +74,7 @@ where
worker_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
}

impl<T, Request> fmt::Debug for Batch<T, Request>
impl<T, Request: ItemSize> fmt::Debug for Batch<T, Request>
where
T: Service<BatchControl<Request>>,
{
Expand All @@ -88,7 +90,7 @@ where
}
}

impl<T, Request> Batch<T, Request>
impl<T, Request: ItemSize> Batch<T, Request>
where
T: Service<BatchControl<Request>>,
T::Future: Send + 'static,
Expand Down Expand Up @@ -218,7 +220,7 @@ where
}
}

impl<T, Request> Service<Request> for Batch<T, Request>
impl<T, Request: ItemSize> Service<Request> for Batch<T, Request>
where
T: Service<BatchControl<Request>>,
T::Future: Send + 'static,
Expand Down Expand Up @@ -322,7 +324,7 @@ where
}
}

impl<T, Request> Clone for Batch<T, Request>
impl<T, Request: ItemSize> Clone for Batch<T, Request>
where
T: Service<BatchControl<Request>>,
{
Expand Down
11 changes: 6 additions & 5 deletions tower-batch-control/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use tokio_util::sync::PollSemaphore;
use tower::{Service, ServiceExt};
use tracing_futures::Instrument;

use crate::ItemSize;

use super::{
error::{Closed, ServiceError},
message::{self, Message},
Expand All @@ -34,7 +36,7 @@ use super::{
/// implement (only call).
#[pin_project(PinnedDrop)]
#[derive(Debug)]
pub struct Worker<T, Request>
pub struct Worker<T, Request: ItemSize>
where
T: Service<BatchControl<Request>>,
T::Future: Send + 'static,
Expand Down Expand Up @@ -91,7 +93,7 @@ pub(crate) struct ErrorHandle {
inner: Arc<Mutex<Option<ServiceError>>>,
}

impl<T, Request> Worker<T, Request>
impl<T, Request: ItemSize> Worker<T, Request>
where
T: Service<BatchControl<Request>>,
T::Future: Send + 'static,
Expand Down Expand Up @@ -142,10 +144,9 @@ where

match self.service.ready().await {
Ok(svc) => {
self.pending_items += req.item_size();
let rsp = svc.call(req.into());
let _ = tx.send(Ok(rsp));

self.pending_items += 1;
}
Err(e) => {
self.failed(e.into());
Expand Down Expand Up @@ -351,7 +352,7 @@ impl Clone for ErrorHandle {
}

#[pin_project::pinned_drop]
impl<T, Request> PinnedDrop for Worker<T, Request>
impl<T, Request: ItemSize> PinnedDrop for Worker<T, Request>
where
T: Service<BatchControl<Request>>,
T::Future: Send + 'static,
Expand Down
25 changes: 20 additions & 5 deletions tower-batch-control/tests/ed25519.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ use std::{
};

use color_eyre::{eyre::eyre, Report};
use ed25519_zebra::{batch, Error, SigningKey, VerificationKeyBytes};
use ed25519_zebra::{batch, Error, Signature, SigningKey, VerificationKeyBytes};
use futures::stream::{FuturesOrdered, StreamExt};
use futures::FutureExt;
use futures_core::Future;
use rand::thread_rng;
use tokio::sync::{oneshot::error::RecvError, watch};
use tower::{Service, ServiceExt};
use tower_batch_control::{Batch, BatchControl};
use tower_batch_control::{Batch, BatchControl, ItemSize};
use tower_fallback::Fallback;

// ============ service impl ============
Expand All @@ -33,8 +33,23 @@ type VerifyResult = Result<(), Error>;
type Sender = watch::Sender<Option<VerifyResult>>;

/// The type of the batch item.
/// This is an `Ed25519Item`.
type Item = batch::Item;
/// This is a newtype around an `Ed25519Item`.
#[derive(Clone, Debug)]
struct Item(batch::Item);

impl ItemSize for Item {}

impl<'msg, M: AsRef<[u8]> + ?Sized> From<(VerificationKeyBytes, Signature, &'msg M)> for Item {
fn from(tup: (VerificationKeyBytes, Signature, &'msg M)) -> Self {
Self(batch::Item::from(tup))
}
}

impl Item {
fn verify_single(self) -> VerifyResult {
self.0.verify_single()
}
}

/// Ed25519 signature verifier service
struct Verifier {
Expand Down Expand Up @@ -106,7 +121,7 @@ impl Service<BatchControl<Item>> for Verifier {

fn call(&mut self, req: BatchControl<Item>) -> Self::Future {
match req {
BatchControl::Item(item) => {
BatchControl::Item(Item(item)) => {
tracing::trace!("got ed25519 item");
self.batch.queue(item);
let mut rx = self.tx.subscribe();
Expand Down
11 changes: 11 additions & 0 deletions zebra-chain/src/orchard/note/ciphertexts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ impl From<EncryptedNote> for [u8; 580] {
enc_ciphertext.0
}
}
impl From<&EncryptedNote> for [u8; 580] {
fn from(&EncryptedNote(enc_ciphertext): &EncryptedNote) -> Self {
enc_ciphertext
}
}

impl PartialEq for EncryptedNote {
fn eq(&self, other: &Self) -> bool {
Expand Down Expand Up @@ -102,6 +107,12 @@ impl From<WrappedNoteKey> for [u8; 80] {
}
}

impl From<&WrappedNoteKey> for [u8; 80] {
fn from(out_ciphertext: &WrappedNoteKey) -> Self {
out_ciphertext.0
}
}

impl PartialEq for WrappedNoteKey {
fn eq(&self, other: &Self) -> bool {
self.0[..] == other.0[..]
Expand Down
13 changes: 13 additions & 0 deletions zebra-chain/src/orchard/note/nullifiers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,19 @@ use crate::serialization::{serde_helpers, SerializationError};
#[derive(Clone, Copy, Debug, Eq, Serialize, Deserialize)]
pub struct Nullifier(#[serde(with = "serde_helpers::Base")] pub(crate) pallas::Base);

impl From<Nullifier> for orchard::note::Nullifier {
fn from(value: Nullifier) -> Self {
// TODO: impl From<pallas::Base> for orchard::note::Nullifier in the orchard repo
Self::from_bytes(&value.into()).expect("should be valid")
}
}

impl From<&Nullifier> for orchard::note::Nullifier {
fn from(&value: &Nullifier) -> Self {
Self::from(value)
}
}

impl Hash for Nullifier {
fn hash<H: Hasher>(&self, state: &mut H) {
self.0.to_repr().hash(state);
Expand Down
2 changes: 2 additions & 0 deletions zebra-consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ zebra-state = { path = "../zebra-state", version = "1.0.0-beta.45" }
zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.45" }
zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.45" }

nonempty.workspace = true

# prod feature progress-bar
howudoin = { workspace = true, optional = true }

Expand Down
7 changes: 2 additions & 5 deletions zebra-consensus/src/primitives.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,9 @@ pub async fn spawn_fifo_and_convert<
}

/// Fires off a task into the Rayon threadpool and awaits the result through a oneshot channel.
pub async fn spawn_fifo<
E: 'static + std::error::Error + Sync + Send,
F: 'static + FnOnce() -> Result<(), E> + Send,
>(
pub async fn spawn_fifo<T: 'static + Send, F: 'static + FnOnce() -> T + Send>(
f: F,
) -> Result<Result<(), E>, RecvError> {
) -> Result<T, RecvError> {
// Rayon doesn't have a spawn function that returns a value,
// so we use a oneshot channel instead.
let (rsp_tx, rsp_rx) = tokio::sync::oneshot::channel();
Expand Down
27 changes: 24 additions & 3 deletions zebra-consensus/src/primitives/ed25519.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use rand::thread_rng;

use tokio::sync::watch;
use tower::{util::ServiceFn, Service};
use tower_batch_control::{Batch, BatchControl};
use tower_batch_control::{Batch, BatchControl, ItemSize};
use tower_fallback::Fallback;
use zebra_chain::primitives::ed25519::*;

Expand All @@ -34,8 +34,29 @@ type VerifyResult = Result<(), Error>;
type Sender = watch::Sender<Option<VerifyResult>>;

/// The type of the batch item.
/// This is an `Ed25519Item`.
pub type Item = batch::Item;
/// This is a newtype around an `Ed25519Item`.
#[derive(Clone, Debug)]
pub struct Item(batch::Item);

impl ItemSize for Item {}

impl<'msg, M: AsRef<[u8]> + ?Sized> From<(VerificationKeyBytes, Signature, &'msg M)> for Item {
fn from(tup: (VerificationKeyBytes, Signature, &'msg M)) -> Self {
Self(batch::Item::from(tup))
}
}

impl From<Item> for batch::Item {
fn from(Item(item): Item) -> Self {
item
}
}

impl Item {
fn verify_single(self) -> VerifyResult {
self.0.verify_single()
}
}

/// Global batch verification context for Ed25519 signatures.
///
Expand Down
24 changes: 20 additions & 4 deletions zebra-consensus/src/primitives/groth16.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use rand::thread_rng;
use tokio::sync::watch;
use tower::{util::ServiceFn, Service};

use tower_batch_control::{Batch, BatchControl};
use tower_batch_control::{Batch, BatchControl, ItemSize};
use tower_fallback::{BoxedError, Fallback};

use zebra_chain::{
Expand Down Expand Up @@ -57,8 +57,24 @@ type VerifyResult = Result<(), VerificationError>;
type Sender = watch::Sender<Option<VerifyResult>>;

/// The type of the batch item.
/// This is a Groth16 verification item.
pub type Item = batch::Item<Bls12>;
/// This is a newtype around a Groth16 verification item.
#[derive(Clone, Debug)]
pub struct Item(batch::Item<Bls12>);

impl ItemSize for Item {}

impl<T: Into<batch::Item<Bls12>>> From<T> for Item {
fn from(value: T) -> Self {
Self(value.into())
}
}

impl Item {
/// Convenience method to call a method on the inner value to perform non-batched verification.
pub fn verify_single(self, pvk: &PreparedVerifyingKey<Bls12>) -> VerifyResult {
self.0.verify_single(pvk)
}
}

/// The type of a raw verifying key.
/// This is the key used to verify batches.
Expand Down Expand Up @@ -469,7 +485,7 @@ impl Service<BatchControl<Item>> for Verifier {
match req {
BatchControl::Item(item) => {
tracing::trace!("got item");
self.batch.queue(item);
self.batch.queue(item.0);
let mut rx = self.tx.subscribe();

Box::pin(async move {
Expand Down
Loading
Loading