hevold's picture
Upload 29 files
b34efa5 verified
"""
Document processor module for Norwegian RAG chatbot.
Orchestrates the document processing pipeline with remote embeddings.
"""
import os
import json
import numpy as np
from typing import List, Dict, Any, Optional, Tuple, Union
from datetime import datetime
from .extractor import TextExtractor
from .chunker import TextChunker
from ..api.huggingface_api import HuggingFaceAPI
from ..api.config import CHUNK_SIZE, CHUNK_OVERLAP
class DocumentProcessor:
"""
Orchestrates the document processing pipeline:
1. Extract text from documents
2. Split text into chunks
3. Generate embeddings using remote API
4. Store processed documents and embeddings
"""
def __init__(
self,
api_client: Optional[HuggingFaceAPI] = None,
documents_dir: str = "/home/ubuntu/chatbot_project/data/documents",
processed_dir: str = "/home/ubuntu/chatbot_project/data/processed",
chunk_size: int = CHUNK_SIZE,
chunk_overlap: int = CHUNK_OVERLAP,
chunking_strategy: str = "paragraph"
):
"""
Initialize the document processor.
Args:
api_client: HuggingFaceAPI client for generating embeddings
documents_dir: Directory for storing original documents
processed_dir: Directory for storing processed documents and embeddings
chunk_size: Maximum size of each chunk
chunk_overlap: Overlap between consecutive chunks
chunking_strategy: Strategy for chunking text ('fixed', 'paragraph', or 'sentence')
"""
self.api_client = api_client or HuggingFaceAPI()
self.documents_dir = documents_dir
self.processed_dir = processed_dir
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.chunking_strategy = chunking_strategy
# Ensure directories exist
os.makedirs(self.documents_dir, exist_ok=True)
os.makedirs(self.processed_dir, exist_ok=True)
# Initialize document index
self.document_index_path = os.path.join(self.processed_dir, "document_index.json")
self.document_index = self._load_document_index()
def process_document(
self,
file_path: str,
document_id: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None
) -> str:
"""
Process a document through the entire pipeline.
Args:
file_path: Path to the document file
document_id: Optional custom document ID
metadata: Optional metadata for the document
Returns:
Document ID
"""
# Generate document ID if not provided
if document_id is None:
document_id = f"doc_{datetime.now().strftime('%Y%m%d%H%M%S')}_{os.path.basename(file_path)}"
# Extract text from document
text = TextExtractor.extract_from_file(file_path)
if not text:
raise ValueError(f"Failed to extract text from {file_path}")
# Split text into chunks
chunks = TextChunker.chunk_text(
text,
chunk_size=self.chunk_size,
chunk_overlap=self.chunk_overlap,
strategy=self.chunking_strategy
)
# Clean chunks
chunks = [TextChunker.clean_chunk(chunk) for chunk in chunks]
# Generate embeddings using remote API
embeddings = self.api_client.generate_embeddings(chunks)
# Prepare metadata
if metadata is None:
metadata = {}
metadata.update({
"filename": os.path.basename(file_path),
"processed_date": datetime.now().isoformat(),
"chunk_count": len(chunks),
"chunking_strategy": self.chunking_strategy,
"embedding_model": self.api_client.embedding_model_id
})
# Save processed document
self._save_processed_document(document_id, chunks, embeddings, metadata)
# Update document index
self._update_document_index(document_id, metadata)
return document_id
def process_text(
self,
text: str,
document_id: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None
) -> str:
"""
Process text directly through the pipeline.
Args:
text: Text content to process
document_id: Optional custom document ID
metadata: Optional metadata for the document
Returns:
Document ID
"""
# Generate document ID if not provided
if document_id is None:
document_id = f"text_{datetime.now().strftime('%Y%m%d%H%M%S')}"
# Split text into chunks
chunks = TextChunker.chunk_text(
text,
chunk_size=self.chunk_size,
chunk_overlap=self.chunk_overlap,
strategy=self.chunking_strategy
)
# Clean chunks
chunks = [TextChunker.clean_chunk(chunk) for chunk in chunks]
# Generate embeddings using remote API
embeddings = self.api_client.generate_embeddings(chunks)
# Prepare metadata
if metadata is None:
metadata = {}
metadata.update({
"source": "direct_text",
"processed_date": datetime.now().isoformat(),
"chunk_count": len(chunks),
"chunking_strategy": self.chunking_strategy,
"embedding_model": self.api_client.embedding_model_id
})
# Save processed document
self._save_processed_document(document_id, chunks, embeddings, metadata)
# Update document index
self._update_document_index(document_id, metadata)
return document_id
def get_document_chunks(self, document_id: str) -> List[str]:
"""
Get all chunks for a document.
Args:
document_id: Document ID
Returns:
List of text chunks
"""
document_path = os.path.join(self.processed_dir, f"{document_id}.json")
if not os.path.exists(document_path):
raise FileNotFoundError(f"Document not found: {document_id}")
with open(document_path, 'r', encoding='utf-8') as f:
document_data = json.load(f)
return document_data.get("chunks", [])
def get_document_embeddings(self, document_id: str) -> List[List[float]]:
"""
Get all embeddings for a document.
Args:
document_id: Document ID
Returns:
List of embedding vectors
"""
document_path = os.path.join(self.processed_dir, f"{document_id}.json")
if not os.path.exists(document_path):
raise FileNotFoundError(f"Document not found: {document_id}")
with open(document_path, 'r', encoding='utf-8') as f:
document_data = json.load(f)
return document_data.get("embeddings", [])
def get_all_documents(self) -> Dict[str, Dict[str, Any]]:
"""
Get all documents in the index.
Returns:
Dictionary of document IDs to metadata
"""
return self.document_index
def delete_document(self, document_id: str) -> bool:
"""
Delete a document and its processed data.
Args:
document_id: Document ID
Returns:
True if successful, False otherwise
"""
if document_id not in self.document_index:
return False
# Remove from index
del self.document_index[document_id]
self._save_document_index()
# Delete processed file
document_path = os.path.join(self.processed_dir, f"{document_id}.json")
if os.path.exists(document_path):
os.remove(document_path)
return True
def _save_processed_document(
self,
document_id: str,
chunks: List[str],
embeddings: List[List[float]],
metadata: Dict[str, Any]
) -> None:
"""
Save processed document data.
Args:
document_id: Document ID
chunks: List of text chunks
embeddings: List of embedding vectors
metadata: Document metadata
"""
document_data = {
"document_id": document_id,
"metadata": metadata,
"chunks": chunks,
"embeddings": embeddings
}
document_path = os.path.join(self.processed_dir, f"{document_id}.json")
with open(document_path, 'w', encoding='utf-8') as f:
json.dump(document_data, f, ensure_ascii=False, indent=2)
def _load_document_index(self) -> Dict[str, Dict[str, Any]]:
"""
Load the document index from disk.
Returns:
Dictionary of document IDs to metadata
"""
if os.path.exists(self.document_index_path):
try:
with open(self.document_index_path, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
print(f"Error loading document index: {str(e)}")
return {}
def _save_document_index(self) -> None:
"""
Save the document index to disk.
"""
with open(self.document_index_path, 'w', encoding='utf-8') as f:
json.dump(self.document_index, f, ensure_ascii=False, indent=2)
def _update_document_index(self, document_id: str, metadata: Dict[str, Any]) -> None:
"""
Update the document index with a new or updated document.
Args:
document_id: Document ID
metadata: Document metadata
"""
self.document_index[document_id] = metadata
self._save_document_index()