1use std::{
7 collections::HashMap,
8 sync::Arc,
9 time::{Duration, Instant},
10};
11
12use serde::{Deserialize, Serialize};
13use tokio::{
14 sync::{Mutex as AsyncMutex, Notify, RwLock, Semaphore},
15 time::{interval, timeout},
16};
17use uuid::Uuid;
18
19use crate::dev_log;
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct PoolConfig {
24 pub max_connections:usize,
25 pub min_connections:usize,
26 pub connection_timeout_ms:u64,
27 pub max_lifetime_ms:u64,
28 pub idle_timeout_ms:u64,
29 pub health_check_interval_ms:u64,
30}
31
32impl Default for PoolConfig {
33 fn default() -> Self {
34 Self {
35 max_connections:10,
36 min_connections:2,
37 connection_timeout_ms:30000,
39 max_lifetime_ms:300000,
41 idle_timeout_ms:60000,
43 health_check_interval_ms:30000,
45 }
46 }
47}
48
/// Coarse health label carried by a connection handle.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionHealth {
    /// Label given to freshly created handles.
    Healthy,
    /// NOTE(review): the enum itself does not define when a connection counts
    /// as unhealthy; confirm against the code that assigns this label.
    Unhealthy,
    /// NOTE(review): intermediate label; exact criteria are not defined here.
    Degraded,
}
56
57#[derive(Debug, Clone)]
59pub struct ConnectionHandle {
60 pub id:String,
61 pub created_at:Instant,
62 pub last_used:Instant,
63 pub health_score:f64,
64 pub error_count:usize,
65 pub successful_operations:usize,
66 pub total_operations:usize,
67 pub is_active:bool,
68 pub reuse_count:u32,
69 pub health:ConnectionHealth,
70}
71
72impl ConnectionHandle {
73 pub fn new() -> Self {
75 Self {
76 id:Uuid::new_v4().to_string(),
77 created_at:Instant::now(),
78 last_used:Instant::now(),
79 health_score:100.0,
80 error_count:0,
81 successful_operations:0,
82 total_operations:0,
83 is_active:true,
84 reuse_count:0,
85 health:ConnectionHealth::Healthy,
86 }
87 }
88
89 pub fn update_health(&mut self, success:bool) {
91 self.last_used = Instant::now();
92 self.total_operations += 1;
93
94 if success {
95 self.successful_operations += 1;
96 self.health_score = (self.health_score + 2.0).min(100.0);
98 self.error_count = 0;
99 } else {
100 self.error_count += 1;
101 self.health_score = (self.health_score - 10.0).max(0.0);
103 }
104
105 let success_rate = if self.total_operations > 0 {
107 self.successful_operations as f64 / self.total_operations as f64
108 } else {
109 1.0
110 };
111
112 self.health_score = (self.health_score * 0.7 + success_rate * 100.0 * 0.3).max(0.0).min(100.0);
114 }
115
116 pub fn is_healthy(&self) -> bool {
118 self.health_score > 50.0 && self.error_count < 5 && self.is_active && self.age().as_secs() < 300
122 }
123
124 pub fn age(&self) -> Duration { self.created_at.elapsed() }
126
127 pub fn idle_time(&self) -> Duration { self.last_used.elapsed() }
129
130 pub fn success_rate(&self) -> f64 {
132 if self.total_operations == 0 {
133 1.0
134 } else {
135 self.successful_operations as f64 / self.total_operations as f64
136 }
137 }
138}
139
140#[derive(Debug, Clone, Serialize, Deserialize)]
142pub struct PoolStats {
143 pub total_connections:usize,
144 pub active_connections:usize,
145 pub idle_connections:usize,
146 pub healthy_connections:usize,
147 pub max_connections:usize,
148 pub min_connections:usize,
149 pub wait_queue_size:usize,
150 pub average_wait_time_ms:f64,
151 pub total_operations:u64,
152 pub successful_operations:u64,
153 pub error_rate:f64,
154}
155
156pub struct ConnectionPool {
158 pub config:PoolConfig,
159 pub connections:Arc<AsyncMutex<HashMap<String, ConnectionHandle>>>,
160 pub semaphore:Arc<Semaphore>,
161 pub wait_queue:Arc<AsyncMutex<Vec<Arc<Notify>>>>,
162 pub stats:Arc<RwLock<PoolStats>>,
163 pub health_checker:Arc<AsyncMutex<ConnectionHealthChecker>>,
164 pub is_running:Arc<AsyncMutex<bool>>,
165}
166
167impl ConnectionPool {
168 pub fn new(config:PoolConfig) -> Self {
170 let max_connections = config.max_connections;
171 let min_connections = config.min_connections;
172
173 let pool = Self {
174 config:config.clone(),
175 connections:Arc::new(AsyncMutex::new(HashMap::new())),
176 semaphore:Arc::new(Semaphore::new(max_connections)),
177 wait_queue:Arc::new(AsyncMutex::new(Vec::new())),
178 stats:Arc::new(RwLock::new(PoolStats {
179 total_connections:0,
180 active_connections:0,
181 idle_connections:0,
182 healthy_connections:0,
183 max_connections,
184 min_connections,
185 wait_queue_size:0,
186 average_wait_time_ms:0.0,
187 total_operations:0,
188 successful_operations:0,
189 error_rate:0.0,
190 })),
191 health_checker:Arc::new(AsyncMutex::new(ConnectionHealthChecker::new())),
192 is_running:Arc::new(AsyncMutex::new(false)),
193 };
194
195 dev_log!("ipc", "[ConnectionPool] Created pool with max {} connections", max_connections);
196 pool
197 }
198
199 pub async fn start(&self) -> Result<(), String> {
201 {
202 let mut running = self.is_running.lock().await;
203 if *running {
204 return Ok(());
206 }
207 *running = true;
208 }
209
210 self.start_health_monitoring().await;
212
213 self.start_connection_cleanup().await;
215
216 self.initialize_min_connections().await;
218
219 dev_log!("ipc", "[ConnectionPool] Started connection pool");
220 Ok(())
221 }
222
223 pub async fn stop(&self) -> Result<(), String> {
225 {
226 let mut running = self.is_running.lock().await;
227 if !*running {
228 return Ok(());
230 }
231 *running = false;
232 }
233
234 {
236 let mut connections = self.connections.lock().await;
237 connections.clear();
238 }
239
240 {
242 let mut wait_queue = self.wait_queue.lock().await;
243 for notifier in wait_queue.drain(..) {
244 notifier.notify_one();
245 }
246 }
247
248 dev_log!("ipc", "[ConnectionPool] Stopped connection pool");
249 Ok(())
250 }
251
252 pub async fn get_connection(&self) -> Result<ConnectionHandle, String> {
254 let start_time = Instant::now();
255
256 let _permit = timeout(
258 Duration::from_millis(self.config.connection_timeout_ms),
259 self.semaphore.acquire(),
260 )
261 .await
262 .map_err(|_| "Connection timeout".to_string())?
263 .map_err(|e| format!("Failed to acquire connection: {}", e))?;
264
265 let wait_time = start_time.elapsed().as_millis() as f64;
266
267 {
269 let mut stats = self.stats.write().await;
270 stats.average_wait_time_ms = (stats.average_wait_time_ms * stats.total_operations as f64 + wait_time)
271 / (stats.total_operations as f64 + 1.0);
272 }
273
274 let connection = self.find_or_create_connection().await?;
276
277 {
279 let mut stats = self.stats.write().await;
280 stats.active_connections += 1;
281 stats.total_operations += 1;
282 }
283
284 dev_log!("ipc", "[ConnectionPool] Connection acquired: {}", connection.id);
285 Ok(connection)
286 }
287
288 pub async fn release_connection(&self, mut handle:ConnectionHandle) {
290 let connection_id = handle.id.clone();
291
292 handle.last_used = Instant::now();
293
294 {
296 let mut connections = self.connections.lock().await;
297 connections.insert(handle.id.clone(), handle.clone());
298 }
299
300 {
302 let mut stats = self.stats.write().await;
303 stats.active_connections = stats.active_connections.saturating_sub(1);
304 stats.idle_connections += 1;
305 }
306
307 drop(handle);
309
310 dev_log!("ipc", "[ConnectionPool] Connection released: {}", connection_id);
311 }
312
313 async fn find_or_create_connection(&self) -> Result<ConnectionHandle, String> {
315 let mut connections = self.connections.lock().await;
316
317 for (_id, handle) in connections.iter_mut() {
319 if handle.is_healthy() && handle.idle_time().as_millis() < self.config.idle_timeout_ms as u128 {
320 handle.last_used = Instant::now();
321 return Ok(handle.clone());
322 }
323 }
324
325 let new_handle = ConnectionHandle::new();
327 connections.insert(new_handle.id.clone(), new_handle.clone());
328
329 {
331 let mut stats = self.stats.write().await;
332 stats.total_connections += 1;
333 stats.healthy_connections += 1;
334 }
335
336 Ok(new_handle)
337 }
338
339 async fn start_health_monitoring(&self) {
341 let pool = Arc::new(self.clone());
342
343 tokio::spawn(async move {
344 let mut interval = interval(Duration::from_millis(pool.config.health_check_interval_ms));
345
346 while *pool.is_running.lock().await {
347 interval.tick().await;
348
349 if let Err(e) = pool.check_connection_health().await {
350 dev_log!("ipc", "error: [ConnectionPool] Health check failed: {}", e);
351 }
352 }
353 });
354 }
355
356 async fn start_connection_cleanup(&self) {
358 let pool = Arc::new(self.clone());
359
360 tokio::spawn(async move {
361 let mut interval = interval(Duration::from_secs(60));
363
364 while *pool.is_running.lock().await {
365 interval.tick().await;
366
367 let cleaned_count = pool.cleanup_stale_connections().await;
368 if cleaned_count > 0 {
369 dev_log!("ipc", "[ConnectionPool] Cleaned {} stale connections", cleaned_count);
370 }
371 }
372 });
373 }
374
375 async fn initialize_min_connections(&self) {
377 let current_count = self.connections.lock().await.len();
378
379 if current_count < self.config.min_connections {
380 let needed = self.config.min_connections - current_count;
381
382 for _ in 0..needed {
383 let handle = ConnectionHandle::new();
384 let mut connections = self.connections.lock().await;
385 connections.insert(handle.id.clone(), handle);
386 }
387
388 dev_log!("ipc", "[ConnectionPool] Initialized {} minimum connections", needed);
389 }
390 }
391
392 async fn check_connection_health(&self) -> Result<(), String> {
394 let mut connections = self.connections.lock().await;
395 let mut _health_checker = self.health_checker.lock().await;
396
397 let mut healthy_count = 0;
398
399 for (_id, handle) in connections.iter_mut() {
400 let is_healthy = _health_checker.check_connection_health(handle).await;
401 handle.update_health(is_healthy);
402
403 if handle.is_healthy() {
404 healthy_count += 1;
405 }
406 }
407
408 {
410 let mut stats = self.stats.write().await;
411 stats.healthy_connections = healthy_count;
412 stats.idle_connections = connections.len().saturating_sub(stats.active_connections);
413
414 if stats.total_operations > 0 {
415 stats.error_rate = 1.0 - (stats.successful_operations as f64 / stats.total_operations as f64);
416 }
417 }
418
419 Ok(())
420 }
421
422 async fn cleanup_stale_connections(&self) -> usize {
424 let mut connections = self.connections.lock().await;
425 let _now = Instant::now();
426
427 let stale_ids:Vec<String> = connections
428 .iter()
429 .filter(|(_, handle)| {
430 handle.age().as_millis() > self.config.max_lifetime_ms as u128
431 || handle.idle_time().as_millis() > self.config.idle_timeout_ms as u128
432 || !handle.is_healthy()
433 })
434 .map(|(id, _)| id.clone())
435 .collect();
436
437 for id in &stale_ids {
438 connections.remove(id);
439 }
440
441 {
443 let mut stats = self.stats.write().await;
444 stats.total_connections = connections.len();
445 stats.healthy_connections = connections.values().filter(|h| h.is_healthy()).count();
446 }
447
448 stale_ids.len()
449 }
450
451 pub async fn get_stats(&self) -> PoolStats { self.stats.read().await.clone() }
453
454 pub async fn get_active_count(&self) -> usize { self.stats.read().await.active_connections }
456
457 pub async fn get_healthy_count(&self) -> usize { self.stats.read().await.healthy_connections }
459
460 pub async fn is_running(&self) -> bool { *self.is_running.lock().await }
462}
463
464impl Clone for ConnectionPool {
465 fn clone(&self) -> Self {
466 Self {
467 config:self.config.clone(),
468 connections:self.connections.clone(),
469 semaphore:self.semaphore.clone(),
470 wait_queue:self.wait_queue.clone(),
471 stats:self.stats.clone(),
472 health_checker:self.health_checker.clone(),
473 is_running:self.is_running.clone(),
474 }
475 }
476}
477
/// Prober used by the pool's periodic health check.
struct ConnectionHealthChecker {
    /// Upper bound on how long a probe may take and still count as healthy.
    ping_timeout:Duration,
}
482
483impl ConnectionHealthChecker {
484 fn new() -> Self { Self { ping_timeout:Duration::from_secs(5) } }
485
486 async fn check_connection_health(&self, handle:&mut ConnectionHandle) -> bool {
488 let start_time = Instant::now();
491
492 tokio::time::sleep(Duration::from_millis(10)).await;
494
495 let response_time = start_time.elapsed();
496
497 response_time < self.ping_timeout
499 }
500}
501
502impl ConnectionPool {
504 pub fn default_pool() -> Self { Self::new(PoolConfig::default()) }
506
507 pub fn high_performance_pool() -> Self {
509 Self::new(PoolConfig {
510 max_connections:50,
511 min_connections:10,
512 connection_timeout_ms:10000,
513 max_lifetime_ms:180000,
515 idle_timeout_ms:30000,
516 health_check_interval_ms:15000,
518 })
519 }
520
521 pub fn conservative_pool() -> Self {
523 Self::new(PoolConfig {
524 max_connections:5,
525 min_connections:1,
526 connection_timeout_ms:60000,
527 max_lifetime_ms:600000,
529 idle_timeout_ms:120000,
530 health_check_interval_ms:60000,
532 })
533 }
534
535 pub fn calculate_optimal_pool_size() -> usize {
537 let num_cpus = num_cpus::get();
538 (num_cpus * 2).max(4).min(50)
541 }
542}
543
#[cfg(test)]
mod tests {
    use tokio::time::sleep;

