100
									
								
								src/bin/ping_client.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										100
									
								
								src/bin/ping_client.rs
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,100 @@
 | 
			
		||||
use clap::Parser;
 | 
			
		||||
use std::net::SocketAddrV4;
 | 
			
		||||
use std::str::FromStr;
 | 
			
		||||
use std::sync::{
 | 
			
		||||
    atomic::{AtomicBool, AtomicU32, Ordering},
 | 
			
		||||
    Arc, Mutex,
 | 
			
		||||
};
 | 
			
		||||
use std::time::{Duration, Instant};
 | 
			
		||||
use tokio::{
 | 
			
		||||
    io::{AsyncReadExt, AsyncWriteExt},
 | 
			
		||||
    net::TcpStream,
 | 
			
		||||
    time::sleep,
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
#[tokio::main]
 | 
			
		||||
async fn main() {
 | 
			
		||||
    let args = Args::parse();
 | 
			
		||||
 | 
			
		||||
    let in_transit = Arc::new(AtomicBool::new(false));
 | 
			
		||||
    let in_transit2 = in_transit.clone();
 | 
			
		||||
    let out_timestamp = Arc::new(Mutex::new(Instant::now()));
 | 
			
		||||
    let out_timestamp2 = out_timestamp.clone();
 | 
			
		||||
    let count = AtomicU32::new(args.count);
 | 
			
		||||
 | 
			
		||||
    let addr = SocketAddrV4::from_str(&args.address).unwrap();
 | 
			
		||||
 | 
			
		||||
    let stream = TcpStream::connect(addr).await.unwrap();
 | 
			
		||||
    if !args.csv {
 | 
			
		||||
        println!("Ping {addr}");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    let (mut si, mut so) = stream.into_split();
 | 
			
		||||
 | 
			
		||||
    let ping_in = async move {
 | 
			
		||||
        let mut read_buf = [0; 1024];
 | 
			
		||||
        while count.load(Ordering::Relaxed) > 0 {
 | 
			
		||||
            let bytes = si.read(&mut read_buf).await.unwrap();
 | 
			
		||||
            if bytes > 0 {
 | 
			
		||||
                let in_timestamp = Instant::now();
 | 
			
		||||
                // println!("Received {bytes} bytes");
 | 
			
		||||
 | 
			
		||||
                let duration = in_timestamp.duration_since(*out_timestamp.lock().unwrap());
 | 
			
		||||
                let i = u32::from_be_bytes(read_buf[0..bytes].try_into().unwrap());
 | 
			
		||||
                let rtt = duration.as_micros();
 | 
			
		||||
                if !args.csv {
 | 
			
		||||
                    println!("Ping {i} arrived with RTT of {rtt}us");
 | 
			
		||||
                } else {
 | 
			
		||||
                    println!("{rtt},");
 | 
			
		||||
                }
 | 
			
		||||
                // let old = count.fetch_sub(1, Ordering::Relaxed);
 | 
			
		||||
                // println!("Old count value: {old}");
 | 
			
		||||
                count.fetch_sub(1, Ordering::Relaxed);
 | 
			
		||||
                in_transit2.store(false, Ordering::Relaxed);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        if !args.csv {
 | 
			
		||||
            println!("Done receiving");
 | 
			
		||||
        }
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    let ping_out = async move {
 | 
			
		||||
        for i in 1..=args.count {
 | 
			
		||||
            if let Ok(false) =
 | 
			
		||||
                in_transit.compare_exchange_weak(false, true, Ordering::Relaxed, Ordering::Relaxed)
 | 
			
		||||
            {
 | 
			
		||||
                *out_timestamp2.lock().unwrap() = Instant::now();
 | 
			
		||||
                so.write_u32(i).await.unwrap();
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            if !args.csv {
 | 
			
		||||
                println!("Sending ping {i}");
 | 
			
		||||
            }
 | 
			
		||||
            sleep(Duration::from_millis(args.time)).await;
 | 
			
		||||
        }
 | 
			
		||||
        if !args.csv {
 | 
			
		||||
            println!("Done sending pings");
 | 
			
		||||
        }
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    tokio::join!(ping_out, ping_in);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Parser, Debug)]
 | 
			
		||||
struct Args {
 | 
			
		||||
    /// Socket address to ping
 | 
			
		||||
    #[arg(long)]
 | 
			
		||||
    address: String,
 | 
			
		||||
 | 
			
		||||
    /// Amount of pings to send
 | 
			
		||||
    #[arg(short, long)]
 | 
			
		||||
    count: u32,
 | 
			
		||||
 | 
			
		||||
    /// Time between pings
 | 
			
		||||
    #[arg(short, long)]
 | 
			
		||||
    time: u64,
 | 
			
		||||
 | 
			
		||||
    /// CSV mode
 | 
			
		||||
    #[arg(long)]
 | 
			
		||||
    csv: bool,
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										33
									
								
								src/bin/ping_server.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										33
									
								
								src/bin/ping_server.rs
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,33 @@
 | 
			
		||||
use clap::Parser;
 | 
			
		||||
use std::error::Error;
 | 
			
		||||
use std::net::SocketAddrV4;
 | 
			
		||||
use std::str::FromStr;
 | 
			
		||||
use tokio::io::AsyncWriteExt;
 | 
			
		||||
use tokio::{io, net::TcpListener};
 | 
			
		||||
 | 
			
		||||
#[tokio::main]
 | 
			
		||||
async fn main() -> Result<(), Box<dyn Error>> {
 | 
			
		||||
    let args = Args::parse();
 | 
			
		||||
 | 
			
		||||
    let addr = SocketAddrV4::from_str(&args.address).unwrap();
 | 
			
		||||
 | 
			
		||||
    let listener = TcpListener::bind(&addr).await?;
 | 
			
		||||
    println!("Listening on: {addr}");
 | 
			
		||||
 | 
			
		||||
    loop {
 | 
			
		||||
        let (mut socket, _) = listener.accept().await?;
 | 
			
		||||
 | 
			
		||||
        tokio::spawn(async move {
 | 
			
		||||
            let (mut si, mut so) = socket.split();
 | 
			
		||||
            io::copy(&mut si, &mut so).await?;
 | 
			
		||||
            so.shutdown().await
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Parser, Debug)]
 | 
			
		||||
struct Args {
 | 
			
		||||
    /// Socket address to listen on
 | 
			
		||||
    #[arg(long)]
 | 
			
		||||
    address: String,
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										54
									
								
								src/lib.rs
									
									
									
									
									
								
							
							
						
						
									
										54
									
								
								src/lib.rs
									
									
									
									
									
								
							@@ -7,6 +7,7 @@ use std::collections::{HashMap, HashSet};
 | 
			
		||||
use std::net::{IpAddr, SocketAddr};
 | 
			
		||||
use std::str::FromStr;
 | 
			
		||||
use std::sync::{Arc, Mutex, RwLock};
 | 
			
		||||
use std::time;
 | 
			
		||||
use tokio::io::{self, AsyncWriteExt};
 | 
			
		||||
use tokio::net::{TcpListener, TcpStream};
 | 
			
		||||
use tokio::sync::watch::{self, Receiver, Sender};
 | 
			
		||||
@@ -16,6 +17,7 @@ use uuid::Uuid;
 | 
			
		||||
pub struct ProxyCommand {
 | 
			
		||||
    #[serde(flatten)]
 | 
			
		||||
    command: Command,
 | 
			
		||||
    timestamp: Option<u64>,
 | 
			
		||||
    signature: Option<Signature>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -23,8 +25,34 @@ impl ProxyCommand {
 | 
			
		||||
    fn verify_signature(&self, verifying_key: &Option<VerifyingKey>) -> bool {
 | 
			
		||||
        match (verifying_key, &self.signature) {
 | 
			
		||||
            (Some(key), Some(signature)) => {
 | 
			
		||||
                let message = serde_json::to_string(&self.command).unwrap();
 | 
			
		||||
                key.verify(message.as_bytes(), signature).is_ok()
 | 
			
		||||
                let mut message = serde_json::to_string(&self.command).unwrap();
 | 
			
		||||
 | 
			
		||||
                let timestamp = if let Some(timestamp) = self.timestamp {
 | 
			
		||||
                    message.push_str(×tamp.to_string());
 | 
			
		||||
                    time::Duration::from_secs(timestamp)
 | 
			
		||||
                } else {
 | 
			
		||||
                    tracing::debug!("timestamp missing while signature is present");
 | 
			
		||||
                    return false; // timestamp missing with signature present
 | 
			
		||||
                };
 | 
			
		||||
 | 
			
		||||
                if !key.verify(message.as_bytes(), signature).is_ok() {
 | 
			
		||||
                    tracing::debug!("signature does not match message");
 | 
			
		||||
                    return false; // signature doesn't match
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                let now = time::SystemTime::now()
 | 
			
		||||
                    .duration_since(time::UNIX_EPOCH)
 | 
			
		||||
                    .unwrap();
 | 
			
		||||
                if timestamp > (now + time::Duration::from_secs(30)) {
 | 
			
		||||
                    tracing::warn!("command is more than 30s from the future");
 | 
			
		||||
                    false
 | 
			
		||||
                } else if now - timestamp <= time::Duration::from_secs(60) {
 | 
			
		||||
                    // less than a minute old
 | 
			
		||||
                    true
 | 
			
		||||
                } else {
 | 
			
		||||
                    tracing::warn!("command is more than a minute old");
 | 
			
		||||
                    false
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            (Some(_), None) => false,
 | 
			
		||||
            (None, _) => true,
 | 
			
		||||
@@ -310,7 +338,10 @@ async fn transfer(
 | 
			
		||||
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
mod tests {
 | 
			
		||||
    use std::net::{IpAddr, Ipv4Addr};
 | 
			
		||||
    use std::{
 | 
			
		||||
        net::{IpAddr, Ipv4Addr},
 | 
			
		||||
        time,
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    use crate::{Command, ProxyCommand};
 | 
			
		||||
    use p384::{
 | 
			
		||||
@@ -330,10 +361,12 @@ mod tests {
 | 
			
		||||
                destination_ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
 | 
			
		||||
                id: uuid!("67e55044-10b1-426f-9247-bb680e5fe0c8"),
 | 
			
		||||
            },
 | 
			
		||||
            timestamp: Some(8888),
 | 
			
		||||
            signature: Some(signature),
 | 
			
		||||
        };
 | 
			
		||||
        let expected = "{\"create\":{\"incoming_port\":5555,\"destination_port\":6666,\"\
 | 
			
		||||
                        destination_ip\":\"127.0.0.1\",\"id\":\"67e55044-10b1-426f-9247-bb680e5fe0c8\"},\
 | 
			
		||||
                        \"timestamp\":8888,\
 | 
			
		||||
                        \"signature\":\"\
 | 
			
		||||
                            5C912C4B3BFF2ADB49885DCBDB53D6D3041D0632E498CDFF\
 | 
			
		||||
                            2114CD2DCAC936AB0901B47C411E5BB57FE77BEF96044940\
 | 
			
		||||
@@ -351,9 +384,11 @@ mod tests {
 | 
			
		||||
            command: Command::Delete {
 | 
			
		||||
                id: uuid!("67e55044-10b1-426f-9247-bb680e5fe0c8"),
 | 
			
		||||
            },
 | 
			
		||||
            timestamp: Some(987654),
 | 
			
		||||
            signature: Some(signature),
 | 
			
		||||
        };
 | 
			
		||||
        let expected = "{\"delete\":{\"id\":\"67e55044-10b1-426f-9247-bb680e5fe0c8\"},\
 | 
			
		||||
                        \"timestamp\":987654,\
 | 
			
		||||
                        \"signature\":\"\
 | 
			
		||||
                            5C912C4B3BFF2ADB49885DCBDB53D6D3041D0632E498CDFF\
 | 
			
		||||
                            2114CD2DCAC936AB0901B47C411E5BB57FE77BEF96044940\
 | 
			
		||||
@@ -365,6 +400,11 @@ mod tests {
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn verify_signature() {
 | 
			
		||||
        let subscriber = tracing_subscriber::FmtSubscriber::builder()
 | 
			
		||||
            .with_max_level(tracing::Level::TRACE)
 | 
			
		||||
            .finish();
 | 
			
		||||
        tracing::subscriber::set_global_default(subscriber).unwrap();
 | 
			
		||||
 | 
			
		||||
        let command = Command::Create {
 | 
			
		||||
            incoming_port: 4567,
 | 
			
		||||
            destination_port: 7654,
 | 
			
		||||
@@ -374,12 +414,18 @@ mod tests {
 | 
			
		||||
 | 
			
		||||
        // Create signed message
 | 
			
		||||
        let signing_key = SigningKey::random(&mut OsRng);
 | 
			
		||||
        let message = serde_json::to_string(&command).unwrap();
 | 
			
		||||
        let timestamp = time::SystemTime::now()
 | 
			
		||||
            .duration_since(time::UNIX_EPOCH)
 | 
			
		||||
            .unwrap()
 | 
			
		||||
            .as_secs();
 | 
			
		||||
        let mut message = serde_json::to_string(&command).unwrap();
 | 
			
		||||
        message.push_str(×tamp.to_string());
 | 
			
		||||
        let signature: Signature = signing_key.sign(message.as_bytes());
 | 
			
		||||
        let bytes = signature.to_bytes();
 | 
			
		||||
        assert_eq!(bytes.len(), 96);
 | 
			
		||||
        let proxy_command = ProxyCommand {
 | 
			
		||||
            command,
 | 
			
		||||
            timestamp: Some(timestamp),
 | 
			
		||||
            signature: Some(signature),
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user