diff --git a/Cargo.toml b/Cargo.toml index efb5cae..cea3a7f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,9 +5,24 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +default-run = "proxy" + +[[bin]] +name = "proxy" +path = "src/bin/proxy.rs" + +[[bin]] +name = "ping-server" +path = "src/bin/ping_server.rs" + +[[bin]] +name = "ping-client" +path = "src/bin/ping_client.rs" + [dependencies] anyhow = "1.0.69" axum = { version = "0.6.11", features = ["json"] } +clap = { version = "4.3.0", features = ["derive"] } p384 = { version = "0.13.0", features = ["ecdsa", "serde"] } serde = { version = "1.0.155", features = ["derive"] } serde_json = "1.0.94" diff --git a/src/bin/ping_client.rs b/src/bin/ping_client.rs new file mode 100644 index 0000000..0960f3d --- /dev/null +++ b/src/bin/ping_client.rs @@ -0,0 +1,100 @@ +use clap::Parser; +use std::net::SocketAddrV4; +use std::str::FromStr; +use std::sync::{ + atomic::{AtomicBool, AtomicU32, Ordering}, + Arc, Mutex, +}; +use std::time::{Duration, Instant}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::TcpStream, + time::sleep, +}; + +#[tokio::main] +async fn main() { + let args = Args::parse(); + + let in_transit = Arc::new(AtomicBool::new(false)); + let in_transit2 = in_transit.clone(); + let out_timestamp = Arc::new(Mutex::new(Instant::now())); + let out_timestamp2 = out_timestamp.clone(); + let count = AtomicU32::new(args.count); + + let addr = SocketAddrV4::from_str(&args.address).unwrap(); + + let stream = TcpStream::connect(addr).await.unwrap(); + if !args.csv { + println!("Ping {addr}"); + } + + let (mut si, mut so) = stream.into_split(); + + let ping_in = async move { + let mut read_buf = [0; 1024]; + while count.load(Ordering::Relaxed) > 0 { + let bytes = si.read(&mut read_buf).await.unwrap(); + if bytes > 0 { + let in_timestamp = Instant::now(); + // println!("Received {bytes} bytes"); + + let duration = in_timestamp.duration_since(*out_timestamp.lock().unwrap()); + let i = u32::from_be_bytes(read_buf[0..bytes].try_into().unwrap()); + let rtt = duration.as_micros(); + if !args.csv { + println!("Ping {i} arrived with RTT of {rtt}us"); + } else { + println!("{rtt},"); + } + // let old = count.fetch_sub(1, Ordering::Relaxed); + // println!("Old count value: {old}"); + count.fetch_sub(1, Ordering::Relaxed); + in_transit2.store(false, Ordering::Relaxed); + } + } + if !args.csv { + println!("Done receiving"); + } + }; + + let ping_out = async move { + for i in 1..=args.count { + if let Ok(false) = + in_transit.compare_exchange_weak(false, true, Ordering::Relaxed, Ordering::Relaxed) + { + *out_timestamp2.lock().unwrap() = Instant::now(); + so.write_u32(i).await.unwrap(); + } + + if !args.csv { + println!("Sending ping {i}"); + } + sleep(Duration::from_millis(args.time)).await; + } + if !args.csv { + println!("Done sending pings"); + } + }; + + tokio::join!(ping_out, ping_in); +} + +#[derive(Parser, Debug)] +struct Args { + /// Socket address to ping + #[arg(long)] + address: String, + + /// Amount of pings to send + #[arg(short, long)] + count: u32, + + /// Time between pings + #[arg(short, long)] + time: u64, + + /// CSV mode + #[arg(long)] + csv: bool, +} diff --git a/src/bin/ping_server.rs b/src/bin/ping_server.rs new file mode 100644 index 0000000..4e403e1 --- /dev/null +++ b/src/bin/ping_server.rs @@ -0,0 +1,33 @@ +use clap::Parser; +use std::error::Error; +use std::net::SocketAddrV4; +use std::str::FromStr; +use tokio::io::AsyncWriteExt; +use tokio::{io, net::TcpListener}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let args = Args::parse(); + + let addr = SocketAddrV4::from_str(&args.address).unwrap(); + + let listener = TcpListener::bind(&addr).await?; + println!("Listening on: {addr}"); + + loop { + let (mut socket, _) = listener.accept().await?; + + tokio::spawn(async move { + let (mut si, mut so) = socket.split(); + io::copy(&mut si, &mut so).await?; + so.shutdown().await + }); + } +} + +#[derive(Parser, Debug)] +struct Args { + /// Socket address to listen on + #[arg(long)] + address: String, +} diff --git a/src/main.rs b/src/bin/proxy.rs similarity index 100% rename from src/main.rs rename to src/bin/proxy.rs diff --git a/src/lib.rs b/src/lib.rs index 2ad49f3..bc746c7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,6 +7,7 @@ use std::collections::{HashMap, HashSet}; use std::net::{IpAddr, SocketAddr}; use std::str::FromStr; use std::sync::{Arc, Mutex, RwLock}; +use std::time; use tokio::io::{self, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::watch::{self, Receiver, Sender}; @@ -16,6 +17,7 @@ use uuid::Uuid; pub struct ProxyCommand { #[serde(flatten)] command: Command, + timestamp: Option, signature: Option, } @@ -23,8 +25,34 @@ impl ProxyCommand { fn verify_signature(&self, verifying_key: &Option) -> bool { match (verifying_key, &self.signature) { (Some(key), Some(signature)) => { - let message = serde_json::to_string(&self.command).unwrap(); - key.verify(message.as_bytes(), signature).is_ok() + let mut message = serde_json::to_string(&self.command).unwrap(); + + let timestamp = if let Some(timestamp) = self.timestamp { + message.push_str(×tamp.to_string()); + time::Duration::from_secs(timestamp) + } else { + tracing::debug!("timestamp missing while signature is present"); + return false; // timestamp missing with signature present + }; + + if !key.verify(message.as_bytes(), signature).is_ok() { + tracing::debug!("signature does not match message"); + return false; // signature doesn't match + } + + let now = time::SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .unwrap(); + if timestamp > (now + time::Duration::from_secs(30)) { + tracing::warn!("command is more than 30s from the future"); + false + } else if now - timestamp <= time::Duration::from_secs(60) { + // less than a minute old + true + } else { + tracing::warn!("command is more than a minute old"); + false + } } (Some(_), None) => false, (None, _) => true, @@ -310,7 +338,10 @@ async fn transfer( #[cfg(test)] mod tests { - use std::net::{IpAddr, Ipv4Addr}; + use std::{ + net::{IpAddr, Ipv4Addr}, + time, + }; use crate::{Command, ProxyCommand}; use p384::{ @@ -330,10 +361,12 @@ mod tests { destination_ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), id: uuid!("67e55044-10b1-426f-9247-bb680e5fe0c8"), }, + timestamp: Some(8888), signature: Some(signature), }; let expected = "{\"create\":{\"incoming_port\":5555,\"destination_port\":6666,\"\ destination_ip\":\"127.0.0.1\",\"id\":\"67e55044-10b1-426f-9247-bb680e5fe0c8\"},\ + \"timestamp\":8888,\ \"signature\":\"\ 5C912C4B3BFF2ADB49885DCBDB53D6D3041D0632E498CDFF\ 2114CD2DCAC936AB0901B47C411E5BB57FE77BEF96044940\ @@ -351,9 +384,11 @@ mod tests { command: Command::Delete { id: uuid!("67e55044-10b1-426f-9247-bb680e5fe0c8"), }, + timestamp: Some(987654), signature: Some(signature), }; let expected = "{\"delete\":{\"id\":\"67e55044-10b1-426f-9247-bb680e5fe0c8\"},\ + \"timestamp\":987654,\ \"signature\":\"\ 5C912C4B3BFF2ADB49885DCBDB53D6D3041D0632E498CDFF\ 2114CD2DCAC936AB0901B47C411E5BB57FE77BEF96044940\ @@ -365,6 +400,11 @@ mod tests { #[test] fn verify_signature() { + let subscriber = tracing_subscriber::FmtSubscriber::builder() + .with_max_level(tracing::Level::TRACE) + .finish(); + tracing::subscriber::set_global_default(subscriber).unwrap(); + let command = Command::Create { incoming_port: 4567, destination_port: 7654, @@ -374,12 +414,18 @@ mod tests { // Create signed message let signing_key = SigningKey::random(&mut OsRng); - let message = serde_json::to_string(&command).unwrap(); + let timestamp = time::SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .unwrap() + .as_secs(); + let mut message = serde_json::to_string(&command).unwrap(); + message.push_str(×tamp.to_string()); let signature: Signature = signing_key.sign(message.as_bytes()); let bytes = signature.to_bytes(); assert_eq!(bytes.len(), 96); let proxy_command = ProxyCommand { command, + timestamp: Some(timestamp), signature: Some(signature), };