]> git.r.bdr.sh - rbdr/olden-mail/blame - src/proxy.rs
Actually terminate with a newline
[rbdr/olden-mail] / src / proxy.rs
CommitLineData
768227f7
RBR
1//! # Proxy Module
2//!
3//! This module has the actual proxy functionality, exposed through
2fdda21d 4//! `Server`. The proxy consists of a local unencrypted TCP stream
768227f7
RBR
5//! and a remote TLS stream. Messages are passed between them via two
6//! threads.
7//!
8//! Each new client connection spawns two threads:
9//! - **Client to Server Thread**: Forwards data from client -> TLS server
10//! - **Serveer to Client Thread**: Forwards data from TLS server -> client
11//!
2fdda21d 12//! Finally, the `Server` may be shutdown by calling `.shutdown()`,
768227f7
RBR
13//! this will stop new connections and wait for it to finish.
14//!
15//! # Example
16//!
17//! ```
18//! use std::sync::Arc;
2fdda21d
RBR
19//! use crate::configuration::Proxy;
20//! use crate::proxy::Server;
768227f7 21//!
2fdda21d 22//! let config = Arc::new(Proxy {
768227f7
RBR
23//! protocol: "IMAP".to_string(),
24//! local_port: 143,
25//! remote_domain: "imap.example.com".to_string(),
26//! remote_port: 993,
27//! });
28//!
2fdda21d 29//! let mut server = Server::new(config);
768227f7
RBR
30//! // The server runs in a background thread. To shut down gracefully:
31//! server.shutdown();
32//! ```
33use log::{debug, error, info};
dc3d6821 34use native_tls::TlsConnector;
768227f7 35use std::io::{ErrorKind, Read, Write};
dc3d6821 36use std::net::{TcpListener, TcpStream};
768227f7
RBR
37use std::sync::{
38 atomic::{AtomicBool, Ordering},
39 Arc, Mutex,
40};
41use std::thread::{sleep, spawn, JoinHandle};
42use std::time::Duration;
dc3d6821 43
2fdda21d 44use crate::configuration::Proxy;
bbacb35a 45use crate::middleware::get as get_middleware;
dc3d6821 46
768227f7
RBR
47/// A proxy server that listens for plaintext connections and forwards them
48/// via TLS.
49///
2fdda21d 50/// Creating a new `Server` spawns a dedicated thread that:
768227f7
RBR
51/// - Binds to a local port (non-blocking mode).
52/// - Spawns additional threads for each incoming client connection.
53/// - Manages connection-lifetime until it receives a shutdown signal.
2fdda21d 54pub struct Server {
768227f7
RBR
55 running: Arc<AtomicBool>,
56 thread_handle: Option<JoinHandle<()>>,
dc3d6821
RBR
57}
58
2fdda21d
RBR
59impl Server {
60 /// Creates a new `Server` for the given `Proxy` configuration and
768227f7
RBR
61 /// immediately starts it.
62 ///
63 /// # Arguments
64 ///
2fdda21d 65 /// * `configuration` - Shared (Arc) `Proxy`
768227f7
RBR
66 ///
67 /// # Returns
68 ///
2fdda21d 69 /// A `Server` instance that will keep running until its `.shutdown()`
768227f7 70 /// method is called, or an error occurs.
2fdda21d 71 pub fn new(configuration: Arc<Proxy>) -> Self {
768227f7
RBR
72 let running = Arc::new(AtomicBool::new(true));
73 let running_clone = Arc::clone(&running);
dc3d6821 74
768227f7 75 let thread_handle = spawn(move || {
494920f1 76 run_proxy(&configuration, &running_clone);
768227f7 77 });
dc3d6821 78
2fdda21d 79 Server {
768227f7
RBR
80 running,
81 thread_handle: Some(thread_handle),
82 }
83 }
84
85 /// Signals this proxy to stop accepting new connections and waits
86 /// for all active connection threads to complete.
87 pub fn shutdown(&mut self) {
88 self.running.store(false, Ordering::SeqCst);
89 if let Some(handle) = self.thread_handle.take() {
90 let _ = handle.join();
91 }
92 }
93}
94
95/// The main loop that listens for incoming (plaintext) connections on
96/// `configuration.bind_address:configuration.local_port`.
494920f1 97fn run_proxy(configuration: &Arc<Proxy>, running: &Arc<AtomicBool>) {
768227f7
RBR
98 let listener = match TcpListener::bind(format!(
99 "{}:{}",
100 configuration.bind_address, configuration.local_port
101 )) {
102 Ok(l) => l,
103 Err(e) => {
104 error!("Failed to bind to port {}: {}", configuration.local_port, e);
105 return;
106 }
107 };
108 listener.set_nonblocking(true).unwrap();
109
110 info!(
111 "{} proxy listening on port {}:{}",
112 configuration.protocol, configuration.bind_address, configuration.local_port
113 );
114
115 // Keep track of active connections so we can join them on shutdown
116 let mut active_threads = vec![];
117
118 while running.load(Ordering::SeqCst) {
119 match listener.accept() {
8ab8739c
RBR
120 Ok((stream, address)) => {
121 info!("New {} connection from {}", configuration.protocol, address);
768227f7 122
69414c29 123 let configuration_clone = Arc::clone(configuration);
768227f7 124 let handle = spawn(move || {
494920f1 125 handle_client(stream, &configuration_clone);
dc3d6821 126 });
768227f7
RBR
127 active_threads.push(handle);
128 }
129 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
130 // No pending connection; sleep briefly then loop again
131 sleep(Duration::from_millis(100));
132 continue;
dc3d6821
RBR
133 }
134 Err(e) => {
768227f7
RBR
135 error!("Error accepting connection: {}", e);
136 break;
dc3d6821
RBR
137 }
138 }
768227f7
RBR
139
140 // Clean up any finished threads
141 active_threads.retain(|thread| !thread.is_finished());
142
143 // Potential Improvement: Configure thread limit.
144 if active_threads.len() >= 50 {
145 sleep(Duration::from_millis(100));
146 }
147 }
148
149 // On shutdown, wait for all threads to finish
150 for thread in active_threads {
151 let _ = thread.join();
dc3d6821
RBR
152 }
153}
154
768227f7 155/// Handles a single client connection by bridging it (plaintext) to a TLS connection.
494920f1 156fn handle_client(client_stream: TcpStream, configuration: &Arc<Proxy>) {
768227f7
RBR
157 if let Err(e) = client_stream.set_nonblocking(true) {
158 error!("Failed to set client stream to nonblocking: {}", e);
159 return;
160 }
161
8ab8739c
RBR
162 let available_middleware = get_middleware();
163 let available_middleware_clone = Arc::clone(&available_middleware);
164
768227f7
RBR
165 let connector = match TlsConnector::new() {
166 Ok(c) => c,
167 Err(e) => {
168 error!("Failed to create TLS connector: {}", e);
169 return;
170 }
171 };
172
8ab8739c 173 let remote_address = format!(
dc3d6821 174 "{}:{}",
768227f7 175 configuration.remote_host, configuration.remote_port
dc3d6821 176 );
8ab8739c 177 let tcp_stream = match TcpStream::connect(&remote_address) {
768227f7
RBR
178 Ok(stream) => stream,
179 Err(e) => {
8ab8739c 180 error!("Failed to connect to {}: {}", remote_address, e);
768227f7
RBR
181 return;
182 }
183 };
184
185 let tls_stream = match connector.connect(&configuration.remote_host, tcp_stream) {
186 Ok(tls_stream) => tls_stream,
187 Err(e) => {
188 error!(
189 "TLS handshake to {} failed: {}",
190 configuration.remote_host, e
191 );
192 return;
193 }
194 };
195
196 // The nonblocking needs to be set AFTER the TLS handshake is completed.
197 // Otherwise the TLS handshake is interrupted.
198 if let Err(e) = tls_stream.get_ref().set_nonblocking(true) {
199 error!("Failed to set remote stream to nonblocking: {}", e);
200 return;
201 }
202
203 let tls_stream = Arc::new(Mutex::new(tls_stream));
204
205 let client_stream_clone = match client_stream.try_clone() {
206 Ok(s) => s,
207 Err(e) => {
208 error!("Failed to clone client stream: {}", e);
209 return;
210 }
211 };
212
213 // Client to Server Thread
214 let tls_stream_clone = Arc::clone(&tls_stream);
215 let client_to_server = spawn(move || {
216 debug!(">>> BEGIN");
217 let mut buffer = [0u8; 8192];
218 let mut client_reader = client_stream;
219 loop {
768227f7
RBR
220 let bytes_read = match client_reader.read(&mut buffer) {
221 Ok(0) => break,
222 Ok(n) => n,
223 Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
224 sleep(Duration::from_millis(10));
225 continue;
226 }
227 Err(error) => {
228 debug!(">>> Error reading buffer {error}");
229 break;
230 }
231 };
dc3d6821 232
573aaf2a
RBR
233 let mut command = buffer[..bytes_read].to_vec();
234
8ab8739c
RBR
235 if let Ok(mut guard) = available_middleware.lock() {
236 for middleware in guard.iter_mut() {
237 command = middleware.client_message(&command);
238 }
573aaf2a
RBR
239 }
240
df07d7b0 241 let debug_original = String::from_utf8_lossy(&buffer[..bytes_read])
768227f7
RBR
242 .replace('\n', "\\n")
243 .replace('\r', "\\r")
244 .replace('\t', "\\t");
df07d7b0
RBR
245
246 let debug_final = String::from_utf8_lossy(&command)
247 .replace('\n', "\\n")
248 .replace('\r', "\\r")
249 .replace('\t', "\\t");
250
251 debug!(">>> {debug_original}");
6aebf7f9
RBR
252 if debug_original != debug_final {
253 debug!("### {debug_final}");
254 }
768227f7
RBR
255
256 // Lock the TLS stream and write the data to server
257 match tls_stream_clone.lock() {
258 Ok(mut tls_guard) => {
b5234d6f 259 if let Err(error) = tls_guard.write_all(&command) {
768227f7
RBR
260 debug!(">>> Error writing to server: {error}");
261 break;
262 }
263 if let Err(error) = tls_guard.flush() {
264 debug!(">>> Error flushing server connection: {error}");
265 break;
266 }
267 }
268 Err(error) => {
269 debug!(">>> Error acquiring TLS stream lock: {error}");
dc3d6821
RBR
270 break;
271 }
768227f7
RBR
272 }
273 }
274 });
275
276 // Server to Client Thread
277 let tls_stream_clone = Arc::clone(&tls_stream);
278 let server_to_client = spawn(move || {
279 debug!("<<< BEGIN");
280 let mut buffer = [0u8; 8192];
281 let mut client_writer = client_stream_clone;
282 loop {
768227f7
RBR
283 // Lock the TLS stream and read from the server
284 let bytes_read = match tls_stream_clone.lock() {
285 Ok(mut tls_guard) => match tls_guard.read(&mut buffer) {
286 Ok(0) => break, // TLS server closed
287 Ok(n) => n,
288 Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
289 sleep(Duration::from_millis(10));
290 continue;
291 }
292 Err(error) => {
293 debug!("<<< Error reading buffer {error}");
294 break;
295 }
296 },
297 Err(error) => {
298 debug!("<<< Error Cloning TLS {error}");
dc3d6821
RBR
299 break;
300 }
768227f7
RBR
301 };
302
b5234d6f
RBR
303 let mut command = buffer[..bytes_read].to_vec();
304
8ab8739c
RBR
305 if let Ok(mut guard) = available_middleware_clone.lock() {
306 for middleware in guard.iter_mut() {
307 command = middleware.server_message(&command);
308 }
b5234d6f
RBR
309 }
310
df07d7b0
RBR
311 let debug_original = String::from_utf8_lossy(&buffer[..bytes_read])
312 .replace('\n', "\\n")
313 .replace('\r', "\\r")
314 .replace('\t', "\\t");
315
316 let debug_final = String::from_utf8_lossy(&command)
768227f7
RBR
317 .replace('\n', "\\n")
318 .replace('\r', "\\r")
319 .replace('\t', "\\t");
df07d7b0 320 debug!("<<< {debug_original}");
35b52518
RBR
321 if debug_original != debug_final {
322 debug!("### {debug_final}");
323 }
768227f7
RBR
324
325 // Write decrypted data to client
b5234d6f 326 if client_writer.write_all(&command).is_err() {
768227f7
RBR
327 debug!("<<< ERR");
328 break;
dc3d6821 329 }
768227f7
RBR
330 if client_writer.flush().is_err() {
331 debug!("<<< ERR");
dc3d6821
RBR
332 break;
333 }
334 }
768227f7
RBR
335 });
336
337 // Wait for both directions to finish
338 let _ = client_to_server.join();
339 let _ = server_to_client.join();
dc3d6821 340}