1use std::{
7 collections::{HashMap, VecDeque},
8 sync::Arc,
9 time::{Duration, SystemTime},
10};
11
12use serde::{Deserialize, Serialize};
13use tokio::{
14 sync::{Mutex as AsyncMutex, RwLock},
15 time::interval,
16};
17
18use crate::dev_log;
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct DashboardConfig {
23 pub update_interval_ms:u64,
24 pub metrics_retention_hours:u64,
25 pub alert_threshold_ms:u64,
26 pub trace_sampling_rate:f64,
27 pub max_traces_stored:usize,
28}
29
30impl Default for DashboardConfig {
31 fn default() -> Self {
32 Self {
33 update_interval_ms:5000,
35 metrics_retention_hours:24,
37 alert_threshold_ms:1000,
39 trace_sampling_rate:0.1,
41 max_traces_stored:1000,
43 }
44 }
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
49pub enum MetricType {
50 MessageProcessingTime,
51 ConnectionLatency,
52 MemoryUsage,
53 CpuUsage,
54 NetworkThroughput,
55 ErrorRate,
56 QueueSize,
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct PerformanceMetric {
62 pub metric_type:MetricType,
63 pub value:f64,
64 pub timestamp:u64,
65 pub channel:Option<String>,
66 pub tags:HashMap<String, String>,
67}
68
69#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct TraceSpan {
72 pub trace_id:String,
73 pub span_id:String,
74 pub parent_span_id:Option<String>,
75 pub operation_name:String,
76 pub start_time:u64,
77 pub end_time:Option<u64>,
78 pub duration_ms:Option<u64>,
79 pub tags:HashMap<String, String>,
80 pub logs:Vec<TraceLog>,
81}
82
83#[derive(Debug, Clone, Serialize, Deserialize)]
85pub struct TraceLog {
86 pub timestamp:u64,
87 pub message:String,
88 pub level:LogLevel,
89 pub fields:HashMap<String, String>,
90}
91
92#[derive(Debug, Clone, Serialize, Deserialize)]
94pub enum LogLevel {
95 Debug,
96 Info,
97 Warn,
98 Error,
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize)]
103pub struct PerformanceAlert {
104 pub alert_id:String,
105 pub metric_type:MetricType,
106 pub threshold:f64,
107 pub current_value:f64,
108 pub timestamp:u64,
109 pub channel:Option<String>,
110 pub severity:AlertSeverity,
111 pub message:String,
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
116pub enum AlertSeverity {
117 Low,
118 Medium,
119 High,
120 Critical,
121}
122
123pub struct PerformanceDashboard {
125 config:DashboardConfig,
126 metrics:Arc<RwLock<VecDeque<PerformanceMetric>>>,
127 traces:Arc<RwLock<HashMap<String, TraceSpan>>>,
128 alerts:Arc<RwLock<VecDeque<PerformanceAlert>>>,
129 statistics:Arc<RwLock<DashboardStatistics>>,
130 is_running:Arc<AsyncMutex<bool>>,
131}
132
133#[derive(Debug, Clone, Serialize, Deserialize)]
135pub struct DashboardStatistics {
136 pub total_metrics_collected:u64,
137 pub total_traces_collected:u64,
138 pub total_alerts_triggered:u64,
139 pub average_processing_time_ms:f64,
140 pub peak_processing_time_ms:u64,
141 pub error_rate_percentage:f64,
142 pub throughput_messages_per_second:f64,
143 pub memory_usage_mb:f64,
144 pub last_update:u64,
145}
146
147impl PerformanceDashboard {
148 pub fn new(config:DashboardConfig) -> Self {
150 let config_clone = config.clone();
151 let dashboard = Self {
152 config,
153 metrics:Arc::new(RwLock::new(VecDeque::new())),
154 traces:Arc::new(RwLock::new(HashMap::new())),
155 alerts:Arc::new(RwLock::new(VecDeque::new())),
156 statistics:Arc::new(RwLock::new(DashboardStatistics {
157 total_metrics_collected:0,
158 total_traces_collected:0,
159 total_alerts_triggered:0,
160 average_processing_time_ms:0.0,
161 peak_processing_time_ms:0,
162 error_rate_percentage:0.0,
163 throughput_messages_per_second:0.0,
164 memory_usage_mb:0.0,
165 last_update:SystemTime::now()
166 .duration_since(SystemTime::UNIX_EPOCH)
167 .unwrap_or_default()
168 .as_secs(),
169 })),
170 is_running:Arc::new(AsyncMutex::new(false)),
171 };
172
173 dev_log!(
174 "ipc",
175 "[PerformanceDashboard] Created dashboard with {}ms update interval",
176 config_clone.update_interval_ms
177 );
178
179 dashboard
180 }
181
182 pub async fn start(&self) -> Result<(), String> {
184 {
185 let mut running = self.is_running.lock().await;
186 if *running {
187 return Ok(());
189 }
190 *running = true;
191 }
192
193 self.start_metrics_collection().await;
195
196 self.start_alert_monitoring().await;
198
199 self.start_data_cleanup().await;
201
202 dev_log!("ipc", "[PerformanceDashboard] Performance dashboard started");
203 Ok(())
204 }
205
206 pub async fn stop(&self) -> Result<(), String> {
208 {
209 let mut running = self.is_running.lock().await;
210 if !*running {
211 return Ok(());
213 }
214 *running = false;
215 }
216
217 {
219 let mut metrics = self.metrics.write().await;
220 metrics.clear();
221 }
222
223 {
224 let mut traces = self.traces.write().await;
225 traces.clear();
226 }
227
228 {
229 let mut alerts = self.alerts.write().await;
230 alerts.clear();
231 }
232
233 dev_log!("ipc", "[PerformanceDashboard] Performance dashboard stopped");
234 Ok(())
235 }
236
237 pub async fn record_metric(&self, metric:PerformanceMetric) {
239 let mut metrics = self.metrics.write().await;
240 metrics.push_back(metric.clone());
241
242 self.update_statistics().await;
244
245 self.check_alerts(&metric).await;
247
248 dev_log!("ipc", "[PerformanceDashboard] Recorded metric: {:?}", metric.metric_type);
249 }
250
251 pub async fn start_trace_span(&self, operation_name:String) -> TraceSpan {
253 let trace_id = Self::generate_trace_id();
254 let span_id = Self::generate_span_id();
255
256 let span = TraceSpan {
257 trace_id:trace_id.clone(),
258 span_id:span_id.clone(),
259 parent_span_id:None,
260 operation_name,
261 start_time:SystemTime::now()
262 .duration_since(SystemTime::UNIX_EPOCH)
263 .unwrap_or_default()
264 .as_millis() as u64,
265 end_time:None,
266 duration_ms:None,
267 tags:HashMap::new(),
268 logs:Vec::new(),
269 };
270
271 {
273 let mut traces = self.traces.write().await;
274 traces.insert(span_id.clone(), span.clone());
275 }
276
277 {
279 let mut stats = self.statistics.write().await;
280 stats.total_traces_collected += 1;
281 }
282
283 span
284 }
285
286 pub async fn end_trace_span(&self, span_id:&str) -> Result<(), String> {
288 let mut traces = self.traces.write().await;
289
290 if let Some(span) = traces.get_mut(span_id) {
291 let end_time = SystemTime::now()
292 .duration_since(SystemTime::UNIX_EPOCH)
293 .unwrap_or_default()
294 .as_millis() as u64;
295
296 span.end_time = Some(end_time);
297 span.duration_ms = Some(end_time.saturating_sub(span.start_time));
298
299 dev_log!(
300 "ipc",
301 "[PerformanceDashboard] Ended trace span: {} (duration: {}ms)",
302 span.operation_name,
303 span.duration_ms.unwrap_or(0)
304 );
305
306 Ok(())
307 } else {
308 Err(format!("Trace span not found: {}", span_id))
309 }
310 }
311
312 pub async fn add_trace_log(&self, span_id:&str, log:TraceLog) -> Result<(), String> {
314 let mut traces = self.traces.write().await;
315
316 if let Some(span) = traces.get_mut(span_id) {
317 span.logs.push(log);
318 Ok(())
319 } else {
320 Err(format!("Trace span not found: {}", span_id))
321 }
322 }
323
324 async fn start_metrics_collection(&self) {
326 let dashboard = Arc::new(self.clone());
327
328 tokio::spawn(async move {
329 let mut interval = interval(Duration::from_millis(dashboard.config.update_interval_ms));
330
331 while *dashboard.is_running.lock().await {
332 interval.tick().await;
333
334 dashboard.collect_system_metrics().await;
336
337 dashboard.update_statistics().await;
339 }
340 });
341 }
342
343 async fn start_alert_monitoring(&self) {
345 let dashboard = Arc::new(self.clone());
346
347 tokio::spawn(async move {
348 let mut interval = interval(Duration::from_secs(10));
349
350 while *dashboard.is_running.lock().await {
351 interval.tick().await;
352
353 dashboard.check_performance_alerts().await;
355 }
356 });
357 }
358
359 async fn start_data_cleanup(&self) {
361 let dashboard = Arc::new(self.clone());
362
363 tokio::spawn(async move {
364 let mut interval = interval(Duration::from_secs(3600));
366
367 while *dashboard.is_running.lock().await {
368 interval.tick().await;
369
370 dashboard.cleanup_old_data().await;
372 }
373 });
374 }
375
376 async fn collect_system_metrics(&self) {
378 if let Ok(memory_usage) = Self::get_memory_usage() {
380 let metric = PerformanceMetric {
381 metric_type:MetricType::MemoryUsage,
382 value:memory_usage,
383 timestamp:SystemTime::now()
384 .duration_since(SystemTime::UNIX_EPOCH)
385 .unwrap_or_default()
386 .as_millis() as u64,
387 channel:None,
388 tags:HashMap::new(),
389 };
390
391 self.record_metric(metric).await;
392 }
393
394 if let Ok(cpu_usage) = Self::get_cpu_usage() {
396 let metric = PerformanceMetric {
397 metric_type:MetricType::CpuUsage,
398 value:cpu_usage,
399 timestamp:SystemTime::now()
400 .duration_since(SystemTime::UNIX_EPOCH)
401 .unwrap_or_default()
402 .as_millis() as u64,
403 channel:None,
404 tags:HashMap::new(),
405 };
406
407 self.record_metric(metric).await;
408 }
409 }
410
411 async fn update_statistics(&self) {
413 let metrics = self.metrics.read().await;
414 let mut stats = self.statistics.write().await;
415
416 let processing_metrics:Vec<&PerformanceMetric> = metrics
418 .iter()
419 .filter(|m| matches!(m.metric_type, MetricType::MessageProcessingTime))
420 .collect();
421
422 if !processing_metrics.is_empty() {
423 let total_time:f64 = processing_metrics.iter().map(|m| m.value).sum();
424 stats.average_processing_time_ms = total_time / processing_metrics.len() as f64;
425
426 stats.peak_processing_time_ms = processing_metrics.iter().map(|m| m.value as u64).max().unwrap_or(0);
427 }
428
429 let error_metrics:Vec<&PerformanceMetric> = metrics
431 .iter()
432 .filter(|m| matches!(m.metric_type, MetricType::ErrorRate))
433 .collect();
434
435 if !error_metrics.is_empty() {
436 let total_errors:f64 = error_metrics.iter().map(|m| m.value).sum();
437 stats.error_rate_percentage = total_errors / error_metrics.len() as f64;
438 }
439
440 let throughput_metrics:Vec<&PerformanceMetric> = metrics
442 .iter()
443 .filter(|m| matches!(m.metric_type, MetricType::NetworkThroughput))
444 .collect();
445
446 if !throughput_metrics.is_empty() {
447 let total_throughput:f64 = throughput_metrics.iter().map(|m| m.value).sum();
448 stats.throughput_messages_per_second = total_throughput / throughput_metrics.len() as f64;
449 }
450
451 let memory_metrics:Vec<&PerformanceMetric> = metrics
453 .iter()
454 .filter(|m| matches!(m.metric_type, MetricType::MemoryUsage))
455 .collect();
456
457 if !memory_metrics.is_empty() {
458 let total_memory:f64 = memory_metrics.iter().map(|m| m.value).sum();
459 stats.memory_usage_mb = total_memory / memory_metrics.len() as f64;
460 }
461
462 stats.last_update = SystemTime::now()
463 .duration_since(SystemTime::UNIX_EPOCH)
464 .unwrap_or_default()
465 .as_secs();
466 }
467
468 async fn check_alerts(&self, metric:&PerformanceMetric) {
470 let threshold = match metric.metric_type {
471 MetricType::MessageProcessingTime => self.config.alert_threshold_ms as f64,
472 MetricType::ErrorRate => 5.0,
474 MetricType::MemoryUsage => 1024.0,
476 MetricType::CpuUsage => 90.0,
478 _ => return,
480 };
481
482 if metric.value > threshold {
483 let severity = match metric.value / threshold {
484 ratio if ratio > 5.0 => AlertSeverity::Critical,
485 ratio if ratio > 3.0 => AlertSeverity::High,
486 ratio if ratio > 2.0 => AlertSeverity::Medium,
487 _ => AlertSeverity::Low,
488 };
489
490 let alert = PerformanceAlert {
491 alert_id:Self::generate_alert_id(),
492 metric_type:metric.metric_type.clone(),
493 threshold,
494 current_value:metric.value,
495 timestamp:metric.timestamp,
496 channel:metric.channel.clone(),
497 severity,
498 message:format!(
499 "{} exceeded threshold: {} > {}",
500 Self::metric_type_name(&metric.metric_type),
501 metric.value,
502 threshold
503 ),
504 };
505
506 {
507 let mut alerts = self.alerts.write().await;
508 alerts.push_back(alert.clone());
509 }
510
511 {
512 let mut stats = self.statistics.write().await;
513 stats.total_alerts_triggered += 1;
514 }
515
516 dev_log!("ipc", "warn: [PerformanceDashboard] Alert triggered: {}", alert.message);
517 }
518 }
519
520 async fn check_performance_alerts(&self) {
522 dev_log!("ipc", "[PerformanceDashboard] Checking performance alerts");
525 }
526
527 async fn cleanup_old_data(&self) {
529 let retention_threshold = SystemTime::now()
530 .duration_since(SystemTime::UNIX_EPOCH)
531 .unwrap_or_default()
532 .as_secs()
533 - (self.config.metrics_retention_hours * 3600);
534
535 {
537 let mut metrics = self.metrics.write().await;
538 metrics.retain(|m| m.timestamp >= retention_threshold);
539 }
540
541 {
543 let mut traces = self.traces.write().await;
544 traces.retain(|_, span| span.start_time >= retention_threshold);
545
546 if traces.len() > self.config.max_traces_stored {
548 let excess = traces.len() - self.config.max_traces_stored;
549 let keys_to_remove:Vec<String> = traces.keys().take(excess).cloned().collect();
550
551 for key in keys_to_remove {
552 traces.remove(&key);
553 }
554 }
555 }
556
557 {
559 let mut alerts = self.alerts.write().await;
560 alerts.retain(|a| a.timestamp >= retention_threshold);
561 }
562
563 dev_log!("ipc", "[PerformanceDashboard] Cleaned up old data");
564 }
565
566 fn get_memory_usage() -> Result<f64, String> {
568 Ok(100.0)
573 }
574
575 fn get_cpu_usage() -> Result<f64, String> {
577 Ok(25.0)
582 }
583
584 fn generate_trace_id() -> String { uuid::Uuid::new_v4().to_string() }
586
587 fn generate_span_id() -> String { uuid::Uuid::new_v4().to_string() }
589
590 fn generate_alert_id() -> String { uuid::Uuid::new_v4().to_string() }
592
593 fn metric_type_name(metric_type:&MetricType) -> &'static str {
595 match metric_type {
596 MetricType::MessageProcessingTime => "Message Processing Time",
597 MetricType::ConnectionLatency => "Connection Latency",
598 MetricType::MemoryUsage => "Memory Usage",
599 MetricType::CpuUsage => "CPU Usage",
600 MetricType::NetworkThroughput => "Network Throughput",
601 MetricType::ErrorRate => "Error Rate",
602 MetricType::QueueSize => "Queue Size",
603 }
604 }
605
606 pub async fn get_statistics(&self) -> DashboardStatistics { self.statistics.read().await.clone() }
608
609 pub async fn get_recent_metrics(&self, limit:usize) -> Vec<PerformanceMetric> {
611 let metrics = self.metrics.read().await;
612 metrics.iter().rev().take(limit).cloned().collect()
613 }
614
615 pub async fn get_active_alerts(&self) -> Vec<PerformanceAlert> {
617 let alerts = self.alerts.read().await;
618 alerts.iter().rev().cloned().collect()
619 }
620
621 pub async fn get_trace(&self, trace_id:&str) -> Option<TraceSpan> {
623 let traces = self.traces.read().await;
624 traces.values().find(|span| span.trace_id == trace_id).cloned()
625 }
626
627 pub fn default_dashboard() -> Self { Self::new(DashboardConfig::default()) }
629
630 pub fn high_frequency_dashboard() -> Self {
632 Self::new(DashboardConfig {
633 update_interval_ms:1000,
635 metrics_retention_hours:1,
637 alert_threshold_ms:500,
639 trace_sampling_rate:1.0,
641 max_traces_stored:5000,
643 })
644 }
645}
646
647impl Clone for PerformanceDashboard {
648 fn clone(&self) -> Self {
649 Self {
650 config:self.config.clone(),
651 metrics:self.metrics.clone(),
652 traces:self.traces.clone(),
653 alerts:self.alerts.clone(),
654 statistics:self.statistics.clone(),
655 is_running:Arc::new(AsyncMutex::new(false)),
656 }
657 }
658}
659
660impl PerformanceDashboard {
662 pub fn create_metric(
664 metric_type:MetricType,
665 value:f64,
666 channel:Option<String>,
667 tags:HashMap<String, String>,
668 ) -> PerformanceMetric {
669 PerformanceMetric {
670 metric_type,
671 value,
672 timestamp:SystemTime::now()
673 .duration_since(SystemTime::UNIX_EPOCH)
674 .unwrap_or_default()
675 .as_millis() as u64,
676 channel,
677 tags,
678 }
679 }
680
681 pub fn create_trace_log(message:String, level:LogLevel, fields:HashMap<String, String>) -> TraceLog {
683 TraceLog {
684 timestamp:SystemTime::now()
685 .duration_since(SystemTime::UNIX_EPOCH)
686 .unwrap_or_default()
687 .as_millis() as u64,
688 message,
689 level,
690 fields,
691 }
692 }
693
694 pub fn calculate_performance_score(average_processing_time:f64, error_rate:f64, throughput:f64) -> f64 {
696 let time_score = 100.0 / (1.0 + average_processing_time / 100.0);
698 let error_score = 100.0 * (1.0 - error_rate / 100.0);
699 let throughput_score = throughput / 1000.0;
702
703 (time_score * 0.4 + error_score * 0.4 + throughput_score * 0.2)
704 .max(0.0)
705 .min(100.0)
706 }
707
708 pub fn format_metric_value(metric_type:&MetricType, value:f64) -> String {
710 match metric_type {
711 MetricType::MessageProcessingTime => format!("{:.2}ms", value),
712 MetricType::ConnectionLatency => format!("{:.2}ms", value),
713 MetricType::MemoryUsage => format!("{:.2}MB", value),
714 MetricType::CpuUsage => format!("{:.2}%", value),
715 MetricType::NetworkThroughput => format!("{:.2} msg/s", value),
716 MetricType::ErrorRate => format!("{:.2}%", value),
717 MetricType::QueueSize => format!("{:.0}", value),
718 }
719 }
720}
721
#[cfg(test)]
mod tests {
    use super::*;

