1use std::{
51 collections::HashMap,
52 sync::Arc,
53 time::{Duration, Instant},
54};
55
56use lazy_static::lazy_static;
57use parking_lot::Mutex;
58use serde_json::{Value, from_slice, to_vec};
59use tokio::time::timeout;
60
61use super::{
62 Error::VineError,
63 Generated::{GenericNotification, GenericRequest, cocoon_service_client::CocoonServiceClient},
64};
65use crate::dev_log;
66
/// Shorthand for the generated Cocoon service client running over a tonic
/// transport [`Channel`](tonic::transport::Channel).
type CocoonClient = CocoonServiceClient<tonic::transport::Channel>;
69
/// Limits and timings shared by the sidecar client functions in this file.
mod Config {
	/// Request timeout, in milliseconds, applied by `SendRequest` when the
	/// caller passes `0`.
	pub const DEFAULT_TIMEOUT_MS:u64 = 5_000;

	/// Number of connection attempts `ConnectToSideCar` makes before giving
	/// up; also the failure-count ceiling used by `CheckSideCarHealth`.
	pub const MAX_RETRY_ATTEMPTS:usize = 3;

	/// Base delay, in milliseconds, of the exponential backoff between
	/// connection attempts.
	pub const RETRY_BASE_DELAY_MS:u64 = 100;

	/// Largest serialized parameter payload accepted, in bytes (4 MiB).
	pub const MAX_MESSAGE_SIZE_BYTES:usize = 4 << 20;

	/// Idle period, in milliseconds, after which `CheckSideCarHealth` treats
	/// a connection as stale.
	pub const HEALTH_CHECK_INTERVAL_MS:u64 = 30_000;

	/// Time budget, in milliseconds, for establishing a connection.
	pub const CONNECTION_TIMEOUT_MS:u64 = 10_000;
}
90
/// Health bookkeeping kept for each connected sidecar.
///
/// `Debug` and `Clone` are derived so an entry can be logged or copied out of
/// the registry without holding its lock.
#[derive(Debug, Clone)]
struct ConnectionMetadata {
	/// When the connection was established or last completed a successful
	/// request or notification.
	LastActivity:Instant,

	/// Failures recorded since the last success; reset to zero on success.
	FailureCount:usize,

