1use std::{
79 sync::{
80 Arc,
81 atomic::{AtomicI64, AtomicU64, Ordering},
82 },
83 time::{Duration, Instant},
84};
85
86use serde::{Deserialize, Serialize};
87
88use crate::{AirError, Result, dev_log};
89
90#[allow(dead_code)]
92struct MetricGuard {
93 current:u64,
94 max:u64,
95}
96
97impl MetricGuard {
98 #[allow(dead_code)]
99 fn new(current:u64, max:u64) -> Self { Self { current, max } }
100
101 #[allow(dead_code)]
103 fn increment(&mut self) -> bool {
104 if self.current < self.max.saturating_sub(1) {
105 self.current += 1;
106 true
107 } else {
108 dev_log!("metrics", "warn: [Metrics] Metric overflow detected, wrapping around");
109 self.current = 0;
110 true
111 }
112 }
113}
114
115#[derive(Debug)]
117struct AggregationValidator {
118 last_timestamp:Instant,
119 validation_window:Duration,
120}
121
122impl AggregationValidator {
123 fn new(validation_window_secs:u64) -> Self {
124 Self {
125 last_timestamp:Instant::now(),
126 validation_window:Duration::from_secs(validation_window_secs),
127 }
128 }
129
130 fn validate(&mut self) -> std::result::Result<(), String> {
132 let now = Instant::now();
133 if now.duration_since(self.last_timestamp) > self.validation_window {
134 dev_log!("metrics", "warn: [Metrics] Aggregation outside validation window, resetting");
135 self.last_timestamp = now;
136 Ok(())
137 } else {
138 Ok(())
139 }
140 }
141}
142
/// Thread-safe collector of daemon-wide counters and gauges.
///
/// Every metric lives behind an `Arc`, so cloning a collector yields a handle that shares
/// the same underlying counters rather than an independent snapshot.
#[derive(Debug, Clone)]
pub struct MetricsCollector {
	// --- Request metrics ---
	requests_total:Arc<AtomicU64>,
	requests_successful:Arc<AtomicU64>,
	requests_failed:Arc<AtomicU64>,
	/// Sum of all recorded request latencies, in milliseconds.
	request_latency_sum_ms:Arc<AtomicU64>,
	/// Number of latency samples folded into `request_latency_sum_ms`.
	request_latency_count:Arc<AtomicU64>,
	/// Smallest latency seen, in milliseconds; initialized to `u64::MAX`.
	request_latency_min_ms:Arc<AtomicU64>,
	/// Largest latency seen, in milliseconds; initialized to `0`.
	request_latency_max_ms:Arc<AtomicU64>,

	// --- Error metrics ---
	errors_total:Arc<AtomicU64>,
	/// Failure counts keyed by the lower-cased, redacted error type.
	errors_by_type:Arc<std::sync::Mutex<std::collections::HashMap<String, u64>>>,

	// --- Resource gauges (overwritten on each update) ---
	/// Signed storage; negative values are reported as 0 in snapshots.
	memory_usage_bytes:Arc<AtomicI64>,
	/// CPU usage stored in hundredths of a percent (e.g. `1250` means 12.5%).
	cpu_usage_percent:Arc<AtomicU64>,
	active_connections:Arc<AtomicU64>,
	threads_active:Arc<AtomicU64>,

	// --- Feature-specific metrics ---
	authentication_operations:Arc<AtomicU64>,
	authentication_failures:Arc<AtomicU64>,
	downloads_total:Arc<AtomicU64>,
	downloads_completed:Arc<AtomicU64>,
	downloads_failed:Arc<AtomicU64>,
	downloads_bytes_total:Arc<AtomicU64>,
	indexing_operations:Arc<AtomicU64>,
	/// Last reported indexed-entry count (a gauge that is overwritten, not summed).
	indexing_entries:Arc<AtomicI64>,
	updates_checked:Arc<AtomicU64>,
	/// Incremented by `RecordUpdateCheck(true)`, i.e. update checks that found updates.
	updates_applied:Arc<AtomicU64>,

