RAG-retrieval / utils /file_reader.py
yasirme's picture
init
407c79e
import io
import pdfplumber
import pandas as pd
import json
from docx import Document
from openpyxl import load_workbook
import re
import uuid
class FileReader:
    """Extract cleaned plain text from uploaded files and enforce size limits.

    Supported extensions are listed in ``allowed_files``. Every reader returns
    text that has been passed through ``_clean_text`` (whitespace collapsed,
    non-ASCII characters removed, ends stripped).
    """

    # Patterns used by _clean_text, compiled once and shared by all instances.
    _WHITESPACE_RE = re.compile(r'\s+')
    _NON_ASCII_RE = re.compile(r'[^\x00-\x7F]+')

    def __init__(self):
        # Lower-case extensions (without the dot) that calc_chars accepts.
        self.allowed_files = ["txt", "pdf", "docx", "md", "json", "csv", "xlsx", "xls"]
        # Upper bound on the cleaned length of a single file.
        self.max_chars_per_file = 5000000

    def calc_chars(self, files, allowed_chars):
        """Read every file, clean its text, and check the character budgets.

        Args:
            files: Iterable of upload objects exposing ``.filename`` and a
                file-like read interface (presumably Werkzeug ``FileStorage``
                objects -- confirm against the caller).
            allowed_chars: Total character budget across all files; anything
                accepted by ``int()``.

        Returns:
            A ``(payload, status)`` tuple. On success ``status`` is 200 and
            ``payload`` holds ``total_chars`` and ``clean_contents`` (one dict
            per file with ``type``, ``content``, ``name``, ``id`` and
            ``total_chars``). On failure ``payload`` is ``{"error": ...}`` and
            ``status`` is 400 (unsupported type or a limit exceeded) or 500
            (the file could not be read).
        """
        total_chars = 0
        clean_contents = []
        for file in files:
            filename = getattr(file, "filename", None)
            file_extension = self._extension_of(filename)
            reader = self._reader_for(file_extension)
            # A missing reader covers the case where allowed_files was extended
            # without adding a matching reader; treat it as unsupported rather
            # than reusing text from a previous iteration.
            if file_extension not in self.allowed_files or reader is None:
                return {"error": "unsupported file type uploaded"}, 400
            try:
                text = reader(file)
                if len(text) > self.max_chars_per_file:
                    return {"error": "max 5 million characters per file allowed."}, 400
                clean_contents.append({
                    "type": file_extension,
                    "content": text,
                    "name": filename,
                    "id": str(uuid.uuid4()),
                    "total_chars": len(text),
                })
                total_chars += len(text)
                if total_chars > int(allowed_chars):
                    return {"error": "Total allowed characters limit reached"}, 400
            except Exception as e:
                # Boundary handler: any parser failure is reported to the
                # caller as an error payload instead of propagating.
                return {"error": f"Error reading file {filename}: {e}"}, 500
        return {"total_chars": total_chars, "clean_contents": clean_contents}, 200

    @staticmethod
    def _extension_of(filename):
        """Return the lower-cased extension of *filename*, or ``""`` if none.

        A missing/empty filename or a name without a dot has no extension, so
        a bare name such as ``"txt"`` is not mistaken for a ``.txt`` file.
        """
        if not isinstance(filename, str) or "." not in filename:
            return ""
        return filename.rsplit(".", 1)[-1].lower()

    def _reader_for(self, extension):
        """Return the bound reader method for *extension*, or ``None``."""
        readers = {
            "txt": self._read_txt,
            "md": self._read_txt,
            "pdf": self._read_pdf,
            "docx": self._read_docx,
            "json": self._read_json,
            "csv": self._read_csv,
            "xlsx": self._read_excel,
            "xls": self._read_excel,
        }
        return readers.get(extension)

    def _read_txt(self, file):
        """Read a plain-text or Markdown file and return its cleaned text."""
        raw = file.read()
        if isinstance(raw, bytes):
            # Undecodable bytes become U+FFFD, which _clean_text removes along
            # with every other non-ASCII character, so one bad byte no longer
            # rejects the whole file.
            raw = raw.decode("utf-8", errors="replace")
        return self._clean_text(raw)

    def _read_pdf(self, file):
        """Extract and clean the text of every page of a PDF."""
        with pdfplumber.open(file) as pdf:
            # Join pages with a newline so the last word of one page is not
            # fused with the first word of the next; _clean_text turns the
            # newline into a single space.
            text = "\n".join(page.extract_text() or '' for page in pdf.pages)
        return self._clean_text(text)

    def _read_docx(self, file):
        """Extract and clean the paragraph text of a .docx document."""
        doc = Document(file)
        text = "\n".join(para.text for para in doc.paragraphs)
        return self._clean_text(text)

    def _read_json(self, file):
        """Parse a JSON file and return its re-serialised, cleaned text."""
        content = json.load(file)
        text = json.dumps(content, ensure_ascii=False)
        return self._clean_text(text)

    def _read_csv(self, file):
        """Load a CSV with pandas and return its cleaned tabular rendering."""
        df = pd.read_csv(file)
        text = df.to_string(index=False)
        return self._clean_text(text)

    def _read_excel(self, file):
        """Return the cleaned cell text of every sheet in a workbook.

        Cells in a row are joined with `` | `` and empty cells become empty
        strings.

        NOTE(review): ``xls`` is routed here as well, but openpyxl documents
        support for the xlsx/xlsm family only; legacy ``.xls`` uploads
        presumably fail and surface as a 500 from ``calc_chars`` -- confirm.
        """
        wb = load_workbook(file)
        rows = []
        for sheet in wb.sheetnames:
            ws = wb[sheet]
            for row in ws.iter_rows(values_only=True):
                rows.append(' | '.join(str(cell) if cell is not None else '' for cell in row))
        return self._clean_text("\n".join(rows))

    def _clean_text(self, text):
        """Collapse whitespace runs, drop non-ASCII characters, and strip ends."""
        # Whitespace is collapsed first so that Unicode spaces become a plain
        # space instead of being deleted by the ASCII filter below.
        text = self._WHITESPACE_RE.sub(' ', text)
        text = self._NON_ASCII_RE.sub('', text)
        return text.strip()
# Module-level FileReader instance, created once when this module is imported.
file_reader = FileReader()