Skip to main content

AirLibrary/Indexing/Background/
StartWatcher.rs

1//! # StartWatcher
2//!
3//! ## File: Indexing/Background/StartWatcher.rs
4//!
5//! ## Role in Air Architecture
6//!
7//! Provides background task management for the File Indexer service,
8//! handling file watching startup and periodic indexing tasks.
9//!
10//! ## Primary Responsibility
11//!
12//! Start and manage background file watcher and periodic indexing tasks
13//! for the indexing service.
14//!
15//! ## Secondary Responsibilities
16//!
17//! - File watcher initialization and lifecycle management
18//! - Periodic background re-indexing
19//! - Watcher event debouncing
20//! - Background task cleanup
21//!
22//! ## Dependencies
23//!
24//! **External Crates:**
25//! - `notify` - File system watching
26//! - `tokio` - Async runtime for background tasks
27//!
28//! **Internal Modules:**
29//! - `crate::Result` - Error handling type
30//! - `crate::AirError` - Error types
31//! - `crate::ApplicationState::ApplicationState` - Application state
//! - `crate::Indexing::State::CreateState::FileIndex` - File index type shared with the watcher
//! - `crate::Indexing::Watch::WatchFile` - File watching operations
34//!
35//! ## Dependents
36//!
37//! - `Indexing::mod::FileIndexer` - Main file indexer implementation
38//!
39//! ## VSCode Pattern Reference
40//!
41//! Inspired by VSCode's background services in
42//! `src/vs/workbench/services/search/common/`
43//!
44//! ## Security Considerations
45//!
46//! - Path validation before watching
47//! - Watch path limits enforcement
48//! - Permission checking on watch paths
49//!
50//! ## Performance Considerations
51//!
52//! - Event debouncing prevents excessive re-indexing
53//! - Parallel processing of file changes
54//! - Efficient background task scheduling
55//!
56//! ## Error Handling Strategy
57//!
58//! Background tasks log errors and continue running, ensuring
59//! temporary failures don't stop the indexing service.
60//!
61//! ## Thread Safety
62//!
63//! Background tasks use Arc for shared state and async/await
64//! for safe concurrent operations.
65
66use std::{path::PathBuf, sync::Arc, time::Duration};
67
68use tokio::{
69	sync::{Mutex, RwLock, Semaphore},
70	task::JoinHandle,
71};
72
73use crate::{AirError, ApplicationState::ApplicationState, Indexing::State::CreateState::FileIndex, Result, dev_log};
74
75/// Maximum number of parallel watch event processors
76const MAX_WATCH_PROCESSORS:usize = 5;
77
78/// Background indexer context containing shared state
79pub struct BackgroundIndexerContext {
80	/// Application state reference
81	pub app_state:Arc<ApplicationState>,
82	/// File index
83	pub file_index:Arc<RwLock<FileIndex>>,
84	/// Corruption detected flag
85	pub corruption_detected:Arc<Mutex<bool>>,
86	/// File watcher (optional)
87	pub file_watcher:Arc<Mutex<Option<notify::RecommendedWatcher>>>,
88	/// Semaphore for limiting parallel operations
89	pub indexing_semaphore:Arc<Semaphore>,
90	/// Debounced event handler
91	pub debounced_handler:Arc<crate::Indexing::Watch::WatchFile::DebouncedEventHandler>,
92}
93
94impl BackgroundIndexerContext {
95	pub fn new(app_state:Arc<ApplicationState>, file_index:Arc<RwLock<FileIndex>>) -> Self {
96		Self {
97			app_state,
98			file_index,
99			corruption_detected:Arc::new(Mutex::new(false)),
100			file_watcher:Arc::new(Mutex::new(None)),
101			indexing_semaphore:Arc::new(Semaphore::new(MAX_WATCH_PROCESSORS)),
102			debounced_handler:Arc::new(crate::Indexing::Watch::WatchFile::DebouncedEventHandler::new()),
103		}
104	}
105}
106
107/// Start file watcher for incremental indexing
108///
109/// Monitors file system changes and updates index in real-time.
110/// This enables:
111/// - Real-time search updates
112/// - Automatic reindexing of changed files
113/// - Removal of deleted files from index
114pub async fn StartFileWatcher(context:&BackgroundIndexerContext, paths:Vec<PathBuf>) -> Result<()> {
115	use notify::Watcher;
116
117	let index = context.file_index.clone();
118	let corruption_flag = context.corruption_detected.clone();
119	let config = context.app_state.Configuration.Indexing.clone();
120	let debounced_handler = context.debounced_handler.clone();
121
122	// Create and start the watcher
123	let mut watcher:notify::RecommendedWatcher = Watcher::new(
124		move |res:std::result::Result<notify::Event, notify::Error>| {
125			if let Ok(event) = res {
126				// Check corruption flag before processing events
127				if *corruption_flag.blocking_lock() {
128					dev_log!(
129						"indexing",
130						"warn: [StartWatcher] Skipping file event - index marked as corrupted"
131					);
132					return;
133				}
134
135				let index = index.clone();
136				// Variables cloned for use in async task
137				let _index = index.clone();
138				let debounced_handler = debounced_handler.clone();
139				let _config_clone = config.clone();
140
141				tokio::spawn(async move {
142					// Convert event to change type and add to debounced handler
143					if let Some(change_type) = crate::Indexing::Watch::WatchFile::EventKindToChangeType(event.kind) {
144						for path in &event.paths {
145							if crate::Indexing::Watch::WatchFile::ShouldWatchPath(
146								path,
147								&crate::Indexing::Watch::WatchFile::GetDefaultIgnoredPatterns(),
148							) {
149								debounced_handler.AddChange(path.clone(), change_type).await;
150							}
151						}
152					}
153				});
154			}
155		},
156		notify::Config::default(),
157	)
158	.map_err(|e| AirError::Internal(format!("Failed to create file watcher: {}", e)))?;
159
160	// Watch all specified paths
161	for path in &paths {
162		if path.exists() {
163			match crate::Indexing::Watch::WatchFile::ValidateWatchPath(path) {
164				Ok(()) => {
165					watcher
166						.watch(path, notify::RecursiveMode::Recursive)
167						.map_err(|e| AirError::FileSystem(format!("Failed to watch path {}: {}", path.display(), e)))?;
168					dev_log!("indexing", "[StartWatcher] Watching path: {}", path.display());
169				},
170				Err(e) => {
171					dev_log!(
172						"indexing",
173						"warn: [StartWatcher] Skipping invalid watch path {}: {}",
174						path.display(),
175						e
176					);
177				},
178			}
179		}
180	}
181
182	*context.file_watcher.lock().await = Some(watcher);
183
184	dev_log!(
185		"indexing",
186		"[StartWatcher] File watcher started successfully for {} paths",
187		paths.len()
188	);
189	Ok(())
190}
191
192/// Start the debounce processor task
193pub fn StartDebounceProcessor(context:Arc<BackgroundIndexerContext>) -> JoinHandle<()> {
194	tokio::spawn(async move {
195		dev_log!("indexing", "[StartWatcher] Debounce processor started");
196		let interval = Duration::from_millis(100); // Process every 100ms
197		// Debounce age cutoff
198		let debounce_cutoff = Duration::from_millis(500);
199
200		loop {
201			tokio::time::sleep(interval).await;
202			{
203				// Check corruption flag
204				if *context.corruption_detected.lock().await {
205					dev_log!("indexing", "warn: [StartWatcher] Index corrupted, pausing debounce processing");
206					tokio::time::sleep(Duration::from_secs(5)).await;
207					continue;
208				}
209
210				// Process pending changes
211				let config = context.app_state.Configuration.Indexing.clone();
212
213				match context
214					.debounced_handler
215					.ProcessPendingChanges(debounce_cutoff, &context.file_index, &config)
216					.await
217				{
218					Ok(changes) => {
219						if !changes.is_empty() {
220							dev_log!("indexing", "[StartWatcher] Processed {} debounced changes", changes.len());
221						}
222					},
223					Err(e) => {
224						dev_log!("indexing", "error: [StartWatcher] Failed to process pending changes: {}", e);
225					},
226				}
227			}
228		}
229	})
230}
231
232/// Start background tasks for periodic indexing
233pub async fn StartBackgroundTasks(context:Arc<BackgroundIndexerContext>) -> Result<tokio::task::JoinHandle<()>> {
234	let config = &context.app_state.Configuration.Indexing;
235
236	if !config.Enabled {
237		dev_log!("indexing", "[StartWatcher] Background indexing disabled in configuration");
238		return Err(AirError::Configuration("Background indexing is disabled".to_string()));
239	}
240
241	let handle = tokio::spawn(BackgroundTask(context));
242
243	dev_log!("indexing", "[StartWatcher] Background tasks started");
244	Ok(handle)
245}
246
247/// Stop background tasks
248pub async fn StopBackgroundTasks(_context:&BackgroundIndexerContext) {
249	dev_log!("indexing", "[StartWatcher] Stopping background tasks"); // Tasks are cancelled when the task handle is dropped
250}
251
252/// Stop file watcher
253pub async fn StopFileWatcher(context:&BackgroundIndexerContext) {
254	if let Some(watcher) = context.file_watcher.lock().await.take() {
255		drop(watcher);
256		dev_log!("indexing", "[StartWatcher] File watcher stopped");
257	}
258}
259
260/// Background task for periodic indexing
261async fn BackgroundTask(context:Arc<BackgroundIndexerContext>) {
262	let config = context.app_state.Configuration.Indexing.clone();
263
264	let interval = Duration::from_secs(config.UpdateIntervalMinutes as u64 * 60);
265	let mut interval = tokio::time::interval(interval);
266
267	dev_log!(
268		"indexing",
269		"[StartWatcher] Background indexing configured for {} minute intervals",
270		config.UpdateIntervalMinutes
271	);
272
273	loop {
274		interval.tick().await;
275		{
276			// Check corruption flag
277			if *context.corruption_detected.lock().await {
278				dev_log!("indexing", "warn: [StartWatcher] Index corrupted, skipping background update");
279				continue;
280			}
281
282			dev_log!("indexing", "[StartWatcher] Running periodic background index...");
283			// Re-index configured directories
284			let directories = config.IndexDirectory.clone();
285			if let Err(e) = crate::Indexing::Scan::ScanDirectory::ScanDirectory(&directories, vec![], &config, 10).await
286			{
287				dev_log!("indexing", "error: [StartWatcher] Background indexing failed: {}", e);
288			}
289		}
290	}
291}
292
293/// Get watcher status
294pub async fn GetWatcherStatus(context:&BackgroundIndexerContext) -> WatcherStatus {
295	let is_running = context.file_watcher.lock().await.is_some();
296	let pending_count = context.debounced_handler.PendingCount().await;
297	let is_corrupted = *context.corruption_detected.lock().await;
298
299	WatcherStatus { is_running, pending_count, is_corrupted }
300}
301
/// Watcher status information
///
/// A plain snapshot as produced by `GetWatcherStatus`. It derives `Default`
/// (stopped, nothing pending, not corrupted) and `PartialEq`/`Eq` so that
/// snapshots can be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct WatcherStatus {
	/// `true` while a file watcher is installed in the context
	pub is_running:bool,
	/// Pending count reported by the debounced event handler
	pub pending_count:usize,
	/// Value of the context's corruption flag at snapshot time
	pub is_corrupted:bool,
}
309
310/// Start all background components (watcher and tasks)
311pub async fn StartAll(
312	context:Arc<BackgroundIndexerContext>,
313	watch_paths:Vec<PathBuf>,
314) -> Result<(Option<JoinHandle<()>>, Option<JoinHandle<()>>)> {
315	let watcher_handle = if config_watch_enabled(&context) {
316		match StartFileWatcher(&context, watch_paths).await {
317			Ok(()) => {
318				// Start debounce processor
319				Some(StartDebounceProcessor(context.clone()))
320			},
321			Err(e) => {
322				dev_log!("indexing", "error: [StartWatcher] Failed to start file watcher: {}", e);
323				None
324			},
325		}
326	} else {
327		None
328	};
329
330	let background_handle = match StartBackgroundTasks(context.clone()).await {
331		Ok(handle) => Some(handle),
332		Err(e) => {
333			dev_log!("indexing", "warn: [StartWatcher] Failed to start background tasks: {}", e);
334			None
335		},
336	};
337
338	Ok((watcher_handle, background_handle))
339}
340
341/// Stop all background components
342pub async fn StopAll(context:&BackgroundIndexerContext) {
343	StopBackgroundTasks(context).await;
344	StopFileWatcher(context).await;
345}
346
347/// Check if watching is enabled in configuration
348fn config_watch_enabled(context:&BackgroundIndexerContext) -> bool { context.app_state.Configuration.Indexing.Enabled }