]>
Commit | Line | Data |
---|---|---|
768227f7 RBR |
1 | //! # Proxy Module |
2 | //! | |
3 | //! This module has the actual proxy functionality, exposed through | |
2fdda21d | 4 | //! `Server`. The proxy consists of a local unencrypted TCP stream |
768227f7 RBR |
5 | //! and a remote TLS stream. Messages are passed between them via two |
6 | //! threads. | |
7 | //! | |
8 | //! Each new client connection spawns two threads: | |
9 | //! - **Client to Server Thread**: Forwards data from client -> TLS server | |
10 | //! - **Serveer to Client Thread**: Forwards data from TLS server -> client | |
11 | //! | |
2fdda21d | 12 | //! Finally, the `Server` may be shutdown by calling `.shutdown()`, |
768227f7 RBR |
13 | //! this will stop new connections and wait for it to finish. |
14 | //! | |
15 | //! # Example | |
16 | //! | |
17 | //! ``` | |
18 | //! use std::sync::Arc; | |
2fdda21d RBR |
19 | //! use crate::configuration::Proxy; |
20 | //! use crate::proxy::Server; | |
768227f7 | 21 | //! |
2fdda21d | 22 | //! let config = Arc::new(Proxy { |
768227f7 RBR |
23 | //! protocol: "IMAP".to_string(), |
24 | //! local_port: 143, | |
25 | //! remote_domain: "imap.example.com".to_string(), | |
26 | //! remote_port: 993, | |
27 | //! }); | |
28 | //! | |
2fdda21d | 29 | //! let mut server = Server::new(config); |
768227f7 RBR |
30 | //! // The server runs in a background thread. To shut down gracefully: |
31 | //! server.shutdown(); | |
32 | //! ``` | |
33 | use log::{debug, error, info}; | |
dc3d6821 | 34 | use native_tls::TlsConnector; |
768227f7 | 35 | use std::io::{ErrorKind, Read, Write}; |
dc3d6821 | 36 | use std::net::{TcpListener, TcpStream}; |
768227f7 RBR |
37 | use std::sync::{ |
38 | atomic::{AtomicBool, Ordering}, | |
39 | Arc, Mutex, | |
40 | }; | |
41 | use std::thread::{sleep, spawn, JoinHandle}; | |
42 | use std::time::Duration; | |
dc3d6821 | 43 | |
2fdda21d | 44 | use crate::configuration::Proxy; |
66bcf7c1 | 45 | use crate::middleware::{CLIENT_MIDDLEWARE, SERVER_MIDDLEWARE}; |
dc3d6821 | 46 | |
768227f7 RBR |
47 | /// A proxy server that listens for plaintext connections and forwards them |
48 | /// via TLS. | |
49 | /// | |
2fdda21d | 50 | /// Creating a new `Server` spawns a dedicated thread that: |
768227f7 RBR |
51 | /// - Binds to a local port (non-blocking mode). |
52 | /// - Spawns additional threads for each incoming client connection. | |
53 | /// - Manages connection-lifetime until it receives a shutdown signal. | |
2fdda21d | 54 | pub struct Server { |
768227f7 RBR |
55 | running: Arc<AtomicBool>, |
56 | thread_handle: Option<JoinHandle<()>>, | |
dc3d6821 RBR |
57 | } |
58 | ||
2fdda21d RBR |
59 | impl Server { |
60 | /// Creates a new `Server` for the given `Proxy` configuration and | |
768227f7 RBR |
61 | /// immediately starts it. |
62 | /// | |
63 | /// # Arguments | |
64 | /// | |
2fdda21d | 65 | /// * `configuration` - Shared (Arc) `Proxy` |
768227f7 RBR |
66 | /// |
67 | /// # Returns | |
68 | /// | |
2fdda21d | 69 | /// A `Server` instance that will keep running until its `.shutdown()` |
768227f7 | 70 | /// method is called, or an error occurs. |
2fdda21d | 71 | pub fn new(configuration: Arc<Proxy>) -> Self { |
768227f7 RBR |
72 | let running = Arc::new(AtomicBool::new(true)); |
73 | let running_clone = Arc::clone(&running); | |
dc3d6821 | 74 | |
768227f7 | 75 | let thread_handle = spawn(move || { |
494920f1 | 76 | run_proxy(&configuration, &running_clone); |
768227f7 | 77 | }); |
dc3d6821 | 78 | |
2fdda21d | 79 | Server { |
768227f7 RBR |
80 | running, |
81 | thread_handle: Some(thread_handle), | |
82 | } | |
83 | } | |
84 | ||
85 | /// Signals this proxy to stop accepting new connections and waits | |
86 | /// for all active connection threads to complete. | |
87 | pub fn shutdown(&mut self) { | |
88 | self.running.store(false, Ordering::SeqCst); | |
89 | if let Some(handle) = self.thread_handle.take() { | |
90 | let _ = handle.join(); | |
91 | } | |
92 | } | |
93 | } | |
94 | ||
95 | /// The main loop that listens for incoming (plaintext) connections on | |
96 | /// `configuration.bind_address:configuration.local_port`. | |
494920f1 | 97 | fn run_proxy(configuration: &Arc<Proxy>, running: &Arc<AtomicBool>) { |
768227f7 RBR |
98 | let listener = match TcpListener::bind(format!( |
99 | "{}:{}", | |
100 | configuration.bind_address, configuration.local_port | |
101 | )) { | |
102 | Ok(l) => l, | |
103 | Err(e) => { | |
104 | error!("Failed to bind to port {}: {}", configuration.local_port, e); | |
105 | return; | |
106 | } | |
107 | }; | |
108 | listener.set_nonblocking(true).unwrap(); | |
109 | ||
110 | info!( | |
111 | "{} proxy listening on port {}:{}", | |
112 | configuration.protocol, configuration.bind_address, configuration.local_port | |
113 | ); | |
114 | ||
115 | // Keep track of active connections so we can join them on shutdown | |
116 | let mut active_threads = vec![]; | |
117 | ||
118 | while running.load(Ordering::SeqCst) { | |
119 | match listener.accept() { | |
120 | Ok((stream, addr)) => { | |
121 | info!("New {} connection from {}", configuration.protocol, addr); | |
122 | ||
69414c29 | 123 | let configuration_clone = Arc::clone(configuration); |
768227f7 | 124 | let handle = spawn(move || { |
494920f1 | 125 | handle_client(stream, &configuration_clone); |
dc3d6821 | 126 | }); |
768227f7 RBR |
127 | active_threads.push(handle); |
128 | } | |
129 | Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { | |
130 | // No pending connection; sleep briefly then loop again | |
131 | sleep(Duration::from_millis(100)); | |
132 | continue; | |
dc3d6821 RBR |
133 | } |
134 | Err(e) => { | |
768227f7 RBR |
135 | error!("Error accepting connection: {}", e); |
136 | break; | |
dc3d6821 RBR |
137 | } |
138 | } | |
768227f7 RBR |
139 | |
140 | // Clean up any finished threads | |
141 | active_threads.retain(|thread| !thread.is_finished()); | |
142 | ||
143 | // Potential Improvement: Configure thread limit. | |
144 | if active_threads.len() >= 50 { | |
145 | sleep(Duration::from_millis(100)); | |
146 | } | |
147 | } | |
148 | ||
149 | // On shutdown, wait for all threads to finish | |
150 | for thread in active_threads { | |
151 | let _ = thread.join(); | |
dc3d6821 RBR |
152 | } |
153 | } | |
154 | ||
768227f7 | 155 | /// Handles a single client connection by bridging it (plaintext) to a TLS connection. |
494920f1 | 156 | fn handle_client(client_stream: TcpStream, configuration: &Arc<Proxy>) { |
768227f7 RBR |
157 | if let Err(e) = client_stream.set_nonblocking(true) { |
158 | error!("Failed to set client stream to nonblocking: {}", e); | |
159 | return; | |
160 | } | |
161 | ||
162 | let connector = match TlsConnector::new() { | |
163 | Ok(c) => c, | |
164 | Err(e) => { | |
165 | error!("Failed to create TLS connector: {}", e); | |
166 | return; | |
167 | } | |
168 | }; | |
169 | ||
170 | let remote_addr = format!( | |
dc3d6821 | 171 | "{}:{}", |
768227f7 | 172 | configuration.remote_host, configuration.remote_port |
dc3d6821 | 173 | ); |
768227f7 RBR |
174 | let tcp_stream = match TcpStream::connect(&remote_addr) { |
175 | Ok(stream) => stream, | |
176 | Err(e) => { | |
177 | error!("Failed to connect to {}: {}", remote_addr, e); | |
178 | return; | |
179 | } | |
180 | }; | |
181 | ||
182 | let tls_stream = match connector.connect(&configuration.remote_host, tcp_stream) { | |
183 | Ok(tls_stream) => tls_stream, | |
184 | Err(e) => { | |
185 | error!( | |
186 | "TLS handshake to {} failed: {}", | |
187 | configuration.remote_host, e | |
188 | ); | |
189 | return; | |
190 | } | |
191 | }; | |
192 | ||
193 | // The nonblocking needs to be set AFTER the TLS handshake is completed. | |
194 | // Otherwise the TLS handshake is interrupted. | |
195 | if let Err(e) = tls_stream.get_ref().set_nonblocking(true) { | |
196 | error!("Failed to set remote stream to nonblocking: {}", e); | |
197 | return; | |
198 | } | |
199 | ||
200 | let tls_stream = Arc::new(Mutex::new(tls_stream)); | |
201 | ||
202 | let client_stream_clone = match client_stream.try_clone() { | |
203 | Ok(s) => s, | |
204 | Err(e) => { | |
205 | error!("Failed to clone client stream: {}", e); | |
206 | return; | |
207 | } | |
208 | }; | |
209 | ||
210 | // Client to Server Thread | |
211 | let tls_stream_clone = Arc::clone(&tls_stream); | |
212 | let client_to_server = spawn(move || { | |
213 | debug!(">>> BEGIN"); | |
214 | let mut buffer = [0u8; 8192]; | |
215 | let mut client_reader = client_stream; | |
216 | loop { | |
768227f7 RBR |
217 | let bytes_read = match client_reader.read(&mut buffer) { |
218 | Ok(0) => break, | |
219 | Ok(n) => n, | |
220 | Err(ref e) if e.kind() == ErrorKind::WouldBlock => { | |
221 | sleep(Duration::from_millis(10)); | |
222 | continue; | |
223 | } | |
224 | Err(error) => { | |
225 | debug!(">>> Error reading buffer {error}"); | |
226 | break; | |
227 | } | |
228 | }; | |
dc3d6821 | 229 | |
573aaf2a RBR |
230 | let mut command = buffer[..bytes_read].to_vec(); |
231 | ||
b5234d6f | 232 | for middleware in CLIENT_MIDDLEWARE { |
573aaf2a RBR |
233 | command = middleware(&command); |
234 | } | |
235 | ||
236 | let debug_str = String::from_utf8_lossy(&command) | |
768227f7 RBR |
237 | .replace('\n', "\\n") |
238 | .replace('\r', "\\r") | |
239 | .replace('\t', "\\t"); | |
240 | debug!(">>> {}", debug_str); | |
241 | ||
242 | // Lock the TLS stream and write the data to server | |
243 | match tls_stream_clone.lock() { | |
244 | Ok(mut tls_guard) => { | |
b5234d6f | 245 | if let Err(error) = tls_guard.write_all(&command) { |
768227f7 RBR |
246 | debug!(">>> Error writing to server: {error}"); |
247 | break; | |
248 | } | |
249 | if let Err(error) = tls_guard.flush() { | |
250 | debug!(">>> Error flushing server connection: {error}"); | |
251 | break; | |
252 | } | |
253 | } | |
254 | Err(error) => { | |
255 | debug!(">>> Error acquiring TLS stream lock: {error}"); | |
dc3d6821 RBR |
256 | break; |
257 | } | |
768227f7 RBR |
258 | } |
259 | } | |
260 | }); | |
261 | ||
262 | // Server to Client Thread | |
263 | let tls_stream_clone = Arc::clone(&tls_stream); | |
264 | let server_to_client = spawn(move || { | |
265 | debug!("<<< BEGIN"); | |
266 | let mut buffer = [0u8; 8192]; | |
267 | let mut client_writer = client_stream_clone; | |
268 | loop { | |
768227f7 RBR |
269 | // Lock the TLS stream and read from the server |
270 | let bytes_read = match tls_stream_clone.lock() { | |
271 | Ok(mut tls_guard) => match tls_guard.read(&mut buffer) { | |
272 | Ok(0) => break, // TLS server closed | |
273 | Ok(n) => n, | |
274 | Err(ref e) if e.kind() == ErrorKind::WouldBlock => { | |
275 | sleep(Duration::from_millis(10)); | |
276 | continue; | |
277 | } | |
278 | Err(error) => { | |
279 | debug!("<<< Error reading buffer {error}"); | |
280 | break; | |
281 | } | |
282 | }, | |
283 | Err(error) => { | |
284 | debug!("<<< Error Cloning TLS {error}"); | |
dc3d6821 RBR |
285 | break; |
286 | } | |
768227f7 RBR |
287 | }; |
288 | ||
b5234d6f RBR |
289 | let mut command = buffer[..bytes_read].to_vec(); |
290 | ||
291 | for middleware in SERVER_MIDDLEWARE { | |
292 | command = middleware(&command); | |
293 | } | |
294 | ||
295 | let debug_str = String::from_utf8_lossy(&command) | |
768227f7 RBR |
296 | .replace('\n', "\\n") |
297 | .replace('\r', "\\r") | |
298 | .replace('\t', "\\t"); | |
299 | debug!("<<< {}", debug_str); | |
300 | ||
301 | // Write decrypted data to client | |
b5234d6f | 302 | if client_writer.write_all(&command).is_err() { |
768227f7 RBR |
303 | debug!("<<< ERR"); |
304 | break; | |
dc3d6821 | 305 | } |
768227f7 RBR |
306 | if client_writer.flush().is_err() { |
307 | debug!("<<< ERR"); | |
dc3d6821 RBR |
308 | break; |
309 | } | |
310 | } | |
768227f7 RBR |
311 | }); |
312 | ||
313 | // Wait for both directions to finish | |
314 | let _ = client_to_server.join(); | |
315 | let _ = server_to_client.join(); | |
dc3d6821 | 316 | } |