"""
Document processor module for Norwegian RAG chatbot.

Orchestrates the document processing pipeline with remote embeddings.
"""

import os
import json
from typing import List, Dict, Any, Optional
from datetime import datetime

from .extractor import TextExtractor
from .chunker import TextChunker
from ..api.huggingface_api import HuggingFaceAPI
from ..api.config import CHUNK_SIZE, CHUNK_OVERLAP


class DocumentProcessor:
    """
    Orchestrates the document processing pipeline:
    1. Extract text from documents
    2. Split text into chunks
    3. Generate embeddings using the remote API
    4. Store processed documents and embeddings
    """

    def __init__(
        self,
        api_client: Optional[HuggingFaceAPI] = None,
        documents_dir: str = "/home/ubuntu/chatbot_project/data/documents",
        processed_dir: str = "/home/ubuntu/chatbot_project/data/processed",
        chunk_size: int = CHUNK_SIZE,
        chunk_overlap: int = CHUNK_OVERLAP,
        chunking_strategy: str = "paragraph"
    ):
        """
        Initialize the document processor.

        Args:
            api_client: HuggingFaceAPI client for generating embeddings
            documents_dir: Directory for storing original documents
            processed_dir: Directory for storing processed documents and embeddings
            chunk_size: Maximum size of each chunk
            chunk_overlap: Overlap between consecutive chunks
            chunking_strategy: Strategy for chunking text ('fixed', 'paragraph', or 'sentence')
        """
        self.api_client = api_client or HuggingFaceAPI()
        self.documents_dir = documents_dir
        self.processed_dir = processed_dir
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.chunking_strategy = chunking_strategy

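        # Ensure the storage directories exist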
        os.makedirs(self.documents_dir, exist_ok=True)
        os.makedirs(self.processed_dir, exist_ok=True)

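        # Load the existing document index, or start with an empty one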
        self.document_index_path = os.path.join(self.processed_dir, "document_index.json")
        self.document_index = self._load_document_index()

    def process_document(
        self,
        file_path: str,
        document_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Process a document through the entire pipeline.

        Args:
            file_path: Path to the document file
            document_id: Optional custom document ID
            metadata: Optional metadata for the document

        Returns:
            Document ID
        """
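        # Derive a document ID from the timestamp and filename if none was given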
        if document_id is None:
            document_id = f"doc_{datetime.now().strftime('%Y%m%d%H%M%S')}_{os.path.basename(file_path)}"

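        # Step 1: extract raw text from the source file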
        text = TextExtractor.extract_from_file(file_path)
        if not text:
            raise ValueError(f"Failed to extract text from {file_path}")

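        # Step 2: split the text into chunks and clean each chunk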
        chunks = TextChunker.chunk_text(
            text,
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            strategy=self.chunking_strategy
        )
        chunks = [TextChunker.clean_chunk(chunk) for chunk in chunks]

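        # Step 3: generate embeddings for the chunks via the remote API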
        embeddings = self.api_client.generate_embeddings(chunks)

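        # Merge caller-supplied metadata with processing details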
        if metadata is None:
            metadata = {}

        metadata.update({
            "filename": os.path.basename(file_path),
            "processed_date": datetime.now().isoformat(),
            "chunk_count": len(chunks),
            "chunking_strategy": self.chunking_strategy,
            "embedding_model": self.api_client.embedding_model_id
        })

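        # Step 4: store the processed document and register it in the index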
        self._save_processed_document(document_id, chunks, embeddings, metadata)
        self._update_document_index(document_id, metadata)

        return document_id

    def process_text(
        self,
        text: str,
        document_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Process text directly through the pipeline.

        Args:
            text: Text content to process
            document_id: Optional custom document ID
            metadata: Optional metadata for the document

        Returns:
            Document ID
        """
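        # Same pipeline as process_document, but starting from raw text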
        if document_id is None:
            document_id = f"text_{datetime.now().strftime('%Y%m%d%H%M%S')}"

        chunks = TextChunker.chunk_text(
            text,
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            strategy=self.chunking_strategy
        )
        chunks = [TextChunker.clean_chunk(chunk) for chunk in chunks]

        embeddings = self.api_client.generate_embeddings(chunks)

        if metadata is None:
            metadata = {}

        metadata.update({
            "source": "direct_text",
            "processed_date": datetime.now().isoformat(),
            "chunk_count": len(chunks),
            "chunking_strategy": self.chunking_strategy,
            "embedding_model": self.api_client.embedding_model_id
        })

        self._save_processed_document(document_id, chunks, embeddings, metadata)
        self._update_document_index(document_id, metadata)

        return document_id

    def get_document_chunks(self, document_id: str) -> List[str]:
        """
        Get all chunks for a document.

        Args:
            document_id: Document ID

        Returns:
            List of text chunks
        """
        document_path = os.path.join(self.processed_dir, f"{document_id}.json")
        if not os.path.exists(document_path):
            raise FileNotFoundError(f"Document not found: {document_id}")

        with open(document_path, 'r', encoding='utf-8') as f:
            document_data = json.load(f)

        return document_data.get("chunks", [])

    def get_document_embeddings(self, document_id: str) -> List[List[float]]:
        """
        Get all embeddings for a document.

        Args:
            document_id: Document ID

        Returns:
            List of embedding vectors
        """
        document_path = os.path.join(self.processed_dir, f"{document_id}.json")
        if not os.path.exists(document_path):
            raise FileNotFoundError(f"Document not found: {document_id}")

        with open(document_path, 'r', encoding='utf-8') as f:
            document_data = json.load(f)

        return document_data.get("embeddings", [])

    def get_all_documents(self) -> Dict[str, Dict[str, Any]]:
        """
        Get all documents in the index.

        Returns:
            Dictionary of document IDs to metadata
        """
        return self.document_index

    def delete_document(self, document_id: str) -> bool:
        """
        Delete a document and its processed data.

        Args:
            document_id: Document ID

        Returns:
            True if successful, False otherwise
        """
        if document_id not in self.document_index:
            return False

        # Remove the document from the index and persist the change
        del self.document_index[document_id]
        self._save_document_index()

        # Remove the processed data file if it exists
        document_path = os.path.join(self.processed_dir, f"{document_id}.json")
        if os.path.exists(document_path):
            os.remove(document_path)

        return True

    def _save_processed_document(
        self,
        document_id: str,
        chunks: List[str],
        embeddings: List[List[float]],
        metadata: Dict[str, Any]
    ) -> None:
        """
        Save processed document data.

        Args:
            document_id: Document ID
            chunks: List of text chunks
            embeddings: List of embedding vectors
            metadata: Document metadata
        """
        document_data = {
            "document_id": document_id,
            "metadata": metadata,
            "chunks": chunks,
            "embeddings": embeddings
        }

        document_path = os.path.join(self.processed_dir, f"{document_id}.json")
        with open(document_path, 'w', encoding='utf-8') as f:
            json.dump(document_data, f, ensure_ascii=False, indent=2)

    def _load_document_index(self) -> Dict[str, Dict[str, Any]]:
        """
        Load the document index from disk.

        Returns:
            Dictionary of document IDs to metadata
        """
        if os.path.exists(self.document_index_path):
            try:
                with open(self.document_index_path, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except Exception as e:
                print(f"Error loading document index: {str(e)}")

        # Fall back to an empty index if the file is missing or unreadable
        return {}

    def _save_document_index(self) -> None:
        """
        Save the document index to disk.
        """
        with open(self.document_index_path, 'w', encoding='utf-8') as f:
            json.dump(self.document_index, f, ensure_ascii=False, indent=2)

    def _update_document_index(self, document_id: str, metadata: Dict[str, Any]) -> None:
        """
        Update the document index with a new or updated document.

        Args:
            document_id: Document ID
            metadata: Document metadata
        """
        self.document_index[document_id] = metadata
        self._save_document_index()
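

# Minimal usage sketch of the pipeline defined above. It assumes that any
# credentials expected by HuggingFaceAPI are configured and that the default
# data directories are writable; adjust both for your environment.
if __name__ == "__main__":
    processor = DocumentProcessor(chunking_strategy="paragraph")

    # Process a short Norwegian text directly (no file extraction step).
    doc_id = processor.process_text("Dette er en kort eksempeltekst på norsk.")

    # Inspect what was stored for the new document.
    chunks = processor.get_document_chunks(doc_id)
    embeddings = processor.get_document_embeddings(doc_id)
    print(f"{doc_id}: {len(chunks)} chunks, {len(embeddings)} embeddings")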