Compare commits

..

13 Commits

Author SHA1 Message Date
Erik van Bennekum
7d61fd1686
Merge pull request #13 from GreenPenguino/command-line-argument
Add command-line argument for control api endpoint
2023-06-06 11:48:32 +02:00
Erik
1cda80c041 Add command-line argument for control api endpoint 2023-06-06 11:48:03 +02:00
Erik van Bennekum
d8d40e5ba3
Merge pull request #12 from GreenPenguino/timestamp
Add timestamp
2023-06-06 11:45:38 +02:00
Erik
cc0d2fb3f0 Accept commands 30s in the future 2023-06-06 11:45:23 +02:00
Erik
0b5ab4872c Add time option 2023-05-31 17:33:54 +02:00
Erik
eb35683748 Add TCP ping utility 2023-05-31 16:33:32 +02:00
Erik
e2dae53701 Tunnel listens on all interfaces 2023-05-02 14:14:35 +02:00
Erik
7a40908543 Add timestamp 2023-04-26 15:48:31 +02:00
Erik van Bennekum
ddc838f2f3
Merge pull request #11 from GreenPenguino/status-command
Add status command
2023-04-26 13:29:14 +02:00
Erik
683de49025 Add status command 2023-04-26 13:03:24 +02:00
Felix Schulze
6f7b2239f3
Merge pull request #8 from GreenPenguino/checks-on-command
Add checks on incoming command
2023-04-25 16:05:39 +02:00
Felix Schulze
a4d69b4a6d
Merge pull request #9 from GreenPenguino/control-port
change control port to 14000
2023-04-25 16:00:45 +02:00
Erik
1069f46a40 change control port to 14000 2023-04-25 15:49:07 +02:00
9 changed files with 249 additions and 41 deletions

View File

@ -5,9 +5,24 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
default-run = "proxy"
[[bin]]
name = "proxy"
path = "src/bin/proxy.rs"
[[bin]]
name = "ping-server"
path = "src/bin/ping_server.rs"
[[bin]]
name = "ping-client"
path = "src/bin/ping_client.rs"
[dependencies]
anyhow = "1.0.69"
axum = { version = "0.6.11", features = ["json"] }
clap = { version = "4.3.0", features = ["derive"] }
p384 = { version = "0.13.0", features = ["ecdsa", "serde"] }
serde = { version = "1.0.155", features = ["derive"] }
serde_json = "1.0.94"

View File

@ -7,4 +7,4 @@ curl --header "Content-Type: application/json" \
"id": "67e55044-10b1-426f-9247-bb680e5fe0c8"
}
}' \
http://localhost:3000/command
http://localhost:14000/command

View File

@ -6,4 +6,4 @@ curl --header "Content-Type: application/json" \
"id": "67e55044-10b1-426f-9247-bb680e5fe0c8"
}
}' \
http://localhost:3000/command
http://localhost:14000/command

View File

@ -6,4 +6,4 @@ curl --header "Content-Type: application/json" \
"id": "67e55044-10b1-426f-9247-bb680e5fe0c8"
}
}' \
http://localhost:3000/command
http://localhost:14000/command

100
src/bin/ping_client.rs Normal file
View File

