Skip to content

Commit 96a0b4e

Browse files
committed
DirectIO timer added
1 parent 266fd2e commit 96a0b4e

4 files changed

Lines changed: 107 additions & 40 deletions

File tree

Cargo.lock

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ futures-util = "0.3.31"
2121
uuid = { version = "1.18.1", features = ["v4"] }
2222
flate2 = "1.1.4"
2323
tar = "0.4.44"
24+
libc = "0.2.177"
2425

2526
[dev-dependencies]
2627
criterion = "0.3"

src/lib.rs

Lines changed: 85 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1-
use std::time::Duration;
1+
use std::{alloc::{self, Layout}, cmp::min, fs::{File, OpenOptions}, io::{Read, Write}, os::unix::fs::OpenOptionsExt, time::{Duration, Instant}};
22

3+
use anyhow::Error;
4+
use futures::StreamExt;
5+
use indicatif::{ProgressBar, ProgressStyle};
36
use sqd_query::ParquetChunk;
47

58
pub type Dataset = String;
@@ -44,3 +47,84 @@ pub async fn execute_query(
4447
rx.await
4548
.unwrap_or_else(|_| Err(anyhow::anyhow!("Query processor didn't produce a result")))
4649
}
50+
51+
pub fn time_direct_io(file_path: String) -> Result<Duration, Error> {
52+
if std::fs::exists(&file_path)? {
53+
std::fs::remove_file(&file_path)?;
54+
};
55+
56+
let mut options = OpenOptions::new();
57+
options.read(true);
58+
options.write(true);
59+
options.create(true);
60+
options.custom_flags(libc::O_DIRECT | libc::O_SYNC);
61+
let block_size = 4096;
62+
let iterations = 10;
63+
assert_eq!(block_size % 512, 0);
64+
let elapsed;
65+
let layout = Layout::from_size_align(block_size, 512).unwrap();
66+
{
67+
let mut ff = options.open(&file_path).unwrap();
68+
let s = Instant::now();
69+
let mut acc = 0;
70+
unsafe {
71+
let ptr = alloc::alloc(layout);
72+
for i in 0..block_size {
73+
*ptr.wrapping_add(i) = 1;
74+
}
75+
76+
let wrap = core::slice::from_raw_parts_mut(ptr, block_size);
77+
for _ in 0..iterations {
78+
let _ = ff.write(wrap);
79+
}
80+
81+
for _ in 0..iterations {
82+
let _ = ff.read(wrap);
83+
for i in 0..block_size {
84+
acc += wrap[i];
85+
}
86+
}
87+
libc::free(ptr as *mut libc::c_void);
88+
}
89+
assert_eq!(acc, 0);
90+
elapsed = s.elapsed();
91+
}
92+
93+
std::fs::remove_file(&file_path)?;
94+
95+
Ok(elapsed)
96+
}
97+
98+
pub async fn download_file(url: &str, path: &str) -> Result<f32, String> {
99+
let client = reqwest::Client::new();
100+
let res = client
101+
.get(url)
102+
.send()
103+
.await
104+
.or(Err(format!("Failed to GET from '{}'", &url)))?;
105+
let total_size = res
106+
.content_length()
107+
.ok_or(format!("Failed to get content length from '{}'", &url))?;
108+
109+
let pb = ProgressBar::new(total_size);
110+
pb.set_style(ProgressStyle::with_template("{msg}\n{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})").unwrap()
111+
.progress_chars("#>-"));
112+
pb.set_message(format!("Downloading {url}").to_owned());
113+
114+
let mut file = File::create(path).or(Err(format!("Failed to create file '{path}'")))?;
115+
let mut downloaded: u64 = 0;
116+
let mut stream = res.bytes_stream();
117+
118+
while let Some(item) = stream.next().await {
119+
let chunk = item.or(Err("Error while downloading file".to_owned()))?;
120+
file.write_all(&chunk)
121+
.or(Err("Error while writing to file".to_owned()))?;
122+
let new = min(downloaded + (chunk.len() as u64), total_size);
123+
downloaded = new;
124+
pb.set_position(new);
125+
}
126+
127+
pb.finish_with_message(format!("Downloaded {url} to {path}").to_owned());
128+
let speed = total_size as f64 / pb.elapsed().as_secs_f64() / 1024.0 / 1024.0;
129+
Ok(speed as f32)
130+
}

