1use std::{
71 collections::HashMap,
72 sync::Arc,
73 time::{Duration, Instant},
74};
75
76use tokio::sync::{Mutex, RwLock, broadcast};
77use serde::{Deserialize, Serialize};
78
79use crate::dev_log;
80
/// Coarse classification of an error, used to select a retry strategy.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ErrorClass {
    /// Temporary failure (timeouts, connection resets); retried with exponential backoff.
    Transient,
    /// Permanent failure (auth, not-found); retrying will not help.
    NonRetryable,
    /// The remote side is throttling us; retried with longer, linear delays.
    RateLimited,
    /// 5xx-style upstream failure; retried with capped exponential backoff.
    ServerError,
    /// Unclassified error; given only a minimal delay but still retryable.
    Unknown,
}
99
/// Tunable knobs controlling retry behavior.
/// NOTE(review): field names are PascalCase (non-idiomatic Rust) and are
/// serialized verbatim by serde; renaming them would break both callers and
/// the wire format, so they are kept as-is.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryPolicy {
    /// Maximum number of retries per operation.
    pub MaxRetries:u32,
    /// Base delay before the first retry, in milliseconds.
    pub InitialIntervalMs:u64,
    /// Upper bound on any single backoff delay, in milliseconds.
    pub MaxIntervalMs:u64,
    /// Factor applied per attempt for exponential backoff.
    pub BackoffMultiplier:f64,
    /// Fraction (0..=1) of the base delay added as random jitter.
    pub JitterFactor:f64,
    /// Per-service retry allowance per rolling minute.
    pub BudgetPerMinute:u32,
    /// Maps error identifiers / message substrings to their classification.
    pub ErrorClassification:HashMap<String, ErrorClass>,
}
124
125impl Default for RetryPolicy {
126 fn default() -> Self {
127 let mut ErrorClassification = HashMap::new();
128
129 ErrorClassification.insert("timeout".to_string(), ErrorClass::Transient);
131
132 ErrorClassification.insert("connection_refused".to_string(), ErrorClass::Transient);
133
134 ErrorClassification.insert("connection_reset".to_string(), ErrorClass::Transient);
135
136 ErrorClassification.insert("rate_limit_exceeded".to_string(), ErrorClass::RateLimited);
137
138 ErrorClassification.insert("authentication_failed".to_string(), ErrorClass::NonRetryable);
139
140 ErrorClassification.insert("unauthorized".to_string(), ErrorClass::NonRetryable);
141
142 ErrorClassification.insert("not_found".to_string(), ErrorClass::NonRetryable);
143
144 ErrorClassification.insert("server_error".to_string(), ErrorClass::ServerError);
145
146 ErrorClassification.insert("internal_server_error".to_string(), ErrorClass::ServerError);
147
148 ErrorClassification.insert("service_unavailable".to_string(), ErrorClass::ServerError);
149
150 ErrorClassification.insert("gateway_timeout".to_string(), ErrorClass::Transient);
151
152 Self {
153 MaxRetries:3,
154
155 InitialIntervalMs:1000,
156
157 MaxIntervalMs:32000,
158
159 BackoffMultiplier:2.0,
160
161 JitterFactor:0.1,
162
163 BudgetPerMinute:100,
164
165 ErrorClassification,
166 }
167 }
168}
169
/// Sliding one-minute window limiting how many retries a service may consume.
#[derive(Debug, Clone)]
struct RetryBudget {
    /// Timestamps of retry attempts still inside the window.
    Attempts:Vec<Instant>,
    /// Maximum retries allowed per rolling minute.
    MaxPerMinute:u32,
}

impl RetryBudget {
    fn new(MaxPerMinute:u32) -> Self { Self { Attempts:Vec::new(), MaxPerMinute } }

