Skip to main content

Grove/Transport/
IPCTransport.rs

1#![allow(non_snake_case, non_camel_case_types, non_upper_case_globals)]
2//! # IPC Transport Implementation
3//!
4//! Provides inter-process communication (IPC) for Grove.
5//! Supports Unix domain sockets on macOS/Linux and named pipes on Windows.
6
7use std::{
8	path::{Path, PathBuf},
9	sync::Arc,
10};
11
12use async_trait::async_trait;
13use tokio::sync::RwLock;
14use crate::dev_log;
15
16use crate::Transport::{
17	Strategy::{TransportStats, TransportStrategy, TransportType},
18	TransportConfig,
19};
20
21/// IPC transport for local process communication.
22#[derive(Clone, Debug)]
23pub struct IPCTransport {
24	/// Unix domain socket path (macOS/Linux).
25	SocketPath:Option<PathBuf>,
26	/// Named pipe identifier (Windows).
27	#[allow(dead_code)]
28	PipeName:Option<String>,
29	/// Transport configuration.
30	#[allow(dead_code)]
31	Configuration:TransportConfig,
32	/// Whether the transport is currently connected.
33	Connected:Arc<RwLock<bool>>,
34	/// Transport statistics.
35	Statistics:Arc<RwLock<TransportStats>>,
36}
37
38impl IPCTransport {
39	/// Creates a new IPC transport using the default socket path.
40	pub fn New() -> anyhow::Result<Self> {
41		#[cfg(unix)]
42		{
43			let SocketPath = Self::DefaultSocketPath();
44			Ok(Self {
45				SocketPath:Some(SocketPath),
46				PipeName:None,
47				Configuration:TransportConfig::default(),
48				Connected:Arc::new(RwLock::new(false)),
49				Statistics:Arc::new(RwLock::new(TransportStats::default())),
50			})
51		}
52
53		#[cfg(windows)]
54		{
55			Ok(Self {
56				SocketPath:None,
57				PipeName:Some(r"\\.\pipe\grove-ipc".to_string()),
58				Configuration:TransportConfig::default(),
59				Connected:Arc::new(RwLock::new(false)),
60				Statistics:Arc::new(RwLock::new(TransportStats::default())),
61			})
62		}
63
64		#[cfg(not(any(unix, windows)))]
65		{
66			Err(anyhow::anyhow!("IPC transport not supported on this platform"))
67		}
68	}
69
70	/// Creates a new IPC transport with a custom Unix domain socket path.
71	pub fn WithSocketPath<P:AsRef<Path>>(SocketPath:P) -> anyhow::Result<Self> {
72		#[cfg(unix)]
73		{
74			Ok(Self {
75				SocketPath:Some(SocketPath.as_ref().to_path_buf()),
76				PipeName:None,
77				Configuration:TransportConfig::default(),
78				Connected:Arc::new(RwLock::new(false)),
79				Statistics:Arc::new(RwLock::new(TransportStats::default())),
80			})
81		}
82
83		#[cfg(not(unix))]
84		{
85			Err(anyhow::anyhow!("Unix sockets not supported on this platform"))
86		}
87	}
88
89	/// Returns the default socket path for the current platform.
90	#[cfg(unix)]
91	fn DefaultSocketPath() -> PathBuf {
92		let mut Path = std::env::temp_dir();
93		Path.push("grove-ipc.sock");
94		Path
95	}
96
97	/// Returns the socket path (Unix only).
98	#[cfg(unix)]
99	pub fn GetSocketPath(&self) -> Option<&Path> { self.SocketPath.as_deref() }
100
101	/// Returns a snapshot of transport statistics.
102	pub async fn GetStatistics(&self) -> TransportStats { self.Statistics.read().await.clone() }
103
104	/// Removes the socket file if it exists.
105	#[cfg(unix)]
106	async fn CleanupSocket(&self) -> anyhow::Result<()> {
107		if let Some(SocketPath) = &self.SocketPath {
108			if SocketPath.exists() {
109				tokio::fs::remove_file(SocketPath)
110					.await
111					.map_err(|E| anyhow::anyhow!("Failed to remove socket: {}", E))?;
112				dev_log!("transport", "Removed existing socket: {:?}", SocketPath);
113			}
114		}
115		Ok(())
116	}
117}
118
119#[async_trait]
120impl TransportStrategy for IPCTransport {
121	type Error = IPCTransportError;
122
123	async fn connect(&self) -> Result<(), Self::Error> {
124		dev_log!("transport", "Connecting to IPC transport");
125
126		#[cfg(unix)]
127		{
128			self.CleanupSocket()
129				.await
130				.map_err(|E| IPCTransportError::ConnectionFailed(E.to_string()))?;
131			*self.Connected.write().await = true;
132			dev_log!("transport", "IPC connection established: {:?}", self.SocketPath);
133		}
134
135		#[cfg(windows)]
136		{
137			*self.Connected.write().await = true;
138			dev_log!("transport", "IPC connection established via named pipe");
139		}
140
141		#[cfg(not(any(unix, windows)))]
142		{
143			return Err(IPCTransportError::NotSupported);
144		}
145
146		Ok(())
147	}
148
149	async fn send(&self, request:&[u8]) -> Result<Vec<u8>, Self::Error> {
150		if !self.is_connected() {
151			return Err(IPCTransportError::NotConnected);
152		}
153
154		dev_log!("transport", "Sending IPC request ({} bytes)", request.len());
155
156		let Response:Vec<u8> = vec![];
157
158		let mut Stats = self.Statistics.write().await;
159		Stats.record_sent(request.len() as u64, 0);
160		Stats.record_received(Response.len() as u64);
161
162		Ok(Response)
163	}
164
165	async fn send_no_response(&self, data:&[u8]) -> Result<(), Self::Error> {
166		if !self.is_connected() {
167			return Err(IPCTransportError::NotConnected);
168		}
169
170		dev_log!("transport", "Sending IPC notification ({} bytes)", data.len());
171
172		let mut Stats = self.Statistics.write().await;
173		Stats.record_sent(data.len() as u64, 0);
174		Ok(())
175	}
176
177	async fn close(&self) -> Result<(), Self::Error> {
178		dev_log!("transport", "Closing IPC connection");
179		*self.Connected.write().await = false;
180
181		#[cfg(unix)]
182		{
183			if let Some(SocketPath) = &self.SocketPath {
184				if SocketPath.exists() {
185					tokio::fs::remove_file(SocketPath).await.map_err(|E| {
186						dev_log!("transport", "warn: failed to remove socket: {}", E);
187						IPCTransportError::CleanupFailed(E.to_string())
188					})?;
189				}
190			}
191		}
192
193		dev_log!("transport", "IPC connection closed");
194		Ok(())
195	}
196
197	fn is_connected(&self) -> bool { *self.Connected.blocking_read() }
198
199	fn transport_type(&self) -> TransportType { TransportType::IPC }
200}
201
202/// IPC transport error variants.
203#[derive(Debug, thiserror::Error)]
204pub enum IPCTransportError {
205	/// Failed to establish IPC connection
206	#[error("Connection failed: {0}")]
207	ConnectionFailed(String),
208	/// Failed to send message via IPC
209	#[error("Send failed: {0}")]
210	SendFailed(String),
211	/// Failed to receive message via IPC
212	#[error("Receive failed: {0}")]
213	ReceiveFailed(String),
214	/// Transport is not connected
215	#[error("Not connected")]
216	NotConnected,
217	/// IPC not supported on this platform
218	#[error("IPC not supported on this platform")]
219	NotSupported,
220	/// Failed to clean up IPC resources
221	#[error("Cleanup failed: {0}")]
222	CleanupFailed(String),
223	/// Socket communication error
224	#[error("Socket error: {0}")]
225	SocketError(String),
226	/// Operation timed out
227	#[error("Timeout")]
228	Timeout,
229}
230
#[cfg(test)]
mod tests {
	use super::*;

	/// The default constructor succeeds on every supported platform.
	#[test]
	fn TestIPCTransportCreation() {
		#[cfg(any(unix, windows))]
		{
			assert!(IPCTransport::New().is_ok());
		}
	}

	/// A custom socket path is stored verbatim and handed back by the getter.
	#[cfg(unix)]
	#[test]
	fn TestIPCTransportWithSocketPath() {
		let Expected = Path::new("/tmp/test.sock");
		let Transport = IPCTransport::WithSocketPath(Expected)
			.expect("constructing a transport from a socket path must succeed on Unix");
		assert_eq!(Transport.GetSocketPath(), Some(Expected));
	}

	/// A freshly created transport reports itself as disconnected.
	///
	/// Deliberately a plain `#[test]` rather than `#[tokio::test]`:
	/// `is_connected` is a synchronous method, and a blocking lock read panics
	/// when executed on a Tokio runtime thread. Nothing here needs to be
	/// awaited.
	#[test]
	fn TestIPCTransportNotConnected() {
		#[cfg(any(unix, windows))]
		{
			let Transport = IPCTransport::New().expect("default transport must be constructible");
			assert!(!Transport.is_connected());
		}
	}
}