// AirLibrary/Resilience/mod.rs

1//! # Resilience Patterns Module
2//!
3//! Provides robust resilience patterns for external service calls:
4//! - Exponential backoff retry logic with jitter
5//! - Circuit breaker pattern for fault isolation
6//! - Bulkhead pattern for resource isolation
7//! - Timeout management with cascading deadlines
8//!
9//! ## Responsibilities
10//!
11//! ### Retry Patterns
12//! - Exponential backoff with jitter for distributed systems
13//! - Adaptive retry policies based on error classification
14//! - Retry budget management for service rate limiting
15//! - Panic recovery for background retry tasks
16//!
17//! ### Circuit Breaker
18//! - Automatic fault detection and isolation
19//! - State consistency validation across transitions
20//! - Event publishing for telemetry integration
21//! - Half-open state monitoring for recovery testing
22//!
23//! ### Bulkhead Pattern
24//! - Concurrent request limiting for resource protection
25//! - Queue management with overflow protection
26//! - Load monitoring and metrics collection
27//! - Timeout validation for all operations
28//!
29//! ### Timeout Management
30//! - Cascading deadline propagation
31//! - Global deadline coordination
32//! - Operation timeout enforcement
33//! - Panic-safe timeout cancellation
34//!
35//! ## Integration with Mountain
36//!
37//! Resilience patterns directly support Mountain's stability by:
38//! - preventing cascading failures through circuit breaker isolation
39//! - managing load through bulkhead resource limits
40//! - providing event publishing for Mountain's telemetry dashboard
41//! - enabling adaptive retry behavior for improved service availability
42//!
43//! ## VSCode Stability References
44//!
45//! Similar patterns used in VSCode for:
46//! - External service resilience (telemetry, updates, extensions)
47//! - Editor process isolation and recovery
48//! - Background task fault tolerance
49//!
50//! Reference:
51//! vs/base/common/errors
52//!
53//! # FUTURE Enhancements
54//!
55//! - [DISTRIBUTED TRACING] Integrate with Tracing module for retry/circuit span
56//! correlation
57//! - [CUSTOM METRICS] Add detailed bulkhead load metrics to Metrics module
58//! - [EVENT PUBLISHING] Extend circuit breaker events with OpenTelemetry
59//! support
60//! - [ADAPTIVE POLICIES] Enhance retry policies with machine learning-based
61//! error prediction
//! - [METRICS INTEGRATION] Export resilience metrics to Mountain's telemetry UI
//!
//! ## Sensitive Data Handling
64//!
65//! This module does not process sensitive data directly but should:
66//! - Redact error messages before logging/event publishing
67//! - Avoid including request payloads in resilience events
68//! - Sanitize service names before publishing to telemetry
69
70use std::{
71	collections::HashMap,
72	sync::Arc,
73	time::{Duration, Instant},
74};
75
76use tokio::sync::{Mutex, RwLock, broadcast};
77use serde::{Deserialize, Serialize};
78
79use crate::dev_log;
80
/// Error classification for adaptive retry policies
///
/// Consumed by `RetryManager::CalculateAdaptiveRetryDelay` to select a
/// backoff strategy per error kind, and produced by
/// `RetryManager::ClassifyError` via substring matching.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ErrorClass {
	/// Transient errors (network timeouts, temporary failures)
	Transient,

	/// Non-retryable errors (authentication, invalid requests)
	NonRetryable,

	/// Rate limit errors (429 Too Many Requests)
	RateLimited,

	/// Server errors (500-599)
	ServerError,

	/// Unknown error classification (no pattern matched)
	Unknown,
}
99
/// Retry policy configuration
///
/// Field names are serialized verbatim by serde; renaming them would
/// break existing serialized configs. Sensible defaults are provided by
/// the `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryPolicy {
	/// Maximum number of retry attempts
	pub MaxRetries:u32,

	/// Initial retry interval (in milliseconds)
	pub InitialIntervalMs:u64,

	/// Maximum retry interval (in milliseconds); caps the exponential backoff
	pub MaxIntervalMs:u64,

	/// Exponential backoff multiplier (must be > 1.0, see `ValidatePolicy`)
	pub BackoffMultiplier:f64,

	/// Jitter percentage (0-1) applied on top of the base delay
	pub JitterFactor:f64,

	/// Retry budget per service (max retries per minute)
	pub BudgetPerMinute:u32,

	/// Adaptive error classification for intelligent retry behavior;
	/// keys are substring patterns matched against lowercased error text
	pub ErrorClassification:HashMap<String, ErrorClass>,
}
124
125impl Default for RetryPolicy {
126	fn default() -> Self {
127		let mut ErrorClassification = HashMap::new();
128
129		// Default error classifications
130		ErrorClassification.insert("timeout".to_string(), ErrorClass::Transient);
131
132		ErrorClassification.insert("connection_refused".to_string(), ErrorClass::Transient);
133
134		ErrorClassification.insert("connection_reset".to_string(), ErrorClass::Transient);
135
136		ErrorClassification.insert("rate_limit_exceeded".to_string(), ErrorClass::RateLimited);
137
138		ErrorClassification.insert("authentication_failed".to_string(), ErrorClass::NonRetryable);
139
140		ErrorClassification.insert("unauthorized".to_string(), ErrorClass::NonRetryable);
141
142		ErrorClassification.insert("not_found".to_string(), ErrorClass::NonRetryable);
143
144		ErrorClassification.insert("server_error".to_string(), ErrorClass::ServerError);
145
146		ErrorClassification.insert("internal_server_error".to_string(), ErrorClass::ServerError);
147
148		ErrorClassification.insert("service_unavailable".to_string(), ErrorClass::ServerError);
149
150		ErrorClassification.insert("gateway_timeout".to_string(), ErrorClass::Transient);
151
152		Self {
153			MaxRetries:3,
154
155			InitialIntervalMs:1000,
156
157			MaxIntervalMs:32000,
158
159			BackoffMultiplier:2.0,
160
161			JitterFactor:0.1,
162
163			BudgetPerMinute:100,
164
165			ErrorClassification,
166		}
167	}
168}
169
/// Retry budget tracker
///
/// Sliding-window budget: records the timestamp of every permitted
/// attempt and allows a new one only while fewer than `MaxPerMinute`
/// attempts occurred in the trailing 60 seconds.
#[derive(Debug, Clone)]
struct RetryBudget {
	/// Timestamps of attempts made within the tracking window.
	Attempts:Vec<Instant>,

	/// Maximum number of attempts allowed per minute.
	MaxPerMinute:u32,
}

impl RetryBudget {
	/// Create an empty budget with the given per-minute cap.
	fn new(MaxPerMinute:u32) -> Self { Self { Attempts:Vec::new(), MaxPerMinute } }