src/main.rs

Lines changed: 18 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,13 @@
1-
use std::cmp::min;
21
use std::{
32
env,
43
fs::{self, File},
5-
io::Write,
64
time::{Duration, Instant},
75
};
86

9-
use benchmark::execute_query;
7+
use benchmark::{download_file, execute_query, time_direct_io};
108
use clap::Parser;
119
use flate2::read::GzDecoder;
1210
use futures::future::join_all;
13-
use futures_util::StreamExt;
1411
use indicatif::{ProgressBar, ProgressStyle};
1512
use serde::{Deserialize, Serialize};
1613
use tar::Archive;
@@ -91,38 +88,6 @@ pub struct Cli {
9188
pub force_update: bool,
9289
}
9390

94-
pub async fn download_file(url: &str, path: &str) -> Result<(), String> {
95-
let client = reqwest::Client::new();
96-
let res = client
97-
.get(url)
98-
.send()
99-
.await
100-
.or(Err(format!("Failed to GET from '{}'", &url)))?;
101-
let total_size = res
102-
.content_length()
103-
.ok_or(format!("Failed to get content length from '{}'", &url))?;
104-
105-
let pb = ProgressBar::new(total_size);
106-
pb.set_style(ProgressStyle::with_template("{msg}\n{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})").unwrap()
107-
.progress_chars("#>-"));
108-
pb.set_message(format!("Downloading {url}").to_owned());
109-
110-
let mut file = File::create(path).or(Err(format!("Failed to create file '{path}'")))?;
111-
let mut downloaded: u64 = 0;
112-
let mut stream = res.bytes_stream();
113-
114-
while let Some(item) = stream.next().await {
115-
let chunk = item.or(Err("Error while downloading file".to_owned()))?;
116-
file.write_all(&chunk)
117-
.or(Err("Error while writing to file".to_owned()))?;
118-
let new = min(downloaded + (chunk.len() as u64), total_size);
119-
downloaded = new;
120-
pb.set_position(new);
121-
}
122-
123-
pb.finish_with_message(format!("Downloaded {url} to {path}").to_owned());
124-
Ok(())
125-
}
12691

12792
#[tokio::main]
12893
async fn main() {
@@ -150,7 +115,10 @@ async fn main() {
150115
let id = Uuid::new_v4();
151116
dir.push(id.to_string());
152117

153-
let _ = download_file(&url, dir.to_str().unwrap()).await;
118+
let big_download = download_file(&url, dir.to_str().unwrap()).await;
119+
let download_speed = big_download.unwrap();
120+
println!("Download speed: {:?}", download_speed);
121+
154122
let tar_gz = File::open(dir).unwrap();
155123
let tar = GzDecoder::new(tar_gz);
156124
let mut archive = Archive::new(tar);
@@ -161,6 +129,19 @@ async fn main() {
161129
bench_set = serde_yaml::from_reader(reader).unwrap();
162130
}
163131

132+
{
133+
// let file_path = "/home/denis/worker_bench/data/testfile.bin";
134+
let file_path = "./data/testfile.bin";
135+
let mut io_time = Duration::new(0, 0);
136+
let iterations = 10;
137+
for _ in 0..iterations {
138+
io_time += time_direct_io(file_path.to_string()).unwrap();
139+
}
140+
io_time = io_time / iterations;
141+
println!("Direct IO time: {:?}", io_time);
142+
}
143+
// exit(0);
144+
164145
let width = bench_set.len();
165146
let max_concurrent = args.max_concurrency;
166147
let total_samples = args.total_samples;

0 commit comments

Comments
 (0)