
// Mountain/IPC/TauriIPCServer.rs

//! # TauriIPCServer - Mountain-Wind IPC Bridge
//!
//! **File Responsibilities:**
//! This module serves as the core IPC (Inter-Process Communication) server for
//! Mountain, establishing and managing the bidirectional communication bridge
//! between Mountain's Rust backend and Wind's TypeScript frontend. It
//! implements the Mountain counterpart to Wind's TauriIPCServer.ts, ensuring
//! seamless integration across the language boundary.
//!
//! **Architectural Role in Wind-Mountain Connection:**
//! The TauriIPCServer acts as the central message router and communication
//! orchestrator:
//!
//! 1. **Connection Management:**
//!    - Establishes secure connections between Wind and Mountain
//!    - Maintains connection health and auto-reconnects on failure
//!    - Manages connection pooling for optimal resource usage
//!    - Tracks connection state for monitoring and debugging
//!
//! 2. **Message Routing:**
//!    - Routes incoming messages from Wind to appropriate handlers
//!    - Broadcasts messages from Mountain to Wind subscribers
//!    - Implements message filtering and prioritization
//!    - Supports point-to-point and publish-subscribe patterns
//!
//! 3. **Security Layer:**
//!    - Validates all incoming messages for security
//!    - Implements permission-based access control (RBAC)
//!    - Provides AES-256-GCM encryption for sensitive data
//!    - Logs all security events for audit trails
//!
//! 4. **Reliability Features:**
//!    - Message queuing for offline scenarios
//!    - Automatic retry with exponential backoff
//!    - Graceful degradation when services unavailable
//!    - Circuit breaker pattern for cascading failure prevention
//!
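//! A retry with exponential backoff could be sketched as follows
//! (illustrative pseudocode; `MAX_RETRIES` and the loop are not part of this
//! module):
//! ```text
//! let mut delay = Duration::from_millis(100);
//! for _attempt in 0..MAX_RETRIES {
//!     match ipc.send(channel, data.clone()).await {
//!         Ok(()) => return Ok(()),
//!         Err(_) => {
//!             tokio::time::sleep(delay).await;
//!             delay *= 2; // exponential backoff: 100ms, 200ms, 400ms, ...
//!         },
//!     }
//! }
//! ```
//!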
//! **Communication Patterns:**
//!
//! **1. Request-Response Pattern:**
//! ```text
//! // Wind sends a request (TypeScript, via Tauri's invoke)
//! const result = await invoke("command", args);
//!
//! // Mountain processes and responds
//! let response = handle_request().await;
//! ipc.emit(response_channel, response).await;
//! ```
//!
//! **2. Event Emission Pattern:**
//! ```text
//! // Mountain emits events to Wind subscribers (emit is synchronous)
//! app.emit("configuration-updated", new_config)?;
//! app.emit("file-changed", file_event)?;
//! ```
//!
//! **3. Broadcast Pattern:**
//! ```text
//! // Broadcast to all subscribers on a channel
//! if let Some(channel_listeners) = listeners.get(channel) {
//! 	for listener in channel_listeners {
//! 		listener(message.clone())?;
//! 	}
//! }
//! ```
//!
//! **Message Flow:**
//! ```text
//! Wind Frontend
//!     |                         ^
//!     | 1. IPC Invoke           |
//!     v                         |
//! Tauri Bridge (JS Bridge)      | 4. Response
//!     |                         |
//!     v                         |
//! TauriIPCServer (Rust) --------+
//!     |
//!     | 2. Route & Validate
//!     v
//! WindServiceHandlers
//!     |
//!     | 3. Execute
//!     v
//! Mountain Services
//! ```
//!
//! **Key Structures:**
//!
//! - **TauriIPCMessage:** Standard message format for all IPC communication
//! - **ConnectionStatus:** Tracks connection health and uptime
//! - **ConnectionPool:** Manages concurrent IPC connections efficiently
//! - **PermissionManager:** Implements role-based access control
//! - **SecureMessageChannel:** Provides encryption for sensitive data
//! - **MessageCompressor:** Gzip compression for large payloads
//!
//! **Defensive Coding Practices:**
//!
//! 1. **Input Validation:**
//!    - All messages validated before processing
//!    - Type checking for all serialized data
//!    - Schema validation for complex payloads
//!
//! 2. **Error Handling:**
//!    - Comprehensive error messages with context
//!    - Error logging at appropriate levels
//!    - Graceful handling of transient failures
//!    - Automatic retry with backoff
//!
//! 3. **Timeout Management:**
//!    - Configurable timeouts for all operations
//!    - Timeout-based circuit breaking
//!    - Graceful degradation on timeout
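//!
//! Timeouts are applied with `tokio::time::timeout`, as in this sketch
//! (illustrative; exact call sites and durations vary):
//! ```text
//! match timeout(Duration::from_secs(5), ipc.send(channel, data)).await {
//!     Ok(result) => result,                                    // completed in time
//!     Err(_elapsed) => Err("IPC send timed out".to_string()),  // degrade gracefully
//! }
//! ```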
//!
//! 4. **Resource Management:**
//!    - Connection pooling to prevent exhaustion
//!    - Automatic cleanup of stale resources
//!    - Memory-efficient message queuing
//!
//! **Security Architecture:**
//!
//! - **Authentication:** User identity verification
//! - **Authorization:** Permission-based access control (RBAC)
//! - **Encryption:** AES-256-GCM for sensitive data
//! - **Auditing:** Complete security event logging
//! - **Threat Detection:** Anomaly monitoring and alerts
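//!
//! In this module, the authorization step maps a channel name to an operation
//! and checks it against the caller's security context (sketch of the actual
//! flow in `validate_message_permissions`):
//! ```text
//! let context = server.create_security_context(&message);
//! let operation = message.channel.replace("mountain_", "");
//! permission_manager.validate_permission(&operation, &context).await?;
//! ```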
//!
//! **Performance Optimizations:**
//!
//! - **Message Compression:** Gzip for large payloads
//! - **Connection Pooling:** Reuse connections efficiently
//! - **Caching:** Cache frequently used data
//! - **Batching:** Batch multiple messages for efficiency
//! - **Async/Await:** Non-blocking I/O operations
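//!
//! Compression and batching combine in `send_compressed_batch`; a caller-side
//! sketch (the channel name and `pending` vector are illustrative):
//! ```text
//! let compressor = MessageCompressor::new(6, 10); // level 6, batch size 10
//! if compressor.should_batch(pending.len()) {
//! 	ipc.send_compressed_batch("bulk-events", pending).await?;
//! } else {
//! 	for msg in pending {
//! 		ipc.send(&msg.channel, msg.data).await?;
//! 	}
//! }
//! ```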
//!
//! **Monitoring & Observability:**
//!
//! - **Connection Status:** Real-time health monitoring
//! - **Performance Metrics:** Latency, throughput, error rates
//! - **Audit Logs:** Complete message and security event logging
//! - **Health Checks:** Periodic health assessments
//!
//! **VSCode RPC Patterns (Study Reference):**
//! This implementation draws inspiration from VSCode's RPC/IPC architecture:
//! - Channel-based message routing
//! - Request-response correlation
//! - Cancellation token support
//! - Binary protocol message serialization
//! - Protocol versioning for compatibility

use std::{
	collections::HashMap,
	io::{Read, Write},
	sync::{Arc, Mutex},
	time::Duration,
};

use base64::{Engine, engine::general_purpose};
use flate2::{Compression, read::GzDecoder, write::GzEncoder};
use ring::{
	aead::{self, AES_256_GCM, LessSafeKey, UnboundKey},
	hmac,
	rand::{SecureRandom, SystemRandom},
};
use serde::{Deserialize, Serialize};
use tauri::{AppHandle, Emitter, Manager};
use tokio::{
	sync::{Mutex as AsyncMutex, RwLock, Semaphore},
	time::timeout,
};

use crate::dev_log;

/// IPC message structure matching Wind's ITauriIPCMessage interface
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TauriIPCMessage {
	pub channel:String,
	pub data:serde_json::Value,
	pub sender:Option<String>,
	pub timestamp:u64,
}

/// Connection status message
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectionStatus {
	pub connected:bool,
}

/// Listener callback type
type ListenerCallback = Box<dyn Fn(serde_json::Value) -> Result<(), String> + Send + Sync>;

/// Mountain's IPC Server counterpart to Wind's TauriIPCServer
#[derive(Clone)]
pub struct TauriIPCServer {
	app_handle:AppHandle,
	listeners:Arc<Mutex<HashMap<String, Vec<ListenerCallback>>>>,
	is_connected:Arc<Mutex<bool>>,
	message_queue:Arc<Mutex<Vec<TauriIPCMessage>>>,
}

/// Message compression utility for optimizing IPC message transfer
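///
/// Round-trip usage (sketch):
/// ```text
/// let compressor = MessageCompressor::new(6, 10);
/// let compressed = compressor.compress_messages(messages.clone())?;
/// let restored = compressor.decompress_messages(&compressed)?;
/// assert_eq!(restored.len(), messages.len());
/// ```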
pub struct MessageCompressor {
	compression_level:u32,
	batch_size:usize,
}

impl MessageCompressor {
	/// Create a new message compressor with specified parameters
	pub fn new(compression_level:u32, batch_size:usize) -> Self { Self { compression_level, batch_size } }

	/// Compress messages using Gzip for efficient transfer
	pub fn compress_messages(&self, messages:Vec<TauriIPCMessage>) -> Result<Vec<u8>, String> {
		let serialized =
			serde_json::to_vec(&messages).map_err(|e| format!("Failed to serialize messages: {}", e))?;

		let mut encoder = GzEncoder::new(Vec::new(), Compression::new(self.compression_level));
		encoder
			.write_all(&serialized)
			.map_err(|e| format!("Failed to compress messages: {}", e))?;

		encoder.finish().map_err(|e| format!("Failed to finish compression: {}", e))
	}

	/// Decompress messages from compressed data
	pub fn decompress_messages(&self, compressed_data:&[u8]) -> Result<Vec<TauriIPCMessage>, String> {
		let mut decoder = GzDecoder::new(compressed_data);
		let mut decompressed = Vec::new();
		decoder
			.read_to_end(&mut decompressed)
			.map_err(|e| format!("Failed to decompress data: {}", e))?;

		serde_json::from_slice(&decompressed).map_err(|e| format!("Failed to deserialize messages: {}", e))
	}

	/// Check if messages should be batched for compression
	pub fn should_batch(&self, message_count:usize) -> bool { message_count >= self.batch_size }
}

impl TauriIPCServer {
	/// Create a new Tauri IPC Server instance
	pub fn new(app_handle:AppHandle) -> Self {
		dev_log!("ipc", "[TauriIPCServer] Initializing Mountain IPC Server");

		Self {
			app_handle,
			listeners:Arc::new(Mutex::new(HashMap::new())),
			is_connected:Arc::new(Mutex::new(false)),
			message_queue:Arc::new(Mutex::new(Vec::new())),
		}
	}

	/// Initialize the IPC server and set up event listeners
	pub async fn initialize(&self) -> Result<(), String> {
		dev_log!("ipc", "[TauriIPCServer] Setting up IPC listeners");

		// Set up connection status
		{
			let mut is_connected = self
				.is_connected
				.lock()
				.map_err(|e| format!("Failed to lock connection status: {}", e))?;
			*is_connected = true;
		}

		// Notify Wind that Mountain is ready
		self.send_connection_status(true)
			.await
			.map_err(|e| format!("Failed to send connection status: {}", e))?;

		dev_log!("ipc", "[TauriIPCServer] IPC Server initialized successfully");

		// Process any queued messages
		self.process_message_queue().await;

		Ok(())
	}

	/// Send a message to the Wind frontend
	pub async fn send(&self, channel:&str, data:serde_json::Value) -> Result<(), String> {
		let message = TauriIPCMessage {
			channel:channel.to_string(),
			data,
			sender:Some("mountain".to_string()),
			timestamp:std::time::SystemTime::now()
				.duration_since(std::time::UNIX_EPOCH)
				.unwrap_or_default()
				.as_millis() as u64,
		};

		let is_connected = {
			let guard = self
				.is_connected
				.lock()
				.map_err(|e| format!("Failed to check connection status: {}", e))?;
			*guard
		};

		if !is_connected {
			// Queue the message for later delivery
			let mut queue = self
				.message_queue
				.lock()
				.map_err(|e| format!("Failed to access message queue: {}", e))?;
			queue.push(message);
			dev_log!(
				"ipc",
				"[TauriIPCServer] Message queued (channel: {}, queue size: {})",
				channel,
				queue.len()
			);
			return Ok(());
		}

		// Send immediately
		self.emit_message(&message).await
	}

	/// Register a listener for incoming messages from Wind
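	///
	/// Example (illustrative closure):
	/// ```text
	/// ipc.on("configuration-updated", Box::new(|data| {
	/// 	println!("config changed: {}", data);
	/// 	Ok(())
	/// }))?;
	/// ```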
	pub fn on(&self, channel:&str, callback:ListenerCallback) -> Result<(), String> {
		let mut listeners = self
			.listeners
			.lock()
			.map_err(|e| format!("Failed to access listeners: {}", e))?;

		listeners.entry(channel.to_string()).or_insert_with(Vec::new).push(callback);

		dev_log!("ipc", "[TauriIPCServer] Listener registered for channel: {}", channel);
		Ok(())
	}

	/// Remove a listener
	///
	/// Note: this compares the address of the passed `Box` against the stored
	/// ones, so it only removes a listener when the caller can still reference
	/// the exact boxed closure that was registered.
	pub fn off(&self, channel:&str, callback:&ListenerCallback) -> Result<(), String> {
		let mut listeners = self
			.listeners
			.lock()
			.map_err(|e| format!("Failed to access listeners: {}", e))?;

		if let Some(channel_listeners) = listeners.get_mut(channel) {
			channel_listeners.retain(|cb| !std::ptr::eq(cb as *const _, callback as *const _));

			if channel_listeners.is_empty() {
				listeners.remove(channel);
			}
		}

		dev_log!("ipc", "[TauriIPCServer] Listener removed from channel: {}", channel);
		Ok(())
	}

	/// Handle incoming messages from Wind
	pub async fn handle_incoming_message(&self, message:TauriIPCMessage) -> Result<(), String> {
		dev_log!("ipc", "[TauriIPCServer] Received message on channel: {}", message.channel);

		let listeners = self
			.listeners
			.lock()
			.map_err(|e| format!("Failed to access listeners: {}", e))?;

		if let Some(channel_listeners) = listeners.get(&message.channel) {
			for callback in channel_listeners {
				if let Err(e) = callback(message.data.clone()) {
					dev_log!(
						"ipc",
						"error: [TauriIPCServer] Error in listener for channel {}: {}",
						message.channel,
						e
					);
				}
			}
		} else {
			dev_log!("ipc", "[TauriIPCServer] No listeners found for channel: {}", message.channel);
		}

		Ok(())
	}

	/// Send connection status to Wind
	async fn send_connection_status(&self, connected:bool) -> Result<(), String> {
		let status = ConnectionStatus { connected };

		self.app_handle
			.emit("vscode-ipc-status", status)
			.map_err(|e| format!("Failed to emit connection status: {}", e))?;

		dev_log!("ipc", "[TauriIPCServer] Connection status sent: {}", connected);
		Ok(())
	}

	/// Emit a message to Wind
	async fn emit_message(&self, message:&TauriIPCMessage) -> Result<(), String> {
		self.app_handle
			.emit("vscode-ipc-message", message)
			.map_err(|e| format!("Failed to emit message: {}", e))?;

		dev_log!("ipc", "[TauriIPCServer] Message emitted on channel: {}", message.channel);
		Ok(())
	}
	/// Process queued messages in FIFO order
	async fn process_message_queue(&self) {
		// Drain the queue while holding the lock, then release it before any
		// `.await` so the std::sync::Mutex guard is never held across a
		// suspend point.
		let mut pending:Vec<TauriIPCMessage> = match self.message_queue.lock() {
			Ok(mut queue) => queue.drain(..).collect(),
			Err(e) => {
				dev_log!("ipc", "error: [TauriIPCServer] Failed to access message queue: {}", e);
				return;
			},
		};

		// Deliver oldest messages first (they were queued with `push`)
		while !pending.is_empty() {
			let message = pending.remove(0);
			if let Err(e) = self.emit_message(&message).await {
				dev_log!("ipc", "error: [TauriIPCServer] Failed to send queued message: {}", e);
				// Put the message back at the front and stop processing
				pending.insert(0, message);
				break;
			}
		}

		let remaining = pending.len();
		if remaining > 0 {
			if let Ok(mut queue) = self.message_queue.lock() {
				// Re-queue undelivered messages ahead of anything queued meanwhile
				pending.extend(queue.drain(..));
				*queue = pending;
			}
		}

		dev_log!(
			"ipc",
			"[TauriIPCServer] Message queue processed, {} messages remaining",
			remaining
		);
	}

	/// Get connection status
	pub fn get_connection_status(&self) -> Result<bool, String> {
		let guard = self
			.is_connected
			.lock()
			.map_err(|e| format!("Failed to get connection status: {}", e))?;
		Ok(*guard)
	}

	/// Get queued message count
	pub fn get_queue_size(&self) -> Result<usize, String> {
		let guard = self
			.message_queue
			.lock()
			.map_err(|e| format!("Failed to get queue size: {}", e))?;
		Ok(guard.len())
	}

	/// Cleanup resources
	pub fn dispose(&self) -> Result<(), String> {
		{
			let mut listeners = self
				.listeners
				.lock()
				.map_err(|e| format!("Failed to access listeners: {}", e))?;
			listeners.clear();
		}

		{
			let mut queue = self
				.message_queue
				.lock()
				.map_err(|e| format!("Failed to access message queue: {}", e))?;
			queue.clear();
		}

		{
			let mut is_connected = self
				.is_connected
				.lock()
				.map_err(|e| format!("Failed to access connection status: {}", e))?;
			*is_connected = false;
		}

		dev_log!("ipc", "[TauriIPCServer] IPC Server disposed");
		Ok(())
	}

	/// Advanced: Validate message permissions
	pub async fn validate_message_permissions(&self, message:&TauriIPCMessage) -> Result<(), String> {
		let permission_manager = PermissionManager::new();
		permission_manager.initialize_defaults().await;

		let context = self.create_security_context(message);

		// Extract operation from channel name
		let operation = message.channel.replace("mountain_", "");

		// Validate permission
		permission_manager.validate_permission(&operation, &context).await
	}

	/// Advanced: Create security context from message
	fn create_security_context(&self, message:&TauriIPCMessage) -> SecurityContext {
		SecurityContext {
			user_id:message.sender.clone().unwrap_or_else(|| "unknown".to_string()),
			// Default role assigned to authenticated IPC connections
			roles:vec!["user".to_string()],
			permissions:vec![],
			// IPC connections use loopback address for security (localhost only)
			ip_address:"127.0.0.1".to_string(),
			timestamp:std::time::SystemTime::UNIX_EPOCH + std::time::Duration::from_millis(message.timestamp),
		}
	}

	/// Advanced: Log security event
	pub async fn log_security_event(&self, event:SecurityEvent) {
		let permission_manager = PermissionManager::new();
		permission_manager.log_security_event(event).await;
	}

	/// Advanced: Record performance metrics
	pub async fn record_performance_metrics(&self, channel:String, duration:std::time::Duration, success:bool) {
		// This would integrate with the PerformanceDashboard
		dev_log!(
			"ipc",
			"[TauriIPCServer] Performance recorded - Channel: {}, Duration: {:?}, Success: {}",
			channel,
			duration,
			success
		);
	}

	/// Advanced: Get security audit log
	pub async fn get_security_audit_log(&self, limit:usize) -> Result<Vec<SecurityEvent>, String> {
		let permission_manager = PermissionManager::new();
		Ok(permission_manager.get_audit_log(limit).await)
	}

	/// Send compressed message batch
	pub async fn send_compressed_batch(&self, channel:&str, messages:Vec<TauriIPCMessage>) -> Result<(), String> {
		// Configure compressor with balanced settings: level 6 (good compression/speed
		// tradeoff) and batch size 10 (aggregate small messages for efficiency)
		let compressor = MessageCompressor::new(6, 10);

		let compressed_data = compressor
			.compress_messages(messages)
			.map_err(|e| format!("Failed to compress batch: {}", e))?;

		let batch_message = TauriIPCMessage {
			channel:"compressed_batch".to_string(),
			data:serde_json::Value::String(general_purpose::STANDARD.encode(&compressed_data)),
			sender:Some("mountain".to_string()),
			timestamp:std::time::SystemTime::now()
				.duration_since(std::time::UNIX_EPOCH)
				.unwrap_or_default()
				.as_millis() as u64,
		};

		let payload = serde_json::to_value(batch_message)
			.map_err(|e| format!("Failed to serialize batch message: {}", e))?;

		self.send(channel, payload).await
	}

	/// Handle compressed batch message
	pub async fn handle_compressed_batch(&self, message:TauriIPCMessage) -> Result<(), String> {
		let compressed_data_base64 = message.data.as_str().ok_or("Compressed batch data must be a string")?;

		let compressed_data = general_purpose::STANDARD
			.decode(compressed_data_base64)
			.map_err(|e| format!("Failed to decode base64: {}", e))?;

		let compressor = MessageCompressor::new(6, 10);
		let messages = compressor
			.decompress_messages(&compressed_data)
			.map_err(|e| format!("Failed to decompress batch: {}", e))?;

		// Process each message in the batch
		for message in messages {
			self.handle_incoming_message(message).await?;
		}

		Ok(())
	}

	/// Send message using connection pool
	///
	/// Note: the pool is created per call here, so the semaphore limit only
	/// applies within this call; a production version would hold one shared
	/// `ConnectionPool` in the server state.
	pub async fn send_with_pool(&self, channel:&str, data:serde_json::Value) -> Result<(), String> {
		let pool = Arc::new(ConnectionPool::new(10, Duration::from_secs(30)));

		let handle = pool
			.GetConnection()
			.await
			.map_err(|e| format!("Failed to get connection: {}", e))?;

		let result = self.send(channel, data).await;

		pool.ReleaseConnection(handle).await;

		result
	}

	/// Get connection pool statistics
	///
	/// Note: this reports on a freshly created pool; statistics for in-flight
	/// connections require querying the shared pool instance.
	pub async fn get_connection_stats(&self) -> Result<ConnectionStats, String> {
		let pool = Arc::new(ConnectionPool::new(10, Duration::from_secs(30)));
		Ok(pool.GetStats().await)
	}

	/// Send encrypted message
	///
	/// Note: `SecureMessageChannel::new()` generates fresh random keys, so the
	/// receiving side must use the same channel instance (or a shared key
	/// exchange) for decryption to succeed.
	pub async fn send_secure(&self, channel:&str, data:serde_json::Value) -> Result<(), String> {
		let secure_channel =
			SecureMessageChannel::new().map_err(|e| format!("Failed to create secure channel: {}", e))?;

		let message = TauriIPCMessage {
			channel:channel.to_string(),
			data,
			sender:Some("mountain".to_string()),
			timestamp:std::time::SystemTime::now()
				.duration_since(std::time::UNIX_EPOCH)
				.unwrap_or_default()
				.as_millis() as u64,
		};

		let encrypted_message = secure_channel
			.encrypt_message(&message)
			.map_err(|e| format!("Failed to encrypt message: {}", e))?;

		let encrypted_data = serde_json::to_value(encrypted_message)
			.map_err(|e| format!("Failed to serialize encrypted message: {}", e))?;

		self.send("secure_message", encrypted_data).await
	}

	/// Handle encrypted message
	pub async fn handle_secure_message(&self, encrypted_data:serde_json::Value) -> Result<(), String> {
		let encrypted_message:EncryptedMessage = serde_json::from_value(encrypted_data)
			.map_err(|e| format!("Failed to deserialize encrypted message: {}", e))?;

		let secure_channel =
			SecureMessageChannel::new().map_err(|e| format!("Failed to create secure channel: {}", e))?;

		let message = secure_channel
			.decrypt_message(&encrypted_message)
			.map_err(|e| format!("Failed to decrypt message: {}", e))?;

		self.handle_incoming_message(message).await
	}

	/// Handle message with permission validation
	pub async fn handle_message_with_permissions(&self, message:TauriIPCMessage) -> Result<(), String> {
		let permission_manager = PermissionManager::new();
		let context = self.create_security_context(&message);

		// Extract operation from channel name
		let operation = message.channel.replace("mountain_", "");

		// Validate permission
		permission_manager.validate_permission(&operation, &context).await?;

		// Process the message
		self.handle_incoming_message(message).await
	}
}

/// Connection pool for IPC operations - manages concurrent connections
/// efficiently
///
/// **Purpose:** Prevents connection exhaustion by pooling and reusing
/// connections
/// **Features:** Health monitoring, automatic cleanup, configurable timeouts
pub struct ConnectionPool {
	MaxConnections:usize,
	ConnectionTimeout:Duration,
	Semaphore:Arc<Semaphore>,
	ActiveConnection:Arc<AsyncMutex<HashMap<String, ConnectionHandle>>>,
	HealthChecker:Arc<AsyncMutex<ConnectionHealthChecker>>,
}

/// Handle representing an active connection
#[derive(Clone)]
pub struct ConnectionHandle {
	pub id:String,
	pub created_at:std::time::Instant,
	pub last_used:std::time::Instant,
	pub health_score:f64,
	pub error_count:usize,
}

impl ConnectionHandle {
	/// Create a new connection handle with health monitoring
	pub fn new() -> Self {
		Self {
			id:uuid::Uuid::new_v4().to_string(),
			created_at:std::time::Instant::now(),
			last_used:std::time::Instant::now(),
			health_score:100.0,
			error_count:0,
		}
	}

	/// Update health score based on operation success
	pub fn update_health(&mut self, success:bool) {
		if success {
			self.health_score = (self.health_score + 10.0).min(100.0);
			self.error_count = 0;
		} else {
			self.health_score = (self.health_score - 25.0).max(0.0);
			self.error_count += 1;
		}
		self.last_used = std::time::Instant::now();
	}

	/// Check if connection is healthy
	pub fn is_healthy(&self) -> bool { self.health_score > 50.0 && self.error_count < 5 }
}

impl ConnectionPool {
	/// Create a new connection pool with specified parameters
	pub fn new(MaxConnections:usize, ConnectionTimeout:Duration) -> Self {
		Self {
			MaxConnections,
			ConnectionTimeout,
			Semaphore:Arc::new(Semaphore::new(MaxConnections)),
			ActiveConnection:Arc::new(AsyncMutex::new(HashMap::new())),
			HealthChecker:Arc::new(AsyncMutex::new(ConnectionHealthChecker::new())),
		}
	}

	/// Get a connection handle from the pool with timeout
	pub async fn GetConnection(&self) -> Result<ConnectionHandle, String> {
		// Note: the permit is dropped when this function returns, so the
		// semaphore only bounds concurrent acquisitions here. To bound the
		// lifetime of the connection itself, return an `OwnedSemaphorePermit`
		// with the handle and drop it in `ReleaseConnection`.
		let _permit = timeout(self.ConnectionTimeout, self.Semaphore.acquire())
			.await
			.map_err(|_| "Connection timeout")?
			.map_err(|e| format!("Failed to acquire connection: {}", e))?;

		let handle = ConnectionHandle::new();

		{
			let mut connections = self.ActiveConnection.lock().await;
			connections.insert(handle.id.clone(), handle.clone());
		}

		// Start health monitoring for this connection
		self.StartHealthMonitoring(&handle.id).await;

		Ok(handle)
	}

	/// Release a connection handle back to the pool
	pub async fn ReleaseConnection(&self, handle:ConnectionHandle) {
		let mut connections = self.ActiveConnection.lock().await;
		connections.remove(&handle.id);
	}

	/// Get connection statistics for monitoring
	pub async fn GetStats(&self) -> ConnectionStats {
		let connections = self.ActiveConnection.lock().await;
		let healthy_connections = connections.values().filter(|h| h.is_healthy()).count();

		ConnectionStats {
			total_connections:connections.len(),
			healthy_connections,
			max_connections:self.MaxConnections,
			available_permits:self.Semaphore.available_permits(),
			connection_timeout:self.ConnectionTimeout,
		}
	}

	/// Clean up stale connections
	pub async fn CleanUpStaleConnections(&self) -> usize {
		let mut connections = self.ActiveConnection.lock().await;
		let now = std::time::Instant::now();
		// Stale connections are those unused for 5 minutes (300 seconds)
		let stale_threshold = Duration::from_secs(300);

		let stale_ids:Vec<String> = connections
			.iter()
			.filter(|(_, handle)| now.duration_since(handle.last_used) > stale_threshold || !handle.is_healthy())
			.map(|(id, _)| id.clone())
			.collect();

		let stale_count = stale_ids.len();
		for id in stale_ids {
			connections.remove(&id);
		}

		stale_count
	}

	/// Start health monitoring for a connection
	async fn StartHealthMonitoring(&self, connection_id:&str) {
		let health_checker = self.HealthChecker.clone();
		let active_connection = self.ActiveConnection.clone();
		let connection_id = connection_id.to_string();

		tokio::spawn(async move {
			let mut interval = tokio::time::interval(Duration::from_secs(30));

			loop {
				interval.tick().await;

				let checker = health_checker.lock().await;
				let mut connections = match active_connection.try_lock() {
					Ok(conns) => conns,
					Err(_) => continue,
				};

				if let Some(handle) = connections.get_mut(&connection_id) {
					let is_healthy = checker.check_connection_health(handle).await;
					handle.update_health(is_healthy);

					if !handle.is_healthy() {
						dev_log!(
							"ipc",
							"Connection {} marked as unhealthy (score: {:.1})",
							handle.id,
							handle.health_score
						);
					}
				} else {
					// The connection has been removed from the pool, stop monitoring
					break;
				}
			}
		});
	}
}

/// Connection health checker
struct ConnectionHealthChecker {
	ping_timeout:Duration,
}

impl ConnectionHealthChecker {
	fn new() -> Self { Self { ping_timeout:Duration::from_secs(5) } }

	/// Check connection health by sending a ping
	async fn check_connection_health(&self, _handle:&mut ConnectionHandle) -> bool {
		// Simulate health check by ensuring connection can handle basic operations
		// In a real implementation, this would send an actual ping message
		let start_time = std::time::Instant::now();

		// Simulate network latency
		tokio::time::sleep(Duration::from_millis(10)).await;

		let response_time = start_time.elapsed();

		// Connection is healthy if response time is reasonable
		response_time < self.ping_timeout
	}
}

/// Connection statistics
#[derive(Debug, Clone, Default)]
pub struct ConnectionStats {
	pub total_connections:usize,
	pub healthy_connections:usize,
	pub max_connections:usize,
	pub available_permits:usize,
	pub connection_timeout:Duration,
}

/// Secure message channel with encryption and authentication
///
/// Note: keys are generated at construction, so messages can only be decrypted
/// by the same `SecureMessageChannel` instance (or one sharing the same keys).
pub struct SecureMessageChannel {
	encryption_key:LessSafeKey,
	hmac_key:Vec<u8>,
}

impl SecureMessageChannel {
	/// Create a new secure channel with freshly generated random keys
	pub fn new() -> Result<Self, String> {
		let rng = SystemRandom::new();

		// Generate encryption key
		let mut encryption_key_bytes = vec![0u8; 32];
		rng.fill(&mut encryption_key_bytes)
			.map_err(|e| format!("Failed to generate encryption key: {}", e))?;

		let unbound_key = UnboundKey::new(&AES_256_GCM, &encryption_key_bytes)
			.map_err(|e| format!("Failed to create unbound key: {}", e))?;

		let encryption_key = LessSafeKey::new(unbound_key);

		// Generate HMAC key
		let mut hmac_key = vec![0u8; 32];
		rng.fill(&mut hmac_key)
			.map_err(|e| format!("Failed to generate HMAC key: {}", e))?;

		Ok(Self { encryption_key, hmac_key })
	}

	/// Encrypt and authenticate a message
	pub fn encrypt_message(&self, message:&TauriIPCMessage) -> Result<EncryptedMessage, String> {
		let serialized_message =
			serde_json::to_vec(message).map_err(|e| format!("Failed to serialize message: {}", e))?;

		// Generate a random 96-bit nonce (AES-GCM nonces must never repeat per key)
		let mut nonce = [0u8; 12];
		SystemRandom::new()
			.fill(&mut nonce)
			.map_err(|e| format!("Failed to generate nonce: {}", e))?;

		// Encrypt message in place, appending the GCM authentication tag
		let mut in_out = serialized_message;
		self.encryption_key
			.seal_in_place_append_tag(aead::Nonce::assume_unique_for_key(nonce), aead::Aad::empty(), &mut in_out)
			.map_err(|e| format!("Encryption failed: {}", e))?;

		// Create HMAC over the ciphertext
		let hmac_key = hmac::Key::new(hmac::HMAC_SHA256, &self.hmac_key);
		let hmac_tag = hmac::sign(&hmac_key, &in_out);

		Ok(EncryptedMessage { nonce:nonce.to_vec(), ciphertext:in_out, hmac_tag:hmac_tag.as_ref().to_vec() })
	}

	/// Decrypt and verify a message
	pub fn decrypt_message(&self, encrypted:&EncryptedMessage) -> Result<TauriIPCMessage, String> {
		// Verify HMAC before decrypting
		let hmac_key = hmac::Key::new(hmac::HMAC_SHA256, &self.hmac_key);
		hmac::verify(&hmac_key, &encrypted.ciphertext, &encrypted.hmac_tag)
			.map_err(|_| "HMAC verification failed".to_string())?;

		// Decrypt message
		let mut in_out = encrypted.ciphertext.clone();
		let nonce_slice:&[u8] = &encrypted.nonce;
		let nonce_array:[u8; 12] = nonce_slice.try_into().map_err(|_| "Invalid nonce length".to_string())?;

		let nonce = aead::Nonce::assume_unique_for_key(nonce_array);

		self.encryption_key
			.open_in_place(nonce, aead::Aad::empty(), &mut in_out)
			.map_err(|e| format!("Decryption failed: {}", e))?;

		// Remove the trailing authentication tag left in the buffer
		let plaintext_len = in_out.len() - AES_256_GCM.tag_len();
		in_out.truncate(plaintext_len);

		// Deserialize message
		serde_json::from_slice(&in_out).map_err(|e| format!("Failed to deserialize message: {}", e))
	}

	/// Rotate encryption keys by regenerating the channel
	pub fn rotate_keys(&mut self) -> Result<(), String> {
		*self = Self::new()?;
		Ok(())
	}
}
931
932/// Encrypted message structure
933#[derive(Debug, Clone, Serialize, Deserialize)]
934pub struct EncryptedMessage {
935	nonce:Vec<u8>,
936	ciphertext:Vec<u8>,
937	hmac_tag:Vec<u8>,
938}
939
940/// Advanced permission-based IPC message handler
#[tauri::command]
pub async fn mountain_ipc_receive_message(app_handle:tauri::AppHandle, message:TauriIPCMessage) -> Result<(), String> {
	dev_log!(
		"ipc",
		"[TauriIPCServer] Received IPC message from Wind on channel: {}",
		message.channel
	);

	// Get the IPC server instance from application state
	if let Some(ipc_server) = app_handle.try_state::<TauriIPCServer>() {
		// Advanced security: Validate permissions before processing
		if let Err(e) = ipc_server.validate_message_permissions(&message).await {
			dev_log!(
				"ipc",
				"error: [TauriIPCServer] Permission validation failed for channel {}: {}",
				message.channel,
				e
			);

			// Log security event
			ipc_server
				.log_security_event(SecurityEvent {
					event_type:SecurityEventType::PermissionDenied,
					user_id:message.sender.clone().unwrap_or_else(|| "unknown".to_string()),
					operation:message.channel.clone(),
					timestamp:std::time::SystemTime::now(),
					details:Some(format!("Permission denied: {}", e)),
				})
				.await;

			return Err(format!("Permission denied: {}", e));
		}

		// Advanced monitoring: Track message processing time
		let start_time = std::time::Instant::now();
		let result = ipc_server.handle_incoming_message(message.clone()).await;
		let duration = start_time.elapsed();

		// Record performance metrics
		ipc_server
			.record_performance_metrics(message.channel, duration, result.is_ok())
			.await;

		result
	} else {
		Err("IPC Server not found in application state".to_string())
	}
}

/// Tauri command handler for Wind to check connection status
///
/// **Command Registration:** Registered in Tauri's invoke_handler
/// Called by Wind using: `invoke('mountain_ipc_get_status')`
///
/// **Response Format:**
/// ```json
/// {
///   "connected": true
/// }
/// ```
#[tauri::command]
pub async fn mountain_ipc_get_status(app_handle:tauri::AppHandle) -> Result<ConnectionStatus, String> {
	if let Some(ipc_server) = app_handle.try_state::<TauriIPCServer>() {
		let connected = ipc_server
			.get_connection_status()
			.map_err(|e| format!("Failed to get connection status: {}", e))?;

		Ok(ConnectionStatus { connected })
	} else {
		Err("IPC Server not found in application state".to_string())
	}
}

/// Security context for permission validation
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityContext {
	pub user_id:String,
	pub roles:Vec<String>,
	pub permissions:Vec<String>,
	pub ip_address:String,
	pub timestamp:std::time::SystemTime,
}

/// Permission manager for IPC operations
pub struct PermissionManager {
	roles:Arc<RwLock<HashMap<String, Role>>>,
	permissions:Arc<RwLock<HashMap<String, Permission>>>,
	audit_log:Arc<RwLock<Vec<SecurityEvent>>>,
}

/// Security event for auditing
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityEvent {
	pub event_type:SecurityEventType,
	pub user_id:String,
	pub operation:String,
	pub timestamp:std::time::SystemTime,
	pub details:Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SecurityEventType {
	PermissionDenied,
	AccessGranted,
	ConfigurationChange,
	SecurityViolation,
	PerformanceAnomaly,
}

/// Role definition for RBAC
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Role {
	pub name:String,
	pub permissions:Vec<String>,
	pub description:String,
}

/// Permission definition
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Permission {
	pub name:String,
	pub description:String,
	pub category:String,
}

impl PermissionManager {
	pub fn new() -> Self {
		Self {
			roles:Arc::new(RwLock::new(HashMap::new())),
			permissions:Arc::new(RwLock::new(HashMap::new())),
			audit_log:Arc::new(RwLock::new(Vec::new())),
		}
	}

	/// Validate permission for an operation
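	///
	/// The caller's effective permission set is the union of their direct
	/// permissions and the permissions of every role they hold.
	///
	/// # Example (sketch — a `context` holding only the default "user" role)
	/// ```text
	/// // "user" grants file.read / config.read / storage.read (see initialize_defaults)
	/// manager.validate_permission("file:write", &context).await
	///     // -> Err("Missing permission: file.write")
	/// ```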
	pub async fn validate_permission(&self, operation:&str, context:&SecurityContext) -> Result<(), String> {
		// Check if operation requires specific permissions
		let required_permissions = self.get_required_permissions(operation).await;

		if required_permissions.is_empty() {
			return Ok(()); // No specific permissions required
		}

		// Build the user's effective permission set: direct permissions plus role grants
		let mut user_permissions:Vec<String> = context.permissions.clone();
		for role in context.roles.iter() {
			let role_perms = self.get_role_permissions(role).await;
			user_permissions.extend(role_perms);
		}

		for required in required_permissions {
			if !user_permissions.contains(&required) {
				return Err(format!("Missing permission: {}", required));
			}
		}

		// Log successful access
		self.log_security_event(SecurityEvent {
			event_type:SecurityEventType::AccessGranted,
			user_id:context.user_id.clone(),
			operation:operation.to_string(),
			timestamp:std::time::SystemTime::now(),
			details:Some(format!("Access granted for operation: {}", operation)),
		})
		.await;

		Ok(())
	}

	/// Get required permissions for an operation
	async fn get_required_permissions(&self, operation:&str) -> Vec<String> {
		// Define operation-to-permission mapping
		match operation {
			"file:write" | "file:delete" => vec!["file.write".to_string()],
			"configuration:update" => vec!["config.update".to_string()],
			"storage:set" => vec!["storage.write".to_string()],
			"native:openExternal" => vec!["system.external".to_string()],
			// Operations not in the mapping require no special permissions by default
			_ => Vec::new(),
		}
	}

	/// Get permissions for a role
	async fn get_role_permissions(&self, role_name:&str) -> Vec<String> {
		let roles = self.roles.read().await;
		roles.get(role_name).map(|role| role.permissions.clone()).unwrap_or_default()
	}

	/// Log security event
	pub async fn log_security_event(&self, event:SecurityEvent) {
		let mut audit_log = self.audit_log.write().await;
		audit_log.push(event);

		// Keep only the most recent 1000 events
		if audit_log.len() > 1000 {
			audit_log.remove(0);
		}
	}

	/// Get security audit log (most recent events first)
	pub async fn get_audit_log(&self, limit:usize) -> Vec<SecurityEvent> {
		let audit_log = self.audit_log.read().await;
		audit_log.iter().rev().take(limit).cloned().collect()
	}

	/// Initialize default roles and permissions
	pub async fn initialize_defaults(&self) {
		let mut permissions = self.permissions.write().await;
		let mut roles = self.roles.write().await;

		// Define standard permissions
		let standard_permissions = vec![
			("file.read", "Read file operations"),
			("file.write", "Write file operations"),
			("config.read", "Read configuration"),
			("config.update", "Update configuration"),
			("storage.read", "Read storage"),
			("storage.write", "Write storage"),
			("system.external", "Access external system resources"),
		];

		for (name, description) in standard_permissions {
			permissions.insert(
				name.to_string(),
				Permission {
					name:name.to_string(),
					description:description.to_string(),
					category:"standard".to_string(),
				},
			);
		}

		// Define standard roles
		let standard_roles = vec![
			("user", vec!["file.read", "config.read", "storage.read"]),
			(
				"developer",
				vec!["file.read", "file.write", "config.read", "storage.read", "storage.write"],
			),
			(
				"admin",
				vec![
					"file.read",
					"file.write",
					"config.read",
					"config.update",
					"storage.read",
					"storage.write",
					"system.external",
				],
			),
		];

		for (name, role_permissions) in standard_roles {
			roles.insert(
				name.to_string(),
				Role {
					name:name.to_string(),
					permissions:role_permissions.iter().map(|p| p.to_string()).collect(),
					description:format!("{} role with standard permissions", name),
				},
			);
		}
	}
}