1#![allow(non_snake_case, non_camel_case_types, non_upper_case_globals)]
2use async_trait::async_trait;
11use serde::{Deserialize, Serialize};
12
13use super::{
14 Common::TransportType,
15 TransportConfig::TransportConfig,
16 TransportError::TransportError,
17 UnifiedRequest::UnifiedRequest,
18 UnifiedResponse::UnifiedResponse,
19};
20
21#[async_trait]
27pub trait TransportStrategy: Send + Sync {
28 async fn Connect(&mut self) -> Result<(), TransportError>;
30
31 async fn Disconnect(&mut self) -> Result<(), TransportError>;
33
34 async fn SendRequest(&mut self, Request:UnifiedRequest) -> Result<UnifiedResponse, TransportError>;
36
37 async fn SendNotification(&mut self, Notification:UnifiedRequest) -> Result<(), TransportError>;
39
40 fn StreamEvents(&self)
42 -> std::result::Result<futures::stream::BoxStream<'static, UnifiedResponse>, TransportError>;
43
44 fn IsConnected(&self) -> bool;
46
47 fn LatencyMilliseconds(&self) -> u64;
49
50 fn TransportKind(&self) -> TransportType;
52
53 fn Configuration(&self) -> &TransportConfig;
55
56 fn SupportsStreaming(&self) -> bool;
58
59 fn Capabilities(&self) -> TransportCapabilities;
61
62 fn Metrics(&self) -> TransportMetrics;
64}
65
/// Feature flags and limits advertised by a transport.
///
/// The type is plain data: it is `Copy`, and two values compare equal when
/// every field matches.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TransportCapabilities {
    /// Largest message the transport accepts.
    ///
    /// NOTE(review): the unit is not stated in this file; the `Default`
    /// implementation uses `1024 * 1024`, which suggests bytes — confirm.
    pub MaximumMessageSize: usize,

    /// Whether the request/response pattern is available.
    pub SupportsRequestResponse: bool,

    /// Whether server-side streaming is available.
    pub SupportsServerStreaming: bool,

    /// Whether client-side streaming is available.
    pub SupportsClientStreaming: bool,

    /// Whether bidirectional streaming is available.
    pub SupportsBidirectionalStreaming: bool,

    /// Whether notifications are available.
    pub SupportsNotifications: bool,

    /// Upper bound on concurrency.
    ///
    /// NOTE(review): presumably concurrent in-flight requests or
    /// connections — this file does not say which; confirm with users of the
    /// field.
    pub MaximumConcurrent: usize,

    /// Whether the transport needs a network.
    pub RequiresNetwork: bool,

    /// Whether encryption is available.
    pub SupportsEncryption: bool,

    /// Whether compression is available.
    pub SupportsCompression: bool,
}
99
100impl Default for TransportCapabilities {
101 fn default() -> Self {
102 Self {
103 MaximumMessageSize:1024 * 1024, SupportsRequestResponse:true,
105 SupportsServerStreaming:false,
106 SupportsClientStreaming:false,
107 SupportsBidirectionalStreaming:false,
108 SupportsNotifications:true,
109 MaximumConcurrent:100,
110 RequiresNetwork:false,
111 SupportsEncryption:false,
112 SupportsCompression:false,
113 }
114 }
115}
116
/// Counters and gauges describing a transport's activity.
///
/// `Default` yields all-zero counters and no latency histogram.
#[derive(Debug, Clone, Default)]
pub struct TransportMetrics {
    /// Number of requests counted, successful and failed together.
    pub RequestsTotal: u64,

    /// Number of requests counted as successful.
    pub RequestsSuccessful: u64,

    /// Number of requests counted as failed.
    pub RequestsFailed: u64,

    /// Number of notifications sent.
    pub NotificationsSent: u64,

    /// Number of connections established.
    pub ConnectionsEstablished: u64,

    /// Number of failed connection attempts.
    pub ConnectionFailures: u64,

    /// Amount of data sent (bytes, per the field name).
    pub BytesSent: u64,

    /// Amount of data received (bytes, per the field name).
    pub BytesReceived: u64,

    /// Packed circuit-breaker word.
    ///
    /// `SetCircuitBreakerState` writes the state code into the low 16 bits
    /// and leaves the high 16 bits untouched.
    pub CircuitBreakerState: u32,

    /// Running latency aggregate, stored as
    /// `(sample count, sum of samples, sum of squared samples)`.
    ///
    /// `None` until the first sample is recorded through `RecordLatency`.
    pub LatencyMillisecondsHistogram: Option<(u64, f64, f64)>,

    /// Number of currently active connections.
    pub ActiveConnections: u32,

