]>
Commit | Line | Data |
---|---|---|
768227f7 RBR |
1 | //! # Proxy Module |
2 | //! | |
3 | //! This module has the actual proxy functionality, exposed through | |
2fdda21d | 4 | //! `Server`. The proxy consists of a local unencrypted TCP stream |
768227f7 RBR |
5 | //! and a remote TLS stream. Messages are passed between them via two |
6 | //! threads. | |
7 | //! | |
8 | //! Each new client connection spawns two threads: | |
9 | //! - **Client to Server Thread**: Forwards data from client -> TLS server | |
10 | //! - **Server to Client Thread**: Forwards data from TLS server -> client | |
11 | //! | |
2fdda21d | 12 | //! Finally, the `Server` may be shutdown by calling `.shutdown()`, |
768227f7 RBR |
13 | //! this will stop accepting new connections and wait for active connections to finish. |
14 | //! | |
15 | //! # Example | |
16 | //! | |
17 | //! ``` | |
18 | //! use std::sync::Arc; | |
2fdda21d RBR |
19 | //! use crate::configuration::Proxy; |
20 | //! use crate::proxy::Server; | |
768227f7 | 21 | //! |
2fdda21d | 22 | //! let config = Arc::new(Proxy { |
768227f7 RBR |
23 | //! protocol: "IMAP".to_string(), |
24 | //! local_port: 143, | |
25 | //! remote_host: "imap.example.com".to_string(), | |
26 | //! remote_port: 993, | |
27 | //! }); | |
28 | //! | |
2fdda21d | 29 | //! let mut server = Server::new(config); |
768227f7 RBR |
30 | //! // The server runs in a background thread. To shut down gracefully: |
31 | //! server.shutdown(); | |
32 | //! ``` | |
33 | use log::{debug, error, info}; | |
dc3d6821 | 34 | use native_tls::TlsConnector; |
768227f7 | 35 | use std::io::{ErrorKind, Read, Write}; |
dc3d6821 | 36 | use std::net::{TcpListener, TcpStream}; |
768227f7 RBR |
37 | use std::sync::{ |
38 | atomic::{AtomicBool, Ordering}, | |
39 | Arc, Mutex, | |
40 | }; | |
41 | use std::thread::{sleep, spawn, JoinHandle}; | |
42 | use std::time::Duration; | |
dc3d6821 | 43 | |
2fdda21d | 44 | use crate::configuration::Proxy; |
bbacb35a | 45 | use crate::middleware::get as get_middleware; |
dc3d6821 | 46 | |
/// A proxy server that listens for plaintext connections and forwards them
/// via TLS.
///
/// Creating a new `Server` spawns a dedicated thread that:
/// - Binds to a local port (non-blocking mode).
/// - Spawns additional threads for each incoming client connection.
/// - Manages connection-lifetime until it receives a shutdown signal.
#[derive(Debug)]
pub struct Server {
    /// Shared run flag; the listener thread keeps accepting while it is `true`.
    running: Arc<AtomicBool>,
    /// Handle of the listener thread; `None` once `shutdown` has joined it.
    thread_handle: Option<JoinHandle<()>>,
}
58 | ||
2fdda21d RBR |
59 | impl Server { |
60 | /// Creates a new `Server` for the given `Proxy` configuration and | |
768227f7 RBR |
61 | /// immediately starts it. |
62 | /// | |
63 | /// # Arguments | |
64 | /// | |
2fdda21d | 65 | /// * `configuration` - Shared (Arc) `Proxy` |
768227f7 RBR |
66 | /// |
67 | /// # Returns | |
68 | /// | |
2fdda21d | 69 | /// A `Server` instance that will keep running until its `.shutdown()` |
768227f7 | 70 | /// method is called, or an error occurs. |
2fdda21d | 71 | pub fn new(configuration: Arc<Proxy>) -> Self { |
768227f7 RBR |
72 | let running = Arc::new(AtomicBool::new(true)); |
73 | let running_clone = Arc::clone(&running); | |
dc3d6821 | 74 | |
768227f7 | 75 | let thread_handle = spawn(move || { |
494920f1 | 76 | run_proxy(&configuration, &running_clone); |
768227f7 | 77 | }); |
dc3d6821 | 78 | |
2fdda21d | 79 | Server { |
768227f7 RBR |
80 | running, |
81 | thread_handle: Some(thread_handle), | |
82 | } | |
83 | } | |
84 | ||
85 | /// Signals this proxy to stop accepting new connections and waits | |
86 | /// for all active connection threads to complete. | |
87 | pub fn shutdown(&mut self) { | |
88 | self.running.store(false, Ordering::SeqCst); | |
89 | if let Some(handle) = self.thread_handle.take() { | |
90 | let _ = handle.join(); | |
91 | } | |
92 | } | |
93 | } | |
94 | ||
95 | /// The main loop that listens for incoming (plaintext) connections on | |
96 | /// `configuration.bind_address:configuration.local_port`. | |
494920f1 | 97 | fn run_proxy(configuration: &Arc<Proxy>, running: &Arc<AtomicBool>) { |
768227f7 RBR |
98 | let listener = match TcpListener::bind(format!( |
99 | "{}:{}", | |
100 | configuration.bind_address, configuration.local_port | |
101 | )) { | |
102 | Ok(l) => l, | |
103 | Err(e) => { | |
104 | error!("Failed to bind to port {}: {}", configuration.local_port, e); | |
105 | return; | |
106 | } | |
107 | }; | |
108 | listener.set_nonblocking(true).unwrap(); | |
109 | ||
110 | info!( | |
111 | "{} proxy listening on port {}:{}", | |
112 | configuration.protocol, configuration.bind_address, configuration.local_port | |
113 | ); | |
114 | ||
115 | // Keep track of active connections so we can join them on shutdown | |
116 | let mut active_threads = vec![]; | |
117 | ||
118 | while running.load(Ordering::SeqCst) { | |
119 | match listener.accept() { | |
8ab8739c RBR |
120 | Ok((stream, address)) => { |
121 | info!("New {} connection from {}", configuration.protocol, address); | |
768227f7 | 122 | |
69414c29 | 123 | let configuration_clone = Arc::clone(configuration); |
768227f7 | 124 | let handle = spawn(move || { |
494920f1 | 125 | handle_client(stream, &configuration_clone); |
dc3d6821 | 126 | }); |
768227f7 RBR |
127 | active_threads.push(handle); |
128 | } | |
129 | Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { | |
130 | // No pending connection; sleep briefly then loop again | |
131 | sleep(Duration::from_millis(100)); | |
132 | continue; | |
dc3d6821 RBR |
133 | } |
134 | Err(e) => { | |
768227f7 RBR |
135 | error!("Error accepting connection: {}", e); |
136 | break; | |
dc3d6821 RBR |
137 | } |
138 | } | |
768227f7 RBR |
139 | |
140 | // Clean up any finished threads | |
141 | active_threads.retain(|thread| !thread.is_finished()); | |
142 | ||
143 | // Potential Improvement: Configure thread limit. | |
144 | if active_threads.len() >= 50 { | |
145 | sleep(Duration::from_millis(100)); | |
146 | } | |
147 | } | |
148 | ||
149 | // On shutdown, wait for all threads to finish | |
150 | for thread in active_threads { | |
151 | let _ = thread.join(); | |
dc3d6821 RBR |
152 | } |
153 | } | |
154 | ||
768227f7 | 155 | /// Handles a single client connection by bridging it (plaintext) to a TLS connection. |
494920f1 | 156 | fn handle_client(client_stream: TcpStream, configuration: &Arc<Proxy>) { |
768227f7 RBR |
157 | if let Err(e) = client_stream.set_nonblocking(true) { |
158 | error!("Failed to set client stream to nonblocking: {}", e); | |
159 | return; | |
160 | } | |
161 | ||
8ab8739c RBR |
162 | let available_middleware = get_middleware(); |
163 | let available_middleware_clone = Arc::clone(&available_middleware); | |
164 | ||
768227f7 RBR |
165 | let connector = match TlsConnector::new() { |
166 | Ok(c) => c, | |
167 | Err(e) => { | |
168 | error!("Failed to create TLS connector: {}", e); | |
169 | return; | |
170 | } | |
171 | }; | |
172 | ||
8ab8739c | 173 | let remote_address = format!( |
dc3d6821 | 174 | "{}:{}", |
768227f7 | 175 | configuration.remote_host, configuration.remote_port |
dc3d6821 | 176 | ); |
8ab8739c | 177 | let tcp_stream = match TcpStream::connect(&remote_address) { |
768227f7 RBR |
178 | Ok(stream) => stream, |
179 | Err(e) => { | |
8ab8739c | 180 | error!("Failed to connect to {}: {}", remote_address, e); |
768227f7 RBR |
181 | return; |
182 | } | |
183 | }; | |
184 | ||
185 | let tls_stream = match connector.connect(&configuration.remote_host, tcp_stream) { | |
186 | Ok(tls_stream) => tls_stream, | |
187 | Err(e) => { | |
188 | error!( | |
189 | "TLS handshake to {} failed: {}", | |
190 | configuration.remote_host, e | |
191 | ); | |
192 | return; | |
193 | } | |
194 | }; | |
195 | ||
196 | // The nonblocking needs to be set AFTER the TLS handshake is completed. | |
197 | // Otherwise the TLS handshake is interrupted. | |
198 | if let Err(e) = tls_stream.get_ref().set_nonblocking(true) { | |
199 | error!("Failed to set remote stream to nonblocking: {}", e); | |
200 | return; | |
201 | } | |
202 | ||
203 | let tls_stream = Arc::new(Mutex::new(tls_stream)); | |
204 | ||
205 | let client_stream_clone = match client_stream.try_clone() { | |
206 | Ok(s) => s, | |
207 | Err(e) => { | |
208 | error!("Failed to clone client stream: {}", e); | |
209 | return; | |
210 | } | |
211 | }; | |
212 | ||
213 | // Client to Server Thread | |
214 | let tls_stream_clone = Arc::clone(&tls_stream); | |
215 | let client_to_server = spawn(move || { | |
216 | debug!(">>> BEGIN"); | |
217 | let mut buffer = [0u8; 8192]; | |
218 | let mut client_reader = client_stream; | |
219 | loop { | |
768227f7 RBR |
220 | let bytes_read = match client_reader.read(&mut buffer) { |
221 | Ok(0) => break, | |
222 | Ok(n) => n, | |
223 | Err(ref e) if e.kind() == ErrorKind::WouldBlock => { | |
224 | sleep(Duration::from_millis(10)); | |
225 | continue; | |
226 | } | |
227 | Err(error) => { | |
228 | debug!(">>> Error reading buffer {error}"); | |
229 | break; | |
230 | } | |
231 | }; | |
dc3d6821 | 232 | |
573aaf2a RBR |
233 | let mut command = buffer[..bytes_read].to_vec(); |
234 | ||
8ab8739c RBR |
235 | if let Ok(mut guard) = available_middleware.lock() { |
236 | for middleware in guard.iter_mut() { | |
237 | command = middleware.client_message(&command); | |
238 | } | |
573aaf2a RBR |
239 | } |
240 | ||
df07d7b0 | 241 | let debug_original = String::from_utf8_lossy(&buffer[..bytes_read]) |
768227f7 RBR |
242 | .replace('\n', "\\n") |
243 | .replace('\r', "\\r") | |
244 | .replace('\t', "\\t"); | |
df07d7b0 RBR |
245 | |
246 | let debug_final = String::from_utf8_lossy(&command) | |
247 | .replace('\n', "\\n") | |
248 | .replace('\r', "\\r") | |
249 | .replace('\t', "\\t"); | |
250 | ||
251 | debug!(">>> {debug_original}"); | |
6aebf7f9 RBR |
252 | if debug_original != debug_final { |
253 | debug!("### {debug_final}"); | |
254 | } | |
768227f7 RBR |
255 | |
256 | // Lock the TLS stream and write the data to server | |
257 | match tls_stream_clone.lock() { | |
258 | Ok(mut tls_guard) => { | |
b5234d6f | 259 | if let Err(error) = tls_guard.write_all(&command) { |
768227f7 RBR |
260 | debug!(">>> Error writing to server: {error}"); |
261 | break; | |
262 | } | |
263 | if let Err(error) = tls_guard.flush() { | |
264 | debug!(">>> Error flushing server connection: {error}"); | |
265 | break; | |
266 | } | |
267 | } | |
268 | Err(error) => { | |
269 | debug!(">>> Error acquiring TLS stream lock: {error}"); | |
dc3d6821 RBR |
270 | break; |
271 | } | |
768227f7 RBR |
272 | } |
273 | } | |
274 | }); | |
275 | ||
276 | // Server to Client Thread | |
277 | let tls_stream_clone = Arc::clone(&tls_stream); | |
278 | let server_to_client = spawn(move || { | |
279 | debug!("<<< BEGIN"); | |
280 | let mut buffer = [0u8; 8192]; | |
281 | let mut client_writer = client_stream_clone; | |
282 | loop { | |
768227f7 RBR |
283 | // Lock the TLS stream and read from the server |
284 | let bytes_read = match tls_stream_clone.lock() { | |
285 | Ok(mut tls_guard) => match tls_guard.read(&mut buffer) { | |
286 | Ok(0) => break, // TLS server closed | |
287 | Ok(n) => n, | |
288 | Err(ref e) if e.kind() == ErrorKind::WouldBlock => { | |
289 | sleep(Duration::from_millis(10)); | |
290 | continue; | |
291 | } | |
292 | Err(error) => { | |
293 | debug!("<<< Error reading buffer {error}"); | |
294 | break; | |
295 | } | |
296 | }, | |
297 | Err(error) => { | |
298 | debug!("<<< Error Cloning TLS {error}"); | |
dc3d6821 RBR |
299 | break; |
300 | } | |
768227f7 RBR |
301 | }; |
302 | ||
b5234d6f RBR |
303 | let mut command = buffer[..bytes_read].to_vec(); |
304 | ||
8ab8739c RBR |
305 | if let Ok(mut guard) = available_middleware_clone.lock() { |
306 | for middleware in guard.iter_mut() { | |
307 | command = middleware.server_message(&command); | |
308 | } | |
b5234d6f RBR |
309 | } |
310 | ||
df07d7b0 RBR |
311 | let debug_original = String::from_utf8_lossy(&buffer[..bytes_read]) |
312 | .replace('\n', "\\n") | |
313 | .replace('\r', "\\r") | |
314 | .replace('\t', "\\t"); | |
315 | ||
316 | let debug_final = String::from_utf8_lossy(&command) | |
768227f7 RBR |
317 | .replace('\n', "\\n") |
318 | .replace('\r', "\\r") | |
319 | .replace('\t', "\\t"); | |
df07d7b0 RBR |
320 | debug!("<<< {debug_original}"); |
321 | debug!("### {debug_final}"); | |
768227f7 RBR |
322 | |
323 | // Write decrypted data to client | |
b5234d6f | 324 | if client_writer.write_all(&command).is_err() { |
768227f7 RBR |
325 | debug!("<<< ERR"); |
326 | break; | |
dc3d6821 | 327 | } |
768227f7 RBR |
328 | if client_writer.flush().is_err() { |
329 | debug!("<<< ERR"); | |
dc3d6821 RBR |
330 | break; |
331 | } | |
332 | } | |
768227f7 RBR |
333 | }); |
334 | ||
335 | // Wait for both directions to finish | |
336 | let _ = client_to_server.join(); | |
337 | let _ = server_to_client.join(); | |
dc3d6821 | 338 | } |