    use super::*;

    /// The default preset carries the default connection ceiling.
    #[tokio::test]
    async fn test_connection_pool_creation() {
        let default_pool = ConnectionPool::default_pool();

        assert_eq!(default_pool.config.max_connections, 10);
    }

    /// A handle stays healthy across one success followed by one failure,
    /// and its success rate tracks both outcomes.
    #[tokio::test]
    async fn test_connection_handle_health() {
        let mut conn = ConnectionHandle::new();
        assert!(conn.is_healthy());

        // One success keeps the rate at 100 %.
        conn.update_health(true);
        assert!(conn.is_healthy());
        assert_eq!(conn.success_rate(), 1.0);

        // A single failure halves the rate but does not trip the threshold.
        conn.update_health(false);
        assert!(conn.is_healthy());
        assert_eq!(conn.success_rate(), 0.5);
    }

    /// start → acquire → release → stop toggles the running flag as expected.
    #[tokio::test]
    async fn test_pool_lifecycle() {
        let pool = ConnectionPool::default_pool();

        pool.start().await.unwrap();
        assert!(pool.is_running().await);

        let conn = pool.get_connection().await.unwrap();
        assert!(conn.is_healthy());
        pool.release_connection(conn).await;

        pool.stop().await.unwrap();
        assert!(!pool.is_running().await);
    }

