Spaces:
Sleeping
Sleeping
import os | |
from typing import List | |
import PyPDF2 | |
import concurrent.futures | |
import logging | |
# Configure logging | |
logging.basicConfig(level=logging.INFO, | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
logger = logging.getLogger(__name__) | |
class TextFileLoader: | |
def __init__(self, path: str, encoding: str = "utf-8"): | |
self.documents = [] | |
self.path = path | |
self.encoding = encoding | |
def load(self): | |
if os.path.isdir(self.path): | |
self.load_directory() | |
elif os.path.isfile(self.path) and self.path.endswith(".txt"): | |
self.load_file() | |
else: | |
raise ValueError( | |
"Provided path is neither a valid directory nor a .txt file." | |
) | |
def load_file(self): | |
with open(self.path, "r", encoding=self.encoding) as f: | |
self.documents.append(f.read()) | |
def load_directory(self): | |
for root, _, files in os.walk(self.path): | |
for file in files: | |
if file.endswith(".txt"): | |
with open( | |
os.path.join(root, file), "r", encoding=self.encoding | |
) as f: | |
self.documents.append(f.read()) | |
def load_documents(self): | |
self.load() | |
return self.documents | |
class CharacterTextSplitter: | |
def __init__( | |
self, | |
chunk_size: int = 1000, | |
chunk_overlap: int = 200, | |
max_workers: int = 4 | |
): | |
assert ( | |
chunk_size > chunk_overlap | |
), "Chunk size must be greater than chunk overlap" | |
self.chunk_size = chunk_size | |
self.chunk_overlap = chunk_overlap | |
self.max_workers = max_workers | |
def split(self, text: str) -> List[str]: | |
chunks = [] | |
for i in range(0, len(text), self.chunk_size - self.chunk_overlap): | |
chunks.append(text[i : i + self.chunk_size]) | |
return chunks | |
def split_texts(self, texts: List[str]) -> List[str]: | |
logger.info(f"Splitting {len(texts)} texts in parallel with {self.max_workers} workers") | |
chunks = [] | |
# Use parallel processing if there are multiple texts or large single text | |
if len(texts) > 1 or (len(texts) == 1 and len(texts[0]) > 50000): | |
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: | |
# Map the split function to the list of texts | |
future_to_text = {executor.submit(self.split, text): text for text in texts} | |
# Collect results as they complete | |
for future in concurrent.futures.as_completed(future_to_text): | |
try: | |
text_chunks = future.result() | |
chunks.extend(text_chunks) | |
logger.info(f"Processed text chunk batch: {len(text_chunks)} chunks") | |
except Exception as e: | |
logger.error(f"Error processing text chunk: {str(e)}") | |
else: | |
# For small amounts of text, process sequentially | |
for text in texts: | |
chunks.extend(self.split(text)) | |
logger.info(f"Completed splitting texts into {len(chunks)} chunks") | |
return chunks | |
class PDFLoader: | |
def __init__(self, path: str): | |
self.documents = [] | |
self.path = path | |
print(f"PDFLoader initialized with path: {self.path}") | |
def load(self): | |
print(f"Loading PDF from path: {self.path}") | |
print(f"Path exists: {os.path.exists(self.path)}") | |
print(f"Is file: {os.path.isfile(self.path)}") | |
print(f"Is directory: {os.path.isdir(self.path)}") | |
print(f"File permissions: {oct(os.stat(self.path).st_mode)[-3:]}") | |
try: | |
# Try to open the file first to verify access | |
with open(self.path, 'rb') as test_file: | |
pass | |
# If we can open it, proceed with loading | |
self.load_file() | |
except IOError as e: | |
raise ValueError(f"Cannot access file at '{self.path}': {str(e)}") | |
except Exception as e: | |
raise ValueError(f"Error processing file at '{self.path}': {str(e)}") | |
def load_file(self): | |
with open(self.path, 'rb') as file: | |
# Create PDF reader object | |
pdf_reader = PyPDF2.PdfReader(file) | |
# Extract text from each page | |
text = "" | |
for page in pdf_reader.pages: | |
text += page.extract_text() + "\n" | |
self.documents.append(text) | |
def load_directory(self): | |
for root, _, files in os.walk(self.path): | |
for file in files: | |
if file.lower().endswith('.pdf'): | |
file_path = os.path.join(root, file) | |
with open(file_path, 'rb') as f: | |
pdf_reader = PyPDF2.PdfReader(f) | |
# Extract text from each page | |
text = "" | |
for page in pdf_reader.pages: | |
text += page.extract_text() + "\n" | |
self.documents.append(text) | |
def load_documents(self): | |
self.load() | |
return self.documents | |
if __name__ == "__main__": | |
loader = TextFileLoader("data/KingLear.txt") | |
loader.load() | |
splitter = CharacterTextSplitter() | |
chunks = splitter.split_texts(loader.documents) | |
print(len(chunks)) | |
print(chunks[0]) | |
print("--------") | |
print(chunks[1]) | |
print("--------") | |
print(chunks[-2]) | |
print("--------") | |
print(chunks[-1]) | |