	/// Check whether another retry fits in the budget, recording it if so.
	fn can_retry(&mut self) -> bool {
		let Now = Instant::now();

		// `Instant` subtraction panics on underflow: within the first
		// minute of platform uptime `Now - 60s` may not be representable,
		// so use `checked_sub` and keep all attempts in that edge case.
		if let Some(OneMinuteAgo) = Now.checked_sub(Duration::from_secs(60)) {
			// Remove attempts older than 1 minute
			self.Attempts.retain(|&attempt| attempt > OneMinuteAgo);
		}

		if self.Attempts.len() < self.MaxPerMinute as usize {
			self.Attempts.push(Now);

			true
		} else {
			false
		}
	}
}
198
/// Retry manager with budget tracking and adaptive policies
pub struct RetryManager {
	/// Policy governing backoff, jitter, budgets and error classification.
	Policy:RetryPolicy,

	/// Per-service sliding-window retry budgets, keyed by service name.
	Budgets:Arc<Mutex<HashMap<String, RetryBudget>>>,

	/// Broadcast channel used to publish `RetryEvent`s to subscribers.
	EventTx:Arc<broadcast::Sender<RetryEvent>>,
}
207
/// Events published by retry operations for metrics and telemetry integration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryEvent {
	/// Service the retried call targeted.
	pub Service:String,

	/// Attempt number for this retry (numbering convention is set by the
	/// publisher — confirm against call sites).
	pub Attempt:u32,

	/// Classification of the error that triggered the retry.
	pub ErrorClass:ErrorClass,

	/// Delay applied before this attempt, in milliseconds.
	pub DelayMs:u64,

	/// Whether the attempt ultimately succeeded.
	pub Success:bool,

	/// Optional error text; per the module-level guidance this should be
	/// redacted before publishing to telemetry.
	pub ErrorMessage:Option<String>,
}
223
224impl RetryManager {
225	/// Create a new retry manager
226	pub fn new(policy:RetryPolicy) -> Self {
227		let (EventTx, _) = broadcast::channel(1000);
228
229		Self {
230			Policy:policy,
231
232			Budgets:Arc::new(Mutex::new(HashMap::new())),
233
234			EventTx:Arc::new(EventTx),
235		}
236	}
237
238	/// Get the retry event transmitter for subscription
239	pub fn GetEventTransmitter(&self) -> broadcast::Sender<RetryEvent> { (*self.EventTx).clone() }
240
241	/// Calculate next retry delay with exponential backoff and jitter
242	pub fn CalculateRetryDelay(&self, Attempt:u32) -> Duration {
243		if Attempt == 0 {
244			return Duration::from_millis(0);
245		}
246
247		let BaseDelay = (self.Policy.InitialIntervalMs as f64 * self.Policy.BackoffMultiplier.powi(Attempt as i32 - 1))
248			.min(self.Policy.MaxIntervalMs as f64) as u64;
249
250		// Add jitter
251		let Jitter = (BaseDelay as f64 * self.Policy.JitterFactor) as u64;
252
253		let RandomJitter = (rand::random::<f64>() * Jitter as f64) as u64;
254
255		let FinalDelay = BaseDelay + RandomJitter;
256
257		Duration::from_millis(FinalDelay)
258	}
259
260	/// Calculate adaptive retry delay based on error classification
261	pub fn CalculateAdaptiveRetryDelay(&self, ErrorType:&str, attempt:u32) -> Duration {
262		let ErrorClass = self
263			.Policy
264			.ErrorClassification
265			.get(ErrorType)
266			.copied()
267			.unwrap_or(ErrorClass::Unknown);
268
269		match ErrorClass {
270			ErrorClass::RateLimited => {
271				// Longer delays with linear backoff for rate limits
272				// 5s, 10s, 15s...
273				let delay = (attempt + 1) * 5000;
274
275				Duration::from_millis(delay as u64)
276			},
277
278			ErrorClass::ServerError => {
279				// Aggressive backoff for server errors
280				let BaseDelay = self.Policy.InitialIntervalMs * 2_u64.pow(attempt);
281
282				Duration::from_millis(BaseDelay.min(self.Policy.MaxIntervalMs))
283			},
284
285			ErrorClass::Transient => {
286				// Standard exponential backoff
287				self.CalculateRetryDelay(attempt)
288			},
289
290			ErrorClass::NonRetryable | ErrorClass::Unknown => {
291				// Minimal delay for non-retryable errors (should fail quickly)
292				Duration::from_millis(100)
293			},
294		}
295	}
296
297	/// Classify an error for adaptive retry behavior
298	pub fn ClassifyError(&self, ErrorMessage:&str) -> ErrorClass {
299		let ErrorLower = ErrorMessage.to_lowercase();
300
301		for (pattern, class) in &self.Policy.ErrorClassification {
302			if ErrorLower.contains(pattern) {
303				return *class;
304			}
305		}
306
307		ErrorClass::Unknown
308	}
309
310	/// Check if retry is possible within budget
311	/// Validates budget state before allowing retry
312	pub async fn CanRetry(&self, service:&str) -> bool {
313		let mut budgets = self.Budgets.lock().await;
314
315		let budget = budgets
316			.entry(service.to_string())
317			.or_insert_with(|| RetryBudget::new(self.Policy.BudgetPerMinute));
318
319		budget.can_retry()
320	}
321
322	/// Publish a retry event for telemetry integration
323	pub fn PublishRetryEvent(&self, event:RetryEvent) { let _ = self.EventTx.send(event); }
324
325	/// Validate retry policy configuration
326	pub fn ValidatePolicy(&self) -> Result<(), String> {
327		if self.Policy.MaxRetries == 0 {
328			return Err("MaxRetries must be greater than 0".to_string());
329		}
330
331		if self.Policy.InitialIntervalMs == 0 {
332			return Err("InitialIntervalMs must be greater than 0".to_string());
333		}
334
335		if self.Policy.InitialIntervalMs > self.Policy.MaxIntervalMs {
336			return Err("InitialIntervalMs cannot be greater than MaxIntervalMs".to_string());
337		}
338
339		if self.Policy.BackoffMultiplier <= 1.0 {
340			return Err("BackoffMultiplier must be greater than 1.0".to_string());
341		}
342
343		if self.Policy.JitterFactor < 0.0 || self.Policy.JitterFactor > 1.0 {
344			return Err("JitterFactor must be between 0 and 1".to_string());
345		}
346
347		if self.Policy.BudgetPerMinute == 0 {
348			return Err("BudgetPerMinute must be greater than 0".to_string());
349		}
350
351		Ok(())
352	}
353}
354
/// Circuit breaker states
///
/// Legal transitions (enforced in `CircuitBreaker::TransitionState`):
/// Closed -> Open, HalfOpen -> Open, Open -> HalfOpen, HalfOpen -> Closed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum CircuitState {
	/// Circuit is closed (normal operation)
	Closed,

	/// Circuit is open (failing fast)
	Open,

	/// Circuit is half-open (testing recovery)
	HalfOpen,
}
367
/// Circuit breaker configuration
///
/// All thresholds must be non-zero (see `CircuitBreaker::ValidateConfig`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct CircuitBreakerConfig {
	/// Failure threshold before tripping (Closed -> Open)
	pub FailureThreshold:u32,

	/// Success threshold before closing (HalfOpen -> Closed)
	pub SuccessThreshold:u32,

	/// Timeout before attempting recovery (in seconds, Open -> HalfOpen)
	pub TimeoutSecs:u64,
}
380
381impl Default for CircuitBreakerConfig {
382	fn default() -> Self { Self { FailureThreshold:5, SuccessThreshold:2, TimeoutSecs:60 } }
383}
384
/// Circuit breaker events for metrics and telemetry integration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CircuitEvent {
	/// Name of the breaker that transitioned.
	pub name:String,

	/// State before the transition.
	pub FromState:CircuitState,

	/// State after the transition.
	pub ToState:CircuitState,

	/// Timestamp produced by `crate::Utility::CurrentTimestamp` —
	/// presumably epoch-based; confirm the unit against that helper.
	pub timestamp:u64,

	/// Human-readable reason for the transition.
	pub reason:String,
}
398
/// Circuit breaker for fault isolation with state consistency validation and
/// event publishing
///
/// All mutable state is held behind `Arc`s, so `Clone` produces a handle
/// sharing the same underlying breaker.
pub struct CircuitBreaker {
	/// Service name used in events and log lines.
	Name:String,

