AirLibrary/Indexing/Background/
StartWatcher.rs1use std::{path::PathBuf, sync::Arc, time::Duration};
67
68use tokio::{
69 sync::{Mutex, RwLock, Semaphore},
70 task::JoinHandle,
71};
72
73use crate::{AirError, ApplicationState::ApplicationState, Indexing::State::CreateState::FileIndex, Result, dev_log};
74
/// Number of permits given to the indexing semaphore created in
/// `BackgroundIndexerContext::new`.
const MAX_WATCH_PROCESSORS:usize = 5;
77
78pub struct BackgroundIndexerContext {
80 pub app_state:Arc<ApplicationState>,
82 pub file_index:Arc<RwLock<FileIndex>>,
84 pub corruption_detected:Arc<Mutex<bool>>,
86 pub file_watcher:Arc<Mutex<Option<notify::RecommendedWatcher>>>,
88 pub indexing_semaphore:Arc<Semaphore>,
90 pub debounced_handler:Arc<crate::Indexing::Watch::WatchFile::DebouncedEventHandler>,
92}
93
94impl BackgroundIndexerContext {
95 pub fn new(app_state:Arc<ApplicationState>, file_index:Arc<RwLock<FileIndex>>) -> Self {
96 Self {
97 app_state,
98 file_index,
99 corruption_detected:Arc::new(Mutex::new(false)),
100 file_watcher:Arc::new(Mutex::new(None)),
101 indexing_semaphore:Arc::new(Semaphore::new(MAX_WATCH_PROCESSORS)),
102 debounced_handler:Arc::new(crate::Indexing::Watch::WatchFile::DebouncedEventHandler::new()),
103 }
104 }
105}
106
107pub async fn StartFileWatcher(context:&BackgroundIndexerContext, paths:Vec<PathBuf>) -> Result<()> {
115 use notify::Watcher;
116
117 let index = context.file_index.clone();
118 let corruption_flag = context.corruption_detected.clone();
119 let config = context.app_state.Configuration.Indexing.clone();
120 let debounced_handler = context.debounced_handler.clone();
121
122 let mut watcher:notify::RecommendedWatcher = Watcher::new(
124 move |res:std::result::Result<notify::Event, notify::Error>| {
125 if let Ok(event) = res {
126 if *corruption_flag.blocking_lock() {
128 dev_log!(
129 "indexing",
130 "warn: [StartWatcher] Skipping file event - index marked as corrupted"
131 );
132 return;
133 }
134
135 let index = index.clone();
136 let _index = index.clone();
138 let debounced_handler = debounced_handler.clone();
139 let _config_clone = config.clone();
140
141 tokio::spawn(async move {
142 if let Some(change_type) = crate::Indexing::Watch::WatchFile::EventKindToChangeType(event.kind) {
144 for path in &event.paths {
145 if crate::Indexing::Watch::WatchFile::ShouldWatchPath(
146 path,
147 &crate::Indexing::Watch::WatchFile::GetDefaultIgnoredPatterns(),
148 ) {
149 debounced_handler.AddChange(path.clone(), change_type).await;
150 }
151 }
152 }
153 });
154 }
155 },
156 notify::Config::default(),
157 )
158 .map_err(|e| AirError::Internal(format!("Failed to create file watcher: {}", e)))?;
159
160 for path in &paths {
162 if path.exists() {
163 match crate::Indexing::Watch::WatchFile::ValidateWatchPath(path) {
164 Ok(()) => {
165 watcher
166 .watch(path, notify::RecursiveMode::Recursive)
167 .map_err(|e| AirError::FileSystem(format!("Failed to watch path {}: {}", path.display(), e)))?;
168 dev_log!("indexing", "[StartWatcher] Watching path: {}", path.display());
169 },
170 Err(e) => {
171 dev_log!(
172 "indexing",
173 "warn: [StartWatcher] Skipping invalid watch path {}: {}",
174 path.display(),
175 e
176 );
177 },
178 }
179 }
180 }
181
182 *context.file_watcher.lock().await = Some(watcher);
183
184 dev_log!(
185 "indexing",
186 "[StartWatcher] File watcher started successfully for {} paths",
187 paths.len()
188 );
189 Ok(())
190}
191
192pub fn StartDebounceProcessor(context:Arc<BackgroundIndexerContext>) -> JoinHandle<()> {
194 tokio::spawn(async move {
195 dev_log!("indexing", "[StartWatcher] Debounce processor started");
196 let interval = Duration::from_millis(100); let debounce_cutoff = Duration::from_millis(500);
199
200 loop {
201 tokio::time::sleep(interval).await;
202 {
203 if *context.corruption_detected.lock().await {
205 dev_log!("indexing", "warn: [StartWatcher] Index corrupted, pausing debounce processing");
206 tokio::time::sleep(Duration::from_secs(5)).await;
207 continue;
208 }
209
210 let config = context.app_state.Configuration.Indexing.clone();
212
213 match context
214 .debounced_handler
215 .ProcessPendingChanges(debounce_cutoff, &context.file_index, &config)
216 .await
217 {
218 Ok(changes) => {
219 if !changes.is_empty() {
220 dev_log!("indexing", "[StartWatcher] Processed {} debounced changes", changes.len());
221 }
222 },
223 Err(e) => {
224 dev_log!("indexing", "error: [StartWatcher] Failed to process pending changes: {}", e);
225 },
226 }
227 }
228 }
229 })
230}
231
232pub async fn StartBackgroundTasks(context:Arc<BackgroundIndexerContext>) -> Result<tokio::task::JoinHandle<()>> {
234 let config = &context.app_state.Configuration.Indexing;
235
236 if !config.Enabled {
237 dev_log!("indexing", "[StartWatcher] Background indexing disabled in configuration");
238 return Err(AirError::Configuration("Background indexing is disabled".to_string()));
239 }
240
241 let handle = tokio::spawn(BackgroundTask(context));
242
243 dev_log!("indexing", "[StartWatcher] Background tasks started");
244 Ok(handle)
245}
246
247pub async fn StopBackgroundTasks(_context:&BackgroundIndexerContext) {
249 dev_log!("indexing", "[StartWatcher] Stopping background tasks"); }
251
252pub async fn StopFileWatcher(context:&BackgroundIndexerContext) {
254 if let Some(watcher) = context.file_watcher.lock().await.take() {
255 drop(watcher);
256 dev_log!("indexing", "[StartWatcher] File watcher stopped");
257 }
258}
259
260async fn BackgroundTask(context:Arc<BackgroundIndexerContext>) {
262 let config = context.app_state.Configuration.Indexing.clone();
263
264 let interval = Duration::from_secs(config.UpdateIntervalMinutes as u64 * 60);
265 let mut interval = tokio::time::interval(interval);
266
267 dev_log!(
268 "indexing",
269 "[StartWatcher] Background indexing configured for {} minute intervals",
270 config.UpdateIntervalMinutes
271 );
272
273 loop {
274 interval.tick().await;
275 {
276 if *context.corruption_detected.lock().await {
278 dev_log!("indexing", "warn: [StartWatcher] Index corrupted, skipping background update");
279 continue;
280 }
281
282 dev_log!("indexing", "[StartWatcher] Running periodic background index...");
283 let directories = config.IndexDirectory.clone();
285 if let Err(e) = crate::Indexing::Scan::ScanDirectory::ScanDirectory(&directories, vec![], &config, 10).await
286 {
287 dev_log!("indexing", "error: [StartWatcher] Background indexing failed: {}", e);
288 }
289 }
290 }
291}
292
293pub async fn GetWatcherStatus(context:&BackgroundIndexerContext) -> WatcherStatus {
295 let is_running = context.file_watcher.lock().await.is_some();
296 let pending_count = context.debounced_handler.PendingCount().await;
297 let is_corrupted = *context.corruption_detected.lock().await;
298
299 WatcherStatus { is_running, pending_count, is_corrupted }
300}
301
/// Snapshot of the watcher state as returned by `GetWatcherStatus`.
#[derive(Debug, Clone)]
pub struct WatcherStatus {
	/// `true` when a file watcher was stored in the context at snapshot time.
	pub is_running:bool,

