Skip to main content

Mountain/IPC/Connection/
Manager.rs

1use std::{collections::HashMap, sync::Arc};
2
3use tokio::{
4	sync::{Mutex as AsyncMutex, Semaphore},
5	time::{Duration, timeout},
6};
7
8use super::{
9	Health::HealthChecker,
10	Types::{ConnectionHandle, ConnectionStats},
11};
12use crate::dev_log;
13
/// Connection manager: a type alias for [`ConnectionPool`].
///
/// The alias introduces no new type; every constructor and method of
/// [`ConnectionPool`] is available under this name.
pub type ConnectionManager = ConnectionPool;
19
/// Connection pool for IPC operations
///
/// This structure tracks the set of active connection handles, gates
/// acquisition through a semaphore sized to the configured maximum, and
/// runs a periodic health check for every registered handle.
///
/// ## Pool Architecture
///
/// ```text
/// ConnectionPool
///     |
///     | Semaphore (created with `MaxConnections` permits)
///     v
/// Active Connections (HashMap<id, ConnectionHandle>)
///     |
///     | Health Checker (one background task per connection)
///     v
/// Monitor health and update scores
/// ```
///
/// ## Connection Lifecycle
///
/// 1. **Acquisition**: `GetConnection` acquires a semaphore permit, creates a
///    new `ConnectionHandle` and registers it in the active map
/// 2. **Usage**: Use the connection for operations
/// 3. **Release**: `ReleaseConnection` removes the handle from the active map
/// 4. **Cleanup**: `CleanUpStaleConnections` removes stale/unhealthy entries
///    when it is called
///
/// ## Health Monitoring
///
/// The pool reads the following from each connection handle:
/// - Health score (`health_score`)
/// - Error count (`error_count`)
/// - Last used timestamp (`last_used`)
///
/// A background task re-checks each registered connection on a 30 second
/// interval and stops once the connection is no longer in the active map.
///
/// ## Example Usage
///
/// ```rust,ignore
/// let pool = Arc::new(ConnectionPool::new(10, Duration::from_secs(30)));
///
/// // Get a connection
/// let handle = pool.GetConnection().await?;
///
/// // Use the connection...
///
/// // Release the connection
/// pool.ReleaseConnection(handle).await;
///
/// // Get statistics
/// let stats = pool.GetStats().await;
/// ```
pub struct ConnectionPool {
	/// Maximum number of concurrent connections; also the initial number of
	/// semaphore permits
	MaxConnections:usize,

	/// Upper bound on how long `GetConnection` waits for a semaphore permit
	ConnectionTimeout:Duration,

	/// Semaphore from which `GetConnection` acquires a permit
	Semaphore:Arc<Semaphore>,

	/// Map of active connections keyed by connection ID; shared with the
	/// background health-monitoring tasks
	ActiveConnection:Arc<AsyncMutex<HashMap<String, ConnectionHandle>>>,

