Grove/Transport/
gRPCTransport.rs1#![allow(non_snake_case, non_camel_case_types, non_upper_case_globals)]
2use std::sync::Arc;
8
9use async_trait::async_trait;
10use tokio::sync::RwLock;
11use tonic::transport::{Channel, Endpoint};
12use crate::dev_log;
13
14use crate::Transport::{
15 Strategy::{TransportStats, TransportStrategy, TransportType},
16 TransportConfig,
17};
18
19#[derive(Clone, Debug)]
21pub struct gRPCTransport {
22 Endpoint:String,
24 Channel:Arc<RwLock<Option<Channel>>>,
26 Configuration:TransportConfig,
28 Connected:Arc<RwLock<bool>>,
30 Statistics:Arc<RwLock<TransportStats>>,
32}
33
34impl gRPCTransport {
35 pub fn New(Address:&str) -> anyhow::Result<Self> {
37 Ok(Self {
38 Endpoint:Address.to_string(),
39 Channel:Arc::new(RwLock::new(None)),
40 Configuration:TransportConfig::default(),
41 Connected:Arc::new(RwLock::new(false)),
42 Statistics:Arc::new(RwLock::new(TransportStats::default())),
43 })
44 }
45
46 pub fn WithConfiguration(Address:&str, Configuration:TransportConfig) -> anyhow::Result<Self> {
48 Ok(Self {
49 Endpoint:Address.to_string(),
50 Channel:Arc::new(RwLock::new(None)),
51 Configuration,
52 Connected:Arc::new(RwLock::new(false)),
53 Statistics:Arc::new(RwLock::new(TransportStats::default())),
54 })
55 }
56
57 pub fn Address(&self) -> &str { &self.Endpoint }
59
60 pub async fn GetChannel(&self) -> anyhow::Result<Channel> {
62 self.Channel
63 .read()
64 .await
65 .as_ref()
66 .cloned()
67 .ok_or_else(|| anyhow::anyhow!("gRPC channel not connected"))
68 }
69
70 pub async fn Statistics(&self) -> TransportStats { self.Statistics.read().await.clone() }
72
73 fn BuildEndpoint(&self) -> anyhow::Result<Endpoint> {
75 let EndpointValue = Endpoint::from_shared(self.Endpoint.clone())?
76 .timeout(self.Configuration.ConnectionTimeout)
77 .connect_timeout(self.Configuration.ConnectionTimeout)
78 .tcp_keepalive(Some(self.Configuration.KeepaliveInterval));
79 Ok(EndpointValue)
80 }
81}
82
83#[async_trait]
84impl TransportStrategy for gRPCTransport {
85 type Error = gRPCTransportError;
86
87 async fn connect(&self) -> Result<(), Self::Error> {
88 dev_log!("grpc", "Connecting to gRPC endpoint: {}", self.Endpoint);
89
90 let EndpointValue = self
91 .BuildEndpoint()
92 .map_err(|E| gRPCTransportError::ConnectionFailed(E.to_string()))?;
93
94 let ChannelValue = EndpointValue
95 .connect()
96 .await
97 .map_err(|E| gRPCTransportError::ConnectionFailed(E.to_string()))?;
98
99 *self.Channel.write().await = Some(ChannelValue);
100 *self.Connected.write().await = true;
101
102 dev_log!("grpc", "gRPC connection established: {}", self.Endpoint);
103 Ok(())
104 }
105
106 async fn send(&self, request:&[u8]) -> Result<Vec<u8>, Self::Error> {
107 let Start = std::time::Instant::now();
108
109 if !self.is_connected() {
110 return Err(gRPCTransportError::NotConnected);
111 }
112
113 dev_log!("grpc", "Sending gRPC request ({} bytes)", request.len());
114
115 let Response:Vec<u8> = vec![];
116 let LatencyMicroseconds = Start.elapsed().as_micros() as u64;
117
118 let mut Stats = self.Statistics.write().await;
119 Stats.record_sent(request.len() as u64, LatencyMicroseconds);
120 Stats.record_received(Response.len() as u64);
121
122 dev_log!("grpc", "gRPC request completed in {}µs", LatencyMicroseconds);
123 Ok(Response)
124 }
125
126 async fn send_no_response(&self, data:&[u8]) -> Result<(), Self::Error> {
127 if !self.is_connected() {
128 return Err(gRPCTransportError::NotConnected);
129 }
130
131 dev_log!("grpc", "Sending gRPC notification ({} bytes)", data.len());
132
133 let mut Stats = self.Statistics.write().await;
134 Stats.record_sent(data.len() as u64, 0);
135 Ok(())
136 }
137
138 async fn close(&self) -> Result<(), Self::Error> {
139 dev_log!("grpc", "Closing gRPC connection: {}", self.Endpoint);
140 *self.Channel.write().await = None;
141 *self.Connected.write().await = false;
142 dev_log!("grpc", "gRPC connection closed: {}", self.Endpoint);
143 Ok(())
144 }
145
146 fn is_connected(&self) -> bool { *self.Connected.blocking_read() }
147
148 fn transport_type(&self) -> TransportType { TransportType::gRPC }
149}
150
151#[derive(Debug, thiserror::Error)]
153pub enum gRPCTransportError {
154 #[error("Connection failed: {0}")]
156 ConnectionFailed(String),
157 #[error("Send failed: {0}")]
159 SendFailed(String),
160 #[error("Receive failed: {0}")]
162 ReceiveFailed(String),
163 #[error("Not connected")]
165 NotConnected,
166 #[error("Timeout")]
168 Timeout,
169 #[error("gRPC error: {0}")]
171 Error(String),
172}
173
174impl From<tonic::transport::Error> for gRPCTransportError {
175 fn from(Error:tonic::transport::Error) -> Self { gRPCTransportError::ConnectionFailed(Error.to_string()) }
176}
177
178impl From<tonic::Status> for gRPCTransportError {
179 fn from(Status:tonic::Status) -> Self { gRPCTransportError::Error(Status.to_string()) }
180}
181
182#[cfg(test)]
183mod tests {
184 use super::*;
185
186 #[test]
187 fn TestgRPCTransportCreation() {
188 let Result = gRPCTransport::New("127.0.0.1:50050");
189 assert!(Result.is_ok());
190 let Transport = Result.unwrap();
191 assert_eq!(Transport.Address(), "127.0.0.1:50050");
192 }
193
194 #[tokio::test]
195 async fn TestgRPCTransportNotConnected() {
196 let Transport = gRPCTransport::New("127.0.0.1:50050").unwrap();
197 assert!(!Transport.is_connected());
198 }
199}