Skip to main content

CommonLibrary/Transport/
TransportStrategy.rs

1#![allow(non_snake_case, non_camel_case_types, non_upper_case_globals)]
2//! # TransportStrategy Trait
3//!
4//! Defines the core trait that all transport implementations must implement.
5//! This trait provides a unified, transport-agnostic interface for sending
6//! requests and notifications, with optional event streaming capabilities.
7//!
8//! All transports must be async and thread-safe (`Send + Sync`).
9
10use async_trait::async_trait;
11use serde::{Deserialize, Serialize};
12
13use super::{
14	Common::TransportType,
15	TransportConfig::TransportConfig,
16	TransportError::TransportError,
17	UnifiedRequest::UnifiedRequest,
18	UnifiedResponse::UnifiedResponse,
19};
20
21/// Core transport strategy trait.
22///
23/// This trait defines the essential operations that any transport mechanism
24/// must provide. Components interact with transports through this trait,
25/// allowing them to be transport-agnostic.
26#[async_trait]
27pub trait TransportStrategy: Send + Sync {
28	/// Establishes a connection to the transport endpoint.
29	async fn Connect(&mut self) -> Result<(), TransportError>;
30
31	/// Closes the connection and releases any associated resources.
32	async fn Disconnect(&mut self) -> Result<(), TransportError>;
33
34	/// Sends a request and waits for a response.
35	async fn SendRequest(&mut self, Request:UnifiedRequest) -> Result<UnifiedResponse, TransportError>;
36
37	/// Sends a notification (fire-and-forget message).
38	async fn SendNotification(&mut self, Notification:UnifiedRequest) -> Result<(), TransportError>;
39
40	/// Creates a stream of events from the transport.
41	fn StreamEvents(&self)
42	-> std::result::Result<futures::stream::BoxStream<'static, UnifiedResponse>, TransportError>;
43
44	/// Checks if the transport is currently connected.
45	fn IsConnected(&self) -> bool;
46
47	/// Returns the estimated round-trip latency in milliseconds.
48	fn LatencyMilliseconds(&self) -> u64;
49
50	/// Returns the type of transport (gRPC, IPC, WASM, etc.).
51	fn TransportKind(&self) -> TransportType;
52
53	/// Returns the transport's configuration.
54	fn Configuration(&self) -> &TransportConfig;
55
56	/// Checks if the transport supports bidirectional streaming.
57	fn SupportsStreaming(&self) -> bool;
58
59	/// Returns the transport's current capabilities and limits.
60	fn Capabilities(&self) -> TransportCapabilities;
61
62	/// Collects and returns current performance metrics.
63	fn Metrics(&self) -> TransportMetrics;
64}
65
/// Feature flags and limits advertised by a transport.
///
/// The value is small and `Copy`, so it is cheap to hand out by value.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct TransportCapabilities {
	/// Largest single message the transport accepts, in bytes.
	pub MaximumMessageSize:usize,

	/// `true` when the request-response pattern is supported.
	pub SupportsRequestResponse:bool,

	/// `true` when server-side streaming is supported.
	pub SupportsServerStreaming:bool,

	/// `true` when client-side streaming is supported.
	pub SupportsClientStreaming:bool,

	/// `true` when bidirectional streaming is supported.
	pub SupportsBidirectionalStreaming:bool,

	/// `true` when broadcast/notification messages are supported.
	pub SupportsNotifications:bool,

	/// Estimated ceiling on concurrent requests/connections.
	pub MaximumConcurrent:usize,

	/// `true` when the transport needs network connectivity to operate.
	pub RequiresNetwork:bool,

	/// `true` when encryption/TLS is supported.
	pub SupportsEncryption:bool,

	/// `true` when compression is supported.
	pub SupportsCompression:bool,
}

impl Default for TransportCapabilities {
	/// Baseline profile: 1 MiB messages, request-response and notifications
	/// enabled, 100 concurrent operations, and every other feature flag off.
	fn default() -> Self {
		// 1 MiB expressed as a power of two (identical to 1024 * 1024).
		const Mebibyte:usize = 1 << 20;

		Self {
			// Limits.
			MaximumMessageSize:Mebibyte,
			MaximumConcurrent:100,

			// Patterns available out of the box.
			SupportsRequestResponse:true,
			SupportsNotifications:true,

			// Everything else is opt-in.
			SupportsServerStreaming:false,
			SupportsClientStreaming:false,
			SupportsBidirectionalStreaming:false,
			RequiresNetwork:false,
			SupportsEncryption:false,
			SupportsCompression:false,
		}
	}
}
116
/// Transport performance metrics.
///
/// A plain data container: counters, two gauges, a packed circuit breaker
/// word and a running latency summary. All mutation goes through `&mut self`;
/// the type performs no synchronization of its own.
#[derive(Debug, Clone, Default)]
pub struct TransportMetrics {
	/// Total number of requests sent (including retries).
	///
	/// Incremented by both `IncrementRequestSuccess` and
	/// `IncrementRequestFailure`.
	pub RequestsTotal:u64,