    /// Records an attempt and returns true if the per-minute budget still has
    /// capacity; returns false (recording nothing) when it is exhausted.
    fn can_retry(&mut self) -> bool {
        let Now = Instant::now();

        // Keep only attempts younger than one minute. Using the saturating
        // `duration_since` instead of `Now - Duration::from_secs(60)` avoids
        // a potential panic: `Instant - Duration` can underflow when the
        // process has been alive for less than a minute on some platforms.
        self.Attempts.retain(|&attempt| Now.duration_since(attempt) < Duration::from_secs(60));

        if self.Attempts.len() < self.MaxPerMinute as usize {
            self.Attempts.push(Now);
            true
        } else {
            false
        }
    }
}
198
/// Coordinates the retry policy, per-service retry budgets, and retry telemetry.
pub struct RetryManager {
    /// The active retry policy.
    Policy:RetryPolicy,
    /// Per-service sliding-window retry budgets, keyed by service name.
    Budgets:Arc<Mutex<HashMap<String, RetryBudget>>>,
    /// Broadcast channel used to publish RetryEvents to subscribers.
    EventTx:Arc<broadcast::Sender<RetryEvent>>,
}
207
/// Telemetry record published for every attempt, successful or not.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryEvent {
    /// Service the operation targeted.
    pub Service:String,
    /// Zero-based attempt number at the time of the event.
    pub Attempt:u32,
    /// Classification of the error (Unknown is used for successes).
    pub ErrorClass:ErrorClass,
    /// Delay chosen before the next retry, in milliseconds (0 on success).
    pub DelayMs:u64,
    /// Whether this attempt succeeded.
    pub Success:bool,
    /// Redacted error message, present when the attempt failed.
    pub ErrorMessage:Option<String>,
}
223
224impl RetryManager {
225 pub fn new(policy:RetryPolicy) -> Self {
227 let (EventTx, _) = broadcast::channel(1000);
228
229 Self {
230 Policy:policy,
231
232 Budgets:Arc::new(Mutex::new(HashMap::new())),
233
234 EventTx:Arc::new(EventTx),
235 }
236 }
237
238 pub fn GetEventTransmitter(&self) -> broadcast::Sender<RetryEvent> { (*self.EventTx).clone() }
240
241 pub fn CalculateRetryDelay(&self, Attempt:u32) -> Duration {
243 if Attempt == 0 {
244 return Duration::from_millis(0);
245 }
246
247 let BaseDelay = (self.Policy.InitialIntervalMs as f64 * self.Policy.BackoffMultiplier.powi(Attempt as i32 - 1))
248 .min(self.Policy.MaxIntervalMs as f64) as u64;
249
250 let Jitter = (BaseDelay as f64 * self.Policy.JitterFactor) as u64;
252
253 let RandomJitter = (rand::random::<f64>() * Jitter as f64) as u64;
254
255 let FinalDelay = BaseDelay + RandomJitter;
256
257 Duration::from_millis(FinalDelay)
258 }
259
260 pub fn CalculateAdaptiveRetryDelay(&self, ErrorType:&str, attempt:u32) -> Duration {
262 let ErrorClass = self
263 .Policy
264 .ErrorClassification
265 .get(ErrorType)
266 .copied()
267 .unwrap_or(ErrorClass::Unknown);
268
269 match ErrorClass {
270 ErrorClass::RateLimited => {
271 let delay = (attempt + 1) * 5000;
274
275 Duration::from_millis(delay as u64)
276 },
277
278 ErrorClass::ServerError => {
279 let BaseDelay = self.Policy.InitialIntervalMs * 2_u64.pow(attempt);
281
282 Duration::from_millis(BaseDelay.min(self.Policy.MaxIntervalMs))
283 },
284
285 ErrorClass::Transient => {
286 self.CalculateRetryDelay(attempt)
288 },
289
290 ErrorClass::NonRetryable | ErrorClass::Unknown => {
291 Duration::from_millis(100)
293 },
294 }
295 }
296
297 pub fn ClassifyError(&self, ErrorMessage:&str) -> ErrorClass {
299 let ErrorLower = ErrorMessage.to_lowercase();
300
301 for (pattern, class) in &self.Policy.ErrorClassification {
302 if ErrorLower.contains(pattern) {
303 return *class;
304 }
305 }
306
307 ErrorClass::Unknown
308 }
309
310 pub async fn CanRetry(&self, service:&str) -> bool {
313 let mut budgets = self.Budgets.lock().await;
314
315 let budget = budgets
316 .entry(service.to_string())
317 .or_insert_with(|| RetryBudget::new(self.Policy.BudgetPerMinute));
318
319 budget.can_retry()
320 }
321
322 pub fn PublishRetryEvent(&self, event:RetryEvent) { let _ = self.EventTx.send(event); }
324
325 pub fn ValidatePolicy(&self) -> Result<(), String> {
327 if self.Policy.MaxRetries == 0 {
328 return Err("MaxRetries must be greater than 0".to_string());
329 }
330
331 if self.Policy.InitialIntervalMs == 0 {
332 return Err("InitialIntervalMs must be greater than 0".to_string());
333 }
334
335 if self.Policy.InitialIntervalMs > self.Policy.MaxIntervalMs {
336 return Err("InitialIntervalMs cannot be greater than MaxIntervalMs".to_string());
337 }
338
339 if self.Policy.BackoffMultiplier <= 1.0 {
340 return Err("BackoffMultiplier must be greater than 1.0".to_string());
341 }
342
343 if self.Policy.JitterFactor < 0.0 || self.Policy.JitterFactor > 1.0 {
344 return Err("JitterFactor must be between 0 and 1".to_string());
345 }
346
347 if self.Policy.BudgetPerMinute == 0 {
348 return Err("BudgetPerMinute must be greater than 0".to_string());
349 }
350
351 Ok(())
352 }
353}
354
/// Circuit-breaker states, following the classic closed / open / half-open model.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum CircuitState {
    /// Normal operation; calls flow through and failures are counted.
    Closed,
    /// Failing fast; calls are rejected until the recovery timeout elapses.
    Open,
    /// Probing; calls are allowed through to test whether the dependency recovered.
    HalfOpen,
}
367
/// Thresholds and timing for a circuit breaker.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct CircuitBreakerConfig {
    /// Consecutive failures (while Closed) before the circuit opens.
    pub FailureThreshold:u32,
    /// Successes (while HalfOpen) required to close the circuit again.
    pub SuccessThreshold:u32,
    /// Seconds to stay Open before allowing a half-open recovery probe.
    pub TimeoutSecs:u64,
}
380
381impl Default for CircuitBreakerConfig {
382 fn default() -> Self { Self { FailureThreshold:5, SuccessThreshold:2, TimeoutSecs:60 } }
383}
384
/// Broadcast whenever a circuit breaker changes state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CircuitEvent {
    /// Name of the breaker that transitioned.
    pub name:String,
    /// State before the transition.
    pub FromState:CircuitState,
    /// State after the transition.
    pub ToState:CircuitState,
    /// Transition time, as produced by crate::Utility::CurrentTimestamp().
    pub timestamp:u64,
    /// Human-readable reason for the transition.
    pub reason:String,
}
398
/// A circuit breaker guarding one named dependency. All mutable state lives
/// behind `Arc<RwLock<...>>`, so clones of this struct share the same
/// underlying breaker rather than forming independent copies.
pub struct CircuitBreaker {
    /// Identifier used in events and log lines.
    Name:String,
    /// Current circuit state.
    State:Arc<RwLock<CircuitState>>,
    /// Thresholds and recovery timing.
    Config:CircuitBreakerConfig,
    /// Failures observed since the last reset.
    FailureCount:Arc<RwLock<u32>>,
    /// Successes observed while half-open.
    SuccessCount:Arc<RwLock<u32>>,
    /// When the most recent failure was recorded, if any.
    LastFailureTime:Arc<RwLock<Option<Instant>>>,
    /// Broadcast channel for CircuitEvents.
    EventTx:Arc<broadcast::Sender<CircuitEvent>>,
    /// Total number of state transitions performed.
    StateTransitionCounter:Arc<RwLock<u32>>,
}
418
419impl CircuitBreaker {
420 pub fn new(name:String, Config:CircuitBreakerConfig) -> Self {
422 let (EventTx, _) = broadcast::channel(1000);
423
424 Self {
425 Name:name.clone(),
426
427 State:Arc::new(RwLock::new(CircuitState::Closed)),
428
429 Config,
430
431 FailureCount:Arc::new(RwLock::new(0)),
432
433 SuccessCount:Arc::new(RwLock::new(0)),
434
435 LastFailureTime:Arc::new(RwLock::new(None)),
436
437 EventTx:Arc::new(EventTx),
438
439 StateTransitionCounter:Arc::new(RwLock::new(0)),
440 }
441 }
442
443 pub fn GetEventTransmitter(&self) -> broadcast::Sender<CircuitEvent> { (*self.EventTx).clone() }
445
446 pub async fn GetState(&self) -> CircuitState { *self.State.read().await }
448
449 pub async fn ValidateState(&self) -> Result<(), String> {
451 let state = *self.State.read().await;
452
453 let failures = *self.FailureCount.read().await;
454
455 let successes = *self.SuccessCount.read().await;
456
457 match state {
458 CircuitState::Closed => {
459 if successes != 0 {
460 return Err(format!("Inconsistent state: Closed but has {} successes", successes));
461 }
462
463 if failures >= self.Config.FailureThreshold {
464 dev_log!(
465 "resilience",
466 "warn: [CircuitBreaker] State inconsistency: Closed but failure count ({}) >= threshold ({})",
467 failures,
468 self.Config.FailureThreshold
469 );
470 }
471 },
472
473 CircuitState::Open => {
474 if failures < self.Config.FailureThreshold {
475 dev_log!(
476 "resilience",
477 "warn: [CircuitBreaker] State inconsistency: Open but failure count ({}) < threshold ({})",
478 failures,
479 self.Config.FailureThreshold
480 );
481 }
482 },
483
484 CircuitState::HalfOpen => {
485 if successes >= self.Config.SuccessThreshold {
486 return Err(format!(
487 "Inconsistent state: HalfOpen but has {} successes (should be Closed)",
488 successes
489 ));
490 }
491 },
492 }
493
494 Ok(())
495 }
496
497 async fn TransitionState(&self, NewState:CircuitState, reason:&str) -> Result<(), String> {
499 let CurrentState = self.GetState().await;
500
501 if CurrentState == NewState {
502 return Ok(());
504 }
505
506 match (CurrentState, NewState) {
508 (CircuitState::Closed, CircuitState::Open) | (CircuitState::HalfOpen, CircuitState::Open) => {
509 },
511
512 (CircuitState::Open, CircuitState::HalfOpen) => {
513 },
515
516 (CircuitState::HalfOpen, CircuitState::Closed) => {
517 },
519
520 _ => {
521 return Err(format!(
522 "Invalid state transition from {:?} to {:?} for {}",
523 CurrentState, NewState, self.Name
524 ));
525 },
526 }
527
528 let event = CircuitEvent {
530 name:self.Name.clone(),
531
532 FromState:CurrentState,
533
534 ToState:NewState,
535
536 timestamp:crate::Utility::CurrentTimestamp(),
537
538 reason:reason.to_string(),
539 };
540
541 let _ = self.EventTx.send(event);
542
543 *self.State.write().await = NewState;
545
546 *self.StateTransitionCounter.write().await += 1;
548
549 dev_log!(
550 "resilience",
551 "[CircuitBreaker] State transition for {}: {:?} -> {:?} (reason: {})",
552 self.Name,
553 CurrentState,
554 NewState,
555 reason
556 );
557
558 self.ValidateState().await.map_err(|e| {
560 dev_log!(
561 "resilience",
562 "error: [CircuitBreaker] State validation failed after transition: {}",
563 e
564 );
565 e
566 })?;
567
568 Ok(())
569 }
570
571 pub async fn RecordSuccess(&self) {
573 let state = self.GetState().await;
574
575 match state {
576 CircuitState::Closed => {
577 *self.FailureCount.write().await = 0;
579 },
580
581 CircuitState::HalfOpen => {
582 let mut SuccessCount = self.SuccessCount.write().await;
584
585 *SuccessCount += 1;
586
587 if *SuccessCount >= self.Config.SuccessThreshold {
588 let _ = self.TransitionState(CircuitState::Closed, "Success threshold reached").await;
590
591 *self.FailureCount.write().await = 0;
592
593 *self.SuccessCount.write().await = 0;
594 }
595 },
596
597 _ => {},
598 }
599 }
600
601 pub async fn RecordFailure(&self) {
603 let State = self.GetState().await;
604
605 *self.LastFailureTime.write().await = Some(Instant::now());
606
607 match State {
608 CircuitState::Closed => {
609 let mut FailureCount = self.FailureCount.write().await;
611
612 *FailureCount += 1;
613
614 if *FailureCount >= self.Config.FailureThreshold {
615 let _ = self.TransitionState(CircuitState::Open, "Failure threshold reached").await;
617
618 *self.SuccessCount.write().await = 0;
619 }
620 },
621
622 CircuitState::HalfOpen => {
623 let _ = self.TransitionState(CircuitState::Open, "Failure in half-open state").await;
625
626 *self.SuccessCount.write().await = 0;
627 },
628
629 _ => {},
630 }
631 }
632
633 pub async fn AttemptRecovery(&self) -> bool {
636 let state = self.GetState().await;
637
638 if state != CircuitState::Open {
639 return state == CircuitState::HalfOpen;
640 }
641
642 if let Some(last_failure) = *self.LastFailureTime.read().await {
643 if last_failure.elapsed() >= Duration::from_secs(self.Config.TimeoutSecs) {
644 let _ = self.TransitionState(CircuitState::HalfOpen, "Recovery timeout elapsed").await;
645
646 *self.SuccessCount.write().await = 0;
647
648 return true;
649 }
650 }
651
652 false
653 }
654
655 pub async fn GetStatistics(&self) -> CircuitStatistics {
657 CircuitStatistics {
658 Name:self.Name.clone(),
659
660 State:self.GetState().await,
661
662 Failures:*self.FailureCount.read().await,
663
664 Successes:*self.SuccessCount.read().await,
665
666 StateTransitions:*self.StateTransitionCounter.read().await,
667
668 LastFailureTime:*self.LastFailureTime.read().await,
669 }
670 }
671
672 pub fn ValidateConfig(&config:&CircuitBreakerConfig) -> Result<(), String> {
674 if config.FailureThreshold == 0 {
675 return Err("FailureThreshold must be greater than 0".to_string());
676 }
677
678 if config.SuccessThreshold == 0 {
679 return Err("SuccessThreshold must be greater than 0".to_string());
680 }
681
682 if config.TimeoutSecs == 0 {
683 return Err("TimeoutSecs must be greater than 0".to_string());
684 }
685
686 Ok(())
687 }
688}
689
/// Point-in-time snapshot of a breaker's state and counters.
/// `LastFailureTime` is skipped when serializing (`Instant` has no stable
/// serialized form); the hand-written Deserialize below restores it as None.
#[derive(Debug, Clone, Serialize)]
pub struct CircuitStatistics {
    /// Breaker name.
    pub Name:String,
    /// State at snapshot time.
    pub State:CircuitState,
    /// Failure counter at snapshot time.
    pub Failures:u32,
    /// Success counter at snapshot time.
    pub Successes:u32,
    /// Total state transitions so far.
    pub StateTransitions:u32,
    /// Monotonic timestamp of the last failure; never serialized.
    #[serde(skip_serializing)]
    pub LastFailureTime:Option<Instant>,
}
706
707impl<'de> Deserialize<'de> for CircuitStatistics {
708 fn deserialize<D>(Deserializer:D) -> std::result::Result<Self, D::Error>
709 where
710 D: serde::Deserializer<'de>, {
711 use serde::de::{self, Visitor};
712
713 struct CircuitStatisticsVisitor;
714
715 impl<'de> Visitor<'de> for CircuitStatisticsVisitor {
716 type Value = CircuitStatistics;
717
718 fn expecting(&self, formatter:&mut std::fmt::Formatter) -> std::fmt::Result {
719 formatter.write_str("struct CircuitStatistics")
720 }
721
722 fn visit_map<A>(self, mut map:A) -> std::result::Result<CircuitStatistics, A::Error>
723 where
724 A: de::MapAccess<'de>, {
725 let mut Name = None;
726
727 let mut State = None;
728
729 let mut Failures = None;
730
731 let mut Successes = None;
732
733 let mut StateTransitions = None;
734
735 while let Some(key) = map.next_key::<String>()? {
736 match key.as_str() {
737 "name" => Name = Some(map.next_value()?),
738
739 "state" => State = Some(map.next_value()?),
740
741 "failures" => Failures = Some(map.next_value()?),
742
743 "successes" => Successes = Some(map.next_value()?),
744
745 "state_transitions" => StateTransitions = Some(map.next_value()?),
746
747 _ => {
748 map.next_value::<de::IgnoredAny>()?;
749 },
750 }
751 }
752
753 Ok(CircuitStatistics {
754 Name:Name.ok_or_else(|| de::Error::missing_field("name"))?,
755
756 State:State.ok_or_else(|| de::Error::missing_field("state"))?,
757
758 Failures:Failures.ok_or_else(|| de::Error::missing_field("failures"))?,
759
760 Successes:Successes.ok_or_else(|| de::Error::missing_field("successes"))?,
761
762 StateTransitions:StateTransitions.ok_or_else(|| de::Error::missing_field("state_transitions"))?,
763
764 LastFailureTime:None,
765 })
766 }
767 }
768
769 const FIELDS:&[&str] = &["name", "state", "failures", "successes", "state_transitions"];
770
771 Deserializer.deserialize_struct("CircuitStatistics", FIELDS, CircuitStatisticsVisitor)
772 }
773}
774
775impl Clone for CircuitBreaker {
776 fn clone(&self) -> Self {
777 Self {
778 Name:self.Name.clone(),
779
780 State:self.State.clone(),
781
782 Config:self.Config.clone(),
783
784 FailureCount:self.FailureCount.clone(),
785
786 SuccessCount:self.SuccessCount.clone(),
787
788 LastFailureTime:self.LastFailureTime.clone(),
789
790 EventTx:self.EventTx.clone(),
791
792 StateTransitionCounter:self.StateTransitionCounter.clone(),
793 }
794 }
795}
796
/// Limits for a bulkhead (concurrency-isolation) executor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BulkheadConfig {
    /// Maximum operations running at once (semaphore permits).
    pub max_concurrent:usize,
    /// Maximum operations allowed to wait for a permit.
    pub max_queue:usize,
    /// Timeout, in seconds, applied to both permit acquisition and execution.
    pub timeout_secs:u64,
}
809
810impl Default for BulkheadConfig {
811 fn default() -> Self { Self { max_concurrent:10, max_queue:100, timeout_secs:30 } }
812}
813
/// Point-in-time snapshot of a bulkhead's load and lifetime counters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BulkheadStatistics {
    /// Bulkhead name.
    pub name:String,
    /// Operations currently executing.
    pub current_concurrent:u32,
    /// Operations currently waiting for a permit.
    pub current_queue:u32,
    /// Configured concurrency limit.
    pub max_concurrent:usize,
    /// Configured queue limit.
    pub max_queue:usize,
    /// Lifetime count of queue-full rejections.
    pub total_rejected:u64,
    /// Lifetime count of successful completions.
    pub total_completed:u64,
    /// Lifetime count of timeouts (waiting for a permit or executing).
    pub total_timed_out:u64,
}
833
/// Bulkhead: caps concurrent executions and queue depth for one dependency so
/// overload in one service cannot exhaust shared resources. Counter fields
/// are `Arc`s, so clones share state with the original.
pub struct BulkheadExecutor {
    /// Identifier used in log lines.
    name:String,
    /// Permit pool sized to `config.max_concurrent`.
    semaphore:Arc<tokio::sync::Semaphore>,
    /// Limits and timeout settings.
    config:BulkheadConfig,
    /// Operations currently executing.
    current_requests:Arc<RwLock<u32>>,
    /// Operations currently waiting for a permit.
    queue_size:Arc<RwLock<u32>>,
    /// Lifetime count of queue-full rejections.
    total_rejected:Arc<RwLock<u64>>,
    /// Lifetime count of successful completions.
    total_completed:Arc<RwLock<u64>>,
    /// Lifetime count of timeouts (waiting or executing).
    total_timed_out:Arc<RwLock<u64>>,
}
852
853impl BulkheadExecutor {
854 pub fn new(name:String, config:BulkheadConfig) -> Self {
856 Self {
857 name:name.clone(),
858
859 semaphore:Arc::new(tokio::sync::Semaphore::new(config.max_concurrent)),
860
861 config,
862
863 current_requests:Arc::new(RwLock::new(0)),
864
865 queue_size:Arc::new(RwLock::new(0)),
866
867 total_rejected:Arc::new(RwLock::new(0)),
868
869 total_completed:Arc::new(RwLock::new(0)),
870
871 total_timed_out:Arc::new(RwLock::new(0)),
872 }
873 }
874
875 pub fn ValidateConfig(config:&BulkheadConfig) -> Result<(), String> {
877 if config.max_concurrent == 0 {
878 return Err("max_concurrent must be greater than 0".to_string());
879 }
880
881 if config.max_queue == 0 {
882 return Err("max_queue must be greater than 0".to_string());
883 }
884
885 if config.timeout_secs == 0 {
886 return Err("timeout_secs must be greater than 0".to_string());
887 }
888
889 Ok(())
890 }
891
892 pub async fn Execute<F, R>(&self, f:F) -> Result<R, String>
894 where
895 F: std::future::Future<Output = Result<R, String>>, {
896 async {
897 if self.config.timeout_secs == 0 {
899 return Err("Bulkhead timeout must be greater than 0".to_string());
900 }
901
902 let queue = *self.queue_size.read().await;
904
905 if queue >= self.config.max_queue as u32 {
906 *self.total_rejected.write().await += 1;
907
908 dev_log!("resilience", "warn: [Bulkhead] Queue full for {}, rejecting request", self.name);
909 return Err("Bulkhead queue full".to_string());
910 }
911
912 *self.queue_size.write().await += 1;
914
915 let _Permit =
917 match tokio::time::timeout(Duration::from_secs(self.config.timeout_secs), self.semaphore.acquire())
918 .await
919 {
920 Ok(Ok(_)) => {
921 *self.queue_size.write().await -= 1;
924 },
925
926 Ok(Err(e)) => {
927 *self.queue_size.write().await -= 1;
928
929 return Err(format!("Bulkhead semaphore error: {}", e));
930 },
931
932 Err(_) => {
933 *self.queue_size.write().await -= 1;
934
935 *self.total_timed_out.write().await += 1;
936
937 dev_log!("resilience", "warn: [Bulkhead] Timeout waiting for permit for {}", self.name);
938 return Err("Bulkhead timeout waiting for permit".to_string());
939 },
940 };
941
942 *self.queue_size.write().await -= 1;
944
945 *self.current_requests.write().await += 1;
946
947 let execution_result = tokio::time::timeout(Duration::from_secs(self.config.timeout_secs), f).await;
949
950 let execution_result:Result<R, String> = match execution_result {
951 Ok(Ok(value)) => Ok(value),
952
953 Ok(Err(e)) => Err(e),
954
955 Err(_) => {
956 *self.total_timed_out.write().await += 1;
957
958 Err("Bulkhead execution timeout".to_string())
959 },
960 };
961
962 if execution_result.is_ok() {
963 *self.total_completed.write().await += 1;
964 }
965
966 execution_result
967 }
968 .await
969 }
970
971 pub async fn GetLoad(&self) -> (u32, u32) {
973 async {
974 let current = *self.current_requests.read().await;
975
976 let queue = *self.queue_size.read().await;
977
978 (current, queue)
979 }
980 .await
981 }
982
983 pub async fn GetStatistics(&self) -> BulkheadStatistics {
985 async {
986 BulkheadStatistics {
987 name:self.name.clone(),
988
989 current_concurrent:*self.current_requests.read().await,
990
991 current_queue:*self.queue_size.read().await,
992
993 max_concurrent:self.config.max_concurrent,
994
995 max_queue:self.config.max_queue,
996
997 total_rejected:*self.total_rejected.read().await,
998
999 total_completed:*self.total_completed.read().await,
1000
1001 total_timed_out:*self.total_timed_out.read().await,
1002 }
1003 }
1004 .await
1005 }
1006
1007 pub async fn GetUtilization(&self) -> f64 {
1009 let (current, _) = self.GetLoad().await;
1010
1011 if self.config.max_concurrent == 0 {
1012 return 0.0;
1013 }
1014
1015 (current as f64 / self.config.max_concurrent as f64) * 100.0
1016 }
1017}
1018
1019impl Clone for BulkheadExecutor {
1020 fn clone(&self) -> Self {
1021 Self {
1022 name:self.name.clone(),
1023
1024 semaphore:self.semaphore.clone(),
1025
1026 config:self.config.clone(),
1027
1028 current_requests:self.current_requests.clone(),
1029
1030 queue_size:self.queue_size.clone(),
1031
1032 total_rejected:self.total_rejected.clone(),
1033
1034 total_completed:self.total_completed.clone(),
1035
1036 total_timed_out:self.total_timed_out.clone(),
1037 }
1038 }
1039}
1040
/// Tracks a per-operation timeout, optionally bounded by a global deadline
/// shared across a whole chain of operations.
#[derive(Debug, Clone)]
pub struct TimeoutManager {
    /// Absolute deadline for the whole operation chain, if any.
    global_deadline:Option<Instant>,
    /// Timeout applied to each individual operation.
    operation_timeout:Duration,
}

