Skip to content

Refactor encryption #23

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 133 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 118 commits
Commits
Show all changes
133 commits
Select commit Hold shift + click to select a range
dc70505
Add initial setup for noise wrapper
cowlicks Mar 6, 2025
640efc9
use futures
cowlicks Mar 7, 2025
11afd0b
fix logger
cowlicks Mar 7, 2025
5f7d10b
Add test_utils
cowlicks Mar 7, 2025
4ee231c
Add standalone noise wrapper
cowlicks Mar 7, 2025
ca061fe
lint test_utils
cowlicks Mar 7, 2025
3e82e0f
wip noise
cowlicks Mar 9, 2025
241622f
wip poll impl
cowlicks Mar 10, 2025
2591d28
Add start_raw and read_raw
cowlicks Mar 10, 2025
400d57d
lint
cowlicks Mar 11, 2025
7c9e2f5
use read_raw & start_raw
cowlicks Mar 11, 2025
20302eb
Encrypted stream is now working!
cowlicks Mar 11, 2025
792358d
Add RawEncrytpCipher
cowlicks Mar 11, 2025
1f6fb96
More tests, logging, lints
cowlicks Mar 12, 2025
730baac
rm encrypt_in_place
cowlicks Mar 12, 2025
bb7354b
rm stuff used for development
cowlicks Mar 12, 2025
4e6217a
Add LengthPrefixed framing
cowlicks Mar 13, 2025
b576c9f
use var for header len
cowlicks Mar 13, 2025
a3a50d0
refactor encrypted poll functions
cowlicks Mar 13, 2025
94785a5
rename writer fields
cowlicks Mar 13, 2025
fb88b89
s/3/header_len/g
cowlicks Mar 14, 2025
1bb619d
add tokio-util for tests
cowlicks Mar 15, 2025
b6db233
Add result channel to test utils.
cowlicks Mar 15, 2025
7780cb2
refactor framing tests
cowlicks Mar 15, 2025
ec9edc2
Get Encrypted working with Result<Vec<u8>> from io
cowlicks Mar 15, 2025
c9a7709
Make Encrypted receive a Result
cowlicks Mar 16, 2025
f2cd806
Fix impl of Sink fro Framing poll_flush
cowlicks Mar 16, 2025
0805657
Add docs handle todos
cowlicks Mar 17, 2025
b9c5482
Add encryption_established
cowlicks Mar 17, 2025
e547a1f
logs
cowlicks Mar 17, 2025
83dbe3a
Add framing buffer rotation
cowlicks Mar 17, 2025
a585aef
bump futures to non-yanked version
cowlicks Mar 17, 2025
31e6281
handle setup errors and add test
cowlicks Mar 17, 2025
8a2370b
RMME show logs in example
cowlicks Mar 18, 2025
918a307
RMME extra docs
cowlicks Mar 18, 2025
ecc499a
add const header len
cowlicks Mar 18, 2025
8a96462
lint
cowlicks Mar 18, 2025
a85e42e
helpful names
cowlicks Mar 18, 2025
3e1483a
rename framing struct
cowlicks Mar 18, 2025
2a60291
Add func for building encrypted framed channel
cowlicks Mar 18, 2025
41fd39e
Move Protocol in prep for feature flagging
cowlicks Mar 18, 2025
8fca159
add second protocol impl behind feature flag
cowlicks Mar 18, 2025
e7e7dd7
fix spelling
cowlicks Mar 18, 2025
bb390b4
WIP Drop in encrypted channel
cowlicks Mar 18, 2025
df1e634
make Encoder trait immutable self
cowlicks Mar 18, 2025
a860221
rm unused State from ChannelMessage
cowlicks Mar 18, 2025
d6e1e72
split out Vec<ChannelMessage> encoding
cowlicks Mar 18, 2025
7dbb125
make integration tests use tokio
cowlicks Mar 18, 2025
bbad25b
RESETME
cowlicks Mar 19, 2025
e9d6236
Add frame encoding test
cowlicks Mar 19, 2025
15d2895
fix ChannelMessage Encoding. Restore Frame encoding
cowlicks Mar 20, 2025
002ba36
rm/gate old unused stuff. add instrument
cowlicks Mar 25, 2025
fae4804
doc comments
cowlicks Mar 25, 2025
b019e9f
feature gate oldmessage
cowlicks Mar 25, 2025
24fe4a1
fix all warnings
cowlicks Mar 25, 2025
b23bead
Add tracing::instrument
cowlicks Apr 1, 2025
16029ee
rm dbg
cowlicks Apr 1, 2025
5bf98ed
rm unused
cowlicks Apr 2, 2025
d3bfd07
rm unused
cowlicks Apr 2, 2025
094caa8
pub HandshakeResult
cowlicks Apr 2, 2025
7a8b2da
custom debug for Framig
cowlicks Apr 2, 2025
a7fac6d
feature gates
cowlicks Apr 2, 2025
a189b04
rm println
cowlicks Apr 2, 2025
9db631b
rm print
cowlicks Apr 2, 2025
a732507
refactor mqueue to pass through non byte messages
cowlicks Apr 2, 2025
c5888f5
use tracing-tree for viewing logs in tests
cowlicks Apr 2, 2025
d21d4a9
use tracing-tree
cowlicks Apr 2, 2025
3062f07
rm unused async
cowlicks Apr 2, 2025
2941840
expose handshake result, refactor, fix deadlock
cowlicks Apr 2, 2025
4a35d1f
wait for setup to handle commands
cowlicks Apr 2, 2025
35a6bac
move unused framed stuff into tests and do rename
cowlicks Apr 9, 2025
483e9ff
impl CompactEncodable for Schema
cowlicks Apr 9, 2025
350f03a
encoded_bytes renamed to encode
cowlicks Apr 14, 2025
bf0d309
use new CompactEncoding in schema.rs
cowlicks Apr 22, 2025
1700d90
make test easier to debug
cowlicks Apr 23, 2025
1d51159
lint
cowlicks Apr 23, 2025
a3395c9
rename old encoder trait to fix name collisio
cowlicks Apr 23, 2025
a9bab56
cleaning up messages
cowlicks Apr 23, 2025
80c29ee
removing Encoder trait
cowlicks Apr 24, 2025
a57a885
wip impl VecEncodable for CompactEncoding
cowlicks Apr 24, 2025
564c006
Only use ChanMsg::channel when not Open & Close
cowlicks Apr 30, 2025
861ee29
Add #[instrument]
cowlicks Apr 30, 2025
a62ce55
rm old stuff
cowlicks Apr 30, 2025
6f1995d
cargo clippy --fix
cowlicks Apr 30, 2025
ca329b4
Remove protocol feature
cowlicks Apr 30, 2025
3548f43
cargo fmt
cowlicks Apr 30, 2025
72d1d9e
clippy fixes
cowlicks Apr 30, 2025
1e3edd1
rm nested messaged & protocol modules
cowlicks Apr 30, 2025
3235036
remove encoder trait
cowlicks Apr 30, 2025
b9e5b49
clippy fixes
cowlicks Apr 30, 2025
2f6f694
add compact_encoding dependency
cowlicks May 2, 2025
f1188c7
remove decode macro
cowlicks May 5, 2025
c15aff9
update compact-encoding version
cowlicks May 6, 2025
6a44f81
Remove unused features
cowlicks May 6, 2025
e718807
remove unused uint24 feature
cowlicks May 6, 2025
ec47b84
remove use of test_log
cowlicks May 6, 2025
3357e6a
remove test-log dep
cowlicks May 6, 2025
778a192
Add instrument to some funcs
cowlicks May 6, 2025
b5ed63e
remove redundant 'simple' from every func name
cowlicks May 6, 2025
4c0402f
rm async_std test wrappers
cowlicks May 7, 2025
917b837
instrument and rename vec_encoded_size for cm
cowlicks May 8, 2025
d7bd06d
fix Vec<ChannelMessage> encoding
cowlicks May 8, 2025
3da6c62
remove redundant names
cowlicks May 9, 2025
74033f1
RMME
cowlicks May 11, 2025
d87c498
More logging rm unused
cowlicks May 18, 2025
539a017
Notes
cowlicks May 18, 2025
39d0dbe
use checked get
cowlicks May 19, 2025
d97bc64
RMME
cowlicks May 19, 2025
5926999
un-ignore tests
cowlicks May 19, 2025
81e0dad
rm debug
cowlicks May 19, 2025
fb14f92
rm framing stuff from messages
cowlicks May 20, 2025
3b9430b
Add manifest & priority to Request
cowlicks May 20, 2025
f535d3b
rm unused framing stuff
cowlicks May 20, 2025
b59077e
rm test logging
cowlicks May 20, 2025
f6b37e5
cargo fmt
cowlicks May 20, 2025
76b3a27
cargo clippy --fix
cowlicks May 20, 2025
2d1ef1b
RawEncCipher -> EncCipher
cowlicks May 20, 2025
fe2eed1
Remove old notes
cowlicks May 20, 2025
f92b5b2
lint
cowlicks May 20, 2025
f4fb371
rename tests
cowlicks May 20, 2025
b247a36
rm old notse
cowlicks May 20, 2025
cefc744
group imports
cowlicks May 20, 2025
f841a2b
format code in docs
cowlicks May 20, 2025
0ee4be6
rm unused
cowlicks May 20, 2025
7f38fda
rm unwraps
cowlicks May 20, 2025
c04e2b5
lint
cowlicks May 20, 2025
9b40a73
clean up noise module
cowlicks May 20, 2025
0851270
lint
cowlicks May 20, 2025
df6d311
Remove log and env_log depndencies
cowlicks May 20, 2025
b52c91e
clean up test_utils
cowlicks May 20, 2025
ffa54bb
rename js_interop_tests to js_tests
cowlicks May 20, 2025
069538c
refactor tsets
cowlicks May 20, 2025
90f081c
RMME reset
cowlicks May 20, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ futures-lite = "1"
sha2 = "0.10"
curve25519-dalek = "4"
crypto_secretstream = "0.2"
futures = "0.3.31"
compact-encoding = "2"