	/// Time-window check run at the start of every recorded request.
	aggregator:Arc<std::sync::Mutex<AggregationValidator>>,
}
181
182impl MetricsCollector {
183 pub fn new() -> Result<Self> {
185 dev_log!("metrics", "[Metrics] MetricsCollector initialized successfully");
186 Ok(Self {
187 requests_total:Arc::new(AtomicU64::new(0)),
188 requests_successful:Arc::new(AtomicU64::new(0)),
189 requests_failed:Arc::new(AtomicU64::new(0)),
190 request_latency_sum_ms:Arc::new(AtomicU64::new(0)),
191 request_latency_count:Arc::new(AtomicU64::new(0)),
192 request_latency_min_ms:Arc::new(AtomicU64::new(u64::MAX)),
193 request_latency_max_ms:Arc::new(AtomicU64::new(0)),
194 errors_total:Arc::new(AtomicU64::new(0)),
195 errors_by_type:Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
196 memory_usage_bytes:Arc::new(AtomicI64::new(0)),
197 cpu_usage_percent:Arc::new(AtomicU64::new(0)),
198 active_connections:Arc::new(AtomicU64::new(0)),
199 threads_active:Arc::new(AtomicU64::new(0)),
200 authentication_operations:Arc::new(AtomicU64::new(0)),
201 authentication_failures:Arc::new(AtomicU64::new(0)),
202 downloads_total:Arc::new(AtomicU64::new(0)),
203 downloads_completed:Arc::new(AtomicU64::new(0)),
204 downloads_failed:Arc::new(AtomicU64::new(0)),
205 downloads_bytes_total:Arc::new(AtomicU64::new(0)),
206 indexing_operations:Arc::new(AtomicU64::new(0)),
207 indexing_entries:Arc::new(AtomicI64::new(0)),
208 updates_checked:Arc::new(AtomicU64::new(0)),
209 updates_applied:Arc::new(AtomicU64::new(0)),
210 aggregator:Arc::new(std::sync::Mutex::new(AggregationValidator::new(3600))),
211 })
212 }
213
214 pub fn ValidateAggregation(&self) -> Result<()> {
216 match self.aggregator.lock() {
217 Ok(mut validator) => validator.validate().map_err(|e| AirError::Internal(e)),
218 Err(_) => {
219 dev_log!("metrics", "warn: [Metrics] Failed to acquire aggregation validator lock");
220 Ok(())
221 },
222 }
223 }
224
225 pub fn RecordRequestSuccess(&self, LatencySeconds:f64) {
227 let _ = self.ValidateAggregation();
228
229 let LatencyMs = (LatencySeconds * 1000.0) as u64;
230
231 let _ = self.requests_total.fetch_add(1, Ordering::Relaxed);
233 let _ = self.requests_successful.fetch_add(1, Ordering::Relaxed);
234
235 let _ = self.request_latency_sum_ms.fetch_add(LatencyMs, Ordering::Relaxed);
237 let _ = self.request_latency_count.fetch_add(1, Ordering::Relaxed);
238
239 MinMaxUpdate(&self.request_latency_min_ms, &self.request_latency_max_ms, LatencyMs);
241
242 dev_log!(
243 "metrics",
244 "[Metrics] Recorded successful request with latency: {:.3}s",
245 LatencySeconds
246 );
247 }
248
249 pub fn RecordRequestFailure(&self, ErrorType:&str, LatencySeconds:f64) {
251 let _ = self.ValidateAggregation();
252
253 let LatencyMs = (LatencySeconds * 1000.0) as u64;
254
255 let _ = self.requests_total.fetch_add(1, Ordering::Relaxed);
257 let _ = self.requests_failed.fetch_add(1, Ordering::Relaxed);
258 let _ = self.errors_total.fetch_add(1, Ordering::Relaxed);
259
260 let _ = self.request_latency_sum_ms.fetch_add(LatencyMs, Ordering::Relaxed);
262 let _ = self.request_latency_count.fetch_add(1, Ordering::Relaxed);
263
264 MinMaxUpdate(&self.request_latency_min_ms, &self.request_latency_max_ms, LatencyMs);
266
267 let RedactedError = self.RedactErrorType(ErrorType);
269 let RedactedErrorClone = RedactedError.clone();
270 if let Ok(mut error_map) = self.errors_by_type.lock() {
271 *error_map.entry(RedactedError).or_insert(0) += 1;
272 }
273
274 dev_log!(
275 "metrics",
276 "[Metrics] Recorded failed request: {}, latency: {:.3}s",
277 RedactedErrorClone,
278 LatencySeconds
279 );
280 }
281
282 pub fn UpdateResourceMetrics(&self, MemoryBytes:u64, CPUPercent:f64, ActiveConns:u64, ActiveThreads:u64) {
284 self.memory_usage_bytes.store(MemoryBytes as i64, Ordering::Relaxed);
285 self.cpu_usage_percent.store((CPUPercent * 100.0) as u64, Ordering::Relaxed);
286 self.active_connections.store(ActiveConns, Ordering::Relaxed);
287 self.threads_active.store(ActiveThreads, Ordering::Relaxed);
288
289 dev_log!(
290 "metrics",
291 "[Metrics] Updated resource metrics - Memory: {}B, CPU: {:.1}%, Connections: {}, Threads: {}",
292 MemoryBytes,
293 CPUPercent,
294 ActiveConns,
295 ActiveThreads
296 );
297 }
298
299 pub fn RecordAuthenticationOperation(&self, Success:bool) {
301 let _ = self.authentication_operations.fetch_add(1, Ordering::Relaxed);
302 if !Success {
303 let _ = self.authentication_failures.fetch_add(1, Ordering::Relaxed);
304 }
305 }
306
307 pub fn RecordDownload(&self, Success:bool, Bytes:u64) {
309 let _ = self.downloads_total.fetch_add(1, Ordering::Relaxed);
310 let _ = self.downloads_bytes_total.fetch_add(Bytes, Ordering::Relaxed);
311
312 if Success {
313 let _ = self.downloads_completed.fetch_add(1, Ordering::Relaxed);
314 } else {
315 let _ = self.downloads_failed.fetch_add(1, Ordering::Relaxed);
316 }
317 }
318
319 pub fn RecordIndexingOperation(&self, EntriesIndexed:u64) {
321 let _ = self.indexing_operations.fetch_add(1, Ordering::Relaxed);
322 self.indexing_entries.store(EntriesIndexed as i64, Ordering::Relaxed);
323 }
324
325 pub fn RecordUpdateCheck(&self, UpdatesAvailable:bool) {
327 let _ = self.updates_checked.fetch_add(1, Ordering::Relaxed);
328 if UpdatesAvailable {
329 let _ = self.updates_applied.fetch_add(1, Ordering::Relaxed);
330 }
331 }
332
333 fn RedactErrorType(&self, ErrorType:&str) -> String {
335 let Redacted = ErrorType.to_lowercase();
336
337 if Redacted.contains("password") || Redacted.contains("token") || Redacted.contains("secret") {
339 return "sensitive_error".to_string();
340 }
341
342 Redacted
343 }
344
345 pub fn ExportMetrics(&self) -> Result<String> {
347 let metrics_data = self.GetMetricsData();
348
349 let mut output = String::new();
350 output.push_str("# HELP air_requests_total Total number of requests processed by Air daemon\n");
351 output.push_str("# TYPE air_requests_total counter\n");
352 output.push_str(&format!("air_requests_total {}\n", metrics_data.requests_total));
353
354 output.push_str("# HELP air_requests_successful Total number of successful requests\n");
355 output.push_str("# TYPE air_requests_successful counter\n");
356 output.push_str(&format!("air_requests_successful {}\n", metrics_data.requests_successful));
357
358 output.push_str("# HELP air_requests_failed Total number of failed requests\n");
359 output.push_str("# TYPE air_requests_failed counter\n");
360 output.push_str(&format!("air_requests_failed {}\n", metrics_data.requests_failed));
361
362 output.push_str("# HELP air_errors_total Total number of errors encountered\n");
363 output.push_str("# TYPE air_errors_total counter\n");
364 output.push_str(&format!("air_errors_total {}\n", metrics_data.errors_total));
365
366 output.push_str("# HELP air_memory_usage_bytes Memory usage in bytes\n");
367 output.push_str("# TYPE air_memory_usage_bytes gauge\n");
368 output.push_str(&format!("air_memory_usage_bytes {}\n", metrics_data.memory_bytes));
369
370 output.push_str("# HELP air_cpu_usage_percent CPU usage in hundredths of a percent\n");
371 output.push_str("# TYPE air_cpu_usage_percent gauge\n");
372 output.push_str(&format!("air_cpu_usage_percent {}\n", metrics_data.cpu_percent));
373
374 output.push_str("# HELP air_active_connections Number of active connections\n");
375 output.push_str("# TYPE air_active_connections gauge\n");
376 output.push_str(&format!("air_active_connections {}\n", metrics_data.active_connections));
377
378 output.push_str("# HELP air_threads_active Number of active threads\n");
379 output.push_str("# TYPE air_threads_active gauge\n");
380 output.push_str(&format!("air_threads_active {}\n", metrics_data.active_threads));
381
382 output.push_str("# HELP air_authentication_operations_total Total authentication operations\n");
383 output.push_str("# TYPE air_authentication_operations_total counter\n");
384 output.push_str(&format!(
385 "air_authentication_operations_total {}\n",
386 metrics_data.authentication_operations
387 ));
388
389 output.push_str("# HELP air_authentication_failures_total Total authentication failures\n");
390 output.push_str("# TYPE air_authentication_failures_total counter\n");
391 output.push_str(&format!(
392 "air_authentication_failures_total {}\n",
393 metrics_data.authentication_failures
394 ));
395
396 output.push_str("# HELP air_downloads_total Total downloads initiated\n");
397 output.push_str("# TYPE air_downloads_total counter\n");
398 output.push_str(&format!("air_downloads_total {}\n", metrics_data.downloads_total));
399
400 output.push_str("# HELP air_downloads_completed_total Total downloads completed successfully\n");
401 output.push_str("# TYPE air_downloads_completed_total counter\n");
402 output.push_str(&format!("air_downloads_completed_total {}\n", metrics_data.downloads_completed));
403
404 output.push_str("# HELP air_downloads_failed_total Total downloads failed\n");
405 output.push_str("# TYPE air_downloads_failed_total counter\n");
406 output.push_str(&format!("air_downloads_failed_total {}\n", metrics_data.downloads_failed));
407
408 output.push_str("# HELP air_downloads_bytes_total Total bytes downloaded\n");
409 output.push_str("# TYPE air_downloads_bytes_total counter\n");
410 output.push_str(&format!("air_downloads_bytes_total {}\n", metrics_data.downloads_bytes));
411
412 output.push_str("# HELP air_indexing_operations_total Total indexing operations\n");
413 output.push_str("# TYPE air_indexing_operations_total counter\n");
414 output.push_str(&format!("air_indexing_operations_total {}\n", metrics_data.indexing_operations));
415
416 output.push_str("# HELP air_indexing_entries Number of indexed entries\n");
417 output.push_str("# TYPE air_indexing_entries gauge\n");
418 output.push_str(&format!("air_indexing_entries {}\n", metrics_data.indexing_entries));
419
420 output.push_str("# HELP air_updates_checked_total Total update checks performed\n");
421 output.push_str("# TYPE air_updates_checked_total counter\n");
422 output.push_str(&format!("air_updates_checked_total {}\n", metrics_data.updates_checked));
423
424 output.push_str("# HELP air_updates_applied_total Total updates applied\n");
425 output.push_str("# TYPE air_updates_applied_total counter\n");
426 output.push_str(&format!("air_updates_applied_total {}\n", metrics_data.updates_applied));
427
428 Ok(output)
429 }
430
431 pub fn GetMetricsData(&self) -> MetricsData {
433 let latency_avg = if self.request_latency_count.load(Ordering::Relaxed) > 0 {
434 self.request_latency_sum_ms.load(Ordering::Relaxed) as f64
435 / self.request_latency_count.load(Ordering::Relaxed) as f64
436 } else {
437 0.0
438 };
439
440 MetricsData {
441 timestamp:crate::Utility::CurrentTimestamp(),
442 requests_total:self.requests_total.load(Ordering::Relaxed),
443 requests_successful:self.requests_successful.load(Ordering::Relaxed),
444 requests_failed:self.requests_failed.load(Ordering::Relaxed),
445 errors_total:self.errors_total.load(Ordering::Relaxed),
446 memory_bytes:self.memory_usage_bytes.load(Ordering::Relaxed).max(0) as u64,
447 cpu_percent:self.cpu_usage_percent.load(Ordering::Relaxed) as f64 / 100.0,
448 active_connections:self.active_connections.load(Ordering::Relaxed),
449 active_threads:self.threads_active.load(Ordering::Relaxed),
450 authentication_operations:self.authentication_operations.load(Ordering::Relaxed),
451 authentication_failures:self.authentication_failures.load(Ordering::Relaxed),
452 downloads_total:self.downloads_total.load(Ordering::Relaxed),
453 downloads_completed:self.downloads_completed.load(Ordering::Relaxed),
454 downloads_failed:self.downloads_failed.load(Ordering::Relaxed),
455 downloads_bytes:self.downloads_bytes_total.load(Ordering::Relaxed),
456 indexing_operations:self.indexing_operations.load(Ordering::Relaxed),
457 indexing_entries:self.indexing_entries.load(Ordering::Relaxed).max(0) as u64,
458 updates_checked:self.updates_checked.load(Ordering::Relaxed),
459 updates_applied:self.updates_applied.load(Ordering::Relaxed),
460 latency_avg_ms:latency_avg,
461 latency_min_ms:self.request_latency_min_ms.load(Ordering::Relaxed),
462 latency_max_ms:self.request_latency_max_ms.load(Ordering::Relaxed),
463 }
464 }
465
466 #[cfg(test)]
468 pub fn Reset(&self) {
469 self.requests_total.store(0, Ordering::Relaxed);
470 self.requests_successful.store(0, Ordering::Relaxed);
471 self.requests_failed.store(0, Ordering::Relaxed);
472 self.request_latency_sum_ms.store(0, Ordering::Relaxed);
473 self.request_latency_count.store(0, Ordering::Relaxed);
474 self.request_latency_min_ms.store(u64::MAX, Ordering::Relaxed);
475 self.request_latency_max_ms.store(0, Ordering::Relaxed);
476 self.errors_total.store(0, Ordering::Relaxed);
477 self.memory_usage_bytes.store(0, Ordering::Relaxed);
478 self.cpu_usage_percent.store(0, Ordering::Relaxed);
479 self.active_connections.store(0, Ordering::Relaxed);
480 self.threads_active.store(0, Ordering::Relaxed);
481 self.authentication_operations.store(0, Ordering::Relaxed);
482 self.authentication_failures.store(0, Ordering::Relaxed);
483 self.downloads_total.store(0, Ordering::Relaxed);
484 self.downloads_completed.store(0, Ordering::Relaxed);
485 self.downloads_failed.store(0, Ordering::Relaxed);
486 self.downloads_bytes_total.store(0, Ordering::Relaxed);
487 self.indexing_operations.store(0, Ordering::Relaxed);
488 self.indexing_entries.store(0, Ordering::Relaxed);
489 self.updates_checked.store(0, Ordering::Relaxed);
490 self.updates_applied.store(0, Ordering::Relaxed);
491 }
492}
493
/// Folds `Value` into a running minimum/maximum pair.
///
/// The two bounds are updated independently, so a sample that is both a new minimum and
/// a new maximum updates both. This is always the case for the first sample when the
/// minimum starts at `u64::MAX` and the maximum at `0`.
fn MinMaxUpdate(MinMetric:&AtomicU64, MaxMetric:&AtomicU64, Value:u64) {
	// `fetch_min`/`fetch_max` perform the compare-and-swap retry loop atomically.
	let _ = MinMetric.fetch_min(Value, Ordering::Relaxed);
	let _ = MaxMetric.fetch_max(Value, Ordering::Relaxed);
}
515
516impl Default for MetricsCollector {
517 fn default() -> Self { Self::new().expect("Failed to create MetricsCollector") }
518}
519
/// Point-in-time snapshot of every collector value, as produced by
/// `MetricsCollector::GetMetricsData`.
///
/// Fields are loaded independently with relaxed atomics, so under concurrent updates the
/// snapshot is not guaranteed to be mutually consistent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsData {
	/// Value of `crate::Utility::CurrentTimestamp()` when the snapshot was taken
	/// (unit not visible here; presumably an epoch-based timestamp, confirm).
	pub timestamp:u64,
	pub requests_total:u64,
	pub requests_successful:u64,
	pub requests_failed:u64,
	pub errors_total:u64,
	/// Memory usage in bytes (negative stored values are reported as 0).
	pub memory_bytes:u64,
	/// CPU usage in percent (the collector stores hundredths; this is divided back down).
	pub cpu_percent:f64,
	pub active_connections:u64,
	pub active_threads:u64,
	pub authentication_operations:u64,
	pub authentication_failures:u64,
	pub downloads_total:u64,
	pub downloads_completed:u64,
	pub downloads_failed:u64,
	/// Total bytes across all recorded downloads, successful or not.
	pub downloads_bytes:u64,
	pub indexing_operations:u64,
	/// Most recently reported indexed-entry count (negative stored values become 0).
	pub indexing_entries:u64,
	pub updates_checked:u64,
	/// Count of update checks that reported updates as available.
	pub updates_applied:u64,
	/// Mean request latency in milliseconds; `0.0` when no latency has been recorded.
	pub latency_avg_ms:f64,
	/// Minimum request latency in milliseconds; `u64::MAX` when none has been recorded.
	pub latency_min_ms:u64,
	/// Maximum request latency in milliseconds; `0` when none has been recorded.
	pub latency_max_ms:u64,
}
546
547impl MetricsData {
548 pub fn SuccessRate(&self) -> f64 {
550 if self.requests_total == 0 {
551 return 100.0;
552 }
553 (self.requests_successful as f64 / self.requests_total as f64) * 100.0
554 }
555
556 pub fn DownloadSuccessRate(&self) -> f64 {
558 if self.downloads_total == 0 {
559 return 100.0;
560 }
561 (self.downloads_completed as f64 / self.downloads_total as f64) * 100.0
562 }
563
564 pub fn ErrorRate(&self) -> f64 {
566 if self.requests_total == 0 {
567 return 0.0;
568 }
569 (self.errors_total as f64 / self.requests_total as f64) * 100.0
570 }
571}
572
573static METRICS_INSTANCE:std::sync::OnceLock<MetricsCollector> = std::sync::OnceLock::new();
575
576pub fn GetMetrics() -> &'static MetricsCollector { METRICS_INSTANCE.get_or_init(|| MetricsCollector::default()) }
578
579pub fn InitializeMetrics() -> Result<()> {
581 let _collector = GetMetrics();
582 dev_log!("metrics", "[Metrics] Global metrics collector initialized");
583 Ok(())
584}