1use std::{
104 collections::{HashMap, VecDeque},
105 path::{Path, PathBuf},
106 sync::Arc,
107 time::{Duration, Instant},
108};
109
110use serde::{Deserialize, Serialize};
111use tokio::sync::{RwLock, Semaphore};
112
113use crate::{
114 AirError,
115 ApplicationState::ApplicationState,
116 Configuration::ConfigurationManager,
117 Result,
118 Utility,
119 dev_log,
120};
121
/// Central object for file downloads: owns the HTTP client, the per-download
/// status table, the rate/concurrency limiters and the aggregate statistics.
pub struct DownloadManager {
	/// Shared application state; read for downloader configuration and used to
	/// report the "downloader" service status.
	AppState:Arc<ApplicationState>,

	/// Status records keyed by download ID.
	ActiveDownloads:Arc<RwLock<HashMap<String, DownloadStatus>>>,

	/// Queue of downloads waiting to be processed.
	DownloadQueue:Arc<RwLock<VecDeque<QueuedDownload>>>,

	/// Directory used as the destination when a request does not name one.
	CacheDirectory:PathBuf,

	/// HTTP client used for every download request.
	client:reqwest::Client,

	/// Computes SHA-256 digests of downloaded files.
	ChecksumVerifier:Arc<crate::Security::ChecksumVerifier>,

	/// NOTE(review): not read anywhere in the visible part of this file —
	/// confirm whether it is still needed.
	BandwidthLimiter:Arc<Semaphore>,

	/// Byte-rate limiter consulted for every received chunk.
	TokenBucket:Arc<RwLock<TokenBucket>>,

	/// Permit pool; one permit is held for the duration of each transfer.
	ConcurrentLimiter:Arc<Semaphore>,

	/// Aggregate counters across all downloads.
	statistics:Arc<RwLock<DownloadStatistics>>,
}
154
/// Snapshot of a single download as stored in the manager's status table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DownloadStatus {
	/// Identifier under which the download is registered.
	pub DownloadId:String,
	/// Sanitized source URL.
	pub url:String,
	/// Final path of the downloaded file.
	pub destination:PathBuf,
	/// Initialised to 0 at registration.
	/// NOTE(review): not updated in the visible code — confirm.
	pub TotalSize:u64,
	/// Initialised to 0 at registration.
	/// NOTE(review): not updated in the visible code — confirm.
	pub downloaded:u64,
	/// Completion percentage in the range 0.0–100.0.
	pub progress:f32,
	/// Current lifecycle state.
	pub status:DownloadState,
	/// Error text recorded with the most recent status update, if any.
	pub error:Option<String>,
	/// Set to the current UTC time when the download is registered.
	pub StartedAt:Option<chrono::DateTime<chrono::Utc>>,
	/// Set when the status becomes `Completed` or `Failed`.
	pub CompletedAt:Option<chrono::DateTime<chrono::Utc>>,
	/// Initialised to 0 at registration.
	pub ChunksCompleted:usize,
	/// Initialised to 1 at registration.
	pub TotalChunks:usize,
	/// Most recently computed transfer rate in bytes per second.
	pub DownloadRateBytesPerSec:u64,
	/// Checksum supplied by the caller, when one was given.
	pub ExpectedChecksum:Option<String>,
	/// Checksum computed from the downloaded file once the transfer finishes.
	pub ActualChecksum:Option<String>,
}
174
/// Lifecycle state of a download.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum DownloadState {
	/// Registered but the transfer has not started yet.
	Pending,
	/// NOTE(review): never assigned in the visible part of the file —
	/// presumably used by the queueing path; confirm.
	Queued,
	/// Data is being transferred.
	Downloading,
	/// Transfer finished; the checksum is being compared.
	Verifying,
	/// Finished successfully.
	Completed,
	/// Finished with an error.
	Failed,
	/// Cancelled on request; the transfer loop aborts when it observes this.
	Cancelled,
	/// Paused on request; the transfer loop polls until the state changes.
	Paused,
	/// Transitional state set briefly while a paused download is resumed.
	Resuming,
}
188
/// Relative priority of a download.
///
/// The derived ordering follows the explicit discriminants, so
/// `Background < Low < Normal < High`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum DownloadPriority {
	/// Highest priority.
	High = 3,
	/// Default priority (see `DownloadConfig::default`).
	Normal = 2,
	/// Below-normal priority.
	Low = 1,
	/// Lowest priority.
	Background = 0,
}
197
/// A download request held in the manager's queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueuedDownload {
	/// Identifier assigned to the request.
	DownloadId:String,
	/// Source URL.
	url:String,
	/// Destination path.
	destination:PathBuf,
	/// Expected checksum.
	/// NOTE(review): presumably empty when no verification is wanted, as in
	/// `DownloadConfig` — confirm.
	checksum:String,
	/// Priority of the request.
	priority:DownloadPriority,
	/// Time the request was added (UTC).
	AddedAt:chrono::DateTime<chrono::Utc>,
	/// Optional upper bound on the file size in bytes.
	MaxFileSize:Option<u64>,
	/// Whether free disk space should be checked before downloading.
	ValidateDiskSpace:bool,
}
210
/// Outcome of a finished download.
#[derive(Debug, Clone)]
pub struct DownloadResult {
	/// Final path of the downloaded file (lossy UTF-8 rendering).
	pub path:String,
	/// Number of bytes in the file, including any resumed portion.
	pub size:u64,
	/// SHA-256 checksum computed from the downloaded file.
	pub checksum:String,
	/// Wall-clock time of the download; filled in by `DownloadFileWithConfig`.
	pub duration:Duration,
	/// Average transfer rate.
	/// NOTE(review): `PerformDownload` always writes 0 here — confirm whether
	/// a caller is expected to fill it in.
	pub AverageRate:u64,
}
220
/// Aggregate counters maintained by the download manager.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DownloadStatistics {
	/// Downloads that finished, successfully or not.
	pub TotalDownloads:u64,
	/// Downloads that finished successfully.
	pub SuccessfulDownloads:u64,
	/// Downloads that finished with an error.
	pub FailedDownloads:u64,
	/// Downloads cancelled through `CancelDownload`.
	pub CancelledDownloads:u64,
	/// Sum of the sizes of all successful downloads, in bytes.
	pub TotalBytesDownloaded:u64,
	/// Sum of the durations of all successful downloads, in seconds.
	pub TotalDownloadTimeSecs:f64,
	/// `TotalBytesDownloaded / TotalDownloadTimeSecs`, in bytes per second.
	pub AverageDownloadRate:f64,
	/// Highest per-download average rate seen so far, in bytes per second.
	pub PeakDownloadRate:u64,
	/// Downloads registered and not yet finished or cancelled.
	pub ActiveDownloads:usize,
	/// NOTE(review): not updated in the visible part of the file — confirm.
	pub QueuedDownloads:usize,
}
235
/// Shareable, thread-safe callback that receives a `DownloadStatus` snapshot.
pub type ProgressCallback = Arc<dyn Fn(DownloadStatus) + Send + Sync>;
238
/// Token bucket used to throttle download bandwidth; one token is one byte.
#[derive(Debug)]
struct TokenBucket {
	/// Tokens currently available.
	tokens:f64,

	/// Maximum number of tokens the bucket can hold.
	capacity:f64,

	/// Tokens added per second.
	refill_rate:f64,