	/// Current position in the breaker state machine.
	State:Arc<RwLock<CircuitState>>,

	/// Thresholds and recovery timeout.
	Config:CircuitBreakerConfig,

	/// Failures accumulated while in the Closed state.
	FailureCount:Arc<RwLock<u32>>,

	/// Successes accumulated while in the HalfOpen state.
	SuccessCount:Arc<RwLock<u32>>,

	/// Instant of the most recent recorded failure.
	LastFailureTime:Arc<RwLock<Option<Instant>>>,

	/// Broadcast channel publishing state-transition events.
	EventTx:Arc<broadcast::Sender<CircuitEvent>>,

	/// Total number of state transitions performed.
	StateTransitionCounter:Arc<RwLock<u32>>,
}
418
419impl CircuitBreaker {
420	/// Create a new circuit breaker with event publishing
421	pub fn new(name:String, Config:CircuitBreakerConfig) -> Self {
422		let (EventTx, _) = broadcast::channel(1000);
423
424		Self {
425			Name:name.clone(),
426
427			State:Arc::new(RwLock::new(CircuitState::Closed)),
428
429			Config,
430
431			FailureCount:Arc::new(RwLock::new(0)),
432
433			SuccessCount:Arc::new(RwLock::new(0)),
434
435			LastFailureTime:Arc::new(RwLock::new(None)),
436
437			EventTx:Arc::new(EventTx),
438
439			StateTransitionCounter:Arc::new(RwLock::new(0)),
440		}
441	}
442
443	/// Get the circuit breaker event transmitter for subscription
444	pub fn GetEventTransmitter(&self) -> broadcast::Sender<CircuitEvent> { (*self.EventTx).clone() }
445
446	/// Get current state with panic recovery
447	pub async fn GetState(&self) -> CircuitState { *self.State.read().await }
448
449	/// Validate state consistency across all counters
450	pub async fn ValidateState(&self) -> Result<(), String> {
451		let state = *self.State.read().await;
452
453		let failures = *self.FailureCount.read().await;
454
455		let successes = *self.SuccessCount.read().await;
456
457		match state {
458			CircuitState::Closed => {
459				if successes != 0 {
460					return Err(format!("Inconsistent state: Closed but has {} successes", successes));
461				}
462
463				if failures >= self.Config.FailureThreshold {
464					dev_log!(
465						"resilience",
466						"warn: [CircuitBreaker] State inconsistency: Closed but failure count ({}) >= threshold ({})",
467						failures,
468						self.Config.FailureThreshold
469					);
470				}
471			},
472
473			CircuitState::Open => {
474				if failures < self.Config.FailureThreshold {
475					dev_log!(
476						"resilience",
477						"warn: [CircuitBreaker] State inconsistency: Open but failure count ({}) < threshold ({})",
478						failures,
479						self.Config.FailureThreshold
480					);
481				}
482			},
483
484			CircuitState::HalfOpen => {
485				if successes >= self.Config.SuccessThreshold {
486					return Err(format!(
487						"Inconsistent state: HalfOpen but has {} successes (should be Closed)",
488						successes
489					));
490				}
491			},
492		}
493
494		Ok(())
495	}
496
497	/// Transition state with validation and event publishing
498	async fn TransitionState(&self, NewState:CircuitState, reason:&str) -> Result<(), String> {
499		let CurrentState = self.GetState().await;
500
501		if CurrentState == NewState {
502			// No transition needed
503			return Ok(());
504		}
505
506		// Validate the proposed transition
507		match (CurrentState, NewState) {
508			(CircuitState::Closed, CircuitState::Open) | (CircuitState::HalfOpen, CircuitState::Open) => {
509				// Valid transitions
510			},
511
512			(CircuitState::Open, CircuitState::HalfOpen) => {
513				// Valid transition through recovery
514			},
515
516			(CircuitState::HalfOpen, CircuitState::Closed) => {
517				// Valid recovery transition
518			},
519
520			_ => {
521				return Err(format!(
522					"Invalid state transition from {:?} to {:?} for {}",
523					CurrentState, NewState, self.Name
524				));
525			},
526		}
527
528		// Publish state transition event
529		let event = CircuitEvent {
530			name:self.Name.clone(),
531
532			FromState:CurrentState,
533
534			ToState:NewState,
535
536			timestamp:crate::Utility::CurrentTimestamp(),
537
538			reason:reason.to_string(),
539		};
540
541		let _ = self.EventTx.send(event);
542
543		// Transition state
544		*self.State.write().await = NewState;
545
546		// Increment transition counter
547		*self.StateTransitionCounter.write().await += 1;
548
549		dev_log!(
550			"resilience",
551			"[CircuitBreaker] State transition for {}: {:?} -> {:?} (reason: {})",
552			self.Name,
553			CurrentState,
554			NewState,
555			reason
556		);
557
558		// Validate new state consistency
559		self.ValidateState().await.map_err(|e| {
560			dev_log!(
561				"resilience",
562				"error: [CircuitBreaker] State validation failed after transition: {}",
563				e
564			);
565			e
566		})?;
567
568		Ok(())
569	}
570
571	/// Record a successful call with panic recovery
572	pub async fn RecordSuccess(&self) {
573		let state = self.GetState().await;
574
575		match state {
576			CircuitState::Closed => {
577				// Reset counters
578				*self.FailureCount.write().await = 0;
579			},
580
581			CircuitState::HalfOpen => {
582				// Increment success count
583				let mut SuccessCount = self.SuccessCount.write().await;
584
585				*SuccessCount += 1;
586
587				if *SuccessCount >= self.Config.SuccessThreshold {
588					// Close the circuit
589					let _ = self.TransitionState(CircuitState::Closed, "Success threshold reached").await;
590
591					*self.FailureCount.write().await = 0;
592
593					*self.SuccessCount.write().await = 0;
594				}
595			},
596
597			_ => {},
598		}
599	}
600
601	/// Record a failed call with panic recovery
602	pub async fn RecordFailure(&self) {
603		let State = self.GetState().await;
604
605		*self.LastFailureTime.write().await = Some(Instant::now());
606
607		match State {
608			CircuitState::Closed => {
609				// Increment failure count
610				let mut FailureCount = self.FailureCount.write().await;
611
612				*FailureCount += 1;
613
614				if *FailureCount >= self.Config.FailureThreshold {
615					// Open the circuit
616					let _ = self.TransitionState(CircuitState::Open, "Failure threshold reached").await;
617
618					*self.SuccessCount.write().await = 0;
619				}
620			},
621
622			CircuitState::HalfOpen => {
623				// Return to open state
624				let _ = self.TransitionState(CircuitState::Open, "Failure in half-open state").await;
625
626				*self.SuccessCount.write().await = 0;
627			},
628
629			_ => {},
630		}
631	}
632
633	/// Attempt to transition to half-open if timeout has elapsed with panic
634	/// recovery
635	pub async fn AttemptRecovery(&self) -> bool {
636		let state = self.GetState().await;
637
638		if state != CircuitState::Open {
639			return state == CircuitState::HalfOpen;
640		}
641
642		if let Some(last_failure) = *self.LastFailureTime.read().await {
643			if last_failure.elapsed() >= Duration::from_secs(self.Config.TimeoutSecs) {
644				let _ = self.TransitionState(CircuitState::HalfOpen, "Recovery timeout elapsed").await;
645
646				*self.SuccessCount.write().await = 0;
647
648				return true;
649			}
650		}
651
652		false
653	}
654
655	/// Get circuit breaker statistics for metrics
656	pub async fn GetStatistics(&self) -> CircuitStatistics {
657		CircuitStatistics {
658			Name:self.Name.clone(),
659
660			State:self.GetState().await,
661
662			Failures:*self.FailureCount.read().await,
663
664			Successes:*self.SuccessCount.read().await,
665
666			StateTransitions:*self.StateTransitionCounter.read().await,
667
668			LastFailureTime:*self.LastFailureTime.read().await,
669		}
670	}
671
672	/// Validate circuit breaker configuration
673	pub fn ValidateConfig(&config:&CircuitBreakerConfig) -> Result<(), String> {
674		if config.FailureThreshold == 0 {
675			return Err("FailureThreshold must be greater than 0".to_string());
676		}
677
678		if config.SuccessThreshold == 0 {
679			return Err("SuccessThreshold must be greater than 0".to_string());
680		}
681
682		if config.TimeoutSecs == 0 {
683			return Err("TimeoutSecs must be greater than 0".to_string());
684		}
685
686		Ok(())
687	}
688}
689
/// Circuit breaker statistics for metrics export
#[derive(Debug, Clone, Serialize)]
pub struct CircuitStatistics {
	/// Name of the breaker this snapshot describes.
	pub Name:String,