	/// Health checker shared by all background health-monitoring tasks
	HealthChecker:Arc<AsyncMutex<HealthChecker>>,
}
86
87impl ConnectionPool {
88	/// Create a new connection pool with specified parameters
89	///
90	/// ## Parameters
91	/// - `MaxConnections`: Maximum number of concurrent connections
92	/// - `ConnectionTimeout`: Timeout for acquiring a connection
93	///
94	/// ## Example
95	///
96	/// ```rust,ignore
97	/// let pool = ConnectionPool::new(10, Duration::from_secs(30));
98	/// ```
99	pub fn new(MaxConnections:usize, ConnectionTimeout:Duration) -> Self {
100		dev_log!(
101			"ipc",
102			"[ConnectionPool] Creating pool with max: {}, timeout: {:?}",
103			MaxConnections,
104			ConnectionTimeout
105		);
106
107		Self {
108			MaxConnections,
109			ConnectionTimeout,
110			Semaphore:Arc::new(Semaphore::new(MaxConnections)),
111			ActiveConnection:Arc::new(AsyncMutex::new(HashMap::new())),
112			HealthChecker:Arc::new(AsyncMutex::new(HealthChecker::new())),
113		}
114	}
115
116	/// Create a connection pool with default settings
117	///
118	/// Default settings: 10 max connections, 30s timeout
119	pub fn default() -> Self { Self::new(10, Duration::from_secs(30)) }
120
121	/// Get a connection handle from the pool with timeout
122	///
123	/// This method acquires a semaphore permit and creates a new connection
124	/// handle. If the pool is at capacity, it will wait until a connection
125	/// becomes available or the timeout is reached.
126	///
127	/// ## Returns
128	/// - `Ok(ConnectionHandle)`: New connection handle
129	/// - `Err(String)`: Error message if timeout or failure occurs
130	///
131	/// ## Example
132	///
133	/// ```rust,ignore
134	/// let handle = pool.GetConnection().await?;
135	/// ```
136	pub async fn GetConnection(&self) -> Result<ConnectionHandle, String> {
137		dev_log!("ipc", "[ConnectionPool] Acquiring connection permit");
138
139		// Acquire semaphore permit with timeout
140		let permit = timeout(self.ConnectionTimeout, self.Semaphore.acquire())
141			.await
142			.map_err(|_| "Connection timeout - pool may be at capacity".to_string())?
143			.map_err(|e| format!("Failed to acquire connection permit: {}", e))?;
144
145		// Create new connection handle
146		let handle = ConnectionHandle::new();
147
148		// Add to active connections
149		{
150			let mut connections = self.ActiveConnection.lock().await;
151			connections.insert(handle.id.clone(), handle.clone());
152		}
153
154		dev_log!(
155			"ipc",
156			"[ConnectionPool] Connection {} acquired (permit released on drop)",
157			handle.id
158		);
159
160		// Start health monitoring for this connection
161		self.StartHealthMonitoring(&handle.id).await;
162
163		// The permit will be automatically released when dropped
164		drop(permit);
165
166		Ok(handle)
167	}
168
169	/// Release a connection handle back to the pool
170	///
171	/// This method removes the connection from the active connections map,
172	/// allowing the semaphore permit to be reused.
173	///
174	/// ## Parameters
175	/// - `handle`: The connection handle to release
176	///
177	/// ## Example
178	///
179	/// ```rust,ignore
180	/// pool.ReleaseConnection(handle).await;
181	/// ```
182	pub async fn ReleaseConnection(&self, handle:ConnectionHandle) {
183		dev_log!("ipc", "[ConnectionPool] Releasing connection {}", handle.id);
184
185		{
186			let mut connections = self.ActiveConnection.lock().await;
187			connections.remove(&handle.id);
188		}
189
190		dev_log!("ipc", "[ConnectionPool] Connection {} released", handle.id);
191	}
192
193	/// Get connection statistics for monitoring
194	///
195	/// This method returns aggregate statistics about the connection pool,
196	/// useful for monitoring and debugging.
197	///
198	/// ## Returns
199	/// Connection statistics including total connections, healthy connections,
200	/// utilization, etc.
201	///
202	/// ## Example
203	///
204	/// ```rust,ignore
205	/// let stats = pool.GetStats().await;
206	/// println!("Pool stats: {:?}", stats.summary());
207	/// ```
208	pub async fn GetStats(&self) -> ConnectionStats {
209		let connections = self.ActiveConnection.lock().await;
210		let healthy_connections = connections.values().filter(|h| h.is_healthy()).count();
211
212		ConnectionStats {
213			total_connections:connections.len(),
214			healthy_connections,
215			max_connections:self.MaxConnections,
216			available_permits:self.Semaphore.available_permits(),
217			connection_timeout:self.ConnectionTimeout,
218		}
219	}
220
221	/// Clean up stale connections
222	///
223	/// This method removes connections that have not been used recently
224	/// or are unhealthy, preventing memory leaks and resource exhaustion.
225	///
226	/// Stale connection criteria:
227	/// - Unused for 5 minutes (300 seconds)
228	/// - Not healthy (health score <= 50 or error count >= 5)
229	///
230	/// ## Returns
231	/// The number of stale connections removed
232	///
233	/// ## Example
234	///
235	/// ```rust,ignore
236	/// let cleaned = pool.CleanUpStaleConnections().await;
237	/// println!("Cleaned up {} stale connections", cleaned);
238	/// ```
239	pub async fn CleanUpStaleConnections(&self) -> usize {
240		let mut connections = self.ActiveConnection.lock().await;
241		let now = std::time::SystemTime::now();
242		let stale_threshold = Duration::from_secs(300); // 5 minutes
243
244		let stale_ids:Vec<String> = connections
245			.iter()
246			.filter(|(_, handle)| {
247				// Check if connection is stale using SystemTime
248				let is_stale_by_time = match now.duration_since(handle.last_used) {
249					Ok(idle_time) => idle_time > stale_threshold,
250					Err(_) => true, // If time went backwards, consider it stale
251				};
252				is_stale_by_time || !handle.is_healthy()
253			})
254			.map(|(id, _)| id.clone())
255			.collect();
256
257		let stale_count = stale_ids.len();
258		for id in stale_ids {
259			dev_log!("ipc", "[ConnectionPool] Removing stale connection {}", id);
260			connections.remove(&id);
261		}
262
263		if stale_count > 0 {
264			dev_log!("ipc", "[ConnectionPool] Cleaned up {} stale connection(s)", stale_count);
265		}
266
267		stale_count
268	}
269
270	/// Start health monitoring for a connection
271	///
272	/// This method spawns a background task that periodically checks the
273	/// health of the connection and updates its health score.
274	///
275	/// ## Parameters
276	/// - `connection_id`: The ID of the connection to monitor
277	async fn StartHealthMonitoring(&self, connection_id:&str) {
278		let health_checker = self.HealthChecker.clone();
279		let active_connection = self.ActiveConnection.clone();
280		let connection_id = connection_id.to_string();
281
282		tokio::spawn(async move {
283			let mut interval = tokio::time::interval(Duration::from_secs(30));
284
285			loop {
286				interval.tick().await;
287
288				let checker = health_checker.lock().await;
289				let mut connections = match active_connection.try_lock() {
290					Ok(conns) => conns,
291					Err(_) => continue,
292				};
293
294				if let Some(handle) = connections.get_mut(&connection_id) {
295					let is_healthy = checker.check_connection_health(handle).await;
296					handle.update_health(is_healthy);
297
298					if !handle.is_healthy() {
299						dev_log!(
300							"ipc",
301							"[ConnectionPool] Connection {} marked as unhealthy (score: {:.1}, errors: {})",
302							handle.id,
303							handle.health_score,
304							handle.error_count
305						);
306					}
307				} else {
308					// Connection removed from pool, stop monitoring
309					dev_log!(
310						"ipc",
311						"[ConnectionPool] Connection {} removed from pool, stopping health monitoring",
312						connection_id
313					);
314					break;
315				}
316			}
317		});
318	}
319
320	/// Get the maximum number of connections
321	pub fn max_connections(&self) -> usize { self.MaxConnections }
322
323	/// Get the connection timeout
324	pub fn connection_timeout(&self) -> Duration { self.ConnectionTimeout }
325
326	/// Get the number of available permits
327	pub fn available_permits(&self) -> usize { self.Semaphore.available_permits() }
328
329	/// Get the number of active connections
330	pub async fn active_connection(&self) -> usize {
331		let connections = self.ActiveConnection.lock().await;
332		connections.len()
333	}
334}
335
#[cfg(test)]
mod tests {
	use super::*;