	/// Pending count reported by the debounced event handler.
	pub pending_count:usize,

	/// Value of the context's corruption flag at snapshot time.
	pub is_corrupted:bool,
}
309
310pub async fn StartAll(
312 context:Arc<BackgroundIndexerContext>,
313 watch_paths:Vec<PathBuf>,
314) -> Result<(Option<JoinHandle<()>>, Option<JoinHandle<()>>)> {
315 let watcher_handle = if config_watch_enabled(&context) {
316 match StartFileWatcher(&context, watch_paths).await {
317 Ok(()) => {
318 Some(StartDebounceProcessor(context.clone()))
320 },
321 Err(e) => {
322 dev_log!("indexing", "error: [StartWatcher] Failed to start file watcher: {}", e);
323 None
324 },
325 }
326 } else {
327 None
328 };
329
330 let background_handle = match StartBackgroundTasks(context.clone()).await {
331 Ok(handle) => Some(handle),
332 Err(e) => {
333 dev_log!("indexing", "warn: [StartWatcher] Failed to start background tasks: {}", e);
334 None
335 },
336 };
337
338 Ok((watcher_handle, background_handle))
339}
340
341pub async fn StopAll(context:&BackgroundIndexerContext) {
343 StopBackgroundTasks(context).await;
344 StopFileWatcher(context).await;
345}
346
347fn config_watch_enabled(context:&BackgroundIndexerContext) -> bool { context.app_state.Configuration.Indexing.Enabled }