	/// State at the time the snapshot was taken.
	pub State:CircuitState,

	/// Failure counter value at snapshot time.
	pub Failures:u32,

	/// Success counter value at snapshot time.
	pub Successes:u32,

	/// Total state transitions performed so far.
	pub StateTransitions:u32,

	/// `Instant` cannot be serialized, so this field is skipped on
	/// serialization and restored as `None` by the manual `Deserialize`.
	#[serde(skip_serializing)]
	pub LastFailureTime:Option<Instant>,
}
706
707impl<'de> Deserialize<'de> for CircuitStatistics {
708	fn deserialize<D>(Deserializer:D) -> std::result::Result<Self, D::Error>
709	where
710		D: serde::Deserializer<'de>, {
711		use serde::de::{self, Visitor};
712
713		struct CircuitStatisticsVisitor;
714
715		impl<'de> Visitor<'de> for CircuitStatisticsVisitor {
716			type Value = CircuitStatistics;
717
718			fn expecting(&self, formatter:&mut std::fmt::Formatter) -> std::fmt::Result {
719				formatter.write_str("struct CircuitStatistics")
720			}
721
722			fn visit_map<A>(self, mut map:A) -> std::result::Result<CircuitStatistics, A::Error>
723			where
724				A: de::MapAccess<'de>, {
725				let mut Name = None;
726
727				let mut State = None;
728
729				let mut Failures = None;
730
731				let mut Successes = None;
732
733				let mut StateTransitions = None;
734
735				while let Some(key) = map.next_key::<String>()? {
736					match key.as_str() {
737						"name" => Name = Some(map.next_value()?),
738
739						"state" => State = Some(map.next_value()?),
740
741						"failures" => Failures = Some(map.next_value()?),
742
743						"successes" => Successes = Some(map.next_value()?),
744
745						"state_transitions" => StateTransitions = Some(map.next_value()?),
746
747						_ => {
748							map.next_value::<de::IgnoredAny>()?;
749						},
750					}
751				}
752
753				Ok(CircuitStatistics {
754					Name:Name.ok_or_else(|| de::Error::missing_field("name"))?,
755
756					State:State.ok_or_else(|| de::Error::missing_field("state"))?,
757
758					Failures:Failures.ok_or_else(|| de::Error::missing_field("failures"))?,
759
760					Successes:Successes.ok_or_else(|| de::Error::missing_field("successes"))?,
761
762					StateTransitions:StateTransitions.ok_or_else(|| de::Error::missing_field("state_transitions"))?,
763
764					LastFailureTime:None,
765				})
766			}
767		}
768
769		const FIELDS:&[&str] = &["name", "state", "failures", "successes", "state_transitions"];
770
771		Deserializer.deserialize_struct("CircuitStatistics", FIELDS, CircuitStatisticsVisitor)
772	}
773}
774
775impl Clone for CircuitBreaker {
776	fn clone(&self) -> Self {
777		Self {
778			Name:self.Name.clone(),
779
780			State:self.State.clone(),
781
782			Config:self.Config.clone(),
783
784			FailureCount:self.FailureCount.clone(),
785
786			SuccessCount:self.SuccessCount.clone(),
787
788			LastFailureTime:self.LastFailureTime.clone(),
789
790			EventTx:self.EventTx.clone(),
791
792			StateTransitionCounter:self.StateTransitionCounter.clone(),
793		}
794	}
795}
796
/// Bulkhead configuration
///
/// All values must be non-zero (see `BulkheadExecutor::ValidateConfig`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BulkheadConfig {
	/// Maximum concurrent requests (semaphore permit count)
	pub max_concurrent:usize,

	/// Maximum queue size (requests waiting for a permit)
	pub max_queue:usize,

	/// Request timeout (in seconds), applied both to permit acquisition
	/// and to the wrapped operation itself
	pub timeout_secs:u64,
}
809
810impl Default for BulkheadConfig {
811	fn default() -> Self { Self { max_concurrent:10, max_queue:100, timeout_secs:30 } }
812}
813
/// Bulkhead statistics for metrics export
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BulkheadStatistics {
	/// Name of the bulkhead this snapshot describes.
	pub name:String,

	/// Requests executing at snapshot time.
	pub current_concurrent:u32,

	/// Requests queued (waiting for a permit) at snapshot time.
	pub current_queue:u32,

	/// Configured concurrency limit.
	pub max_concurrent:usize,

	/// Configured queue limit.
	pub max_queue:usize,

	/// Requests rejected because the queue was full.
	pub total_rejected:u64,

	/// Requests that completed successfully.
	pub total_completed:u64,

	/// Requests that timed out (waiting for a permit or executing).
	pub total_timed_out:u64,
}
833
/// Bulkhead semaphore for resource isolation with metrics and panic recovery
pub struct BulkheadExecutor {
	/// Bulkhead name used in log messages and statistics.
	name:String,

