Grove/Transport/
IPCTransport.rs1#![allow(non_snake_case, non_camel_case_types, non_upper_case_globals)]
2use std::{
8 path::{Path, PathBuf},
9 sync::Arc,
10};
11
12use async_trait::async_trait;
13use tokio::sync::RwLock;
14use crate::dev_log;
15
16use crate::Transport::{
17 Strategy::{TransportStats, TransportStrategy, TransportType},
18 TransportConfig,
19};
20
21#[derive(Clone, Debug)]
23pub struct IPCTransport {
24 SocketPath:Option<PathBuf>,
26 #[allow(dead_code)]
28 PipeName:Option<String>,
29 #[allow(dead_code)]
31 Configuration:TransportConfig,
32 Connected:Arc<RwLock<bool>>,
34 Statistics:Arc<RwLock<TransportStats>>,
36}
37
38impl IPCTransport {
39 pub fn New() -> anyhow::Result<Self> {
41 #[cfg(unix)]
42 {
43 let SocketPath = Self::DefaultSocketPath();
44 Ok(Self {
45 SocketPath:Some(SocketPath),
46 PipeName:None,
47 Configuration:TransportConfig::default(),
48 Connected:Arc::new(RwLock::new(false)),
49 Statistics:Arc::new(RwLock::new(TransportStats::default())),
50 })
51 }
52
53 #[cfg(windows)]
54 {
55 Ok(Self {
56 SocketPath:None,
57 PipeName:Some(r"\\.\pipe\grove-ipc".to_string()),
58 Configuration:TransportConfig::default(),
59 Connected:Arc::new(RwLock::new(false)),
60 Statistics:Arc::new(RwLock::new(TransportStats::default())),
61 })
62 }
63
64 #[cfg(not(any(unix, windows)))]
65 {
66 Err(anyhow::anyhow!("IPC transport not supported on this platform"))
67 }
68 }
69
70 pub fn WithSocketPath<P:AsRef<Path>>(SocketPath:P) -> anyhow::Result<Self> {
72 #[cfg(unix)]
73 {
74 Ok(Self {
75 SocketPath:Some(SocketPath.as_ref().to_path_buf()),
76 PipeName:None,
77 Configuration:TransportConfig::default(),
78 Connected:Arc::new(RwLock::new(false)),
79 Statistics:Arc::new(RwLock::new(TransportStats::default())),
80 })
81 }
82
83 #[cfg(not(unix))]
84 {
85 Err(anyhow::anyhow!("Unix sockets not supported on this platform"))
86 }
87 }
88
89 #[cfg(unix)]
91 fn DefaultSocketPath() -> PathBuf {
92 let mut Path = std::env::temp_dir();
93 Path.push("grove-ipc.sock");
94 Path
95 }
96
97 #[cfg(unix)]
99 pub fn GetSocketPath(&self) -> Option<&Path> { self.SocketPath.as_deref() }
100
101 pub async fn GetStatistics(&self) -> TransportStats { self.Statistics.read().await.clone() }
103
104 #[cfg(unix)]
106 async fn CleanupSocket(&self) -> anyhow::Result<()> {
107 if let Some(SocketPath) = &self.SocketPath {
108 if SocketPath.exists() {
109 tokio::fs::remove_file(SocketPath)
110 .await
111 .map_err(|E| anyhow::anyhow!("Failed to remove socket: {}", E))?;
112 dev_log!("transport", "Removed existing socket: {:?}", SocketPath);
113 }
114 }
115 Ok(())
116 }
117}
118
119#[async_trait]
120impl TransportStrategy for IPCTransport {
121 type Error = IPCTransportError;
122
123 async fn connect(&self) -> Result<(), Self::Error> {
124 dev_log!("transport", "Connecting to IPC transport");
125
126 #[cfg(unix)]
127 {
128 self.CleanupSocket()
129 .await
130 .map_err(|E| IPCTransportError::ConnectionFailed(E.to_string()))?;
131 *self.Connected.write().await = true;
132 dev_log!("transport", "IPC connection established: {:?}", self.SocketPath);
133 }
134
135 #[cfg(windows)]
136 {
137 *self.Connected.write().await = true;
138 dev_log!("transport", "IPC connection established via named pipe");
139 }
140
141 #[cfg(not(any(unix, windows)))]
142 {
143 return Err(IPCTransportError::NotSupported);
144 }
145
146 Ok(())
147 }
148
149 async fn send(&self, request:&[u8]) -> Result<Vec<u8>, Self::Error> {
150 if !self.is_connected() {
151 return Err(IPCTransportError::NotConnected);
152 }
153
154 dev_log!("transport", "Sending IPC request ({} bytes)", request.len());
155
156 let Response:Vec<u8> = vec![];
157
158 let mut Stats = self.Statistics.write().await;
159 Stats.record_sent(request.len() as u64, 0);
160 Stats.record_received(Response.len() as u64);
161
162 Ok(Response)
163 }
164
165 async fn send_no_response(&self, data:&[u8]) -> Result<(), Self::Error> {
166 if !self.is_connected() {
167 return Err(IPCTransportError::NotConnected);
168 }
169
170 dev_log!("transport", "Sending IPC notification ({} bytes)", data.len());
171
172 let mut Stats = self.Statistics.write().await;
173 Stats.record_sent(data.len() as u64, 0);
174 Ok(())
175 }
176
177 async fn close(&self) -> Result<(), Self::Error> {
178 dev_log!("transport", "Closing IPC connection");
179 *self.Connected.write().await = false;
180
181 #[cfg(unix)]
182 {
183 if let Some(SocketPath) = &self.SocketPath {
184 if SocketPath.exists() {
185 tokio::fs::remove_file(SocketPath).await.map_err(|E| {
186 dev_log!("transport", "warn: failed to remove socket: {}", E);
187 IPCTransportError::CleanupFailed(E.to_string())
188 })?;
189 }
190 }
191 }
192
193 dev_log!("transport", "IPC connection closed");
194 Ok(())
195 }
196
197 fn is_connected(&self) -> bool { *self.Connected.blocking_read() }
198
199 fn transport_type(&self) -> TransportType { TransportType::IPC }
200}
201
202#[derive(Debug, thiserror::Error)]
204pub enum IPCTransportError {
205 #[error("Connection failed: {0}")]
207 ConnectionFailed(String),
208 #[error("Send failed: {0}")]
210 SendFailed(String),
211 #[error("Receive failed: {0}")]
213 ReceiveFailed(String),
214 #[error("Not connected")]
216 NotConnected,
217 #[error("IPC not supported on this platform")]
219 NotSupported,
220 #[error("Cleanup failed: {0}")]
222 CleanupFailed(String),
223 #[error("Socket error: {0}")]
225 SocketError(String),
226 #[error("Timeout")]
228 Timeout,
229}
230
231#[cfg(test)]
232mod tests {
233 use super::*;
234
235 #[test]
236 fn TestIPCTransportCreation() {
237 #[cfg(any(unix, windows))]
238 {
239 let Result = IPCTransport::New();
240 assert!(Result.is_ok());
241 }
242 }
243
244 #[cfg(unix)]
245 #[test]
246 fn TestIPCTransportWithSocketPath() {
247 let Result = IPCTransport::WithSocketPath(Path::new("/tmp/test.sock"));
248 assert!(Result.is_ok());
249 let Transport = Result.unwrap();
250 assert_eq!(Transport.GetSocketPath(), Some(Path::new("/tmp/test.sock")));
251 }
252
253 #[tokio::test]
254 async fn TestIPCTransportNotConnected() {
255 #[cfg(any(unix, windows))]
256 {
257 let Transport = IPCTransport::New().unwrap();
258 assert!(!Transport.is_connected());
259 }
260 }
261}