    /// Number of requests currently pending.
    pub PendingRequests: u32,
}
157
158impl TransportMetrics {
159 pub fn New() -> Self { Self::default() }
161
162 pub fn Reset(&mut self) { *self = Self::New(); }
164
165 pub fn SuccessRate(&self) -> Option<f64> {
167 let Total = self.RequestsTotal;
168 if Total == 0 {
169 None
170 } else {
171 Some((self.RequestsSuccessful as f64 / Total as f64) * 100.0)
172 }
173 }
174
175 pub fn AverageLatency(&self) -> Option<f64> {
177 let (Count, Sum, _) = self.LatencyMillisecondsHistogram?;
178 if Count == 0 { None } else { Some(Sum / Count as f64) }
179 }
180
181 pub fn LatencyPercentile95(&self) -> Option<f64> {
183 let (Count, Mean, SumSquared) = self.LatencyMillisecondsHistogram?;
184 if Count < 20 {
185 return None;
186 }
187 let Variance = (SumSquared / Count as f64) - (Mean * Mean);
188 let StandardDeviation = Variance.sqrt();
189 Some(Mean + 1.645 * StandardDeviation)
190 }
191
192 pub fn RecordLatency(&mut self, LatencyMilliseconds:f64) {
194 let (Count, Sum, SumSquared) = self.LatencyMillisecondsHistogram.get_or_insert((0, 0.0, 0.0));
195 *Count += 1;
196 *Sum += LatencyMilliseconds;
197 *SumSquared += LatencyMilliseconds * LatencyMilliseconds;
198 }
199
200 pub fn IncrementRequestSuccess(&mut self) {
202 self.RequestsTotal += 1;
203 self.RequestsSuccessful += 1;
204 }
205
206 pub fn IncrementRequestFailure(&mut self) {
208 self.RequestsTotal += 1;
209 self.RequestsFailed += 1;
210 }
211
212 pub fn SetCircuitBreakerState(&mut self, State:CircuitBreakerState) {
214 let StateCode = match State {
215 CircuitBreakerState::Closed => 1,
216 CircuitBreakerState::Open => 0,
217 CircuitBreakerState::HalfOpen => 2,
218 };
219 let OldState = self.CircuitBreakerState;
220 self.CircuitBreakerState = (OldState & 0xFFFF_0000) | StateCode as u32;
221 }
222}
223
/// The three states a circuit breaker can be in.
///
/// `TransportMetrics::SetCircuitBreakerState` converts a value of this type
/// into the numeric code stored in the metrics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitBreakerState {
    /// The breaker is closed.
    Closed,

    /// The breaker is open.
    Open,

    /// The breaker is half-open.
    HalfOpen,
}
234
235#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
237#[repr(u16)]
238pub enum TransportErrorCode {
239 ConnectionFailed = 100,
241 Timeout = 101,
243 NotFound = 102,
245 InvalidRequest = 103,
247 RemoteError = 104,
249 MessageTooLarge = 105,
251 EncryptionError = 106,
253 SerializationError = 107,
255 Unauthorized = 108,
257 RateLimited = 109,
259 NotSupported = 110,
261 InternalError = 111,
263 CircuitBreakerOpen = 112,
265 StreamError = 113,
267 ConfigurationError = 114,
269}
270
271impl TransportErrorCode {
272 pub fn IsRetryable(&self) -> bool {
274 matches!(
275 self,
276 TransportErrorCode::ConnectionFailed
277 | TransportErrorCode::Timeout
278 | TransportErrorCode::RateLimited
279 | TransportErrorCode::RemoteError
280 )
281 }
282
283 pub fn RecommendedRetryDelayMilliseconds(&self) -> u64 {
285 match self {
286 TransportErrorCode::ConnectionFailed => 1000,
287 TransportErrorCode::Timeout => 500,
288 TransportErrorCode::RateLimited => 2000,
289 TransportErrorCode::RemoteError => 300,
290 _ => 0,
291 }
292 }
293}
294
#[cfg(test)]
mod tests {
    use super::*;

    /// Every `TransportErrorCode` variant, so that whole-enum properties can
    /// be checked in a loop.
    const AllCodes: [TransportErrorCode; 15] = [
        TransportErrorCode::ConnectionFailed,
        TransportErrorCode::Timeout,
        TransportErrorCode::NotFound,
        TransportErrorCode::InvalidRequest,
        TransportErrorCode::RemoteError,
        TransportErrorCode::MessageTooLarge,
        TransportErrorCode::EncryptionError,
        TransportErrorCode::SerializationError,
        TransportErrorCode::Unauthorized,
        TransportErrorCode::RateLimited,
        TransportErrorCode::NotSupported,
        TransportErrorCode::InternalError,
        TransportErrorCode::CircuitBreakerOpen,
        TransportErrorCode::StreamError,
        TransportErrorCode::ConfigurationError,
    ];

    /// The four retryable codes report `true`; representative others do not.
    #[test]
    fn TestRetryableErrorCodes() {
        assert!(TransportErrorCode::ConnectionFailed.IsRetryable());
        assert!(TransportErrorCode::Timeout.IsRetryable());
        assert!(TransportErrorCode::RateLimited.IsRetryable());
        // Previously unasserted although it is listed as retryable.
        assert!(TransportErrorCode::RemoteError.IsRetryable());
        assert!(!TransportErrorCode::InvalidRequest.IsRetryable());
        assert!(!TransportErrorCode::NotFound.IsRetryable());
    }

    /// Each code with a specific delay returns it; others return zero.
    #[test]
    fn TestErrorRecommendedDelays() {
        assert_eq!(TransportErrorCode::ConnectionFailed.RecommendedRetryDelayMilliseconds(), 1000);
        assert_eq!(TransportErrorCode::Timeout.RecommendedRetryDelayMilliseconds(), 500);
        assert_eq!(TransportErrorCode::RateLimited.RecommendedRetryDelayMilliseconds(), 2000);
        // Previously unasserted although it has its own delay.
        assert_eq!(TransportErrorCode::RemoteError.RecommendedRetryDelayMilliseconds(), 300);
        assert_eq!(TransportErrorCode::InvalidRequest.RecommendedRetryDelayMilliseconds(), 0);
    }

    /// A code has a non-zero recommended delay exactly when it is retryable,
    /// so the two tables cannot drift apart unnoticed.
    #[test]
    fn TestRetryDelayAgreesWithRetryability() {
        for Code in AllCodes.iter() {
            assert_eq!(
                Code.IsRetryable(),
                Code.RecommendedRetryDelayMilliseconds() > 0,
                "retryability and delay disagree for {:?}",
                Code
            );
        }
    }
}