	/// A freshly built pool reports its configuration and holds no
	/// connections.
	#[tokio::test]
	async fn test_connection_pool_creation() {
		let pool = ConnectionPool::new(10, Duration::from_secs(30));
		assert_eq!(pool.max_connections(), 10);
		assert_eq!(pool.connection_timeout(), Duration::from_secs(30));
		assert_eq!(pool.available_permits(), 10);
		assert_eq!(pool.active_connection().await, 0);
	}

	/// `default()` uses 10 connections and a 30 second timeout.
	#[tokio::test]
	async fn test_default_connection_pool() {
		let pool = ConnectionPool::default();
		assert_eq!(pool.max_connections(), 10);
		assert_eq!(pool.connection_timeout(), Duration::from_secs(30));
	}

	/// Acquiring consumes one permit and registers the handle; releasing
	/// undoes both.
	#[tokio::test]
	async fn test_get_and_release_connection() {
		let pool = Arc::new(ConnectionPool::new(5, Duration::from_secs(5)));

		// Get a connection
		let handle = pool.GetConnection().await.unwrap();
		assert_eq!(pool.active_connection().await, 1);
		assert_eq!(pool.available_permits(), 4); // One permit used

		// Release the connection
		pool.ReleaseConnection(handle).await;
		assert_eq!(pool.active_connection().await, 0);
		assert_eq!(pool.available_permits(), 5); // Permit restored
	}