impl TimeoutManager {
    /// Manager with a per-operation timeout and no global deadline.
    pub fn new(operation_timeout:Duration) -> Self { Self { global_deadline:None, operation_timeout } }

    /// Manager bounded by both a global deadline and a per-operation timeout.
    pub fn with_deadline(global_deadline:Instant, operation_timeout:Duration) -> Self {
        Self { global_deadline:Some(global_deadline), operation_timeout }
    }

    /// Rejects zero and greater-than-one-hour timeouts.
    /// (Now delegates to ValidateTimeoutResult instead of duplicating the
    /// same two checks and messages.)
    pub fn ValidateTimeout(timeout:Duration) -> Result<(), String> {
        Self::ValidateTimeoutResult(timeout).map(|_| ())
    }

    /// Same checks as ValidateTimeout, passing the value through on success.
    pub fn ValidateTimeoutResult(timeout:Duration) -> Result<Duration, String> {
        if timeout.is_zero() {
            return Err("Timeout must be greater than 0".to_string());
        }

        if timeout.as_secs() > 3600 {
            return Err("Timeout cannot exceed 1 hour".to_string());
        }

        Ok(timeout)
    }

    /// Time left until the global deadline (zero if already past), or None
    /// when no deadline was set.
    pub fn remaining(&self) -> Option<Duration> {
        self.global_deadline.map(|deadline| {
            deadline
                .checked_duration_since(Instant::now())
                .unwrap_or(Duration::from_secs(0))
        })
    }

