diff --git a/frameworks/Rust/water-http/.gitignore b/frameworks/Rust/water-http/.gitignore new file mode 100644 index 00000000000..2da6cdebaac --- /dev/null +++ b/frameworks/Rust/water-http/.gitignore @@ -0,0 +1,3 @@ +target +Cargo.lock +.idea \ No newline at end of file diff --git a/frameworks/Rust/water-http/Cargo.toml b/frameworks/Rust/water-http/Cargo.toml new file mode 100644 index 00000000000..fe22c5a9a68 --- /dev/null +++ b/frameworks/Rust/water-http/Cargo.toml @@ -0,0 +1,45 @@ +[package] +name = "water-http" +version = "0.1.0" +edition = "2018" + +[dependencies] +askama = "0.14.0" +tokio = { version = "1.47.1", features = ["full"] } +water_http = {git = "https://github.com/HassanSharara/water_http.git", branch = "beta", features = ["use_only_http1"],optional = true , version = "3.0.8-beta.5" } +smallvec = "1.15.1" +nanorand = "0.8.0" +tokio-postgres = "0.7.15" +sonic-rs = "0.5.5" +bytes = "1.10.1" +serde = { version = "1.0.228", features = ["derive","rc"] } +futures-util = "0.3.31" +num_cpus = "1.17.0" +httpdate = "1.0.3" +parking_lot = "0.12.5" +yarte = { version = "0.15.7" ,features = ["bytes-buf", "json"] } +itoa = {version = "1.0.15" ,optional = true} + + +[[bin]] +name = "plaintext" +path = "src/plaintext.rs" +required-features = ["json_plaintext"] + +[[bin]] +name = "json" +path = "src/json.rs" +required-features = ["json_plaintext"] + + +[[bin]] +name = "cache" +path = "src/cached.rs" +required-features = ["cache"] + + +[features] +json_plaintext = ["water_http"] +db = ["water_http/thread_shared_struct"] +cache = ["water_http/thread_shared_struct","itoa"] +all = ["water_http/thread_shared_struct"] diff --git a/frameworks/Rust/water-http/README b/frameworks/Rust/water-http/README new file mode 100644 index 00000000000..67e4ecc228a --- /dev/null +++ b/frameworks/Rust/water-http/README @@ -0,0 +1,39 @@ +🌊 Water HTTP — TechEmpower Benchmarks + +Water HTTP is a high-performance Rust web framework built and optimized for the TechEmpower Framework Benchmarks (TFB) +. It is designed to push the limits of speed, stability, and scalability using pure asynchronous Rust and Tokio’s runtime. Every part of the framework is hand-tuned to achieve predictable latency, minimal allocations, and efficient CPU utilization under extreme concurrency. + +This repository contains the official benchmark implementations for all test types, including Plaintext, JSON serialization, Single query, Multiple queries, Fortunes, Database updates, and Cached queries. + +âš¡ Highlights + +🚀 One of the fastest and most stable frameworks in the TechEmpower Benchmarks + +🧵 Built entirely on Tokio’s asynchronous runtime — no io_uring or unsafe code + +💾 Zero-copy I/O, preallocated buffers, and predictable memory layout + +🔒 100% safe Rust with no hidden synchronization overhead + +🧱 Designed for consistent high performance at scale + + +🧠 Architecture + +Water HTTP is built around a fully asynchronous, event-driven architecture that leverages non-blocking I/O and efficient request processing. The system is designed to eliminate unnecessary locks and minimize latency even under heavy load. + +[ tokio::net::TcpListener ] + ↓ + [ connection handler ] + ↓ + [ water::http::parser ] + ↓ + [ application logic ] + ↓ + [ response encoder ] +This approach ensures optimal throughput and minimal per-request overhead while keeping code simple and safe. + + +🧭 Mission + +Water HTTP’s mission is to demonstrate how Rust’s async ecosystem can reach record-breaking performance without compromising simplicity, safety, or maintainability. It shows that carefully engineered async Rust can deliver unmatched speed and reliability in real-world workloads, setting a new standard for what modern web frameworks can achieve. diff --git a/frameworks/Rust/water-http/benchmark_config.json b/frameworks/Rust/water-http/benchmark_config.json new file mode 100644 index 00000000000..2b28f0476a3 --- /dev/null +++ b/frameworks/Rust/water-http/benchmark_config.json @@ -0,0 +1,90 @@ +{ + "framework": "water-http", + "tests": [ + { + "default": { + "json_url": "/json", + "plaintext_url": "/plaintext", + "fortune_url": "/fortunes", + "db_url": "/db", + "query_url": "/queries?q=", + "update_url": "/updates?q=", + "port": 8080, + "approach": "Realistic", + "classification": "Micro", + "database": "Postgres", + "framework": "water_http", + "language": "Rust", + "orm": "raw", + "platform": "Rust", + "webserver": "water_http", + "os": "Linux", + "database_os": "Linux", + "display_name": "water_http" + }, + + "db": { + "fortune_url": "/fortunes", + "db_url": "/db", + "query_url": "/queries?q=", + "update_url": "/updates?q=", + "port": 8080, + "approach": "Realistic", + "classification": "Micro", + "database": "Postgres", + "framework": "water_http", + "language": "Rust", + "orm": "raw", + "platform": "Rust", + "webserver": "water_http", + "os": "Linux", + "database_os": "Linux", + "display_name": "water_http" + }, + + "cached": { + "cached_query_url": "/cached-queries?q=", + "port": 8080, + "approach": "Realistic", + "classification": "Micro", + "database": "Postgres", + "framework": "water_http", + "language": "Rust", + "orm": "raw", + "platform": "Rust", + "webserver": "water_http", + "os": "Linux", + "database_os": "Linux", + "display_name": "water_http" + }, + "json": { + "json_url": "/json", + "port": 8080, + "approach": "Realistic", + "classification": "Micro", + "framework": "water_http", + "language": "Rust", + "orm": "raw", + "platform": "Rust", + "webserver": "water_http", + "os": "Linux", + "database_os": "Linux", + "display_name": "water_http" + }, + "plaintext": { + "plaintext_url": "/plaintext", + "port": 8080, + "approach": "Realistic", + "classification": "Micro", + "framework": "water_http", + "language": "Rust", + "orm": "raw", + "platform": "Rust", + "webserver": "water_http", + "os": "Linux", + "database_os": "Linux", + "display_name": "water_http" + } + } + ] +} \ No newline at end of file diff --git a/frameworks/Rust/water-http/src/cached.rs b/frameworks/Rust/water-http/src/cached.rs new file mode 100644 index 00000000000..6e1cea1a77f --- /dev/null +++ b/frameworks/Rust/water-http/src/cached.rs @@ -0,0 +1,305 @@ +#![allow(static_mut_refs)] +use std::io; +use std::fmt::Arguments; +use std::io::Write; +use std::mem::MaybeUninit; +use std::rc::Rc; +use std::cell::UnsafeCell; +use std::collections::HashMap; +use nanorand::{Rng, WyRand}; +use tokio_postgres::{connect, Client, NoTls}; +use tokio_postgres::types::private::BytesMut; +use sonic_rs::prelude::WriteExt; +use std::pin::Pin; +use tokio::task::LocalSet; +use water_http::{InitControllersRoot, RunServer, WaterController}; +use water_http::http::{HttpSender, ResponseData}; +use water_http::server::{HttpContext, ServerConfigurations}; +use water_http::http::HttpSenderTrait; + +pub struct DbConnectionPool { + pub connections: Vec>, + pub next: UnsafeCell, +} + +impl DbConnectionPool { + /// Get a connection from the pool (round-robin, relaxed ordering) + #[inline(always)] + pub fn get_connection(&self) -> &Rc { + let n = unsafe{&mut *self.next.get()}; + *n +=1; + let idx = *n % self.connections.len(); + unsafe { self.connections.get_unchecked(idx) } + } + + /// Fill the pool with connections + pub async fn fill_pool(&mut self, url: &'static str, size: usize) { + let mut tasks = Vec::with_capacity(size); + for _ in 0..size { + tasks.push(tokio::task::spawn_local(async move { + for attempt in 0..5 { + match PgConnection::connect(url).await { + Ok(conn) => { + + return Ok(conn); }, + Err(_) if attempt < 4 => { + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + Err(_) => return Err(()), + } + } + Err(()) + })); + } + for t in tasks { + if let Ok(Ok(conn)) = t.await { + self.connections.push(Rc::new(conn)); + } + } + } + + +} + + + + +pub struct PgConnection { + cl:Client, + _connection_task: tokio::task::JoinHandle<()>, +} + +// Safety: Only used within LocalSet, no cross-thread access +impl PgConnection { + /// Connect to the database + + pub async fn connect(db_url: &str) -> Result { + let (cl, c) = tokio::time::timeout( + std::time::Duration::from_secs(5), + connect(db_url, NoTls), + ) + .await + .map_err(|_| ())? + .map_err(|_| ())?; + + let connection_task = tokio::task::spawn_local(async move { + let _ = c.await; + }); + + Ok(PgConnection { + _connection_task: connection_task, + cl + }) + } +} + +/// Zero-copy writer for BytesMut +pub struct BytesMuteWriter<'a>(pub &'a mut BytesMut); + +impl BytesMuteWriter<'_> { + + #[inline(always)] + pub fn extend_from_slice(&mut self,data:&[u8]){ + self.0.extend_from_slice(data); + } +} + +impl Write for BytesMuteWriter<'_> { + #[inline(always)] + fn write(&mut self, src: &[u8]) -> Result { + self.0.extend_from_slice(src); + Ok(src.len()) + } + + #[inline(always)] + fn flush(&mut self) -> Result<(), io::Error> { + Ok(()) + } +} + +impl std::fmt::Write for BytesMuteWriter<'_> { + #[inline(always)] + fn write_str(&mut self, s: &str) -> std::fmt::Result { + self.0.extend_from_slice(s.as_bytes()); + Ok(()) + } + + #[inline(always)] + fn write_char(&mut self, c: char) -> std::fmt::Result { + let mut buf = [0u8; 4]; + self.0.extend_from_slice(c.encode_utf8(&mut buf).as_bytes()); + Ok(()) + } + + #[inline(always)] + fn write_fmt(&mut self, args: Arguments<'_>) -> std::fmt::Result { + std::fmt::write(self, args) + } +} + +impl WriteExt for BytesMuteWriter<'_> { + #[inline(always)] + fn reserve_with(&mut self, additional: usize) -> Result<&mut [MaybeUninit], io::Error> { + self.0.reserve(additional); + unsafe { + let ptr = self.0.as_mut_ptr().add(self.0.len()) as *mut MaybeUninit; + Ok(std::slice::from_raw_parts_mut(ptr, additional)) + } + } + + #[inline(always)] + unsafe fn flush_len(&mut self, additional: usize) -> io::Result<()> { + self.0.set_len(self.0.len() + additional); + Ok(()) + } +} + + +InitControllersRoot! { + name:ROOT, + holder_type:MainType, + shared_type:SH, +} + +pub struct ThreadSharedStruct{ + writing_buffer:UnsafeCell, + rng:WyRand, +} + + +impl ThreadSharedStruct { + + #[inline(always)] + pub fn get_value(id:i32)->&'static i32{ + let map = unsafe {CACHED_VALUES.as_ref().unwrap().get(&id)} ; + map.unwrap() + } + pub fn get_cached_queries(&self,num:usize)->&[u8]{ + let buf = unsafe{&mut *(self.writing_buffer.get())}; + buf.clear(); + buf.extend_from_slice(br#"["#); + let mut writer = BytesMuteWriter(buf); + let mut rn = self.rng.clone(); + for _ in 0..num { + let rd: i32 = (rn.generate::() & 0x3FFF) as i32 % 10_000 + 1; + let v = Self::get_value(rd); + writer.extend_from_slice(br"{"); + _ = write!(writer, r#""id":{},"randomnumber":{}"#, rd, v); + writer.extend_from_slice(br"},"); + } + if buf.len() >1 {buf.truncate(buf.len() - 1);} + buf.extend_from_slice(b"]"); + return &buf[..] + } +} + +pub type MainType = u8; +pub type SH = Rc; + + +static mut CACHED_VALUES:Option> = None; + +pub fn run_server(){ + + _= std::thread::spawn( + ||{ + let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); + rt.block_on(async move { + const URL:&'static str = "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world"; + // const URL:&'static str = "postgres://postgres:root@localhost:5432/techmpower"; + + let mut pool = DbConnectionPool{ + connections:Vec::with_capacity( 1 + ), + next:0.into(), + // rt:tokio::runtime::Builder::new_multi_thread().enable_all().worker_threads(cpu_nums).build().unwrap() + }; + + let local_set = LocalSet::new(); + + _= local_set.run_until(async move { + tokio::task::spawn_local(async move { + pool.fill_pool(URL, 1).await; + let connection = pool.get_connection(); + let statement = connection.cl.prepare("SELECT id,randomnumber FROM World").await.unwrap(); + let res = connection.cl.query(&statement,&[]).await.unwrap(); + let mut map = HashMap::new(); + for row in res { + map.insert(row.get(0),row.get(1)); + } + unsafe { + let static_map = &mut CACHED_VALUES; + *static_map = Some(map); + } + }).await + }).await; + + }); + } + ).join(); + let cpu_nums = num_cpus::get(); + + + println!("start listening on port 8080 while workers count {cpu_nums}"); + let mut conf = ServerConfigurations::bind("0.0.0.0",8080); + conf.worker_threads_count = cpu_nums * 1 ; + + // let addresses = (0..cpu_nums).map(|_| { + // ("0.0.0.0".to_string(),8080) + // }).collect::>(); + // conf.addresses = addresses; + RunServer!( + conf, + ROOT, + EntryController, + shared_factory + ); +} + +fn shared_factory()->Pin>>{ + Box::pin(async { + + // const URL:&'static str = "postgres://postgres:root@localhost:5432/techmpower"; + + Rc::new(ThreadSharedStruct{ + writing_buffer:UnsafeCell::new(BytesMut::with_capacity(100_000)), + rng:WyRand::new() + }) + }) +} + +pub async fn handle_cached_queries(context:&mut HttpContext<'_,MainType,SH,HS,QS>){ + let q = context + .get_from_path_query("q") + .and_then(|v| v.parse::().ok()) // safely parse + .unwrap_or(1) // default to 1 if missing or invalid + .clamp(1, 500); + + let connection:SH = context.thread_shared_struct.clone().unwrap().clone(); + let data = connection.get_cached_queries(q); + let mut sender:HttpSender = context.sender(); + sender.set_header_ef("Content-Type","application/json"); + sender.set_header_ef("Server","water"); + let date = httpdate::fmt_http_date(std::time::SystemTime::now()); + sender.set_header_ef("Date",date); + _= sender.send_data_as_final_response( + ResponseData::Slice(data) + ).await; +} + +WaterController! { + holder -> super::MainType, + shared -> super::SH, + name -> EntryController, + functions -> { + + GET -> "cached-queries" -> query (context) { + _=super::handle_cached_queries(context).await; + } + } +} + +fn main() { + run_server(); +} + diff --git a/frameworks/Rust/water-http/src/db.rs b/frameworks/Rust/water-http/src/db.rs new file mode 100644 index 00000000000..40640735f88 --- /dev/null +++ b/frameworks/Rust/water-http/src/db.rs @@ -0,0 +1,320 @@ +#![cfg(any(feature = "db",feature = "all"))] +use std::{borrow::Cow, io}; +use std::fmt::Arguments; +use std::io::Write; +use std::mem::MaybeUninit; +use std::rc::Rc; +use std::cell::UnsafeCell; +use bytes::Buf; +use nanorand::{Rng, WyRand}; +use tokio_postgres::{connect, Client, Statement, NoTls}; +use tokio_postgres::types::private::BytesMut; +use crate::models::{Fortune, FortuneTemplate, World}; +use sonic_rs::prelude::WriteExt; +use yarte::TemplateBytesTrait; + +/// Database connection pool with thread-local RNG +pub struct DbConnectionPool { + pub connections: Vec>, + pub next: UnsafeCell, +} + +impl DbConnectionPool { + /// Get a connection from the pool (round-robin, relaxed ordering) + #[inline(always)] + pub fn get_connection(&self) -> &Rc { + let n = unsafe{&mut *self.next.get()}; + *n +=1; + let idx = *n % self.connections.len(); + unsafe { self.connections.get_unchecked(idx) } + } + + /// Fill the pool with connections + pub async fn fill_pool(&mut self, url: &'static str, size: usize) { + let mut tasks = Vec::with_capacity(size); + for _ in 0..size { + tasks.push(tokio::task::spawn_local(async move { + for attempt in 0..5 { + match PgConnection::connect(url).await { + Ok(conn) => { + + return Ok(conn); }, + Err(_) if attempt < 4 => { + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + Err(_) => return Err(()), + } + } + Err(()) + })); + } + for t in tasks { + if let Ok(Ok(conn)) = t.await { + self.connections.push(Rc::new(conn)); + } + } + } +} + + +/// Reusable buffer pool per connection +struct BufferPool { + body: BytesMut, + worlds: Vec, + numbers: Vec, + fortunes: Vec, + fortune_output: Vec, +} + +impl BufferPool { + fn new() -> Self { + Self { + body: BytesMut::with_capacity(4096), + worlds: Vec::with_capacity(501), + numbers: Vec::with_capacity(501), + + fortunes: Vec::with_capacity(501), + fortune_output: Vec::with_capacity(4096), + } + } + +} + +/// PostgreSQL connection wrapper with pre-allocated buffers +pub struct PgConnection { + pub cl: Client, + pub fortune: Statement, + pub world: Statement, + pub updates: Vec, + rang:WyRand, + buffers: UnsafeCell, + _connection_task: tokio::task::JoinHandle<()>, +} + +// Safety: Only used within LocalSet, no cross-thread access +impl PgConnection { + /// Connect to the database + + pub async fn connect(db_url: &str) -> Result { + let (cl, conn) = tokio::time::timeout( + std::time::Duration::from_secs(5), + connect(db_url, NoTls), + ) + .await + .map_err(|_| ())? + .map_err(|_| ())?; + + let connection_task = tokio::task::spawn_local(async move { + let _ = conn.await; + }); + + let fortune = cl.prepare("SELECT * FROM fortune").await.map_err(|_| ())?; + let world = cl.prepare("SELECT id,randomnumber FROM world WHERE id=$1 LIMIT 1").await.map_err(|_| ())?; + + // Pre-compile update statements for batch sizes 1-500 + let mut updates = vec![]; + for num in 1..=500 { + let sql = Self::generate_update_values_stmt(num); + updates.push(cl.prepare(&sql).await.unwrap()); + } + + Ok(PgConnection { + cl, + fortune, + world, + updates, + buffers: UnsafeCell::new(BufferPool::new()), + _connection_task: connection_task, + rang: WyRand::new() + }) + } /// Connect to the database + + #[inline(always)] + pub fn generate_update_values_stmt(batch_size: usize) -> String { + + let mut sql = String::from("UPDATE world SET randomNumber = w.r FROM (VALUES "); + + for i in 0..batch_size { + let id_param = i * 2 + 1; + let val_param = id_param + 1; + sql.push_str(&format!("(${}::int, ${}::int),", id_param, val_param)); + } + + // Remove the trailing comma + sql.pop(); + + sql.push_str(") AS w(i, r) WHERE world.id = w.i"); + sql + } + + /// Get mutable access to buffers (safe because connection pool ensures single access) + #[inline(always)] + fn buffers(&self) -> &mut BufferPool { + unsafe { &mut *self.buffers.get() } + } + + /// Get a single random world - optimized with buffer reuse + #[inline] + pub async fn get_world(&self) -> &[u8] { + let rd = (self.rang.clone().generate::() % 10_000 + 1) as i32; + let row = self.cl.query_one(&self.world, &[&rd]).await.unwrap(); + + let buffers = self.buffers(); + buffers.body.clear(); + + sonic_rs::to_writer( + BytesMuteWriter(&mut buffers.body), + &World { + id: row.get(0), + randomnumber: row.get(1), + }, + ).unwrap(); + + buffers.body.chunk() + } + + /// Get multiple random worlds - optimized with buffer reuse + pub async fn get_worlds(&self, num: usize) -> &[u8] { + let buffers = self.buffers(); + buffers.worlds.clear(); + let mut rn = self.rang.clone(); + for _ in 0..num { + let id: i32 = (rn.generate::() & 0x3FFF) as i32 % 10_000 + 1; + let row = self.cl.query_one(&self.world, &[&id]).await.unwrap(); + buffers.worlds.push(World { + id: row.get(0), + randomnumber: row.get(1), + }); + } + buffers.body.clear(); + sonic_rs::to_writer(BytesMuteWriter(&mut buffers.body), &buffers.worlds).unwrap(); + buffers.body.chunk() + } + /// Update worlds in batch - optimized with buffer reuse + /// Update worlds in batch - optimized with buffer reuse + + /// Update worlds in batch - optimized with buffer reuse + + /// Update worlds in batch - optimized with buffer reuse + + /// Update worlds in batch - optimized with RETURNING clause to minimize reads + /// Update worlds - fetch and update each row to handle duplicates correctly + /// Update worlds in batch using CASE statement + pub async fn update(&self, num: usize) -> &[u8] { + + let buffers = self.buffers(); + let mut ids:Vec = Vec::with_capacity(num); + let mut rng = self.rang.clone(); + let mut params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = + Vec::with_capacity(num * 2); + let mut futures =vec![]; + for _ in 0..num { + let w_id = (rng.generate::() % 10_000 + 1) as i32; + ids.push(w_id); + } + futures.extend(ids.iter().map(|x| async move {self.cl.query_one(&self.world,&[&x]).await})); + futures_util::future::join_all(futures).await; + ids.sort_unstable(); + buffers.worlds.clear(); + for index in 0..num { + let s_id = (rng.generate::() % 10_000 + 1 ) as i32; + buffers.worlds.push(World{ + id:ids[index], + randomnumber:s_id + }); + buffers.numbers.push(s_id); + } + buffers.body.clear(); + for index in 0..num { + params.push(&ids[index]); + params.push(&buffers.numbers[index]); + } + + _=self.cl.execute(&self.updates[num - 1], ¶ms).await.unwrap(); + sonic_rs::to_writer(BytesMuteWriter(&mut buffers.body), &buffers.worlds).unwrap(); + buffers.body.chunk() + } + + + /// Tell fortunes - optimized with buffer reuse + pub async fn tell_fortune(&self) -> Result<&[u8], ()> { + let res = self.cl.query(&self.fortune, &[]).await.map_err(|_| ())?; + + let buffers = self.buffers(); + buffers.fortunes.clear(); + buffers.fortune_output.clear(); + + buffers.fortunes.push(Fortune { + id: 0, + message: Cow::Borrowed("Additional fortune added at request time."), + }); + + for row in res { + buffers.fortunes.push(Fortune { + id: row.get(0), + message: Cow::Owned(row.get(1)), + }); + } + + buffers.fortunes.sort_unstable_by(|a, b| a.message.cmp(&b.message)); + + let template = FortuneTemplate { items: &buffers.fortunes }; + template.write_call(&mut buffers.fortune_output); + + // Return reference to buffer - zero-copy! + Ok(&buffers.fortune_output) + } +} + +/// Zero-copy writer for BytesMut +pub struct BytesMuteWriter<'a>(pub &'a mut BytesMut); + +impl Write for BytesMuteWriter<'_> { + #[inline(always)] + fn write(&mut self, src: &[u8]) -> Result { + self.0.extend_from_slice(src); + Ok(src.len()) + } + + #[inline(always)] + fn flush(&mut self) -> Result<(), io::Error> { + Ok(()) + } +} + +impl std::fmt::Write for BytesMuteWriter<'_> { + #[inline(always)] + fn write_str(&mut self, s: &str) -> std::fmt::Result { + self.0.extend_from_slice(s.as_bytes()); + Ok(()) + } + + #[inline(always)] + fn write_char(&mut self, c: char) -> std::fmt::Result { + let mut buf = [0u8; 4]; + self.0.extend_from_slice(c.encode_utf8(&mut buf).as_bytes()); + Ok(()) + } + + #[inline(always)] + fn write_fmt(&mut self, args: Arguments<'_>) -> std::fmt::Result { + std::fmt::write(self, args) + } +} + +impl WriteExt for BytesMuteWriter<'_> { + #[inline(always)] + fn reserve_with(&mut self, additional: usize) -> Result<&mut [MaybeUninit], io::Error> { + self.0.reserve(additional); + unsafe { + let ptr = self.0.as_mut_ptr().add(self.0.len()) as *mut MaybeUninit; + Ok(std::slice::from_raw_parts_mut(ptr, additional)) + } + } + + #[inline(always)] + unsafe fn flush_len(&mut self, additional: usize) -> io::Result<()> { + self.0.set_len(self.0.len() + additional); + Ok(()) + } +} \ No newline at end of file diff --git a/frameworks/Rust/water-http/src/json.rs b/frameworks/Rust/water-http/src/json.rs new file mode 100644 index 00000000000..71f406b6051 --- /dev/null +++ b/frameworks/Rust/water-http/src/json.rs @@ -0,0 +1,55 @@ +use water_http::{InitControllersRoot, RunServer, WaterController}; +use water_http::server::ServerConfigurations; + +InitControllersRoot! { + name:ROOT, + holder_type:MainType, +} + + + +pub type MainType = u8; + + +fn main() { + run_server(); +} + + +pub fn run_server(){ + + let cpu_nums = num_cpus::get(); + + + println!("start listening on port 8080 while workers count {cpu_nums}"); + let mut conf = ServerConfigurations::bind("0.0.0.0",8080); + conf.worker_threads_count = cpu_nums * 2 ; + + RunServer!( + conf, + ROOT, + EntryController + ); +} + +const JSON_RESPONSE:&'static [u8] = br#"{"message":"Hello, World!"}"#; + + +WaterController! { + holder -> super::MainType, + name -> EntryController, + functions -> { + + + GET => json => j(cx){ + let mut sender = cx.sender(); + sender.set_header_ef("Content-Type","application/json"); + sender.set_header_ef("Server","water"); + let date = httpdate::fmt_http_date(std::time::SystemTime::now()); + sender.set_header_ef("Date",date); + _=sender.send_data_as_final_response(http::ResponseData::Slice(super::JSON_RESPONSE)).await; + } + + } +} + diff --git a/frameworks/Rust/water-http/src/main.rs b/frameworks/Rust/water-http/src/main.rs new file mode 100644 index 00000000000..487409af1d6 --- /dev/null +++ b/frameworks/Rust/water-http/src/main.rs @@ -0,0 +1,7 @@ +mod server; +pub mod models; +mod db; + +fn main() { + server::run_server(); +} diff --git a/frameworks/Rust/water-http/src/models.rs b/frameworks/Rust/water-http/src/models.rs new file mode 100644 index 00000000000..1fee7283ba9 --- /dev/null +++ b/frameworks/Rust/water-http/src/models.rs @@ -0,0 +1,25 @@ +use std::borrow::Cow; +use sonic_rs::{Serialize}; +// use askama::Template; +#[derive(Serialize)] +pub struct World { + pub id: i32, + pub randomnumber: i32, +} +#[derive(Serialize,Debug)] +pub struct Fortune { + pub id: i32, + pub message: Cow<'static, str>, +} + +#[derive(yarte::TemplateBytes)] +#[template(path = "fortune.hbs")] +pub struct FortuneTemplate<'a>{ + pub items:&'a Vec +} + + + +// pub async fn to(model:FortuneTemplate<'_>){ +// model.r +// } \ No newline at end of file diff --git a/frameworks/Rust/water-http/src/plaintext.rs b/frameworks/Rust/water-http/src/plaintext.rs new file mode 100644 index 00000000000..197da216e9c --- /dev/null +++ b/frameworks/Rust/water-http/src/plaintext.rs @@ -0,0 +1,54 @@ +use water_http::{InitControllersRoot, RunServer, WaterController}; +use water_http::server::ServerConfigurations; + +InitControllersRoot! { + name:ROOT, + holder_type:MainType, +} + + + +pub type MainType = u8; + + +fn main() { + run_server(); +} + + +pub fn run_server(){ + + let cpu_nums = num_cpus::get(); + + + println!("start listening on port 8080 while workers count {cpu_nums}"); + let mut conf = ServerConfigurations::bind("0.0.0.0",8080); + conf.worker_threads_count = cpu_nums * 1 ; + + RunServer!( + conf, + ROOT, + EntryController + ); +} + +const JSON_RESPONSE:&'static [u8] = br#"{"message":"Hello, World!"}"#; +const P:&'static [u8] = br#"Hello, World!"#; + + +WaterController! { + holder -> super::MainType, + name -> EntryController, + functions -> { + + GET => plaintext => p(cx) { + let mut sender = cx.sender(); + sender.set_header_ef("Content-Type","text/plain; charset=utf-8"); + sender.set_header_ef("Server","water"); + let date = httpdate::fmt_http_date(std::time::SystemTime::now()); + sender.set_header_ef("Date",date); + _=sender.send_data_as_final_response(http::ResponseData::Str("Hello, World!")).await; + } + } +} + diff --git a/frameworks/Rust/water-http/src/server.rs b/frameworks/Rust/water-http/src/server.rs new file mode 100644 index 00000000000..be464a0e3d6 --- /dev/null +++ b/frameworks/Rust/water-http/src/server.rs @@ -0,0 +1,290 @@ + +use std::pin::Pin; +use std::rc::Rc; +use water_http::{InitControllersRoot, RunServer, WaterController}; +use water_http::server::ServerConfigurations; +use crate::db::{DbConnectionPool}; +InitControllersRoot! { + name:ROOT, + holder_type:MainType, + shared_type:SharedType, +} + +pub struct ThreadSharedStruct{ + pg_connection: DbConnectionPool +} + +pub type MainType = u8; +pub type SharedType = Rc; + + + + +pub fn run_server(){ + + let cpu_nums = num_cpus::get(); + + + println!("start listening on port 8080 while workers count {cpu_nums}"); + let mut conf = ServerConfigurations::bind("0.0.0.0",8080); + #[cfg(feature = "json_plaintext")] + { + conf.worker_threads_count = cpu_nums * 2 ; + } + #[cfg(not(feature = "json_plaintext"))] + { + conf.worker_threads_count = cpu_nums * 1 ; + } + + // let addresses = (0..cpu_nums).map(|_| { + // ("0.0.0.0".to_string(),8080) + // }).collect::>(); + // conf.addresses = addresses; + RunServer!( + conf, + ROOT, + EntryController, + shared_factory + ); +} + +fn shared_factory()->Pin>>{ + Box::pin(async { + + // const URL:&'static str = "postgres://postgres:root@localhost:5432/techmpower"; + const URL:&'static str = "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world"; + let mut pool = DbConnectionPool{ + connections:Vec::with_capacity( 1 + ), + next:0.into(), + // rt:tokio::runtime::Builder::new_multi_thread().enable_all().worker_threads(cpu_nums).build().unwrap() + }; + pool.fill_pool(URL, 1).await; + Rc::new(ThreadSharedStruct{ + pg_connection:pool + }) + }) +} + +#[cfg(any(feature = "json_plaintext",feature = "all"))] +const JSON_RESPONSE:&'static [u8] = br#"{"message":"Hello, World!"}"#; +#[cfg(any(feature = "json_plaintext",feature = "all"))] +const P:&'static [u8] = br#"Hello, World!"#; + +#[cfg(feature = "all")] +WaterController! { + holder -> super::MainType, + shared -> super::SharedType, + name -> EntryController, + functions -> { + + + GET => json => j(cx){ + let mut sender = cx.sender(); + sender.set_header_ef("Content-Type","application/json"); + sender.set_header_ef("Server","water"); + let date = httpdate::fmt_http_date(std::time::SystemTime::now()); + sender.set_header_ef("Date",date); + _=sender.send_data_as_final_response(http::ResponseData::Slice(super::JSON_RESPONSE)).await; + } + + GET => plaintext => p(cx){ + let mut sender = cx.sender(); + sender.set_header_ef("Content-Type","text/plain; charset=utf-8"); + sender.set_header_ef("Server","water"); + let date = httpdate::fmt_http_date(std::time::SystemTime::now()); + sender.set_header_ef("Date",date); + _=sender.send_data_as_final_response(http::ResponseData::Slice(super::P)).await; + } + + + GET -> db -> db (context){ + let connection:Shared = context.thread_shared_struct.clone().unwrap().clone(); + let connection = connection.pg_connection.get_connection(); + let data = connection.get_world().await; + let mut sender = context.sender(); + sender.set_header_ef("Content-Type","application/json"); + sender.set_header_ef("Server","water"); + let date = httpdate::fmt_http_date(std::time::SystemTime::now()); + sender.set_header_ef("Date",date); + _= sender.send_data_as_final_response( + http::ResponseData::Slice(data) + ).await; + } + GET -> queries -> query (context){ + let q = context + .get_from_path_query("q") + .and_then(|v| v.parse::().ok()) // safely parse + .unwrap_or(1) // default to 1 if missing or invalid + .clamp(1, 500); + + let connection:Shared = context.thread_shared_struct.clone().unwrap().clone(); + let connection = connection.pg_connection.get_connection(); + let data = connection.get_worlds(q).await; + let mut sender = context.sender(); + sender.set_header_ef("Content-Type","application/json"); + sender.set_header_ef("Server","water"); + let date = httpdate::fmt_http_date(std::time::SystemTime::now()); + sender.set_header_ef("Date",date); + _= sender.send_data_as_final_response( + http::ResponseData::Slice(data) + ).await; + } + + GET -> updates -> update (context){ + let q = context + .get_from_path_query("q") + .and_then(|v| v.parse::().ok()) // safely parse + .unwrap_or(1) // default to 1 if missing or invalid + .clamp(1, 500); + let connection:Shared = context.thread_shared_struct.clone().unwrap().clone(); + let connection = connection.pg_connection.get_connection(); + let data = connection.update(q).await; + let mut sender = context.sender(); + sender.set_header_ef("Content-Type","application/json"); + sender.set_header_ef("Server","water"); + let date = httpdate::fmt_http_date(std::time::SystemTime::now()); + sender.set_header_ef("Date",date); + _= sender.send_data_as_final_response( + http::ResponseData::Slice(data) + ).await; + } + + + GET -> fortunes -> ft (context){ + + let connection:Shared = context.thread_shared_struct.clone().unwrap().clone(); + let connection = connection.pg_connection.get_connection(); + let data = match connection.tell_fortune().await { + Ok(r)=>{r}, + _=>{ + _= context.send_str("failed to connect").await; + return + } + }; + let mut sender = context.sender(); + sender.set_header_ef("Content-Type","text/html; charset=UTF-8"); + sender.set_header_ef("Server","water"); + let date = httpdate::fmt_http_date(std::time::SystemTime::now()); + sender.set_header_ef("Date",date); + _= sender.send_data_as_final_response( + http::ResponseData::Slice(&data) + ).await; + } + + } +} + + + +#[cfg(all(not(feature = "all"),feature = "json_plaintext"))] +WaterController! { + holder -> super::MainType, + shared -> super::SharedType, + name -> EntryController, + functions -> { + + + GET => json => j(cx){ + let mut sender = cx.sender(); + sender.set_header_ef("Content-Type","application/json"); + sender.set_header_ef("Server","water"); + let date = httpdate::fmt_http_date(std::time::SystemTime::now()); + sender.set_header_ef("Date",date); + _=sender.send_data_as_final_response(http::ResponseData::Slice(super::JSON_RESPONSE)).await; + } + + GET => plaintext => p(cx) { + let mut sender = cx.sender(); + sender.set_header_ef("Content-Type","text/plain; charset=utf-8"); + sender.set_header_ef("Server","water"); + let date = httpdate::fmt_http_date(std::time::SystemTime::now()); + sender.set_header_ef("Date",date); + _=sender.send_data_as_final_response(http::ResponseData::Slice(super::P)).await; + } + } +} + +#[cfg(all(not(feature = "all"),feature = "db"))] +WaterController! { + holder -> super::MainType, + shared -> super::SharedType, + name -> EntryController, + functions -> { + + + GET -> db -> db (context){ + let connection:Shared = context.thread_shared_struct.clone().unwrap().clone(); + let connection = connection.pg_connection.get_connection(); + let data = connection.get_world().await; + let mut sender = context.sender(); + sender.set_header_ef("Content-Type","application/json"); + sender.set_header_ef("Server","water"); + let date = httpdate::fmt_http_date(std::time::SystemTime::now()); + sender.set_header_ef("Date",date); + _= sender.send_data_as_final_response( + http::ResponseData::Slice(data) + ).await; + } + GET -> queries -> query (context){ + let q = context + .get_from_path_query("q") + .and_then(|v| v.parse::().ok()) // safely parse + .unwrap_or(1) // default to 1 if missing or invalid + .clamp(1, 500); + + let connection:Shared = context.thread_shared_struct.clone().unwrap().clone(); + let connection = connection.pg_connection.get_connection(); + let data = connection.get_worlds(q).await; + let mut sender = context.sender(); + sender.set_header_ef("Content-Type","application/json"); + sender.set_header_ef("Server","water"); + let date = httpdate::fmt_http_date(std::time::SystemTime::now()); + sender.set_header_ef("Date",date); + _= sender.send_data_as_final_response( + http::ResponseData::Slice(data) + ).await; + } + + GET -> updates -> update (context){ + let q = context + .get_from_path_query("q") + .and_then(|v| v.parse::().ok()) // safely parse + .unwrap_or(1) // default to 1 if missing or invalid + .clamp(1, 500); + let connection:Shared = context.thread_shared_struct.clone().unwrap().clone(); + let connection = connection.pg_connection.get_connection(); + let data = connection.update(q).await; + let mut sender = context.sender(); + sender.set_header_ef("Content-Type","application/json"); + sender.set_header_ef("Server","water"); + let date = httpdate::fmt_http_date(std::time::SystemTime::now()); + sender.set_header_ef("Date",date); + _= sender.send_data_as_final_response( + http::ResponseData::Slice(data) + ).await; + } + + + GET -> fortunes -> ft (context){ + + let connection:Shared = context.thread_shared_struct.clone().unwrap().clone(); + let connection = connection.pg_connection.get_connection(); + let data = match connection.tell_fortune().await { + Ok(r)=>{r}, + _=>{ + _= context.send_str("failed to connect").await; + return + } + }; + let mut sender = context.sender(); + sender.set_header_ef("Content-Type","text/html; charset=UTF-8"); + sender.set_header_ef("Server","water"); + let date = httpdate::fmt_http_date(std::time::SystemTime::now()); + sender.set_header_ef("Date",date); + _= sender.send_data_as_final_response( + http::ResponseData::Slice(&data) + ).await; + } + } +} \ No newline at end of file diff --git a/frameworks/Rust/water-http/templates/fortune.hbs b/frameworks/Rust/water-http/templates/fortune.hbs new file mode 100644 index 00000000000..b219172a075 --- /dev/null +++ b/frameworks/Rust/water-http/templates/fortune.hbs @@ -0,0 +1,5 @@ +Fortunes +{{~# each items ~}} + +{{~/each ~}} +
idmessage
{{id}}{{message}}
diff --git a/frameworks/Rust/water-http/templates/fortune.html b/frameworks/Rust/water-http/templates/fortune.html new file mode 100644 index 00000000000..35923e04b21 --- /dev/null +++ b/frameworks/Rust/water-http/templates/fortune.html @@ -0,0 +1,12 @@ + + +Fortunes + + + + {% for item in items %} + + {% endfor %} +
idmessage
{{item.id}}{{item.message}}
+ + diff --git a/frameworks/Rust/water-http/water-http-cached.dockerfile b/frameworks/Rust/water-http/water-http-cached.dockerfile new file mode 100644 index 00000000000..3c873c71520 --- /dev/null +++ b/frameworks/Rust/water-http/water-http-cached.dockerfile @@ -0,0 +1,13 @@ +FROM rust:latest + +RUN apt-get update -yqq && apt-get install -yqq cmake g++ + +ADD ./ /water +WORKDIR /water + +RUN cargo clean +RUN RUSTFLAGS="-C target-cpu=native" cargo build --release --bin cache --features cache + +EXPOSE 8080 + +CMD ./target/release/cache \ No newline at end of file diff --git a/frameworks/Rust/water-http/water-http-db.dockerfile b/frameworks/Rust/water-http/water-http-db.dockerfile new file mode 100644 index 00000000000..01d9507b35f --- /dev/null +++ b/frameworks/Rust/water-http/water-http-db.dockerfile @@ -0,0 +1,13 @@ +FROM rust:latest + +RUN apt-get update -yqq && apt-get install -yqq cmake g++ + +ADD ./ /water +WORKDIR /water + +RUN cargo clean +RUN RUSTFLAGS="-C target-cpu=native" cargo build --release --bin water-http --features db + +EXPOSE 8080 + +CMD ./target/release/water-http \ No newline at end of file diff --git a/frameworks/Rust/water-http/water-http-json.dockerfile b/frameworks/Rust/water-http/water-http-json.dockerfile new file mode 100644 index 00000000000..4c457496cec --- /dev/null +++ b/frameworks/Rust/water-http/water-http-json.dockerfile @@ -0,0 +1,13 @@ +FROM rust:latest + +RUN apt-get update -yqq && apt-get install -yqq cmake g++ + +ADD ./ /water +WORKDIR /water + +RUN cargo clean +RUN RUSTFLAGS="-C target-cpu=native" cargo build --release --bin json --features json_plaintext + +EXPOSE 8080 + +CMD ./target/release/json \ No newline at end of file diff --git a/frameworks/Rust/water-http/water-http-plaintext.dockerfile b/frameworks/Rust/water-http/water-http-plaintext.dockerfile new file mode 100644 index 00000000000..ab53eb54496 --- /dev/null +++ b/frameworks/Rust/water-http/water-http-plaintext.dockerfile @@ -0,0 +1,13 @@ +FROM rust:latest + +RUN apt-get update -yqq && apt-get install -yqq cmake g++ + +ADD ./ /water +WORKDIR /water + +RUN cargo clean +RUN RUSTFLAGS="-C target-cpu=native" cargo build --release --bin plaintext --features json_plaintext + +EXPOSE 8080 + +CMD ./target/release/plaintext \ No newline at end of file diff --git a/frameworks/Rust/water-http/water-http.dockerfile b/frameworks/Rust/water-http/water-http.dockerfile new file mode 100644 index 00000000000..5946c8b3821 --- /dev/null +++ b/frameworks/Rust/water-http/water-http.dockerfile @@ -0,0 +1,13 @@ +FROM rust:latest + +RUN apt-get update -yqq && apt-get install -yqq cmake g++ + +ADD ./ /water +WORKDIR /water + +RUN cargo clean +RUN RUSTFLAGS="-C target-cpu=native" cargo build --release --bin water-http --features all + +EXPOSE 8080 + +CMD ./target/release/water-http \ No newline at end of file