	/// The pool refuses to hand out more than `MaxConnections` handles until
	/// one of them is released.
	#[tokio::test]
	async fn test_multiple_connections() {
		let pool = Arc::new(ConnectionPool::new(3, Duration::from_secs(5)));

		// Collect handles properly without await in sync closure
		let mut handles = Vec::new();
		for _ in 0..3 {
			handles.push(pool.GetConnection().await.unwrap());
		}

		assert_eq!(pool.active_connection().await, 3);
		assert_eq!(pool.available_permits(), 0);

		// Try to get one more - the outer 1s timeout fires before the pool's
		// own 5s acquisition timeout
		let result = timeout(Duration::from_secs(1), pool.GetConnection()).await;
		assert!(result.is_err()); // Timeout

		// Release one connection
		pool.ReleaseConnection(handles[0].clone()).await;
		assert_eq!(pool.available_permits(), 1);

		// Now we can get another
		let handle = pool.GetConnection().await.unwrap();
		assert_eq!(pool.available_permits(), 0);

		// Release all (handles[0] is released a second time on purpose)
		for handle in handles {
			pool.ReleaseConnection(handle).await;
		}
		pool.ReleaseConnection(handle).await;
	}

	/// Statistics reflect the number of registered connections.
	#[tokio::test]
	async fn test_connection_stats() {
		let pool = Arc::new(ConnectionPool::new(5, Duration::from_secs(30)));

		let stats = pool.GetStats().await;
		assert_eq!(stats.total_connections, 0);
		assert_eq!(stats.healthy_connections, 0);
		assert_eq!(stats.max_connections, 5);
		assert_eq!(stats.utilization(), 0.0);

		// Add some connections
		for _ in 0..3 {
			let _ = pool.GetConnection().await.unwrap();
		}

		let stats = pool.GetStats().await;
		assert_eq!(stats.total_connections, 3);
		assert!(stats.healthy_connections > 0);
		assert!(stats.utilization() > 0.0);
	}

	/// A registered connection that is both idle for too long and unhealthy
	/// is removed by `CleanUpStaleConnections`.
	#[tokio::test]
	async fn test_cleanup_stale_connections() {
		let pool = Arc::new(ConnectionPool::new(5, Duration::from_secs(5)));

		let handle = pool.GetConnection().await.unwrap();

		// Age the copy that lives inside the pool. The returned handle is a
		// clone, so changing it would not affect what cleanup inspects.
		{
			let mut connections = pool.ActiveConnection.lock().await;
			let pooled = connections
				.get_mut(&handle.id)
				.expect("connection was registered by GetConnection");

			// Six minutes idle exceeds the five minute staleness threshold
			pooled.last_used = std::time::SystemTime::now()
				.checked_sub(Duration::from_secs(360))
				.unwrap_or(pooled.last_used);
			pooled.health_score = 25.0; // Unhealthy
		}

		let cleaned = pool.CleanUpStaleConnections().await;
		assert_eq!(cleaned, 1);
		assert_eq!(pool.active_connection().await, 0);
		assert_eq!(pool.available_permits(), 5);

		// Releasing a handle that cleanup already removed must be harmless
		pool.ReleaseConnection(handle).await;
		assert_eq!(pool.active_connection().await, 0);
		assert_eq!(pool.available_permits(), 5);
	}

	/// Utilization is reported as a percentage of `MaxConnections`.
	#[tokio::test]
	async fn test_pool_utilization() {
		let pool = Arc::new(ConnectionPool::new(10, Duration::from_secs(30)));

		assert_eq!(pool.GetStats().await.utilization(), 0.0);

		// Use half the connections
		for _ in 0..5 {
			let _ = pool.GetConnection().await.unwrap();
		}

		let stats = pool.GetStats().await;
		assert_eq!(stats.utilization(), 50.0);
	}
}