1use std::{collections::HashMap, sync::Arc};
2
3use tokio::{
4 sync::{Mutex as AsyncMutex, Semaphore},
5 time::{Duration, timeout},
6};
7
8use super::{
9 Health::HealthChecker,
10 Types::{ConnectionHandle, ConnectionStats},
11};
12use crate::dev_log;
13
14pub type ConnectionManager = ConnectionPool;
19
20pub struct ConnectionPool {
71 MaxConnections:usize,
73
74 ConnectionTimeout:Duration,
76
77 Semaphore:Arc<Semaphore>,
79
80 ActiveConnection:Arc<AsyncMutex<HashMap<String, ConnectionHandle>>>,
82
83 HealthChecker:Arc<AsyncMutex<HealthChecker>>,
85}
86
87impl ConnectionPool {
88 pub fn new(MaxConnections:usize, ConnectionTimeout:Duration) -> Self {
100 dev_log!(
101 "ipc",
102 "[ConnectionPool] Creating pool with max: {}, timeout: {:?}",
103 MaxConnections,
104 ConnectionTimeout
105 );
106
107 Self {
108 MaxConnections,
109 ConnectionTimeout,
110 Semaphore:Arc::new(Semaphore::new(MaxConnections)),
111 ActiveConnection:Arc::new(AsyncMutex::new(HashMap::new())),
112 HealthChecker:Arc::new(AsyncMutex::new(HealthChecker::new())),
113 }
114 }
115
116 pub fn default() -> Self { Self::new(10, Duration::from_secs(30)) }
120
121 pub async fn GetConnection(&self) -> Result<ConnectionHandle, String> {
137 dev_log!("ipc", "[ConnectionPool] Acquiring connection permit");
138
139 let permit = timeout(self.ConnectionTimeout, self.Semaphore.acquire())
141 .await
142 .map_err(|_| "Connection timeout - pool may be at capacity".to_string())?
143 .map_err(|e| format!("Failed to acquire connection permit: {}", e))?;
144
145 let handle = ConnectionHandle::new();
147
148 {
150 let mut connections = self.ActiveConnection.lock().await;
151 connections.insert(handle.id.clone(), handle.clone());
152 }
153
154 dev_log!(
155 "ipc",
156 "[ConnectionPool] Connection {} acquired (permit released on drop)",
157 handle.id
158 );
159
160 self.StartHealthMonitoring(&handle.id).await;
162
163 drop(permit);
165
166 Ok(handle)
167 }
168
169 pub async fn ReleaseConnection(&self, handle:ConnectionHandle) {
183 dev_log!("ipc", "[ConnectionPool] Releasing connection {}", handle.id);
184
185 {
186 let mut connections = self.ActiveConnection.lock().await;
187 connections.remove(&handle.id);
188 }
189
190 dev_log!("ipc", "[ConnectionPool] Connection {} released", handle.id);
191 }
192
193 pub async fn GetStats(&self) -> ConnectionStats {
209 let connections = self.ActiveConnection.lock().await;
210 let healthy_connections = connections.values().filter(|h| h.is_healthy()).count();
211
212 ConnectionStats {
213 total_connections:connections.len(),
214 healthy_connections,
215 max_connections:self.MaxConnections,
216 available_permits:self.Semaphore.available_permits(),
217 connection_timeout:self.ConnectionTimeout,
218 }
219 }
220
221 pub async fn CleanUpStaleConnections(&self) -> usize {
240 let mut connections = self.ActiveConnection.lock().await;
241 let now = std::time::SystemTime::now();
242 let stale_threshold = Duration::from_secs(300); let stale_ids:Vec<String> = connections
245 .iter()
246 .filter(|(_, handle)| {
247 let is_stale_by_time = match now.duration_since(handle.last_used) {
249 Ok(idle_time) => idle_time > stale_threshold,
250 Err(_) => true, };
252 is_stale_by_time || !handle.is_healthy()
253 })
254 .map(|(id, _)| id.clone())
255 .collect();
256
257 let stale_count = stale_ids.len();
258 for id in stale_ids {
259 dev_log!("ipc", "[ConnectionPool] Removing stale connection {}", id);
260 connections.remove(&id);
261 }
262
263 if stale_count > 0 {
264 dev_log!("ipc", "[ConnectionPool] Cleaned up {} stale connection(s)", stale_count);
265 }
266
267 stale_count
268 }
269
270 async fn StartHealthMonitoring(&self, connection_id:&str) {
278 let health_checker = self.HealthChecker.clone();
279 let active_connection = self.ActiveConnection.clone();
280 let connection_id = connection_id.to_string();
281
282 tokio::spawn(async move {
283 let mut interval = tokio::time::interval(Duration::from_secs(30));
284
285 loop {
286 interval.tick().await;
287
288 let checker = health_checker.lock().await;
289 let mut connections = match active_connection.try_lock() {
290 Ok(conns) => conns,
291 Err(_) => continue,
292 };
293
294 if let Some(handle) = connections.get_mut(&connection_id) {
295 let is_healthy = checker.check_connection_health(handle).await;
296 handle.update_health(is_healthy);
297
298 if !handle.is_healthy() {
299 dev_log!(
300 "ipc",
301 "[ConnectionPool] Connection {} marked as unhealthy (score: {:.1}, errors: {})",
302 handle.id,
303 handle.health_score,
304 handle.error_count
305 );
306 }
307 } else {
308 dev_log!(
310 "ipc",
311 "[ConnectionPool] Connection {} removed from pool, stopping health monitoring",
312 connection_id
313 );
314 break;
315 }
316 }
317 });
318 }
319
320 pub fn max_connections(&self) -> usize { self.MaxConnections }
322
323 pub fn connection_timeout(&self) -> Duration { self.ConnectionTimeout }
325
326 pub fn available_permits(&self) -> usize { self.Semaphore.available_permits() }
328
329 pub async fn active_connection(&self) -> usize {
331 let connections = self.ActiveConnection.lock().await;
332 connections.len()
333 }
334}
335
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built pool reports its configuration and holds nothing.
    #[tokio::test]
    async fn test_connection_pool_creation() {
        let pool = ConnectionPool::new(10, Duration::from_secs(30));
        assert_eq!(pool.max_connections(), 10);
        assert_eq!(pool.connection_timeout(), Duration::from_secs(30));
        assert_eq!(pool.available_permits(), 10);
        assert_eq!(pool.active_connection().await, 0);
    }

    /// `default()` yields 10 connections and a 30 second timeout.
    #[tokio::test]
    async fn test_default_connection_pool() {
        let pool = ConnectionPool::default();
        assert_eq!(pool.max_connections(), 10);
        assert_eq!(pool.connection_timeout(), Duration::from_secs(30));
    }

    /// A live connection occupies one permit; releasing it gives the permit
    /// back.
    #[tokio::test]
    async fn test_get_and_release_connection() {
        let pool = Arc::new(ConnectionPool::new(5, Duration::from_secs(5)));

        let handle = pool.GetConnection().await.unwrap();
        assert_eq!(pool.active_connection().await, 1);
        assert_eq!(pool.available_permits(), 4);

        pool.ReleaseConnection(handle).await;
        assert_eq!(pool.active_connection().await, 0);
        assert_eq!(pool.available_permits(), 5);
    }

    /// The pool refuses to exceed its limit and reuses a slot once freed.
    #[tokio::test]
    async fn test_multiple_connections() {
        let pool = Arc::new(ConnectionPool::new(3, Duration::from_secs(5)));

        let mut handles = Vec::new();
        for _ in 0..3 {
            handles.push(pool.GetConnection().await.unwrap());
        }

        assert_eq!(pool.active_connection().await, 3);
        assert_eq!(pool.available_permits(), 0);

        // The outer 1 s timeout is shorter than the pool's own 5 s wait, so
        // an exhausted pool shows up here as an elapsed outer timeout.
        let result = timeout(Duration::from_secs(1), pool.GetConnection()).await;
        assert!(result.is_err());

        // Free one slot; each handle is released exactly once in this test.
        let first = handles.remove(0);
        pool.ReleaseConnection(first).await;
        assert_eq!(pool.available_permits(), 1);

        let handle = pool.GetConnection().await.unwrap();
        assert_eq!(pool.available_permits(), 0);

        for remaining in handles {
            pool.ReleaseConnection(remaining).await;
        }
        pool.ReleaseConnection(handle).await;

        assert_eq!(pool.active_connection().await, 0);
        assert_eq!(pool.available_permits(), 3);
    }

    /// Stats reflect the registry before and after connections are taken.
    #[tokio::test]
    async fn test_connection_stats() {
        let pool = Arc::new(ConnectionPool::new(5, Duration::from_secs(30)));

        let stats = pool.GetStats().await;
        assert_eq!(stats.total_connections, 0);
        assert_eq!(stats.healthy_connections, 0);
        assert_eq!(stats.max_connections, 5);
        assert_eq!(stats.utilization(), 0.0);

        // The returned handles are discarded; the test expects the pool to
        // keep counting the connections it registered.
        for _ in 0..3 {
            let _ = pool.GetConnection().await.unwrap();
        }

        let stats = pool.GetStats().await;
        assert_eq!(stats.total_connections, 3);
        assert!(stats.healthy_connections > 0);
        assert!(stats.utilization() > 0.0);
    }

    /// A connection idle past the stale threshold is evicted by cleanup.
    #[tokio::test]
    async fn test_cleanup_stale_connections() {
        let pool = Arc::new(ConnectionPool::new(5, Duration::from_secs(5)));

        let handle = pool.GetConnection().await.unwrap();

        // Age the pooled entry itself: the handle returned to the caller is
        // a clone, so editing it would not influence what cleanup sees.
        {
            let mut connections = pool.ActiveConnection.lock().await;
            let pooled = connections
                .get_mut(&handle.id)
                .expect("connection is registered right after GetConnection");
            pooled.last_used = std::time::SystemTime::now()
                .checked_sub(Duration::from_secs(360))
                .expect("current time minus six minutes is representable");
            pooled.health_score = 25.0;
        }

        assert_eq!(pool.CleanUpStaleConnections().await, 1);
        assert_eq!(pool.active_connection().await, 0);

        // The entry is already gone, so releasing the handle afterwards and
        // cleaning up again finds nothing more to remove.
        pool.ReleaseConnection(handle).await;
        assert_eq!(pool.CleanUpStaleConnections().await, 0);
        assert_eq!(pool.available_permits(), 5);
    }

    /// Utilization is reported as a percentage of the configured limit.
    #[tokio::test]
    async fn test_pool_utilization() {
        let pool = Arc::new(ConnectionPool::new(10, Duration::from_secs(30)));

        assert_eq!(pool.GetStats().await.utilization(), 0.0);

        for _ in 0..5 {
            let _ = pool.GetConnection().await.unwrap();
        }

        let stats = pool.GetStats().await;
        assert_eq!(stats.utilization(), 50.0);
    }
}