Skip to main content

Mountain/IPC/Enhanced/
ConnectionPool.rs

1//! # Connection Pooling and Multiplexing
2//!
3//! Advanced connection pooling for concurrent IPC operations with health
4//! monitoring and connection lifecycle management.
5
6use std::{
7	collections::HashMap,
8	sync::Arc,
9	time::{Duration, Instant},
10};
11
12use serde::{Deserialize, Serialize};
13use tokio::{
14	sync::{Mutex as AsyncMutex, Notify, RwLock, Semaphore},
15	time::{interval, timeout},
16};
17use uuid::Uuid;
18
19use crate::dev_log;
20
21/// Connection pool configuration
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct PoolConfig {
24	pub max_connections:usize,
25	pub min_connections:usize,
26	pub connection_timeout_ms:u64,
27	pub max_lifetime_ms:u64,
28	pub idle_timeout_ms:u64,
29	pub health_check_interval_ms:u64,
30}
31
32impl Default for PoolConfig {
33	fn default() -> Self {
34		Self {
35			max_connections:10,
36			min_connections:2,
37			// Connection timeout: 30 seconds to acquire a connection from the pool.
38			connection_timeout_ms:30000,
39			// Maximum lifetime: 5 minutes before a connection is retired.
40			max_lifetime_ms:300000,
41			// Idle timeout: 1 minute before an unused connection is closed.
42			idle_timeout_ms:60000,
43			// Health check interval: 30 seconds between connection health probes.
44			health_check_interval_ms:30000,
45		}
46	}
47}
48
/// Coarse health classification of a pooled connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionHealth {
	/// Good standing; the value `ConnectionHandle::new` starts with.
	Healthy,
	/// Failing state. NOTE(review): meaning inferred from the name — confirm
	/// against the code that assigns it.
	Unhealthy,
	/// Usable but showing problems. NOTE(review): meaning inferred from the
	/// name — confirm against the code that assigns it.
	Degraded,
}
56
57/// Connection handle with health monitoring
58#[derive(Debug, Clone)]
59pub struct ConnectionHandle {
60	pub id:String,
61	pub created_at:Instant,
62	pub last_used:Instant,
63	pub health_score:f64,
64	pub error_count:usize,
65	pub successful_operations:usize,
66	pub total_operations:usize,
67	pub is_active:bool,
68	pub reuse_count:u32,
69	pub health:ConnectionHealth,
70}
71
72impl ConnectionHandle {
73	/// Create a new connection handle
74	pub fn new() -> Self {
75		Self {
76			id:Uuid::new_v4().to_string(),
77			created_at:Instant::now(),
78			last_used:Instant::now(),
79			health_score:100.0,
80			error_count:0,
81			successful_operations:0,
82			total_operations:0,
83			is_active:true,
84			reuse_count:0,
85			health:ConnectionHealth::Healthy,
86		}
87	}
88
89	/// Update health based on operation success
90	pub fn update_health(&mut self, success:bool) {
91		self.last_used = Instant::now();
92		self.total_operations += 1;
93
94		if success {
95			self.successful_operations += 1;
96			// Increase health score gradually
97			self.health_score = (self.health_score + 2.0).min(100.0);
98			self.error_count = 0;
99		} else {
100			self.error_count += 1;
101			// Decrease health score more aggressively
102			self.health_score = (self.health_score - 10.0).max(0.0);
103		}
104
105		// Calculate success rate
106		let success_rate = if self.total_operations > 0 {
107			self.successful_operations as f64 / self.total_operations as f64
108		} else {
109			1.0
110		};
111
112		// Adjust health score based on overall success rate
113		self.health_score = (self.health_score * 0.7 + success_rate * 100.0 * 0.3).max(0.0).min(100.0);
114	}
115
116	/// Check if connection is healthy
117	pub fn is_healthy(&self) -> bool {
118		// Connection is considered healthy if: health score > 50%, fewer than 5 errors,
119		// actively flagged as healthy, and less than 5 minutes old (prevents stale
120		// connections).
121		self.health_score > 50.0 && self.error_count < 5 && self.is_active && self.age().as_secs() < 300
122	}
123
124	/// Get connection age
125	pub fn age(&self) -> Duration { self.created_at.elapsed() }
126
127	/// Get idle time
128	pub fn idle_time(&self) -> Duration { self.last_used.elapsed() }
129
130	/// Get success rate
131	pub fn success_rate(&self) -> f64 {
132		if self.total_operations == 0 {
133			1.0
134		} else {
135			self.successful_operations as f64 / self.total_operations as f64
136		}
137	}
138}
139
/// Snapshot of pool counters; `ConnectionPool::get_stats` returns a clone.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoolStats {
	/// Connections the pool is tracking, as maintained by `ConnectionPool`.
	pub total_connections:usize,
	/// Connections handed out by `get_connection` and not yet released.
	pub active_connections:usize,
	/// Connections counted as idle by the pool.
	pub idle_connections:usize,
	/// Connections that passed `ConnectionHandle::is_healthy` at the last recount.
	pub healthy_connections:usize,
	/// Copy of `PoolConfig::max_connections`, set when the pool is created.
	pub max_connections:usize,
	/// Copy of `PoolConfig::min_connections`, set when the pool is created.
	pub min_connections:usize,
	/// Initialised to 0 and not updated by `ConnectionPool` in this file.
	pub wait_queue_size:usize,
	/// Running mean, in milliseconds, of how long `get_connection` waited for a permit.
	pub average_wait_time_ms:f64,
	/// Operation counter maintained by `get_connection`.
	pub total_operations:u64,
	/// Counter of successful operations.
	pub successful_operations:u64,
	/// `1 - successful_operations / total_operations`, once any operation is recorded.
	pub error_rate:f64,
}
155
156/// Connection pool with advanced management
157pub struct ConnectionPool {
158	pub config:PoolConfig,
159	pub connections:Arc<AsyncMutex<HashMap<String, ConnectionHandle>>>,
160	pub semaphore:Arc<Semaphore>,
161	pub wait_queue:Arc<AsyncMutex<Vec<Arc<Notify>>>>,
162	pub stats:Arc<RwLock<PoolStats>>,
163	pub health_checker:Arc<AsyncMutex<ConnectionHealthChecker>>,
164	pub is_running:Arc<AsyncMutex<bool>>,
165}
166
167impl ConnectionPool {
168	/// Create a new connection pool
169	pub fn new(config:PoolConfig) -> Self {
170		let max_connections = config.max_connections;
171		let min_connections = config.min_connections;
172
173		let pool = Self {
174			config:config.clone(),
175			connections:Arc::new(AsyncMutex::new(HashMap::new())),
176			semaphore:Arc::new(Semaphore::new(max_connections)),
177			wait_queue:Arc::new(AsyncMutex::new(Vec::new())),
178			stats:Arc::new(RwLock::new(PoolStats {
179				total_connections:0,
180				active_connections:0,
181				idle_connections:0,
182				healthy_connections:0,
183				max_connections,
184				min_connections,
185				wait_queue_size:0,
186				average_wait_time_ms:0.0,
187				total_operations:0,
188				successful_operations:0,
189				error_rate:0.0,
190			})),
191			health_checker:Arc::new(AsyncMutex::new(ConnectionHealthChecker::new())),
192			is_running:Arc::new(AsyncMutex::new(false)),
193		};
194
195		dev_log!("ipc", "[ConnectionPool] Created pool with max {} connections", max_connections);
196		pool
197	}
198
199	/// Start the connection pool
200	pub async fn start(&self) -> Result<(), String> {
201		{
202			let mut running = self.is_running.lock().await;
203			if *running {
204				// If the pool is already running, exit early to avoid duplicate startup.
205				return Ok(());
206			}
207			*running = true;
208		}
209
210		// Start health monitoring
211		self.start_health_monitoring().await;
212
213		// Start connection cleanup
214		self.start_connection_cleanup().await;
215
216		// Initialize minimum connections
217		self.initialize_min_connections().await;
218
219		dev_log!("ipc", "[ConnectionPool] Started connection pool");
220		Ok(())
221	}
222
223	/// Stop the connection pool
224	pub async fn stop(&self) -> Result<(), String> {
225		{
226			let mut running = self.is_running.lock().await;
227			if !*running {
228				// If the pool is already stopped, exit early to avoid redundant operations.
229				return Ok(());
230			}
231			*running = false;
232		}
233
234		// Clear all connections
235		{
236			let mut connections = self.connections.lock().await;
237			connections.clear();
238		}
239
240		// Notify all waiting tasks
241		{
242			let mut wait_queue = self.wait_queue.lock().await;
243			for notifier in wait_queue.drain(..) {
244				notifier.notify_one();
245			}
246		}
247
248		dev_log!("ipc", "[ConnectionPool] Stopped connection pool");
249		Ok(())
250	}
251
252	/// Get a connection from the pool
253	pub async fn get_connection(&self) -> Result<ConnectionHandle, String> {
254		let start_time = Instant::now();
255
256		// Try to acquire permit with timeout
257		let _permit = timeout(
258			Duration::from_millis(self.config.connection_timeout_ms),
259			self.semaphore.acquire(),
260		)
261		.await
262		.map_err(|_| "Connection timeout".to_string())?
263		.map_err(|e| format!("Failed to acquire connection: {}", e))?;
264
265		let wait_time = start_time.elapsed().as_millis() as f64;
266
267		// Update wait time statistics
268		{
269			let mut stats = self.stats.write().await;
270			stats.average_wait_time_ms = (stats.average_wait_time_ms * stats.total_operations as f64 + wait_time)
271				/ (stats.total_operations as f64 + 1.0);
272		}
273
274		// Find or create a healthy connection
275		let connection = self.find_or_create_connection().await?;
276
277		// Update statistics
278		{
279			let mut stats = self.stats.write().await;
280			stats.active_connections += 1;
281			stats.total_operations += 1;
282		}
283
284		dev_log!("ipc", "[ConnectionPool] Connection acquired: {}", connection.id);
285		Ok(connection)
286	}
287
288	/// Release a connection back to the pool
289	pub async fn release_connection(&self, mut handle:ConnectionHandle) {
290		let connection_id = handle.id.clone();
291
292		handle.last_used = Instant::now();
293
294		// Update connection in pool
295		{
296			let mut connections = self.connections.lock().await;
297			connections.insert(handle.id.clone(), handle.clone());
298		}
299
300		// Update statistics
301		{
302			let mut stats = self.stats.write().await;
303			stats.active_connections = stats.active_connections.saturating_sub(1);
304			stats.idle_connections += 1;
305		}
306
307		// Release the semaphore permit when the handle is dropped.
308		drop(handle);
309
310		dev_log!("ipc", "[ConnectionPool] Connection released: {}", connection_id);
311	}
312
313	/// Find or create a healthy connection
314	async fn find_or_create_connection(&self) -> Result<ConnectionHandle, String> {
315		let mut connections = self.connections.lock().await;
316
317		// Try to find a healthy connection
318		for (_id, handle) in connections.iter_mut() {
319			if handle.is_healthy() && handle.idle_time().as_millis() < self.config.idle_timeout_ms as u128 {
320				handle.last_used = Instant::now();
321				return Ok(handle.clone());
322			}
323		}
324
325		// No healthy connection found, create new one
326		let new_handle = ConnectionHandle::new();
327		connections.insert(new_handle.id.clone(), new_handle.clone());
328
329		// Update statistics
330		{
331			let mut stats = self.stats.write().await;
332			stats.total_connections += 1;
333			stats.healthy_connections += 1;
334		}
335
336		Ok(new_handle)
337	}
338
339	/// Start health monitoring
340	async fn start_health_monitoring(&self) {
341		let pool = Arc::new(self.clone());
342
343		tokio::spawn(async move {
344			let mut interval = interval(Duration::from_millis(pool.config.health_check_interval_ms));
345
346			while *pool.is_running.lock().await {
347				interval.tick().await;
348
349				if let Err(e) = pool.check_connection_health().await {
350					dev_log!("ipc", "error: [ConnectionPool] Health check failed: {}", e);
351				}
352			}
353		});
354	}
355
356	/// Start connection cleanup
357	async fn start_connection_cleanup(&self) {
358		let pool = Arc::new(self.clone());
359
360		tokio::spawn(async move {
361			// Check for stale connections every minute.
362			let mut interval = interval(Duration::from_secs(60));
363
364			while *pool.is_running.lock().await {
365				interval.tick().await;
366
367				let cleaned_count = pool.cleanup_stale_connections().await;
368				if cleaned_count > 0 {
369					dev_log!("ipc", "[ConnectionPool] Cleaned {} stale connections", cleaned_count);
370				}
371			}
372		});
373	}
374
375	/// Initialize minimum connections
376	async fn initialize_min_connections(&self) {
377		let current_count = self.connections.lock().await.len();
378
379		if current_count < self.config.min_connections {
380			let needed = self.config.min_connections - current_count;
381
382			for _ in 0..needed {
383				let handle = ConnectionHandle::new();
384				let mut connections = self.connections.lock().await;
385				connections.insert(handle.id.clone(), handle);
386			}
387
388			dev_log!("ipc", "[ConnectionPool] Initialized {} minimum connections", needed);
389		}
390	}
391
392	/// Check connection health
393	async fn check_connection_health(&self) -> Result<(), String> {
394		let mut connections = self.connections.lock().await;
395		let mut _health_checker = self.health_checker.lock().await;
396
397		let mut healthy_count = 0;
398
399		for (_id, handle) in connections.iter_mut() {
400			let is_healthy = _health_checker.check_connection_health(handle).await;
401			handle.update_health(is_healthy);
402
403			if handle.is_healthy() {
404				healthy_count += 1;
405			}
406		}
407
408		// Update statistics
409		{
410			let mut stats = self.stats.write().await;
411			stats.healthy_connections = healthy_count;
412			stats.idle_connections = connections.len().saturating_sub(stats.active_connections);
413
414			if stats.total_operations > 0 {
415				stats.error_rate = 1.0 - (stats.successful_operations as f64 / stats.total_operations as f64);
416			}
417		}
418
419		Ok(())
420	}
421
422	/// Cleanup stale connections
423	async fn cleanup_stale_connections(&self) -> usize {
424		let mut connections = self.connections.lock().await;
425		let _now = Instant::now();
426
427		let stale_ids:Vec<String> = connections
428			.iter()
429			.filter(|(_, handle)| {
430				handle.age().as_millis() > self.config.max_lifetime_ms as u128
431					|| handle.idle_time().as_millis() > self.config.idle_timeout_ms as u128
432					|| !handle.is_healthy()
433			})
434			.map(|(id, _)| id.clone())
435			.collect();
436
437		for id in &stale_ids {
438			connections.remove(id);
439		}
440
441		// Update statistics
442		{
443			let mut stats = self.stats.write().await;
444			stats.total_connections = connections.len();
445			stats.healthy_connections = connections.values().filter(|h| h.is_healthy()).count();
446		}
447
448		stale_ids.len()
449	}
450
451	/// Get pool statistics
452	pub async fn get_stats(&self) -> PoolStats { self.stats.read().await.clone() }
453
454	/// Get active connection count
455	pub async fn get_active_count(&self) -> usize { self.stats.read().await.active_connections }
456
457	/// Get healthy connection count
458	pub async fn get_healthy_count(&self) -> usize { self.stats.read().await.healthy_connections }
459
460	/// Check if pool is running
461	pub async fn is_running(&self) -> bool { *self.is_running.lock().await }
462}
463
464impl Clone for ConnectionPool {
465	fn clone(&self) -> Self {
466		Self {
467			config:self.config.clone(),
468			connections:self.connections.clone(),
469			semaphore:self.semaphore.clone(),
470			wait_queue:self.wait_queue.clone(),
471			stats:self.stats.clone(),
472			health_checker:self.health_checker.clone(),
473			is_running:self.is_running.clone(),
474		}
475	}
476}
477
478/// Connection health checker
479struct ConnectionHealthChecker {
480	ping_timeout:Duration,
481}
482
483impl ConnectionHealthChecker {
484	fn new() -> Self { Self { ping_timeout:Duration::from_secs(5) } }
485
486	/// Check connection health
487	async fn check_connection_health(&self, handle:&mut ConnectionHandle) -> bool {
488		// Simulate health check by ensuring connection can handle basic operations
489		// In a real implementation, this would send an actual ping message
490		let start_time = Instant::now();
491
492		// Simulate network latency
493		tokio::time::sleep(Duration::from_millis(10)).await;
494
495		let response_time = start_time.elapsed();
496
497		// Connection is healthy if response time is reasonable
498		response_time < self.ping_timeout
499	}
500}
501
502/// Utility functions for connection pooling
503impl ConnectionPool {
504	/// Create a connection pool with default configuration
505	pub fn default_pool() -> Self { Self::new(PoolConfig::default()) }
506
507	/// Create a high-performance pool
508	pub fn high_performance_pool() -> Self {
509		Self::new(PoolConfig {
510			max_connections:50,
511			min_connections:10,
512			connection_timeout_ms:10000,
513			// Shorter lifetimes for high-performance mode: 3 minutes max, 30 seconds idle.
514			max_lifetime_ms:180000,
515			idle_timeout_ms:30000,
516			// More frequent health checks: every 15 seconds.
517			health_check_interval_ms:15000,
518		})
519	}
520
521	/// Create a conservative pool
522	pub fn conservative_pool() -> Self {
523		Self::new(PoolConfig {
524			max_connections:5,
525			min_connections:1,
526			connection_timeout_ms:60000,
527			// Longer lifetimes for conservative mode: 10 minutes max, 2 minutes idle.
528			max_lifetime_ms:600000,
529			idle_timeout_ms:120000,
530			// Less frequent health checks: every 60 seconds.
531			health_check_interval_ms:60000,
532		})
533	}
534
535	/// Calculate optimal pool size based on system resources
536	pub fn calculate_optimal_pool_size() -> usize {
537		let num_cpus = num_cpus::get();
538		// Calculate optimal pool size using 2x CPU count as a starting point,
539		// with minimum of 4 and maximum of 50 connections.
540		(num_cpus * 2).max(4).min(50)
541	}
542}
543
#[cfg(test)]
mod tests {
	use tokio::time::sleep;

