AirLibrary/Indexing/Store/
UpdateIndex.rs1use std::{path::PathBuf, sync::Arc, time::Duration};
71
72use tokio::{
73 sync::{RwLock, Semaphore},
74 time::Instant,
75};
76
77use crate::{
78 AirError,
79 Configuration::IndexingConfig,
80 Indexing::State::CreateState::{FileIndex, FileMetadata},
81 Result,
82 dev_log,
83};
84
85pub async fn UpdateSingleFile(
87 index:&mut FileIndex,
88 file_path:&PathBuf,
89 config:&IndexingConfig,
90) -> Result<Option<FileMetadata>> {
91 let start_time = Instant::now();
92
93 if !file_path.exists() {
95 crate::Indexing::State::UpdateState::RemoveFileFromIndex(index, file_path)?;
97 dev_log!("indexing", "[UpdateIndex] Removed deleted file: {}", file_path.display());
98 return Ok(None);
99 }
100
101 let current_metadata = std::fs::metadata(file_path)
103 .map_err(|e| AirError::FileSystem(format!("Failed to get file metadata: {}", e)))?;
104
105 let current_modified = current_metadata
106 .modified()
107 .map_err(|e| AirError::FileSystem(format!("Failed to get modification time: {}", e)))?;
108
109 let _current_modified_time = chrono::DateTime::<chrono::Utc>::from(current_modified);
110
111 let needs_update = match index.files.get(file_path) {
113 Some(old_metadata) => {
114 let checksum = crate::Indexing::Scan::ScanFile::CalculateChecksum(
116 &tokio::fs::read(file_path).await.unwrap_or_default(),
117 );
118 old_metadata.checksum != checksum
119 },
120 None => {
121 true
123 },
124 };
125
126 if !needs_update {
127 dev_log!("indexing", "file unchanged: {}", file_path.display());
128 return Ok(index.files.get(file_path).cloned());
129 }
130
131 use crate::Indexing::{Scan::ScanFile::IndexFileInternal, State::UpdateState::UpdateIndexMetadata};
133
134 let (metadata, symbols) = IndexFileInternal(file_path, config, &[]).await?;
135
136 crate::Indexing::State::UpdateState::RemoveFileFromIndex(index, file_path)?;
138 crate::Indexing::State::UpdateState::AddFileToIndex(index, file_path.clone(), metadata.clone(), symbols)?;
139
140 UpdateIndexMetadata(index)?;
142
143 let elapsed = start_time.elapsed();
144 dev_log!(
145 "indexing",
146 "updated {} in {}ms ({} symbols)",
147 file_path.display(),
148 elapsed.as_millis(),
149 metadata.symbol_count
150 );
151
152 Ok(Some(metadata))
153}
154
155pub async fn UpdateFileContent(index:&mut FileIndex, file_path:&PathBuf, metadata:&FileMetadata) -> Result<()> {
157 if !metadata.mime_type.starts_with("text/") && !metadata.mime_type.contains("json") {
159 return Ok(());
160 }
161
162 let content = tokio::fs::read_to_string(file_path)
163 .await
164 .map_err(|e| AirError::FileSystem(format!("Failed to read file content: {}", e)))?;
165
166 for (_, files) in index.content_index.iter_mut() {
168 files.retain(|p| p != file_path);
169 }
170
171 let tokens = crate::Indexing::Process::ProcessContent::TokenizeContent(&content);
173
174 for token in tokens {
175 if token.len() > 2 {
176 index
178 .content_index
179 .entry(token)
180 .or_insert_with(Vec::new)
181 .push(file_path.clone());
182 }
183 }
184
185 Ok(())
186}
187
188pub async fn UpdateFilesBatch(
190 index:&mut FileIndex,
191 file_paths:Vec<PathBuf>,
192 config:&IndexingConfig,
193) -> Result<UpdateBatchResult> {
194 let start_time = Instant::now();
195 let mut updated_count = 0u32;
196 let mut removed_count = 0u32;
197 let mut error_count = 0u32;
198 let mut total_size = 0u64;
199
200 for file_path in file_paths {
201 match UpdateSingleFile(index, &file_path, config).await {
202 Ok(Some(metadata)) => {
203 updated_count += 1;
204 total_size += metadata.size;
205 },
206 Ok(None) => {
207 removed_count += 1;
208 },
209 Err(e) => {
210 dev_log!(
211 "indexing",
212 "warn: [UpdateIndex] Failed to update file {}: {}",
213 file_path.display(),
214 e
215 );
216 error_count += 1;
217 },
218 }
219 }
220
221 crate::Indexing::State::UpdateState::UpdateIndexMetadata(index)?;
223
224 Ok(UpdateBatchResult {
225 updated_count,
226 removed_count,
227 error_count,
228 total_size,
229 duration_seconds:start_time.elapsed().as_secs_f64(),
230 })
231}
232
/// Summary returned by the batch update and rebuild operations.
#[derive(Clone, Debug)]
pub struct UpdateBatchResult {
	/// Number of files for which metadata was produced.
	pub updated_count:u32,
	/// Number of paths that were dropped from the index.
	pub removed_count:u32,
	/// Number of failures encountered.
	pub error_count:u32,
	/// Sum of the `size` of every updated file.
	pub total_size:u64,
	/// Wall-clock time the operation took, in seconds.
	pub duration_seconds:f64,
}
242
243pub struct DebouncedUpdate {
245 file_path:PathBuf,
246 last_seen:Instant,
247 index:*const RwLock<FileIndex>,
248 config:IndexingConfig,
249 duration:Duration,
250 pending:bool,
251}
252
253unsafe impl Send for DebouncedUpdate {}
254
255impl DebouncedUpdate {
256 pub fn new(file_path:PathBuf, index:&RwLock<FileIndex>, config:&IndexingConfig, duration:Duration) -> Self {
257 Self {
258 file_path,
259 last_seen:Instant::now(),
260 index:index as *const RwLock<FileIndex>,
261 config:config.clone(),
262 duration,
263 pending:false,
264 }
265 }
266
267 pub async fn trigger(&mut self) {
268 self.last_seen = Instant::now();
269 self.pending = true;
270 }
271
272 pub async fn process_if_ready(&mut self) -> Result<bool> {
273 if !self.pending {
274 return Ok(false);
275 }
276
277 if self.last_seen.elapsed() >= self.duration {
278 self.pending = false;
279
280 let index_ref = unsafe { &*self.index };
282 let mut index = index_ref.write().await;
283
284 match UpdateSingleFile(&mut index, &self.file_path, &self.config).await {
285 Ok(_) => {
286 dev_log!(
287 "indexing",
288 "[UpdateIndex] Debounced update completed: {}",
289 self.file_path.display()
290 );
291 return Ok(true);
292 },
293 Err(e) => {
294 dev_log!("indexing", "warn: [UpdateIndex] Debounced update failed: {}", e);
295 return Err(e);
296 },
297 }
298 }
299
300 Ok(false)
301 }
302
303 pub fn clear_pending(&mut self) { self.pending = false; }
304}
305
306pub async fn ProcessWatcherEvent(
308 index:&mut FileIndex,
309 event:notify::Event,
310 config:&IndexingConfig,
311) -> Result<WatcherEventResult> {
312 let mut updated = 0u32;
313 let mut removed = 0u32;
314
315 for file_path in event.paths {
316 match event.kind {
317 notify::EventKind::Create(notify::event::CreateKind::File) => {
318 dev_log!("indexing", "[UpdateIndex] File created: {}", file_path.display());
319 if UpdateSingleFile(index, &file_path, config).await.is_ok() {
320 updated += 1;
321 }
322 },
323 notify::EventKind::Modify(notify::event::ModifyKind::Data(_))
324 | notify::EventKind::Modify(notify::event::ModifyKind::Name(notify::event::RenameMode::Both)) => {
325 dev_log!("indexing", "[UpdateIndex] File modified: {}", file_path.display());
326 if UpdateSingleFile(index, &file_path, config).await.is_ok() {
327 updated += 1;
328 }
329 },
330 notify::EventKind::Remove(notify::event::RemoveKind::File) => {
331 dev_log!("indexing", "[UpdateIndex] File removed: {}", file_path.display());
332 if super::super::State::UpdateState::RemoveFileFromIndex(index, &file_path).is_ok() {
333 removed += 1;
334 }
335 },
336 _ => {},
337 }
338 }
339
340 super::super::State::UpdateState::UpdateIndexMetadata(index)?;
342
343 Ok(WatcherEventResult { updated, removed })
344}
345
/// Counts produced by processing a single filesystem-watcher event.
#[derive(Clone, Debug)]
pub struct WatcherEventResult {
	/// Paths successfully passed through the single-file update.
	pub updated:u32,
	/// Paths successfully removed from the index.
	pub removed:u32,
}
352
353pub async fn CleanupRemovedFiles(index:&mut FileIndex) -> Result<u32> {
355 let mut paths_to_remove = Vec::new();
356 let all_paths:Vec<_> = index.files.keys().cloned().collect();
357
358 for path in all_paths {
359 if !path.exists() {
360 paths_to_remove.push(path);
361 }
362 }
363
364 for path in &paths_to_remove {
365 super::super::State::UpdateState::RemoveFileFromIndex(index, path)?;
366 }
367
368 super::super::State::UpdateState::UpdateIndexMetadata(index)?;
370
371 dev_log!("indexing", "[UpdateIndex] Cleaned up {} removed files", paths_to_remove.len());
372 Ok(paths_to_remove.len() as u32)
373}
374
375pub async fn RebuildIndex(
377 index:&mut FileIndex,
378 directories:Vec<String>,
379 patterns:Vec<String>,
380 config:&IndexingConfig,
381) -> Result<UpdateBatchResult> {
382 let start_time = Instant::now();
383
384 index.files.clear();
386 index.content_index.clear();
387 index.symbol_index.clear();
388 index.file_symbols.clear();
389
390 let (files_to_index, scan_result) =
392 crate::Indexing::Scan::ScanDirectory::ScanDirectoriesParallel(directories, patterns, config, 10).await?;
393
394 let semaphore = Arc::new(Semaphore::new(config.MaxParallelIndexing as usize));
396 let index_arc = Arc::new(RwLock::new(index.clone()));
397 let mut tasks = Vec::new();
398
399 for file_path in files_to_index {
400 let permit = semaphore.clone().acquire_owned().await.unwrap();
401 let _index_ref = index_arc.clone();
403 let config_clone = config.clone();
404
405 let task = tokio::spawn(async move {
406 let _permit = permit;
407
408 crate::Indexing::Scan::ScanFile::IndexFileInternal(&file_path, &config_clone, &[]).await
409 });
410
411 tasks.push(task);
412 }
413
414 let mut updated_count = 0u32;
415 let mut total_size = 0u64;
416
417 for task in tasks {
418 match task.await {
419 Ok(Ok((metadata, symbols))) => {
420 let file_size = metadata.size;
421 super::super::State::UpdateState::AddFileToIndex(index, metadata.path.clone(), metadata, symbols)?;
422 updated_count += 1;
423 total_size += file_size;
424 },
425 Ok(Err(e)) => {
426 dev_log!("indexing", "warn: [UpdateIndex] Rebuild task failed: {}", e);
427 },
428 Err(e) => {
429 dev_log!("indexing", "warn: [UpdateIndex] Rebuild task join failed: {}", e);
430 },
431 }
432 }
433
434 super::super::State::UpdateState::UpdateIndexMetadata(index)?;
436
437 Ok(UpdateBatchResult {
438 updated_count,
439 removed_count:0,
440 error_count:scan_result.errors,
441 total_size,
442 duration_seconds:start_time.elapsed().as_secs_f64(),
443 })
444}
445
446pub async fn ValidateAndRepairIndex(index:&mut FileIndex) -> Result<RepairResult> {
448 let start_time = Instant::now();
449 let mut repaired_files = 0u32;
450 let removed_orphans;
451
452 match super::super::State::UpdateState::ValidateIndexConsistency(index) {
454 Ok(()) => {},
455 Err(e) => {
456 dev_log!("indexing", "warn: [UpdateIndex] Index validation failed: {}", e);
457 repaired_files += 1;
458 },
459 }
460
461 removed_orphans = super::super::State::UpdateState::CleanupOrphanedEntries(index)?;
463
464 super::super::State::UpdateState::UpdateIndexMetadata(index)?;
466
467 Ok(RepairResult {
468 repaired_files,
469 removed_orphans,
470 duration_seconds:start_time.elapsed().as_secs_f64(),
471 })
472}
473
/// Outcome of validating and repairing the index.
#[derive(Clone, Debug)]
pub struct RepairResult {
	/// Number of repairs triggered by a failed consistency check.
	pub repaired_files:u32,
	/// Number of orphaned entries removed during cleanup.
	pub removed_orphans:u32,
	/// Wall-clock time the repair took, in seconds.
	pub duration_seconds:f64,
}
480}