	/// Total number of successful requests (2xx/OK responses).
	pub RequestsSuccessful:u64,

	/// Total number of failed requests (excludes timeouts/retries).
	pub RequestsFailed:u64,

	/// Total number of notifications sent.
	pub NotificationsSent:u64,

	/// Total number of connections established (includes reconnections).
	pub ConnectionsEstablished:u64,

	/// Total number of connection failures.
	pub ConnectionFailures:u64,

	/// Total bytes sent (compressed size if compression enabled).
	pub BytesSent:u64,

	/// Total bytes received (compressed size if compression enabled).
	pub BytesReceived:u64,

	/// Packed circuit breaker word.
	///
	/// `SetCircuitBreakerState` stores the state code in the low 16 bits
	/// (`0` = Open, `1` = Closed, `2` = HalfOpen) and leaves the high 16 bits
	/// untouched.
	///
	/// NOTE(review): nothing in this file writes the high 16 bits; presumably
	/// they are reserved for a state-change counter - confirm. Also note that
	/// a default-constructed value is `0`, which decodes as `Open`.
	pub CircuitBreakerState:u32,

	/// Running latency summary in milliseconds, stored as
	/// `(count, sum, sum of squares)`.
	///
	/// This is not a bucketed histogram: the mean and variance are derived
	/// from these three moments, and percentiles can only be estimated from
	/// them. `None` until the first sample is recorded.
	pub LatencyMillisecondsHistogram:Option<(u64, f64, f64)>,

	/// Current active connections (gauge).
	pub ActiveConnections:u32,

	/// Current pending requests (gauge).
	pub PendingRequests:u32,
}

impl TransportMetrics {
	/// Creates a new, empty metrics container (every field at its default).
	pub fn New() -> Self { Self::default() }

	/// Resets the container to the state produced by `New`.
	///
	/// Every field is cleared, including the gauges, the circuit breaker word
	/// and the latency summary.
	pub fn Reset(&mut self) { *self = Self::New(); }

	/// Computes the success rate as a percentage (0-100).
	///
	/// Returns `None` when no requests have been counted, so callers never
	/// see a division by zero.
	pub fn SuccessRate(&self) -> Option<f64> {
		let Total = self.RequestsTotal;
		if Total == 0 {
			None
		} else {
			Some((self.RequestsSuccessful as f64 / Total as f64) * 100.0)
		}
	}

	/// Computes the average request latency in milliseconds.
	///
	/// Returns `None` when no latency sample has been recorded.
	pub fn AverageLatency(&self) -> Option<f64> {
		let (Count, Sum, _) = self.LatencyMillisecondsHistogram?;
		if Count == 0 { None } else { Some(Sum / Count as f64) }
	}

	/// Estimates the 95th percentile latency in milliseconds.
	///
	/// Only the first two moments are stored, so the percentile is
	/// approximated as `mean + 1.645 * standard deviation`, i.e. the 95th
	/// percentile of a normal distribution with the observed mean and
	/// variance.
	///
	/// Returns `None` when fewer than 20 samples have been recorded.
	pub fn LatencyPercentile95(&self) -> Option<f64> {
		let (Count, Sum, SumSquared) = self.LatencyMillisecondsHistogram?;
		if Count < 20 {
			return None;
		}

		let SampleCount = Count as f64;

		// The tuple stores the running *sum*; the mean must be derived from it.
		let Mean = Sum / SampleCount;

		// Population variance E[x^2] - E[x]^2. Floating-point rounding can push
		// a (near-)zero variance slightly negative, which would make `sqrt`
		// return NaN, so clamp at zero.
		let Variance = (SumSquared / SampleCount - Mean * Mean).max(0.0);
		let StandardDeviation = Variance.sqrt();

		Some(Mean + 1.645 * StandardDeviation)
	}

	/// Records a request latency sample, creating the summary on first use.
	pub fn RecordLatency(&mut self, LatencyMilliseconds:f64) {
		let (Count, Sum, SumSquared) = self.LatencyMillisecondsHistogram.get_or_insert((0, 0.0, 0.0));
		*Count += 1;
		*Sum += LatencyMilliseconds;
		*SumSquared += LatencyMilliseconds * LatencyMilliseconds;
	}

	/// Increments the RequestsTotal and RequestsSuccessful counters.
	pub fn IncrementRequestSuccess(&mut self) {
		self.RequestsTotal += 1;
		self.RequestsSuccessful += 1;
	}

	/// Increments the RequestsTotal and RequestsFailed counters.
	pub fn IncrementRequestFailure(&mut self) {
		self.RequestsTotal += 1;
		self.RequestsFailed += 1;
	}

	/// Stores `State` in the low 16 bits of the circuit breaker word.
	///
	/// The high 16 bits of the existing value are preserved unchanged.
	pub fn SetCircuitBreakerState(&mut self, State:CircuitBreakerState) {
		let StateCode:u32 = match State {
			CircuitBreakerState::Closed => 1,
			CircuitBreakerState::Open => 0,
			CircuitBreakerState::HalfOpen => 2,
		};
		let OldState = self.CircuitBreakerState;
		// Keep the upper half, replace the lower half with the new code.
		self.CircuitBreakerState = (OldState & 0xFFFF_0000) | StateCode;
	}
}

