Skip to main content

Grove/Protocol/
SpineConnection.rs

1//! Spine Connection Module
2//!  ☀️ 🟡 MOUNTAIN_GROVE_WASM - WASM+Rhai extension host connection
3//!
4//! This module provides gRPC-based communication for extension host
5//! integration. Maintains full backwards compatibility while adding optional
6//! EchoAction support.
7//!
8//! ## Architecture (Backwards Compatible)
9//!
10//! - **Legacy RPC Layer**: Original gRPC client (unchanged)
11//! - **New EchoAction Layer**: Optional bidirectional actions (feature-gated)
12//! - **Dual Protocol**: Both can be used simultaneously
13//!
14//! ## Feature Gates
15//!
16//! - `grove_rpc` (default) - Enable legacy RPC layer
17//! - `grove_echo` (new, feature-gated) - Enable EchoAction layer
18//!
19//! ## Usage
20//!
//! ### Legacy (Unchanged)
//!
//! ```rust,ignore
//! use crate::Protocol::{ProtocolConfig};
//! let mut connection = SpineConnection::new(config);
//! connection.Connect().await?;
//! let response = connection.SendRequest(request).await?;
//! ```
//!
//! ### With EchoAction (New, Optional)
//!
//! ```rust,ignore
//! let mut connection = SpineConnection::new(config);
//! connection.Connect().await?;
//! connection.ConnectEchoClient().await?;
//!
//! // Use either method
//! let response = connection.SendRequest(request).await?; // OLD: works
//! let echo_response = connection.SendEchoAction(action).await?; // NEW: optional
//! ```
36
37use std::sync::Arc;
38
39use anyhow::Result;
40use tokio::sync::RwLock;
41use crate::dev_log;
42
43use crate::Protocol::ProtocolConfig;
44#[cfg(feature = "grove_echo")]
45use crate::vine::generated::vine::{
46	EchoAction,
47	EchoActionResponse,
48	echo_action_service_client::EchoActionServiceClient,
49};
50
/// Lifecycle state of the connection to the Spine service.
///
/// The enum is a plain, fieldless value type, so it is `Copy` and supports
/// total equality (`Eq`) in addition to `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
	/// No connection to Spine is established
	Disconnected,
	/// A connection attempt to Spine is in progress
	Connecting,
	/// The connection to Spine is established
	Connected,
	/// The connection is in an error state
	Error,
}
63
/// Heartbeat settings used for connection monitoring.
///
/// Derives `PartialEq`/`Eq` so two configurations can be compared by value
/// (for example in tests or to detect a configuration change).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HeartbeatConfig {
	/// Interval between heartbeats, in seconds
	pub interval_seconds:u64,
	/// Maximum number of missed heartbeats before the connection is
	/// considered lost
	pub max_missed:u32,
	/// Whether heartbeat monitoring is enabled
	pub enabled:bool,
}
74
75/// Heartbeat configuration for connection monitoring
76impl Default for HeartbeatConfig {
77	fn default() -> Self { Self { interval_seconds:30, max_missed:3, enabled:true } }
78}
79
/// Counters describing the activity of a Spine connection.
///
/// All counters start at zero (`Default`). `PartialEq`/`Eq` are derived so
/// metric snapshots can be compared by value.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ConnectionMetrics {
	/// Total number of requests sent
	pub total_requests:u64,
	/// Number of successful requests
	pub successful_requests:u64,
	/// Number of failed requests
	pub failed_requests:u64,
	/// Connection uptime in seconds
	pub uptime_seconds:u64,
	/// Number of reconnection attempts
	pub reconnections:u64,
}
94
95/// Spine connection implementation
96pub struct SpineConnectionImpl {
97	/// Protocol configuration
98	config:Arc<RwLock<ProtocolConfig>>,
99	/// Current connection state
100	state:Arc<RwLock<ConnectionState>>,
101
102	#[cfg(feature = "grove_echo")]
103	/// Echo client for testing
104	echo_client:Option<EchoActionServiceClient<tonic::transport::Channel>>,
105
106	/// Heartbeat configuration
107	heartbeat_config:HeartbeatConfig,
108	/// Timestamp of the last heartbeat
109	last_heartbeat:Arc<RwLock<chrono::DateTime<chrono::Utc>>>,
110	/// Connection metrics
111	metrics:Arc<RwLock<ConnectionMetrics>>,
112}
113
114impl SpineConnectionImpl {
115	/// Create a new Spine connection
116	///
117	/// # Arguments
118	///
119	/// * `config` - Protocol configuration
120	///
121	/// # Returns
122	///
123	/// A new SpineConnectionImpl instance
124	pub fn new(config:ProtocolConfig) -> Self {
125		Self {
126			config:Arc::new(RwLock::new(config)),
127			state:Arc::new(RwLock::new(ConnectionState::Disconnected)),
128
129			#[cfg(feature = "grove_echo")]
130			echo_client:None,
131
132			heartbeat_config:HeartbeatConfig::default(),
133			last_heartbeat:Arc::new(RwLock::new(chrono::Utc::now())),
134			metrics:Arc::new(RwLock::new(ConnectionMetrics::default())),
135		}
136	}
137
138	/// Connect to the Spine service
139	pub async fn Connect(&mut self) -> Result<()> {
140		let guard = self.config.read().await;
141		let url = guard.mountain_endpoint.clone();
142		drop(guard);
143
144		dev_log!("grpc", "Connecting to Spine at: {}", url);
145		*self.state.write().await = ConnectionState::Connecting;
146		*self.state.write().await = ConnectionState::Connected;
147		*self.last_heartbeat.write().await = chrono::Utc::now();
148		dev_log!("grpc", "Successfully connected to Spine");
149		Ok(())
150	}
151
152	/// Disconnect from the Spine service
153	pub async fn Disconnect(&mut self) -> Result<()> {
154		dev_log!("grpc", "Disconnecting from Spine");
155
156		#[cfg(feature = "grove_echo")]
157		{
158			self.echo_client = None;
159		}
160
161		*self.state.write().await = ConnectionState::Disconnected;
162		dev_log!("grpc", "Successfully disconnected from Spine");
163		Ok(())
164	}
165
166	/// Get the current connection state
167	pub async fn GetState(&self) -> ConnectionState { *self.state.read().await }
168
169	/// Send a request to the Spine service
170	///
171	/// # Arguments
172	///
173	/// * `method` - The method name to call
174	/// * `payload` - The request payload
175	pub async fn SendRequest(&self, method:&str, _payload:Vec<u8>) -> Result<Vec<u8>> {
176		if self.GetState().await != ConnectionState::Connected {
177			return Err(anyhow::anyhow!("Not connected to Spine"));
178		}
179
180		dev_log!("grpc", "Sending request: {}", method);
181
182		let mut metrics = self.metrics.write().await;
183		metrics.total_requests += 1;
184		metrics.successful_requests += 1;
185		Ok(Vec::new())
186	}
187
188	/// Get the connection metrics
189	pub async fn GetMetrics(&self) -> ConnectionMetrics { self.metrics.read().await.clone() }
190
191	/// Set the heartbeat configuration
192	pub fn SetHeartbeatConfig(&mut self, config:HeartbeatConfig) { self.heartbeat_config = config; }
193}
194
#[cfg(feature = "grove_echo")]
impl SpineConnectionImpl {
	/// Connect the optional EchoAction gRPC client.
	///
	/// Builds a tonic channel to the configured `mountain_endpoint` and stores
	/// an `EchoActionServiceClient` on success.
	///
	/// # Errors
	///
	/// Returns an error when the endpoint is not a valid URI or when the
	/// channel cannot be connected.
	pub async fn ConnectEchoClient(&mut self) -> Result<()> {
		// The file only imports `anyhow::Result`; `.context(..)` needs the
		// extension trait in scope.
		use anyhow::Context as _;

		let url = self.config.read().await.mountain_endpoint.clone();

		dev_log!("grpc", "Connecting EchoAction client to: {}", url);

		let channel = tonic::transport::Channel::from_shared(url)
			.context("Invalid Mountain URL")?
			.connect()
			.await
			.context("Failed to connect EchoAction client")?;

		self.echo_client = Some(EchoActionServiceClient::new(channel));
		dev_log!("grpc", "EchoAction client connected");
		Ok(())
	}