    /// A default dashboard must pick up `DashboardConfig::default()`.
    #[tokio::test]
    async fn test_performance_dashboard_creation() {
        let dash = PerformanceDashboard::default_dashboard();
        assert_eq!(dash.config.update_interval_ms, 5000);
    }

    /// A recorded metric must be retrievable via `get_recent_metrics`.
    #[tokio::test]
    async fn test_metric_recording() {
        let dash = PerformanceDashboard::default_dashboard();
        dash.start().await.unwrap();

        let sample = PerformanceDashboard::create_metric(
            MetricType::MessageProcessingTime,
            150.0,
            Some("test_channel".to_string()),
            HashMap::new(),
        );
        dash.record_metric(sample.clone()).await;

        let recent = dash.get_recent_metrics(10).await;
        assert!(!recent.is_empty());

        dash.stop().await.unwrap();
    }

    /// A span can be opened, closed, and looked up by its trace id.
    #[tokio::test]
    async fn test_trace_span_management() {
        let dash = PerformanceDashboard::default_dashboard();
        dash.start().await.unwrap();

        let span = dash.start_trace_span("test_operation".to_string()).await;
        assert_eq!(span.operation_name, "test_operation");

        dash.end_trace_span(&span.span_id).await.unwrap();
        assert!(dash.get_trace(&span.trace_id).await.is_some());

        dash.stop().await.unwrap();
    }

    /// A processing time past the default 1000ms threshold raises an alert.
    #[tokio::test]
    async fn test_alert_generation() {
        let dash = PerformanceDashboard::default_dashboard();
        dash.start().await.unwrap();

        let slow = PerformanceDashboard::create_metric(
            MetricType::MessageProcessingTime,
            2000.0,
            None,
            HashMap::new(),
        );
        dash.record_metric(slow).await;

        assert!(!dash.get_active_alerts().await.is_empty());

        dash.stop().await.unwrap();
    }

    /// The composite score always lands in the 0..=100 range.
    #[test]
    fn test_performance_score_calculation() {
        let score = PerformanceDashboard::calculate_performance_score(50.0, 2.0, 500.0);
        assert!(score >= 0.0 && score <= 100.0);
    }

    /// Processing times render with two decimals and an "ms" suffix.
    #[test]
    fn test_metric_value_formatting() {
        let text = PerformanceDashboard::format_metric_value(&MetricType::MessageProcessingTime, 123.456);
        assert_eq!(text, "123.46ms");
    }
}