	/// Limits concurrent executions to `config.max_concurrent` permits.
	semaphore:Arc<tokio::sync::Semaphore>,

	/// Concurrency, queue and timeout configuration.
	config:BulkheadConfig,

	/// Number of requests currently executing.
	current_requests:Arc<RwLock<u32>>,

	/// Number of requests currently waiting for a permit.
	queue_size:Arc<RwLock<u32>>,

	/// Requests rejected because the queue was full.
	total_rejected:Arc<RwLock<u64>>,

	/// Requests that completed successfully.
	total_completed:Arc<RwLock<u64>>,

	/// Requests that timed out waiting for a permit or while executing.
	total_timed_out:Arc<RwLock<u64>>,
}
852
853impl BulkheadExecutor {
854	/// Create a new bulkhead executor with metrics tracking
855	pub fn new(name:String, config:BulkheadConfig) -> Self {
856		Self {
857			name:name.clone(),
858
859			semaphore:Arc::new(tokio::sync::Semaphore::new(config.max_concurrent)),
860
861			config,
862
863			current_requests:Arc::new(RwLock::new(0)),
864
865			queue_size:Arc::new(RwLock::new(0)),
866
867			total_rejected:Arc::new(RwLock::new(0)),
868
869			total_completed:Arc::new(RwLock::new(0)),
870
871			total_timed_out:Arc::new(RwLock::new(0)),
872		}
873	}
874
875	/// Validate bulkhead configuration
876	pub fn ValidateConfig(config:&BulkheadConfig) -> Result<(), String> {
877		if config.max_concurrent == 0 {
878			return Err("max_concurrent must be greater than 0".to_string());
879		}
880
881		if config.max_queue == 0 {
882			return Err("max_queue must be greater than 0".to_string());
883		}
884
885		if config.timeout_secs == 0 {
886			return Err("timeout_secs must be greater than 0".to_string());
887		}
888
889		Ok(())
890	}
891
892	/// Execute with bulkhead protection and panic recovery
893	pub async fn Execute<F, R>(&self, f:F) -> Result<R, String>
894	where
895		F: std::future::Future<Output = Result<R, String>>, {
896		async {
897			// Validate timeout
898			if self.config.timeout_secs == 0 {
899				return Err("Bulkhead timeout must be greater than 0".to_string());
900			}
901
902			// Check queue size
903			let queue = *self.queue_size.read().await;
904
905			if queue >= self.config.max_queue as u32 {
906				*self.total_rejected.write().await += 1;
907
908				dev_log!("resilience", "warn: [Bulkhead] Queue full for {}, rejecting request", self.name);
909				return Err("Bulkhead queue full".to_string());
910			}
911
912			// Increment queue size
913			*self.queue_size.write().await += 1;
914
915			// Acquire permit with timeout
916			let _Permit =
917				match tokio::time::timeout(Duration::from_secs(self.config.timeout_secs), self.semaphore.acquire())
918					.await
919				{
920					Ok(Ok(_)) => {
921						// Permit acquired, proceed with execution
922						// Decrement queue size
923						*self.queue_size.write().await -= 1;
924					},
925
926					Ok(Err(e)) => {
927						*self.queue_size.write().await -= 1;
928
929						return Err(format!("Bulkhead semaphore error: {}", e));
930					},
931
932					Err(_) => {
933						*self.queue_size.write().await -= 1;
934
935						*self.total_timed_out.write().await += 1;
936
937						dev_log!("resilience", "warn: [Bulkhead] Timeout waiting for permit for {}", self.name);
938						return Err("Bulkhead timeout waiting for permit".to_string());
939					},
940				};
941
942			// Decrement queue size, increment current requests
943			*self.queue_size.write().await -= 1;
944
945			*self.current_requests.write().await += 1;
946
947			// Execute with timeout (no catch_unwind to avoid interior mutability issues)
948			let execution_result = tokio::time::timeout(Duration::from_secs(self.config.timeout_secs), f).await;
949
950			let execution_result:Result<R, String> = match execution_result {
951				Ok(Ok(value)) => Ok(value),
952
953				Ok(Err(e)) => Err(e),
954
955				Err(_) => {
956					*self.total_timed_out.write().await += 1;
957
958					Err("Bulkhead execution timeout".to_string())
959				},
960			};
961
962			if execution_result.is_ok() {
963				*self.total_completed.write().await += 1;
964			}
965
966			execution_result
967		}
968		.await
969	}
970
971	/// Get current load with panic recovery
972	pub async fn GetLoad(&self) -> (u32, u32) {
973		async {
974			let current = *self.current_requests.read().await;
975
976			let queue = *self.queue_size.read().await;
977
978			(current, queue)
979		}
980		.await
981	}
982
983	/// Get bulkhead statistics for metrics
984	pub async fn GetStatistics(&self) -> BulkheadStatistics {
985		async {
986			BulkheadStatistics {
987				name:self.name.clone(),
988
989				current_concurrent:*self.current_requests.read().await,
990
991				current_queue:*self.queue_size.read().await,
992
993				max_concurrent:self.config.max_concurrent,
994
995				max_queue:self.config.max_queue,
996
997				total_rejected:*self.total_rejected.read().await,
998
999				total_completed:*self.total_completed.read().await,
1000
1001				total_timed_out:*self.total_timed_out.read().await,
1002			}
1003		}
1004		.await
1005	}
1006
1007	/// Calculate utilization percentage
1008	pub async fn GetUtilization(&self) -> f64 {
1009		let (current, _) = self.GetLoad().await;
1010
1011		if self.config.max_concurrent == 0 {
1012			return 0.0;
1013		}
1014
1015		(current as f64 / self.config.max_concurrent as f64) * 100.0
1016	}
1017}
1018
1019impl Clone for BulkheadExecutor {
1020	fn clone(&self) -> Self {
1021		Self {
1022			name:self.name.clone(),
1023
1024			semaphore:self.semaphore.clone(),
1025
1026			config:self.config.clone(),
1027
1028			current_requests:self.current_requests.clone(),
1029
1030			queue_size:self.queue_size.clone(),
1031
1032			total_rejected:self.total_rejected.clone(),
1033
1034			total_completed:self.total_completed.clone(),
1035
1036			total_timed_out:self.total_timed_out.clone(),
1037		}
1038	}
1039}
1040
/// Timeout manager for cascading deadlines with validation
#[derive(Debug, Clone)]
pub struct TimeoutManager {
	/// Absolute deadline shared across the whole call chain; `None`
	/// disables deadline checks.
	global_deadline:Option<Instant>,

