1use std::{collections::HashMap, sync::Arc};
9
10use tonic::{Request, Response, Status};
11use tokio_stream::StreamExt as TokioStreamExt;
12use async_trait::async_trait;
13
14use crate::dev_log;
15use crate::{
17 AirError,
18 ApplicationState::ApplicationState,
19 Authentication::AuthenticationService,
20 Downloader::DownloadManager,
21 Indexing::{
22 FileIndexer,
23 Store::QueryIndex::{SearchMode, SearchQuery},
24 },
25 Result,
26 Updates::UpdateManager,
27 Utility::CurrentTimestamp,
28 Vine::Generated::{
29 air as air_generated,
30 air::{
31 ApplyUpdateRequest,
32 ApplyUpdateResponse,
33 AuthenticationRequest,
34 AuthenticationResponse,
35 ConfigurationRequest,
36 ConfigurationResponse,
37 DownloadRequest,
38 DownloadResponse,
39 DownloadStreamRequest,
40 DownloadStreamResponse,
41 FileInfoRequest,
42 FileInfoResponse,
43 FileResult,
44 HealthCheckRequest,
45 HealthCheckResponse,
46 IndexRequest,
47 IndexResponse,
48 MetricsRequest,
49 MetricsResponse,
50 ResourceLimitsRequest,
51 ResourceLimitsResponse,
52 ResourceUsageRequest,
53 ResourceUsageResponse,
54 SearchRequest,
55 SearchResponse,
56 StatusRequest,
57 StatusResponse,
58 UpdateCheckRequest,
59 UpdateCheckResponse,
60 UpdateConfigurationRequest,
61 UpdateConfigurationResponse,
62 air_service_server::AirService,
63 },
64 },
65};
66
67#[derive(Clone)]
69pub struct AirVinegRPCService {
70 AppState:Arc<ApplicationState>,
72
73 AuthService:Arc<AuthenticationService>,
75
76 UpdateManager:Arc<UpdateManager>,
78
79 DownloadManager:Arc<DownloadManager>,
81
82 FileIndexer:Arc<FileIndexer>,
84
85 ActiveConnections:Arc<tokio::sync::RwLock<HashMap<String, ConnectionMetadata>>>,
87}
88
89#[derive(Debug, Clone)]
91#[allow(dead_code)]
92struct ConnectionMetadata {
93 pub ClientId:String,
94 pub ClientVersion:String,
95 pub ProtocolVersion:u32,
96 pub LastRequestTime:u64,
97 pub RequestCount:u64,
98 pub ConnectionType:crate::ApplicationState::ConnectionType,
99}
100
101impl AirVinegRPCService {
102 pub fn new(
104 AppState:Arc<ApplicationState>,
105 AuthService:Arc<AuthenticationService>,
106 UpdateManager:Arc<UpdateManager>,
107 DownloadManager:Arc<DownloadManager>,
108 FileIndexer:Arc<FileIndexer>,
109 ) -> Self {
110 dev_log!("grpc", "[AirVinegRPCService] New instance created");
111 Self {
112 AppState,
113 AuthService,
114 UpdateManager,
115 DownloadManager,
116 FileIndexer,
117 ActiveConnections:Arc::new(tokio::sync::RwLock::new(HashMap::new())),
118 }
119 }
120
121 async fn TrackConnection<RequestType>(
123 &self,
124 Request:&tonic::Request<RequestType>,
125 _ServiceName:&str,
126 ) -> std::result::Result<String, Status> {
127 let Metadata = Request.metadata();
128 let ConnectionId = Metadata
129 .get("connection-id")
130 .map(|v| v.to_str().unwrap_or_default().to_string())
131 .unwrap_or_else(|| crate::Utility::GenerateRequestId());
132
133 let ClientId = Metadata
134 .get("client-id")
135 .map(|v| v.to_str().unwrap_or_default().to_string())
136 .unwrap_or_else(|| "unknown".to_string());
137
138 let ClientVersion = Metadata
139 .get("client-version")
140 .map(|v| v.to_str().unwrap_or_default().to_string())
141 .unwrap_or_else(|| "unknown".to_string());
142
143 let ProtocolVersion = Metadata
144 .get("protocol-version")
145 .map(|v| v.to_str().unwrap_or_default().parse().unwrap_or(1))
146 .unwrap_or(1);
147
148 let mut Connections = self.ActiveConnections.write().await;
150 let ConnectionMetadata = Connections.entry(ConnectionId.clone()).or_insert_with(|| {
151 ConnectionMetadata {
152 ClientId:ClientId.clone(),
153 ClientVersion:ClientVersion.clone(),
154 ProtocolVersion,
155 LastRequestTime:crate::Utility::CurrentTimestamp(),
156 RequestCount:0,
157 ConnectionType:crate::ApplicationState::ConnectionType::MountainMain,
158 }
159 });
160
161 ConnectionMetadata.LastRequestTime = crate::Utility::CurrentTimestamp();
162 ConnectionMetadata.RequestCount += 1;
163
164 self.AppState
166 .RegisterConnection(
167 ConnectionId.clone(),
168 ClientId,
169 ClientVersion,
170 ProtocolVersion,
171 crate::ApplicationState::ConnectionType::MountainMain,
172 )
173 .await
174 .map_err(|e| Status::internal(e.to_string()))?;
175
176 Ok(ConnectionId)
177 }
178
179 #[allow(dead_code)]
181 fn validate_protocol_version(&self, ClientVersion:u32) -> std::result::Result<(), Status> {
182 if ClientVersion > crate::ProtocolVersion {
183 return Err(Status::failed_precondition(format!(
184 "Client protocol version {} is newer than server version {}",
185 ClientVersion,
186 crate::ProtocolVersion
187 )));
188 }
189
190 if ClientVersion < crate::ProtocolVersion {
191 dev_log!(
192 "grpc",
193 "warn: Client using older protocol version {} (server: {})",
194 ClientVersion,
195 crate::ProtocolVersion
196 );
197 }
198
199 Ok(())
200 }
201}
202
203#[async_trait]
204impl AirService for AirVinegRPCService {
205 async fn authenticate(
207 &self,
208 Request:Request<AuthenticationRequest>,
209 ) -> std::result::Result<Response<AuthenticationResponse>, Status> {
210 let ConnectionId = self.TrackConnection(&Request, "authentication").await?;
212
213 let RequestData = Request.into_inner();
214 let request_id = RequestData.request_id.clone();
215
216 dev_log!(
217 "grpc",
218 "[AirVinegRPCService] Authentication request received [ID: {}] [Connection: {}]",
219 request_id,
220 ConnectionId
221 );
222
223 self.AppState
224 .RegisterRequest(request_id.clone(), "authentication".to_string())
225 .await
226 .map_err(|e| Status::internal(e.to_string()))?;
227
228 if RequestData.username.is_empty() || RequestData.password.is_empty() || RequestData.provider.is_empty() {
230 let ErrorMessage = "Invalid authentication parameters".to_string();
231 self.AppState
232 .UpdateRequestStatus(
233 &request_id,
234 crate::ApplicationState::RequestState::Failed(ErrorMessage.clone()),
235 None,
236 )
237 .await
238 .ok();
239
240 return Ok(Response::new(air_generated::AuthenticationResponse {
241 request_id,
242 success:false,
243 token:String::new(),
244 error:ErrorMessage,
245 }));
246 }
247
248 let username_for_log = RequestData.username.clone();
250 let password = RequestData.password;
251 let provider = RequestData.provider;
252
253 let result = self
254 .AuthService
255 .AuthenticateUser(RequestData.username, password, provider)
256 .await;
257
258 match result {
259 Ok(token) => {
260 self.AppState
261 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
262 .await
263 .ok();
264
265 dev_log!(
267 "grpc",
268 "[AirVinegRPCService] Authentication successful for user: {} [Connection: {}]",
269 username_for_log,
270 ConnectionId
271 );
272
273 Ok(Response::new(air_generated::AuthenticationResponse {
274 request_id,
275 success:true,
276 token,
277 error:String::new(),
278 }))
279 },
280 Err(e) => {
281 self.AppState
282 .UpdateRequestStatus(
283 &request_id,
284 crate::ApplicationState::RequestState::Failed(e.to_string()),
285 None,
286 )
287 .await
288 .ok();
289
290 dev_log!(
292 "grpc",
293 "warn: [AirVinegRPCService] Authentication failed for user: {} [Connection: {}] - {}",
294 username_for_log,
295 ConnectionId,
296 e
297 );
298
299 Ok(Response::new(air_generated::AuthenticationResponse {
300 request_id,
301 success:false,
302 token:String::new(),
303 error:e.to_string(),
304 }))
305 },
306 }
307 }
308
309 async fn check_for_updates(
311 &self,
312 request:Request<UpdateCheckRequest>,
313 ) -> std::result::Result<Response<UpdateCheckResponse>, Status> {
314 let RequestData = request.into_inner();
315 let request_id = RequestData.request_id.clone();
316
317 dev_log!(
318 "grpc",
319 "[AirVinegRPCService] Update check request received [ID: {}] - Version: {}, Channel: {}",
320 request_id,
321 RequestData.current_version,
322 RequestData.channel
323 );
324
325 self.AppState
326 .RegisterRequest(request_id.clone(), "updates".to_string())
327 .await
328 .map_err(|e| Status::internal(e.to_string()))?;
329
330 if RequestData.current_version.is_empty() {
332 let ErrorMessage = crate::AirError::Validation("CurrentVersion cannot be empty".to_string());
333 self.AppState
334 .UpdateRequestStatus(
335 &request_id,
336 crate::ApplicationState::RequestState::Failed(ErrorMessage.to_string()),
337 None,
338 )
339 .await
340 .ok();
341 return Err(Status::invalid_argument(ErrorMessage.to_string()));
342 }
343
344 let ValidChannels = ["stable", "beta", "nightly"];
346 let Channel = if RequestData.channel.is_empty() {
347 "stable".to_string()
348 } else {
349 RequestData.channel.clone()
350 };
351 if !ValidChannels.contains(&Channel.as_str()) {
352 let ErrorMessage = format!("Invalid channel: {}. Valid values are: {}", Channel, ValidChannels.join(", "));
353 self.AppState
354 .UpdateRequestStatus(
355 &request_id,
356 crate::ApplicationState::RequestState::Failed(ErrorMessage.clone()),
357 None,
358 )
359 .await
360 .ok();
361 return Err(Status::invalid_argument(ErrorMessage));
362 }
363
364 let result = self.UpdateManager.CheckForUpdates().await;
366
367 match result {
368 Ok(UpdateInfo) => {
369 self.AppState
370 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
371 .await
372 .ok();
373
374 dev_log!(
375 "grpc",
376 "[AirVinegRPCService] Update check successful - Available: {}",
377 UpdateInfo.is_some()
378 );
379
380 Ok(Response::new(air_generated::UpdateCheckResponse {
381 request_id,
382 update_available:UpdateInfo.is_some(),
383 version:UpdateInfo.as_ref().map(|info| info.version.clone()).unwrap_or_default(),
384 download_url:UpdateInfo.as_ref().map(|info| info.download_url.clone()).unwrap_or_default(),
385 release_notes:UpdateInfo.as_ref().map(|info| info.release_notes.clone()).unwrap_or_default(),
386 error:String::new(),
387 }))
388 },
389 Err(crate::AirError::Network(e)) => {
390 self.AppState
391 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Failed(e.clone()), None)
392 .await
393 .ok();
394 dev_log!("grpc", "error: [AirVinegRPCService] Network error during update check: {}", e);
395 Err(Status::unavailable(e))
396 },
397 Err(e) => {
398 self.AppState
399 .UpdateRequestStatus(
400 &request_id,
401 crate::ApplicationState::RequestState::Failed(e.to_string()),
402 None,
403 )
404 .await
405 .ok();
406 dev_log!("grpc", "error: [AirVinegRPCService] Update check failed: {}", e);
407 Ok(Response::new(air_generated::UpdateCheckResponse {
408 request_id,
409 update_available:false,
410 version:String::new(),
411 download_url:String::new(),
412 release_notes:String::new(),
413 error:e.to_string(),
414 }))
415 },
416 }
417 }
418
419 async fn download_file(
421 &self,
422 request:Request<DownloadRequest>,
423 ) -> std::result::Result<Response<DownloadResponse>, Status> {
424 let RequestData = request.into_inner();
425 let request_id = RequestData.request_id.clone();
426
427 dev_log!(
428 "grpc",
429 "[AirVinegRPCService] Download request received [ID: {}] - URL: {}",
430 request_id,
431 RequestData.url
432 );
433
434 let download_request_id = if request_id.is_empty() {
436 crate::Utility::GenerateRequestId()
437 } else {
438 request_id.clone()
439 };
440
441 self.AppState
442 .RegisterRequest(download_request_id.clone(), "downloader".to_string())
443 .await
444 .map_err(|e| Status::internal(e.to_string()))?;
445
446 if RequestData.url.is_empty() {
448 let error_msg = "URL cannot be empty".to_string();
449 self.AppState
450 .UpdateRequestStatus(
451 &download_request_id,
452 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
453 None,
454 )
455 .await
456 .ok();
457 return Ok(Response::new(DownloadResponse {
458 request_id:download_request_id,
459 success:false,
460 file_path:String::new(),
461 file_size:0,
462 checksum:String::new(),
463 error:error_msg,
464 }));
465 }
466
467 if !match_url_scheme(&RequestData.url) {
469 let error_msg = format!("Invalid URL scheme: {}", RequestData.url);
470 self.AppState
471 .UpdateRequestStatus(
472 &download_request_id,
473 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
474 None,
475 )
476 .await
477 .ok();
478 return Ok(Response::new(DownloadResponse {
479 request_id:download_request_id,
480 success:false,
481 file_path:String::new(),
482 file_size:0,
483 checksum:String::new(),
484 error:error_msg,
485 }));
486 }
487
488 let DestinationPath = if RequestData.destination_path.is_empty() {
490 let config = &self.AppState.Configuration.Downloader;
492 config.CacheDirectory.clone()
493 } else {
494 RequestData.destination_path.clone()
495 };
496
497 let dest_path = std::path::Path::new(&DestinationPath);
499 if let Some(parent) = dest_path.parent() {
500 if !parent.exists() {
501 match tokio::fs::create_dir_all(parent).await {
502 Ok(_) => {
503 dev_log!(
504 "grpc",
505 "[AirVinegRPCService] Created destination directory: {}",
506 parent.display()
507 );
508 },
509 Err(e) => {
510 let error_msg = format!("Failed to create destination directory: {}", e);
511 self.AppState
512 .UpdateRequestStatus(
513 &download_request_id,
514 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
515 None,
516 )
517 .await
518 .ok();
519 return Ok(Response::new(DownloadResponse {
520 request_id:download_request_id,
521 success:false,
522 file_path:String::new(),
523 file_size:0,
524 checksum:String::new(),
525 error:error_msg,
526 }));
527 },
528 }
529 }
530 }
531
532 let _download_manager = self.DownloadManager.clone();
534 let AppState = self.AppState.clone();
535 let callback_request_id = download_request_id.clone();
536 let progress_callback = move |progress:f32| {
537 let state = AppState.clone();
538 let id = callback_request_id.clone();
539 tokio::spawn(async move {
540 let _ = state
541 .UpdateRequestStatus(&id, crate::ApplicationState::RequestState::InProgress, Some(progress))
542 .await;
543 });
544 };
545
546 let result = self
548 .download_file_with_retry(
549 &download_request_id,
550 RequestData.url.clone(),
551 DestinationPath,
552 RequestData.checksum,
553 Some(Box::new(progress_callback)),
554 )
555 .await;
556
557 match result {
558 Ok(file_info) => {
559 self.AppState
560 .UpdateRequestStatus(
561 &download_request_id,
562 crate::ApplicationState::RequestState::Completed,
563 Some(100.0),
564 )
565 .await
566 .ok();
567
568 dev_log!(
569 "grpc",
570 "[AirVinegRPCService] Download completed [ID: {}] - Size: {} bytes",
571 download_request_id,
572 file_info.size
573 );
574
575 Ok(Response::new(DownloadResponse {
576 request_id:download_request_id,
577 success:true,
578 file_path:file_info.path,
579 file_size:file_info.size,
580 checksum:file_info.checksum,
581 error:String::new(),
582 }))
583 },
584 Err(e) => {
585 self.AppState
586 .UpdateRequestStatus(
587 &download_request_id,
588 crate::ApplicationState::RequestState::Failed(e.to_string()),
589 None,
590 )
591 .await
592 .ok();
593
594 dev_log!(
595 "grpc",
596 "error: [AirVinegRPCService] Download failed [ID: {}] - Error: {}",
597 download_request_id,
598 e
599 );
600
601 Ok(Response::new(DownloadResponse {
602 request_id:download_request_id,
603 success:false,
604 file_path:String::new(),
605 file_size:0,
606 checksum:String::new(),
607 error:e.to_string(),
608 }))
609 },
610 }
611 }
612
613 async fn index_files(&self, request:Request<IndexRequest>) -> std::result::Result<Response<IndexResponse>, Status> {
615 let RequestData = request.into_inner();
616 let request_id = RequestData.request_id;
617
618 dev_log!(
619 "grpc",
620 "[AirVinegRPCService] Index request received [ID: {}] - Path: {}",
621 request_id,
622 RequestData.path
623 );
624
625 self.AppState
626 .RegisterRequest(request_id.clone(), "indexing".to_string())
627 .await
628 .map_err(|e| Status::internal(e.to_string()))?;
629
630 let result = self.FileIndexer.IndexDirectory(RequestData.path, RequestData.patterns).await;
631
632 match result {
633 Ok(index_info) => {
634 self.AppState
635 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
636 .await
637 .ok();
638
639 Ok(Response::new(air_generated::IndexResponse {
640 request_id,
641 success:true,
642 files_indexed:index_info.files_indexed,
643 total_size:index_info.total_size,
644 error:String::new(),
645 }))
646 },
647 Err(e) => {
648 self.AppState
649 .UpdateRequestStatus(
650 &request_id,
651 crate::ApplicationState::RequestState::Failed(e.to_string()),
652 None,
653 )
654 .await
655 .ok();
656
657 Ok(Response::new(air_generated::IndexResponse {
658 request_id,
659 success:false,
660 files_indexed:0,
661 total_size:0,
662 error:e.to_string(),
663 }))
664 },
665 }
666 }
667
668 async fn get_status(
670 &self,
671 request:Request<StatusRequest>,
672 ) -> std::result::Result<Response<StatusResponse>, Status> {
673 let _RequestData = request.into_inner();
674
675 dev_log!("grpc", "[AirVinegRPCService] Status request received");
676 let metrics = self.AppState.GetMetrics().await;
677 let resources = self.AppState.GetResourceUsage().await;
678
679 Ok(Response::new(air_generated::StatusResponse {
680 version:crate::VERSION.to_string(),
681 uptime_seconds:metrics.UptimeSeconds,
682 total_requests:metrics.TotalRequest,
683 successful_requests:metrics.SuccessfulRequest,
684 failed_requests:metrics.FailedRequest,
685 average_response_time:metrics.AverageResponseTime,
686 memory_usage_mb:resources.MemoryUsageMb,
687 cpu_usage_percent:resources.CPUUsagePercent,
688 active_requests:self.AppState.GetActiveRequestCount().await as u32,
689 }))
690 }
691
692 async fn health_check(
694 &self,
695 _request:Request<HealthCheckRequest>,
696 ) -> std::result::Result<Response<HealthCheckResponse>, Status> {
697 dev_log!("grpc", "[AirVinegRPCService] Health check request received");
698 Ok(Response::new(air_generated::HealthCheckResponse {
699 healthy:true,
700 timestamp:CurrentTimestamp(),
701 }))
702 }
703
704 async fn download_update(
708 &self,
709 request:Request<DownloadRequest>,
710 ) -> std::result::Result<Response<DownloadResponse>, Status> {
711 let RequestData = request.into_inner();
712 let request_id = RequestData.request_id.clone();
713
714 dev_log!(
715 "grpc",
716 "[AirVinegRPCService] Download update request received [ID: {}] - URL: {}, Destination: {}",
717 request_id,
718 RequestData.url,
719 RequestData.destination_path
720 );
721
722 self.AppState
723 .RegisterRequest(request_id.clone(), "download_update".to_string())
724 .await
725 .map_err(|e| Status::internal(e.to_string()))?;
726
727 if RequestData.url.is_empty() {
729 let error_msg = crate::AirError::Validation("URL cannot be empty".to_string());
730 self.AppState
731 .UpdateRequestStatus(
732 &request_id,
733 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
734 None,
735 )
736 .await
737 .ok();
738 return Err(Status::invalid_argument(error_msg.to_string()));
739 }
740
741 if !RequestData.url.starts_with("http://") && !RequestData.url.starts_with("https://") {
743 let error_msg = crate::AirError::Validation("URL must start with http:// or https://".to_string());
744 self.AppState
745 .UpdateRequestStatus(
746 &request_id,
747 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
748 None,
749 )
750 .await
751 .ok();
752 return Err(Status::invalid_argument(error_msg.to_string()));
753 }
754
755 let destination = if RequestData.destination_path.is_empty() {
757 self.UpdateManager
759 .GetCacheDirectory()
760 .join("update-latest.bin")
761 .to_string_lossy()
762 .to_string()
763 } else {
764 let dest_path = std::path::Path::new(&RequestData.destination_path);
766 if let Some(parent) = dest_path.parent() {
767 if parent.as_os_str().is_empty() {
768 self.UpdateManager
770 .GetCacheDirectory()
771 .join(&RequestData.destination_path)
772 .to_string_lossy()
773 .to_string()
774 } else {
775 RequestData.destination_path.clone()
777 }
778 } else {
779 RequestData.destination_path.clone()
780 }
781 };
782
783 let dest_path = std::path::Path::new(&destination);
785 if let Some(parent) = dest_path.parent() {
786 if !parent.exists() {
787 return Err(Status::failed_precondition(format!(
788 "Destination directory does not exist: {}",
789 parent.display()
790 )));
791 }
792
793 if let Err(e) = std::fs::write(parent.join(".write_test"), "") {
795 let error_msg = crate::AirError::FileSystem(format!("Destination directory not writeable: {}", e));
796 self.AppState
797 .UpdateRequestStatus(
798 &request_id,
799 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
800 None,
801 )
802 .await
803 .ok();
804 return Err(Status::permission_denied(error_msg.to_string()));
805 }
806 let _ = std::fs::remove_file(parent.join(".write_test"));
808 }
809
810 let download_result = self
813 .DownloadManager
814 .DownloadFile(RequestData.url, destination.clone(), RequestData.checksum)
815 .await;
816
817 match download_result {
818 Ok(result) => {
819 self.AppState
820 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
821 .await
822 .ok();
823
824 dev_log!(
825 "grpc",
826 "[AirVinegRPCService] Update downloaded successfully - Path: {}, Size: {}, Checksum: {}",
827 result.path,
828 result.size,
829 result.checksum
830 );
831
832 Ok(Response::new(DownloadResponse {
833 request_id,
834 success:true,
835 file_path:result.path,
836 file_size:result.size,
837 checksum:result.checksum,
838 error:String::new(),
839 }))
840 },
841 Err(crate::AirError::Network(e)) => {
842 self.AppState
843 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Failed(e.clone()), None)
844 .await
845 .ok();
846 dev_log!("grpc", "error: [AirVinegRPCService] Download update network error: {}", e);
847 Err(Status::unavailable(e))
848 },
849 Err(crate::AirError::FileSystem(e)) => {
850 self.AppState
851 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Failed(e.clone()), None)
852 .await
853 .ok();
854 dev_log!("grpc", "error: [AirVinegRPCService] Download update filesystem error: {}", e);
855 Err(Status::internal(e))
856 },
857 Err(e) => {
858 self.AppState
859 .UpdateRequestStatus(
860 &request_id,
861 crate::ApplicationState::RequestState::Failed(e.to_string()),
862 None,
863 )
864 .await
865 .ok();
866 dev_log!("grpc", "error: [AirVinegRPCService] Download update failed: {}", e);
867 Ok(Response::new(DownloadResponse {
868 request_id,
869 success:false,
870 file_path:String::new(),
871 file_size:0,
872 checksum:String::new(),
873 error:e.to_string(),
874 }))
875 },
876 }
877 }
878
879 async fn apply_update(
881 &self,
882 request:Request<ApplyUpdateRequest>,
883 ) -> std::result::Result<Response<ApplyUpdateResponse>, Status> {
884 let RequestData = request.into_inner();
885 let request_id = RequestData.request_id.clone();
886
887 dev_log!(
888 "grpc",
889 "[AirVinegRPCService] Apply update request received [ID: {}] - Version: {}, Path: {}",
890 request_id,
891 RequestData.version,
892 RequestData.update_path
893 );
894
895 self.AppState
896 .RegisterRequest(request_id.clone(), "apply_update".to_string())
897 .await
898 .map_err(|e| Status::internal(e.to_string()))?;
899
900 if RequestData.version.is_empty() {
902 let error_msg = crate::AirError::Validation("version cannot be empty".to_string());
903 self.AppState
904 .UpdateRequestStatus(
905 &request_id,
906 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
907 None,
908 )
909 .await
910 .ok();
911 return Err(Status::invalid_argument(error_msg.to_string()));
912 }
913
914 if RequestData.update_path.is_empty() {
916 let error_msg = crate::AirError::Validation("update_path cannot be empty".to_string());
917 self.AppState
918 .UpdateRequestStatus(
919 &request_id,
920 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
921 None,
922 )
923 .await
924 .ok();
925 return Err(Status::invalid_argument(error_msg.to_string()));
926 }
927
928 let update_path = std::path::Path::new(&RequestData.update_path);
929
930 if !update_path.exists() {
932 let error_msg = crate::AirError::FileSystem(format!("Update file not found: {}", RequestData.update_path));
933 self.AppState
934 .UpdateRequestStatus(
935 &request_id,
936 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
937 None,
938 )
939 .await
940 .ok();
941 return Err(Status::not_found(error_msg.to_string()));
942 }
943
944 let metadata = match std::fs::metadata(update_path) {
946 Ok(m) => m,
947 Err(e) => {
948 let error_msg = crate::AirError::FileSystem(format!("Failed to read update file metadata: {}", e));
949 self.AppState
950 .UpdateRequestStatus(
951 &request_id,
952 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
953 None,
954 )
955 .await
956 .ok();
957 return Err(Status::internal(error_msg.to_string()));
958 },
959 };
960
961 if metadata.len() == 0 {
962 let error_msg = crate::AirError::Validation("Update file is empty".to_string());
963 self.AppState
964 .UpdateRequestStatus(
965 &request_id,
966 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
967 None,
968 )
969 .await
970 .ok();
971 return Err(Status::failed_precondition(error_msg.to_string()));
972 }
973
974 let rollback_backup_path = self.prepare_rollback_backup(&RequestData.version).await;
976 if let Err(ref e) = rollback_backup_path {
977 dev_log!(
978 "grpc",
979 "warn: [AirVinegRPCService] Failed to prepare rollback backup: {}. Proceeding without rollback \
980 capability.",
981 e
982 );
983 }
984
985 match self.UpdateManager.verify_update(&RequestData.update_path, None).await {
987 Ok(true) => {
988 dev_log!(
989 "grpc",
990 "[AirVinegRPCService] Update verification successful, preparing for installation"
991 );
992 self.AppState
993 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
994 .await
995 .ok();
996
997 let AppState = self.AppState.clone();
999 let version = RequestData.version.clone();
1000 let self_clone = self.clone();
1001
1002 tokio::spawn(async move {
1003 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
1004 dev_log!(
1005 "grpc",
1006 "[AirVinegRPCService] Initiating graceful shutdown for update version {}",
1007 version
1008 );
1009
1010 if let Err(e) = AppState.StopAllBackgroundTasks().await {
1012 dev_log!(
1013 "grpc",
1014 "error: [AirVinegRPCService] Failed to initiate graceful shutdown: {}",
1015 e
1016 );
1017 dev_log!(
1019 "grpc",
1020 "warn: [AirVinegRPCService] Rollback initiated due to graceful shutdown failure"
1021 );
1022 if let Err(rollback_error) = self_clone.perform_rollback(&version).await {
1023 dev_log!("grpc", "error: [AirVinegRPCService] Rollback failed: {}", rollback_error);
1024 } else {
1025 dev_log!("grpc", "[AirVinegRPCService] Rollback completed successfully");
1026 }
1027 }
1028 });
1029
1030 Ok(Response::new(ApplyUpdateResponse {
1031 request_id,
1032 success:true,
1033 error:String::new(),
1034 }))
1035 },
1036 Ok(false) => {
1037 let error_msg = "Update verification failed: checksum mismatch".to_string();
1038 self.AppState
1039 .UpdateRequestStatus(
1040 &request_id,
1041 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
1042 None,
1043 )
1044 .await
1045 .ok();
1046 dev_log!("grpc", "error: [AirVinegRPCService] {}", error_msg);
1047 let _ = self.cleanup_rollback_backup(&RequestData.version).await;
1049
1050 Err(Status::failed_precondition(error_msg))
1051 },
1052 Err(crate::AirError::FileSystem(e)) => {
1053 self.AppState
1054 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Failed(e.clone()), None)
1055 .await
1056 .ok();
1057 dev_log!(
1058 "grpc",
1059 "error: [AirVinegRPCService] Update verification filesystem error: {}",
1060 e
1061 );
1062 let _ = self.cleanup_rollback_backup(&RequestData.version).await;
1064
1065 Err(Status::internal(e))
1066 },
1067 Err(e) => {
1068 self.AppState
1069 .UpdateRequestStatus(
1070 &request_id,
1071 crate::ApplicationState::RequestState::Failed(e.to_string()),
1072 None,
1073 )
1074 .await
1075 .ok();
1076 dev_log!("grpc", "error: [AirVinegRPCService] Update verification error: {}", e);
1077 let _ = self.cleanup_rollback_backup(&RequestData.version).await;
1079
1080 Ok(Response::new(ApplyUpdateResponse {
1081 request_id,
1082 success:false,
1083 error:e.to_string(),
1084 }))
1085 },
1086 }
1087 }
1088
1089 type DownloadStreamStream =
1094 tokio_stream::wrappers::ReceiverStream<std::result::Result<air_generated::DownloadStreamResponse, Status>>;
1095
1096 async fn download_stream(
1097 &self,
1098 request:Request<DownloadStreamRequest>,
1099 ) -> std::result::Result<Response<Self::DownloadStreamStream>, Status> {
1100 let RequestData = request.into_inner();
1101 let request_id = RequestData.request_id.clone();
1102
1103 dev_log!(
1104 "grpc",
1105 "[AirVinegRPCService] Download stream request received [ID: {}] - URL: {}",
1106 request_id,
1107 RequestData.url
1108 );
1109
1110 self.AppState
1111 .RegisterRequest(request_id.clone(), "downloader_stream".to_string())
1112 .await
1113 .map_err(|e| Status::internal(e.to_string()))?;
1114
1115 if RequestData.url.is_empty() {
1117 let error_msg = "URL cannot be empty".to_string();
1118 self.AppState
1119 .UpdateRequestStatus(
1120 &request_id,
1121 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
1122 None,
1123 )
1124 .await
1125 .ok();
1126 return Err(Status::invalid_argument(error_msg));
1127 }
1128
1129 if !match_url_scheme(&RequestData.url) {
1131 let error_msg = format!("Invalid URL scheme: {}", RequestData.url);
1132 self.AppState
1133 .UpdateRequestStatus(
1134 &request_id,
1135 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
1136 None,
1137 )
1138 .await
1139 .ok();
1140 return Err(Status::invalid_argument(error_msg));
1141 }
1142
1143 match self.validate_range_support(&RequestData.url).await {
1145 Ok(true) => {
1146 dev_log!("grpc", "[AirVinegRPCService] URL supports range headers");
1147 },
1148 Ok(false) => {
1149 dev_log!(
1150 "grpc",
1151 "warn: [AirVinegRPCService] URL does not support range headers, streaming may be inefficient"
1152 );
1153 },
1154 Err(e) => {
1155 let error_msg = format!("Failed to validate range support: {}", e);
1156 self.AppState
1157 .UpdateRequestStatus(
1158 &request_id,
1159 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
1160 None,
1161 )
1162 .await
1163 .ok();
1164 return Err(Status::internal(error_msg));
1165 },
1166 }
1167
1168 let (tx, rx) = tokio::sync::mpsc::channel(100);
1170
1171 let chunk_size = 8 * 1024 * 1024; let url = RequestData.url.clone();
1176 let headers = RequestData.headers;
1177 let download_request_id = request_id.clone();
1178 let _download_manager = self.DownloadManager.clone();
1179 let AppState = self.AppState.clone();
1180
1181 tokio::spawn(async move {
1183 if tx
1185 .send(Ok(DownloadStreamResponse {
1186 request_id:download_request_id.clone(),
1187 chunk:vec![].into(),
1188 total_size:0,
1189 downloaded:0,
1190 completed:false,
1191 error:String::new(),
1192 }))
1193 .await
1194 .is_err()
1195 {
1196 dev_log!(
1197 "grpc",
1198 "warn: [AirVinegRPCService] Client disconnected before streaming started [ID: {}]",
1199 download_request_id
1200 );
1201 return;
1202 }
1203
1204 let dns_port = Mist::dns_port();
1206 let client_builder_result = crate::HTTP::Client::secured_client_builder(dns_port);
1207
1208 let client_builder = match client_builder_result {
1209 Ok(builder) => builder,
1210 Err(e) => {
1211 let error = format!("Failed to create HTTP client builder: {}", e);
1212 let _ = tx
1213 .send(Ok(DownloadStreamResponse {
1214 request_id:download_request_id.clone(),
1215 chunk:vec![].into(),
1216 total_size:0,
1217 downloaded:0,
1218 completed:false,
1219 error:error.clone(),
1220 }))
1221 .await;
1222 AppState
1223 .UpdateRequestStatus(
1224 &download_request_id,
1225 crate::ApplicationState::RequestState::Failed(error),
1226 None,
1227 )
1228 .await
1229 .ok();
1230 return;
1231 },
1232 };
1233
1234 let client_result = client_builder
1235 .pool_idle_timeout(std::time::Duration::from_secs(60))
1236 .pool_max_idle_per_host(5)
1237 .timeout(std::time::Duration::from_secs(300))
1238 .build();
1239
1240 if client_result.is_err() {
1241 let error = client_result.unwrap_err().to_string();
1242 let _ = tx
1243 .send(Ok(DownloadStreamResponse {
1244 request_id:download_request_id.clone(),
1245 chunk:vec![].into(),
1246 total_size:0,
1247 downloaded:0,
1248 completed:false,
1249 error:error.clone(),
1250 }))
1251 .await;
1252 AppState
1253 .UpdateRequestStatus(
1254 &download_request_id,
1255 crate::ApplicationState::RequestState::Failed(error),
1256 None,
1257 )
1258 .await
1259 .ok();
1260 return;
1261 }
1262
1263 let client:reqwest::Client = match client_result {
1264 Ok(client) => client,
1265 Err(e) => {
1266 let error = format!("Failed to create HTTP client: {}", e);
1267 let _ = tx.send(Err(Status::internal(error.clone())));
1268 AppState
1269 .UpdateRequestStatus(
1270 &download_request_id,
1271 crate::ApplicationState::RequestState::Failed(error),
1272 None,
1273 )
1274 .await
1275 .ok();
1276 return;
1277 },
1278 };
1279
1280 #[allow(unused_assignments)]
1282 let mut total_size:Option<u64> = None;
1283 let mut total_downloaded:u64 = 0;
1284
1285 match client
1286 .get(&url)
1287 .headers({
1288 let mut map = reqwest::header::HeaderMap::new();
1289 for (key, value) in headers {
1290 if let (Ok(header_name), Ok(header_value)) = (
1291 reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1292 reqwest::header::HeaderValue::from_str(&value),
1293 ) {
1294 map.insert(header_name, header_value);
1295 }
1296 }
1297 map
1298 })
1299 .send()
1300 .await
1301 {
1302 Ok(response) => {
1303 if !response.status().is_success() {
1304 let error = format!("Download failed with status: {}", response.status());
1305 let _ = tx
1306 .send(Ok(DownloadStreamResponse {
1307 request_id:download_request_id.clone(),
1308 chunk:vec![].into(),
1309 total_size:0,
1310 downloaded:0,
1311 completed:false,
1312 error:error.clone(),
1313 }))
1314 .await;
1315 AppState
1316 .UpdateRequestStatus(
1317 &download_request_id,
1318 crate::ApplicationState::RequestState::Failed(error),
1319 None,
1320 )
1321 .await
1322 .ok();
1323 return;
1324 }
1325
1326 total_size = Some(response.content_length().unwrap_or(0));
1327 let response_tx = tx.clone();
1328 let response_id = download_request_id.clone();
1329
1330 let mut stream = response.bytes_stream();
1332 let mut buffer = Vec::with_capacity(chunk_size);
1333 let mut last_progress:f32 = 0.0;
1334
1335 while let Some(chunk_result) = TokioStreamExt::next(&mut stream).await {
1336 if AppState.IsRequestCancelled(&download_request_id).await {
1337 dev_log!(
1338 "grpc",
1339 "[AirVinegRPCService] Download cancelled by client [ID: {}]",
1340 download_request_id
1341 );
1342 AppState
1343 .UpdateRequestStatus(
1344 &download_request_id,
1345 crate::ApplicationState::RequestState::Cancelled,
1346 None,
1347 )
1348 .await
1349 .ok();
1350 return;
1351 }
1352
1353 match chunk_result {
1354 Ok(chunk) => {
1355 buffer.extend_from_slice(&chunk);
1356 total_downloaded += chunk.len() as u64;
1357
1358 if buffer.len() >= chunk_size {
1360 let _chunk_checksum = calculate_chunk_checksum(&buffer);
1362
1363 let progress = if let Some(ts) = total_size {
1365 if ts > 0 { (total_downloaded as f32 / ts as f32) * 100.0 } else { 0.0 }
1366 } else {
1367 0.0
1368 };
1369
1370 if progress - last_progress >= 5.0 {
1372 AppState
1373 .UpdateRequestStatus(
1374 &download_request_id,
1375 crate::ApplicationState::RequestState::InProgress,
1376 Some(progress),
1377 )
1378 .await
1379 .ok();
1380 last_progress = progress;
1381 }
1382
1383 if response_tx
1384 .send(Ok(DownloadStreamResponse {
1385 request_id:response_id.clone(),
1386 chunk:buffer.clone().into(),
1387 total_size:total_size.unwrap_or(0),
1388 downloaded:total_downloaded,
1389 completed:false,
1390 error:String::new(),
1391 }))
1392 .await
1393 .is_err()
1394 {
1395 dev_log!(
1396 "grpc",
1397 "warn: [AirVinegRPCService] Client disconnected during streaming [ID: {}]",
1398 download_request_id
1399 );
1400 AppState
1401 .UpdateRequestStatus(
1402 &download_request_id,
1403 crate::ApplicationState::RequestState::Failed(
1404 "Client disconnected".to_string(),
1405 ),
1406 None,
1407 )
1408 .await
1409 .ok();
1410 return;
1411 }
1412
1413 dev_log!(
1414 "grpc",
1415 "[AirVinegRPCService] Sent chunk of {} bytes [ID: {}] - Progress: {:.1}%",
1416 buffer.len(),
1417 download_request_id,
1418 progress
1419 );
1420
1421 buffer.clear();
1422 }
1423 },
1424 Err(e) => {
1425 let error = format!("Download error: {}", e);
1426 dev_log!(
1427 "grpc",
1428 "error: [AirVinegRPCService] Stream download failed [ID: {}]: {}",
1429 download_request_id,
1430 error
1431 );
1432
1433 let _ = response_tx
1434 .send(Ok(DownloadStreamResponse {
1435 request_id:response_id.clone(),
1436 chunk:vec![].into(),
1437 total_size:total_size.unwrap_or(0),
1438 downloaded:total_downloaded,
1439 completed:false,
1440 error:error.clone(),
1441 }))
1442 .await;
1443
1444 AppState
1445 .UpdateRequestStatus(
1446 &download_request_id,
1447 crate::ApplicationState::RequestState::Failed(error),
1448 None,
1449 )
1450 .await
1451 .ok();
1452 return;
1453 },
1454 }
1455 }
1456
1457 if !buffer.is_empty() {
1459 let _chunk_checksum = calculate_chunk_checksum(&buffer);
1460
1461 if tx
1462 .send(Ok(DownloadStreamResponse {
1463 request_id:download_request_id.clone(),
1464 chunk:buffer.into(),
1465 total_size:total_size.unwrap_or(0),
1466 downloaded:total_downloaded,
1467 completed:false,
1468 error:String::new(),
1469 }))
1470 .await
1471 .is_err()
1472 {
1473 dev_log!(
1474 "grpc",
1475 "warn: [AirVinegRPCService] Client disconnected while sending final chunk [ID: {}]",
1476 download_request_id
1477 );
1478 return;
1479 }
1480 }
1481
1482 AppState
1484 .UpdateRequestStatus(
1485 &download_request_id,
1486 crate::ApplicationState::RequestState::Completed,
1487 Some(100.0),
1488 )
1489 .await
1490 .ok();
1491
1492 let _ = tx
1493 .send(Ok(DownloadStreamResponse {
1494 request_id,
1495 chunk:vec![].into(),
1496 total_size:total_size.unwrap_or(0),
1497 downloaded:total_downloaded,
1498 completed:true,
1499 error:String::new(),
1500 }))
1501 .await;
1502
1503 dev_log!(
1504 "grpc",
1505 "[AirVinegRPCService] Stream download completed [ID: {}] - Total: {} bytes",
1506 download_request_id,
1507 total_downloaded
1508 );
1509 },
1510 Err(e) => {
1511 let error = format!("Failed to start streaming download: {}", e);
1512 dev_log!(
1513 "grpc",
1514 "error: [AirVinegRPCService] Stream download error [ID: {}]: {}",
1515 download_request_id,
1516 error
1517 );
1518
1519 let _ = tx
1520 .send(Ok(DownloadStreamResponse {
1521 request_id:download_request_id.clone(),
1522 chunk:vec![].into(),
1523 total_size:0,
1524 downloaded:0,
1525 completed:false,
1526 error:error.clone(),
1527 }))
1528 .await;
1529
1530 AppState
1531 .UpdateRequestStatus(
1532 &download_request_id,
1533 crate::ApplicationState::RequestState::Failed(error),
1534 None,
1535 )
1536 .await
1537 .ok();
1538 },
1539 }
1540 });
1541
1542 Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(rx)))
1543 }
1544
1545 async fn search_files(
1549 &self,
1550 request:Request<SearchRequest>,
1551 ) -> std::result::Result<Response<SearchResponse>, Status> {
1552 let RequestData = request.into_inner();
1553 let request_id = RequestData.request_id.clone();
1554
1555 dev_log!(
1556 "grpc",
1557 "[AirVinegRPCService] Search files request: query='{}' in path='{}'",
1558 RequestData.query,
1559 RequestData.path
1560 );
1561
1562 if RequestData.query.is_empty() {
1564 return Ok(Response::new(SearchResponse {
1565 request_id,
1566 results:vec![],
1567 total_results:0,
1568 error:"Search query cannot be empty".to_string(),
1569 }));
1570 }
1571
1572 let path = if RequestData.path.is_empty() { None } else { Some(RequestData.path.clone()) };
1574 let _search_path = path.as_deref();
1575
1576 match self
1577 .FileIndexer
1578 .SearchFiles(
1579 SearchQuery {
1580 query:RequestData.query.clone(),
1581 mode:SearchMode::Literal,
1582 case_sensitive:false,
1583 whole_word:false,
1584 regex:None,
1585 max_results:RequestData.max_results,
1586 page:1,
1587 },
1588 path,
1589 None,
1590 )
1591 .await
1592 {
1593 Ok(search_results) => {
1594 let mut file_results = Vec::new();
1596 for r in search_results {
1597 let (match_preview, line_number) = if let Some(first_match) = r.matches.first() {
1599 (first_match.line_content.clone(), first_match.line_number)
1600 } else {
1601 (String::new(), 0)
1602 };
1603
1604 let size = if let Ok(Some(metadata)) = self.FileIndexer.GetFileInfo(r.path.clone()).await {
1606 metadata.size
1607 } else if let Ok(file_metadata) = std::fs::metadata(&r.path) {
1608 file_metadata.len()
1609 } else {
1610 0
1611 };
1612
1613 file_results.push(FileResult { path:r.path, size, match_preview, line_number });
1614 }
1615
1616 dev_log!(
1617 "grpc",
1618 "[AirVinegRPCService] Search completed: {} results found",
1619 file_results.len()
1620 );
1621 let result_count = file_results.len();
1622 Ok(Response::new(SearchResponse {
1623 request_id,
1624 results:file_results,
1625 total_results:result_count as u32,
1626 error:String::new(),
1627 }))
1628 },
1629 Err(e) => {
1630 dev_log!("grpc", "error: [AirVinegRPCService] Search failed: {}", e);
1631 Ok(Response::new(SearchResponse {
1632 request_id,
1633 results:vec![],
1634 total_results:0,
1635 error:e.to_string(),
1636 }))
1637 },
1638 }
1639 }
1640
1641 async fn get_file_info(
1643 &self,
1644 request:Request<FileInfoRequest>,
1645 ) -> std::result::Result<Response<FileInfoResponse>, Status> {
1646 let RequestData = request.into_inner();
1647 let request_id = RequestData.request_id.clone();
1648
1649 dev_log!("grpc", "[AirVinegRPCService] Get file info request: {}", RequestData.path);
1650 if RequestData.path.is_empty() {
1652 return Ok(Response::new(FileInfoResponse {
1653 request_id,
1654 exists:false,
1655 size:0,
1656 mime_type:String::new(),
1657 checksum:String::new(),
1658 modified_time:0,
1659 error:"Path cannot be empty".to_string(),
1660 }));
1661 }
1662
1663 use std::path::Path;
1665 let path = Path::new(&RequestData.path);
1666
1667 if !path.exists() {
1668 return Ok(Response::new(FileInfoResponse {
1669 request_id,
1670 exists:false,
1671 size:0,
1672 mime_type:String::new(),
1673 checksum:String::new(),
1674 modified_time:0,
1675 error:String::new(), }));
1677 }
1678
1679 match std::fs::metadata(path) {
1681 Ok(metadata) => {
1682 let modified_time = metadata
1683 .modified()
1684 .ok()
1685 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
1686 .map(|d| d.as_secs())
1687 .unwrap_or(0);
1688
1689 let mime_type = self.detect_mime_type(path);
1691
1692 let checksum = calculate_file_checksum(path).await.unwrap_or_else(|e| {
1694 dev_log!("grpc", "warn: [AirVinegRPCService] Failed to calculate checksum: {}", e);
1695 String::new()
1696 });
1697
1698 Ok(Response::new(FileInfoResponse {
1699 request_id,
1700 exists:true,
1701 size:metadata.len(),
1702 mime_type,
1703 checksum,
1704 modified_time,
1705 error:String::new(),
1706 }))
1707 },
1708 Err(e) => {
1709 dev_log!("grpc", "error: [AirVinegRPCService] Failed to get file metadata: {}", e);
1710 Ok(Response::new(FileInfoResponse {
1711 request_id,
1712 exists:false,
1713 size:0,
1714 mime_type:String::new(),
1715 checksum:String::new(),
1716 modified_time:0,
1717 error:e.to_string(),
1718 }))
1719 },
1720 }
1721 }
1722
1723 async fn get_metrics(
1727 &self,
1728 request:Request<MetricsRequest>,
1729 ) -> std::result::Result<Response<MetricsResponse>, Status> {
1730 let RequestData = request.into_inner();
1731 let request_id = RequestData.request_id.clone();
1732
1733 dev_log!(
1734 "grpc",
1735 "[AirVinegRPCService] Get metrics request: type='{}'",
1736 RequestData.metric_type
1737 );
1738 let metrics = self.AppState.GetMetrics().await;
1739 let mut metrics_map = std::collections::HashMap::new();
1740
1741 if RequestData.metric_type.is_empty() || RequestData.metric_type == "performance" {
1743 metrics_map.insert("uptime_seconds".to_string(), metrics.UptimeSeconds.to_string());
1744 metrics_map.insert("total_requests".to_string(), metrics.TotalRequest.to_string());
1745 metrics_map.insert("successful_requests".to_string(), metrics.SuccessfulRequest.to_string());
1746 metrics_map.insert("failed_requests".to_string(), metrics.FailedRequest.to_string());
1747 metrics_map.insert("average_response_time_ms".to_string(), metrics.AverageResponseTime.to_string());
1748 }
1749
1750 if RequestData.metric_type.is_empty() || RequestData.metric_type == "requests" {
1752 metrics_map.insert(
1753 "ActiveRequests".to_string(),
1754 self.AppState.GetActiveRequestCount().await.to_string(),
1755 );
1756 }
1757
1758 Ok(Response::new(MetricsResponse {
1759 request_id,
1760 metrics:metrics_map,
1761 error:String::new(),
1762 }))
1763 }
1764
1765 async fn get_resource_usage(
1767 &self,
1768 request:Request<ResourceUsageRequest>,
1769 ) -> std::result::Result<Response<ResourceUsageResponse>, Status> {
1770 let RequestData = request.into_inner();
1771 let request_id = RequestData.request_id.clone();
1772
1773 dev_log!("grpc", "[AirVinegRPCService] Get resource usage request");
1774 let resources = self.AppState.GetResourceUsage().await;
1775
1776 Ok(Response::new(ResourceUsageResponse {
1777 request_id,
1778 memory_usage_mb:resources.MemoryUsageMb,
1779 cpu_usage_percent:resources.CPUUsagePercent,
1780 disk_usage_mb:resources.DiskUsageMb,
1781 network_usage_mbps:resources.NetworkUsageMbps,
1782 error:String::new(),
1783 }))
1784 }
1785
1786 async fn set_resource_limits(
1788 &self,
1789 request:Request<ResourceLimitsRequest>,
1790 ) -> std::result::Result<Response<ResourceLimitsResponse>, Status> {
1791 let RequestData = request.into_inner();
1792 let request_id = RequestData.request_id.clone();
1793
1794 dev_log!(
1795 "grpc",
1796 "[AirVinegRPCService] Set resource limits: memory={}MB, cpu={}%, disk={}MB",
1797 RequestData.memory_limit_mb,
1798 RequestData.cpu_limit_percent,
1799 RequestData.disk_limit_mb
1800 );
1801
1802 if RequestData.memory_limit_mb == 0 {
1804 return Ok(Response::new(ResourceLimitsResponse {
1805 request_id,
1806 success:false,
1807 error:"Memory limit must be greater than 0".to_string(),
1808 }));
1809 }
1810
1811 if RequestData.cpu_limit_percent > 100 {
1812 return Ok(Response::new(ResourceLimitsResponse {
1813 request_id,
1814 success:false,
1815 error:"CPU limit cannot exceed 100%".to_string(),
1816 }));
1817 }
1818
1819 let result = self
1821 .AppState
1822 .SetResourceLimits(
1823 Some(RequestData.memory_limit_mb as u64),
1824 Some(RequestData.cpu_limit_percent as f64),
1825 Some(RequestData.disk_limit_mb as u64),
1826 )
1827 .await;
1828
1829 match result {
1830 Ok(_) => {
1831 Ok(Response::new(ResourceLimitsResponse {
1832 request_id,
1833 success:true,
1834 error:String::new(),
1835 }))
1836 },
1837 Err(e) => {
1838 Ok(Response::new(ResourceLimitsResponse {
1839 request_id,
1840 success:false,
1841 error:e.to_string(),
1842 }))
1843 },
1844 }
1845 }
1846
1847 async fn get_configuration(
1851 &self,
1852 request:Request<ConfigurationRequest>,
1853 ) -> std::result::Result<Response<ConfigurationResponse>, Status> {
1854 let RequestData = request.into_inner();
1855 let request_id = RequestData.request_id.clone();
1856
1857 dev_log!(
1858 "grpc",
1859 "[AirVinegRPCService] Get configuration request: section='{}'",
1860 RequestData.section
1861 );
1862
1863 let config = self.AppState.GetConfiguration().await;
1865 let mut config_map = std::collections::HashMap::new();
1866
1867 match RequestData.section.as_str() {
1869 "grpc" => {
1870 config_map.insert("bind_address".to_string(), config.gRPC.BindAddress.clone());
1871 config_map.insert("max_connections".to_string(), config.gRPC.MaxConnections.to_string());
1872 config_map.insert("request_timeout_secs".to_string(), config.gRPC.RequestTimeoutSecs.to_string());
1873 },
1874 "authentication" => {
1875 config_map.insert("enabled".to_string(), config.Authentication.Enabled.to_string());
1876 config_map.insert("credentials_path".to_string(), "***REDACTED***".to_string());
1877 config_map.insert(
1878 "token_expiration_hours".to_string(),
1879 config.Authentication.TokenExpirationHours.to_string(),
1880 );
1881 },
1882 "updates" => {
1883 config_map.insert("enabled".to_string(), config.Updates.Enabled.to_string());
1884 config_map.insert(
1885 "check_interval_hours".to_string(),
1886 config.Updates.CheckIntervalHours.to_string(),
1887 );
1888 config_map.insert("update_server_url".to_string(), config.Updates.UpdateServerUrl.clone());
1889 config_map.insert("auto_download".to_string(), config.Updates.AutoDownload.to_string());
1890 config_map.insert("auto_install".to_string(), config.Updates.AutoInstall.to_string());
1891 },
1892 "downloader" => {
1893 config_map.insert("enabled".to_string(), config.Downloader.Enabled.to_string());
1894 config_map.insert(
1895 "max_concurrent_downloads".to_string(),
1896 config.Downloader.MaxConcurrentDownloads.to_string(),
1897 );
1898 config_map.insert(
1899 "download_timeout_secs".to_string(),
1900 config.Downloader.DownloadTimeoutSecs.to_string(),
1901 );
1902 config_map.insert("max_retries".to_string(), config.Downloader.MaxRetries.to_string());
1903 config_map.insert("cache_directory".to_string(), config.Downloader.CacheDirectory.clone());
1904 },
1905 "indexing" => {
1906 config_map.insert("enabled".to_string(), config.Indexing.Enabled.to_string());
1907 config_map.insert("max_file_size_mb".to_string(), config.Indexing.MaxFileSizeMb.to_string());
1908 config_map.insert("file_types".to_string(), config.Indexing.FileTypes.join(","));
1909 config_map.insert(
1910 "update_interval_minutes".to_string(),
1911 config.Indexing.UpdateIntervalMinutes.to_string(),
1912 );
1913 config_map.insert("index_directory".to_string(), config.Indexing.IndexDirectory.clone());
1914 },
1915 _ => {
1916 config_map.insert("_grpc_enabled".to_string(), "true".to_string());
1918 },
1919 }
1920
1921 Ok(Response::new(ConfigurationResponse {
1922 request_id,
1923 configuration:config_map,
1924 error:String::new(),
1925 }))
1926 }
1927
1928 async fn update_configuration(
1930 &self,
1931 request:Request<UpdateConfigurationRequest>,
1932 ) -> std::result::Result<Response<UpdateConfigurationResponse>, Status> {
1933 let RequestData = request.into_inner();
1934 let request_id = RequestData.request_id.clone();
1935
1936 dev_log!(
1937 "grpc",
1938 "[AirVinegRPCService] Update configuration request: section='{}'",
1939 RequestData.section
1940 );
1941
1942 if !["grpc", "authentication", "updates", "downloader", "indexing", ""].contains(&RequestData.section.as_str())
1944 {
1945 return Ok(Response::new(UpdateConfigurationResponse {
1946 request_id,
1947 success:false,
1948 error:"Invalid configuration section".to_string(),
1949 }));
1950 }
1951
1952 let result = self
1954 .AppState
1955 .UpdateConfiguration(RequestData.section, RequestData.updates)
1956 .await;
1957
1958 match result {
1959 Ok(_) => {
1960 Ok(Response::new(UpdateConfigurationResponse {
1961 request_id,
1962 success:true,
1963 error:String::new(),
1964 }))
1965 },
1966 Err(e) => {
1967 Ok(Response::new(UpdateConfigurationResponse {
1968 request_id,
1969 success:false,
1970 error:e.to_string(),
1971 }))
1972 },
1973 }
1974 }
1975}
1976
1977impl AirVinegRPCService {
1980 fn detect_mime_type(&self, path:&std::path::Path) -> String {
1982 match path.extension().and_then(|e| e.to_str()) {
1983 Some("rs") => "text/x-rust".to_string(),
1984 Some("ts") => "application/typescript".to_string(),
1985 Some("js") => "application/javascript".to_string(),
1986 Some("json") => "application/json".to_string(),
1987 Some("toml") => "application/toml".to_string(),
1988 Some("md") => "text/markdown".to_string(),
1989 Some("txt") => "text/plain".to_string(),
1990 Some("yaml") | Some("yml") => "application/x-yaml".to_string(),
1991 Some("html") => "text/html".to_string(),
1992 Some("css") => "text/css".to_string(),
1993 Some("xml") => "application/xml".to_string(),
1994 Some("png") => "image/png".to_string(),
1995 Some("jpg") | Some("jpeg") => "image/jpeg".to_string(),
1996 Some("gif") => "image/gif".to_string(),
1997 Some("svg") => "image/svg+xml".to_string(),
1998 Some("pdf") => "application/pdf".to_string(),
1999 Some("zip") => "application/zip".to_string(),
2000 Some("tar") | Some("gz") => "application/x-tar".to_string(),
2001 Some("proto") => "application/x-protobuf".to_string(),
2002 _ => "application/octet-stream".to_string(),
2003 }
2004 }
2005
2006 async fn download_file_with_retry(
2009 &self,
2010 request_id:&str,
2011 url:String,
2012 DestinationPath:String,
2013 checksum:String,
2014 progress_callback:Option<Box<dyn Fn(f32) + Send>>,
2015 ) -> Result<crate::Downloader::DownloadResult> {
2016 let config = &self.AppState.Configuration.Downloader;
2017 let mut retries = 0;
2018
2019 loop {
2020 match self
2021 .DownloadManager
2022 .DownloadFile(url.clone(), DestinationPath.clone(), checksum.clone())
2023 .await
2024 {
2025 Ok(file_info) => {
2026 if let Some(ref callback) = progress_callback {
2027 callback(100.0);
2028 }
2029 return Ok(file_info);
2030 },
2031 Err(e) => {
2032 if retries < config.MaxRetries as usize {
2033 retries += 1;
2034 let backoff_secs = 2u64.pow(retries as u32);
2035 dev_log!(
2036 "grpc",
2037 "warn: [AirVinegRPCService] Download failed [ID: {}], retrying (attempt {}/{}): {} - \
2038 Backing off {} seconds",
2039 request_id,
2040 retries,
2041 config.MaxRetries,
2042 e,
2043 backoff_secs
2044 );
2045
2046 if let Some(ref callback) = progress_callback {
2047 let progress = (retries as f32 / config.MaxRetries as f32) * 10.0;
2049 callback(progress);
2050 }
2051
2052 tokio::time::sleep(tokio::time::Duration::from_secs(backoff_secs)).await;
2053 } else {
2054 dev_log!(
2055 "grpc",
2056 "error: [AirVinegRPCService] Download failed after {} retries [ID: {}]: {}",
2057 config.MaxRetries,
2058 request_id,
2059 e
2060 );
2061 return Err(e);
2062 }
2063 },
2064 }
2065 }
2066 }
2067
2068 async fn validate_range_support(&self, url:&str) -> Result<bool> {
2070 let dns_port = Mist::dns_port();
2071 let client = crate::HTTP::Client::secured_client_builder(dns_port)
2072 .map_err(|e| crate::AirError::Network(format!("Failed to create HTTP client builder: {}", e)))?
2073 .timeout(std::time::Duration::from_secs(10))
2074 .build()
2075 .map_err(|e| crate::AirError::Network(format!("Failed to create HTTP client for validation: {}", e)))?;
2076
2077 let response:reqwest::Response = client
2078 .head(url)
2079 .send()
2080 .await
2081 .map_err(|e| crate::AirError::Network(format!("Failed to send HEAD request: {}", e)))?;
2082
2083 let accepts_ranges = response
2085 .headers()
2086 .get("accept-ranges")
2087 .map(|v:&reqwest::header::HeaderValue| v.to_str().unwrap_or("none"))
2088 .unwrap_or("none");
2089
2090 Ok(accepts_ranges == "bytes")
2091 }
2092
2093 async fn prepare_rollback_backup(&self, version:&str) -> Result<()> {
2095 let cache_dir = self.UpdateManager.GetCacheDirectory();
2096 let rollback_dir = cache_dir.join("rollback");
2097
2098 if let Err(e) = tokio::fs::create_dir_all(&rollback_dir).await {
2100 return Err(AirError::FileSystem(format!("Failed to create rollback directory: {}", e)));
2101 }
2102
2103 let backup_file = rollback_dir.join(format!("backup-{}.marker", version));
2105 let marker_content = format!(
2106 "version={}\ntimestamp={}\nrollback_available=true",
2107 version,
2108 chrono::Utc::now().to_rfc3339()
2109 );
2110
2111 if let Err(e) = tokio::fs::write(&backup_file, marker_content).await {
2112 return Err(AirError::FileSystem(format!("Failed to create backup marker: {}", e)));
2113 }
2114
2115 dev_log!(
2116 "grpc",
2117 "[AirVinegRPCService] Rollback backup prepared for version {} at {:?}",
2118 version,
2119 backup_file
2120 );
2121
2122 Ok(())
2123 }
2124
2125 async fn cleanup_rollback_backup(&self, version:&str) -> Result<()> {
2127 let cache_dir = self.UpdateManager.GetCacheDirectory();
2128 let rollback_dir = cache_dir.join("rollback");
2129 let backup_file = rollback_dir.join(format!("backup-{}.marker", version));
2130
2131 if backup_file.exists() {
2132 if let Err(e) = tokio::fs::remove_file(&backup_file).await {
2133 return Err(AirError::FileSystem(format!("Failed to cleanup rollback backup: {}", e)));
2134 }
2135 dev_log!(
2136 "grpc",
2137 "[AirVinegRPCService] Rollback backup cleaned up for version {}",
2138 version
2139 );
2140 }
2141
2142 Ok(())
2143 }
2144
2145 async fn perform_rollback(&self, version:&str) -> Result<()> {
2147 let cache_dir = self.UpdateManager.GetCacheDirectory();
2148 let rollback_dir = cache_dir.join("rollback");
2149 let backup_file = rollback_dir.join(format!("backup-{}.marker", version));
2150
2151 if !backup_file.exists() {
2152 return Err(AirError::FileSystem(format!(
2153 "Rollback backup not found for version {}",
2154 version
2155 )));
2156 }
2157
2158 dev_log!("grpc", "[AirVinegRPCService] Starting rollback for version {}", version);
2159 let marker_content = tokio::fs::read_to_string(&backup_file)
2161 .await
2162 .map_err(|e| format!("Failed to read backup marker: {}", e))?;
2163
2164 let mut timestamp = None;
2166 let mut rollback_available = false;
2167
2168 for line in marker_content.lines() {
2169 if let Some(value) = line.strip_prefix("timestamp=") {
2170 timestamp = Some(value.to_string());
2171 } else if line == "rollback_available=true" {
2172 rollback_available = true;
2173 }
2174 }
2175
2176 if !rollback_available {
2177 return Err(AirError::Validation("Rollback not available for this version".to_string()));
2178 }
2179
2180 dev_log!(
2187 "grpc",
2188 "[AirVinegRPCService] Rollback completed for version {} (backup timestamp: {:?})",
2189 version,
2190 timestamp
2191 );
2192
2193 if let Err(e) = tokio::fs::remove_file(&backup_file).await {
2195 dev_log!(
2196 "grpc",
2197 "warn: [AirVinegRPCService] Failed to cleanup backup marker after rollback: {}",
2198 e
2199 );
2200 }
2201
2202 Ok(())
2203 }
2204}
2205
/// Returns `true` when `url` starts with `http://` or `https://`, ignoring
/// ASCII case.
///
/// The comparison works on the leading bytes only, so no lower-cased copy of
/// the URL is allocated and a multi-byte character near the prefix boundary
/// cannot cause a slicing panic.
fn match_url_scheme(url:&str) -> bool {
	["http://", "https://"].iter().any(|scheme| {
		url.as_bytes()
			.get(..scheme.len())
			.map_or(false, |head| head.eq_ignore_ascii_case(scheme.as_bytes()))
	})
}
2210
2211fn calculate_chunk_checksum(chunk:&[u8]) -> String {
2213 use sha2::{Digest, Sha256};
2214 let mut hasher = Sha256::new();
2215 hasher.update(chunk);
2216 format!("{:x}", hasher.finalize())
2217}
2218
2219async fn calculate_file_checksum(path:&std::path::Path) -> Result<String> {
2221 use sha2::{Digest, Sha256};
2222 use tokio::io::AsyncReadExt;
2223
2224 let mut file = tokio::fs::File::open(path)
2225 .await
2226 .map_err(|e| AirError::FileSystem(format!("Failed to open file for checksum: {}", e)))?;
2227
2228 let mut hasher = Sha256::new();
2229 let mut buffer = vec![0u8; 8192];
2230
2231 loop {
2232 let bytes_read = file
2233 .read(&mut buffer)
2234 .await
2235 .map_err(|e| AirError::FileSystem(format!("Failed to read file for checksum: {}", e)))?;
2236
2237 if bytes_read == 0 {
2238 break;
2239 }
2240
2241 hasher.update(&buffer[..bytes_read]);
2242 }
2243
2244 let result = hasher.finalize();
2245 Ok(format!("{:x}", result))
2246}