From 014c6df8f5b832b64c38723c8dee5d53876c9bc4 Mon Sep 17 00:00:00 2001 From: Erik Date: Mon, 13 Mar 2023 14:48:29 +0100 Subject: [PATCH 1/4] Create Rust project and add proxy functionality --- .gitignore | 5 ++ Cargo.toml | 16 ++++++ src/lib.rs | 149 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 32 +++++++++++ 4 files changed, 202 insertions(+) create mode 100644 Cargo.toml create mode 100644 src/lib.rs create mode 100644 src/main.rs diff --git a/.gitignore b/.gitignore index 088ba6b..22d3516 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,8 @@ Cargo.lock # These are backup files generated by rustfmt **/*.rs.bk + + +# Added by cargo + +/target diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..bf57738 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "proxima-centauri" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0.69" +axum = { version = "0.6.11", features = ["json"] } +serde = { version = "1.0.155", features = ["derive"] } +serde_json = "1.0.94" +tokio = { version = "1.26.0", features = ["full"] } +tracing = "0.1.37" +tracing-subscriber = "0.3.16" +uuid = { version = "1.3.0", features = ["v4", "serde"] } diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..09febde --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,149 @@ +use axum::{http::StatusCode, Json}; +use serde::{Deserialize, Serialize}; +use std::net::{IpAddr, SocketAddr}; +use tokio::io::{self, AsyncWriteExt}; +use tokio::net::{TcpListener, TcpStream}; +use uuid::Uuid; + +#[derive(Deserialize, Serialize, Debug)] +pub struct ProxyCommand { + command: Command, + signature: [u8; 32], +} + +#[derive(Deserialize, Serialize, Debug)] +enum Command { + New { + incoming_port: u16, + destination_port: u16, + destination_ip: IpAddr, + id: Uuid, + }, + Modify { + destionation_ip: IpAddr, + id: Uuid, + }, + Delete { + id: Uuid, + }, +} + +#[derive(Serialize)] +pub struct ProxyResponse { + message: String, +} + +pub async fn root() -> &'static str { + "Hello, World!" +} + +pub async fn process_command( + Json(payload): Json, +) -> (StatusCode, Json) { + tracing::error!("Received payload: {:?}", payload); + // TODO: verify signature + match payload.command { + Command::New { + incoming_port, + destination_port, + destination_ip, + id, + } => { + // TODO: add id to global proxy map + add_proxy( + incoming_port, + SocketAddr::new(destination_ip, destination_port), + ) + .await + .unwrap(); // TODO: error propagation?? + } + Command::Modify { + destionation_ip, + id, + } => todo!(), + Command::Delete { id } => todo!(), + } + ( + StatusCode::CREATED, + Json(ProxyResponse { + message: "Success".to_string(), + }), + ) +} + +async fn add_proxy(in_port: u16, destination: SocketAddr) -> anyhow::Result<()> { + let listener = TcpListener::bind(("127.0.0.1", in_port)).await?; + + tracing::info!("proxying port {in_port} to {destination}"); + + tokio::spawn(proxy(listener, destination)); + Ok(()) +} + +async fn proxy(listener: TcpListener, destination: SocketAddr) { + while let Ok((inbound, _)) = listener.accept().await { + let transfer = transfer(inbound, destination); + + tokio::spawn(transfer); + } +} + +async fn transfer(mut inbound: TcpStream, destination: SocketAddr) -> anyhow::Result<()> { + let mut outbound = TcpStream::connect(destination).await?; + + let (mut ri, mut wi) = inbound.split(); + let (mut ro, mut wo) = outbound.split(); + + let client_to_server = async { + io::copy(&mut ri, &mut wo).await?; + wo.shutdown().await + }; + + let server_to_client = async { + io::copy(&mut ro, &mut wi).await?; + wi.shutdown().await + }; + + tokio::try_join!(client_to_server, server_to_client)?; + + Ok(()) +} + +#[cfg(test)] +mod tests { + use std::net::Ipv4Addr; + + use crate::{Command, ProxyCommand}; + use serde::{Deserialize, Serialize}; + use uuid::uuid; + + #[test] + fn serialize_proxy_command_new() { + let proxy_command = ProxyCommand { + command: Command::New { + incoming_port: 5555, + destination_port: 6666, + destination_ip: std::net::IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + id: uuid!("67e55044-10b1-426f-9247-bb680e5fe0c8"), + }, + signature: [0u8; 32], + }; + let expected = "{\"command\":{\"New\":{\"incoming_port\":5555,\"destination_port\":6666,\"\ + destination_ip\":\"127.0.0.1\",\"id\":\"67e55044-10b1-426f-9247-bb680e5fe0c8\"}},\ + \"signature\":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]}"; + assert_eq!(serde_json::to_string(&proxy_command).unwrap(), expected); + } + + #[test] + fn serialize_proxy_command_delete() { + let proxy_command = ProxyCommand { + command: Command::Delete { + id: uuid!("67e55044-10b1-426f-9247-bb680e5fe0c8"), + }, + signature: [0u8; 32], + }; + let expected = "{\"command\":{\"Delete\":{\"id\":\"67e55044-10b1-426f-9247-bb680e5fe0c8\"}},\ + \"signature\":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]}"; + assert_eq!(serde_json::to_string(&proxy_command).unwrap(), expected); + } +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..132cf2f --- /dev/null +++ b/src/main.rs @@ -0,0 +1,32 @@ +use axum::{ + routing::{get, post}, + Router, +}; +use proxima_centauri::{process_command, root}; +use std::net::SocketAddr; +use tracing::Level; + +#[tokio::main] +async fn main() { + // initialize tracing + let subscriber = tracing_subscriber::FmtSubscriber::builder() + .with_max_level(Level::TRACE) + .finish(); + + tracing::subscriber::set_global_default(subscriber).unwrap(); + + // build our application with a route + let app = Router::new() + // `GET /` goes to `root` + .route("/", get(root)) + // `POST /command` goes to `process_command` + .route("/command", post(process_command)); + + // run our app with hyper + let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); + tracing::debug!("listening on {}", addr); + axum::Server::bind(&addr) + .serve(app.into_make_service()) + .await + .unwrap(); +} From dd2f26fe2be4fdb2a65a932f5901878c913c64a1 Mon Sep 17 00:00:00 2001 From: Erik Date: Tue, 14 Mar 2023 15:19:34 +0100 Subject: [PATCH 2/4] Add new and delete curl scripts --- delete.sh | 9 +++++++++ new.sh | 12 ++++++++++++ 2 files changed, 21 insertions(+) create mode 100755 delete.sh create mode 100755 new.sh diff --git a/delete.sh b/delete.sh new file mode 100755 index 0000000..c7907e4 --- /dev/null +++ b/delete.sh @@ -0,0 +1,9 @@ +curl --header "Content-Type: application/json" \ + --data '{"command": { + "Delete": { + "id": "67e55044-10b1-426f-9247-bb680e5fe0c8" + } + }, + "signature": [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + }' \ + http://localhost:3000/command diff --git a/new.sh b/new.sh new file mode 100755 index 0000000..4f9f241 --- /dev/null +++ b/new.sh @@ -0,0 +1,12 @@ +curl --header "Content-Type: application/json" \ + --data '{"command": { + "New": { + "incoming_port": 5555, + "destination_port": 8080, + "destination_ip": "127.0.0.1", + "id": "67e55044-10b1-426f-9247-bb680e5fe0c8" + } + }, + "signature": [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + }' \ + http://localhost:3000/command From b9d59761bfe01c9fdca8f4796431040f94186ef3 Mon Sep 17 00:00:00 2001 From: Erik Date: Tue, 14 Mar 2023 17:38:23 +0100 Subject: [PATCH 3/4] Add modify curl script --- modify.sh | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100755 modify.sh diff --git a/modify.sh b/modify.sh new file mode 100755 index 0000000..6968315 --- /dev/null +++ b/modify.sh @@ -0,0 +1,11 @@ +curl --header "Content-Type: application/json" \ + --data '{"command": { + "Modify": { + "destination_port": 8888, + "destination_ip": "127.0.0.1", + "id": "67e55044-10b1-426f-9247-bb680e5fe0c8" + } + }, + "signature": [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + }' \ + http://localhost:3000/command From fcfbd4cbecd64128a52880f6a979b8389b238e70 Mon Sep 17 00:00:00 2001 From: Erik Date: Tue, 14 Mar 2023 17:38:49 +0100 Subject: [PATCH 4/4] Add proxy control --- src/lib.rs | 118 ++++++++++++++++++++++++++++++++++++++++++---------- src/main.rs | 8 ++-- 2 files changed, 101 insertions(+), 25 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 09febde..ead51a3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,12 @@ +use axum::extract::State; use axum::{http::StatusCode, Json}; use serde::{Deserialize, Serialize}; -use std::net::{IpAddr, SocketAddr}; +use std::collections::HashMap; +use std::net::{Ipv4Addr, SocketAddrV4}; +use std::sync::{Arc, Mutex}; use tokio::io::{self, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::watch::{self, Receiver, Sender}; use uuid::Uuid; #[derive(Deserialize, Serialize, Debug)] @@ -16,11 +20,12 @@ enum Command { New { incoming_port: u16, destination_port: u16, - destination_ip: IpAddr, + destination_ip: Ipv4Addr, id: Uuid, }, Modify { - destionation_ip: IpAddr, + destination_port: u16, + destination_ip: Ipv4Addr, id: Uuid, }, Delete { @@ -33,11 +38,31 @@ pub struct ProxyResponse { message: String, } +#[derive(Debug)] +pub struct GlobalState { + proxies: Mutex>, +} + +impl GlobalState { + pub fn new() -> Self { + Self { + proxies: Mutex::new(HashMap::new()), + } + } +} + +#[derive(Debug)] +struct ProxyState { + destination: SocketAddrV4, + control: Sender, +} + pub async fn root() -> &'static str { "Hello, World!" } pub async fn process_command( + State(state): State>, Json(payload): Json, ) -> (StatusCode, Json) { tracing::error!("Received payload: {:?}", payload); @@ -49,19 +74,38 @@ pub async fn process_command( destination_ip, id, } => { - // TODO: add id to global proxy map - add_proxy( - incoming_port, - SocketAddr::new(destination_ip, destination_port), - ) - .await - .unwrap(); // TODO: error propagation?? + let addr = SocketAddrV4::new(destination_ip, destination_port); + let (tx, rx) = watch::channel(ProxyControlMessage::Open { destination: addr }); + state.proxies.lock().unwrap().insert( + id, + ProxyState { + destination: addr, + control: tx, + }, + ); + add_proxy(incoming_port, rx).await.unwrap(); // TODO: error propagation?? } Command::Modify { - destionation_ip, + destination_port, + destination_ip, id, - } => todo!(), - Command::Delete { id } => todo!(), + } => { + if let Some(proxy) = state.proxies.lock().unwrap().get_mut(&id) { + proxy.destination.set_port(destination_port); + proxy.destination.set_ip(destination_ip); + proxy + .control + .send(ProxyControlMessage::Open { + destination: proxy.destination, + }) + .unwrap(); + } + } + Command::Delete { id } => { + if let Some(proxy) = state.proxies.lock().unwrap().get_mut(&id) { + proxy.control.send(ProxyControlMessage::Close).unwrap(); + } + } } ( StatusCode::CREATED, @@ -71,24 +115,54 @@ pub async fn process_command( ) } -async fn add_proxy(in_port: u16, destination: SocketAddr) -> anyhow::Result<()> { +#[derive(Debug)] +enum ProxyControlMessage { + Open { destination: SocketAddrV4 }, // Reroute { new: SocketAddr }, + Close, +} + +async fn add_proxy(in_port: u16, control: Receiver) -> anyhow::Result<()> { let listener = TcpListener::bind(("127.0.0.1", in_port)).await?; - tracing::info!("proxying port {in_port} to {destination}"); + tracing::info!("proxying port {in_port} to {:?}", *control.borrow()); - tokio::spawn(proxy(listener, destination)); + tokio::spawn(proxy(listener, control)); Ok(()) } -async fn proxy(listener: TcpListener, destination: SocketAddr) { - while let Ok((inbound, _)) = listener.accept().await { - let transfer = transfer(inbound, destination); +async fn proxy(listener: TcpListener, mut control: Receiver) { + let mut current_destination = + if let ProxyControlMessage::Open { destination } = *control.borrow() { + Some(destination) + } else { + None + }; + loop { + tokio::select! { + l = listener.accept()=> { + if let Ok((inbound, _)) = l { + let transfer = transfer(inbound, current_destination.unwrap()); - tokio::spawn(transfer); + tokio::spawn(transfer); + } + } + _ = control.changed() => { + match *control.borrow() { + ProxyControlMessage::Open { destination } => { + tracing::info!("destination for proxy port {} changed to {}", listener.local_addr().unwrap(), destination); + current_destination=Some(destination); + }, + ProxyControlMessage::Close => { + tracing::info!("destination for proxy port {} closed", listener.local_addr().unwrap()); + return; + }, + } + } + } } } -async fn transfer(mut inbound: TcpStream, destination: SocketAddr) -> anyhow::Result<()> { +async fn transfer(mut inbound: TcpStream, destination: SocketAddrV4) -> anyhow::Result<()> { let mut outbound = TcpStream::connect(destination).await?; let (mut ri, mut wi) = inbound.split(); @@ -123,7 +197,7 @@ mod tests { command: Command::New { incoming_port: 5555, destination_port: 6666, - destination_ip: std::net::IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + destination_ip: Ipv4Addr::new(127, 0, 0, 1), id: uuid!("67e55044-10b1-426f-9247-bb680e5fe0c8"), }, signature: [0u8; 32], diff --git a/src/main.rs b/src/main.rs index 132cf2f..9bb451f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,8 +2,8 @@ use axum::{ routing::{get, post}, Router, }; -use proxima_centauri::{process_command, root}; -use std::net::SocketAddr; +use proxima_centauri::{process_command, root, GlobalState}; +use std::{net::SocketAddr, sync::Arc}; use tracing::Level; #[tokio::main] @@ -15,12 +15,14 @@ async fn main() { tracing::subscriber::set_global_default(subscriber).unwrap(); + let shared_state = Arc::new(GlobalState::new()); // build our application with a route let app = Router::new() // `GET /` goes to `root` .route("/", get(root)) // `POST /command` goes to `process_command` - .route("/command", post(process_command)); + .route("/command", post(process_command)) + .with_state(shared_state); // run our app with hyper let addr = SocketAddr::from(([127, 0, 0, 1], 3000));