    /// Public alias for `remaining`. The previous implementation wrapped the
    /// call in `std::panic::catch_unwind`, but `remaining` is pure
    /// `Option`/`Instant` arithmetic (checked_duration_since with a fallback)
    /// and cannot panic, so the guard was dead weight and has been removed.
    pub fn Remaining(&self) -> Option<Duration> { self.remaining() }

    /// The tighter of the per-operation timeout and the remaining global budget.
    pub fn effective_timeout(&self) -> Duration {
        match self.remaining() {
            Some(remaining) => self.operation_timeout.min(remaining),

            None => self.operation_timeout,
        }
    }

    /// Like `effective_timeout`, but clamps out-of-range results (zero, or
    /// over one hour) to a 30-second fallback. The former `catch_unwind`
    /// wrapper was removed for the same reason as in `Remaining`.
    pub fn EffectiveTimeout(&self) -> Duration {
        Self::ValidateTimeoutResult(self.effective_timeout()).unwrap_or(Duration::from_secs(30))
    }

    /// True when a global deadline exists and has passed.
    pub fn is_exceeded(&self) -> bool { self.global_deadline.map_or(false, |deadline| Instant::now() >= deadline) }

    /// Public alias for `is_exceeded`; see `Remaining` for why the former
    /// `catch_unwind` wrapper was dropped.
    pub fn IsExceeded(&self) -> bool { self.is_exceeded() }

