|
"""
|
|
معالج ملفات PDF
|
|
"""
|
|
|
|
import os
|
|
import io
|
|
import re
|
|
import PyPDF2
|
|
import fitz
|
|
import pdfplumber
|
|
import numpy as np
|
|
from PIL import Image
|
|
import pytesseract
|
|
import pandas as pd
|
|
import traceback
|
|
|
|
from utils.helpers import create_directory_if_not_exists, extract_numbers_from_text
|
|
|
|
|
|
def extract_text_from_pdf(file_path, method='pymupdf'):
|
|
"""
|
|
استخراج النص من ملف PDF
|
|
|
|
المعلمات:
|
|
file_path: مسار ملف PDF
|
|
method: طريقة الاستخراج ('pymupdf', 'pypdf2', 'pdfplumber')
|
|
|
|
الإرجاع:
|
|
نص مستخرج من ملف PDF
|
|
"""
|
|
try:
|
|
|
|
if not os.path.exists(file_path):
|
|
raise FileNotFoundError(f"الملف غير موجود: {file_path}")
|
|
|
|
|
|
if method.lower() == 'pymupdf':
|
|
return _extract_text_with_pymupdf(file_path)
|
|
elif method.lower() == 'pypdf2':
|
|
return _extract_text_with_pypdf2(file_path)
|
|
elif method.lower() == 'pdfplumber':
|
|
return _extract_text_with_pdfplumber(file_path)
|
|
else:
|
|
|
|
return _extract_text_with_pymupdf(file_path)
|
|
|
|
except Exception as e:
|
|
error_msg = f"خطأ في استخراج النص من ملف PDF: {str(e)}"
|
|
print(error_msg)
|
|
traceback.print_exc()
|
|
raise Exception(error_msg)
|
|
|
|
|
|
def _extract_text_with_pymupdf(file_path):
|
|
"""استخراج النص باستخدام PyMuPDF"""
|
|
document = fitz.open(file_path)
|
|
text = ""
|
|
|
|
for page_number in range(len(document)):
|
|
page = document.load_page(page_number)
|
|
text += page.get_text("text") + "\n\n"
|
|
|
|
document.close()
|
|
return text
|
|
|
|
|
|
def _extract_text_with_pypdf2(file_path):
|
|
"""استخراج النص باستخدام PyPDF2"""
|
|
with open(file_path, 'rb') as file:
|
|
reader = PyPDF2.PdfReader(file)
|
|
text = ""
|
|
|
|
for page_number in range(len(reader.pages)):
|
|
page = reader.pages[page_number]
|
|
text += page.extract_text() + "\n\n"
|
|
|
|
return text
|
|
|
|
|
|
def _extract_text_with_pdfplumber(file_path):
|
|
"""استخراج النص باستخدام pdfplumber"""
|
|
with pdfplumber.open(file_path) as pdf:
|
|
text = ""
|
|
|
|
for page in pdf.pages:
|
|
text += page.extract_text() + "\n\n"
|
|
|
|
return text
|
|
|
|
|
|
def extract_tables_from_pdf(file_path, page_numbers=None):
|
|
"""
|
|
استخراج الجداول من ملف PDF
|
|
|
|
المعلمات:
|
|
file_path: مسار ملف PDF
|
|
page_numbers: قائمة بأرقام الصفحات للاستخراج منها (افتراضي: None لجميع الصفحات)
|
|
|
|
الإرجاع:
|
|
قائمة من DataFrames تمثل الجداول المستخرجة
|
|
"""
|
|
try:
|
|
|
|
if not os.path.exists(file_path):
|
|
raise FileNotFoundError(f"الملف غير موجود: {file_path}")
|
|
|
|
|
|
tables = []
|
|
|
|
with pdfplumber.open(file_path) as pdf:
|
|
|
|
if page_numbers is None:
|
|
pages_to_extract = range(len(pdf.pages))
|
|
else:
|
|
pages_to_extract = [p-1 for p in page_numbers if 1 <= p <= len(pdf.pages)]
|
|
|
|
|
|
for page_idx in pages_to_extract:
|
|
page = pdf.pages[page_idx]
|
|
page_tables = page.extract_tables()
|
|
|
|
if page_tables:
|
|
for table in page_tables:
|
|
if table:
|
|
|
|
df = pd.DataFrame(table[1:], columns=table[0])
|
|
|
|
|
|
df = df.applymap(lambda x: x.strip() if isinstance(x, str) else x)
|
|
|
|
|
|
tables.append(df)
|
|
|
|
return tables
|
|
|
|
except Exception as e:
|
|
error_msg = f"خطأ في استخراج الجداول من ملف PDF: {str(e)}"
|
|
print(error_msg)
|
|
traceback.print_exc()
|
|
raise Exception(error_msg)
|
|
|
|
|
|
def extract_images_from_pdf(file_path, output_dir=None, prefix='image'):
|
|
"""
|
|
استخراج الصور من ملف PDF
|
|
|
|
المعلمات:
|
|
file_path: مسار ملف PDF
|
|
output_dir: دليل الإخراج (افتراضي: None للإرجاع كقائمة من الصور)
|
|
prefix: بادئة أسماء ملفات الصور
|
|
|
|
الإرجاع:
|
|
قائمة من مسارات الصور المستخرجة إذا تم تحديد دليل الإخراج، وإلا قائمة من الصور
|
|
"""
|
|
try:
|
|
|
|
if not os.path.exists(file_path):
|
|
raise FileNotFoundError(f"الملف غير موجود: {file_path}")
|
|
|
|
|
|
if output_dir:
|
|
create_directory_if_not_exists(output_dir)
|
|
|
|
|
|
document = fitz.open(file_path)
|
|
images = []
|
|
image_paths = []
|
|
|
|
for page_idx in range(len(document)):
|
|
page = document.load_page(page_idx)
|
|
|
|
|
|
image_list = page.get_images(full=True)
|
|
|
|
for img_idx, img_info in enumerate(image_list):
|
|
xref = img_info[0]
|
|
base_image = document.extract_image(xref)
|
|
image_bytes = base_image["image"]
|
|
|
|
|
|
image = Image.open(io.BytesIO(image_bytes))
|
|
|
|
if output_dir:
|
|
|
|
image_filename = f"{prefix}_{page_idx+1}_{img_idx+1}.{base_image['ext']}"
|
|
image_path = os.path.join(output_dir, image_filename)
|
|
image.save(image_path)
|
|
image_paths.append(image_path)
|
|
else:
|
|
|
|
images.append(image)
|
|
|
|
document.close()
|
|
|
|
return image_paths if output_dir else images
|
|
|
|
except Exception as e:
|
|
error_msg = f"خطأ في استخراج الصور من ملف PDF: {str(e)}"
|
|
print(error_msg)
|
|
traceback.print_exc()
|
|
raise Exception(error_msg)
|
|
|
|
|
|
def extract_text_from_image(image, lang='ara+eng'):
|
|
"""
|
|
استخراج النص من صورة باستخدام OCR
|
|
|
|
المعلمات:
|
|
image: كائن الصورة أو مسار الصورة
|
|
lang: لغة النص (افتراضي: 'ara+eng' للعربية والإنجليزية)
|
|
|
|
الإرجاع:
|
|
النص المستخرج من الصورة
|
|
"""
|
|
try:
|
|
|
|
if isinstance(image, str):
|
|
image = Image.open(image)
|
|
|
|
|
|
text = pytesseract.image_to_string(image, lang=lang)
|
|
|
|
return text
|
|
|
|
except Exception as e:
|
|
error_msg = f"خطأ في استخراج النص من الصورة: {str(e)}"
|
|
print(error_msg)
|
|
traceback.print_exc()
|
|
return ""
|
|
|
|
|
|
def ocr_pdf(file_path, lang='ara+eng'):
|
|
"""
|
|
تنفيذ OCR على ملف PDF
|
|
|
|
المعلمات:
|
|
file_path: مسار ملف PDF
|
|
lang: لغة النص (افتراضي: 'ara+eng' للعربية والإنجليزية)
|
|
|
|
الإرجاع:
|
|
النص المستخرج من ملف PDF
|
|
"""
|
|
try:
|
|
|
|
if not os.path.exists(file_path):
|
|
raise FileNotFoundError(f"الملف غير موجود: {file_path}")
|
|
|
|
|
|
document = fitz.open(file_path)
|
|
text = ""
|
|
|
|
for page_idx in range(len(document)):
|
|
page = document.load_page(page_idx)
|
|
|
|
|
|
pix = page.get_pixmap(matrix=fitz.Matrix(300/72, 300/72))
|
|
img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
|
|
|
|
|
|
page_text = extract_text_from_image(img, lang=lang)
|
|
text += page_text + "\n\n"
|
|
|
|
document.close()
|
|
|
|
return text
|
|
|
|
except Exception as e:
|
|
error_msg = f"خطأ في تنفيذ OCR على ملف PDF: {str(e)}"
|
|
print(error_msg)
|
|
traceback.print_exc()
|
|
raise Exception(error_msg)
|
|
|
|
|
|
def search_in_pdf(file_path, search_text):
|
|
"""
|
|
البحث عن نص في ملف PDF
|
|
|
|
المعلمات:
|
|
file_path: مسار ملف PDF
|
|
search_text: النص المراد البحث عنه
|
|
|
|
الإرجاع:
|
|
قائمة من النتائج {page_number, text_snippet, matched_text}
|
|
"""
|
|
try:
|
|
|
|
if not os.path.exists(file_path):
|
|
raise FileNotFoundError(f"الملف غير موجود: {file_path}")
|
|
|
|
|
|
document = fitz.open(file_path)
|
|
results = []
|
|
|
|
for page_idx in range(len(document)):
|
|
page = document.load_page(page_idx)
|
|
page_text = page.get_text("text")
|
|
|
|
|
|
if search_text.lower() in page_text.lower():
|
|
|
|
lines = page_text.split('\n')
|
|
for line in lines:
|
|
if search_text.lower() in line.lower():
|
|
results.append({
|
|
'page_number': page_idx + 1,
|
|
'text_snippet': line,
|
|
'matched_text': search_text
|
|
})
|
|
|
|
document.close()
|
|
|
|
return results
|
|
|
|
except Exception as e:
|
|
error_msg = f"خطأ في البحث في ملف PDF: {str(e)}"
|
|
print(error_msg)
|
|
traceback.print_exc()
|
|
raise Exception(error_msg)
|
|
|
|
|
|
def extract_quantities_from_pdf(file_path):
|
|
"""
|
|
استخراج الكميات من ملف PDF
|
|
|
|
المعلمات:
|
|
file_path: مسار ملف PDF
|
|
|
|
الإرجاع:
|
|
DataFrame يحتوي على البنود والكميات المستخرجة
|
|
"""
|
|
try:
|
|
|
|
text = extract_text_from_pdf(file_path)
|
|
tables = extract_tables_from_pdf(file_path)
|
|
|
|
quantities = []
|
|
|
|
|
|
for table in tables:
|
|
|
|
quantity_cols = [col for col in table.columns if any(term in col.lower() for term in ['كمية', 'عدد', 'الكمية'])]
|
|
unit_cols = [col for col in table.columns if any(term in col.lower() for term in ['وحدة', 'الوحدة'])]
|
|
item_cols = [col for col in table.columns if any(term in col.lower() for term in ['بند', 'وصف', 'البند', 'العمل'])]
|
|
|
|
if quantity_cols and (unit_cols or item_cols):
|
|
quantity_col = quantity_cols[0]
|
|
unit_col = unit_cols[0] if unit_cols else None
|
|
item_col = item_cols[0] if item_cols else None
|
|
|
|
|
|
for _, row in table.iterrows():
|
|
if pd.notna(row[quantity_col]) and (item_col is None or pd.notna(row[item_col])):
|
|
quantity_value = extract_numbers_from_text(row[quantity_col])
|
|
quantity = quantity_value[0] if quantity_value else None
|
|
|
|
quantities.append({
|
|
'البند': row[item_col] if item_col else "غير محدد",
|
|
'الوحدة': row[unit_col] if unit_col else "غير محدد",
|
|
'الكمية': quantity
|
|
})
|
|
|
|
|
|
lines = text.split('\n')
|
|
for line in lines:
|
|
|
|
if re.search(r'\d+(?:,\d+)*(?:\.\d+)?', line) and any(unit in line for unit in ['م2', 'م3', 'متر', 'طن', 'كجم', 'عدد']):
|
|
numbers = extract_numbers_from_text(line)
|
|
if numbers:
|
|
|
|
unit_match = re.search(r'\b(م2|م3|متر مربع|متر مكعب|م\.ط|طن|كجم|عدد|قطعة)\b', line)
|
|
unit = unit_match.group(1) if unit_match else "غير محدد"
|
|
|
|
quantities.append({
|
|
'البند': line,
|
|
'الوحدة': unit,
|
|
'الكمية': numbers[0]
|
|
})
|
|
|
|
|
|
if quantities:
|
|
quantities_df = pd.DataFrame(quantities)
|
|
return quantities_df
|
|
else:
|
|
return pd.DataFrame(columns=['البند', 'الوحدة', 'الكمية'])
|
|
|
|
except Exception as e:
|
|
error_msg = f"خطأ في استخراج الكميات من ملف PDF: {str(e)}"
|
|
print(error_msg)
|
|
traceback.print_exc()
|
|
raise Exception(error_msg)
|
|
|
|
|
|
def merge_pdfs(input_paths, output_path):
|
|
"""
|
|
دمج ملفات PDF متعددة في ملف واحد
|
|
|
|
المعلمات:
|
|
input_paths: قائمة من مسارات ملفات PDF المراد دمجها
|
|
output_path: مسار ملف PDF الناتج
|
|
|
|
الإرجاع:
|
|
True في حالة النجاح
|
|
"""
|
|
try:
|
|
|
|
for file_path in input_paths:
|
|
if not os.path.exists(file_path):
|
|
raise FileNotFoundError(f"الملف غير موجود: {file_path}")
|
|
|
|
|
|
output_dir = os.path.dirname(output_path)
|
|
create_directory_if_not_exists(output_dir)
|
|
|
|
|
|
merger = PyPDF2.PdfMerger()
|
|
|
|
for file_path in input_paths:
|
|
merger.append(file_path)
|
|
|
|
merger.write(output_path)
|
|
merger.close()
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
error_msg = f"خطأ في دمج ملفات PDF: {str(e)}"
|
|
print(error_msg)
|
|
traceback.print_exc()
|
|
raise Exception(error_msg)
|
|
|
|
|
|
def split_pdf(input_path, output_dir, prefix='page'):
|
|
"""
|
|
تقسيم ملف PDF إلى ملفات منفصلة لكل صفحة
|
|
|
|
المعلمات:
|
|
input_path: مسار ملف PDF المراد تقسيمه
|
|
output_dir: دليل الإخراج
|
|
prefix: بادئة أسماء ملفات الإخراج
|
|
|
|
الإرجاع:
|
|
قائمة من مسارات ملفات PDF الناتجة
|
|
"""
|
|
try:
|
|
|
|
if not os.path.exists(input_path):
|
|
raise FileNotFoundError(f"الملف غير موجود: {input_path}")
|
|
|
|
|
|
create_directory_if_not_exists(output_dir)
|
|
|
|
|
|
with open(input_path, 'rb') as file:
|
|
reader = PyPDF2.PdfReader(file)
|
|
output_files = []
|
|
|
|
|
|
for page_idx in range(len(reader.pages)):
|
|
writer = PyPDF2.PdfWriter()
|
|
writer.add_page(reader.pages[page_idx])
|
|
|
|
output_filename = f"{prefix}_{page_idx+1}.pdf"
|
|
output_path = os.path.join(output_dir, output_filename)
|
|
|
|
with open(output_path, 'wb') as output_file:
|
|
writer.write(output_file)
|
|
|
|
output_files.append(output_path)
|
|
|
|
return output_files
|
|
|
|
except Exception as e:
|
|
error_msg = f"خطأ في تقسيم ملف PDF: {str(e)}"
|
|
print(error_msg)
|
|
traceback.print_exc()
|
|
raise Exception(error_msg) |