	/// Per-operation timeout, capped by the remaining global deadline.
	operation_timeout:Duration,
}
1048
1049impl TimeoutManager {
1050	/// Create a new timeout manager
1051	pub fn new(operation_timeout:Duration) -> Self { Self { global_deadline:None, operation_timeout } }
1052
1053	/// Create with global deadline
1054	pub fn with_deadline(global_deadline:Instant, operation_timeout:Duration) -> Self {
1055		Self { global_deadline:Some(global_deadline), operation_timeout }
1056	}
1057
1058	/// Validate timeout configuration
1059	pub fn ValidateTimeout(timeout:Duration) -> Result<(), String> {
1060		if timeout.is_zero() {
1061			return Err("Timeout must be greater than 0".to_string());
1062		}
1063
1064		if timeout.as_secs() > 3600 {
1065			return Err("Timeout cannot exceed 1 hour".to_string());
1066		}
1067
1068		Ok(())
1069	}
1070
1071	/// Validate timeout as Result for error handling
1072	pub fn ValidateTimeoutResult(timeout:Duration) -> Result<Duration, String> {
1073		if timeout.is_zero() {
1074			return Err("Timeout must be greater than 0".to_string());
1075		}
1076
1077		if timeout.as_secs() > 3600 {
1078			return Err("Timeout cannot exceed 1 hour".to_string());
1079		}
1080
1081		Ok(timeout)
1082	}
1083
1084	/// Get remaining time until deadline
1085	pub fn remaining(&self) -> Option<Duration> {
1086		self.global_deadline.map(|deadline| {
1087			deadline
1088				.checked_duration_since(Instant::now())
1089				.unwrap_or(Duration::from_secs(0))
1090		})
1091	}
1092
1093	/// Get remaining time with panic recovery
1094	pub fn Remaining(&self) -> Option<Duration> {
1095		std::panic::catch_unwind(|| self.remaining()).unwrap_or_else(|e| {
1096			dev_log!("resilience", "error: [TimeoutManager] Panic in Remaining: {:?}", e);
1097			None
1098		})
1099	}
1100
1101	/// Get effective timeout (minimum of operation timeout and remaining time)
1102	pub fn effective_timeout(&self) -> Duration {
1103		match self.remaining() {
1104			Some(remaining) => self.operation_timeout.min(remaining),
1105
1106			None => self.operation_timeout,
1107		}
1108	}
1109
1110	/// Get effective timeout with validation
1111	pub fn EffectiveTimeout(&self) -> Duration {
1112		std::panic::catch_unwind(|| {
1113			let timeout = self.effective_timeout();
1114
1115			match Self::ValidateTimeoutResult(timeout) {
1116				Ok(valid_timeout) => valid_timeout,
1117
1118				Err(_) => Duration::from_secs(30),
1119			}
1120		})
1121		.unwrap_or_else(|e| {
1122			dev_log!("resilience", "error: [TimeoutManager] Panic in EffectiveTimeout: {:?}", e);
1123			Duration::from_secs(30)
1124		})
1125	}
1126
1127	/// Check if deadline has been exceeded
1128	pub fn is_exceeded(&self) -> bool { self.global_deadline.map_or(false, |deadline| Instant::now() >= deadline) }
1129
1130	/// Check if deadline has been exceeded with panic recovery
1131	pub fn IsExceeded(&self) -> bool {
1132		std::panic::catch_unwind(|| self.is_exceeded()).unwrap_or_else(|e| {
1133			dev_log!("resilience", "error: [TimeoutManager] Panic in IsExceeded: {:?}", e);
1134			true // Fail safe: assume exceeded
1135		})
1136	}
1137
1138	/// Get the global deadline
1139	pub fn GetGlobalDeadline(&self) -> Option<Instant> { self.global_deadline }
1140
1141	/// Get the operation timeout
1142	pub fn GetOperationTimeout(&self) -> Duration { self.operation_timeout }
1143}
1144
/// Resilience orchestrator combining all patterns
///
/// Owns one shared retry manager plus per-service circuit breakers and
/// bulkheads; all fields are `Arc`s so clones of the orchestrator share
/// the same underlying state.
pub struct ResilienceOrchestrator {
	// Retry engine shared across all services; also publishes retry events.
	retry_manager:Arc<RetryManager>,

	// Circuit breakers keyed by service name, created lazily on first use.
	circuit_breakers:Arc<RwLock<HashMap<String, CircuitBreaker>>>,