    /// The configured global deadline, if any.
    pub fn GetGlobalDeadline(&self) -> Option<Instant> { self.global_deadline }

    /// The configured per-operation timeout.
    pub fn GetOperationTimeout(&self) -> Duration { self.operation_timeout }
}
1144
/// Facade combining retries, circuit breakers, and bulkheads per service.
pub struct ResilienceOrchestrator {
    /// Shared retry policy / budget manager.
    retry_manager:Arc<RetryManager>,
    /// Per-service circuit breakers, created lazily on first use.
    circuit_breakers:Arc<RwLock<HashMap<String, CircuitBreaker>>>,
    /// Per-service bulkheads, created lazily on first use.
    bulkheads:Arc<RwLock<HashMap<String, BulkheadExecutor>>>,
}
1153
1154impl ResilienceOrchestrator {
1155 pub fn new(retry_policy:RetryPolicy) -> Self {
1157 Self {
1158 retry_manager:Arc::new(RetryManager::new(retry_policy)),
1159
1160 circuit_breakers:Arc::new(RwLock::new(HashMap::new())),
1161
1162 bulkheads:Arc::new(RwLock::new(HashMap::new())),
1163 }
1164 }
1165
1166 pub async fn GetCircuitBreaker(&self, service:&str, config:CircuitBreakerConfig) -> Arc<CircuitBreaker> {
1168 let mut breakers = self.circuit_breakers.write().await;
1169
1170 Arc::new(
1171 breakers
1172 .entry(service.to_string())
1173 .or_insert_with(|| CircuitBreaker::new(service.to_string(), config))
1174 .clone(),
1175 )
1176 }
1177
1178 pub async fn GetBulkhead(&self, service:&str, config:BulkheadConfig) -> Arc<BulkheadExecutor> {
1180 let mut bulkheads = self.bulkheads.write().await;
1181
1182 Arc::new(
1183 bulkheads
1184 .entry(service.to_string())
1185 .or_insert_with(|| BulkheadExecutor::new(service.to_string(), config))
1186 .clone(),
1187 )
1188 }
1189
1190 pub async fn GetAllCircuitBreakerStatistics(&self) -> Vec<CircuitStatistics> {
1192 let breakers = self.circuit_breakers.read().await;
1193
1194 let mut stats = Vec::new();
1195
1196 for breaker in breakers.values() {
1197 stats.push(breaker.GetStatistics().await);
1198 }
1199
1200 stats
1201 }
1202
1203 pub async fn GetAllBulkheadStatistics(&self) -> Vec<BulkheadStatistics> {
1205 let bulkheads = self.bulkheads.read().await;
1206
1207 let mut stats = Vec::new();
1208
1209 for bulkhead in bulkheads.values() {
1210 stats.push(bulkhead.GetStatistics().await);
1211 }
1212
1213 stats
1214 }
1215
1216 pub async fn ExecuteResilient<F, R>(
1218 &self,
1219
1220 service:&str,
1221
1222 retry_policy:&RetryPolicy,
1223
1224 circuit_config:CircuitBreakerConfig,
1225
1226 bulkhead_config:BulkheadConfig,
1227
1228 f:F,
1229 ) -> Result<R, String>
1230 where
1231 F: Fn() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<R, String>> + Send>>, {
1232 if let Err(e) = CircuitBreaker::ValidateConfig(&circuit_config) {
1234 return Err(format!("Invalid circuit breaker config: {}", e));
1235 }
1236
1237 if let Err(e) = BulkheadExecutor::ValidateConfig(&bulkhead_config) {
1238 return Err(format!("Invalid bulkhead config: {}", e));
1239 }
1240
1241 let breaker = self.GetCircuitBreaker(service, circuit_config).await;
1242
1243 let bulkhead = self.GetBulkhead(service, bulkhead_config).await;
1244
1245 if breaker.GetState().await == CircuitState::Open {
1247 if !breaker.AttemptRecovery().await {
1248 return Err("Circuit breaker is open".to_string());
1249 }
1250 }
1251
1252 let mut Attempt = 0;
1254
1255 let _LastError = "".to_string();
1256
1257 loop {
1258 let result = bulkhead.Execute(f()).await;
1259
1260 match result {
1261 Ok(Value) => {
1262 breaker.RecordSuccess().await;
1263
1264 let Event = RetryEvent {
1266 Service:service.to_string(),
1267
1268 Attempt,
1269
1270 ErrorClass:ErrorClass::Unknown,
1271
1272 DelayMs:0,
1273
1274 Success:true,
1275
1276 ErrorMessage:None,
1277 };
1278
1279 self.retry_manager.PublishRetryEvent(Event);
1280
1281 return Ok(Value);
1282 },
1283
1284 Err(E) => {
1285 let ErrorClass = self.retry_manager.ClassifyError(&E);
1286
1287 breaker.RecordFailure().await;
1288
1289 let Delay = self.retry_manager.CalculateAdaptiveRetryDelay(&E, Attempt);
1291
1292 let Event = RetryEvent {
1293 Service:service.to_string(),
1294
1295 Attempt,
1296
1297 ErrorClass,
1298
1299 DelayMs:Delay.as_millis() as u64,
1300
1301 Success:false,
1302
1303 ErrorMessage:Some(self.redact_sensitive_data(&E)),
1304 };
1305
1306 self.retry_manager.PublishRetryEvent(Event);
1307
1308 if Attempt < retry_policy.MaxRetries
1309 && ErrorClass != ErrorClass::NonRetryable
1310 && self.retry_manager.CanRetry(service).await
1311 {
1312 let Delay = self.retry_manager.CalculateAdaptiveRetryDelay(&E, Attempt);
1313
1314 dev_log!(
1315 "resilience",
1316 "[ResilienceOrchestrator] Retrying {} (attempt {}/{}) after {:?}, error: {}",
1317 service,
1318 Attempt + 1,
1319 retry_policy.MaxRetries,
1320 Delay,
1321 self.redact_sensitive_data(&E)
1322 );
1323
1324 tokio::time::sleep(Delay).await;
1325
1326 Attempt += 1;
1327 } else {
1328 return Err(E);
1329 }
1330 },
1331 }
1332 }
1333 }
1334
1335 fn redact_sensitive_data(&self, message:&str) -> String {
1338 let mut redacted = message.to_string();
1339
1340 let patterns = vec![
1342 (r"(?i)password[=:]\S+", "password=[REDACTED]"),
1343 (r"(?i)token[=:]\S+", "token=[REDACTED]"),
1344 (r"(?i)(api|private)[_-]?key[=:]\S+", "api_key=[REDACTED]"),
1345 (r"(?i)secret[=:]\S+", "secret=[REDACTED]"),
1346 (
1347 r"(?i)authorization[=[:space:]]+Bearer[[:space:]]+\S+",
1348 "Authorization: Bearer [REDACTED]",
1349 ),
1350 (r"(?i)credit[_-]?card[=:][\d-]+", "credit_card=[REDACTED]"),
1351 (r"(?i)ssn[=:][\d-]{9,11}", "ssn=[REDACTED]"),
1352 ];
1353
1354 for (pattern, replacement) in patterns {
1355 if let Ok(re) = regex::Regex::new(pattern) {
1356 redacted = re.replace_all(&redacted, replacement).to_string();
1357 }
1358 }
1359
1360 redacted
1361 }
1362
1363 pub fn ValidateConfigurations(
1365 &self,
1366
1367 _RetryPolicy:&RetryPolicy,
1368
1369 CircuitConfig:&CircuitBreakerConfig,
1370
1371 BulkheadConfig:&BulkheadConfig,
1372 ) -> Result<(), String> {
1373 self.retry_manager.ValidatePolicy()?;
1374
1375 CircuitBreaker::ValidateConfig(CircuitConfig)?;
1376
1377 BulkheadExecutor::ValidateConfig(BulkheadConfig)?;
1378
1379 TimeoutManager::ValidateTimeout(Duration::from_secs(BulkheadConfig.timeout_secs))?;
1380
1381 Ok(())
1382 }
1383}
1384
1385impl Clone for ResilienceOrchestrator {
1386 fn clone(&self) -> Self {
1387 Self {
1388 retry_manager:self.retry_manager.clone(),
1389
1390 circuit_breakers:self.circuit_breakers.clone(),
1391
1392 bulkheads:self.bulkheads.clone(),
1393 }
1394 }
1395}
1396
#[cfg(test)]
mod tests {
    use super::*;