	/// Instant at which tokens were last added.
	last_refill:Instant,
}
260
261impl TokenBucket {
262 fn new(bytes_per_sec:u64, capacity_factor:f64) -> Self {
264 let refill_rate = bytes_per_sec as f64;
265 let capacity = refill_rate * capacity_factor; Self { tokens:capacity, capacity, refill_rate, last_refill:Instant::now() }
268 }
269
270 fn refill(&mut self) {
272 let elapsed = self.last_refill.elapsed().as_secs_f64();
273 if elapsed > 0.0 {
274 let new_tokens = elapsed * self.refill_rate;
275 self.tokens = (self.tokens + new_tokens).min(self.capacity);
276 self.last_refill = Instant::now();
277 }
278 }
279
280 #[allow(dead_code)]
283 fn try_consume(&mut self, bytes:u64) -> u64 {
284 self.refill();
285
286 let bytes = bytes as f64;
287 if self.tokens >= bytes {
288 self.tokens -= bytes;
289 return bytes as u64;
290 }
291
292 let available = self.tokens;
294 self.tokens = 0.0;
295 available as u64
296 }
297
298 async fn consume(&mut self, bytes:u64) -> Result<()> {
300 let bytes_needed = bytes as f64;
301
302 loop {
303 self.refill();
304
305 if self.tokens >= bytes_needed {
306 self.tokens -= bytes_needed;
307 return Ok(());
308 }
309
310 let tokens_needed = bytes_needed - self.tokens;
312 let wait_duration = tokens_needed / self.refill_rate;
313
314 let sleep_duration = Duration::from_secs_f64(wait_duration.min(0.1));
316 tokio::time::sleep(sleep_duration).await;
317 }
318 }
319
320 fn set_rate(&mut self, bytes_per_sec:u64) {
322 self.refill_rate = bytes_per_sec as f64;
323 self.capacity = self.refill_rate * 5.0; }
325}
326
/// Options for a single download request.
#[derive(Debug, Clone)]
pub struct DownloadConfig {
	/// Source URL; must be http or https.
	pub url:String,
	/// Destination path; when empty, a file name derived from the URL is
	/// placed in the cache directory.
	pub destination:String,
	/// Expected SHA-256 checksum; an empty string disables verification.
	pub checksum:String,
	/// Optional upper bound on the file size in bytes.
	pub MaxFileSize:Option<u64>,
	/// Chunk size in bytes.
	/// NOTE(review): not read in the visible part of the file — confirm.
	pub ChunkSize:usize,
	/// Maximum number of retries after the first attempt.
	pub MaxRetries:u32,
	/// Per-request timeout in seconds.
	pub TimeoutSecs:u64,
	/// Priority of the request.
	pub priority:DownloadPriority,
	/// Whether free disk space is checked before downloading.
	pub ValidateDiskSpace:bool,
}
340
341impl Default for DownloadConfig {
342 fn default() -> Self {
343 Self {
344 url:String::new(),
345 destination:String::new(),
346 checksum:String::new(),
347 MaxFileSize:None,
348 ChunkSize:8 * 1024 * 1024, MaxRetries:5,
350 TimeoutSecs:300,
351 priority:DownloadPriority::Normal,
352 ValidateDiskSpace:true,
353 }
354 }
355}
356
357impl DownloadManager {
358 pub async fn new(AppState:Arc<ApplicationState>) -> Result<Self> {
360 let config = &AppState.Configuration.Downloader;
361
362 let CacheDirectory = ConfigurationManager::ExpandPath(&config.CacheDirectory)?;
364
365 let CacheDirectoryClone = CacheDirectory.clone();
367
368 let CacheDirectoryCloneForInit = CacheDirectoryClone.clone();
370
371 tokio::fs::create_dir_all(&CacheDirectory)
373 .await
374 .map_err(|e| AirError::Configuration(format!("Failed to create cache directory: {}", e)))?;
375
376 let dns_port = Mist::dns_port();
378 let client = crate::HTTP::Client::secured_client_builder(dns_port)
379 .map_err(|e| AirError::Network(format!("Failed to create HTTP client: {}", e)))?
380 .timeout(Duration::from_secs(config.DownloadTimeoutSecs))
381 .connect_timeout(Duration::from_secs(30))
382 .pool_idle_timeout(Duration::from_secs(90))
383 .pool_max_idle_per_host(10)
384 .tcp_keepalive(Duration::from_secs(60))
385 .user_agent("Land-AirDownloader/0.1.0")
386 .build()
387 .map_err(|e| AirError::Network(format!("Failed to build HTTP client: {}", e)))?;
388
389 let BandwidthLimiter = Arc::new(Semaphore::new(100));
391
392 let TokenBucket = Arc::new(RwLock::new(TokenBucket::new(100 * 1024 * 1024, 5.0)));
394
395 let ConcurrentLimiter = Arc::new(Semaphore::new(5));
397
398 let manager = Self {
399 AppState,
400 ActiveDownloads:Arc::new(RwLock::new(HashMap::new())),
401 DownloadQueue:Arc::new(RwLock::new(VecDeque::new())),
402 CacheDirectory:CacheDirectoryCloneForInit,
403 client,
404 ChecksumVerifier:Arc::new(crate::Security::ChecksumVerifier::New()),
405 BandwidthLimiter,
406 TokenBucket,
407 ConcurrentLimiter,
408 statistics:Arc::new(RwLock::new(DownloadStatistics::default())),
409 };
410
411 manager
413 .AppState
414 .UpdateServiceStatus("downloader", crate::ApplicationState::ServiceStatus::Running)
415 .await
416 .map_err(|e| AirError::Internal(e.to_string()))?;
417
418 dev_log!(
419 "update",
420 "[DownloadManager] Initialized with cache directory: {}",
421 CacheDirectory.display()
422 );
423
424 Ok(manager)
425 }
426
427 pub async fn DownloadFile(&self, url:String, DestinationPath:String, checksum:String) -> Result<DownloadResult> {
429 self.DownloadFileWithConfig(DownloadConfig { url, destination:DestinationPath, checksum, ..Default::default() })
430 .await
431 }
432
433 pub async fn DownloadFileWithConfig(&self, config:DownloadConfig) -> Result<DownloadResult> {
435 let SanitizedUrl = Self::ValidateAndSanitizeUrl(&config.url)?;
437
438 let DownloadId = Utility::GenerateRequestId();
440
441 dev_log!(
442 "update",
443 "[DownloadManager] Starting download [ID: {}] - URL: {}",
444 DownloadId,
445 SanitizedUrl
446 );
447
448 if SanitizedUrl.is_empty() {
450 return Err(AirError::Network("URL cannot be empty".to_string()));
451 }
452
453 let Destination = if config.destination.is_empty() {
455 let Filename = SanitizedUrl
457 .split('/')
458 .last()
459 .and_then(|s| s.split('?').next())
460 .unwrap_or("download.bin");
461 self.CacheDirectory.join(Filename)
462 } else {
463 ConfigurationManager::ExpandPath(&config.destination)?
464 };
465
466 Utility::ValidateFilePath(
468 Destination
469 .to_str()
470 .ok_or_else(|| AirError::Configuration("Invalid destination path".to_string()))?,
471 )?;
472
473 let ExpectedChecksum = if config.checksum.is_empty() { None } else { Some(config.checksum.clone()) };
475
476 self.RegisterDownload(&DownloadId, &SanitizedUrl, &Destination, ExpectedChecksum.clone())
478 .await?;
479
480 if config.ValidateDiskSpace {
482 if let Some(MaxSize) = config.MaxFileSize {
483 self.ValidateDiskSpace(&SanitizedUrl, &Destination, MaxSize * 2).await?;
484 } else {
485 self.ValidateDiskSpace(&SanitizedUrl, &Destination, 1024 * 1024 * 1024).await?; }
487 }
488
489 if let Some(Parent) = Destination.parent() {
491 tokio::fs::create_dir_all(Parent)
492 .await
493 .map_err(|e| AirError::FileSystem(format!("Failed to create destination directory: {}", e)))?;
494 }
495
496 let StartTime = Instant::now();
497
498 let Result = self.DownloadWithRetry(&DownloadId, &SanitizedUrl, &Destination, &config).await;
500
501 let Duration = StartTime.elapsed();
502
503 match Result {
504 Ok(mut FileInfo) => {
505 FileInfo.duration = Duration;
506
507 self.UpdateStatistics(true, FileInfo.size, Duration).await;
509
510 self.UpdateDownloadStatus(&DownloadId, DownloadState::Completed, Some(100.0), None)
511 .await?;
512
513 dev_log!(
514 "update",
515 "[DownloadManager] Download completed [ID: {}] - Size: {} bytes in {:.2}s ({:.2} MB/s)",
516 DownloadId,
517 FileInfo.size,
518 Duration.as_secs_f64(),
519 FileInfo.size as f64 / 1_048_576.0 / Duration.as_secs_f64()
520 );
521
522 Ok(FileInfo)
523 },
524 Err(E) => {
525 self.UpdateStatistics(false, 0, Duration).await;
527
528 self.UpdateDownloadStatus(&DownloadId, DownloadState::Failed, None, Some(E.to_string()))
529 .await?;
530
531 if Destination.exists() {
533 let _ = tokio::fs::remove_file(&Destination).await;
534 dev_log!(
535 "update",
536 "warn: [DownloadManager] Cleaned up failed download: {}",
537 Destination.display()
538 );
539 }
540
541 dev_log!(
542 "update",
543 "error: [DownloadManager] Download failed [ID: {}] - Error: {}",
544 DownloadId,
545 E
546 );
547 Err(E)
548 },
549 }
550 }
551
552 fn ValidateAndSanitizeUrl(url:&str) -> Result<String> {
554 let url = url.trim();
555
556 if url.is_empty() {
558 return Err(AirError::Network("URL cannot be empty".to_string()));
559 }
560
561 let parsed = url::Url::parse(url).map_err(|e| AirError::Network(format!("Invalid URL format: {}", e)))?;
563
564 match parsed.scheme() {
566 "http" | "https" => (),
567 scheme => {
568 return Err(AirError::Network(format!(
569 "Unsupported URL scheme: '{}'. Only http and https are allowed.",
570 scheme
571 )));
572 },
573 }
574
575 if parsed.host().is_none() {
577 return Err(AirError::Network("URL must have a valid host".to_string()));
578 }
579
580 #[cfg(debug_assertions)]
582 {
583 }
585 #[cfg(not(debug_assertions))]
586 {
587 if let Some(host) = parsed.host_str() {
588 if host == "localhost" || host == "127.0.0.1" || host == "::1" {
589 return Err(AirError::Network("Localhost addresses are not allowed".to_string()));
590 }
591 if host.starts_with("192.168.") || host.starts_with("10.") || host.starts_with("172.16.") {
592 return Err(AirError::Network("Private network addresses are not allowed".to_string()));
593 }
594 }
595 }
596
597 let mut sanitized = parsed.clone();
599
600 if sanitized.password().is_some() {
602 sanitized.set_password(Some("")).ok();
603 }
604
605 Ok(sanitized.to_string())
606 }
607
608 async fn ValidateDiskSpace(&self, url:&str, destination:&Path, RequiredBytes:u64) -> Result<()> {
610 let DestPath = if destination.is_absolute() {
612 destination.to_path_buf()
613 } else {
614 std::env::current_dir()
615 .map_err(|e| AirError::FileSystem(format!("Failed to get current directory: {}", e)))?
616 .join(destination)
617 };
618
619 let MountPoint = self.FindMountPoint(&DestPath)?;
621
622 dev_log!(
624 "update",
625 "[DownloadManager] Validating disk space for URL {} (requires {} bytes) on mount point: {}",
626 url,
627 RequiredBytes,
628 MountPoint.display()
629 );
630
631 #[cfg(unix)]
632 {
633 match self.GetDiskStatvfs(&MountPoint) {
634 Ok((AvailableBytes, TotalBytes)) => {
635 if AvailableBytes < RequiredBytes {
636 dev_log!(
637 "update",
638 "warn: [DownloadManager] Insufficient disk space: {} bytes available, {} bytes required",
639 AvailableBytes,
640 RequiredBytes
641 );
642 return Err(AirError::FileSystem(format!(
643 "Insufficient disk space: {} bytes available, {} bytes required",
644 AvailableBytes, RequiredBytes
645 )));
646 }
647
648 dev_log!(
649 "update",
650 "[DownloadManager] Sufficient disk space: {} bytes available, {} bytes required (total: {})",
651 AvailableBytes,
652 RequiredBytes,
653 TotalBytes
654 );
655 },
656 Err(e) => {
657 dev_log!(
658 "update",
659 "warn: [DownloadManager] Failed to check disk space: {}, proceeding anyway",
660 e
661 );
662 },
663 }
664 }
665
666 #[cfg(windows)]
667 {
668 match self.GetDiskSpaceWindows(&MountPoint) {
669 Ok(AvailableBytes) => {
670 if AvailableBytes < RequiredBytes {
671 dev_log!(
672 "update",
673 "warn: [DownloadManager] Insufficient disk space: {} bytes available, {} bytes required",
674 AvailableBytes,
675 RequiredBytes
676 );
677 return Err(AirError::FileSystem(format!(
678 "Insufficient disk space: {} bytes available, {} bytes required",
679 available_bytes, RequiredBytes
680 )));
681 }
682 dev_log!(
683 "update",
684 "[DownloadManager] Sufficient disk space: {} bytes available, {} bytes required",
685 available_bytes,
686 RequiredBytes
687 );
688 },
689 Err(e) => {
690 dev_log!(
691 "update",
692 "warn: [DownloadManager] Failed to check disk space: {}, proceeding anyway",
693 e
694 );
695 },
696 }
697 }
698
699 #[cfg(not(any(unix, windows)))]
700 {
701 dev_log!(
702 "update",
703 "warn: [DownloadManager] Disk space validation not available on this platform"
704 );
705 }
706
707 Ok(())
708 }
709
710 #[cfg(unix)]
712 fn GetDiskStatvfs(&self, path:&Path) -> Result<(u64, u64)> {
713 use std::{ffi::CString, os::unix::ffi::OsStrExt};
714
715 dev_log!("update", "[DownloadManager] Checking disk space at: {}", path.display());
716 let path_cstr = CString::new(path.as_os_str().as_bytes())
718 .map_err(|e| AirError::FileSystem(format!("Failed to convert path to C string: {}", e)))?;
719
720 let mut stat:libc::statvfs = unsafe { std::mem::zeroed() };
722 let result = unsafe { libc::statvfs(path_cstr.as_ptr(), &mut stat) };
723
724 if result != 0 {
725 let err = std::io::Error::last_os_error();
726 return Err(AirError::FileSystem(format!("Failed to get disk stats: {}", err)));
727 }
728
729 let fragment_size = stat.f_frsize as u64;
731 let available_bytes = fragment_size * stat.f_bavail as u64;
732 let total_bytes = fragment_size * stat.f_blocks as u64;
733
734 dev_log!(
735 "update",
736 "[DownloadManager] Disk space at {}: {} bytes available, {} bytes total",
737 path.display(),
738 available_bytes,
739 total_bytes
740 );
741
742 Ok((available_bytes, total_bytes))
743 }
744
/// Queries the free space of the volume containing `path` with
/// `GetDiskFreeSpaceExW` and returns the number of bytes available to the
/// caller.
///
/// # Errors
///
/// Returns `AirError::FileSystem` carrying the last OS error when the call
/// reports failure.
#[cfg(windows)]
fn GetDiskSpaceWindows(&self, path:&Path) -> Result<u64> {
	use std::os::windows::ffi::OsStrExt;

	use windows::Win32::Storage::FileSystem::GetDiskFreeSpaceExW;

	dev_log!("update", "[DownloadManager] Checking disk space at: {}", path.display());
	// The API expects a NUL-terminated UTF-16 string.
	let path_str:Vec<u16> = path.as_os_str().encode_wide().chain(std::iter::once(0)).collect();

	let mut free_bytes_available:u64 = 0;
	let mut total_bytes:u64 = 0;
	let mut total_free_bytes:u64 = 0;

	// NOTE(review): the `as _` casts rely on the parameter types of the
	// `windows` crate version in use — re-check when upgrading that crate.
	let result = unsafe {
		GetDiskFreeSpaceExW(
			windows::core::PCWSTR(path_str.as_ptr()),
			&mut free_bytes_available as *mut _ as _,
			&mut total_bytes as *mut _ as _,
			&mut total_free_bytes as *mut _ as _,
		)
	};

	if !result.as_bool() {
		let err = std::io::Error::last_os_error();
		return Err(AirError::FileSystem(format!("Failed to get disk space: {}", err)));
	}

	dev_log!(
		"update",
		"[DownloadManager] Disk space at {}: {} bytes available, {} bytes total",
		path.display(),
		free_bytes_available,
		total_bytes
	);

	// `total_free_bytes` is filled in by the call but not used.
	Ok(free_bytes_available)
}
784
785 fn FindMountPoint(&self, path:&Path) -> Result<PathBuf> {
787 #[cfg(unix)]
788 {
789 let mut current = path
790 .canonicalize()
791 .map_err(|e| AirError::FileSystem(format!("Failed to canonicalize path: {}", e)))?;
792
793 loop {
794 if current.as_os_str().is_empty() || current == Path::new("/") {
795 return Ok(PathBuf::from("/"));
796 }
797
798 let metadata = std::fs::metadata(¤t)
799 .map_err(|e| AirError::FileSystem(format!("Failed to get metadata: {}", e)))?;
800
801 #[cfg(unix)]
803 let CurrentDevice = {
804 use std::os::unix::fs::MetadataExt;
805 metadata.dev()
806 };
807 #[cfg(not(unix))]
808 let CurrentDevice = 0u64; let parent = current.parent();
811
812 if let Some(parent_path) = parent {
813 let ParentMetadata = std::fs::metadata(parent_path)
814 .map_err(|e| AirError::FileSystem(format!("Failed to get parent metadata: {}", e)))?;
815
816 #[cfg(unix)]
817 let ParentDevice = {
818 use std::os::unix::fs::MetadataExt;
819 ParentMetadata.dev()
820 };
821 #[cfg(not(unix))]
822 let ParentDevice = 0u64; if ParentDevice != CurrentDevice {
825 return Ok(current);
826 }
827 } else {
828 return Ok(current);
829 }
830
831 current.pop();
832 }
833 }
834
835 #[cfg(windows)]
836 {
837 let PathStr = path.to_string_lossy();
839 if PathStr.len() >= 3 && PathStr.chars().nth(1) == Some(':') {
840 return Ok(PathBuf::from(&PathStr[..3]));
841 }
842 Ok(PathBuf::from("C:\\"))
843 }
844
845 #[cfg(not(any(unix, windows)))]
846 {
847 Ok(path.to_path_buf())
848 }
849 }
850
/// Runs `PerformDownload` until it succeeds, the retry limit is reached, or
/// the download is cancelled, verifying the checksum after each successful
/// transfer when one is expected.
///
/// Both transfer errors and checksum mismatches count as failures and are
/// retried after a delay computed by the retry manager. A circuit breaker
/// local to this call is consulted before every attempt.
///
/// # Errors
///
/// Returns the last transfer error, a checksum-verification error, a
/// "Download cancelled" error, or a circuit-breaker error.
async fn DownloadWithRetry(
	&self,
	DownloadId:&str,
	url:&str,
	destination:&PathBuf,
	config:&DownloadConfig,
) -> Result<DownloadResult> {
	// Backoff starts at 1 s and doubles up to 32 s, with 10% jitter.
	let RetryPolicy = crate::Resilience::RetryPolicy {
		MaxRetries:config.MaxRetries,
		InitialIntervalMs:1000,
		MaxIntervalMs:32000,
		BackoffMultiplier:2.0,
		JitterFactor:0.1,
		BudgetPerMinute:100,
		ErrorClassification:std::collections::HashMap::new(),
	};

	let RetryManager = crate::Resilience::RetryManager::new(RetryPolicy.clone());
	// A fresh breaker is created per call, so its state is not shared
	// between downloads.
	let CircuitBreaker = crate::Resilience::CircuitBreaker::new(
		"downloader".to_string(),
		crate::Resilience::CircuitBreakerConfig::default(),
	);

	// Number of retries performed so far (0 during the first attempt).
	let mut attempt = 0;

	loop {
		// Refuse to continue while the breaker is open and cannot recover.
		if CircuitBreaker.GetState().await == crate::Resilience::CircuitState::Open {
			if !CircuitBreaker.AttemptRecovery().await {
				return Err(AirError::Network(
					"Circuit breaker is open, too many recent failures".to_string(),
				));
			}
		}

		// Stop retrying once the download has been cancelled.
		if let Some(status) = self.GetDownloadStatus(DownloadId).await {
			if status.status == DownloadState::Cancelled {
				return Err(AirError::Network("Download cancelled".to_string()));
			}
		}

		match self.PerformDownload(DownloadId, url, destination, config).await {
			Ok(file_info) => {
				if let Some(ref ExpectedChecksum) = ExpectedChecksumFromConfig(config) {
					self.UpdateDownloadStatus(DownloadId, DownloadState::Verifying, Some(100.0), None)
						.await?;

					if let Err(e) = self.VerifyChecksum(destination, ExpectedChecksum).await {
						dev_log!(
							"update",
							"warn: [DownloadManager] Checksum verification failed [ID: {}]: {}",
							DownloadId,
							e
						);
						// A corrupt file counts as a failure of the source.
						CircuitBreaker.RecordFailure().await;

						if attempt < config.MaxRetries && RetryManager.CanRetry("downloader").await {
							attempt += 1;
							let delay = RetryManager.CalculateRetryDelay(attempt);
							// The log is 1-based: `attempt + 1` is the number
							// of the upcoming attempt.
							dev_log!(
								"update",
								"[DownloadManager] Retrying download [ID: {}] (attempt {}/{}) after {:?}",
								DownloadId,
								attempt + 1,
								config.MaxRetries + 1,
								delay
							);
							tokio::time::sleep(delay).await;
							continue;
						} else {
							return Err(AirError::Network(format!(
								"Checksum verification failed after {} retries: {}",
								attempt, e
							)));
						}
					}
				}

				CircuitBreaker.RecordSuccess().await;
				return Ok(file_info);
			},
			Err(e) => {
				CircuitBreaker.RecordFailure().await;

				if attempt < config.MaxRetries && RetryManager.CanRetry("downloader").await {
					attempt += 1;
					dev_log!(
						"update",
						"warn: [DownloadManager] Download failed [ID: {}], retrying (attempt {}/{}): {}",
						DownloadId,
						attempt + 1,
						config.MaxRetries + 1,
						e
					);

					let delay = RetryManager.CalculateRetryDelay(attempt);
					tokio::time::sleep(delay).await;
				} else {
					return Err(e);
				}
			},
		}
	}
}
958
959 async fn PerformDownload(
961 &self,
962 DownloadId:&str,
963 url:&str,
964 destination:&PathBuf,
965 config:&DownloadConfig,
966 ) -> Result<DownloadResult> {
967 let _concurrent_permit = self
969 .ConcurrentLimiter
970 .acquire()
971 .await
972 .map_err(|e| AirError::Internal(format!("Failed to acquire download permit: {}", e)))?;
973
974 self.UpdateDownloadStatus(DownloadId, DownloadState::Downloading, Some(0.0), None)
975 .await?;
976
977 let TempDestination = destination.with_extension("tmp");
979
980 let mut ExistingSize:u64 = 0;
982 if TempDestination.exists() {
983 if let Ok(metadata) = tokio::fs::metadata(&TempDestination).await {
984 ExistingSize = metadata.len();
985 dev_log!("update", "[DownloadManager] Resuming download from {} bytes", ExistingSize);
986 }
987 }
988
989 let mut req = self.client.get(url).timeout(Duration::from_secs(config.TimeoutSecs));
991 if ExistingSize > 0 {
992 let RangeHeader = format!("bytes={}-", ExistingSize);
993 req = req.header(reqwest::header::RANGE, RangeHeader);
994 req = req.header(reqwest::header::IF_MATCH, "*"); }
996
997 let response = req
998 .send()
999 .await
1000 .map_err(|e| AirError::Network(format!("Failed to start download: {}", e)))?;
1001
1002 let FinalUrl = response.url().clone();
1004 let response = if FinalUrl.as_str() != url {
1005 dev_log!("update", "[DownloadManager] Redirected to: {}", FinalUrl);
1006 response
1007 } else {
1008 response
1009 };
1010
1011 let StatusCode = response.status();
1013 if !StatusCode.is_success() && StatusCode != reqwest::StatusCode::PARTIAL_CONTENT {
1014 return Err(AirError::Network(format!("Download failed with status: {}", StatusCode)));
1015 }
1016
1017 let TotalSize = if let Some(cl) = response.content_length() {
1019 if StatusCode == reqwest::StatusCode::PARTIAL_CONTENT {
1020 cl + ExistingSize
1021 } else {
1022 cl
1023 }
1024 } else {
1025 0
1026 };
1027
1028 if let Some(max_size) = config.MaxFileSize {
1030 if TotalSize > 0 && TotalSize > max_size {
1031 return Err(AirError::Network(format!(
1032 "File too large: {} bytes exceeds maximum allowed size: {} bytes",
1033 TotalSize, max_size
1034 )));
1035 }
1036 }
1037
1038 let mut file = tokio::fs::OpenOptions::new()
1040 .create(true)
1041 .append(true)
1042 .open(&TempDestination)
1043 .await
1044 .map_err(|e| AirError::FileSystem(format!("Failed to open destination file: {}", e)))?;
1045
1046 use tokio::io::AsyncWriteExt;
1047 use futures_util::StreamExt;
1048
1049 let mut downloaded = ExistingSize;
1050 let mut LastProgressUpdate = Instant::now();
1051 let BytesStream = response.bytes_stream();
1052
1053 tokio::pin!(BytesStream);
1054
1055 while let Some(result) = BytesStream.next().await {
1056 if let Some(status) = self.GetDownloadStatus(DownloadId).await {
1058 match status.status {
1059 DownloadState::Cancelled => {
1060 let _ = tokio::fs::remove_file(&TempDestination).await;
1062 return Err(AirError::Network("Download cancelled".to_string()));
1063 },
1064 DownloadState::Paused => {
1065 loop {
1067 tokio::time::sleep(Duration::from_millis(250)).await;
1068 if let Some(s) = self.GetDownloadStatus(DownloadId).await {
1069 match s.status {
1070 DownloadState::Paused => continue,
1071 DownloadState::Cancelled => {
1072 let _ = tokio::fs::remove_file(&TempDestination).await;
1073 return Err(AirError::Network("Download cancelled".to_string()));
1074 },
1075 _ => {
1076 dev_log!(
1077 "update",
1078 "[DownloadManager] Resuming paused download [ID: {}]",
1079 DownloadId
1080 );
1081 break;
1082 },
1083 }
1084 } else {
1085 break;
1086 }
1087 }
1088 },
1089 _ => {},
1090 }
1091 }
1092
1093 match result {
1094 Ok(chunk) => {
1095 let ChunkSize = chunk.len();
1097 {
1098 let mut bucket = self.TokenBucket.write().await;
1099 if let Err(e) = bucket.consume(ChunkSize as u64).await {
1100 dev_log!(
1101 "update",
1102 "warn: [DownloadManager] Bandwidth throttling error: {}, continuing anyway",
1103 e
1104 );
1105 }
1106 }
1107
1108 file.write_all(&chunk)
1109 .await
1110 .map_err(|e| AirError::FileSystem(format!("Failed to write file: {}", e)))?;
1111
1112 downloaded += ChunkSize as u64;
1113
1114 if LastProgressUpdate.elapsed() > Duration::from_millis(500) {
1116 LastProgressUpdate = Instant::now();
1117
1118 if TotalSize > 0 {
1119 let progress = (downloaded as f32 / TotalSize as f32) * 100.0;
1120 self.UpdateDownloadStatus(DownloadId, DownloadState::Downloading, Some(progress), None)
1121 .await?;
1122 }
1123
1124 let rate = self.CalculateDownloadRate(DownloadId, downloaded).await;
1126 self.UpdateDownloadRate(DownloadId, rate).await;
1127 }
1128 },
1129 Err(e) => {
1130 if e.is_timeout() || e.is_connect() {
1132 dev_log!("update", "warn: [DownloadManager] Connection/timeout error, may retry: {}", e);
1133 return Err(AirError::Network(format!("Network error: {}", e)));
1134 }
1135 return Err(AirError::Network(format!("Failed to read response: {}", e)));
1136 },
1137 }
1138 }
1139
1140 self.UpdateDownloadStatus(DownloadId, DownloadState::Downloading, Some(100.0), None)
1142 .await?;
1143
1144 file.flush()
1146 .await
1147 .map_err(|e| AirError::FileSystem(format!("Failed to flush file: {}", e)))?;
1148
1149 tokio::fs::rename(&TempDestination, destination)
1151 .await
1152 .map_err(|e| AirError::FileSystem(format!("Failed to commit download: {}", e)))?;
1153
1154 let checksum = self.CalculateChecksum(destination).await?;
1156
1157 self.UpdateActualChecksum(DownloadId, &checksum).await;
1159
1160 Ok(DownloadResult {
1161 path:destination.to_string_lossy().to_string(),
1162 size:downloaded,
1163 checksum,
1164 duration:Duration::from_secs(0),
1165 AverageRate:0,
1166 })
1167 }
1168
1169 pub async fn VerifyChecksum(&self, FilePath:&PathBuf, ExpectedChecksum:&str) -> Result<()> {
1171 if !FilePath.exists() {
1173 return Err(AirError::FileSystem(format!(
1174 "File not found for checksum verification: {}",
1175 FilePath.display()
1176 )));
1177 }
1178
1179 let ActualChecksum = self.ChecksumVerifier.CalculateSha256(FilePath).await?;
1180
1181 let NormalizedExpected = ExpectedChecksum.trim().to_lowercase().replace("sha256:", "");
1183 let NormalizedActual = ActualChecksum.trim().to_lowercase();
1184
1185 if NormalizedActual != NormalizedExpected {
1186 dev_log!(
1187 "update",
1188 "error: [DownloadManager] Checksum mismatch for {}: expected {}, got {}",
1189 FilePath.display(),
1190 NormalizedExpected,
1191 NormalizedActual
1192 );
1193 return Err(AirError::Network(format!(
1194 "Checksum verification failed: expected {}, got {}",
1195 NormalizedExpected, NormalizedActual
1196 )));
1197 }
1198
1199 dev_log!("update", "[DownloadManager] Checksum verified for file: {}", FilePath.display());
1200 Ok(())
1201 }
1202
1203 pub async fn CalculateChecksum(&self, FilePath:&PathBuf) -> Result<String> {
1205 if !FilePath.exists() {
1207 return Err(AirError::FileSystem(format!(
1208 "File not found for checksum calculation: {}",
1209 FilePath.display()
1210 )));
1211 }
1212
1213 self.ChecksumVerifier.CalculateSha256(FilePath).await
1214 }
1215
1216 async fn RegisterDownload(
1218 &self,
1219 DownloadId:&str,
1220 url:&str,
1221 destination:&PathBuf,
1222 ExpectedChecksum:Option<String>,
1223 ) -> Result<()> {
1224 let mut downloads = self.ActiveDownloads.write().await;
1225 let mut stats = self.statistics.write().await;
1226
1227 stats.ActiveDownloads += 1;
1228
1229 downloads.insert(
1230 DownloadId.to_string(),
1231 DownloadStatus {
1232 DownloadId:DownloadId.to_string(),
1233 url:url.to_string(),
1234 destination:destination.clone(),
1235 TotalSize:0,
1236 downloaded:0,
1237 progress:0.0,
1238 status:DownloadState::Pending,
1239 error:None,
1240 StartedAt:Some(chrono::Utc::now()),
1241 CompletedAt:None,
1242 ChunksCompleted:0,
1243 TotalChunks:1,
1244 DownloadRateBytesPerSec:0,
1245 ExpectedChecksum:ExpectedChecksum.clone(),
1246 ActualChecksum:None,
1247 },
1248 );
1249
1250 Ok(())
1251 }
1252
1253 async fn UpdateDownloadStatus(
1255 &self,
1256 DownloadId:&str,
1257 status:DownloadState,
1258 progress:Option<f32>,
1259 error:Option<String>,
1260 ) -> Result<()> {
1261 let mut downloads = self.ActiveDownloads.write().await;
1262
1263 if let Some(download) = downloads.get_mut(DownloadId) {
1264 if status == DownloadState::Completed || status == DownloadState::Failed {
1265 download.CompletedAt = Some(chrono::Utc::now());
1266 }
1267 download.status = status;
1268 if let Some(progress) = progress {
1269 download.progress = progress;
1270 }
1271 download.error = error;
1272 }
1273
1274 Ok(())
1275 }
1276
1277 async fn UpdateDownloadRate(&self, DownloadId:&str, rate:u64) {
1279 let mut downloads = self.ActiveDownloads.write().await;
1280 if let Some(download) = downloads.get_mut(DownloadId) {
1281 download.DownloadRateBytesPerSec = rate;
1282 }
1283 }
1284
1285 async fn UpdateActualChecksum(&self, DownloadId:&str, checksum:&str) {
1287 let mut downloads = self.ActiveDownloads.write().await;
1288 if let Some(download) = downloads.get_mut(DownloadId) {
1289 download.ActualChecksum = Some(checksum.to_string());
1290 }
1291 }
1292
1293 async fn CalculateDownloadRate(&self, DownloadId:&str, CurrentBytes:u64) -> u64 {
1295 let downloads = self.ActiveDownloads.read().await;
1296 if let Some(download) = downloads.get(DownloadId) {
1297 if let Some(StartedAt) = download.StartedAt {
1298 let elapsed = chrono::Utc::now().signed_duration_since(StartedAt);
1299 let ElapsedSecs = elapsed.num_seconds() as u64;
1300 if ElapsedSecs > 0 {
1301 return CurrentBytes / ElapsedSecs;
1302 }
1303 }
1304 }
1305 0
1306 }
1307
1308 async fn UpdateStatistics(&self, success:bool, bytes:u64, duration:Duration) {
1310 let mut stats = self.statistics.write().await;
1311
1312 if success {
1313 stats.SuccessfulDownloads += 1;
1314 stats.TotalBytesDownloaded += bytes;
1315 stats.TotalDownloadTimeSecs += duration.as_secs_f64();
1316
1317 if stats.TotalDownloadTimeSecs > 0.0 {
1318 stats.AverageDownloadRate = stats.TotalBytesDownloaded as f64 / stats.TotalDownloadTimeSecs
1319 }
1320
1321 let CurrentRate = if duration.as_secs_f64() > 0.0 {
1323 (bytes as f64 / duration.as_secs_f64()) as u64
1324 } else {
1325 0
1326 };
1327 if CurrentRate > stats.PeakDownloadRate {
1328 stats.PeakDownloadRate = CurrentRate;
1329 }
1330 } else {
1331 stats.FailedDownloads += 1;
1332 }
1333
1334 stats.TotalDownloads += 1;
1335 stats.ActiveDownloads = stats.ActiveDownloads.saturating_sub(1);
1336 }
1337
1338 pub async fn GetDownloadStatus(&self, DownloadId:&str) -> Option<DownloadStatus> {
1340 let downloads = self.ActiveDownloads.read().await;
1341 downloads.get(DownloadId).cloned()
1342 }
1343
1344 pub async fn GetAllDownloads(&self) -> Vec<DownloadStatus> {
1346 let downloads = self.ActiveDownloads.read().await;
1347 downloads.values().cloned().collect()
1348 }
1349
1350 pub async fn CancelDownload(&self, DownloadId:&str) -> Result<()> {
1352 dev_log!("update", "[DownloadManager] Cancelling download [ID: {}]", DownloadId);
1353 self.UpdateDownloadStatus(DownloadId, DownloadState::Cancelled, None, None)
1354 .await?;
1355
1356 if let Some(status) = self.GetDownloadStatus(DownloadId).await {
1358 let TempPath = status.destination.with_extension("tmp");
1359 if TempPath.exists() {
1360 let _ = tokio::fs::remove_file(&TempPath).await;
1361 }
1362 }
1363
1364 {
1366 let mut stats = self.statistics.write().await;
1367 stats.CancelledDownloads += 1;
1368 stats.ActiveDownloads = stats.ActiveDownloads.saturating_sub(1);
1369 }
1370
1371 Ok(())
1372 }
1373
1374 pub async fn PauseDownload(&self, DownloadId:&str) -> Result<()> {
1376 self.UpdateDownloadStatus(DownloadId, DownloadState::Paused, None, None).await?;
1377 dev_log!("update", "[DownloadManager] Download paused [ID: {}]", DownloadId);
1378 Ok(())
1379 }
1380
1381 pub async fn ResumeDownload(&self, DownloadId:&str) -> Result<()> {
1383 if let Some(status) = self.GetDownloadStatus(DownloadId).await {
1384 if status.status == DownloadState::Paused {
1385 self.UpdateDownloadStatus(DownloadId, DownloadState::Resuming, None, None)
1386 .await?;
1387 self.UpdateDownloadStatus(DownloadId, DownloadState::Downloading, None, None)
1389 .await?;
1390 dev_log!("update", "[DownloadManager] Download resumed [ID: {}]", DownloadId);
1391 } else {
1392 return Err(AirError::Network("Can only resume paused downloads".to_string()));
1393 }
1394 } else {
1395 return Err(AirError::Network("Download not found".to_string()));
1396 }
1397 Ok(())
1398 }
1399
1400 pub async fn GetActiveDownloadCount(&self) -> usize {
1402 let downloads = self.ActiveDownloads.read().await;
1403 downloads
1404 .iter()
1405 .filter(|(_, s)| {
1406 matches!(
1407 s.status,
1408 DownloadState::Downloading | DownloadState::Verifying | DownloadState::Resuming
1409 )
1410 })
1411 .count()
1412 }
1413
1414 pub async fn GetStatistics(&self) -> DownloadStatistics {
1416 let stats = self.statistics.read().await;
1417 stats.clone()
1418 }
1419
1420 pub async fn QueueDownload(
1422 &self,
1423 url:String,
1424 destination:String,
1425 checksum:String,
1426 priority:DownloadPriority,
1427 ) -> Result<String> {
1428 let DownloadId = Utility::GenerateRequestId();
1429
1430 let destination = if destination.is_empty() {
1431 let filename = url.split('/').last().unwrap_or("download.bin");
1432 self.CacheDirectory.join(filename)
1433 } else {
1434 ConfigurationManager::ExpandPath(&destination)?
1435 };
1436
1437 let queued_download = QueuedDownload {
1438 DownloadId:DownloadId.clone(),
1439 url,
1440 destination,
1441 checksum,
1442 priority,
1443 AddedAt:chrono::Utc::now(),
1444 MaxFileSize:None,
1445 ValidateDiskSpace:true,
1446 };
1447
1448 let mut queue = self.DownloadQueue.write().await;
1449 queue.push_back(queued_download);
1450
1451 queue.make_contiguous().sort_by(|a, b| {
1453 match b.priority.cmp(&a.priority) {
1454 std::cmp::Ordering::Equal => {
1455 a.AddedAt.cmp(&b.AddedAt)
1457 },
1458 order => order,
1459 }
1460 });
1461
1462 {
1463 let mut stats = self.statistics.write().await;
1464 stats.QueuedDownloads += 1;
1465 }
1466
1467 dev_log!(
1468 "update",
1469 "[DownloadManager] Download queued [ID: {}] with priority {:?}",
1470 DownloadId,
1471 priority
1472 );
1473
1474 Ok(DownloadId)
1475 }
1476
1477 pub async fn ProcessQueue(&self) -> Result<Option<String>> {
1479 let mut queue = self.DownloadQueue.write().await;
1480
1481 if let Some(queued) = queue.pop_front() {
1482 let download_id = queued.DownloadId.clone();
1483 drop(queue); let config = DownloadConfig {
1486 url:queued.url.clone(),
1487 destination:queued.destination.to_string_lossy().to_string(),
1488 checksum:queued.checksum.clone(),
1489 priority:queued.priority,
1490 MaxFileSize:queued.MaxFileSize,
1491 ValidateDiskSpace:queued.ValidateDiskSpace,
1492 ..Default::default()
1493 };
1494
1495 {
1496 let mut stats = self.statistics.write().await;
1497 stats.QueuedDownloads = stats.QueuedDownloads.saturating_sub(1);
1498 }
1499
1500 let manager = self.clone();
1502 let download_id_clone = download_id.clone();
1503 tokio::spawn(async move {
1504 if let Err(e) = manager.DownloadFileWithConfig(config).await {
1505 dev_log!(
1506 "update",
1507 "error: [DownloadManager] Queued download failed [ID: {}]: {}",
1508 download_id_clone,
1509 e
1510 ); let _ = manager
1512 .UpdateDownloadStatus(&download_id_clone, DownloadState::Failed, None, Some(e.to_string()))
1513 .await;
1514 }
1515 });
1516
1517 Ok(Some(download_id))
1518 } else {
1519 Ok(None)
1520 }
1521 }
1522
1523 pub async fn StartBackgroundTasks(&self) -> Result<tokio::task::JoinHandle<()>> {
1525 let manager = self.clone();
1526
1527 let handle = tokio::spawn(async move {
1528 manager.BackgroundTaskLoop().await;
1529 });
1530
1531 dev_log!("update", "[DownloadManager] Background tasks started");
1532 Ok(handle)
1533 }
1534
1535 async fn BackgroundTaskLoop(&self) {
1537 let mut interval = tokio::time::interval(Duration::from_secs(60));
1538
1539 loop {
1540 interval.tick().await;
1541
1542 if let Err(e) = self.ProcessQueue().await {
1544 dev_log!("update", "error: [DownloadManager] Queue processing error: {}", e);
1545 }
1546
1547 self.CleanupCompletedDownloads().await;
1549
1550 if let Err(e) = self.CleanupCache().await {
1552 dev_log!("update", "error: [DownloadManager] Cache cleanup failed: {}", e);
1553 }
1554 }
1555 }
1556
1557 async fn CleanupCompletedDownloads(&self) {
1559 let mut downloads = self.ActiveDownloads.write().await;
1560
1561 let mut cleaned_count = 0;
1562 downloads.retain(|_, download| {
1563 let is_final = matches!(
1564 download.status,
1565 DownloadState::Completed | DownloadState::Failed | DownloadState::Cancelled
1566 );
1567 if is_final {
1568 cleaned_count += 1;
1569 }
1570 !is_final
1571 });
1572
1573 if cleaned_count > 0 {
1574 dev_log!("update", "[DownloadManager] Cleaned up {} completed downloads", cleaned_count);
1575 }
1576 }
1577
1578 async fn CleanupCache(&self) -> Result<()> {
1580 let max_age_days = 7;
1581 let now = chrono::Utc::now();
1582
1583 let mut entries = tokio::fs::read_dir(&self.CacheDirectory)
1584 .await
1585 .map_err(|e| AirError::FileSystem(format!("Failed to read cache directory: {}", e)))?;
1586
1587 let mut cleaned_count = 0;
1588
1589 while let Some(entry) = entries
1590 .next_entry()
1591 .await
1592 .map_err(|e| AirError::FileSystem(format!("Failed to read cache entry: {}", e)))?
1593 {
1594 let metadata = entry
1595 .metadata()
1596 .await
1597 .map_err(|e| AirError::FileSystem(format!("Failed to get file metadata: {}", e)))?;
1598
1599 if metadata.is_file() {
1600 let path = entry.path();
1601
1602 let IsActive = {
1604 let downloads = self.ActiveDownloads.read().await;
1605 downloads.values().any(|d| d.destination == path)
1606 };
1607
1608 if IsActive {
1609 continue;
1610 }
1611
1612 let modified = metadata
1613 .modified()
1614 .map_err(|e| AirError::FileSystem(format!("Failed to get modification time: {}", e)))?;
1615
1616 let modified_time = chrono::DateTime::<chrono::Utc>::from(modified);
1617 let age = now.signed_duration_since(modified_time);
1618
1619 if age.num_days() > max_age_days {
1620 match tokio::fs::remove_file(&path).await {
1621 Ok(_) => {
1622 dev_log!(
1623 "update",
1624 "[DownloadManager] Removed old cache file: {}",
1625 entry.file_name().to_string_lossy()
1626 );
1627 cleaned_count += 1;
1628 },
1629 Err(e) => {
1630 dev_log!(
1631 "update",
1632 "warn: [DownloadManager] Failed to remove cache file {}: {}",
1633 entry.file_name().to_string_lossy(),
1634 e
1635 );
1636 },
1637 }
1638 }
1639 }
1640 }
1641
1642 if cleaned_count > 0 {
1643 dev_log!("update", "[DownloadManager] Cleaned up {} old cache files", cleaned_count);
1644 }
1645
1646 Ok(())
1647 }
1648
1649 pub async fn StopBackgroundTasks(&self) {
1651 dev_log!("update", "[DownloadManager] Stopping background tasks");
1652 let ids_to_cancel:Vec<String> = {
1654 let downloads = self.ActiveDownloads.read().await;
1655 downloads
1656 .iter()
1657 .filter(|(_, s)| matches!(s.status, DownloadState::Downloading))
1658 .map(|(id, _)| id.clone())
1659 .collect()
1660 };
1661
1662 for id in ids_to_cancel {
1664 let _ = self.CancelDownload(&id).await;
1665 }
1666
1667 let _ = self
1669 .AppState
1670 .UpdateServiceStatus("downloader", crate::ApplicationState::ServiceStatus::Stopped)
1671 .await;
1672 }
1673
1674 pub async fn SetBandwidthLimit(&mut self, mb_per_sec:usize) {
1687 let bytes_per_sec = (mb_per_sec.max(1).min(1000) * 1024 * 1024) as u64;
1688
1689 {
1691 let mut bucket = self.TokenBucket.write().await;
1692 bucket.set_rate(bytes_per_sec);
1693 }
1694
1695 let permits = mb_per_sec.max(1).min(1000);
1697 self.BandwidthLimiter = Arc::new(Semaphore::new(permits));
1698
1699 dev_log!(
1700 "update",
1701 "[DownloadManager] Bandwidth limit set to {} MB/s ({} bytes/s)",
1702 mb_per_sec,
1703 bytes_per_sec
1704 );
1705 }
1706
1707 pub async fn SetMaxConcurrentDownloads(&mut self, max:usize) {
1711 let permits = max.max(1).min(20);
1712 self.ConcurrentLimiter = Arc::new(Semaphore::new(permits));
1713 dev_log!("update", "[DownloadManager] Max concurrent downloads set to {}", max);
1714 }
1715}
1716
1717impl Clone for DownloadManager {
1718 fn clone(&self) -> Self {
1719 Self {
1720 AppState:self.AppState.clone(),
1721 ActiveDownloads:self.ActiveDownloads.clone(),
1722 DownloadQueue:self.DownloadQueue.clone(),
1723 CacheDirectory:self.CacheDirectory.clone(),
1724 client:self.client.clone(),
1725 ChecksumVerifier:self.ChecksumVerifier.clone(),
1726 BandwidthLimiter:self.BandwidthLimiter.clone(),
1727 TokenBucket:self.TokenBucket.clone(),
1728 ConcurrentLimiter:self.ConcurrentLimiter.clone(),
1729 statistics:self.statistics.clone(),
1730 }
1731 }
1732}
1733
1734impl Default for DownloadStatistics {
1735 fn default() -> Self {
1736 Self {
1737 TotalDownloads:0,
1738 SuccessfulDownloads:0,
1739 FailedDownloads:0,
1740 CancelledDownloads:0,
1741 TotalBytesDownloaded:0,
1742 TotalDownloadTimeSecs:0.0,
1743 AverageDownloadRate:0.0,
1744 PeakDownloadRate:0,
1745 ActiveDownloads:0,
1746 QueuedDownloads:0,
1747 }
1748 }
1749}
1750
1751fn ExpectedChecksumFromConfig(config:&DownloadConfig) -> Option<&str> {
1753 if config.checksum.is_empty() { None } else { Some(&config.checksum) }
1754}
1755
/// One byte range of a chunked download and the temp file that holds it.
#[derive(Debug, Clone)]
struct ChunkInfo {
	/// Offset of the first byte of the range (inclusive).
	start:u64,
	/// Offset of the last byte of the range (inclusive); the range length is
	/// `end - start + 1`.
	end:u64,
	/// Bytes downloaded so far. Initialised to 0 and not read anywhere in the
	/// visible code, hence the lint allowance.
	#[allow(dead_code)]
	downloaded:u64,
	/// File the chunk's bytes are written to before reassembly.
	temp_path:PathBuf,
}
1765
/// Bundle of the chunks of a parallel download and the total size.
///
/// NOTE(review): nothing in the visible code constructs or reads this type,
/// which is presumably why `dead_code` is allowed — confirm whether it is
/// still needed.
#[derive(Debug)]
#[allow(dead_code)]
struct ParallelDownloadResult {
	/// The chunks belonging to the download.
	chunks:Vec<ChunkInfo>,
	/// Total size; presumably in bytes — confirm against intended usage.
	total_size:u64,
}
1773
1774impl DownloadManager {
1811 pub async fn DownloadFileWithChunks(
1821 &self,
1822 url:String,
1823 destination:String,
1824 checksum:String,
1825 chunk_size_mb:usize,
1826 ) -> Result<DownloadResult> {
1827 dev_log!(
1828 "update",
1829 "[DownloadManager] Starting chunked download - URL: {}, Chunk size: {} MB",
1830 url,
1831 chunk_size_mb
1832 );
1833
1834 let sanitized_url = Self::ValidateAndSanitizeUrl(&url)?;
1836
1837 let total_size = self.GetRemoteFileSize(&sanitized_url).await?;
1839
1840 dev_log!("update", "[DownloadManager] Remote file size: {} bytes", total_size);
1841 let chunk_threshold = 50 * 1024 * 1024; if total_size < chunk_threshold {
1844 dev_log!(
1845 "update",
1846 "[DownloadManager] File too small for chunked download, using normal download"
1847 );
1848 return self.DownloadFile(url, destination, checksum).await;
1849 }
1850
1851 let chunk_size = (chunk_size_mb * 1024 * 1024) as u64;
1853 let num_chunks = ((total_size + chunk_size - 1) / chunk_size) as usize;
1854 let num_concurrent = num_chunks.min(4); dev_log!(
1857 "update",
1858 "[DownloadManager] Downloading in {} chunks ({} concurrent)",
1859 num_chunks,
1860 num_concurrent
1861 );
1862
1863 let DownloadId = Utility::GenerateRequestId();
1864 let DestinationPath = if destination.is_empty() {
1865 let filename = sanitized_url.split('/').last().unwrap_or("download.bin");
1866 self.CacheDirectory.join(filename)
1867 } else {
1868 ConfigurationManager::ExpandPath(&destination)?
1869 };
1870
1871 let temp_dir = DestinationPath.with_extension("chunks");
1873 tokio::fs::create_dir_all(&temp_dir)
1874 .await
1875 .map_err(|e| AirError::FileSystem(format!("Failed to create temp directory: {}", e)))?;
1876
1877 let mut chunks = Vec::with_capacity(num_chunks);
1879 for i in 0..num_chunks {
1880 let start = (i as u64) * chunk_size;
1881 let end = std::cmp::min(start + chunk_size - 1, total_size - 1);
1882
1883 chunks.push(ChunkInfo { start, end, downloaded:0, temp_path:temp_dir.join(format!("chunk_{:04}", i)) });
1884 }
1885
1886 let downloaded_tracker = Arc::new(RwLock::new(0u64));
1888 let completed_tracker = Arc::new(RwLock::new(0usize));
1889
1890 let mut handles = Vec::new();
1892 for (i, chunk) in chunks.iter().enumerate() {
1893 let manager = self.clone();
1894 let url_clone = sanitized_url.clone();
1895 let chunk_clone = chunk.clone();
1896 let downloaded_tracker = downloaded_tracker.clone();
1897 let completed_tracker = completed_tracker.clone();
1898 let _Did = DownloadId.clone();
1899
1900 let handle = tokio::spawn(async move {
1901 manager.DownloadChunk(&url_clone, &chunk_clone, i).await?;
1902
1903 {
1905 let mut downloaded = downloaded_tracker.write().await;
1906 let mut completed = completed_tracker.write().await;
1907 *downloaded += chunk_clone.end - chunk_clone.start + 1;
1908 *completed += 1;
1909
1910 let progress = (*downloaded as f32 / total_size as f32) * 100.0;
1911 dev_log!(
1912 "update",
1913 "Chunk {} completed ({}/{}) - Progress: {:.1}%",
1914 i + 1,
1915 *completed,
1916 num_chunks,
1917 progress
1918 );
1919 }
1920
1921 Ok::<_, AirError>(())
1922 });
1923
1924 if (i + 1) % num_concurrent == 0 {
1926 for handle in handles.drain(..) {
1927 handle.await??;
1928 }
1929 }
1930
1931 handles.push(handle);
1932 }
1933
1934 for handle in handles {
1936 handle.await??;
1937 }
1938
1939 dev_log!("update", "[DownloadManager] Reassembling chunks into final file");
1941 self.ReassembleChunks(&chunks, &DestinationPath).await?;
1942
1943 tokio::fs::remove_dir_all(&temp_dir).await.map_err(|e| {
1945 dev_log!("update", "warn: [DownloadManager] Failed to clean up temp directory: {}", e);
1946 AirError::FileSystem(e.to_string())
1947 })?;
1948
1949 if !checksum.is_empty() {
1951 self.VerifyChecksum(&DestinationPath, &checksum).await?;
1952 }
1953
1954 let actual_checksum = self.CalculateChecksum(&DestinationPath).await?;
1955
1956 dev_log!("update", "[DownloadManager] Chunked download completed successfully");
1957 Ok(DownloadResult {
1958 path:DestinationPath.to_string_lossy().to_string(),
1959 size:total_size,
1960 checksum:actual_checksum,
1961 duration:Duration::from_secs(0),
1962 AverageRate:0,
1963 })
1964 }
1965
1966 async fn GetRemoteFileSize(&self, url:&str) -> Result<u64> {
1968 let response = self
1969 .client
1970 .head(url)
1971 .timeout(Duration::from_secs(30))
1972 .send()
1973 .await
1974 .map_err(|e| AirError::Network(format!("Failed to get file size: {}", e)))?;
1975
1976 if !response.status().is_success() {
1977 return Err(AirError::Network(format!("Failed to get file size: {}", response.status())));
1978 }
1979
1980 response
1981 .content_length()
1982 .ok_or_else(|| AirError::Network("Content-Length header not found".to_string()))
1983 }
1984
1985 async fn DownloadChunk(&self, url:&str, chunk:&ChunkInfo, chunk_index:usize) -> Result<()> {
1987 dev_log!(
1988 "update",
1989 "[DownloadManager] Downloading chunk {} (bytes {}-{})",
1990 chunk_index,
1991 chunk.start,
1992 chunk.end
1993 );
1994
1995 let range_header = format!("bytes={}-{}", chunk.start, chunk.end);
1996
1997 let response = self
1998 .client
1999 .get(url)
2000 .header(reqwest::header::RANGE, range_header)
2001 .timeout(Duration::from_secs(300))
2002 .send()
2003 .await
2004 .map_err(|e| AirError::Network(format!("Failed to start chunk download: {}", e)))?;
2005
2006 if response.status() != reqwest::StatusCode::PARTIAL_CONTENT {
2007 return Err(AirError::Network(format!(
2008 "Chunk download failed with status: {}",
2009 response.status()
2010 )));
2011 }
2012
2013 let bytes = response
2015 .bytes()
2016 .await
2017 .map_err(|e| AirError::Network(format!("Failed to read chunk bytes: {}", e)))?;
2018
2019 tokio::fs::write(&chunk.temp_path, &bytes)
2020 .await
2021 .map_err(|e| AirError::FileSystem(format!("Failed to write chunk: {}", e)))?;
2022
2023 dev_log!(
2024 "update",
2025 "[DownloadManager] Chunk {} downloaded: {} bytes",
2026 chunk_index,
2027 bytes.len()
2028 );
2029 Ok(())
2030 }
2031
2032 async fn ReassembleChunks(&self, chunks:&[ChunkInfo], destination:&Path) -> Result<()> {
2034 use tokio::io::AsyncWriteExt;
2035
2036 let mut file = tokio::fs::File::create(destination)
2037 .await
2038 .map_err(|e| AirError::FileSystem(format!("Failed to create destination file: {}", e)))?;
2039
2040 let mut sorted_chunks:Vec<_> = chunks.iter().collect();
2042 sorted_chunks.sort_by_key(|c| c.start);
2043
2044 for chunk in sorted_chunks {
2045 let contents = tokio::fs::read(&chunk.temp_path)
2046 .await
2047 .map_err(|e| AirError::FileSystem(format!("Failed to read chunk: {}", e)))?;
2048
2049 file.write_all(&contents)
2050 .await
2051 .map_err(|e| AirError::FileSystem(format!("Failed to write chunk to file: {}", e)))?;
2052
2053 dev_log!(
2054 "update",
2055 "[DownloadManager] Reassembled chunk (bytes {}-{})",
2056 chunk.start,
2057 chunk.end
2058 );
2059 }
2060
2061 file.flush()
2062 .await
2063 .map_err(|e| AirError::FileSystem(format!("Failed to flush file: {}", e)))?;
2064
2065 dev_log!("update", "[DownloadManager] All chunks reassembled successfully");
2066 Ok(())
2067 }
2068}