+ Server {
+ running,
+ thread_handle: Some(thread_handle),
+ }
+ }
+
+ /// Signals this proxy to stop accepting new connections and waits for
+ /// its background thread to finish.
+ ///
+ /// Clears the shared `running` flag, then joins the stored thread handle.
+ /// The handle is taken out of `self`, so calling `shutdown` again is a
+ /// harmless no-op apart from re-clearing the flag.
+ ///
+ /// NOTE(review): the joined thread is presumably the listener loop, which
+ /// is expected to join its own connection threads before returning —
+ /// confirm against `run_proxy`.
+ ///
+ /// A panic in the joined thread is logged rather than propagated, so
+ /// shutdown stays best-effort and never panics itself.
+ pub fn shutdown(&mut self) {
+     self.running.store(false, Ordering::SeqCst);
+     if let Some(handle) = self.thread_handle.take() {
+         // `join` returns `Err` only when the thread panicked; surface that
+         // in the log instead of silently discarding it.
+         if handle.join().is_err() {
+             error!("Proxy thread panicked before shutdown completed");
+         }
+     }
+ }
+}
+
+/// The main loop that listens for incoming (plaintext) connections on
+/// `configuration.bind_address:configuration.local_port`.
+fn run_proxy(configuration: &Arc<Proxy>, running: &Arc<AtomicBool>) {
+ let listener = match TcpListener::bind(format!(
+ "{}:{}",
+ configuration.bind_address, configuration.local_port
+ )) {
+ Ok(l) => l,
+ Err(e) => {
+ error!("Failed to bind to port {}: {}", configuration.local_port, e);
+ return;
+ }
+ };
+ listener.set_nonblocking(true).unwrap();
+
+ info!(
+ "{} proxy listening on port {}:{}",
+ configuration.protocol, configuration.bind_address, configuration.local_port
+ );
+
+ // Keep track of active connections so we can join them on shutdown
+ let mut active_threads = vec![];
+
+ while running.load(Ordering::SeqCst) {
+ match listener.accept() {
+ Ok((stream, address)) => {
+ info!("New {} connection from {}", configuration.protocol, address);
+
+ let configuration_clone = Arc::clone(configuration);
+ let handle = spawn(move || {
+ handle_client(stream, &configuration_clone);