	/// Send a single `EchoAction` and return the service response.
	///
	/// # Errors
	///
	/// Returns an error when the connection is not in the `Connected` state,
	/// when the EchoAction client has not been connected, when the gRPC call
	/// fails, or when the response reports `success == false`.
	pub async fn SendEchoAction(&self, action:EchoAction) -> Result<EchoActionResponse> {
		use anyhow::Context as _;

		if self.GetState().await != ConnectionState::Connected {
			return Err(anyhow::anyhow!("Not connected to Spine"));
		}

		// Generated tonic client methods take `&mut self`, while this method
		// only has `&self`; clone the client (a handle over the shared channel)
		// to obtain a mutable one.
		let mut client = self
			.echo_client
			.clone()
			.ok_or_else(|| anyhow::anyhow!("EchoAction client not connected"))?;

		let response = client
			.send_echo_action(action)
			.await
			.context("Failed to send EchoAction")?
			.into_inner();

		if !response.success {
			anyhow::bail!("EchoAction failed: {}", response.error);
		}

		Ok(response)
	}

	/// Send an RPC call wrapped in an `EchoAction` of type `"rpc"`.
	///
	/// # Arguments
	///
	/// * `method` - RPC method name, stored under the `rpc_method` header
	/// * `payload` - Request payload bytes
	/// * `metadata` - Additional headers to attach to the action
	///
	/// # Returns
	///
	/// The `result` bytes of the EchoAction response.
	///
	/// # Errors
	///
	/// Propagates any error from `SendEchoAction`.
	pub async fn SendRpcViaEcho(
		&self,
		method:&str,
		payload:Vec<u8>,
		metadata:std::collections::HashMap<String, String>,
	) -> Result<Vec<u8>> {
		let action = Self::BuildEchoAction("rpc", "rpc_method", method, payload, metadata);

		let response = self.SendEchoAction(action).await?;
		Ok(response.result)
	}