    /// Exponential backoff: a later attempt never waits less than an earlier one.
    #[test]
    fn test_retry_delay_calculation() {
        let policy = RetryPolicy::default();

        let manager = RetryManager::new(policy);

        let delay_1 = manager.CalculateRetryDelay(1);

        let delay_2 = manager.CalculateRetryDelay(2);

        assert!(delay_2 >= delay_1);
    }

    /// Rate-limited errors back off at least as long as transient ones.
    #[test]
    fn test_adaptive_retry_delay() {
        let policy = RetryPolicy::default();

        let manager = RetryManager::new(policy);

        let rate_limit_delay = manager.CalculateAdaptiveRetryDelay("rate_limit_exceeded", 1);

        let transient_delay = manager.CalculateAdaptiveRetryDelay("timeout", 1);

        assert!(rate_limit_delay >= transient_delay);
    }

    /// Substring classification maps representative messages to their classes.
    #[test]
    fn test_error_classification() {
        let policy = RetryPolicy::default();

        let manager = RetryManager::new(policy);

        assert_eq!(manager.ClassifyError("connection timeout"), ErrorClass::Transient);

        assert_eq!(manager.ClassifyError("rate limit exceeded"), ErrorClass::RateLimited);

        assert_eq!(manager.ClassifyError("unauthorized"), ErrorClass::NonRetryable);

        assert_eq!(manager.ClassifyError("server error"), ErrorClass::ServerError);
    }