[dependencies.hypercore]
version = "0.14.0"
default-features = false

path = "../core"
#version = "0.14.0"
#default-features = false

[dev-dependencies]
async-std = { version = "1.12.0", features = ["attributes", "unstable"] }
Expand All @@ -58,8 +60,9 @@ duplexify = "1.1.0"
sluice = "0.5.4"
futures = "0.3.13"
log = "0.4"
test-log = { version = "0.2.11", default-features = false, features = ["trace"] }
tracing-subscriber = { version = "0.3.16", features = ["env-filter", "fmt"] }
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "fmt"] }
tracing-tree = "0.4.0"
tokio-util = { version = "0.7.14", features = ["compat"] }

[features]
default = ["tokio", "sparse"]
Expand Down
29 changes: 13 additions & 16 deletions benches/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ fn bench_throughput(c: &mut Criterion) {
env_logger::from_env(env_logger::Env::default().default_filter_or("error")).init();
let mut group = c.benchmark_group("pipe");
group.sample_size(10);
group.throughput(Throughput::Bytes(SIZE * COUNT * CONNS as u64));
group.throughput(Throughput::Bytes(SIZE * COUNT * CONNS));
group.bench_function("pipe_echo", |b| {
b.iter(|| {
task::block_on(async move {
Expand Down Expand Up @@ -72,7 +72,7 @@ where
debug!("[{}] EVENT {:?}", is_initiator, event);
match event {
Event::Handshake(_) => {
protocol.open(key.clone()).await?;
protocol.open(key).await?;
}
Event::DiscoveryKey(_dkey) => {}
Event::Channel(channel) => {
Expand All @@ -92,7 +92,7 @@ where
}
Some(Err(err)) => {
error!("ERROR {:?}", err);
return Err(err.into());
return Err(err);
}
None => return Ok(0),
}
Expand Down Expand Up @@ -127,20 +127,17 @@ async fn on_channel_init(i: u64, mut channel: Channel) -> Result<u64> {
let start = std::time::Instant::now();

while let Some(message) = channel.next().await {
match message {
Message::Data(mut data) => {
len += value_len(&data);
debug!("[a] recv {}", index(&data));
if index(&data) >= COUNT {
debug!("close at {}", index(&data));
channel.close().await?;
break;
} else {
increment_index(&mut data);
channel.send(Message::Data(data)).await?;
}
if let Message::Data(mut data) = message {
len += value_len(&data);
debug!("[a] recv {}", index(&data));
if index(&data) >= COUNT {
debug!("close at {}", index(&data));
channel.close().await?;
break;
} else {
increment_index(&mut data);
channel.send(Message::Data(data)).await?;
}
_ => {}
}
}
// let bytes = (COUNT * SIZE) as f64;
Expand Down
52 changes: 23 additions & 29 deletions benches/throughput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use futures::future::Either;
use futures::io::{AsyncRead, AsyncWrite};
use futures::stream::{FuturesUnordered, StreamExt};
use hypercore_protocol::{schema::*, Duplex};
use hypercore_protocol::schema::*;
use hypercore_protocol::{Channel, Event, Message, ProtocolBuilder};
use log::*;
use std::time::Instant;
Expand Down Expand Up @@ -71,24 +71,23 @@ async fn start_server(address: &str) -> futures::channel::oneshot::Sender<()> {
// let kill_rx = &mut kill_rx;
loop {
match futures::future::select(incoming.next(), &mut kill_rx).await {
Either::Left((next, _)) => match next {
Some(Ok(stream)) => {
Either::Left((next, _)) => {
if let Some(Ok(stream)) = next {
let peer_addr = stream.peer_addr().unwrap();
debug!("new connection from {}", peer_addr);
task::spawn(async move {
onconnection(stream.clone(), stream, false).await;
});
}
_ => {}
},
}
Either::Right((_, _)) => return,
}
}
});
kill_tx
}

async fn onconnection<R, W>(reader: R, writer: W, is_initiator: bool) -> Duplex<R, W>
async fn onconnection<R, W>(reader: R, writer: W, is_initiator: bool)
where
R: AsyncRead + Send + Unpin + 'static,
W: AsyncWrite + Send + Unpin + 'static,
Expand All @@ -101,19 +100,18 @@ where
// eprintln!("RECV EVENT [{}] {:?}", protocol.is_initiator(), event);
match event {
Event::Handshake(_) => {
protocol.open(key.clone()).await.unwrap();
protocol.open(key).await.unwrap();
}
Event::DiscoveryKey(_) => {}
Event::Channel(channel) => {
task::spawn(onchannel(channel, is_initiator));
}
Event::Close(_dkey) => {
return protocol.release();
return;
}
_ => {}
}
}
protocol.release()
}

async fn onchannel(mut channel: Channel, is_initiator: bool) {
Expand All @@ -127,9 +125,8 @@ async fn onchannel(mut channel: Channel, is_initiator: bool) {

async fn channel_server(channel: &mut Channel) {
while let Some(message) = channel.next().await {
match message {
Message::Data(_) => channel.send(message).await.unwrap(),
_ => {}
if let Message::Data(_) = message {
channel.send(message).await.unwrap()
}
}
}
Expand All @@ -140,24 +137,21 @@ async fn channel_client(channel: &mut Channel) {
let message = msg_data(0, data.clone());
channel.send(message).await.unwrap();
while let Some(message) = channel.next().await {
match message {
Message::Data(ref msg) => {
if index(msg) < COUNT {
let message = msg_data(index(msg) + 1, data.clone());
channel.send(message).await.unwrap();
} else {
let time = start.elapsed();
let bytes = COUNT * SIZE;
trace!(
"client completed. {} blocks, {} bytes, {:?}",
index(msg),
bytes,
time
);
break;
}
if let Message::Data(ref msg) = message {
if index(msg) < COUNT {
let message = msg_data(index(msg) + 1, data.clone());
channel.send(message).await.unwrap();
} else {
let time = start.elapsed();
let bytes = COUNT * SIZE;
trace!(
"client completed. {} blocks, {} bytes, {:?}",
index(msg),
bytes,
time
);
break;
}
_ => {}
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion examples-nodejs/run.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ function startRust (mode, key, color, name) {
color: color || 'blue',
env: {
...process.env,
RUST_LOG_STYLE: 'always'
RUST_LOG_STYLE: 'always',
RUST_LOG: 'trace'
}
})
return rust
Expand Down
30 changes: 23 additions & 7 deletions examples/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,23 @@ use async_std::net::{TcpListener, TcpStream};
use async_std::prelude::*;
use async_std::sync::{Arc, Mutex};
use async_std::task;
use env_logger::Env;
use futures_lite::stream::StreamExt;
use hypercore::{
Hypercore, HypercoreBuilder, PartialKeypair, RequestBlock, RequestUpgrade, Storage,
VerifyingKey,
};
use log::*;
use std::collections::HashMap;
use std::convert::TryInto;
use std::env;
use std::fmt::Debug;
use std::sync::OnceLock;
use tracing::{error, info};

use hypercore_protocol::schema::*;
use hypercore_protocol::{discovery_key, Channel, Event, Message, ProtocolBuilder};

fn main() {
init_logger();
log();
if env::args().count() < 3 {
usage();
}
Expand Down Expand Up @@ -93,8 +93,8 @@ async fn onconnection(
let mut protocol = ProtocolBuilder::new(is_initiator).connect(stream);
info!("protocol created, polling for next()");
while let Some(event) = protocol.next().await {
let event = event?;
info!("protocol event {:?}", event);
let event = event?;
match event {
Event::Handshake(_) => {
if is_initiator {
Expand Down Expand Up @@ -299,6 +299,8 @@ async fn onmessage(
start: info.length,
length: peer_state.remote_length - info.length,
}),
manifest: false,
priority: 0,
};
messages.push(Message::Request(msg));
}
Expand Down Expand Up @@ -405,6 +407,8 @@ async fn onmessage(
block: Some(request_block),
seek: None,
upgrade: None,
manifest: false,
priority: 0,
}));
}
channel.send_batch(&messages).await.unwrap();
Expand All @@ -414,9 +418,21 @@ async fn onmessage(
Ok(())
}

/// Init EnvLogger, logging info, warn and error messages to stdout.
pub fn init_logger() {
env_logger::from_env(Env::default().default_filter_or("info")).init();
#[allow(unused)]
pub fn log() {
use tracing_subscriber::{fmt::format::FmtSpan, EnvFilter};
static START_LOGS: OnceLock<()> = OnceLock::new();
START_LOGS.get_or_init(|| {
tracing_subscriber::fmt()
.with_target(true)
.with_line_number(true)
// print when instrumented funtion enters
.with_span_events(FmtSpan::ENTER | FmtSpan::EXIT)
.with_file(true)
.with_env_filter(EnvFilter::from_default_env()) // Reads `RUST_LOG` environment variable
.without_time()
.init();
});
}

/// Log a result if it's an error.
Expand Down
12 changes: 6 additions & 6 deletions src/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::Poll;
use tracing::debug;
use tracing::instrument;

/// A protocol channel.
///
Expand Down Expand Up @@ -93,7 +93,6 @@ impl Channel {
"Channel is closed",
));
}
debug!("TX:\n{message:?}\n");
let message = ChannelMessage::new(self.local_id as u64, message);
self.outbound_tx
.send(vec![message])
Expand Down Expand Up @@ -122,10 +121,7 @@ impl Channel {

let messages = messages
.iter()
.map(|message| {
debug!("TX:\n{message:?}\n");
ChannelMessage::new(self.local_id as u64, message.clone())
})
.map(|message| ChannelMessage::new(self.local_id as u64, message.clone()))
.collect();
self.outbound_tx
.send(messages)
Expand Down Expand Up @@ -249,6 +245,7 @@ impl ChannelHandle {
self.remote_state.as_ref().map(|s| s.remote_id)
}

#[instrument(skip_all, fields(local_id = local_id))]
pub(crate) fn attach_local(&mut self, local_id: usize, key: Key) {
let local_state = LocalState { local_id, key };
self.local_state = Some(local_state);
Expand Down Expand Up @@ -276,6 +273,7 @@ impl ChannelHandle {
Ok((&local_state.key, remote_state.remote_capability.as_ref()))
}

#[instrument(skip_all)]
pub(crate) fn open(&mut self, outbound_tx: Sender<Vec<ChannelMessage>>) -> Channel {
let local_state = self
.local_state
Expand Down Expand Up @@ -433,6 +431,7 @@ impl ChannelMap {
self.channels.remove(&hdkey);
}

#[instrument(skip(self))]
pub(crate) fn prepare_to_verify(&self, local_id: usize) -> Result<(&Key, Option<&Vec<u8>>)> {
let channel_handle = self
.get_local(local_id)
Expand Down Expand Up @@ -477,6 +476,7 @@ impl ChannelMap {
Ok(())
}

#[instrument(skip_all)]
fn alloc_local(&mut self) -> usize {
let empty_id = self
.local_id
Expand Down
7 changes: 0 additions & 7 deletions src/constants.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
/// Seed for the discovery key hash
pub(crate) const DISCOVERY_NS_BUF: &[u8] = b"hypercore";

/// Default timeout (in seconds)
pub(crate) const DEFAULT_TIMEOUT: u32 = 20;

/// Default keepalive interval (in seconds)
pub(crate) const DEFAULT_KEEPALIVE: u32 = 10;

// 16,78MB is the max encrypted wire message size (will be much smaller usually).
// This limitation stems from the 24bit header.
pub(crate) const MAX_MESSAGE_SIZE: u64 = 0xFFFFFF;

/// v10: Protocol name
pub(crate) const PROTOCOL_NAME: &str = "hypercore/alpha";
Loading
Loading