#!/usr/bin/env python # -*- coding: utf-8 -*- """ وحدة مقارنة المستندات المتقدمة لتحليل الفروقات بين نسخ المستندات """ import os import sys import json import re import difflib import Levenshtein from datetime import datetime import numpy as np import pandas as pd import streamlit as st import plotly.express as px import plotly.graph_objects as go from collections import Counter from nltk.tokenize import sent_tokenize, word_tokenize from rouge_score import rouge_scorer from PyPDF2 import PdfReader import io # إضافة مسار النظام للوصول للملفات المشتركة sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))) # استيراد المكونات المساعدة from utils.helpers import create_directory_if_not_exists, format_time, get_user_info class DocumentComparator: """فئة مقارنة المستندات المتقدمة""" def __init__(self): """تهيئة مقارن المستندات""" self.comparison_dir = os.path.join(os.path.dirname(__file__), '..', '..', 'data', 'document_comparison') create_directory_if_not_exists(self.comparison_dir) # تهيئة NLTK وتنزيل حزمة punkt إذا لم تكن موجودة self._initialize_nltk() # إعداد مقيم ROUGE لمقارنة النصوص self.rouge_scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=False) def _initialize_nltk(self): """تهيئة مكتبة NLTK وتنزيل الحزم المطلوبة""" try: # استيراد nltk import nltk # قائمة بالحزم المطلوبة required_packages = ['punkt', 'stopwords', 'wordnet'] for package in required_packages: try: # محاولة استخدام الحزمة أولاً، وإذا فشلت يتم تنزيلها nltk.data.find(f'tokenizers/{package}') except LookupError: print(f"تنزيل حزمة NLTK: {package}") nltk.download(package, quiet=True) # محاولة استخدام sent_tokenize للتحقق من وجود حزمة punkt from nltk.tokenize import sent_tokenize sent_tokenize("This is a test sentence.") except LookupError: # تنزيل حزمة punkt تلقائيًا إذا لم تكن موجودة import nltk nltk.download('punkt', quiet=True) # طباعة رسالة تأكيد التنزيل st.info("تم تنزيل حزمة NLTK punkt بنجاح للاستخدام في مقارنة المستندات.") def _preprocess_text(self, text): """معالجة النص قبل التحليل""" # إزالة الأرقام والرموز الخاصة والمسافات الزائدة text = re.sub(r'\s+', ' ', text) text = text.strip() return text def _segment_text(self, text): """تقسيم النص إلى فقرات وجمل""" # تقسيم النص إلى فقرات paragraphs = [p.strip() for p in text.split('\n') if p.strip()] # تقسيم كل فقرة إلى جمل sentences = [] for paragraph in paragraphs: paragraph_sentences = sent_tokenize(paragraph) sentences.extend(paragraph_sentences) return paragraphs, sentences def _calculate_similarity(self, text1, text2): """حساب نسبة التشابه بين نصين""" # حساب نسبة التشابه باستخدام مقياس Levenshtein ratio = Levenshtein.ratio(text1, text2) # حساب درجات ROUGE rouge_scores = self.rouge_scorer.score(text1, text2) # حساب متوسط نقاط Rouge rouge1_f1 = rouge_scores['rouge1'].fmeasure rouge2_f1 = rouge_scores['rouge2'].fmeasure rougeL_f1 = rouge_scores['rougeL'].fmeasure avg_rouge = (rouge1_f1 + rouge2_f1 + rougeL_f1) / 3 # دمج النقاط للحصول على نتيجة نهائية combined_score = (ratio + avg_rouge) / 2 return { 'levenshtein_ratio': ratio, 'rouge1_f1': rouge1_f1, 'rouge2_f1': rouge2_f1, 'rougeL_f1': rougeL_f1, 'avg_rouge': avg_rouge, 'combined_score': combined_score } def _extract_text_from_pdf(self, pdf_file): """استخراج النص من ملف PDF""" text = "" try: # قراءة ملف PDF pdf_reader = PdfReader(pdf_file) # استخراج النص من كل صفحة for page in pdf_reader.pages: text += page.extract_text() + "\n" except Exception as e: st.error(f"خطأ في قراءة ملف PDF: {e}") return text def get_document_diff(self, text1, text2, title1="المستند الأول", title2="المستند الثاني"): """حساب الفروقات بين نصين""" if not text1 or not text2: return { "title1": title1, "title2": title2, "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "similarity": 0, "similarity_score": 0, "text_diffs": [], "summary": "أحد المستندات فارغ، لا يمكن إجراء المقارنة." } # معالجة النصوص preprocessed_text1 = self._preprocess_text(text1) preprocessed_text2 = self._preprocess_text(text2) # حساب نسبة التشابه الإجمالية similarity_metrics = self._calculate_similarity(preprocessed_text1, preprocessed_text2) similarity_score = similarity_metrics['combined_score'] similarity_percentage = int(similarity_score * 100) # تقسيم النصوص إلى فقرات وجمل paragraphs1, sentences1 = self._segment_text(text1) paragraphs2, sentences2 = self._segment_text(text2) # تحديد الفروقات بين الجمل باستخدام difflib differ = difflib.Differ() sentence_diffs = [] # مصفوفة التشابه بين الجمل similarity_matrix = np.zeros((len(sentences1), len(sentences2))) for i, s1 in enumerate(sentences1): for j, s2 in enumerate(sentences2): similarity_matrix[i, j] = Levenshtein.ratio(s1, s2) # تحديد أفضل مطابقة لكل جملة matched_sentences2 = set() # تتبع الجمل المطابقة في المستند الثاني for i, s1 in enumerate(sentences1): if len(s1.split()) < 3: # تجاهل الجمل القصيرة جداً continue best_match_idx = -1 best_match_score = 0.7 # عتبة التشابه for j, s2 in enumerate(sentences2): if j in matched_sentences2: continue # تجاهل الجمل التي تم مطابقتها بالفعل if len(s2.split()) < 3: # تجاهل الجمل القصيرة جداً continue score = similarity_matrix[i, j] if score > best_match_score and score > 0.7: best_match_score = score best_match_idx = j if best_match_idx != -1: # وجدنا تطابق، تحديد الفروقات باستخدام difflib s2 = sentences2[best_match_idx] diff = list(differ.compare(s1.split(), s2.split())) # تحويل مخرجات difflib إلى تنسيق أسهل للاستخدام formatted_diff = [] for token in diff: if token.startswith(' '): # متطابق formatted_diff.append({'text': token[2:], 'status': 'same'}) elif token.startswith('- '): # حذف formatted_diff.append({'text': token[2:], 'status': 'removed'}) elif token.startswith('+ '): # إضافة formatted_diff.append({'text': token[2:], 'status': 'added'}) sentence_diffs.append({ 'doc1_idx': i, 'doc2_idx': best_match_idx, 'doc1_text': s1, 'doc2_text': s2, 'similarity': best_match_score, 'diff': formatted_diff }) matched_sentences2.add(best_match_idx) else: # لم نجد تطابق، هذه الجملة غير موجودة في المستند الثاني sentence_diffs.append({ 'doc1_idx': i, 'doc2_idx': -1, 'doc1_text': s1, 'doc2_text': "", 'similarity': 0, 'diff': [{'text': word, 'status': 'removed'} for word in s1.split()] }) # تحديد الجمل الجديدة في المستند الثاني for j, s2 in enumerate(sentences2): if j not in matched_sentences2 and len(s2.split()) >= 3: sentence_diffs.append({ 'doc1_idx': -1, 'doc2_idx': j, 'doc1_text': "", 'doc2_text': s2, 'similarity': 0, 'diff': [{'text': word, 'status': 'added'} for word in s2.split()] }) # ترتيب الفروقات حسب الموقع في المستند الأول sentence_diffs.sort(key=lambda x: (x['doc1_idx'] if x['doc1_idx'] != -1 else float('inf'), x['doc2_idx'] if x['doc2_idx'] != -1 else float('inf'))) # تحديد الفقرات المضافة والمحذوفة paragraph_diffs = [] matched_paragraphs2 = set() for i, p1 in enumerate(paragraphs1): if len(p1.split()) < 5: # تجاهل الفقرات القصيرة جداً continue best_match_idx = -1 best_match_score = 0.6 # عتبة التشابه for j, p2 in enumerate(paragraphs2): if j in matched_paragraphs2: continue if len(p2.split()) < 5: continue score = Levenshtein.ratio(p1, p2) if score > best_match_score: best_match_score = score best_match_idx = j if best_match_idx != -1: # وجدنا تطابق p2 = paragraphs2[best_match_idx] paragraph_diffs.append({ 'doc1_idx': i, 'doc2_idx': best_match_idx, 'doc1_text': p1, 'doc2_text': p2, 'similarity': best_match_score, 'status': 'modified' if best_match_score < 0.9 else 'same' }) matched_paragraphs2.add(best_match_idx) else: # لم نجد تطابق، هذه الفقرة غير موجودة في المستند الثاني paragraph_diffs.append({ 'doc1_idx': i, 'doc2_idx': -1, 'doc1_text': p1, 'doc2_text': "", 'similarity': 0, 'status': 'removed' }) # تحديد الفقرات الجديدة في المستند الثاني for j, p2 in enumerate(paragraphs2): if j not in matched_paragraphs2 and len(p2.split()) >= 5: paragraph_diffs.append({ 'doc1_idx': -1, 'doc2_idx': j, 'doc1_text': "", 'doc2_text': p2, 'similarity': 0, 'status': 'added' }) # ترتيب الفروقات حسب الموقع paragraph_diffs.sort(key=lambda x: (x['doc1_idx'] if x['doc1_idx'] != -1 else float('inf'), x['doc2_idx'] if x['doc2_idx'] != -1 else float('inf'))) # تحليل الفروقات للحصول على إحصائيات total_paragraphs = len(paragraphs1) + len(paragraphs2) removed_paragraphs = sum(1 for p in paragraph_diffs if p['status'] == 'removed') added_paragraphs = sum(1 for p in paragraph_diffs if p['status'] == 'added') modified_paragraphs = sum(1 for p in paragraph_diffs if p['status'] == 'modified') # تحليل الكلمات المضافة، المحذوفة والمتغيرة added_words = [] removed_words = [] modified_contexts = [] for diff in sentence_diffs: for token in diff['diff']: if token['status'] == 'added': added_words.append(token['text']) elif token['status'] == 'removed': removed_words.append(token['text']) # جمع السياقات المتغيرة للتحليل if diff['doc1_idx'] != -1 and diff['doc2_idx'] != -1 and diff['similarity'] < 0.9: modified_contexts.append({ 'doc1_text': diff['doc1_text'], 'doc2_text': diff['doc2_text'], 'similarity': diff['similarity'] }) # إنشاء التقرير النهائي comparison_report = { "title1": title1, "title2": title2, "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "similarity": similarity_percentage, "similarity_metrics": similarity_metrics, "sentence_diffs": sentence_diffs, "paragraph_diffs": paragraph_diffs, "statistics": { "doc1_paragraphs": len(paragraphs1), "doc2_paragraphs": len(paragraphs2), "doc1_sentences": len(sentences1), "doc2_sentences": len(sentences2), "removed_paragraphs": removed_paragraphs, "added_paragraphs": added_paragraphs, "modified_paragraphs": modified_paragraphs, "removed_words_count": len(removed_words), "added_words_count": len(added_words), "top_removed_words": Counter(removed_words).most_common(10), "top_added_words": Counter(added_words).most_common(10) }, "modified_contexts": modified_contexts[:10], # أهم 10 سياقات متغيرة "summary": self._generate_comparison_summary( similarity_percentage, len(paragraphs1), len(paragraphs2), removed_paragraphs, added_paragraphs, modified_paragraphs, len(removed_words), len(added_words) ) } # حفظ تقرير المقارنة self._save_comparison_report(comparison_report, title1, title2) return comparison_report def _generate_comparison_summary(self, similarity, p1_count, p2_count, removed_p, added_p, modified_p, removed_w, added_w): """إنشاء ملخص للمقارنة بين المستندين""" if similarity >= 90: similarity_description = "متطابقة بشكل كبير" elif similarity >= 70: similarity_description = "متشابهة" elif similarity >= 50: similarity_description = "متشابهة جزئياً" else: similarity_description = "مختلفة" summary = f"المستندان {similarity_description} بنسبة {similarity}%. " # وصف التغييرات في الفقرات if removed_p > 0 or added_p > 0 or modified_p > 0: changes = [] if removed_p > 0: changes.append(f"تم حذف {removed_p} فقرة") if added_p > 0: changes.append(f"تم إضافة {added_p} فقرة") if modified_p > 0: changes.append(f"تم تعديل {modified_p} فقرة") summary += "التغييرات تشمل: " + "، ".join(changes) + ". " # وصف التغييرات في الكلمات if removed_w > 0 or added_w > 0: word_changes = [] if removed_w > 0: word_changes.append(f"تم حذف {removed_w} كلمة") if added_w > 0: word_changes.append(f"تم إضافة {added_w} كلمة") summary += "على مستوى الكلمات: " + "، ".join(word_changes) + "." return summary def _save_comparison_report(self, report, title1, title2): """حفظ تقرير المقارنة""" # إنشاء اسم ملف فريد timestamp = datetime.now().strftime("%Y%m%d%H%M%S") filename = f"compare_{title1.replace(' ', '_')}_{title2.replace(' ', '_')}_{timestamp}.json" file_path = os.path.join(self.comparison_dir, filename) try: with open(file_path, 'w', encoding='utf-8') as f: json.dump(report, f, ensure_ascii=False, indent=2) except Exception as e: print(f"خطأ في حفظ تقرير المقارنة: {e}") def load_comparison_report(self, filename): """تحميل تقرير مقارنة محفوظ""" file_path = os.path.join(self.comparison_dir, filename) if not os.path.exists(file_path): return None try: with open(file_path, 'r', encoding='utf-8') as f: report = json.load(f) return report except Exception as e: print(f"خطأ في تحميل تقرير المقارنة: {e}") return None def get_comparison_reports(self): """الحصول على قائمة تقارير المقارنة المحفوظة""" reports = [] for filename in os.listdir(self.comparison_dir): if filename.startswith("compare_") and filename.endswith(".json"): file_path = os.path.join(self.comparison_dir, filename) try: with open(file_path, 'r', encoding='utf-8') as f: report = json.load(f) reports.append({ "filename": filename, "title1": report.get("title1", "مستند 1"), "title2": report.get("title2", "مستند 2"), "timestamp": report.get("timestamp", ""), "similarity": report.get("similarity", 0) }) except Exception as e: print(f"خطأ في قراءة تقرير المقارنة {filename}: {e}") # ترتيب التقارير حسب التاريخ (الأحدث أولاً) reports.sort(key=lambda x: x["timestamp"], reverse=True) return reports def extract_key_differences(self, comparison_report): """استخراج الاختلافات الرئيسية من تقرير المقارنة""" if not comparison_report or "paragraph_diffs" not in comparison_report: return [] key_differences = [] # استخراج الفقرات المضافة added_paragraphs = [p for p in comparison_report["paragraph_diffs"] if p["status"] == "added"] if added_paragraphs: key_differences.append({ "type": "added_paragraphs", "label": "فقرات مضافة", "count": len(added_paragraphs), "items": [p["doc2_text"] for p in added_paragraphs] }) # استخراج الفقرات المحذوفة removed_paragraphs = [p for p in comparison_report["paragraph_diffs"] if p["status"] == "removed"] if removed_paragraphs: key_differences.append({ "type": "removed_paragraphs", "label": "فقرات محذوفة", "count": len(removed_paragraphs), "items": [p["doc1_text"] for p in removed_paragraphs] }) # استخراج الفقرات المعدلة modified_paragraphs = [p for p in comparison_report["paragraph_diffs"] if p["status"] == "modified"] if modified_paragraphs: modified_items = [] for p in modified_paragraphs: modified_items.append({ "doc1_text": p["doc1_text"], "doc2_text": p["doc2_text"], "similarity": p["similarity"] }) key_differences.append({ "type": "modified_paragraphs", "label": "فقرات معدلة", "count": len(modified_paragraphs), "items": modified_items }) # استخراج الكلمات الرئيسية المضافة والمحذوفة if "statistics" in comparison_report: stats = comparison_report["statistics"] if "top_added_words" in stats and stats["top_added_words"]: key_differences.append({ "type": "added_words", "label": "الكلمات المضافة الأكثر تكراراً", "count": stats["added_words_count"], "items": stats["top_added_words"] }) if "top_removed_words" in stats and stats["top_removed_words"]: key_differences.append({ "type": "removed_words", "label": "الكلمات المحذوفة الأكثر تكراراً", "count": stats["removed_words_count"], "items": stats["top_removed_words"] }) return key_differences def analyze_legal_changes(self, comparison_report): """تحليل التغييرات القانونية في المستندات""" if not comparison_report: return [] # قائمة المصطلحات القانونية الهامة للبحث عنها legal_terms = { "payment": ["دفع", "سداد", "مستحقات", "مقابل", "رسوم", "تكلفة", "مبلغ", "أتعاب"], "deadlines": ["ميعاد", "موعد", "تاريخ", "أجل", "مدة", "فترة", "مهلة"], "liability": ["مسؤولية", "التزام", "تحمل", "تعويض", "ضمان", "كفالة"], "termination": ["إنهاء", "فسخ", "إلغاء", "إيقاف", "إنهاء العلاقة"], "dispute": ["نزاع", "خلاف", "منازعة", "اعتراض", "تحكيم", "قضاء", "محكمة"], "penalties": ["غرامة", "عقوبة", "شرط جزائي", "جزاء", "تعويض"], "conditions": ["شرط", "بند", "حالة", "اشتراط", "متطلب"], "rights": ["حق", "صلاحية", "امتياز", "منفعة", "ملكية", "تصرف"], "obligations": ["التزام", "واجب", "تعهد", "إلزام", "لازم"] } # البحث عن التغييرات المتعلقة بالمصطلحات القانونية legal_changes = [] if "sentence_diffs" in comparison_report: for category, terms in legal_terms.items(): category_changes = [] for diff in comparison_report["sentence_diffs"]: # فحص فقط الجمل المعدلة (المتطابقة جزئياً) if diff["doc1_idx"] != -1 and diff["doc2_idx"] != -1 and diff["similarity"] < 0.9: # فحص ما إذا كانت الجملة تحتوي على أي من المصطلحات القانونية contains_term = False for term in terms: if term in diff["doc1_text"].lower() or term in diff["doc2_text"].lower(): contains_term = True break if contains_term: category_changes.append({ "doc1_text": diff["doc1_text"], "doc2_text": diff["doc2_text"], "similarity": diff["similarity"] }) if category_changes: legal_category_name = { "payment": "الدفع والمستحقات المالية", "deadlines": "المواعيد والفترات الزمنية", "liability": "المسؤولية والالتزامات", "termination": "إنهاء العقد أو فسخه", "dispute": "النزاعات والخلافات", "penalties": "الغرامات والعقوبات", "conditions": "الشروط والبنود", "rights": "الحقوق والصلاحيات", "obligations": "الالتزامات والواجبات" } legal_changes.append({ "category": category, "label": legal_category_name.get(category, category), "count": len(category_changes), "changes": category_changes }) # ترتيب التغييرات حسب الأهمية (عدد التغييرات) legal_changes.sort(key=lambda x: x["count"], reverse=True) return legal_changes def analyze_price_changes(self, text1, text2): """تحليل التغييرات في الأسعار بين نسختي المستند""" # البحث عن الأرقام متبوعة بعملة أو تعبيرات تدل على المبالغ price_pattern = r'(\d{1,3}(?:,\d{3})*(?:\.\d+)?)\s*(?:ريال|دولار|يورو|جنيه|درهم|دينار|SAR|USD|EUR|SR|$|€|£)' amount_pattern = r'مبلغ[\s\w]*?(\d{1,3}(?:,\d{3})*(?:\.\d+)?)' # استخراج الأسعار من كل نص prices1 = re.findall(price_pattern, text1) prices1.extend(re.findall(amount_pattern, text1)) prices1 = [p.replace(',', '') for p in prices1] prices1 = [float(p) for p in prices1 if p] prices2 = re.findall(price_pattern, text2) prices2.extend(re.findall(amount_pattern, text2)) prices2 = [p.replace(',', '') for p in prices2] prices2 = [float(p) for p in prices2 if p] # تحليل التغييرات price_diff = { "doc1_prices_count": len(prices1), "doc2_prices_count": len(prices2), "doc1_total": sum(prices1) if prices1 else 0, "doc2_total": sum(prices2) if prices2 else 0, "doc1_average": sum(prices1) / len(prices1) if prices1 else 0, "doc2_average": sum(prices2) / len(prices2) if prices2 else 0, "doc1_min": min(prices1) if prices1 else 0, "doc2_min": min(prices2) if prices2 else 0, "doc1_max": max(prices1) if prices1 else 0, "doc2_max": max(prices2) if prices2 else 0 } # حساب التغيير في إجمالي الأسعار if price_diff["doc1_total"] > 0: price_diff["total_change_percentage"] = ((price_diff["doc2_total"] - price_diff["doc1_total"]) / price_diff["doc1_total"]) * 100 else: price_diff["total_change_percentage"] = 0 return price_diff def analyze_date_changes(self, text1, text2): """تحليل التغييرات في التواريخ بين نسختي المستند""" # البحث عن التواريخ بالصيغ المختلفة date_patterns = [ r'\d{1,2}/\d{1,2}/\d{2,4}', # DD/MM/YYYY or MM/DD/YYYY r'\d{1,2}-\d{1,2}-\d{2,4}', # DD-MM-YYYY or MM-DD-YYYY r'\d{2,4}/\d{1,2}/\d{1,2}', # YYYY/MM/DD r'\d{2,4}-\d{1,2}-\d{1,2}', # YYYY-MM-DD r'\d{1,2}\s+(?:يناير|فبراير|مارس|أبريل|مايو|يونيو|يوليو|أغسطس|سبتمبر|أكتوبر|نوفمبر|ديسمبر)\s+\d{2,4}' # DD شهر YYYY ] dates1 = [] dates2 = [] for pattern in date_patterns: dates1.extend(re.findall(pattern, text1)) dates2.extend(re.findall(pattern, text2)) # إنشاء تقرير التغييرات في التواريخ date_changes = { "doc1_dates_count": len(dates1), "doc2_dates_count": len(dates2), "doc1_dates": dates1[:10], # أول 10 تواريخ فقط "doc2_dates": dates2[:10], "common_dates": list(set(dates1).intersection(set(dates2))), "removed_dates": list(set(dates1) - set(dates2)), "added_dates": list(set(dates2) - set(dates1)) } return date_changes def render_document_comparison(self, text1, text2, title1="المستند الأول", title2="المستند الثاني"): """عرض مقارنة المستندات بالواجهة التفاعلية""" st.markdown("