3 //! This module has the actual proxy functionality, exposed through
4 //! `Server`. The proxy consists of a local unencrypted TCP stream
5 //! and a remote TLS stream. Messages are passed between them via two
8 //! Each new client connection spawns two threads:
9 //! - **Client to Server Thread**: Forwards data from client -> TLS server
10 //! - **Serveer to Client Thread**: Forwards data from TLS server -> client
12 //! Finally, the `Server` may be shutdown by calling `.shutdown()`,
13 //! this will stop new connections and wait for it to finish.
18 //! use std::sync::Arc;
19 //! use crate::configuration::Proxy;
20 //! use crate::proxy::Server;
22 //! let config = Arc::new(Proxy {
23 //! protocol: "IMAP".to_string(),
25 //! remote_domain: "imap.example.com".to_string(),
29 //! let mut server = Server::new(config);
30 //! // The server runs in a background thread. To shut down gracefully:
31 //! server.shutdown();
33 use log::{debug, error, info};
34 use native_tls::TlsConnector;
35 use std::io::{ErrorKind, Read, Write};
36 use std::net::{TcpListener, TcpStream};
38 atomic::{AtomicBool, Ordering},
41 use std::thread::{sleep, spawn, JoinHandle};
42 use std::time::Duration;
44 use crate::configuration::Proxy;
45 use crate::middleware::{SERVER_MIDDLEWARE, CLIENT_MIDDLEWARE};
47 /// A proxy server that listens for plaintext connections and forwards them
50 /// Creating a new `Server` spawns a dedicated thread that:
51 /// - Binds to a local port (non-blocking mode).
52 /// - Spawns additional threads for each incoming client connection.
53 /// - Manages connection-lifetime until it receives a shutdown signal.
55 running: Arc<AtomicBool>,
56 thread_handle: Option<JoinHandle<()>>,
60 /// Creates a new `Server` for the given `Proxy` configuration and
61 /// immediately starts it.
65 /// * `configuration` - Shared (Arc) `Proxy`
69 /// A `Server` instance that will keep running until its `.shutdown()`
70 /// method is called, or an error occurs.
71 pub fn new(configuration: Arc<Proxy>) -> Self {
72 let running = Arc::new(AtomicBool::new(true));
73 let running_clone = Arc::clone(&running);
75 let thread_handle = spawn(move || {
76 run_proxy(&configuration, &running_clone);
81 thread_handle: Some(thread_handle),
85 /// Signals this proxy to stop accepting new connections and waits
86 /// for all active connection threads to complete.
87 pub fn shutdown(&mut self) {
88 self.running.store(false, Ordering::SeqCst);
89 if let Some(handle) = self.thread_handle.take() {
90 let _ = handle.join();
95 /// The main loop that listens for incoming (plaintext) connections on
96 /// `configuration.bind_address:configuration.local_port`.
97 fn run_proxy(configuration: &Arc<Proxy>, running: &Arc<AtomicBool>) {
98 let listener = match TcpListener::bind(format!(
100 configuration.bind_address, configuration.local_port
104 error!("Failed to bind to port {}: {}", configuration.local_port, e);
108 listener.set_nonblocking(true).unwrap();
111 "{} proxy listening on port {}:{}",
112 configuration.protocol, configuration.bind_address, configuration.local_port
115 // Keep track of active connections so we can join them on shutdown
116 let mut active_threads = vec![];
118 while running.load(Ordering::SeqCst) {
119 match listener.accept() {
120 Ok((stream, addr)) => {
121 info!("New {} connection from {}", configuration.protocol, addr);
123 let configuration_clone = Arc::clone(configuration);
124 let handle = spawn(move || {
125 handle_client(stream, &configuration_clone);
127 active_threads.push(handle);
129 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
130 // No pending connection; sleep briefly then loop again
131 sleep(Duration::from_millis(100));
135 error!("Error accepting connection: {}", e);
140 // Clean up any finished threads
141 active_threads.retain(|thread| !thread.is_finished());
143 // Potential Improvement: Configure thread limit.
144 if active_threads.len() >= 50 {
145 sleep(Duration::from_millis(100));
149 // On shutdown, wait for all threads to finish
150 for thread in active_threads {
151 let _ = thread.join();
155 /// Handles a single client connection by bridging it (plaintext) to a TLS connection.
156 fn handle_client(client_stream: TcpStream, configuration: &Arc<Proxy>) {
157 if let Err(e) = client_stream.set_nonblocking(true) {
158 error!("Failed to set client stream to nonblocking: {}", e);
162 let connector = match TlsConnector::new() {
165 error!("Failed to create TLS connector: {}", e);
170 let remote_addr = format!(
172 configuration.remote_host, configuration.remote_port
174 let tcp_stream = match TcpStream::connect(&remote_addr) {
175 Ok(stream) => stream,
177 error!("Failed to connect to {}: {}", remote_addr, e);
182 let tls_stream = match connector.connect(&configuration.remote_host, tcp_stream) {
183 Ok(tls_stream) => tls_stream,
186 "TLS handshake to {} failed: {}",
187 configuration.remote_host, e
193 // The nonblocking needs to be set AFTER the TLS handshake is completed.
194 // Otherwise the TLS handshake is interrupted.
195 if let Err(e) = tls_stream.get_ref().set_nonblocking(true) {
196 error!("Failed to set remote stream to nonblocking: {}", e);
200 let tls_stream = Arc::new(Mutex::new(tls_stream));
202 let client_stream_clone = match client_stream.try_clone() {
205 error!("Failed to clone client stream: {}", e);
210 // Client to Server Thread
211 let tls_stream_clone = Arc::clone(&tls_stream);
212 let client_to_server = spawn(move || {
214 let mut buffer = [0u8; 8192];
215 let mut client_reader = client_stream;
217 let bytes_read = match client_reader.read(&mut buffer) {
220 Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
221 sleep(Duration::from_millis(10));
225 debug!(">>> Error reading buffer {error}");
230 let mut command = buffer[..bytes_read].to_vec();
232 for middleware in CLIENT_MIDDLEWARE {
233 command = middleware(&command);
236 let debug_str = String::from_utf8_lossy(&command)
237 .replace('\n', "\\n")
238 .replace('\r', "\\r")
239 .replace('\t', "\\t");
240 debug!(">>> {}", debug_str);
242 // Lock the TLS stream and write the data to server
243 match tls_stream_clone.lock() {
244 Ok(mut tls_guard) => {
245 if let Err(error) = tls_guard.write_all(&command) {
246 debug!(">>> Error writing to server: {error}");
249 if let Err(error) = tls_guard.flush() {
250 debug!(">>> Error flushing server connection: {error}");
255 debug!(">>> Error acquiring TLS stream lock: {error}");
262 // Server to Client Thread
263 let tls_stream_clone = Arc::clone(&tls_stream);
264 let server_to_client = spawn(move || {
266 let mut buffer = [0u8; 8192];
267 let mut client_writer = client_stream_clone;
269 // Lock the TLS stream and read from the server
270 let bytes_read = match tls_stream_clone.lock() {
271 Ok(mut tls_guard) => match tls_guard.read(&mut buffer) {
272 Ok(0) => break, // TLS server closed
274 Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
275 sleep(Duration::from_millis(10));
279 debug!("<<< Error reading buffer {error}");
284 debug!("<<< Error Cloning TLS {error}");
289 let mut command = buffer[..bytes_read].to_vec();
291 for middleware in SERVER_MIDDLEWARE {
292 command = middleware(&command);
295 let debug_str = String::from_utf8_lossy(&command)
296 .replace('\n', "\\n")
297 .replace('\r', "\\r")
298 .replace('\t', "\\t");
299 debug!("<<< {}", debug_str);
301 // Write decrypted data to client
302 if client_writer.write_all(&command).is_err() {
306 if client_writer.flush().is_err() {
313 // Wait for both directions to finish
314 let _ = client_to_server.join();
315 let _ = server_to_client.join();