    /// The default policy validates; a zero-retry policy is rejected.
    #[test]
    fn test_policy_validation() {
        let policy = RetryPolicy::default();

        let manager = RetryManager::new(policy);

        assert!(manager.ValidatePolicy().is_ok());

        let invalid_policy = RetryPolicy { MaxRetries:0, ..Default::default() };

        let invalid_manager = RetryManager::new(invalid_policy);

        assert!(invalid_manager.ValidatePolicy().is_err());
    }

    /// Full lifecycle: Closed -> (2 failures) -> Open -> recovery ->
    /// HalfOpen -> (1 success) -> Closed.
    #[tokio::test]
    async fn test_circuit_breaker_state_transitions() {
        let config = CircuitBreakerConfig { FailureThreshold:2, SuccessThreshold:1, TimeoutSecs:1 };

        let breaker = CircuitBreaker::new("test".to_string(), config);

        assert_eq!(breaker.GetState().await, CircuitState::Closed);

        breaker.RecordFailure().await;

        assert_eq!(breaker.GetState().await, CircuitState::Closed);

        breaker.RecordFailure().await;

        assert_eq!(breaker.GetState().await, CircuitState::Open);

        // AttemptRecovery returns true once the 1s timeout has elapsed; this
        // relies on the awaits above taking long enough, or on TimeoutSecs:1
        // being measured from the first RecordFailure.
        assert!(breaker.AttemptRecovery().await);

        assert_eq!(breaker.GetState().await, CircuitState::HalfOpen);

        breaker.RecordSuccess().await;

        assert_eq!(breaker.GetState().await, CircuitState::Closed);
    }