	/// Send an event wrapped in an `EchoAction` of type `"event"`.
	///
	/// # Arguments
	///
	/// * `event_name` - Event name, stored under the `event_name` header
	/// * `payload` - Event payload bytes
	/// * `metadata` - Additional headers to attach to the action
	///
	/// # Errors
	///
	/// Propagates any error from `SendEchoAction`.
	pub async fn SendEventViaEcho(
		&self,
		event_name:&str,
		payload:Vec<u8>,
		metadata:std::collections::HashMap<String, String>,
	) -> Result<()> {
		let action = Self::BuildEchoAction("event", "event_name", event_name, payload, metadata);

		self.SendEchoAction(action).await?;
		Ok(())
	}

	/// Whether an EchoAction client has been connected
	pub fn IsEchoAvailable(&self) -> bool { self.echo_client.is_some() }

	/// Build a grove -> mountain `EchoAction` with a fresh UUID and the current
	/// UTC timestamp, adding `header_key = header_value` to `metadata`.
	fn BuildEchoAction(
		action_type:&str,
		header_key:&str,
		header_value:&str,
		payload:Vec<u8>,
		metadata:std::collections::HashMap<String, String>,
	) -> EchoAction {
		let mut headers = metadata;
		headers.insert(header_key.to_string(), header_value.to_string());

		EchoAction {
			action_id:uuid::Uuid::new_v4().to_string(),
			source:"grove".to_string(),
			target:"mountain".to_string(),
			action_type:action_type.to_string(),
			payload,
			headers,
			timestamp:chrono::Utc::now().timestamp(),
			nested_actions:vec![],
		}
	}
}
288
#[cfg(test)]
mod tests {
	use super::*;

	/// A state value compares equal to the same variant.
	#[test]
	fn test_connection_state() {
		let current = ConnectionState::Connected;
		assert_eq!(current, ConnectionState::Connected);
	}

	/// The default heartbeat is enabled with a 30 second interval.
	#[test]
	fn test_heartbeat_config_default() {
		let HeartbeatConfig { interval_seconds, enabled, .. } = HeartbeatConfig::default();
		assert_eq!(interval_seconds, 30);
		assert!(enabled);
	}

	/// A freshly created connection starts out disconnected.
	#[tokio::test]
	async fn test_spine_connection_creation() {
		let endpoint = "http://127.0.0.1:50051".to_string();
		let connection = SpineConnectionImpl::new(ProtocolConfig { mountain_endpoint:endpoint, ..Default::default() });
		let initial = connection.GetState().await;
		assert_eq!(initial, ConnectionState::Disconnected);
	}
}