""" Document processor module for Norwegian RAG chatbot. Orchestrates the document processing pipeline with remote embeddings. """ import os import json import numpy as np from typing import List, Dict, Any, Optional, Tuple, Union from datetime import datetime from .extractor import TextExtractor from .chunker import TextChunker from ..api.huggingface_api import HuggingFaceAPI from ..api.config import CHUNK_SIZE, CHUNK_OVERLAP class DocumentProcessor: """ Orchestrates the document processing pipeline: 1. Extract text from documents 2. Split text into chunks 3. Generate embeddings using remote API 4. Store processed documents and embeddings """ def __init__( self, api_client: Optional[HuggingFaceAPI] = None, documents_dir: str = "/home/ubuntu/chatbot_project/data/documents", processed_dir: str = "/home/ubuntu/chatbot_project/data/processed", chunk_size: int = CHUNK_SIZE, chunk_overlap: int = CHUNK_OVERLAP, chunking_strategy: str = "paragraph" ): """ Initialize the document processor. Args: api_client: HuggingFaceAPI client for generating embeddings documents_dir: Directory for storing original documents processed_dir: Directory for storing processed documents and embeddings chunk_size: Maximum size of each chunk chunk_overlap: Overlap between consecutive chunks chunking_strategy: Strategy for chunking text ('fixed', 'paragraph', or 'sentence') """ self.api_client = api_client or HuggingFaceAPI() self.documents_dir = documents_dir self.processed_dir = processed_dir self.chunk_size = chunk_size self.chunk_overlap = chunk_overlap self.chunking_strategy = chunking_strategy # Ensure directories exist os.makedirs(self.documents_dir, exist_ok=True) os.makedirs(self.processed_dir, exist_ok=True) # Initialize document index self.document_index_path = os.path.join(self.processed_dir, "document_index.json") self.document_index = self._load_document_index() def process_document( self, file_path: str, document_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None ) -> str: """ Process a document through the entire pipeline. Args: file_path: Path to the document file document_id: Optional custom document ID metadata: Optional metadata for the document Returns: Document ID """ # Generate document ID if not provided if document_id is None: document_id = f"doc_{datetime.now().strftime('%Y%m%d%H%M%S')}_{os.path.basename(file_path)}" # Extract text from document text = TextExtractor.extract_from_file(file_path) if not text: raise ValueError(f"Failed to extract text from {file_path}") # Split text into chunks chunks = TextChunker.chunk_text( text, chunk_size=self.chunk_size, chunk_overlap=self.chunk_overlap, strategy=self.chunking_strategy ) # Clean chunks chunks = [TextChunker.clean_chunk(chunk) for chunk in chunks] # Generate embeddings using remote API embeddings = self.api_client.generate_embeddings(chunks) # Prepare metadata if metadata is None: metadata = {} metadata.update({ "filename": os.path.basename(file_path), "processed_date": datetime.now().isoformat(), "chunk_count": len(chunks), "chunking_strategy": self.chunking_strategy, "embedding_model": self.api_client.embedding_model_id }) # Save processed document self._save_processed_document(document_id, chunks, embeddings, metadata) # Update document index self._update_document_index(document_id, metadata) return document_id def process_text( self, text: str, document_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None ) -> str: """ Process text directly through the pipeline. Args: text: Text content to process document_id: Optional custom document ID metadata: Optional metadata for the document Returns: Document ID """ # Generate document ID if not provided if document_id is None: document_id = f"text_{datetime.now().strftime('%Y%m%d%H%M%S')}" # Split text into chunks chunks = TextChunker.chunk_text( text, chunk_size=self.chunk_size, chunk_overlap=self.chunk_overlap, strategy=self.chunking_strategy ) # Clean chunks chunks = [TextChunker.clean_chunk(chunk) for chunk in chunks] # Generate embeddings using remote API embeddings = self.api_client.generate_embeddings(chunks) # Prepare metadata if metadata is None: metadata = {} metadata.update({ "source": "direct_text", "processed_date": datetime.now().isoformat(), "chunk_count": len(chunks), "chunking_strategy": self.chunking_strategy, "embedding_model": self.api_client.embedding_model_id }) # Save processed document self._save_processed_document(document_id, chunks, embeddings, metadata) # Update document index self._update_document_index(document_id, metadata) return document_id def get_document_chunks(self, document_id: str) -> List[str]: """ Get all chunks for a document. Args: document_id: Document ID Returns: List of text chunks """ document_path = os.path.join(self.processed_dir, f"{document_id}.json") if not os.path.exists(document_path): raise FileNotFoundError(f"Document not found: {document_id}") with open(document_path, 'r', encoding='utf-8') as f: document_data = json.load(f) return document_data.get("chunks", []) def get_document_embeddings(self, document_id: str) -> List[List[float]]: """ Get all embeddings for a document. Args: document_id: Document ID Returns: List of embedding vectors """ document_path = os.path.join(self.processed_dir, f"{document_id}.json") if not os.path.exists(document_path): raise FileNotFoundError(f"Document not found: {document_id}") with open(document_path, 'r', encoding='utf-8') as f: document_data = json.load(f) return document_data.get("embeddings", []) def get_all_documents(self) -> Dict[str, Dict[str, Any]]: """ Get all documents in the index. Returns: Dictionary of document IDs to metadata """ return self.document_index def delete_document(self, document_id: str) -> bool: """ Delete a document and its processed data. Args: document_id: Document ID Returns: True if successful, False otherwise """ if document_id not in self.document_index: return False # Remove from index del self.document_index[document_id] self._save_document_index() # Delete processed file document_path = os.path.join(self.processed_dir, f"{document_id}.json") if os.path.exists(document_path): os.remove(document_path) return True def _save_processed_document( self, document_id: str, chunks: List[str], embeddings: List[List[float]], metadata: Dict[str, Any] ) -> None: """ Save processed document data. Args: document_id: Document ID chunks: List of text chunks embeddings: List of embedding vectors metadata: Document metadata """ document_data = { "document_id": document_id, "metadata": metadata, "chunks": chunks, "embeddings": embeddings } document_path = os.path.join(self.processed_dir, f"{document_id}.json") with open(document_path, 'w', encoding='utf-8') as f: json.dump(document_data, f, ensure_ascii=False, indent=2) def _load_document_index(self) -> Dict[str, Dict[str, Any]]: """ Load the document index from disk. Returns: Dictionary of document IDs to metadata """ if os.path.exists(self.document_index_path): try: with open(self.document_index_path, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: print(f"Error loading document index: {str(e)}") return {} def _save_document_index(self) -> None: """ Save the document index to disk. """ with open(self.document_index_path, 'w', encoding='utf-8') as f: json.dump(self.document_index, f, ensure_ascii=False, indent=2) def _update_document_index(self, document_id: str, metadata: Dict[str, Any]) -> None: """ Update the document index with a new or updated document. Args: document_id: Document ID metadata: Document metadata """ self.document_index[document_id] = metadata self._save_document_index()