diff --git a/.gitignore b/.gitignore index 088ba6b..22d3516 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,8 @@ Cargo.lock # These are backup files generated by rustfmt **/*.rs.bk + + +# Added by cargo + +/target diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..bf57738 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "proxima-centauri" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0.69" +axum = { version = "0.6.11", features = ["json"] } +serde = { version = "1.0.155", features = ["derive"] } +serde_json = "1.0.94" +tokio = { version = "1.26.0", features = ["full"] } +tracing = "0.1.37" +tracing-subscriber = "0.3.16" +uuid = { version = "1.3.0", features = ["v4", "serde"] } diff --git a/delete.sh b/delete.sh new file mode 100755 index 0000000..c7907e4 --- /dev/null +++ b/delete.sh @@ -0,0 +1,9 @@ +curl --header "Content-Type: application/json" \ + --data '{"command": { + "Delete": { + "id": "67e55044-10b1-426f-9247-bb680e5fe0c8" + } + }, + "signature": [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + }' \ + http://localhost:3000/command diff --git a/modify.sh b/modify.sh new file mode 100755 index 0000000..6968315 --- /dev/null +++ b/modify.sh @@ -0,0 +1,11 @@ +curl --header "Content-Type: application/json" \ + --data '{"command": { + "Modify": { + "destination_port": 8888, + "destination_ip": "127.0.0.1", + "id": "67e55044-10b1-426f-9247-bb680e5fe0c8" + } + }, + "signature": [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + }' \ + http://localhost:3000/command diff --git a/new.sh b/new.sh new file mode 100755 index 0000000..4f9f241 --- /dev/null +++ b/new.sh @@ -0,0 +1,12 @@ +curl --header "Content-Type: application/json" \ + --data '{"command": { + "New": { + "incoming_port": 5555, + "destination_port": 8080, + "destination_ip": "127.0.0.1", + "id": "67e55044-10b1-426f-9247-bb680e5fe0c8" + } + }, + "signature": [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + }' \ + http://localhost:3000/command diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..ead51a3 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,223 @@ +use axum::extract::State; +use axum::{http::StatusCode, Json}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::net::{Ipv4Addr, SocketAddrV4}; +use std::sync::{Arc, Mutex}; +use tokio::io::{self, AsyncWriteExt}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::watch::{self, Receiver, Sender}; +use uuid::Uuid; + +#[derive(Deserialize, Serialize, Debug)] +pub struct ProxyCommand { + command: Command, + signature: [u8; 32], +} + +#[derive(Deserialize, Serialize, Debug)] +enum Command { + New { + incoming_port: u16, + destination_port: u16, + destination_ip: Ipv4Addr, + id: Uuid, + }, + Modify { + destination_port: u16, + destination_ip: Ipv4Addr, + id: Uuid, + }, + Delete { + id: Uuid, + }, +} + +#[derive(Serialize)] +pub struct ProxyResponse { + message: String, +} + +#[derive(Debug)] +pub struct GlobalState { + proxies: Mutex>, +} + +impl GlobalState { + pub fn new() -> Self { + Self { + proxies: Mutex::new(HashMap::new()), + } + } +} + +#[derive(Debug)] +struct ProxyState { + destination: SocketAddrV4, + control: Sender, +} + +pub async fn root() -> &'static str { + "Hello, World!" +} + +pub async fn process_command( + State(state): State>, + Json(payload): Json, +) -> (StatusCode, Json) { + tracing::error!("Received payload: {:?}", payload); + // TODO: verify signature + match payload.command { + Command::New { + incoming_port, + destination_port, + destination_ip, + id, + } => { + let addr = SocketAddrV4::new(destination_ip, destination_port); + let (tx, rx) = watch::channel(ProxyControlMessage::Open { destination: addr }); + state.proxies.lock().unwrap().insert( + id, + ProxyState { + destination: addr, + control: tx, + }, + ); + add_proxy(incoming_port, rx).await.unwrap(); // TODO: error propagation?? + } + Command::Modify { + destination_port, + destination_ip, + id, + } => { + if let Some(proxy) = state.proxies.lock().unwrap().get_mut(&id) { + proxy.destination.set_port(destination_port); + proxy.destination.set_ip(destination_ip); + proxy + .control + .send(ProxyControlMessage::Open { + destination: proxy.destination, + }) + .unwrap(); + } + } + Command::Delete { id } => { + if let Some(proxy) = state.proxies.lock().unwrap().get_mut(&id) { + proxy.control.send(ProxyControlMessage::Close).unwrap(); + } + } + } + ( + StatusCode::CREATED, + Json(ProxyResponse { + message: "Success".to_string(), + }), + ) +} + +#[derive(Debug)] +enum ProxyControlMessage { + Open { destination: SocketAddrV4 }, // Reroute { new: SocketAddr }, + Close, +} + +async fn add_proxy(in_port: u16, control: Receiver) -> anyhow::Result<()> { + let listener = TcpListener::bind(("127.0.0.1", in_port)).await?; + + tracing::info!("proxying port {in_port} to {:?}", *control.borrow()); + + tokio::spawn(proxy(listener, control)); + Ok(()) +} + +async fn proxy(listener: TcpListener, mut control: Receiver) { + let mut current_destination = + if let ProxyControlMessage::Open { destination } = *control.borrow() { + Some(destination) + } else { + None + }; + loop { + tokio::select! { + l = listener.accept()=> { + if let Ok((inbound, _)) = l { + let transfer = transfer(inbound, current_destination.unwrap()); + + tokio::spawn(transfer); + } + } + _ = control.changed() => { + match *control.borrow() { + ProxyControlMessage::Open { destination } => { + tracing::info!("destination for proxy port {} changed to {}", listener.local_addr().unwrap(), destination); + current_destination=Some(destination); + }, + ProxyControlMessage::Close => { + tracing::info!("destination for proxy port {} closed", listener.local_addr().unwrap()); + return; + }, + } + } + } + } +} + +async fn transfer(mut inbound: TcpStream, destination: SocketAddrV4) -> anyhow::Result<()> { + let mut outbound = TcpStream::connect(destination).await?; + + let (mut ri, mut wi) = inbound.split(); + let (mut ro, mut wo) = outbound.split(); + + let client_to_server = async { + io::copy(&mut ri, &mut wo).await?; + wo.shutdown().await + }; + + let server_to_client = async { + io::copy(&mut ro, &mut wi).await?; + wi.shutdown().await + }; + + tokio::try_join!(client_to_server, server_to_client)?; + + Ok(()) +} + +#[cfg(test)] +mod tests { + use std::net::Ipv4Addr; + + use crate::{Command, ProxyCommand}; + use serde::{Deserialize, Serialize}; + use uuid::uuid; + + #[test] + fn serialize_proxy_command_new() { + let proxy_command = ProxyCommand { + command: Command::New { + incoming_port: 5555, + destination_port: 6666, + destination_ip: Ipv4Addr::new(127, 0, 0, 1), + id: uuid!("67e55044-10b1-426f-9247-bb680e5fe0c8"), + }, + signature: [0u8; 32], + }; + let expected = "{\"command\":{\"New\":{\"incoming_port\":5555,\"destination_port\":6666,\"\ + destination_ip\":\"127.0.0.1\",\"id\":\"67e55044-10b1-426f-9247-bb680e5fe0c8\"}},\ + \"signature\":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]}"; + assert_eq!(serde_json::to_string(&proxy_command).unwrap(), expected); + } + + #[test] + fn serialize_proxy_command_delete() { + let proxy_command = ProxyCommand { + command: Command::Delete { + id: uuid!("67e55044-10b1-426f-9247-bb680e5fe0c8"), + }, + signature: [0u8; 32], + }; + let expected = "{\"command\":{\"Delete\":{\"id\":\"67e55044-10b1-426f-9247-bb680e5fe0c8\"}},\ + \"signature\":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]}"; + assert_eq!(serde_json::to_string(&proxy_command).unwrap(), expected); + } +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..9bb451f --- /dev/null +++ b/src/main.rs @@ -0,0 +1,34 @@ +use axum::{ + routing::{get, post}, + Router, +}; +use proxima_centauri::{process_command, root, GlobalState}; +use std::{net::SocketAddr, sync::Arc}; +use tracing::Level; + +#[tokio::main] +async fn main() { + // initialize tracing + let subscriber = tracing_subscriber::FmtSubscriber::builder() + .with_max_level(Level::TRACE) + .finish(); + + tracing::subscriber::set_global_default(subscriber).unwrap(); + + let shared_state = Arc::new(GlobalState::new()); + // build our application with a route + let app = Router::new() + // `GET /` goes to `root` + .route("/", get(root)) + // `POST /command` goes to `process_command` + .route("/command", post(process_command)) + .with_state(shared_state); + + // run our app with hyper + let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); + tracing::debug!("listening on {}", addr); + axum::Server::bind(&addr) + .serve(app.into_make_service()) + .await + .unwrap(); +}