	use super::*;

	#[tokio::test]
	async fn test_connection_pool_creation() {
		let default_pool = ConnectionPool::default_pool();
		assert_eq!(default_pool.config.max_connections, 10);
	}

	#[tokio::test]
	async fn test_connection_handle_health() {
		let mut conn = ConnectionHandle::new();
		assert!(conn.is_healthy());

		// One success keeps a perfect record.
		conn.update_health(true);
		assert!(conn.is_healthy());
		assert_eq!(conn.success_rate(), 1.0);

		// One failure lowers the score, but not below the healthy threshold.
		conn.update_health(false);
		assert!(conn.is_healthy());
		assert_eq!(conn.success_rate(), 0.5);
	}

	#[tokio::test]
	async fn test_pool_lifecycle() {
		let pool = ConnectionPool::default_pool();

		pool.start().await.unwrap();
		assert!(pool.is_running().await);

		let conn = pool.get_connection().await.unwrap();
		assert!(conn.is_healthy());
		pool.release_connection(conn).await;

		pool.stop().await.unwrap();
		assert!(!pool.is_running().await);
	}

	#[tokio::test]
	async fn test_pool_statistics() {
		let pool = ConnectionPool::default_pool();
		pool.start().await.unwrap();

		let mut checked_out = Vec::with_capacity(3);
		for _ in 0..3 {
			checked_out.push(pool.get_connection().await.unwrap());
		}
		assert_eq!(pool.get_stats().await.active_connections, 3);

		for conn in checked_out {
			pool.release_connection(conn).await;
		}

		let after = pool.get_stats().await;
		assert_eq!(after.active_connections, 0);
		assert_eq!(after.idle_connections, 3);

		pool.stop().await.unwrap();
	}

	#[tokio::test]
	async fn test_connection_cleanup() {
		// Tiny lifetime (100 ms) and idle timeout (50 ms) so handles go stale fast.
		let config = PoolConfig { max_lifetime_ms:100, idle_timeout_ms:50, ..Default::default() };
		let pool = ConnectionPool::new(config);
		pool.start().await.unwrap();

		let conn = pool.get_connection().await.unwrap();
		pool.release_connection(conn).await;

		// Let every handle outlive both limits.
		sleep(Duration::from_millis(200)).await;

		let cleaned = pool.cleanup_stale_connections().await;
		assert!(cleaned > 0);

		pool.stop().await.unwrap();
	}

	#[test]
	fn test_optimal_pool_size_calculation() {
		let size = ConnectionPool::calculate_optimal_pool_size();
		assert!((4..=50).contains(&size));
	}
}