@ -0,0 +1,100 @@
use clap::Parser;
use std::net::SocketAddrV4;
use std::str::FromStr;
use std::sync::{
atomic::{AtomicBool, AtomicU32, Ordering},
Arc, Mutex,
};
use std::time::{Duration, Instant};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
time::sleep,
};
#[tokio::main]
async fn main() {
let args = Args::parse();
let in_transit = Arc::new(AtomicBool::new(false));
let in_transit2 = in_transit.clone();
let out_timestamp = Arc::new(Mutex::new(Instant::now()));
let out_timestamp2 = out_timestamp.clone();
let count = AtomicU32::new(args.count);
let addr = SocketAddrV4::from_str(&args.address).unwrap();
let stream = TcpStream::connect(addr).await.unwrap();
if !args.csv {
println!("Ping {addr}");
}
let (mut si, mut so) = stream.into_split();
let ping_in = async move {
let mut read_buf = [0; 1024];
while count.load(Ordering::Relaxed) > 0 {
let bytes = si.read(&mut read_buf).await.unwrap();
if bytes > 0 {
let in_timestamp = Instant::now();
// println!("Received {bytes} bytes");
let duration = in_timestamp.duration_since(*out_timestamp.lock().unwrap());
let i = u32::from_be_bytes(read_buf[0..bytes].try_into().unwrap());
let rtt = duration.as_micros();
if !args.csv {
println!("Ping {i} arrived with RTT of {rtt}us");
} else {
println!("{rtt},");
}
// let old = count.fetch_sub(1, Ordering::Relaxed);
// println!("Old count value: {old}");
count.fetch_sub(1, Ordering::Relaxed);
in_transit2.store(false, Ordering::Relaxed);
}
}
if !args.csv {
println!("Done receiving");
}
};
let ping_out = async move {
for i in 1..=args.count {
if let Ok(false) =
in_transit.compare_exchange_weak(false, true, Ordering::Relaxed, Ordering::Relaxed)
{
*out_timestamp2.lock().unwrap() = Instant::now();
so.write_u32(i).await.unwrap();
}
if !args.csv {
println!("Sending ping {i}");
}
sleep(Duration::from_millis(args.time)).await;
}
if !args.csv {
println!("Done sending pings");
}
};
tokio::join!(ping_out, ping_in);
}
#[derive(Parser, Debug)]
struct Args {
/// Socket address to ping
#[arg(long)]
address: String,
/// Amount of pings to send
#[arg(short, long)]
count: u32,
/// Time between pings
#[arg(short, long)]
time: u64,
/// CSV mode
#[arg(long)]
csv: bool,
}

33
src/bin/ping_server.rs Normal file
View File

@ -0,0 +1,33 @@
use clap::Parser;
use std::error::Error;
use std::net::SocketAddrV4;
use std::str::FromStr;
use tokio::io::AsyncWriteExt;
use tokio::{io, net::TcpListener};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let args = Args::parse();
let addr = SocketAddrV4::from_str(&args.address).unwrap();
let listener = TcpListener::bind(&addr).await?;
println!("Listening on: {addr}");
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
let (mut si, mut so) = socket.split();
io::copy(&mut si, &mut so).await?;
so.shutdown().await
});
}
}
#[derive(Parser, Debug)]
struct Args {
/// Socket address to listen on
#[arg(long)]
address: String,
}

View File