	// Bulkhead executors keyed by service name, created lazily on first use.
	bulkheads:Arc<RwLock<HashMap<String, BulkheadExecutor>>>,
}
1153
1154impl ResilienceOrchestrator {
1155	/// Create a new resilience orchestrator
1156	pub fn new(retry_policy:RetryPolicy) -> Self {
1157		Self {
1158			retry_manager:Arc::new(RetryManager::new(retry_policy)),
1159
1160			circuit_breakers:Arc::new(RwLock::new(HashMap::new())),
1161
1162			bulkheads:Arc::new(RwLock::new(HashMap::new())),
1163		}
1164	}
1165
1166	/// Get or create circuit breaker with configuration validation
1167	pub async fn GetCircuitBreaker(&self, service:&str, config:CircuitBreakerConfig) -> Arc<CircuitBreaker> {
1168		let mut breakers = self.circuit_breakers.write().await;
1169
1170		Arc::new(
1171			breakers
1172				.entry(service.to_string())
1173				.or_insert_with(|| CircuitBreaker::new(service.to_string(), config))
1174				.clone(),
1175		)
1176	}
1177
1178	/// Get or create bulkhead with configuration validation
1179	pub async fn GetBulkhead(&self, service:&str, config:BulkheadConfig) -> Arc<BulkheadExecutor> {
1180		let mut bulkheads = self.bulkheads.write().await;
1181
1182		Arc::new(
1183			bulkheads
1184				.entry(service.to_string())
1185				.or_insert_with(|| BulkheadExecutor::new(service.to_string(), config))
1186				.clone(),
1187		)
1188	}
1189
1190	/// Get all circuit breaker statistics
1191	pub async fn GetAllCircuitBreakerStatistics(&self) -> Vec<CircuitStatistics> {
1192		let breakers = self.circuit_breakers.read().await;
1193
1194		let mut stats = Vec::new();
1195
1196		for breaker in breakers.values() {
1197			stats.push(breaker.GetStatistics().await);
1198		}
1199
1200		stats
1201	}
1202
1203	/// Get all bulkhead statistics
1204	pub async fn GetAllBulkheadStatistics(&self) -> Vec<BulkheadStatistics> {
1205		let bulkheads = self.bulkheads.read().await;
1206
1207		let mut stats = Vec::new();
1208
1209		for bulkhead in bulkheads.values() {
1210			stats.push(bulkhead.GetStatistics().await);
1211		}
1212
1213		stats
1214	}
1215
1216	/// Execute with full resilience and event publishing
1217	pub async fn ExecuteResilient<F, R>(
1218		&self,
1219
1220		service:&str,
1221
1222		retry_policy:&RetryPolicy,
1223
1224		circuit_config:CircuitBreakerConfig,
1225
1226		bulkhead_config:BulkheadConfig,
1227
1228		f:F,
1229	) -> Result<R, String>
1230	where
1231		F: Fn() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<R, String>> + Send>>, {
1232		// Validate configurations
1233		if let Err(e) = CircuitBreaker::ValidateConfig(&circuit_config) {
1234			return Err(format!("Invalid circuit breaker config: {}", e));
1235		}
1236
1237		if let Err(e) = BulkheadExecutor::ValidateConfig(&bulkhead_config) {
1238			return Err(format!("Invalid bulkhead config: {}", e));
1239		}
1240
1241		let breaker = self.GetCircuitBreaker(service, circuit_config).await;
1242
1243		let bulkhead = self.GetBulkhead(service, bulkhead_config).await;
1244
1245		// Check circuit state
1246		if breaker.GetState().await == CircuitState::Open {
1247			if !breaker.AttemptRecovery().await {
1248				return Err("Circuit breaker is open".to_string());
1249			}
1250		}
1251
1252		// Execute with bulkhead protection and retry logic
1253		let mut Attempt = 0;
1254
1255		let _LastError = "".to_string();
1256
1257		loop {
1258			let result = bulkhead.Execute(f()).await;
1259
1260			match result {
1261				Ok(Value) => {
1262					breaker.RecordSuccess().await;
1263
1264					// Publish retry success event
1265					let Event = RetryEvent {
1266						Service:service.to_string(),
1267
1268						Attempt,
1269
1270						ErrorClass:ErrorClass::Unknown,
1271
1272						DelayMs:0,
1273
1274						Success:true,
1275
1276						ErrorMessage:None,
1277					};
1278
1279					self.retry_manager.PublishRetryEvent(Event);
1280
1281					return Ok(Value);
1282				},
1283
1284				Err(E) => {
1285					let ErrorClass = self.retry_manager.ClassifyError(&E);
1286
1287					breaker.RecordFailure().await;
1288
1289					// Publish retry failure event
1290					let Delay = self.retry_manager.CalculateAdaptiveRetryDelay(&E, Attempt);
1291
1292					let Event = RetryEvent {
1293						Service:service.to_string(),
1294
1295						Attempt,
1296
1297						ErrorClass,
1298
1299						DelayMs:Delay.as_millis() as u64,
1300
1301						Success:false,
1302
1303						ErrorMessage:Some(self.redact_sensitive_data(&E)),
1304					};
1305
1306					self.retry_manager.PublishRetryEvent(Event);
1307
1308					if Attempt < retry_policy.MaxRetries
1309						&& ErrorClass != ErrorClass::NonRetryable
1310						&& self.retry_manager.CanRetry(service).await
1311					{
1312						let Delay = self.retry_manager.CalculateAdaptiveRetryDelay(&E, Attempt);
1313
1314						dev_log!(
1315							"resilience",
1316							"[ResilienceOrchestrator] Retrying {} (attempt {}/{}) after {:?}, error: {}",
1317							service,
1318							Attempt + 1,
1319							retry_policy.MaxRetries,
1320							Delay,
1321							self.redact_sensitive_data(&E)
1322						);
1323
1324						tokio::time::sleep(Delay).await;
1325
1326						Attempt += 1;
1327					} else {
1328						return Err(E);
1329					}
1330				},
1331			}
1332		}
1333	}
1334
1335	/// Redact sensitive data from error messages before logging/event
1336	/// publishing
1337	fn redact_sensitive_data(&self, message:&str) -> String {
1338		let mut redacted = message.to_string();
1339
1340		// Redact common patterns - simplified to avoid escaping issues
1341		let patterns = vec![
1342			(r"(?i)password[=:]\S+", "password=[REDACTED]"),
1343			(r"(?i)token[=:]\S+", "token=[REDACTED]"),
1344			(r"(?i)(api|private)[_-]?key[=:]\S+", "api_key=[REDACTED]"),
1345			(r"(?i)secret[=:]\S+", "secret=[REDACTED]"),
1346			(
1347				r"(?i)authorization[=[:space:]]+Bearer[[:space:]]+\S+",
1348				"Authorization: Bearer [REDACTED]",
1349			),
1350			(r"(?i)credit[_-]?card[=:][\d-]+", "credit_card=[REDACTED]"),
1351			(r"(?i)ssn[=:][\d-]{9,11}", "ssn=[REDACTED]"),
1352		];
1353
1354		for (pattern, replacement) in patterns {
1355			if let Ok(re) = regex::Regex::new(pattern) {
1356				redacted = re.replace_all(&redacted, replacement).to_string();
1357			}
1358		}
1359
1360		redacted
1361	}
1362
1363	/// Validate all configurations
1364	pub fn ValidateConfigurations(
1365		&self,
1366
1367		_RetryPolicy:&RetryPolicy,
1368
1369		CircuitConfig:&CircuitBreakerConfig,
1370
1371		BulkheadConfig:&BulkheadConfig,
1372	) -> Result<(), String> {
1373		self.retry_manager.ValidatePolicy()?;
1374
1375		CircuitBreaker::ValidateConfig(CircuitConfig)?;
1376
1377		BulkheadExecutor::ValidateConfig(BulkheadConfig)?;
1378
1379		TimeoutManager::ValidateTimeout(Duration::from_secs(BulkheadConfig.timeout_secs))?;
1380
1381		Ok(())
1382	}
1383}
1384
1385impl Clone for ResilienceOrchestrator {
1386	fn clone(&self) -> Self {
1387		Self {
1388			retry_manager:self.retry_manager.clone(),
1389
1390			circuit_breakers:self.circuit_breakers.clone(),
1391
1392			bulkheads:self.bulkheads.clone(),
1393		}
1394	}
1395}
1396
#[cfg(test)]
mod tests {
	use super::*;

