# src/implementations/document_service.py
from pathlib import Path
import shutil
import os
import uuid
from typing import List, Tuple

from fastapi import UploadFile, BackgroundTasks

from ..vectorstores.chroma_vectorstore import ChromaVectorStore
from ..utils.document_processor import DocumentProcessor
from ..models import DocumentResponse, DocumentInfo, BatchUploadResponse
from ..utils.logger import logger


class DocumentService:
    def __init__(self, doc_processor: DocumentProcessor):
        self.doc_processor = doc_processor
        self.upload_dir = Path("temp_uploads")
        self.upload_dir.mkdir(parents=True, exist_ok=True)

    async def process_documents(
        self,
        files: List[UploadFile],
        vector_store: ChromaVectorStore,
        background_tasks: BackgroundTasks
    ) -> BatchUploadResponse:
        """Process multiple document uploads."""
        processed_files, failed_files = await self._handle_file_uploads(
            files, vector_store, background_tasks
        )
        return BatchUploadResponse(
            message=(
                f"Processed {len(processed_files)} documents "
                f"with {len(failed_files)} failures"
            ),
            processed_files=processed_files,
            failed_files=failed_files
        )

    async def _handle_file_uploads(
        self,
        files: List[UploadFile],
        vector_store: ChromaVectorStore,
        background_tasks: BackgroundTasks
    ) -> Tuple[List[DocumentResponse], List[dict]]:
        """Handle individual file uploads and processing."""
        processed_files = []
        failed_files = []

        for file in files:
            try:
                if not self._is_supported_format(file.filename):
                    failed_files.append(self._create_failed_file_entry(
                        file.filename, "Unsupported file format"
                    ))
                    continue

                document_response = await self._process_single_file(
                    file, vector_store, background_tasks
                )
                processed_files.append(document_response)

            except Exception as e:
                # One bad file should not abort the whole batch; record the
                # failure and keep going.
                logger.error(f"Error processing file {file.filename}: {str(e)}")
                failed_files.append(self._create_failed_file_entry(
                    file.filename, str(e)
                ))

        return processed_files, failed_files

    def _is_supported_format(self, filename: str) -> bool:
        """Check if the file format is supported."""
        if not filename:
            # Multipart uploads may arrive without a filename.
            return False
        return any(
            filename.lower().endswith(ext)
            for ext in self.doc_processor.supported_formats
        )

    async def _process_single_file(
        self,
        file: UploadFile,
        vector_store: ChromaVectorStore,
        background_tasks: BackgroundTasks
    ) -> DocumentResponse:
        """Save a single upload to disk and queue it for processing."""
        document_id = str(uuid.uuid4())
        # Path(...).name strips any directory components a client might embed
        # in the filename, keeping the temp file inside upload_dir.
        temp_path = self.upload_dir / f"{document_id}_{Path(file.filename).name}"

        # Persist the upload before returning, since the request body stream
        # is closed once the request finishes.
        with open(temp_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # Defer the heavy processing until after the response is sent.
        background_tasks.add_task(
            self._process_and_store_document,
            temp_path,
            vector_store,
            document_id
        )

        return DocumentResponse(
            message="Document queued for processing",
            document_id=document_id,
            status="processing",
            document_info=DocumentInfo(
                original_filename=file.filename,
                size=os.path.getsize(temp_path),
                content_type=file.content_type
            )
        )

    async def _process_and_store_document(
        self,
        file_path: Path,
        vector_store: ChromaVectorStore,
        document_id: str
    ):
        """Process a document and store its chunks in the vector database."""
        try:
            processed_doc = await self.doc_processor.process_document(file_path)
            chunks = processed_doc['chunks']

            # Each chunk gets a stable id and carries the document-level
            # metadata alongside its own position.
            vector_store.add_documents(
                documents=chunks,
                metadatas=[{
                    'document_id': document_id,
                    'chunk_id': i,
                    'source': str(file_path.name),
                    'metadata': processed_doc['metadata']
                } for i in range(len(chunks))],
                ids=[f"{document_id}_chunk_{i}" for i in range(len(chunks))]
            )
            return processed_doc
        except Exception as e:
            # Background tasks run after the response is sent, so failures
            # would otherwise vanish silently; log before re-raising.
            logger.error(
                f"Background processing failed for document {document_id}: {str(e)}"
            )
            raise
        finally:
            # Remove the temp file whether or not processing succeeded.
            if file_path.exists():
                file_path.unlink()

    def _create_failed_file_entry(self, filename: str, error: str) -> dict:
        """Create a failed file entry."""
        return {
            "filename": filename,
            "error": error
        }

    def cleanup(self):
        """Remove the upload directory if it is empty."""
        if self.upload_dir.exists() and not any(self.upload_dir.iterdir()):
            self.upload_dir.rmdir()