/// Circuit breaker state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitBreakerState {
	/// Circuit is closed; requests flow normally.
	Closed,
	/// Circuit is open; requests are rejected immediately.
	Open,
	/// Circuit is half-open; limited requests are allowed to test recovery.
	HalfOpen,
}
234
235/// Transport-specific error codes.
236#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
237#[repr(u16)]
238pub enum TransportErrorCode {
239	/// Connection to endpoint failed or was lost.
240	ConnectionFailed = 100,
241	/// Operation timed out.
242	Timeout = 101,
243	/// Target endpoint not found/service unavailable.
244	NotFound = 102,
245	/// Invalid request format or parameters.
246	InvalidRequest = 103,
247	/// Remote endpoint returned an application error.
248	RemoteError = 104,
249	/// Message too large for transport.
250	MessageTooLarge = 105,
251	/// Encryption/decryption failed.
252	EncryptionError = 106,
253	/// Serialization/deserialization failed.
254	SerializationError = 107,
255	/// Authentication/authorization failed.
256	Unauthorized = 108,
257	/// Rate limit exceeded.
258	RateLimited = 109,
259	/// Feature not supported by this transport.
260	NotSupported = 110,
261	/// Internal transport error (bug, corrupted state).
262	InternalError = 111,
263	/// Circuit breaker is open; request rejected.
264	CircuitBreakerOpen = 112,
265	/// Stream already in use or closed.
266	StreamError = 113,
267	/// Configuration error (invalid settings).
268	ConfigurationError = 114,
269}
270
271impl TransportErrorCode {
272	/// Returns `true` if this error code is retryable.
273	pub fn IsRetryable(&self) -> bool {
274		matches!(
275			self,
276			TransportErrorCode::ConnectionFailed
277				| TransportErrorCode::Timeout
278				| TransportErrorCode::RateLimited
279				| TransportErrorCode::RemoteError
280		)
281	}
282
283	/// Returns the recommended retry delay in milliseconds for this error.
284	pub fn RecommendedRetryDelayMilliseconds(&self) -> u64 {
285		match self {
286			TransportErrorCode::ConnectionFailed => 1000,
287			TransportErrorCode::Timeout => 500,
288			TransportErrorCode::RateLimited => 2000,
289			TransportErrorCode::RemoteError => 300,
290			_ => 0,
291		}
292	}
293}
294
#[cfg(test)]
mod tests {
	use super::*;

	/// Every variant of `TransportErrorCode`, used for table-wide checks.
	const AllCodes:[TransportErrorCode; 15] = [
		TransportErrorCode::ConnectionFailed,
		TransportErrorCode::Timeout,
		TransportErrorCode::NotFound,
		TransportErrorCode::InvalidRequest,
		TransportErrorCode::RemoteError,
		TransportErrorCode::MessageTooLarge,
		TransportErrorCode::EncryptionError,
		TransportErrorCode::SerializationError,
		TransportErrorCode::Unauthorized,
		TransportErrorCode::RateLimited,
		TransportErrorCode::NotSupported,
		TransportErrorCode::InternalError,
		TransportErrorCode::CircuitBreakerOpen,
		TransportErrorCode::StreamError,
		TransportErrorCode::ConfigurationError,
	];

	/// The four retryable codes report `true`; representative others do not.
	#[test]
	fn TestRetryableErrorCodes() {
		assert!(TransportErrorCode::ConnectionFailed.IsRetryable());
		assert!(TransportErrorCode::Timeout.IsRetryable());
		assert!(TransportErrorCode::RateLimited.IsRetryable());
		assert!(TransportErrorCode::RemoteError.IsRetryable());
		assert!(!TransportErrorCode::InvalidRequest.IsRetryable());
		assert!(!TransportErrorCode::NotFound.IsRetryable());
	}

	/// Each retryable code maps to its dedicated delay; others map to zero.
	#[test]
	fn TestErrorRecommendedDelays() {
		assert_eq!(TransportErrorCode::ConnectionFailed.RecommendedRetryDelayMilliseconds(), 1000);
		assert_eq!(TransportErrorCode::Timeout.RecommendedRetryDelayMilliseconds(), 500);
		assert_eq!(TransportErrorCode::RateLimited.RecommendedRetryDelayMilliseconds(), 2000);
		assert_eq!(TransportErrorCode::RemoteError.RecommendedRetryDelayMilliseconds(), 300);
		assert_eq!(TransportErrorCode::InvalidRequest.RecommendedRetryDelayMilliseconds(), 0);
	}

	/// The retryable set and the delay table must not drift apart: a code has
	/// a non-zero delay exactly when it is retryable.
	#[test]
	fn TestRetryableCodesHaveDelay() {
		for Code in AllCodes {
			assert_eq!(
				Code.IsRetryable(),
				Code.RecommendedRetryDelayMilliseconds() > 0,
				"retryable flag and delay disagree for {Code:?}"
			);
		}
	}
}
315}