    /// Active and idle counters follow acquisitions and releases.
    #[tokio::test]
    async fn test_pool_statistics() {
        let pool = ConnectionPool::default_pool();
        pool.start().await.unwrap();

        let mut checked_out = Vec::new();
        for _ in 0..3 {
            let conn = pool.get_connection().await.unwrap();
            checked_out.push(conn);
        }

        let while_held = pool.get_stats().await;
        assert_eq!(while_held.active_connections, 3);

        for conn in checked_out {
            pool.release_connection(conn).await;
        }

        let after_release = pool.get_stats().await;
        assert_eq!(after_release.active_connections, 0);
        assert_eq!(after_release.idle_connections, 3);

        pool.stop().await.unwrap();
    }

    /// Connections that outlive very short lifetime/idle limits are removed
    /// by an explicit cleanup pass.
    #[tokio::test]
    async fn test_connection_cleanup() {
        let short_lived = PoolConfig {
            max_lifetime_ms:100,
            idle_timeout_ms:50,
            ..Default::default()
        };
        let pool = ConnectionPool::new(short_lived);

        pool.start().await.unwrap();

        let conn = pool.get_connection().await.unwrap();
        pool.release_connection(conn).await;

        // Wait past both limits.
        sleep(Duration::from_millis(200)).await;

        let removed = pool.cleanup_stale_connections().await;
        assert!(removed > 0);

        pool.stop().await.unwrap();
    }

    /// The suggested size always falls inside the supported range.
    #[test]
    fn test_optimal_pool_size_calculation() {
        let suggested = ConnectionPool::calculate_optimal_pool_size();

        assert!((4..=50).contains(&suggested));
    }
}