Add redirection for existing connections

This commit is contained in:
Erik
2023-03-15 15:55:02 +01:00
parent 33c8d12ac4
commit 49a35cc8fe
2 changed files with 135 additions and 24 deletions

View File

@ -65,7 +65,7 @@ pub async fn process_command(
State(state): State<Arc<GlobalState>>,
Json(payload): Json<ProxyCommand>,
) -> (StatusCode, Json<ProxyResponse>) {
tracing::error!("Received payload: {:?}", payload);
tracing::info!("Received payload: {:?}", payload);
// TODO: verify signature
match payload.command {
Command::New {
@ -131,17 +131,11 @@ async fn add_proxy(in_port: u16, control: Receiver<ProxyControlMessage>) -> anyh
}
async fn proxy(listener: TcpListener, mut control: Receiver<ProxyControlMessage>) {
let mut current_destination =
if let ProxyControlMessage::Open { destination } = *control.borrow() {
Some(destination)
} else {
None
};
loop {
tokio::select! {
l = listener.accept()=> {
if let Ok((inbound, _)) = l {
let transfer = transfer(inbound, current_destination.unwrap());
let transfer = transfer(inbound, control.clone());
tokio::spawn(transfer);
}
@ -150,7 +144,6 @@ async fn proxy(listener: TcpListener, mut control: Receiver<ProxyControlMessage>
match *control.borrow() {
ProxyControlMessage::Open { destination } => {
tracing::info!("destination for proxy port {} changed to {}", listener.local_addr().unwrap(), destination);
current_destination=Some(destination);
},
ProxyControlMessage::Close => {
tracing::info!("destination for proxy port {} closed", listener.local_addr().unwrap());
@ -162,25 +155,68 @@ async fn proxy(listener: TcpListener, mut control: Receiver<ProxyControlMessage>
}
}
async fn transfer(mut inbound: TcpStream, destination: SocketAddrV4) -> anyhow::Result<()> {
let mut outbound = TcpStream::connect(destination).await?;
async fn transfer(
mut inbound: TcpStream,
mut control: Receiver<ProxyControlMessage>,
) -> anyhow::Result<()> {
loop {
let current_destination =
if let ProxyControlMessage::Open { destination } = *control.borrow() {
Some(destination)
} else {
break Ok(());
};
let mut outbound = TcpStream::connect(current_destination.unwrap()).await?;
let (mut ri, mut wi) = inbound.split();
let (mut ro, mut wo) = outbound.split();
let (mut ri, mut wi) = inbound.split();
let (mut ro, mut wo) = outbound.split();
let client_to_server = async {
io::copy(&mut ri, &mut wo).await?;
wo.shutdown().await
};
let client_to_server = async {
io::copy(&mut ri, &mut wo).await?;
wo.shutdown().await
};
let server_to_client = async {
io::copy(&mut ro, &mut wi).await?;
wi.shutdown().await
};
let server_to_client = async {
io::copy(&mut ro, &mut wi).await?;
wi.shutdown().await
};
tokio::try_join!(client_to_server, server_to_client)?;
// Select between the copy tasks and watch channel
tokio::select! {
// Join the two copy streams and wait for the connection to clone
result = async move { tokio::join!(client_to_server, server_to_client) } => {
match result {
(Ok(_), Ok(_)) => {
break Ok(());
}
(r1, r2) => {
if r1.is_err() {
tracing::error!("error closing client->server of {:?}: {:?}", inbound, &r1);
}
if r2.is_err() {
tracing::error!("error closing server->client of {:?}: {:?}", inbound, &r2);
}
r1?;
r2?;
},
}
}
_ = control.changed() => {
match *control.borrow() {
ProxyControlMessage::Open { destination } => {
eprintln!("Switching to new destination: {destination}");
// Disconnect the current outbound connection and restart the loop
drop(outbound);
continue;
},
ProxyControlMessage::Close => {
break Ok(());
},
}
Ok(())
}
}
}
}
#[cfg(test)]
@ -188,7 +224,6 @@ mod tests {
use std::net::Ipv4Addr;
use crate::{Command, ProxyCommand};
use serde::{Deserialize, Serialize};
use uuid::uuid;
#[test]