Grove/Protocol/
SpineConnection.rs1use std::sync::Arc;
38
39use anyhow::Result;
40use tokio::sync::RwLock;
41use crate::dev_log;
42
43use crate::Protocol::ProtocolConfig;
44#[cfg(feature = "grove_echo")]
45use crate::vine::generated::vine::{
46 EchoAction,
47 EchoActionResponse,
48 echo_action_service_client::EchoActionServiceClient,
49};
50
51#[derive(Debug, Clone, Copy, PartialEq)]
53pub enum ConnectionState {
54 Disconnected,
56 Connecting,
58 Connected,
60 Error,
62}
63
64#[derive(Clone, Debug)]
66pub struct HeartbeatConfig {
67 pub interval_seconds:u64,
69 pub max_missed:u32,
71 pub enabled:bool,
73}
74
75impl Default for HeartbeatConfig {
77 fn default() -> Self { Self { interval_seconds:30, max_missed:3, enabled:true } }
78}
79
80#[derive(Clone, Debug, Default)]
82pub struct ConnectionMetrics {
83 pub total_requests:u64,
85 pub successful_requests:u64,
87 pub failed_requests:u64,
89 pub uptime_seconds:u64,
91 pub reconnections:u64,
93}
94
95pub struct SpineConnectionImpl {
97 config:Arc<RwLock<ProtocolConfig>>,
99 state:Arc<RwLock<ConnectionState>>,
101
102 #[cfg(feature = "grove_echo")]
103 echo_client:Option<EchoActionServiceClient<tonic::transport::Channel>>,
105
106 heartbeat_config:HeartbeatConfig,
108 last_heartbeat:Arc<RwLock<chrono::DateTime<chrono::Utc>>>,
110 metrics:Arc<RwLock<ConnectionMetrics>>,
112}
113
114impl SpineConnectionImpl {
115 pub fn new(config:ProtocolConfig) -> Self {
125 Self {
126 config:Arc::new(RwLock::new(config)),
127 state:Arc::new(RwLock::new(ConnectionState::Disconnected)),
128
129 #[cfg(feature = "grove_echo")]
130 echo_client:None,
131
132 heartbeat_config:HeartbeatConfig::default(),
133 last_heartbeat:Arc::new(RwLock::new(chrono::Utc::now())),
134 metrics:Arc::new(RwLock::new(ConnectionMetrics::default())),
135 }
136 }
137
138 pub async fn Connect(&mut self) -> Result<()> {
140 let guard = self.config.read().await;
141 let url = guard.mountain_endpoint.clone();
142 drop(guard);
143
144 dev_log!("grpc", "Connecting to Spine at: {}", url);
145 *self.state.write().await = ConnectionState::Connecting;
146 *self.state.write().await = ConnectionState::Connected;
147 *self.last_heartbeat.write().await = chrono::Utc::now();
148 dev_log!("grpc", "Successfully connected to Spine");
149 Ok(())
150 }
151
152 pub async fn Disconnect(&mut self) -> Result<()> {
154 dev_log!("grpc", "Disconnecting from Spine");
155
156 #[cfg(feature = "grove_echo")]
157 {
158 self.echo_client = None;
159 }
160
161 *self.state.write().await = ConnectionState::Disconnected;
162 dev_log!("grpc", "Successfully disconnected from Spine");
163 Ok(())
164 }
165
166 pub async fn GetState(&self) -> ConnectionState { *self.state.read().await }
168
169 pub async fn SendRequest(&self, method:&str, _payload:Vec<u8>) -> Result<Vec<u8>> {
176 if self.GetState().await != ConnectionState::Connected {
177 return Err(anyhow::anyhow!("Not connected to Spine"));
178 }
179
180 dev_log!("grpc", "Sending request: {}", method);
181
182 let mut metrics = self.metrics.write().await;
183 metrics.total_requests += 1;
184 metrics.successful_requests += 1;
185 Ok(Vec::new())
186 }
187
188 pub async fn GetMetrics(&self) -> ConnectionMetrics { self.metrics.read().await.clone() }
190
191 pub fn SetHeartbeatConfig(&mut self, config:HeartbeatConfig) { self.heartbeat_config = config; }
193}
194
195#[cfg(feature = "grove_echo")]
196impl SpineConnectionImpl {
197 pub async fn ConnectEchoClient(&mut self) -> Result<()> {
198 let guard = self.config.read().await;
199 let url = guard.mountain_endpoint.clone();
200 drop(guard);
201
202 dev_log!("grpc", "Connecting EchoAction client to: {}", url);
203
204 let channel = tonic::transport::Channel::from_shared(url)
205 .context("Invalid Mountain URL")?
206 .connect()
207 .await
208 .context("Failed to connect EchoAction client")?;
209
210 self.echo_client = Some(EchoActionServiceClient::new(channel));
211 dev_log!("grpc", "EchoAction client connected");
212 Ok(())
213 }
214
215 pub async fn SendEchoAction(&self, action:EchoAction) -> Result<EchoActionResponse> {
216 if self.GetState().await != ConnectionState::Connected {
217 return Err(anyhow::anyhow!("Not connected to Spine"));
218 }
219
220 let client = self
221 .echo_client
222 .as_ref()
223 .ok_or_else(|| anyhow::anyhow!("EchoAction client not connected"))?;
224
225 let response = client
226 .send_echo_action(action)
227 .await
228 .context("Failed to send EchoAction")?
229 .into_inner();
230
231 if !response.success {
232 anyhow::bail!("EchoAction failed: {}", response.error);
233 }
234
235 Ok(response)
236 }
237
238 pub async fn SendRpcViaEcho(
239 &self,
240 method:&str,
241 payload:Vec<u8>,
242 metadata:HashMap<String, String>,
243 ) -> Result<Vec<u8>> {
244 let mut headers = metadata;
245 headers.insert("rpc_method".to_string(), method.to_string());
246
247 let action = EchoAction {
248 action_id:uuid::Uuid::new_v4().to_string(),
249 source:"grove".to_string(),
250 target:"mountain".to_string(),
251 action_type:"rpc".to_string(),
252 payload,
253 headers,
254 timestamp:chrono::Utc::now().timestamp(),
255 nested_actions:vec![],
256 };
257
258 let response = self.SendEchoAction(action).await?;
259 Ok(response.result)
260 }
261
262 pub async fn SendEventViaEcho(
263 &self,
264 event_name:&str,
265 payload:Vec<u8>,
266 metadata:HashMap<String, String>,
267 ) -> Result<()> {
268 let mut headers = metadata;
269 headers.insert("event_name".to_string(), event_name.to_string());
270
271 let action = EchoAction {
272 action_id:uuid::Uuid::new_v4().to_string(),
273 source:"grove".to_string(),
274 target:"mountain".to_string(),
275 action_type:"event".to_string(),
276 payload,
277 headers,
278 timestamp:chrono::Utc::now().timestamp(),
279 nested_actions:vec![],
280 };
281
282 self.SendEchoAction(action).await?;
283 Ok(())
284 }
285
286 pub fn IsEchoAvailable(&self) -> bool { self.echo_client.is_some() }
287}
288
289#[cfg(test)]
290mod tests {
291 use super::*;
292
293 #[test]
294 fn test_connection_state() {
295 let state = ConnectionState::Connected;
296 assert_eq!(state, ConnectionState::Connected);
297 }
298
299 #[test]
300 fn test_heartbeat_config_default() {
301 let config = HeartbeatConfig::default();
302 assert_eq!(config.interval_seconds, 30);
303 assert!(config.enabled);
304 }
305
306 #[tokio::test]
307 async fn test_spine_connection_creation() {
308 let config = ProtocolConfig { mountain_endpoint:"http://127.0.0.1:50051".to_string(), ..Default::default() };
309 let connection = SpineConnectionImpl::new(config);
310 assert_eq!(connection.GetState().await, ConnectionState::Disconnected);
311 }
312}