Spaces:
Sleeping
Sleeping
# -*- coding: utf-8 -*- | |
""" | |
وحدة تطبيق تحليل المستندات | |
هذا الملف يحتوي على الفئة الرئيسية لتطبيق تحليل المستندات. | |
""" | |
# استيراد المكتبات القياسية | |
import os | |
import sys | |
import logging | |
import base64 | |
import json | |
import time | |
from io import BytesIO | |
from pathlib import Path | |
from urllib.parse import urlparse | |
from tempfile import NamedTemporaryFile | |
# استيراد مكتبة Streamlit | |
import streamlit as st | |
# استيراد المكتبات الإضافية | |
import requests | |
from PIL import Image | |
try: | |
# استيراد مكتبات Docling و MLX VLM | |
from docling_core.types.doc import ImageRefMode | |
from docling_core.types.doc.document import DocTagsDocument, DoclingDocument | |
from mlx_vlm import load, generate | |
from mlx_vlm.prompt_utils import apply_chat_template | |
from mlx_vlm.utils import load_config, stream_generate | |
docling_available = True | |
except ImportError: | |
docling_available = False | |
logging.warning("لم يتم العثور على مكتبات Docling و MLX VLM. بعض الوظائف قد لا تعمل.") | |
try: | |
# استيراد مكتبة pdf2image للتعامل مع ملفات PDF | |
from pdf2image import convert_from_path | |
pdf_conversion_available = True | |
except ImportError: | |
pdf_conversion_available = False | |
logging.warning("لم يتم العثور على مكتبة pdf2image. لن يمكن تحويل ملفات PDF إلى صور.") | |
# إعداد المسار للوحدات النمطية | |
current_dir = os.path.dirname(os.path.abspath(__file__)) | |
parent_dir = os.path.dirname(os.path.dirname(current_dir)) | |
if parent_dir not in sys.path: | |
sys.path.append(parent_dir) | |
# استيراد الخدمات باستخدام المسار النسبي | |
try: | |
# الطريقة 1: استيراد نسبي مباشر | |
from .services.text_extractor import TextExtractor | |
from .services.item_extractor import ItemExtractor | |
from .services.document_parser import DocumentParser | |
except ImportError: | |
try: | |
# الطريقة 2: استيراد مطلق | |
from modules.document_analysis.services.text_extractor import TextExtractor | |
from modules.document_analysis.services.item_extractor import ItemExtractor | |
from modules.document_analysis.services.document_parser import DocumentParser | |
except ImportError: | |
# الطريقة 3: تعريف الفئات مباشرة كحل مؤقت | |
logging.warning("لا يمكن استيراد خدمات تحليل المستندات. استخدام التعريفات المؤقتة.") | |
class TextExtractor: | |
def __init__(self, config=None): | |
self.config = config or {} | |
def extract_from_pdf(self, file_path): | |
return "نص مستخرج مؤقت من PDF" | |
def extract_from_docx(self, file_path): | |
return "نص مستخرج مؤقت من DOCX" | |
def extract_from_image(self, file_path): | |
return "نص مستخرج مؤقت من صورة" | |
def extract(self, file_path): | |
_, ext = os.path.splitext(file_path) | |
ext = ext.lower() | |
if ext == '.pdf': | |
return self.extract_from_pdf(file_path) | |
elif ext in ('.doc', '.docx'): | |
return self.extract_from_docx(file_path) | |
elif ext in ('.jpg', '.jpeg', '.png'): | |
return self.extract_from_image(file_path) | |
else: | |
return "نوع ملف غير مدعوم" | |
class ItemExtractor: | |
def __init__(self, config=None): | |
self.config = config or {} | |
def extract_tables(self, document): | |
return [{"عنوان": "جدول مؤقت", "بيانات": []}] | |
def extract(self, file_path): | |
return [ | |
{"بند": "بند مؤقت 1", "قيمة": 1000}, | |
{"بند": "بند مؤقت 2", "قيمة": 2000}, | |
{"بند": "بند مؤقت 3", "قيمة": 3000} | |
] | |
class DocumentParser: | |
def __init__(self, config=None): | |
self.config = config or {} | |
def parse_document(self, file_path): | |
return {"نوع": "مستند مؤقت", "محتوى": "محتوى مؤقت"} | |
def parse(self, file_path): | |
return { | |
"نوع المستند": "مستند مؤقت", | |
"عدد الصفحات": 5, | |
"تاريخ التحليل": "2025-03-24", | |
"درجة الثقة": "80%", | |
"ملاحظات": "تحليل مؤقت للمستند" | |
} | |
class DoclingAnalyzer: | |
""" | |
فئة لتحليل المستندات باستخدام نماذج Docling و MLX VLM | |
""" | |
def __init__(self): | |
self.model = None | |
self.processor = None | |
self.config = None | |
self.docling_available = False | |
try: | |
# تحميل النموذج | |
import os | |
from mlx_vlm import load, generate | |
from mlx_vlm.utils import load_config | |
model_path = "ds4sd/SmolDocling-256M-preview-mlx-bf16" | |
self.model, self.processor = load(model_path) | |
self.config = load_config(model_path) | |
self.docling_available = True | |
except Exception as e: | |
print(f"خطأ في تحميل نموذج Docling: {str(e)}") | |
self.docling_available = False | |
def is_available(self): | |
"""التحقق من توفر نماذج Docling""" | |
return self.docling_available and self.model is not None | |
def analyze_image(self, image_path=None, image_url=None, image_bytes=None, prompt="Convert this page to docling."): | |
""" | |
تحليل صورة باستخدام نموذج Docling | |
المعلمات: | |
image_path (str): مسار الصورة المحلية (اختياري) | |
image_url (str): رابط الصورة (اختياري) | |
image_bytes (bytes): بيانات الصورة (اختياري) | |
prompt (str): التوجيه للنموذج | |
العوائد: | |
dict: نتائج التحليل متضمنة النص والعلامات والمستند | |
""" | |
if not self.is_available(): | |
return { | |
"error": "Docling غير متوفر. يرجى تثبيت المكتبات المطلوبة." | |
} | |
try: | |
from io import BytesIO | |
from pathlib import Path | |
from urllib.parse import urlparse | |
import requests | |
from PIL import Image | |
from docling_core.types.doc import ImageRefMode | |
from docling_core.types.doc.document import DocTagsDocument, DoclingDocument | |
from mlx_vlm.prompt_utils import apply_chat_template | |
from mlx_vlm.utils import stream_generate, load_image | |
# تحميل الصورة | |
pil_image = None | |
image_source = None | |
if image_url: | |
try: | |
response = requests.get(image_url, stream=True, timeout=10) | |
response.raise_for_status() | |
pil_image = Image.open(BytesIO(response.content)) | |
image_source = image_url | |
except Exception as e: | |
return {"error": f"فشل في تحميل الصورة من الرابط: {str(e)}"} | |
elif image_path: | |
try: | |
# التأكد من وجود الملف | |
if not Path(image_path).exists(): | |
return {"error": f"ملف الصورة غير موجود: {image_path}"} | |
pil_image = Image.open(image_path) | |
image_source = image_path | |
except Exception as e: | |
return {"error": f"فشل في فتح ملف الصورة: {str(e)}"} | |
elif image_bytes: | |
try: | |
pil_image = Image.open(BytesIO(image_bytes)) | |
# حفظ الصورة مؤقتا للتحليل | |
temp_path = "/tmp/temp_image.jpg" | |
pil_image.save(temp_path) | |
image_source = temp_path | |
except Exception as e: | |
return {"error": f"فشل في معالجة بيانات الصورة: {str(e)}"} | |
else: | |
return {"error": "يجب توفير مصدر للصورة (مسار، رابط، أو بيانات)"} | |
# تطبيق قالب المحادثة | |
formatted_prompt = apply_chat_template(self.processor, self.config, prompt, num_images=1) | |
# إنشاء النتيجة | |
output = "" | |
# تمرير مسار الصورة أو عنوان URL الفعلي | |
try: | |
for token in stream_generate( | |
self.model, self.processor, formatted_prompt, [image_source], | |
max_tokens=4096, verbose=False | |
): | |
output += token.text | |
if "</doctag>" in token.text: | |
break | |
except Exception as e: | |
return {"error": f"فشل في تحليل الصورة: {str(e)}"} | |
# إنشاء مستند Docling | |
try: | |
doctags_doc = DocTagsDocument.from_doctags_and_image_pairs([output], [pil_image]) | |
doc = DoclingDocument(name="AnalyzedDocument") | |
doc.load_from_doctags(doctags_doc) | |
# إرجاع النتائج | |
return { | |
"doctags": output, | |
"markdown": doc.export_to_markdown(), | |
"document": doc, | |
"image": pil_image | |
} | |
except Exception as e: | |
return {"error": f"فشل في إنشاء مستند Docling: {str(e)}"} | |
except Exception as e: | |
return {"error": f"حدث خطأ غير متوقع: {str(e)}"} | |
def export_to_html(self, doc, output_path="./output.html", show_in_browser=False): | |
""" | |
تصدير المستند إلى HTML | |
المعلمات: | |
doc (DoclingDocument): مستند Docling | |
output_path (str): مسار ملف الإخراج | |
show_in_browser (bool): عرض الملف في المتصفح | |
العوائد: | |
str: مسار ملف HTML المولد | |
""" | |
if not self.is_available(): | |
return None | |
try: | |
from pathlib import Path | |
from docling_core.types.doc import ImageRefMode | |
# إنشاء مسار الإخراج | |
out_path = Path(output_path) | |
# التأكد من وجود المجلد | |
out_path.parent.mkdir(exist_ok=True, parents=True) | |
doc.save_as_html(out_path, image_mode=ImageRefMode.EMBEDDED) | |
# فتح في المتصفح إذا تم طلب ذلك | |
if show_in_browser: | |
import webbrowser | |
webbrowser.open(f"file:///{str(out_path.resolve())}") | |
return str(out_path) | |
except Exception as e: | |
print(f"خطأ في تصدير المستند إلى HTML: {str(e)}") | |
return None | |
class ClaudeAnalyzer: | |
""" | |
فئة لتحليل المستندات باستخدام Claude.ai API | |
""" | |
def __init__(self): | |
"""تهيئة محلل Claude""" | |
self.api_url = "https://api.anthropic.com/v1/messages" | |
def get_api_key(self): | |
"""الحصول على مفتاح API من متغيرات البيئة""" | |
api_key = os.environ.get("anthropic") | |
if not api_key: | |
raise ValueError("مفتاح API لـ Claude غير موجود في متغيرات البيئة") | |
return api_key | |
def analyze_document(self, file_path, model_name="claude-3-7-sonnet", prompt=None): | |
""" | |
تحليل مستند باستخدام Claude AI | |
المعلمات: | |
file_path: مسار الملف المراد تحليله | |
model_name: اسم نموذج Claude المراد استخدامه | |
prompt: التوجيه المخصص للتحليل (اختياري) | |
العوائد: | |
dict: نتائج التحليل | |
""" | |
try: | |
# الحصول على مفتاح API | |
api_key = self.get_api_key() | |
# تحديد التوجيه المناسب إذا لم يتم توفيره | |
if prompt is None: | |
_, ext = os.path.splitext(file_path) | |
ext = ext.lower() | |
if ext == '.pdf': | |
prompt = "قم بتحليل هذه الصورة المستخرجة من مستند PDF واستخراج المعلومات الرئيسية مثل العناوين، الفقرات، الجداول، والنقاط المهمة." | |
elif ext in ('.doc', '.docx'): | |
prompt = "قم بتحليل هذه الصورة المستخرجة من مستند Word واستخراج المعلومات الرئيسية والخلاصة." | |
elif ext in ('.jpg', '.jpeg', '.png', '.gif', '.webp'): | |
prompt = "قم بوصف وتحليل محتوى هذه الصورة بالتفصيل، مع ذكر العناصر المهمة والنصوص والبيانات الموجودة فيها." | |
else: | |
prompt = "قم بتحليل محتوى هذا الملف واستخراج المعلومات المفيدة منه." | |
# التحقق من نوع الملف وتحويله إذا لزم الأمر | |
_, ext = os.path.splitext(file_path) | |
ext = ext.lower() | |
processed_file_path = file_path | |
temp_files = [] # قائمة للملفات المؤقتة لحذفها لاحقاً | |
# للملفات غير المدعومة مباشرة (مثل PDF) | |
if ext not in ('.jpg', '.jpeg', '.png', '.gif', '.webp'): | |
# إذا كان الملف PDF، حاول تحويله إلى صورة | |
if ext == '.pdf': | |
if not pdf_conversion_available: | |
return {"error": "لا يمكن تحويل ملف PDF إلى صورة. يرجى تثبيت مكتبة pdf2image."} | |
try: | |
# تحويل الصفحة الأولى فقط | |
images = convert_from_path(file_path, first_page=1, last_page=1) | |
if images: | |
# حفظ الصورة بشكل مؤقت | |
temp_image_path = "/tmp/temp_pdf_image.jpg" | |
images[0].save(temp_image_path, 'JPEG') | |
processed_file_path = temp_image_path # استخدام مسار الصورة الجديد | |
temp_files.append(temp_image_path) | |
else: | |
return {"error": "فشل في تحويل ملف PDF إلى صورة"} | |
except Exception as e: | |
return {"error": f"فشل في تحويل ملف PDF إلى صورة: {str(e)}"} | |
else: | |
return {"error": f"نوع الملف {ext} غير مدعوم. Claude API يدعم فقط الصور (JPEG, PNG, GIF, WebP) أو PDF (يتم تحويله تلقائياً)."} | |
# ضغط الصورة إذا كان حجمها كبيراً | |
try: | |
img = Image.open(processed_file_path) | |
# تحقق من حجم الصورة وضغطها إذا كانت كبيرة | |
img_width, img_height = img.size | |
if img_width > 1500 or img_height > 1500: | |
# تحويل الصورة إلى حجم أصغر (1500×1500 بكسل كحد أقصى) | |
img.thumbnail((1500, 1500)) | |
# حفظ الصورة المضغوطة في ملف مؤقت | |
compressed_image_path = "/tmp/compressed_image.jpg" | |
img.save(compressed_image_path, format="JPEG", quality=85) | |
# إضافة الملف المؤقت إلى القائمة | |
if processed_file_path not in temp_files: | |
temp_files.append(compressed_image_path) | |
processed_file_path = compressed_image_path | |
except Exception as e: | |
logging.warning(f"فشل في ضغط الصورة: {str(e)}. سيتم استخدام الصورة الأصلية.") | |
# قراءة محتوى الملف المعالج | |
with open(processed_file_path, 'rb') as f: | |
file_content = f.read() | |
# التحقق من حجم الملف (يجب أن يكون أقل من 20 ميجابايت) | |
file_size_mb = len(file_content) / (1024 * 1024) | |
if file_size_mb > 20: | |
# محاولة ضغط الصورة أكثر إذا كان حجمها أكبر من 20 ميجابايت | |
try: | |
img = Image.open(processed_file_path) | |
# ضغط أكبر - حجم أصغر وجودة أقل | |
compressed_image_path = "/tmp/extra_compressed_image.jpg" | |
img.thumbnail((1000, 1000)) | |
img.save(compressed_image_path, format="JPEG", quality=70) | |
# إضافة الملف المؤقت إلى القائمة | |
temp_files.append(compressed_image_path) | |
processed_file_path = compressed_image_path | |
# قراءة الملف المضغوط | |
with open(processed_file_path, 'rb') as f: | |
file_content = f.read() | |
# التحقق من الحجم مرة أخرى | |
file_size_mb = len(file_content) / (1024 * 1024) | |
if file_size_mb > 20: | |
# لا يزال الحجم كبيراً | |
for temp_file in temp_files: | |
try: | |
os.unlink(temp_file) | |
except: | |
pass | |
return {"error": f"حجم الملف كبير جدًا ({file_size_mb:.2f} ميجابايت) حتى بعد الضغط. يجب أن يكون أقل من 20 ميجابايت."} | |
except Exception as e: | |
for temp_file in temp_files: | |
try: | |
os.unlink(temp_file) | |
except: | |
pass | |
return {"error": f"الملف كبير جدًا ({file_size_mb:.2f} ميجابايت) ولا يمكن ضغطه. يجب أن يكون أقل من 20 ميجابايت."} | |
# تحديد نوع الملف المعالج (بعد التحويل إذا تم) | |
file_type = self._get_file_type(processed_file_path) | |
# تحويل المحتوى إلى Base64 | |
file_base64 = base64.b64encode(file_content).decode('utf-8') | |
# إعداد البيانات للطلب | |
headers = { | |
"Content-Type": "application/json", | |
"x-api-key": api_key, | |
"anthropic-version": "2023-06-01" | |
} | |
# التحقق من اسم النموذج وتصحيحه إذا لزم الأمر | |
valid_models = { | |
"claude-3-7-sonnet": "claude-3-7-sonnet-20250219", | |
"claude-3-5-haiku": "claude-3-5-haiku-20240307" | |
} | |
if model_name in valid_models: | |
model_name = valid_models[model_name] | |
# طباعة معلومات التصحيح | |
logging.debug(f"إرسال طلب إلى Claude API: {model_name}, نوع الملف: {file_type}") | |
# تحضير payload للـ API | |
payload = { | |
"model": model_name, | |
"max_tokens": 4096, | |
"messages": [ | |
{ | |
"role": "user", | |
"content": [ | |
{"type": "text", "text": prompt}, | |
{ | |
"type": "image", | |
"source": { | |
"type": "base64", | |
"media_type": file_type, | |
"data": file_base64 | |
} | |
} | |
] | |
} | |
] | |
} | |
# إرسال الطلب إلى API مع محاولات إعادة | |
for attempt in range(3): # ثلاث محاولات كحد أقصى | |
try: | |
response = requests.post( | |
self.api_url, | |
headers=headers, | |
json=payload, | |
timeout=120 # زيادة مهلة الانتظار إلى دقيقتين | |
) | |
# إذا نجح الطلب، نخرج من الحلقة | |
if response.status_code == 200: | |
break | |
# إذا كان الخطأ 502، ننتظر ونحاول مرة أخرى | |
if response.status_code == 502: | |
wait_time = (attempt + 1) * 5 # انتظار 5، 10، 15 ثانية | |
logging.warning(f"تم استلام خطأ 502. الانتظار {wait_time} ثانية قبل إعادة المحاولة.") | |
time.sleep(wait_time) | |
else: | |
# إذا كان الخطأ ليس 502، نخرج من الحلقة | |
break | |
except requests.exceptions.RequestException as e: | |
logging.warning(f"فشل الطلب في المحاولة {attempt+1}: {str(e)}") | |
if attempt == 2: # آخر محاولة | |
# حذف الملفات المؤقتة | |
for temp_file in temp_files: | |
try: | |
os.unlink(temp_file) | |
except: | |
pass | |
return {"error": f"فشل الاتصال بعد عدة محاولات: {str(e)}"} | |
time.sleep((attempt + 1) * 5) # انتظار قبل إعادة المحاولة | |
# حذف الملفات المؤقتة | |
for temp_file in temp_files: | |
try: | |
os.unlink(temp_file) | |
except: | |
pass | |
# التحقق من نجاح الطلب | |
if response.status_code != 200: | |
error_message = f"فشل طلب API: {response.status_code}" | |
try: | |
error_details = response.json() | |
error_message += f"\nتفاصيل: {error_details}" | |
except: | |
error_message += f"\nتفاصيل: {response.text}" | |
return { | |
"error": error_message | |
} | |
# معالجة الاستجابة | |
result = response.json() | |
return { | |
"success": True, | |
"content": result["content"][0]["text"], | |
"model": result["model"], | |
"usage": result.get("usage", {}) | |
} | |
except Exception as e: | |
# حذف الملفات المؤقتة في حالة حدوث خطأ | |
for temp_file in temp_files: | |
try: | |
os.unlink(temp_file) | |
except: | |
pass | |
logging.error(f"خطأ أثناء تحليل المستند: {str(e)}") | |
import traceback | |
stack_trace = traceback.format_exc() | |
return {"error": f"فشل في تحليل المستند: {str(e)}\n{stack_trace}"} | |
def _get_file_type(self, file_path): | |
"""تحديد نوع الملف من امتداده""" | |
_, ext = os.path.splitext(file_path) | |
ext = ext.lower() | |
# Claude API يدعم فقط أنواع الصور التالية | |
if ext in ('.jpg', '.jpeg'): | |
return "image/jpeg" | |
elif ext == '.png': | |
return "image/png" | |
elif ext == '.gif': | |
return "image/gif" | |
elif ext == '.webp': | |
return "image/webp" | |
else: | |
# للملفات الأخرى، نعيد نوع صورة افتراضي | |
# هذا سيستخدم فقط إذا تم تحويل الملف إلى صورة أولاً | |
return "image/jpeg" | |
def get_available_models(self): | |
""" | |
الحصول على قائمة بالنماذج المتاحة | |
العوائد: | |
dict: قائمة بالنماذج مع وصفها | |
""" | |
return { | |
"claude-3-7-sonnet": "Claude 3.7 Sonnet - نموذج ذكي للمهام المتقدمة", | |
"claude-3-5-haiku": "Claude 3.5 Haiku - أسرع نموذج للمهام اليومية" | |
} | |
def get_model_full_name(self, short_name): | |
""" | |
تحويل الاسم المختصر للنموذج إلى الاسم الكامل | |
المعلمات: | |
short_name: الاسم المختصر للنموذج | |
العوائد: | |
str: الاسم الكامل للنموذج | |
""" | |
valid_models = { | |
"claude-3-7-sonnet": "claude-3-7-sonnet-20250219", | |
"claude-3-5-haiku": "claude-3-5-haiku-20240307" | |
} | |
return valid_models.get(short_name, short_name) | |
class DocumentAnalysisApp: | |
def __init__(self): | |
# إنشاء كائنات الخدمات | |
self.text_extractor = TextExtractor() | |
self.item_extractor = ItemExtractor() | |
self.document_parser = DocumentParser() | |
# إنشاء محلل Docling | |
self.docling_analyzer = DoclingAnalyzer() | |
# إنشاء محلل Claude | |
self.claude_analyzer = ClaudeAnalyzer() | |
def render(self): | |
"""العرض الرئيسي للتطبيق""" | |
st.title("تحليل المستندات") | |
st.write("اختر ملفًا لتحليله واستخرج البيانات المطلوبة.") | |
# إنشاء علامات تبويب للأنواع المختلفة من التحليل | |
tabs = st.tabs(["تحليل عام", "تحليل Docling", "تحليل Claude AI"]) | |
with tabs[0]: | |
self._render_general_analysis() | |
with tabs[1]: | |
self._render_docling_analysis() | |
with tabs[2]: | |
self._render_claude_analysis() | |
def _render_general_analysis(self): | |
"""عرض واجهة التحليل العام""" | |
uploaded_file = st.file_uploader("ارفع ملف PDF أو DOCX", type=["pdf", "docx"], key="general_uploader") | |
if uploaded_file: | |
with st.spinner("جاري تحليل المستند..."): | |
file_path = f"/tmp/{uploaded_file.name}" | |
with open(file_path, "wb") as f: | |
f.write(uploaded_file.read()) | |
# تحديد نوع الملف من امتداده | |
_, ext = os.path.splitext(file_path) | |
ext = ext.lower() | |
# استخراج النص حسب نوع الملف | |
if ext == '.pdf': | |
extracted_text = self.text_extractor.extract_from_pdf(file_path) | |
elif ext in ('.doc', '.docx'): | |
extracted_text = self.text_extractor.extract_from_docx(file_path) | |
else: | |
extracted_text = "نوع ملف غير مدعوم للنص" | |
# عرض النص المستخرج | |
st.subheader("النص المستخرج:") | |
st.text_area("النص", extracted_text, height=300) | |
# استخراج البنود | |
extracted_items = self.item_extractor.extract(file_path) | |
if extracted_items: | |
st.subheader("البنود المستخرجة:") | |
st.dataframe(extracted_items) | |
# تحليل المستند | |
parsed_data = self.document_parser.parse(file_path) | |
st.subheader("تحليل المستند:") | |
st.json(parsed_data) | |
def _render_docling_analysis(self): | |
"""عرض واجهة تحليل Docling""" | |
import streamlit as st | |
from tempfile import NamedTemporaryFile | |
if not self.docling_analyzer.is_available(): | |
st.warning("مكتبات Docling و MLX VLM غير متوفرة. يرجى تثبيت الحزم المطلوبة.") | |
st.code(""" | |
# يرجى تثبيت الحزم التالية: | |
pip install docling-core mlx-vlm pillow>=10.3.0 transformers>=4.49.0 tqdm>=4.66.2 | |
""") | |
return | |
st.subheader("تحليل الصور والمستندات باستخدام Docling") | |
# اختيار مصدر الصورة | |
source_option = st.radio("اختر مصدر الصورة:", ["رفع صورة", "رابط صورة"]) | |
image_path = None | |
image_url = None | |
image_data = None | |
if source_option == "رفع صورة": | |
uploaded_image = st.file_uploader("ارفع صورة", type=["jpg", "jpeg", "png"], key="docling_uploader") | |
if uploaded_image: | |
# حفظ الصورة المرفوعة إلى ملف مؤقت | |
image_data = uploaded_image.read() | |
# عرض الصورة المرفوعة | |
st.image(image_data, caption="الصورة المرفوعة", width=400) | |
# إنشاء ملف مؤقت لحفظ الصورة | |
with NamedTemporaryFile(delete=False, suffix=f".{uploaded_image.name.split('.')[-1]}") as temp_file: | |
temp_file.write(image_data) | |
image_path = temp_file.name | |
else: | |
image_url = st.text_input("أدخل رابط الصورة:") | |
if image_url: | |
try: | |
# عرض الصورة من الرابط | |
st.image(image_url, caption="الصورة من الرابط", width=400) | |
except Exception as e: | |
st.error(f"خطأ في تحميل الصورة: {str(e)}") | |
# توجيه للنموذج | |
prompt = st.text_input("توجيه للنموذج:", value="Convert this page to docling.") | |
# زر التحليل | |
if st.button("تحليل الصورة"): | |
if image_path or image_url: | |
with st.spinner("جاري تحليل الصورة..."): | |
# تحليل الصورة | |
results = self.docling_analyzer.analyze_image( | |
image_path=image_path, | |
image_url=image_url, | |
image_bytes=None, # نستخدم الملف المؤقت بدلاً من البيانات المباشرة | |
prompt=prompt | |
) | |
if "error" in results: | |
st.error(results["error"]) | |
else: | |
# عرض النتائج | |
with st.expander("علامات DocTags", expanded=True): | |
st.code(results["doctags"], language="xml") | |
with st.expander("Markdown", expanded=True): | |
st.code(results["markdown"], language="markdown") | |
# تصدير إلى HTML | |
if st.button("تصدير إلى HTML"): | |
html_path = self.docling_analyzer.export_to_html( | |
results["document"], | |
show_in_browser=True | |
) | |
if html_path: | |
st.success(f"تم تصدير المستند إلى: {html_path}") | |
else: | |
st.error("فشل تصدير المستند إلى HTML") | |
# حذف الملف المؤقت بعد الانتهاء | |
if image_path and os.path.exists(image_path) and image_data: | |
try: | |
os.unlink(image_path) | |
except: | |
pass | |
else: | |
st.warning("يرجى اختيار صورة للتحليل أولاً.") | |
def _render_claude_analysis(self): | |
"""عرض واجهة تحليل Claude AI مع توسعة البيانات المعروضة""" | |
import time | |
st.subheader("تحليل المستندات باستخدام Claude AI") | |
col1, col2 = st.columns([2, 1]) | |
with col1: | |
# إضافة اختيار النموذج | |
claude_models = { | |
"claude-3-7-sonnet": "Claude 3.7 Sonnet - نموذج ذكي للمهام المتقدمة", | |
"claude-3-5-haiku": "Claude 3.5 Haiku - أسرع نموذج للمهام اليومية" | |
} | |
selected_model = st.radio( | |
"اختر نموذج Claude", | |
options=list(claude_models.keys()), | |
format_func=lambda x: claude_models[x], | |
horizontal=True | |
) | |
with col2: | |
# إضافة شرح بسيط للنموذج | |
if selected_model == "claude-3-7-sonnet": | |
st.info("نموذج Claude 3.7 Sonnet هو أحدث نموذج ذكي يقدم تحليلاً متعمقاً للمستندات مع دقة عالية") | |
else: | |
st.info("نموذج Claude 3.5 Haiku أسرع في التحليل ومناسب للمهام البسيطة والاستخدام اليومي") | |
# تخصيص التوجيه مع اقتراحات للتوجيهات المخصصة | |
st.subheader("تخصيص التحليل") | |
prompt_templates = { | |
"تحليل عام": "قم بتحليل هذا المستند واستخراج جميع المعلومات المهمة.", | |
"استخراج البيانات الأساسية": "استخرج كافة البيانات الأساسية من هذا المستند بما في ذلك الأسماء والتواريخ والأرقام والمبالغ المالية.", | |
"تلخيص المستند": "قم بتلخيص هذا المستند بشكل مفصل مع التركيز على النقاط الرئيسية.", | |
"تحليل العقود": "حلل هذا العقد واستخرج الأطراف والالتزامات والشروط والتواريخ المهمة.", | |
"تحليل فواتير": "استخرج كافة المعلومات من هذه الفاتورة بما في ذلك المورد والعميل وتفاصيل المنتجات والأسعار والمبالغ الإجمالية." | |
} | |
prompt_type = st.selectbox( | |
"اختر نوع التوجيه", | |
options=list(prompt_templates.keys()), | |
index=0 | |
) | |
default_prompt = prompt_templates[prompt_type] | |
custom_prompt = st.text_area( | |
"تخصيص التوجيه للتحليل", | |
value=default_prompt, | |
height=100 | |
) | |
# خيارات متقدمة | |
with st.expander("خيارات متقدمة"): | |
extraction_format = st.selectbox( | |
"تنسيق استخراج البيانات", | |
["عام", "جداول", "قائمة", "هيكل منظم"], | |
index=0 | |
) | |
detail_level = st.slider( | |
"مستوى التفاصيل", | |
min_value=1, | |
max_value=5, | |
value=3, | |
help="1: ملخص موجز، 5: تحليل تفصيلي كامل" | |
) | |
# تحديث التوجيه بناء على الخيارات المتقدمة | |
if extraction_format != "عام" or detail_level != 3: | |
custom_prompt += f"\n\nاستخدم تنسيق {extraction_format} مع مستوى تفاصيل {detail_level}/5." | |
# رفع الملف | |
uploaded_file = st.file_uploader( | |
"ارفع ملفًا للتحليل", | |
type=["pdf", "jpg", "jpeg", "png"], | |
key="claude_uploader", | |
help="يدعم ملفات PDF والصور. سيتم تحويل PDF إلى صور لمعالجتها." | |
) | |
# التحقق من وجود مفتاح API | |
api_available = True | |
try: | |
self.claude_analyzer.get_api_key() | |
except ValueError: | |
api_available = False | |
st.warning("مفتاح API لـ Claude غير متوفر. يرجى التأكد من تعيين متغير البيئة 'anthropic'.") | |
# زر التحليل | |
analyze_col1, analyze_col2 = st.columns([1, 3]) | |
with analyze_col1: | |
analyze_button = st.button( | |
"تحليل المستند", | |
key="analyze_claude_btn", | |
use_container_width=True, | |
disabled=not (uploaded_file and api_available) | |
) | |
with analyze_col2: | |
if not uploaded_file: | |
st.info("يرجى رفع ملف للتحليل") | |
# إجراء التحليل | |
if uploaded_file and api_available and analyze_button: | |
# عرض شريط التقدم | |
progress_bar = st.progress(0, text="جاري تجهيز الملف...") | |
with st.spinner(f"جاري التحليل باستخدام {claude_models[selected_model].split('-')[0]}..."): | |
# حفظ الملف المرفوع إلى ملف مؤقت | |
temp_path = f"/tmp/{uploaded_file.name}" | |
with open(temp_path, "wb") as f: | |
f.write(uploaded_file.getbuffer()) | |
# تحديث شريط التقدم | |
progress_bar.progress(25, text="جاري معالجة الملف...") | |
try: | |
# تحليل المستند | |
progress_bar.progress(40, text="جاري إرسال الطلب إلى Claude AI...") | |
results = self.claude_analyzer.analyze_document( | |
temp_path, | |
model_name=selected_model, | |
prompt=custom_prompt | |
) | |
progress_bar.progress(90, text="جاري معالجة النتائج...") | |
if "error" in results: | |
st.error(results["error"]) | |
else: | |
progress_bar.progress(100, text="اكتمل التحليل!") | |
# عرض النتائج بشكل منظم | |
st.success(f"تم التحليل بنجاح باستخدام {results.get('model', selected_model)}!") | |
# إضافة علامات تبويب فرعية للنتائج | |
result_tabs = st.tabs(["التحليل الكامل", "بيانات مستخرجة", "معلومات إضافية"]) | |
with result_tabs[0]: | |
# عرض النتائج الكاملة | |
st.markdown("## نتائج التحليل") | |
st.markdown(results["content"]) | |
with result_tabs[1]: | |
# محاولة استخراج بيانات منظمة من النتائج | |
st.markdown("## البيانات المستخرجة") | |
# تقسيم النتائج إلى أقسام | |
content_parts = results["content"].split("\n\n") | |
# استخراج العناوين والبيانات الهامة | |
headings = [] | |
key_values = {} | |
for part in content_parts: | |
# تحديد العناوين | |
if part.startswith("#") or part.startswith("##") or part.startswith("###"): | |
headings.append(part.strip()) | |
continue | |
# محاولة استخراج أزواج المفتاح/القيمة | |
if ":" in part and len(part.split(":")) == 2: | |
key, value = part.split(":") | |
key_values[key.strip()] = value.strip() | |
# عرض العناوين | |
if headings: | |
st.markdown("### العناوين الرئيسية") | |
for heading in headings[:5]: # عرض أهم 5 عناوين | |
st.markdown(f"- {heading}") | |
if len(headings) > 5: | |
with st.expander(f"عرض {len(headings) - 5} عناوين إضافية"): | |
for heading in headings[5:]: | |
st.markdown(f"- {heading}") | |
# عرض البيانات الهامة | |
if key_values: | |
st.markdown("### بيانات هامة") | |
# تحويل البيانات إلى DataFrame | |
import pandas as pd | |
df = pd.DataFrame([key_values.values()], columns=key_values.keys()) | |
st.dataframe(df.T) | |
# البحث عن الجداول في النص | |
if "| ------ |" in results["content"] or "\n|" in results["content"]: | |
st.markdown("### جداول مستخرجة") | |
# استخراج الجداول من النص Markdown | |
table_parts = [] | |
in_table = False | |
current_table = [] | |
for line in results["content"].split("\n"): | |
if line.startswith("|") and "-|-" in line.replace(" ", ""): | |
in_table = True | |
current_table.append(line) | |
elif in_table and line.startswith("|"): | |
current_table.append(line) | |
elif in_table and not line.startswith("|") and line.strip(): | |
in_table = False | |
table_parts.append("\n".join(current_table)) | |
current_table = [] | |
# إضافة الجدول الأخير إذا كان هناك | |
if current_table: | |
table_parts.append("\n".join(current_table)) | |
# عرض الجداول | |
for i, table in enumerate(table_parts): | |
st.markdown(f"#### جدول {i+1}") | |
st.markdown(table) | |
# إذا لم يتم العثور على أي بيانات منظمة | |
if not headings and not key_values and not ("| ------ |" in results["content"] or "\n|" in results["content"]): | |
st.info("لم يتم العثور على بيانات منظمة في النتائج. يمكنك تعديل التوجيه لطلب تنسيق أكثر هيكلية.") | |
with result_tabs[2]: | |
# عرض معلومات إضافية | |
st.markdown("## معلومات عن التحليل") | |
# عرض معلومات الاستخدام | |
col1, col2 = st.columns(2) | |
with col1: | |
st.markdown("### معلومات النموذج") | |
st.markdown(f"**النموذج المستخدم**: {results.get('model', selected_model)}") | |
st.markdown(f"**تاريخ التحليل**: {time.strftime('%Y-%m-%d %H:%M:%S')}") | |
with col2: | |
st.markdown("### إحصائيات الاستخدام") | |
if "usage" in results: | |
usage = results["usage"] | |
st.markdown(f"**توكنز المدخلات**: {usage.get('input_tokens', 'غير متوفر')}") | |
st.markdown(f"**توكنز الإخراج**: {usage.get('output_tokens', 'غير متوفر')}") | |
st.markdown(f"**إجمالي التوكنز**: {usage.get('input_tokens', 0) + usage.get('output_tokens', 0)}") | |
else: | |
st.info("معلومات الاستخدام غير متوفرة") | |
# إضافة خيارات التصدير | |
st.markdown("### تصدير النتائج") | |
export_col1, export_col2 = st.columns(2) | |
with export_col1: | |
# تصدير كنص | |
st.download_button( | |
label="تحميل النتائج كملف نصي", | |
data=results["content"], | |
file_name=f"claude_analysis_{uploaded_file.name.split('.')[0]}.txt", | |
mime="text/plain" | |
) | |
with export_col2: | |
# تصدير كـ Markdown | |
st.download_button( | |
label="تحميل النتائج كملف Markdown", | |
data=results["content"], | |
file_name=f"claude_analysis_{uploaded_file.name.split('.')[0]}.md", | |
mime="text/markdown" | |
) | |
finally: | |
# حذف الملف المؤقت | |
try: | |
os.unlink(temp_path) | |
except: | |
pass | |
def analyze_document(self, file_path): | |
""" | |
تحليل مستند وإرجاع نتائج التحليل | |
المعلمات: | |
file_path (str): مسار المستند المراد تحليله | |
العوائد: | |
dict: نتائج تحليل المستند | |
""" | |
# تحديد نوع المستند من امتداد الملف | |
_, ext = os.path.splitext(file_path) | |
ext = ext.lower() | |
# تحليل المستند حسب نوعه | |
if ext == '.pdf': | |
text = self.text_extractor.extract_from_pdf(file_path) | |
elif ext in ('.doc', '.docx'): | |
text = self.text_extractor.extract_from_docx(file_path) | |
elif ext in ('.jpg', '.jpeg', '.png'): | |
# استخدام محلل Docling للصور إذا كان متاحًا | |
if self.docling_analyzer.is_available(): | |
docling_results = self.docling_analyzer.analyze_image(image_path=file_path) | |
if "error" not in docling_results: | |
return { | |
"نص": docling_results["markdown"], | |
"doctags": docling_results["doctags"], | |
"معلومات": { | |
"نوع المستند": "صورة", | |
"تحليل": "تم تحليله باستخدام Docling" | |
} | |
} | |
# استخدام المحلل العادي إذا كان Docling غير متاح | |
text = self.text_extractor.extract_from_image(file_path) | |
else: | |
raise ValueError(f"نوع المستند غير مدعوم: {ext}") | |
# تحليل المستند | |
document = self.document_parser.parse_document(file_path) | |
# استخراج العناصر المنظمة | |
tables = self.item_extractor.extract_tables(document) | |
# إرجاع نتائج التحليل | |
return { | |
"نص": text, | |
"جداول": tables, | |
"معلومات": document | |
} | |
def analyze_with_claude(self, file_path, model_name="claude-3-7-sonnet", prompt=None): | |
""" | |
تحليل مستند باستخدام Claude AI | |
المعلمات: | |
file_path (str): مسار المستند المراد تحليله | |
model_name (str): اسم نموذج Claude المراد استخدامه | |
prompt (str): التوجيه المخصص للتحليل (اختياري) | |
العوائد: | |
dict: نتائج التحليل | |
""" | |
# محاولة تحليل المستند باستخدام Claude | |
try: | |
# التحقق من وجود المفتاح | |
self.claude_analyzer.get_api_key() | |
# تحليل المستند باستخدام Claude | |
return self.claude_analyzer.analyze_document( | |
file_path, | |
model_name=model_name, | |
prompt=prompt | |
) | |
except Exception as e: | |
logging.error(f"خطأ في تحليل المستند باستخدام Claude: {str(e)}") | |
return {"error": f"فشل في تحليل المستند باستخدام Claude: {str(e)}"} | |
# تشغيل التطبيق | |
if __name__ == "__main__": | |
app = DocumentAnalysisApp() | |
app.render() |