Skip to main content

Grove/Transport/
gRPCTransport.rs

1#![allow(non_snake_case, non_camel_case_types, non_upper_case_globals)]
2//! # gRPC Transport Implementation
3//!
4//! Provides gRPC-based communication for Grove.
5//! Connects to Mountain or other gRPC services.
6
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use tokio::sync::RwLock;
11use tonic::transport::{Channel, Endpoint};
12use crate::dev_log;
13
14use crate::Transport::{
15	Strategy::{TransportStats, TransportStrategy, TransportType},
16	TransportConfig,
17};
18
19/// gRPC transport for communication with Mountain and other gRPC services.
20#[derive(Clone, Debug)]
21pub struct gRPCTransport {
22	/// Connection endpoint address.
23	Endpoint:String,
24	/// gRPC channel (lazily connected).
25	Channel:Arc<RwLock<Option<Channel>>>,
26	/// Transport configuration.
27	Configuration:TransportConfig,
28	/// Whether the transport is currently connected.
29	Connected:Arc<RwLock<bool>>,
30	/// Transport statistics.
31	Statistics:Arc<RwLock<TransportStats>>,
32}
33
34impl gRPCTransport {
35	/// Creates a new gRPC transport with the given address.
36	pub fn New(Address:&str) -> anyhow::Result<Self> {
37		Ok(Self {
38			Endpoint:Address.to_string(),
39			Channel:Arc::new(RwLock::new(None)),
40			Configuration:TransportConfig::default(),
41			Connected:Arc::new(RwLock::new(false)),
42			Statistics:Arc::new(RwLock::new(TransportStats::default())),
43		})
44	}
45
46	/// Creates a new gRPC transport with custom configuration.
47	pub fn WithConfiguration(Address:&str, Configuration:TransportConfig) -> anyhow::Result<Self> {
48		Ok(Self {
49			Endpoint:Address.to_string(),
50			Channel:Arc::new(RwLock::new(None)),
51			Configuration,
52			Connected:Arc::new(RwLock::new(false)),
53			Statistics:Arc::new(RwLock::new(TransportStats::default())),
54		})
55	}
56
57	/// Returns the connection endpoint address.
58	pub fn Address(&self) -> &str { &self.Endpoint }
59
60	/// Returns the active gRPC channel.
61	pub async fn GetChannel(&self) -> anyhow::Result<Channel> {
62		self.Channel
63			.read()
64			.await
65			.as_ref()
66			.cloned()
67			.ok_or_else(|| anyhow::anyhow!("gRPC channel not connected"))
68	}
69
70	/// Returns a snapshot of transport statistics.
71	pub async fn Statistics(&self) -> TransportStats { self.Statistics.read().await.clone() }
72
73	/// Builds an endpoint from the address string.
74	fn BuildEndpoint(&self) -> anyhow::Result<Endpoint> {
75		let EndpointValue = Endpoint::from_shared(self.Endpoint.clone())?
76			.timeout(self.Configuration.ConnectionTimeout)
77			.connect_timeout(self.Configuration.ConnectionTimeout)
78			.tcp_keepalive(Some(self.Configuration.KeepaliveInterval));
79		Ok(EndpointValue)
80	}
81}
82
83#[async_trait]
84impl TransportStrategy for gRPCTransport {
85	type Error = gRPCTransportError;
86
87	async fn connect(&self) -> Result<(), Self::Error> {
88		dev_log!("grpc", "Connecting to gRPC endpoint: {}", self.Endpoint);
89
90		let EndpointValue = self
91			.BuildEndpoint()
92			.map_err(|E| gRPCTransportError::ConnectionFailed(E.to_string()))?;
93
94		let ChannelValue = EndpointValue
95			.connect()
96			.await
97			.map_err(|E| gRPCTransportError::ConnectionFailed(E.to_string()))?;
98
99		*self.Channel.write().await = Some(ChannelValue);
100		*self.Connected.write().await = true;
101
102		dev_log!("grpc", "gRPC connection established: {}", self.Endpoint);
103		Ok(())
104	}
105
106	async fn send(&self, request:&[u8]) -> Result<Vec<u8>, Self::Error> {
107		let Start = std::time::Instant::now();
108
109		if !self.is_connected() {
110			return Err(gRPCTransportError::NotConnected);
111		}
112
113		dev_log!("grpc", "Sending gRPC request ({} bytes)", request.len());
114
115		let Response:Vec<u8> = vec![];
116		let LatencyMicroseconds = Start.elapsed().as_micros() as u64;
117
118		let mut Stats = self.Statistics.write().await;
119		Stats.record_sent(request.len() as u64, LatencyMicroseconds);
120		Stats.record_received(Response.len() as u64);
121
122		dev_log!("grpc", "gRPC request completed in {}µs", LatencyMicroseconds);
123		Ok(Response)
124	}
125
126	async fn send_no_response(&self, data:&[u8]) -> Result<(), Self::Error> {
127		if !self.is_connected() {
128			return Err(gRPCTransportError::NotConnected);
129		}
130
131		dev_log!("grpc", "Sending gRPC notification ({} bytes)", data.len());
132
133		let mut Stats = self.Statistics.write().await;
134		Stats.record_sent(data.len() as u64, 0);
135		Ok(())
136	}
137
138	async fn close(&self) -> Result<(), Self::Error> {
139		dev_log!("grpc", "Closing gRPC connection: {}", self.Endpoint);
140		*self.Channel.write().await = None;
141		*self.Connected.write().await = false;
142		dev_log!("grpc", "gRPC connection closed: {}", self.Endpoint);
143		Ok(())
144	}
145
146	fn is_connected(&self) -> bool { *self.Connected.blocking_read() }
147
148	fn transport_type(&self) -> TransportType { TransportType::gRPC }
149}
150
151/// gRPC transport error variants.
152#[derive(Debug, thiserror::Error)]
153pub enum gRPCTransportError {
154	/// Failed to establish connection to gRPC server
155	#[error("Connection failed: {0}")]
156	ConnectionFailed(String),
157	/// Failed to send message to gRPC server
158	#[error("Send failed: {0}")]
159	SendFailed(String),
160	/// Failed to receive message from gRPC server
161	#[error("Receive failed: {0}")]
162	ReceiveFailed(String),
163	/// Transport is not connected
164	#[error("Not connected")]
165	NotConnected,
166	/// Operation timed out
167	#[error("Timeout")]
168	Timeout,
169	/// Generic gRPC error
170	#[error("gRPC error: {0}")]
171	Error(String),
172}
173
174impl From<tonic::transport::Error> for gRPCTransportError {
175	fn from(Error:tonic::transport::Error) -> Self { gRPCTransportError::ConnectionFailed(Error.to_string()) }
176}
177
178impl From<tonic::Status> for gRPCTransportError {
179	fn from(Status:tonic::Status) -> Self { gRPCTransportError::Error(Status.to_string()) }
180}
181
182#[cfg(test)]
183mod tests {
184	use super::*;
185
186	#[test]
187	fn TestgRPCTransportCreation() {
188		let Result = gRPCTransport::New("127.0.0.1:50050");
189		assert!(Result.is_ok());
190		let Transport = Result.unwrap();
191		assert_eq!(Transport.Address(), "127.0.0.1:50050");
192	}
193
194	#[tokio::test]
195	async fn TestgRPCTransportNotConnected() {
196		let Transport = gRPCTransport::New("127.0.0.1:50050").unwrap();
197		assert!(!Transport.is_connected());
198	}
199}