1use std::{collections::HashMap, sync::Arc};
93
94use serde::{Deserialize, Serialize};
95use tokio::sync::{Mutex, RwLock};
96use systemstat::{Platform, System};
97
98use crate::{AirError, Configuration::AirConfiguration, Result, Utility, dev_log};
99
100#[derive(Debug)]
102pub struct ApplicationState {
103 pub Configuration:Arc<AirConfiguration>,
105
106 pub ServiceStatus:Arc<RwLock<HashMap<String, ServiceStatus>>>,
108
109 pub ActiveRequest:Arc<Mutex<HashMap<String, RequestStatus>>>,
111
112 pub Metrics:Arc<RwLock<PerformanceMetrics>>,
114
115 pub Resources:Arc<RwLock<ResourceUsage>>,
117
118 pub Connection:Arc<RwLock<HashMap<String, ConnectionInfo>>>,
120
121 pub BackgroundTask:Arc<Mutex<Vec<tokio::task::JoinHandle<()>>>>,
123}
124
125#[derive(Debug, Clone, Serialize, Deserialize)]
127pub enum ServiceStatus {
128 Starting,
129 Running,
130 Stopping,
131 Stopped,
132 Error(String),
133}
134
135#[derive(Debug, Clone)]
137pub struct RequestStatus {
138 pub RequestId:String,
139 pub Service:String,
140 pub StartedAt:u64,
141 pub Status:RequestState,
142 pub Progress:Option<f32>,
143}
144
145#[derive(Debug, Clone)]
147pub enum RequestState {
148 Pending,
149 InProgress,
150 Completed,
151 Failed(String),
152 Cancelled,
153}
154
155#[derive(Debug, Clone, Serialize, Deserialize)]
157pub struct PerformanceMetrics {
158 pub TotalRequest:u64,
159 pub SuccessfulRequest:u64,
160 pub FailedRequest:u64,
161 pub AverageResponseTime:f64,
162 pub UptimeSeconds:u64,
163 pub LastUpdated:u64,
164}
165
166#[derive(Debug, Clone, Serialize, Deserialize)]
168pub struct ResourceUsage {
169 pub MemoryUsageMb:f64,
170 pub CPUUsagePercent:f64,
171 pub DiskUsageMb:f64,
172 pub NetworkUsageMbps:f64,
173 pub LastUpdated:u64,
174}
175
176#[derive(Debug, Clone, Serialize, Deserialize)]
178pub struct ConnectionInfo {
179 pub ConnectionId:String,
180 pub ClientId:String,
181 pub ClientVersion:String,
182 pub ProtocolVersion:u32,
183 pub LastHeartbeat:u64,
184 pub IsActive:bool,
185 pub ConnectionType:ConnectionType,
186}
187
188#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
190pub enum ConnectionType {
191 MountainMain,
192 MountainWorker,
193 Cocoon,
194 Wind,
195 External,
196}
197
198#[derive(Debug, Clone, Serialize, Deserialize)]
200pub struct ConnectionHealthReport {
201 pub TotalConnection:usize,
202 pub HealthyConnection:usize,
203 pub StaleConnection:usize,
204 pub ConnectionByType:HashMap<String, usize>,
205 pub LastChecked:u64,
206}
207
208impl ApplicationState {
209 pub async fn New(Configuration:Arc<AirConfiguration>) -> Result<Self> {
211 let State = Self {
212 Configuration,
213 ServiceStatus:Arc::new(RwLock::new(HashMap::new())),
214 ActiveRequest:Arc::new(Mutex::new(HashMap::new())),
215 Metrics:Arc::new(RwLock::new(PerformanceMetrics {
216 TotalRequest:0,
217 SuccessfulRequest:0,
218 FailedRequest:0,
219 AverageResponseTime:0.0,
220 UptimeSeconds:0,
221 LastUpdated:Utility::CurrentTimestamp(),
222 })),
223 Resources:Arc::new(RwLock::new(ResourceUsage {
224 MemoryUsageMb:0.0,
225 CPUUsagePercent:0.0,
226 DiskUsageMb:0.0,
227 NetworkUsageMbps:0.0,
228 LastUpdated:Utility::CurrentTimestamp(),
229 })),
230 Connection:Arc::new(RwLock::new(HashMap::new())),
231 BackgroundTask:Arc::new(Mutex::new(Vec::new())),
232 };
233
234 State.InitializeServiceStatus().await?;
236
237 Ok(State)
238 }
239
240 async fn InitializeServiceStatus(&self) -> Result<()> {
242 let mut Status = self.ServiceStatus.write().await;
243
244 Status.insert("authentication".to_string(), ServiceStatus::Starting);
245 Status.insert("updates".to_string(), ServiceStatus::Starting);
246 Status.insert("downloader".to_string(), ServiceStatus::Starting);
247 Status.insert("indexing".to_string(), ServiceStatus::Starting);
248 Status.insert("grpc".to_string(), ServiceStatus::Starting);
249 Status.insert("connections".to_string(), ServiceStatus::Starting);
250
251 Ok(())
252 }
253
254 pub async fn RegisterConnection(
257 &self,
258 ConnectionId:String,
259 ClientId:String,
260 ClientVersion:String,
261 ProtocolVersion:u32,
262 ConnectionType:ConnectionType,
263 ) -> Result<()> {
264 if ConnectionId.is_empty() {
266 return Err(AirError::Configuration("Connection ID cannot be empty".to_string()));
267 }
268
269 if ClientId.is_empty() {
271 return Err(AirError::Configuration("Client ID cannot be empty".to_string()));
272 }
273
274 if ProtocolVersion == 0 {
276 return Err(AirError::Configuration("Protocol version must be greater than 0".to_string()));
277 }
278
279 let mut Connection = self.Connection.write().await;
280
281 if Connection.contains_key(&ConnectionId) {
283 return Err(AirError::Configuration(format!("Connection {} already exists", ConnectionId)));
284 }
285
286 if matches!(ConnectionType, ConnectionType::MountainMain | ConnectionType::MountainWorker) {
288 let ClientConnCount = Connection
290 .values()
291 .filter(|c| {
292 c.ClientId == ClientId
293 && matches!(c.ConnectionType, ConnectionType::MountainMain | ConnectionType::MountainWorker)
294 })
295 .count();
296
297 const MAX_CONN_PER_CLIENT:usize = 10;
298 if ClientConnCount >= MAX_CONN_PER_CLIENT {
299 return Err(AirError::ResourceLimit(format!(
300 "Client {} exceeds maximum connection limit ({})",
301 ClientId, MAX_CONN_PER_CLIENT
302 )));
303 }
304 }
305
306 Connection.insert(
307 ConnectionId.clone(),
308 ConnectionInfo {
309 ConnectionId:ConnectionId.clone(),
310 ClientId:ClientId.clone(),
311 ClientVersion,
312 ProtocolVersion,
313 LastHeartbeat:Utility::CurrentTimestamp(),
314 IsActive:true,
315 ConnectionType:ConnectionType.clone(),
316 },
317 );
318
319 dev_log!(
320 "lifecycle",
321 "Connection registered: {} - {} ({:?})",
322 ConnectionId,
323 ClientId,
324 ConnectionType
325 );
326 Ok(())
327 }
328
329 pub async fn UpdateHeartbeat(&self, ConnectionId:&str) -> Result<()> {
332 if ConnectionId.is_empty() {
333 return Err(AirError::Configuration("Connection ID cannot be empty".to_string()));
334 }
335
336 let mut Connection = self.Connection.write().await;
337
338 if let Some(Connection) = Connection.get_mut(ConnectionId) {
339 let CurrentTime = Utility::CurrentTimestamp();
340 const MAX_HEARTBEAT_INTERVAL:u64 = 120000; if CurrentTime - Connection.LastHeartbeat > MAX_HEARTBEAT_INTERVAL {
344 dev_log!(
345 "lifecycle",
346 "warn: Long heartbeat interval for connection {}: {}ms",
347 ConnectionId,
348 CurrentTime - Connection.LastHeartbeat
349 );
350 }
351
352 Connection.LastHeartbeat = CurrentTime;
353 Connection.IsActive = true;
354
355 dev_log!(
356 "lifecycle",
357 "Heartbeat updated for connection: {} (client: {})",
358 ConnectionId,
359 Connection.ClientId
360 );
361 } else {
362 return Err(AirError::Internal(format!("Connection {} not found", ConnectionId)));
363 }
364
365 Ok(())
366 }
367
368 pub async fn RemoveConnection(&self, ConnectionId:&str) -> Result<()> {
371 if ConnectionId.is_empty() {
372 return Err(AirError::Configuration("Connection ID cannot be empty".to_string()));
373 }
374
375 let mut Connection = self.Connection.write().await;
376
377 if let Some(Connection) = Connection.remove(ConnectionId) {
378 dev_log!(
379 "lifecycle",
380 "Connection removed: {} (client: {}, type: {:?})",
381 ConnectionId,
382 Connection.ClientId,
383 Connection.ConnectionType
384 );
385
386 drop(Connection); } else {
393 dev_log!(
394 "lifecycle",
395 "warn: Attempted to remove non-existent connection: {}",
396 ConnectionId
397 );
398 }
399
400 Ok(())
401 }
402
403 pub async fn GetActiveConnectionCount(&self) -> usize {
405 let Connection = self.Connection.read().await;
406 Connection.values().filter(|c| c.IsActive).count()
407 }
408
409 pub async fn GetConnectionCountByType(&self, ConnectionType:ConnectionType) -> usize {
411 let Connection = self.Connection.read().await;
412 Connection
413 .values()
414 .filter(|c| c.ConnectionType == ConnectionType && c.IsActive)
415 .count()
416 }
417
418 pub async fn GetConnectionsByType(&self, ConnectionType:ConnectionType) -> Vec<ConnectionInfo> {
420 let Connection = self.Connection.read().await;
421 Connection
422 .values()
423 .filter(|c| c.ConnectionType == ConnectionType)
424 .cloned()
425 .collect()
426 }
427
428 pub async fn GetNextMountainConnection(&self) -> Result<ConnectionInfo> {
431 let Connection = self.Connection.read().await;
432
433 let MountainConnection:Vec<_> = Connection
434 .values()
435 .filter(|c| {
436 matches!(c.ConnectionType, ConnectionType::MountainMain | ConnectionType::MountainWorker) && c.IsActive
437 })
438 .collect();
439
440 if MountainConnection.is_empty() {
441 return Err(AirError::ServiceUnavailable(
442 "No active Mountain connections available".to_string(),
443 ));
444 }
445
446 let Selected = MountainConnection[0].clone();
452
453 Ok(Selected)
454 }
455
456 pub async fn CleanupStaleConnections(&self, TimeoutSeconds:u64) -> Result<usize> {
460 let mut Connection = self.Connection.write().await;
461 let CurrentTime = Utility::CurrentTimestamp();
462 let TimeoutMs = TimeoutSeconds * 1000;
463
464 let mut RemovedCount = 0;
465 let mut RemovedByType:HashMap<String, usize> = HashMap::new();
466
467 Connection.retain(|Id, Connection| {
468 if CurrentTime - Connection.LastHeartbeat > TimeoutMs {
469 dev_log!(
470 "lifecycle",
471 "warn: Removing stale connection: {} - {} ({:?}) - idle: {}ms",
472 Id,
473 Connection.ClientId,
474 Connection.ConnectionType,
475 CurrentTime - Connection.LastHeartbeat
476 );
477
478 *RemovedByType.entry(format!("{:?}", Connection.ConnectionType)).or_insert(0) += 1;
479
480 RemovedCount += 1;
481 false
482 } else {
483 true
484 }
485 });
486
487 if RemovedCount > 0 {
488 dev_log!("lifecycle", "Cleaned up {} stale connections", RemovedCount);
489 for (ConnType, Count) in RemovedByType {
490 dev_log!("lifecycle", " - {} connections: {}", ConnType, Count);
491 }
492 }
493
494 Ok(RemovedCount)
495 }
496
497 pub async fn RegisterBackgroundTask(&self, TaskItem:tokio::task::JoinHandle<()>) -> Result<()> {
499 let mut BackgroundTask = self.BackgroundTask.lock().await;
500 BackgroundTask.push(TaskItem);
501 dev_log!("lifecycle", "Background task registered. Total tasks: {}", BackgroundTask.len());
502 Ok(())
503 }
504
505 pub async fn StopAllBackgroundTasks(&self) -> Result<()> {
507 let mut BackgroundTask = self.BackgroundTask.lock().await;
508
509 let TaskCount = BackgroundTask.len();
510 dev_log!("lifecycle", "Stopping {} background tasks", TaskCount);
511 for TaskItem in BackgroundTask.drain(..) {
513 TaskItem.abort();
514 }
515
516 dev_log!("lifecycle", "Stopped all {} background tasks", TaskCount);
517 Ok(())
518 }
519
520 pub async fn UpdateServiceStatus(&self, Service:&str, Status:ServiceStatus) -> Result<()> {
522 if Service.is_empty() {
523 return Err(AirError::Configuration("Service name cannot be empty".to_string()));
524 }
525
526 let mut ServiceStatus = self.ServiceStatus.write().await;
527 let StatusClone = Status.clone();
528 ServiceStatus.insert(Service.to_string(), Status);
529 dev_log!("lifecycle", "Service status updated: {} -> {:?}", Service, StatusClone);
530 Ok(())
531 }
532
533 pub async fn GetServiceStatus(&self, Service:&str) -> Option<ServiceStatus> {
535 let ServiceStatus = self.ServiceStatus.read().await;
536 ServiceStatus.get(Service).cloned()
537 }
538
539 pub async fn GetAllServiceStatuses(&self) -> HashMap<String, ServiceStatus> {
541 let ServiceStatus = self.ServiceStatus.read().await;
542 ServiceStatus.clone()
543 }
544
545 pub async fn RegisterRequest(&self, RequestId:String, Service:String) -> Result<()> {
547 if RequestId.is_empty() {
548 return Err(AirError::Configuration("Request ID cannot be empty".to_string()));
549 }
550
551 if Service.is_empty() {
552 return Err(AirError::Configuration("Service name cannot be empty".to_string()));
553 }
554
555 let mut Request = self.ActiveRequest.lock().await;
556
557 if Request.contains_key(&RequestId) {
559 return Err(AirError::Configuration(format!("Request {} already exists", RequestId)));
560 }
561
562 Request.insert(
563 RequestId.clone(),
564 RequestStatus {
565 RequestId:RequestId.clone(),
566 Service,
567 StartedAt:Utility::CurrentTimestamp(),
568 Status:RequestState::Pending,
569 Progress:None,
570 },
571 );
572
573 dev_log!("lifecycle", "Request registered: {}", RequestId);
574 Ok(())
575 }
576
577 pub async fn UpdateRequestStatus(&self, RequestId:&str, Status:RequestState, Progress:Option<f32>) -> Result<()> {
579 if RequestId.is_empty() {
580 return Err(AirError::Configuration("Request ID cannot be empty".to_string()));
581 }
582
583 if let Some(p) = Progress {
585 if !(0.0..=1.0).contains(&p) {
586 return Err(AirError::Configuration("Progress must be between 0.0 and 1.0".to_string()));
587 }
588 }
589
590 let mut Request = self.ActiveRequest.lock().await;
591
592 if let Some(Request) = Request.get_mut(RequestId) {
593 Request.Status = Status;
594 Request.Progress = Progress;
595 } else {
596 return Err(AirError::Internal(format!("Request {} not found", RequestId)));
597 }
598
599 Ok(())
600 }
601
602 pub async fn RemoveRequest(&self, RequestId:&str) -> Result<()> {
604 if RequestId.is_empty() {
605 return Err(AirError::Configuration("Request ID cannot be empty".to_string()));
606 }
607
608 let mut request = self.ActiveRequest.lock().await;
609
610 if request.remove(RequestId).is_some() {
611 dev_log!("lifecycle", "Request removed: {}", RequestId);
612 }
613
614 Ok(())
615 }
616
617 pub async fn UpdateMetrics(&self, Success:bool, ResponseTime:u64) -> Result<()> {
619 let mut Metrics = self.Metrics.write().await;
620
621 Metrics.TotalRequest += 1;
622 if Success {
623 Metrics.SuccessfulRequest += 1;
624 } else {
625 Metrics.FailedRequest += 1;
626 }
627
628 let Alpha = 0.1; Metrics.AverageResponseTime = Alpha * (ResponseTime as f64) + (1.0 - Alpha) * Metrics.AverageResponseTime;
631
632 Metrics.LastUpdated = Utility::CurrentTimestamp();
633
634 Ok(())
635 }
636
637 pub async fn UpdateResourceUsage(&self) -> Result<()> {
639 let Sys = System::new();
640
641 let MemoryUsage = if let Ok(Memory) = Sys.memory() {
643 (Memory.total.as_u64() - Memory.free.as_u64()) as f64 / 1024.0 / 1024.0
644 } else {
645 dev_log!("lifecycle", "warn: Failed to get memory usage");
646 0.0
647 };
648
649 let CPUUsage = if let Ok(CPU) = Sys.cpu_load_aggregate() {
651 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
652 if let Ok(CPU) = CPU.done() {
653 (CPU.user + CPU.nice + CPU.system) as f64 * 100.0
654 } else {
655 dev_log!("lifecycle", "warn: Failed to get CPU usage after sampling");
656 0.0
657 }
658 } else {
659 dev_log!("lifecycle", "warn: Failed to start CPU load sampling");
660 0.0
661 };
662
663 let mut Resources = self.Resources.write().await;
665 Resources.MemoryUsageMb = MemoryUsage;
666 Resources.CPUUsagePercent = CPUUsage;
667 Resources.LastUpdated = Utility::CurrentTimestamp();
668
669 Ok(())
670 }
671
672 pub async fn GetMetrics(&self) -> PerformanceMetrics {
674 let metrics = self.Metrics.read().await;
675 metrics.clone()
676 }
677
678 pub async fn GetResourceUsage(&self) -> ResourceUsage {
680 let Resources = self.Resources.read().await;
681 Resources.clone()
682 }
683
684 pub async fn GetActiveRequestCount(&self) -> usize {
686 let Request = self.ActiveRequest.lock().await;
687 Request.len()
688 }
689
690 pub async fn IsRequestCancelled(&self, RequestId:&str) -> bool {
692 let Request = self.ActiveRequest.lock().await;
693 if let Some(Request) = Request.get(RequestId) {
694 matches!(Request.Status, RequestState::Cancelled)
695 } else {
696 false
697 }
698 }
699
700 pub async fn GetConfiguration(&self) -> Arc<AirConfiguration> { self.Configuration.clone() }
702
703 pub async fn UpdateConfiguration(
705 &self,
706 Section:String,
707 Updates:std::collections::HashMap<String, String>,
708 ) -> Result<()> {
709 dev_log!("lifecycle", "[ApplicationState] Updating configuration section: {}", Section);
710 if Section.is_empty() {
712 return Err(AirError::Configuration("Configuration section cannot be empty".to_string()));
713 }
714
715 if Updates.is_empty() {
717 return Err(AirError::Configuration("Configuration updates cannot be empty".to_string()));
718 }
719
720 match Section.as_str() {
728 "grpc" => {
729 dev_log!("lifecycle", "Updating gRPC configuration: {:?}", Updates);
730 },
731 "updates" => {
732 dev_log!("lifecycle", "Updating updates configuration: {:?}", Updates);
733 },
734 "downloader" => {
735 dev_log!("lifecycle", "Updating downloader configuration: {:?}", Updates);
736 },
737 "indexing" => {
738 dev_log!("lifecycle", "Updating indexing configuration: {:?}", Updates);
739 },
740 "daemon" => {
741 dev_log!("lifecycle", "Updating daemon configuration: {:?}", Updates);
742 },
743 _ => {
744 return Err(AirError::Configuration(format!("Unknown configuration section: {}", Section)));
745 },
746 }
747
748 Ok(())
749 }
750
751 pub async fn SetResourceLimits(
753 &self,
754 MemoryLimitMb:Option<u64>,
755 CPULimitPercent:Option<f64>,
756 DiskLimitMb:Option<u64>,
757 ) -> Result<()> {
758 dev_log!(
759 "lifecycle",
760 "[ApplicationState] Setting resource limits memory={:?}, CPU={:?}, disk={:?}",
761 MemoryLimitMb,
762 CPULimitPercent,
763 DiskLimitMb
764 );
765
766 if let Some(CPU) = CPULimitPercent {
768 if !(0.0..=100.0).contains(&CPU) {
769 return Err(AirError::ResourceLimit("CPU limit must be between 0 and 100".to_string()));
770 }
771 }
772
773 if let Some(Memory) = MemoryLimitMb {
775 if Memory == 0 {
776 return Err(AirError::ResourceLimit("Memory limit must be greater than 0".to_string()));
777 }
778 }
779
780 if let Some(Disk) = DiskLimitMb {
782 if Disk == 0 {
783 return Err(AirError::ResourceLimit("Disk limit must be greater than 0".to_string()));
784 }
785 }
786
787 if MemoryLimitMb.is_some() {
797 dev_log!("lifecycle", "Memory limit set: {} MB", MemoryLimitMb.unwrap());
798 }
799 if CPULimitPercent.is_some() {
800 dev_log!("lifecycle", "CPU limit set: {}%", CPULimitPercent.unwrap());
801 }
802 if DiskLimitMb.is_some() {
803 dev_log!("lifecycle", "Disk limit set: {} MB", DiskLimitMb.unwrap());
804 }
805
806 Ok(())
807 }
808
809 pub async fn CheckResourceLimits(&self) -> Result<bool> {
811 let _Resources = self.Resources.read().await;
812
813 Ok(false)
817 }
818
819 pub async fn GetConnectionHealthReport(&self) -> ConnectionHealthReport {
821 let Connection = self.Connection.read().await;
822 let CurrentTime = Utility::CurrentTimestamp();
823
824 let mut Healthy = 0;
825 let mut Stale = 0;
826 let mut ByType:HashMap<String, usize> = HashMap::new();
827
828 for ConnectionItem in Connection.values() {
829 let IsStale = CurrentTime - ConnectionItem.LastHeartbeat > 120000; if IsStale {
832 Stale += 1;
833 } else if ConnectionItem.IsActive {
834 Healthy += 1;
835 }
836
837 *ByType.entry(format!("{:?}", ConnectionItem.ConnectionType)).or_insert(0) += 1;
838 }
839
840 ConnectionHealthReport {
841 TotalConnection:Connection.len(),
842 HealthyConnection:Healthy,
843 StaleConnection:Stale,
844 ConnectionByType:ByType,
845 LastChecked:CurrentTime,
846 }
847 }
848}