	#[test]
	fn test_retry_delay_calculation() {
		let policy = RetryPolicy::default();

		let manager = RetryManager::new(policy);

		let delay_1 = manager.CalculateRetryDelay(1);

		let delay_2 = manager.CalculateRetryDelay(2);

		// delay_2 should be roughly double delay_1 (with some jitter)
		assert!(delay_2 >= delay_1);
	}

	#[test]
	fn test_adaptive_retry_delay() {
		let policy = RetryPolicy::default();

		let manager = RetryManager::new(policy);

		// Rate limited errors should have longer delays
		let rate_limit_delay = manager.CalculateAdaptiveRetryDelay("rate_limit_exceeded", 1);

		let transient_delay = manager.CalculateAdaptiveRetryDelay("timeout", 1);

		assert!(rate_limit_delay >= transient_delay);
	}

	#[test]
	fn test_error_classification() {
		let policy = RetryPolicy::default();

		let manager = RetryManager::new(policy);

		assert_eq!(manager.ClassifyError("connection timeout"), ErrorClass::Transient);

		assert_eq!(manager.ClassifyError("rate limit exceeded"), ErrorClass::RateLimited);

		assert_eq!(manager.ClassifyError("unauthorized"), ErrorClass::NonRetryable);

		assert_eq!(manager.ClassifyError("server error"), ErrorClass::ServerError);
	}

	#[test]
	fn test_policy_validation() {
		let policy = RetryPolicy::default();

		let manager = RetryManager::new(policy);

		assert!(manager.ValidatePolicy().is_ok());

		// A zero retry budget must be rejected by policy validation.
		let invalid_policy = RetryPolicy { MaxRetries:0, ..Default::default() };

		let invalid_manager = RetryManager::new(invalid_policy);

		assert!(invalid_manager.ValidatePolicy().is_err());
	}

	#[tokio::test]
	async fn test_circuit_breaker_state_transitions() {
		let config = CircuitBreakerConfig { FailureThreshold:2, SuccessThreshold:1, TimeoutSecs:1 };

		let breaker = CircuitBreaker::new("test".to_string(), config);

		// Closed until FailureThreshold (2) consecutive failures.
		assert_eq!(breaker.GetState().await, CircuitState::Closed);

		breaker.RecordFailure().await;

		assert_eq!(breaker.GetState().await, CircuitState::Closed);

		breaker.RecordFailure().await;

		assert_eq!(breaker.GetState().await, CircuitState::Open);

		// Recovery probe moves Open -> HalfOpen; one success (the
		// SuccessThreshold) closes the circuit again.
		assert!(breaker.AttemptRecovery().await);

		assert_eq!(breaker.GetState().await, CircuitState::HalfOpen);

		breaker.RecordSuccess().await;

		assert_eq!(breaker.GetState().await, CircuitState::Closed);
	}

	#[tokio::test]
	async fn test_circuit_breaker_validation() {
		let config = CircuitBreakerConfig { FailureThreshold:2, SuccessThreshold:1, TimeoutSecs:1 };

		let breaker = CircuitBreaker::new("test".to_string(), config);

		// Validate initial state
		assert!(breaker.ValidateState().await.is_ok());

		// Trigger state transition to open
		breaker.RecordFailure().await;

		breaker.RecordFailure().await;

		// FailureThreshold was reached, so the breaker must now be open.
		// (Replaces a former `is_ok() || is_err()` assertion, which was a
		// tautology and could never fail.)
		assert_eq!(breaker.GetState().await, CircuitState::Open);

		// ValidateState may legitimately return Ok or Err right after the
		// transition depending on timeout behavior; just ensure it runs.
		let _ = breaker.ValidateState().await;
	}

	#[test]
	fn test_circuit_breaker_config_validation() {
		let valid_config = CircuitBreakerConfig::default();

		assert!(CircuitBreaker::ValidateConfig(&valid_config).is_ok());

		// A breaker that never trips (threshold 0) is rejected.
		let invalid_config = CircuitBreakerConfig { FailureThreshold:0, ..Default::default() };

		assert!(CircuitBreaker::ValidateConfig(&invalid_config).is_err());
	}

	#[tokio::test]
	async fn test_bulkhead_resource_isolation() {
		let config = BulkheadConfig { max_concurrent:2, max_queue:5, timeout_secs:10 };

		let bulkhead = BulkheadExecutor::new("test".to_string(), config);

		// Fresh bulkhead carries no load. (Bindings were previously
		// underscore-prefixed despite being used in assertions.)
		let (current, queue) = bulkhead.GetLoad().await;

		assert_eq!(current, 0);

		assert_eq!(queue, 0);

		let stats = bulkhead.GetStatistics().await;

		assert_eq!(stats.current_concurrent, 0);

		assert_eq!(stats.current_queue, 0);

		assert_eq!(stats.max_concurrent, 2);

		assert_eq!(stats.max_queue, 5);
	}

	#[tokio::test]
	async fn test_bulkhead_utilization() {
		let config = BulkheadConfig { max_concurrent:10, max_queue:100, timeout_secs:30 };

		let bulkhead = BulkheadExecutor::new("test".to_string(), config);

		// No in-flight work yet, so utilization is exactly zero.
		let utilization = bulkhead.GetUtilization().await;

		assert_eq!(utilization, 0.0);
	}

	#[test]
	fn test_bulkhead_config_validation() {
		let valid_config = BulkheadConfig::default();

		assert!(BulkheadExecutor::ValidateConfig(&valid_config).is_ok());

		// A bulkhead admitting zero concurrent calls is rejected.
		let invalid_config = BulkheadConfig { max_concurrent:0, ..Default::default() };

		assert!(BulkheadExecutor::ValidateConfig(&invalid_config).is_err());
	}

	#[test]
	fn test_timeout_manager() {
		let manager = TimeoutManager::new(Duration::from_secs(30));

		assert!(!manager.IsExceeded());

		// With no global deadline, the effective timeout is the operation
		// timeout unchanged.
		assert_eq!(manager.EffectiveTimeout(), Duration::from_secs(30));

		assert!(TimeoutManager::ValidateTimeout(Duration::from_secs(30)).is_ok());

		assert!(TimeoutManager::ValidateTimeout(Duration::from_secs(0)).is_err());
	}

	#[test]
	fn test_timeout_manager_with_deadline() {
		let deadline = Instant::now() + Duration::from_secs(60);

		let manager = TimeoutManager::with_deadline(deadline, Duration::from_secs(30));

		let remaining = manager.Remaining();

		assert!(remaining.is_some());

		// Remaining time can only shrink from the 60s horizon.
		assert!(remaining.unwrap() <= Duration::from_secs(60));
	}
}