    /// ValidateState on a fresh breaker and after reaching the failure threshold.
    #[tokio::test]
    async fn test_circuit_breaker_validation() {
        let config = CircuitBreakerConfig { FailureThreshold:2, SuccessThreshold:1, TimeoutSecs:1 };

        let breaker = CircuitBreaker::new("test".to_string(), config);

        assert!(breaker.ValidateState().await.is_ok());

        breaker.RecordFailure().await;

        breaker.RecordFailure().await;

        let validate_result = breaker.ValidateState().await;

        // NOTE(review): `is_ok() || is_err()` is vacuously true and asserts
        // nothing; consider pinning the expected outcome instead.
        assert!(validate_result.is_ok() || validate_result.is_err());
    }

    /// Zero thresholds are rejected by the config validator.
    #[test]
    fn test_circuit_breaker_config_validation() {
        let valid_config = CircuitBreakerConfig::default();

        assert!(CircuitBreaker::ValidateConfig(&valid_config).is_ok());

        let invalid_config = CircuitBreakerConfig { FailureThreshold:0, ..Default::default() };

        assert!(CircuitBreaker::ValidateConfig(&invalid_config).is_err());
    }

    /// A fresh bulkhead reports zero load and echoes its configured limits.
    #[tokio::test]
    async fn test_bulkhead_resource_isolation() {
        let config = BulkheadConfig { max_concurrent:2, max_queue:5, timeout_secs:10 };

        let bulkhead = BulkheadExecutor::new("test".to_string(), config);

        let (_current, _queue) = bulkhead.GetLoad().await;

        assert_eq!(_current, 0);

        assert_eq!(_queue, 0);

        let stats = bulkhead.GetStatistics().await;

        assert_eq!(stats.current_concurrent, 0);

        assert_eq!(stats.current_queue, 0);

        assert_eq!(stats.max_concurrent, 2);

        assert_eq!(stats.max_queue, 5);
    }

    /// An idle bulkhead reports 0% utilization.
    #[tokio::test]
    async fn test_bulkhead_utilization() {
        let config = BulkheadConfig { max_concurrent:10, max_queue:100, timeout_secs:30 };

        let bulkhead = BulkheadExecutor::new("test".to_string(), config);

        let utilization = bulkhead.GetUtilization().await;

        assert_eq!(utilization, 0.0);
    }

    /// Zero limits are rejected by the bulkhead config validator.
    #[test]
    fn test_bulkhead_config_validation() {
        let valid_config = BulkheadConfig::default();

        assert!(BulkheadExecutor::ValidateConfig(&valid_config).is_ok());

        let invalid_config = BulkheadConfig { max_concurrent:0, ..Default::default() };

        assert!(BulkheadExecutor::ValidateConfig(&invalid_config).is_err());
    }

    /// Without a global deadline, the effective timeout is the operation timeout.
    #[test]
    fn test_timeout_manager() {
        let manager = TimeoutManager::new(Duration::from_secs(30));

        assert!(!manager.IsExceeded());

        assert_eq!(manager.EffectiveTimeout(), Duration::from_secs(30));

        assert!(TimeoutManager::ValidateTimeout(Duration::from_secs(30)).is_ok());

        assert!(TimeoutManager::ValidateTimeout(Duration::from_secs(0)).is_err());
    }

    /// With a future deadline, remaining time is Some and bounded by it.
    #[test]
    fn test_timeout_manager_with_deadline() {
        let deadline = Instant::now() + Duration::from_secs(60);

        let manager = TimeoutManager::with_deadline(deadline, Duration::from_secs(30));

        let remaining = manager.Remaining();

        assert!(remaining.is_some());

        assert!(remaining.unwrap() <= Duration::from_secs(60));
    }
}