1use std::{
277 collections::HashMap,
278 sync::{Arc, Mutex},
279 time::{Duration, SystemTime},
280};
281
282use serde::{Deserialize, Serialize};
283use tokio::time::interval;
284use tauri::{Emitter, Manager};
285
286use crate::{IPC::AdvancedFeatures::PerformanceStats, RunTime::ApplicationRunTime::ApplicationRunTime, dev_log};
287
288#[derive(Clone, Serialize, Deserialize, Debug)]
295pub struct SyncStatus {
296 pub total_documents:u32,
297 pub synced_documents:u32,
298 pub conflicted_documents:u32,
299 pub offline_documents:u32,
300 pub last_sync_duration_ms:u64,
301}
302
303#[derive(Clone, Copy, PartialEq, Debug)]
305pub enum SyncState {
306 Modified,
307 Synced,
308 Conflicted,
309 Offline,
310}
311
312#[derive(Clone, Copy, Debug)]
314pub enum ChangeType {
315 Update,
316 Insert,
317 Delete,
318 Move,
319 Other,
320}
321
322#[derive(Clone, Debug)]
324pub struct SynchronizedDocument {
325 pub document_id:String,
326 pub file_path:String,
327 pub last_modified:u64,
328 pub content_hash:String,
329 pub sync_state:SyncState,
330 pub version:u32,
331}
332
333#[derive(Clone, Debug)]
335pub struct DocumentChange {
336 pub change_id:String,
337 pub document_id:String,
338 pub change_type:ChangeType,
339 pub content:Option<String>,
340 pub applied:bool,
341}
342
343pub struct DocumentSynchronization {
345 pub synchronized_documents:HashMap<String, SynchronizedDocument>,
346 pub pending_changes:HashMap<String, Vec<DocumentChange>>,
347 pub last_sync_time:u64,
348 pub sync_status:SyncStatus,
349}
350
351#[derive(Clone, Serialize, Deserialize, Debug)]
353pub struct RealTimeUpdate {
354 pub target:String,
355 pub data:String,
356}
357
358pub struct RealTimeUpdateManager {
360 pub Updates:Vec<RealTimeUpdate>,
361 pub Subscribers:HashMap<String, Vec<String>>,
362 pub UpdateQueue:Vec<RealTimeUpdate>,
363 pub LastBroadcast:u64,
364}
365
366#[derive(Clone, Debug)]
368pub struct ViewState {
369 pub zoom_level:f32,
370 pub sidebar_visible:bool,
371 pub panel_visible:bool,
372 pub status_bar_visible:bool,
373}
374
375#[derive(Clone, Debug)]
377pub struct GridLayout {
378 pub rows:u32,
379 pub columns:u32,
380 pub cell_width:u32,
381 pub cell_height:u32,
382}
383
384#[derive(Clone, Debug)]
386pub struct LayoutState {
387 pub editor_groups:Vec<String>,
388 pub active_group:u32,
389 pub grid_layout:GridLayout,
390}
391
392#[derive(Clone, Debug)]
394pub struct UIStateSynchronization {
395 pub active_editor:Option<String>,
396 pub cursor_positions:HashMap<String, (u32, u32)>,
397 pub selection_ranges:HashMap<String, (u32, u32)>,
398 pub view_state:ViewState,
399 pub theme:String,
400 pub layout:LayoutState,
401}
402
403#[derive(Clone)]
405pub struct WindAdvancedSync {
406 runtime:Arc<ApplicationRunTime>,
407 document_sync:Arc<Mutex<DocumentSynchronization>>,
408 ui_state_sync:Arc<Mutex<UIStateSynchronization>>,
409 real_time_updates:Arc<Mutex<RealTimeUpdateManager>>,
410 performance_stats:Arc<Mutex<PerformanceStats>>,
411 }
413
414impl WindAdvancedSync {
415 pub fn new(runtime:Arc<ApplicationRunTime>) -> Self {
417 Self {
418 runtime:runtime.clone(),
419 document_sync:Arc::new(Mutex::new(DocumentSynchronization {
420 synchronized_documents:HashMap::new(),
421 pending_changes:HashMap::new(),
422 last_sync_time:0,
423 sync_status:SyncStatus {
424 total_documents:0,
425 synced_documents:0,
426 conflicted_documents:0,
427 offline_documents:0,
428 last_sync_duration_ms:0,
429 },
430 })),
431 ui_state_sync:Arc::new(Mutex::new(UIStateSynchronization {
432 active_editor:None,
433 cursor_positions:HashMap::new(),
434 selection_ranges:HashMap::new(),
435 view_state:ViewState {
436 zoom_level:1.0,
437 sidebar_visible:true,
438 panel_visible:true,
439 status_bar_visible:true,
440 },
441 theme:"default".to_string(),
442 layout:LayoutState {
443 editor_groups:Vec::new(),
444 active_group:0,
445 grid_layout:GridLayout { rows:1, columns:1, cell_width:100, cell_height:100 },
446 },
447 })),
448 real_time_updates:Arc::new(Mutex::new(RealTimeUpdateManager {
449 Updates:Vec::new(),
450 Subscribers:HashMap::new(),
451 UpdateQueue:Vec::new(),
452 LastBroadcast:0,
453 })),
454 performance_stats:Arc::new(Mutex::new(PerformanceStats {
455 total_messages_sent:0,
456 total_messages_received:0,
457 average_processing_time_ms:0.0,
458 peak_message_rate:0,
459 error_count:0,
460 last_update:0,
461 connection_uptime:0,
462 })),
463 }
465 }
466
467 pub async fn initialize(&self) -> Result<(), String> {
469 dev_log!("ipc", "Initializing Wind Advanced Sync service");
470
471 self.start_sync_task().await;
473
474 self.start_performance_monitoring().await;
476
477 dev_log!("ipc", "Wind Advanced Sync service initialized successfully");
478 Ok(())
479 }
480
481 async fn start_sync_task(&self) {
483 let document_sync = self.document_sync.clone();
484 let runtime = self.runtime.clone();
485
486 tokio::spawn(async move {
487 let mut interval = interval(Duration::from_secs(5));
488
489 loop {
490 interval.tick().await;
491
492 if let Ok(mut sync) = document_sync.lock() {
494 let modified_docs:Vec<String> = sync
495 .synchronized_documents
496 .iter()
497 .filter(|(_, document)| document.sync_state == SyncState::Modified)
498 .map(|(doc_id, _)| doc_id.clone())
499 .collect();
500
501 if !modified_docs.is_empty() {
502 dev_log!("ipc", "Synchronizing {} documents", modified_docs.len());
503
504 sync.last_sync_time =
506 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis() as u64;
507
508 sync.sync_status = Self::calculate_sync_status(&sync.synchronized_documents);
510
511 let _ = runtime
513 .Environment
514 .ApplicationHandle
515 .emit("mountain_sync_status_update", sync.sync_status.clone());
516 }
517 }
518 }
519 });
520 }
521
522 async fn start_performance_monitoring(&self) {
524 let performance_stats = self.performance_stats.clone();
525 let runtime = self.runtime.clone();
526
527 tokio::spawn(async move {
528 let mut interval = interval(Duration::from_secs(10));
529
530 loop {
531 interval.tick().await;
532
533 if let Ok(mut stats) = performance_stats.lock() {
534 stats.last_update =
535 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis() as u64;
536 stats.connection_uptime += 10;
537
538 let _ = runtime
540 .Environment
541 .ApplicationHandle
542 .emit("mountain_performance_update", stats.clone());
543 }
544 }
545 });
546 }
547
548 fn calculate_sync_status(documents:&HashMap<String, SynchronizedDocument>) -> SyncStatus {
550 let total = documents.len() as u32;
551 let synced = documents.values().filter(|d| d.sync_state == SyncState::Synced).count() as u32;
552 let conflicted = documents.values().filter(|d| d.sync_state == SyncState::Conflicted).count() as u32;
553 let offline = documents.values().filter(|d| d.sync_state == SyncState::Offline).count() as u32;
554
555 SyncStatus {
556 total_documents:total,
557 synced_documents:synced,
558 conflicted_documents:conflicted,
559 offline_documents:offline,
560 last_sync_duration_ms:0,
561 }
562 }
563
564 pub fn register_commands(_app:&mut tauri::App) -> Result<(), Box<dyn std::error::Error>> {
566 dev_log!("ipc", "Registering Wind Advanced Sync IPC commands");
567 Ok(())
568 }
569}
570
571impl WindAdvancedSync {
572 pub async fn start_synchronization(self: Arc<Self>) -> Result<(), String> {
574 dev_log!("lifecycle", "Starting advanced synchronization");
575
576 let sync1 = self.clone();
578 tokio::spawn(async move {
579 sync1.synchronize_documents().await;
580 });
581
582 let sync2 = self.clone();
584 tokio::spawn(async move {
585 sync2.synchronize_ui_state().await;
586 });
587
588 let sync3 = self.clone();
590 tokio::spawn(async move {
591 sync3.broadcast_real_time_updates().await;
592 });
593
594 Ok(())
595 }
596
597 async fn synchronize_documents(&self) {
599 let mut interval = interval(Duration::from_secs(5));
600 let mut consecutive_failures = 0;
601 let max_consecutive_failures = 3;
602
603 loop {
604 interval.tick().await;
605
606 dev_log!("lifecycle", "Synchronizing documents");
607
608 let sync_start = std::time::Instant::now();
610 let mut success_count = 0;
611 let mut error_count = 0;
612
613 let changes = self.get_pending_changes().await;
615
616 for change in changes {
618 match self.apply_document_change(change).await {
619 Ok(_) => success_count += 1,
620 Err(e) => {
621 error_count += 1;
622 dev_log!("ipc", "error: [WindAdvancedSync] Failed to apply document change: {}", e);
623
624 consecutive_failures += 1;
626 if consecutive_failures >= max_consecutive_failures {
627 dev_log!("lifecycle", "Too many consecutive failures, slowing sync interval");
628 interval = tokio::time::interval(Duration::from_secs(30));
631 }
632 },
633 }
634 }
635
636 if success_count > 0 {
638 consecutive_failures = 0;
639 interval = tokio::time::interval(Duration::from_secs(5));
641 }
642
643 self.update_sync_status().await;
645
646 let sync_duration = sync_start.elapsed();
648 dev_log!(
649 "ipc",
650 "[WindAdvancedSync] Document sync completed: {} success, {} errors, {:.2}ms",
651 success_count,
652 error_count,
653 sync_duration.as_millis()
654 );
655 }
656 }
657
658 async fn synchronize_ui_state(&self) {
660 let mut interval = interval(Duration::from_secs(1));
661
662 loop {
663 interval.tick().await;
664
665 dev_log!("ipc", "[WindAdvancedSync] Synchronizing UI state");
666
667 let ui_state = self.get_ui_state().await;
669
670 if let Err(e) = self.update_ui_state(ui_state).await {
672 dev_log!("ipc", "error: [WindAdvancedSync] Failed to update UI state: {}", e);
673 }
674 }
675 }
676
677 async fn broadcast_real_time_updates(&self) {
679 let mut interval = interval(Duration::from_millis(100));
680
681 loop {
682 interval.tick().await;
683
684 let updates = self.get_pending_updates().await;
685
686 if !updates.is_empty() {
687 if let Err(e) = self.broadcast_updates(updates).await {
689 dev_log!("ipc", "error: [WindAdvancedSync] Failed to broadcast updates: {}", e);
690 }
691 }
692 }
693 }
694
695 async fn get_pending_changes(&self) -> Vec<DocumentChange> {
697 let sync = self.document_sync.lock().unwrap();
698 sync.pending_changes.values().flatten().cloned().collect()
699 }
700
701 async fn apply_document_change(&self, change:DocumentChange) -> Result<(), String> {
703 dev_log!("lifecycle", "Applying document change: {}", change.change_id);
704
705 let change_start = std::time::Instant::now();
707
708 if let Err(conflict) = self.check_for_conflicts(&change).await {
710 dev_log!("lifecycle", "Conflict detected: {}", conflict);
711 return Err(format!("Conflict detected: {}", conflict));
712 }
713
714 match change.change_type {
716 ChangeType::Update => {
717 if let Some(_content) = &change.content {
719 }
728 },
729 ChangeType::Insert => {
730 if let Some(_content) = &change.content {
732 }
741 },
742 ChangeType::Delete => {
743 },
752 _ => {
753 dev_log!("lifecycle", "Unsupported change type: {:?}", change.change_type);
754 },
755 }
756
757 let mut sync = self.document_sync.lock().unwrap();
759 if let Some(changes) = sync.pending_changes.get_mut(&change.document_id) {
760 if let Some(change_idx) = changes.iter().position(|c| c.change_id == change.change_id) {
761 changes[change_idx].applied = true;
762 }
763 }
764
765 let change_duration = change_start.elapsed();
767 dev_log!(
768 "ipc",
769 "[WindAdvancedSync] Change applied successfully in {:.2}ms: {}",
770 change_duration.as_millis(),
771 change.change_id
772 );
773
774 Ok(())
775 }
776
777 async fn check_for_conflicts(&self, change:&DocumentChange) -> Result<(), String> {
779 let sync = self.document_sync.lock().unwrap();
780
781 if let Some(document) = sync.synchronized_documents.get(&change.document_id) {
783 let current_time = SystemTime::now()
784 .duration_since(SystemTime::UNIX_EPOCH)
785 .unwrap_or_default()
786 .as_secs();
787
788 if current_time - document.last_modified < 10 {
791 return Err(format!(
792 "Document {} was modified recently ({}s ago)",
793 document.document_id,
794 current_time - document.last_modified
795 ));
796 }
797
798 if matches!(document.sync_state, SyncState::Conflicted) {
800 return Err(format!("Document {} is in conflicted state", document.document_id));
801 }
802 }
803
804 Ok(())
805 }
806
807 async fn update_sync_status(&self) {
809 let mut sync = self.document_sync.lock().unwrap();
810
811 sync.sync_status.total_documents = sync.synchronized_documents.len() as u32;
812 sync.sync_status.synced_documents = sync
813 .synchronized_documents
814 .values()
815 .filter(|doc| matches!(doc.sync_state, SyncState::Synced))
816 .count() as u32;
817 sync.sync_status.conflicted_documents = sync
818 .synchronized_documents
819 .values()
820 .filter(|doc| matches!(doc.sync_state, SyncState::Conflicted))
821 .count() as u32;
822 sync.sync_status.offline_documents = sync
823 .synchronized_documents
824 .values()
825 .filter(|doc| matches!(doc.sync_state, SyncState::Offline))
826 .count() as u32;
827
828 sync.last_sync_time = SystemTime::now()
829 .duration_since(SystemTime::UNIX_EPOCH)
830 .unwrap_or_default()
831 .as_secs();
832 }
833
834 async fn get_ui_state(&self) -> UIStateSynchronization {
836 let sync = self.ui_state_sync.lock().unwrap();
837 sync.clone()
838 }
839
840 async fn update_ui_state(&self, ui_state:UIStateSynchronization) -> Result<(), String> {
842 let mut sync = self.ui_state_sync.lock().unwrap();
843 *sync = ui_state;
844
845 Ok(())
851 }
852
853 async fn get_pending_updates(&self) -> Vec<RealTimeUpdate> {
855 let mut updates = self.real_time_updates.lock().unwrap();
856 let pending = updates.UpdateQueue.clone();
857 updates.UpdateQueue.clear();
858 pending
859 }
860
861 async fn broadcast_updates(&self, updates:Vec<RealTimeUpdate>) -> Result<(), String> {
863 for update in updates {
864 let subscribers = {
866 let rt = self.real_time_updates.lock().unwrap();
867 rt.Subscribers.get(&update.target).cloned()
868 };
869
870 if let Some(subscriber_list) = subscribers {
872 for subscriber in subscriber_list {
873 if let Err(e) = self
874 .runtime
875 .Environment
876 .ApplicationHandle
877 .emit(&format!("real-time-update-{}", subscriber), &update)
878 {
879 dev_log!("ipc", "error: [WindAdvancedSync] Failed to broadcast to {}: {}", subscriber, e);
880 }
881 }
882 }
883 }
884
885 Ok(())
886 }
887
888 pub async fn add_document(&self, document_id:String, file_path:String) -> Result<(), String> {
890 let mut sync = self.document_sync.lock().unwrap();
891
892 let document = SynchronizedDocument {
893 document_id:document_id.clone(),
894 file_path,
895 last_modified:SystemTime::now()
896 .duration_since(SystemTime::UNIX_EPOCH)
897 .unwrap_or_default()
898 .as_secs(),
899 content_hash:"".to_string(),
900 sync_state:SyncState::Synced,
901 version:1,
902 };
903
904 sync.synchronized_documents.insert(document_id, document);
905
906 dev_log!("lifecycle", "Document added for synchronization");
907 Ok(())
908 }
909
910 pub async fn subscribe_to_updates(&self, target:String, subscriber:String) -> Result<(), String> {
912 let mut updates = self.real_time_updates.lock().unwrap();
913
914 let target_clone = target.clone();
915 updates
916 .Subscribers
917 .entry(target_clone.clone())
918 .or_insert_with(Vec::new)
919 .push(subscriber);
920
921 dev_log!("lifecycle", "Subscriber added for target: {}", target_clone);
922 Ok(())
923 }
924
925 pub async fn queue_update(&self, update:RealTimeUpdate) -> Result<(), String> {
927 let mut updates = self.real_time_updates.lock().unwrap();
928
929 updates.UpdateQueue.push(update);
930 updates.LastBroadcast = SystemTime::now()
931 .duration_since(SystemTime::UNIX_EPOCH)
932 .unwrap_or_default()
933 .as_secs();
934
935 dev_log!("ipc", "[WindAdvancedSync] Update queued");
936 Ok(())
937 }
938
939 pub async fn get_sync_status(&self) -> SyncStatus {
941 let sync = self.document_sync.lock().unwrap();
942 sync.sync_status.clone()
943 }
944
945 pub async fn get_current_ui_state(&self) -> UIStateSynchronization { self.get_ui_state().await }
947
948 fn clone_sync(&self) -> WindAdvancedSync {
950 WindAdvancedSync {
951 runtime:self.runtime.clone(),
952 document_sync:self.document_sync.clone(),
953 ui_state_sync:self.ui_state_sync.clone(),
954 real_time_updates:self.real_time_updates.clone(),
955 performance_stats:self.performance_stats.clone(),
956 }
958 }
959}
960
961#[tauri::command]
963pub async fn mountain_add_document_for_sync(
964 app_handle:tauri::AppHandle,
965 document_id:String,
966 file_path:String,
967) -> Result<(), String> {
968 dev_log!("lifecycle", "Tauri command: add_document_for_sync");
969
970 if let Some(sync) = app_handle.try_state::<WindAdvancedSync>() {
971 sync.add_document(document_id, file_path).await
972 } else {
973 Err("WindAdvancedSync not found in application state".to_string())
974 }
975}
976
977#[tauri::command]
979pub async fn mountain_get_sync_status(app_handle:tauri::AppHandle) -> Result<SyncStatus, String> {
980 dev_log!("lifecycle", "Tauri command: get_sync_status");
981
982 if let Some(sync) = app_handle.try_state::<WindAdvancedSync>() {
983 Ok(sync.get_sync_status().await)
984 } else {
985 Err("WindAdvancedSync not found in application state".to_string())
986 }
987}
988
989#[tauri::command]
991pub async fn mountain_subscribe_to_updates(
992 app_handle:tauri::AppHandle,
993 target:String,
994 subscriber:String,
995) -> Result<(), String> {
996 dev_log!("lifecycle", "Tauri command: subscribe_to_updates");
997
998 if let Some(sync) = app_handle.try_state::<WindAdvancedSync>() {
999 sync.subscribe_to_updates(target, subscriber).await
1000 } else {
1001 Err("WindAdvancedSync not found in application state".to_string())
1002 }
1003}
1004
1005pub fn initialize_wind_advanced_sync(
1007 app_handle:&tauri::AppHandle,
1008 runtime:Arc<ApplicationRunTime>,
1009) -> Result<(), String> {
1010 dev_log!("lifecycle", "Initializing Wind advanced synchronization");
1011
1012 let sync = Arc::new(WindAdvancedSync::new(runtime));
1013
1014 app_handle.manage(sync.clone());
1016
1017 let sync_clone = sync.clone();
1019 tokio::spawn(async move {
1020 if let Err(e) = sync_clone.start_synchronization().await {
1021 dev_log!("ipc", "error: [WindAdvancedSync] Failed to start synchronization: {}", e);
1022 }
1023 });
1024
1025 Ok(())
1026}