	/// `false` after any recorded failure, `true` again after the next
	/// success.
	IsHealthy:bool,
}
100
lazy_static! {
	/// Connected sidecar clients, keyed by sidecar identifier. Entries are
	/// inserted by `try_connect_single` and removed by
	/// `DisconnectFromSideCar`; senders clone the client out so the lock is
	/// not held while a call is in flight.
	static ref SIDECAR_CLIENTS: Arc<Mutex<HashMap<String, CocoonClient>>> = Arc::new(Mutex::new(HashMap::new()));

	/// Per-sidecar health bookkeeping, keyed by the same identifier as
	/// `SIDECAR_CLIENTS`.
	static ref CONNECTION_METADATA: Arc<Mutex<HashMap<String, ConnectionMetadata>>> = Arc::new(Mutex::new(HashMap::new()));
}
108
109pub async fn ConnectToSideCar(SideCarIdentifier:String, Address:String) -> Result<(), VineError> {
132 dev_log!(
133 "grpc",
134 "[VineClient] Connecting to sidecar '{}' at '{}'...",
135 SideCarIdentifier,
136 Address
137 );
138
139 let endpoint = format!("http://{}", Address);
140
141 if endpoint.len() > 256 {
143 return Err(VineError::RPCError(format!("Invalid endpoint address: exceeds maximum length")));
144 }
145
146 let mut last_error = None;
148
149 for attempt in 1..=Config::MAX_RETRY_ATTEMPTS {
150 let result = try_connect_single(&SideCarIdentifier, &endpoint).await;
151
152 if result.is_ok() {
153 CONNECTION_METADATA.lock().insert(
155 SideCarIdentifier.clone(),
156 ConnectionMetadata { LastActivity:Instant::now(), FailureCount:0, IsHealthy:true },
157 );
158
159 dev_log!("grpc", "[VineClient] Successfully connected to sidecar '{}'", SideCarIdentifier);
160
161 return Ok(result?);
162 }
163
164 last_error = Some(result.unwrap_err());
166
167 if attempt < Config::MAX_RETRY_ATTEMPTS {
169 let delay_ms = Config::RETRY_BASE_DELAY_MS * 2_u64.pow(attempt as u32);
170 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
171 }
172 }
173
174 Err(last_error.unwrap_or_else(|| VineError::RPCError("Connection failed".to_string())))
175}
176
177async fn try_connect_single(_SideCarIdentifier:&str, endpoint:&str) -> Result<(), VineError> {
179 let endpoint_url = if endpoint.starts_with("http://") || endpoint.starts_with("https://") {
180 endpoint.to_string()
181 } else {
182 format!("http://{}", endpoint)
183 };
184
185 let channel = tonic::transport::Channel::from_shared(endpoint_url)
186 .map_err(|e| VineError::RPCError(format!("Failed to create channel: {}", e)))?
187 .connect()
188 .await
189 .map_err(|e| VineError::RPCError(format!("Failed to connect: {}", e)))?;
190
191 let client = CocoonClient::new(channel);
192
193 let mut clients = SIDECAR_CLIENTS.lock();
194 clients.insert(_SideCarIdentifier.to_string(), client);
195
196 Ok(())
197}
198
199pub fn DisconnectFromSideCar(SideCarIdentifier:String) -> Result<(), VineError> {
220 let mut clients = SIDECAR_CLIENTS.lock();
221
222 if clients.remove(&SideCarIdentifier).is_some() {
223 CONNECTION_METADATA.lock().remove(&SideCarIdentifier);
224
225 dev_log!("grpc", "[VineClient] Disconnected from sidecar '{}'", SideCarIdentifier);
226
227 Ok(())
228 } else {
229 Err(VineError::ClientNotConnected(SideCarIdentifier))
230 }
231}
232
233pub fn CheckSideCarHealth(SideCarIdentifier:&str) -> Result<bool, VineError> {
257 let metadata = CONNECTION_METADATA.lock();
258
259 if let Some(conn) = metadata.get(SideCarIdentifier) {
260 let is_stale = conn.LastActivity.elapsed() > Duration::from_millis(Config::HEALTH_CHECK_INTERVAL_MS);
261 let has_many_failures = conn.FailureCount > Config::MAX_RETRY_ATTEMPTS;
262
263 Ok(conn.IsHealthy && !is_stale && !has_many_failures)
264 } else {
265 Err(VineError::ClientNotConnected(SideCarIdentifier.to_string()))
266 }
267}
268
269fn RecordSideCarFailure(SideCarIdentifier:&str) {
276 let mut metadata = CONNECTION_METADATA.lock();
277
278 if let Some(conn) = metadata.get_mut(SideCarIdentifier) {
279 conn.FailureCount += 1;
280 conn.IsHealthy = false;
281 }
282}
283
284fn UpdateSideCarActivity(SideCarIdentifier:&str) {
291 let mut metadata = CONNECTION_METADATA.lock();
292
293 if let Some(conn) = metadata.get_mut(SideCarIdentifier) {
294 conn.LastActivity = Instant::now();
295 conn.FailureCount = 0;
296 conn.IsHealthy = true;
297 }
298}
299
300fn ValidateMessageSize(data:&[u8]) -> Result<(), VineError> {
311 if data.len() > Config::MAX_MESSAGE_SIZE_BYTES {
312 Err(VineError::MessageTooLarge { ActualSize:data.len(), MaxSize:Config::MAX_MESSAGE_SIZE_BYTES })
313 } else {
314 Ok(())
315 }
316}
317
318pub async fn SendRequest(
345 SideCarIdentifier:&str,
346 Method:String,
347 Parameters:Value,
348 TimeoutMilliseconds:u64,
349) -> Result<Value, VineError> {
350 if Method.is_empty() || Method.len() > 128 {
352 return Err(VineError::RPCError(
353 "Method name must be between 1 and 128 characters".to_string(),
354 ));
355 }
356
357 let timeout_duration = Duration::from_millis(if TimeoutMilliseconds > 0 {
358 TimeoutMilliseconds
359 } else {
360 Config::DEFAULT_TIMEOUT_MS
361 });
362
363 let parameter_bytes =
365 to_vec(&Parameters).map_err(|e| VineError::RPCError(format!("Failed to serialize parameters: {}", e)))?;
366 ValidateMessageSize(¶meter_bytes)?;
367
368 let client = {
369 let guard = SIDECAR_CLIENTS.lock();
370 guard.get(SideCarIdentifier).cloned()
371 };
372
373 if client.is_none() {
374 return Err(VineError::ClientNotConnected(SideCarIdentifier.to_string()));
375 }
376
377 let mut client = client.unwrap();
378
379 let request_identifier = std::time::SystemTime::now()
380 .duration_since(std::time::UNIX_EPOCH)
381 .unwrap()
382 .as_nanos() as u64;
383 let method_clone = Method.clone();
384 let request = GenericRequest { request_identifier, method:Method, parameter:parameter_bytes };
385
386 let result = timeout(timeout_duration, client.process_mountain_request(request)).await;
387
388 match result {
389 Ok(Ok(response)) => {
390 UpdateSideCarActivity(SideCarIdentifier);
391 dev_log!(
392 "grpc",
393 "[VineClient] Request sent successfully to sidecar '{}': method='{}'",
394 SideCarIdentifier,
395 method_clone
396 );
397
398 let inner_response = response.into_inner();
400
401 let result_bytes = inner_response.result;
403 let result_value:Value = from_slice(&result_bytes)
404 .map_err(|e| VineError::RPCError(format!("Failed to deserialize response: {}", e)))?;
405
406 if let Some(error_data) = inner_response.error {
408 return Err(VineError::RPCError(format!(
409 "RPC error from sidecar: code={}, message={}",
410 error_data.code, error_data.message
411 )));
412 }
413
414 Ok(result_value)
415 },
416 Ok(Err(status)) => {
417 RecordSideCarFailure(SideCarIdentifier);
418 return Err(VineError::RPCError(format!("gRPC error: {}", status)));
419 },
420 Err(_) => {
421 RecordSideCarFailure(SideCarIdentifier);
422 Err(VineError::RequestTimeout {
423 SideCarIdentifier:SideCarIdentifier.to_string(),
424 MethodName:method_clone,
425 TimeoutMilliseconds:timeout_duration.as_millis() as u64,
426 })
427 },
428 }
429}
430
431pub async fn SendNotification(SideCarIdentifier:String, Method:String, Parameters:Value) -> Result<(), VineError> {
449 if Method.is_empty() || Method.len() > 128 {
451 return Err(VineError::RPCError(
452 "Method name must be between 1 and 128 characters".to_string(),
453 ));
454 }
455
456 let parameter_bytes = to_vec(&Parameters)?;
457 ValidateMessageSize(¶meter_bytes)?;
458
459 let mut client = {
460 let guard = SIDECAR_CLIENTS.lock();
461 guard.get(&SideCarIdentifier).cloned()
462 };
463
464 if let Some(ref mut client) = client {
465 let request = GenericNotification { method:Method, parameter:parameter_bytes };
466
467 match client.send_mountain_notification(request).await {
468 Ok(_) => {
469 UpdateSideCarActivity(&SideCarIdentifier);
470 dev_log!(
471 "grpc",
472 "[VineClient] Notification sent successfully to sidecar '{}'",
473 SideCarIdentifier
474 );
475 Ok(())
476 },
477 Err(status) => {
478 RecordSideCarFailure(&SideCarIdentifier);
479 dev_log!(
480 "grpc",
481 "error: [VineClient] Failed to send notification to sidecar '{}': {}",
482 SideCarIdentifier,
483 status
484 );
485 Err(VineError::from(status))
486 },
487 }
488 } else {
489 Err(VineError::ClientNotConnected(SideCarIdentifier))
490 }
491}