@ -3,11 +3,17 @@ use axum::{
Router,
};
use proxima_centauri::{process_command, root, GlobalState};
use std::{net::SocketAddr, sync::Arc};
use std::sync::Arc;
use tracing::Level;
#[tokio::main]
async fn main() {
let addr: std::net::SocketAddr = std::env::args()
.nth(1)
.unwrap_or_else(|| "127.0.0.1:14000".to_string())
.parse()
.unwrap();
// initialize tracing
let subscriber = tracing_subscriber::FmtSubscriber::builder()
.with_max_level(Level::INFO)
@ -27,7 +33,6 @@ async fn main() {
.with_state(shared_state);
// run our app with hyper
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
tracing::debug!("listening on {}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service())

View File

@ -7,6 +7,7 @@ use std::collections::{HashMap, HashSet};
use std::net::{IpAddr, SocketAddr};
use std::str::FromStr;
use std::sync::{Arc, Mutex, RwLock};
use std::time;
use tokio::io::{self, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::watch::{self, Receiver, Sender};
@ -16,6 +17,7 @@ use uuid::Uuid;
pub struct ProxyCommand {
#[serde(flatten)]
command: Command,
timestamp: Option<u64>,
signature: Option<Signature>,
}
@ -23,8 +25,34 @@ impl ProxyCommand {
fn verify_signature(&self, verifying_key: &Option<VerifyingKey>) -> bool {
match (verifying_key, &self.signature) {
(Some(key), Some(signature)) => {
let message = serde_json::to_string(&self.command).unwrap();
key.verify(message.as_bytes(), signature).is_ok()
let mut message = serde_json::to_string(&self.command).unwrap();
let timestamp = if let Some(timestamp) = self.timestamp {
message.push_str(&timestamp.to_string());
time::Duration::from_secs(timestamp)
} else {
tracing::debug!("timestamp missing while signature is present");
return false; // timestamp missing with signature present
};
if !key.verify(message.as_bytes(), signature).is_ok() {
tracing::debug!("signature does not match message");
return false; // signature doesn't match
}
let now = time::SystemTime::now()
.duration_since(time::UNIX_EPOCH)
.unwrap();
if timestamp > (now + time::Duration::from_secs(30)) {
tracing::warn!("command is more than 30s from the future");
false
} else if now - timestamp <= time::Duration::from_secs(60) {
// less than a minute old
true
} else {
tracing::warn!("command is more than a minute old");
false
}
}
(Some(_), None) => false,
(None, _) => true,
@ -49,11 +77,15 @@ enum Command {
Delete {
id: Uuid,
},
Status,
}
#[derive(Serialize)]
pub struct ProxyResponse {
message: String,
pub enum ProxyResponse {
Message(String),
Status {
tunnels: HashMap<Uuid, (u16, SocketAddr)>,
},
}
#[derive(Debug)]
@ -68,9 +100,7 @@ impl GlobalState {
Self {
proxies: Mutex::new(HashMap::new()),
ports: RwLock::new(HashSet::new()),
verifying_key: verifying_key
.map(|key| VerifyingKey::from_str(key.as_ref()).ok())
.flatten(),
verifying_key: verifying_key.and_then(|key| VerifyingKey::from_str(key.as_ref()).ok()),
}
}
}
@ -95,9 +125,7 @@ pub async fn process_command(
if !payload.verify_signature(&state.verifying_key) {
return (
StatusCode::UNAUTHORIZED,
Json(ProxyResponse {
message: "Invalid signature".to_string(),
}),
Json(ProxyResponse::Message("Invalid signature".to_string())),
);
}
match payload.command {
@ -111,17 +139,17 @@ pub async fn process_command(
if state.proxies.lock().unwrap().get(&id).is_some() {
return (
StatusCode::CONFLICT,
Json(ProxyResponse {
message: "Id already exists. Use the modify command instead.".to_string(),
}),
Json(ProxyResponse::Message(
"Id already exists. Use the modify command instead.".to_string(),
)),
);
}
if !state.ports.write().unwrap().insert(incoming_port) {
return (
StatusCode::CONFLICT,
Json(ProxyResponse {
message: format!("The `incoming_port` already in use: {incoming_port}"),
}),
Json(ProxyResponse::Message(format!(
"The `incoming_port` already in use: {incoming_port}"
))),
);
}
@ -138,11 +166,11 @@ pub async fn process_command(
add_proxy(incoming_port, rx).await.unwrap(); // TODO: error propagation??
(
StatusCode::ACCEPTED,
Json(ProxyResponse {
message: format!(
Json(ProxyResponse ::
Message( format!(
"Created tunnel {id} on port {incoming_port} to use {destination_ip}:{destination_port}"
),
}),
)),
)
}
Command::Modify {
@ -161,18 +189,14 @@ pub async fn process_command(
.unwrap();
(
StatusCode::ACCEPTED,
Json(ProxyResponse {
message: format!(
Json(ProxyResponse::Message(format!(
"Changed tunnel {id} to use {destination_ip}:{destination_port}"
),
}),
))),
)
} else {
(
StatusCode::NOT_FOUND,
Json(ProxyResponse {
message: format!("Id not found: {id}"),
}),
Json(ProxyResponse::Message(format!("Id not found: {id}"))),
)
}
}
@ -182,19 +206,27 @@ pub async fn process_command(
state.ports.write().unwrap().remove(&proxy.incoming_port);
(
StatusCode::ACCEPTED,
Json(ProxyResponse {
message: format!("Deleted tunnel: {id}"),
}),
Json(ProxyResponse::Message(format!("Deleted tunnel: {id}"))),
)
} else {
(
StatusCode::NOT_FOUND,
Json(ProxyResponse {
message: format!("Id not found: {id}"),
}),
Json(ProxyResponse::Message(format!("Id not found: {id}"))),
)
}
}
Command::Status => (
StatusCode::OK,
Json(ProxyResponse::Status {
tunnels: state
.proxies
.lock()
.unwrap()
.iter()
.map(|(key, value)| (*key, (value.incoming_port, value.destination)))
.collect(),
}),
),
}
}
@ -205,7 +237,7 @@ enum ProxyControlMessage {
}
async fn add_proxy(in_port: u16, control: Receiver<ProxyControlMessage>) -> anyhow::Result<()> {
let listener = TcpListener::bind(("127.0.0.1", in_port)).await?;
let listener = TcpListener::bind(("0.0.0.0", in_port)).await?;
tracing::info!("proxying port {in_port} to {:?}", *control.borrow());
@ -304,7 +336,10 @@ async fn transfer(
#[cfg(test)]
mod tests {
use std::net::{IpAddr, Ipv4Addr};
use std::{
net::{IpAddr, Ipv4Addr},
time,
};
use crate::{Command, ProxyCommand};
use p384::{
@ -324,10 +359,12 @@ mod tests {
destination_ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
id: uuid!("67e55044-10b1-426f-9247-bb680e5fe0c8"),
},
timestamp: Some(8888),
signature: Some(signature),
};
let expected = "{\"create\":{\"incoming_port\":5555,\"destination_port\":6666,\"\
destination_ip\":\"127.0.0.1\",\"id\":\"67e55044-10b1-426f-9247-bb680e5fe0c8\"},\
\"timestamp\":8888,\
\"signature\":\"\
5C912C4B3BFF2ADB49885DCBDB53D6D3041D0632E498CDFF\
2114CD2DCAC936AB0901B47C411E5BB57FE77BEF96044940\
@ -345,9 +382,11 @@ mod tests {
command: Command::Delete {
id: uuid!("67e55044-10b1-426f-9247-bb680e5fe0c8"),
},
timestamp: Some(987654),
signature: Some(signature),
};
let expected = "{\"delete\":{\"id\":\"67e55044-10b1-426f-9247-bb680e5fe0c8\"},\
\"timestamp\":987654,\
\"signature\":\"\
5C912C4B3BFF2ADB49885DCBDB53D6D3041D0632E498CDFF\
2114CD2DCAC936AB0901B47C411E5BB57FE77BEF96044940\
@ -359,6 +398,11 @@ mod tests {
#[test]
fn verify_signature() {
let subscriber = tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::TRACE)
.finish();
tracing::subscriber::set_global_default(subscriber).unwrap();
let command = Command::Create {
incoming_port: 4567,
destination_port: 7654,
@ -368,12 +412,18 @@ mod tests {
// Create signed message
let signing_key = SigningKey::random(&mut OsRng);
let message = serde_json::to_string(&command).unwrap();
let timestamp = time::SystemTime::now()
.duration_since(time::UNIX_EPOCH)
.unwrap()
.as_secs();
let mut message = serde_json::to_string(&command).unwrap();
message.push_str(&timestamp.to_string());
let signature: Signature = signing_key.sign(message.as_bytes());
let bytes = signature.to_bytes();
assert_eq!(bytes.len(), 96);
let proxy_command = ProxyCommand {
command,
timestamp: Some(timestamp),
signature: Some(signature),
};

5
status.sh Executable file
View File

@ -0,0 +1,5 @@
curl --header "Content-Type: application/json" \
--data '{
"status": null
}' \
http://localhost:14000/command