Spaces:
Running
Running
import csv | |
import json | |
import urllib.parse | |
import requests | |
import tempfile | |
import zipfile | |
import os | |
import re | |
import threading | |
from pathlib import Path | |
import gradio as gr | |
from fuzzywuzzy import fuzz | |
from pyarabic.araby import strip_tashkeel | |
def main() -> None:
    """Build the right-to-left Gradio interface and launch it.

    The page holds a title box, category/author dropdowns, a results table
    and a row of download buttons that stays hidden until a result row is
    selected. Per-session data (the loaded index, the last result table and
    the selected book id) is kept in ``gr.State`` components.
    """
    app_theme = gr.themes.Default(font=[gr.themes.GoogleFont('Noto Sans Arabic'), 'Arial', 'sans-serif'])
    rtl_css = '\n'.join([
        'html, body, .gradio-container { direction: rtl !important; }',
        'h1 { text-align: center; display: block; }',
        'th, td { text-align: right !important; }',
        'th span { white-space: nowrap !important; }',
        '.icon-wrap { right: unset !important; left: var(--size-3) !important; }',
    ])
    result_headers = [
        '#',
        'العنوان',
        'الصفحات',
        'الأجزاء',
        'المؤلف',
        'التصنيف',
        'درجة التطابق',
    ]

    with gr.Blocks(theme=app_theme, css=rtl_css) as demo:
        # Per-session state shared between the event handlers below.
        index_state = gr.State()
        results_data = gr.State()
        selected_book_id = gr.State()

        # Search form.
        gr.Markdown('# ابحث في كتب المكتبة الوقفية 📚', rtl=True)
        title = gr.Textbox(label='عنوان الكتاب', placeholder='اكتب عنوان الكتاب', lines=1, rtl=True)
        with gr.Row():
            # Both dropdowns start disabled; load_data() fills and enables them.
            category = gr.Dropdown(choices=['جارٍ التحميل...'], label='التصنيف (اختياري)', interactive=False)
            author = gr.Dropdown(choices=['جارٍ التحميل...'], label='المؤلف (اختياري)', interactive=False)
        search_button = gr.Button('ابحث')

        # Results table.
        gr.Markdown('## النتائج 🎯', rtl=True)
        results = gr.Dataframe(headers=result_headers, interactive=False)

        # Download controls, hidden until a result row is selected.
        download_label = gr.Markdown('### تحميل ملفات الكتاب 📥', visible=False, rtl=True)
        with gr.Row(visible=False) as download_buttons_box:
            download_pdf_button = gr.Button('تحميل بصيغة PDF')
            download_txt_button = gr.Button('تحميل بصيغة TXT')
            download_docx_button = gr.Button('تحميل بصيغة DOCX')
        download_output = gr.File(label='تنزيل الملف 📥', visible=False)
        downloading_text = gr.Markdown('جارٍ تجهيز الملف للتحميل...', visible=False, rtl=True)

        # Event wiring.
        demo.load(load_data, outputs=[index_state, category, author])

        search_button.click(
            # The UI passes the index last; handle_search() expects it first.
            fn=lambda t, c, a, idx: handle_search(idx, t, c, a),
            inputs=[title, category, author, index_state],
            outputs=[results, results_data, download_buttons_box, download_label, download_output, downloading_text],
        )

        results.select(
            fn=show_details,
            inputs=[results_data],
            outputs=[download_label, selected_book_id, download_buttons_box],
        )

        # Every download button runs the same three-step chain:
        # show the progress note -> build the file -> hide the note and show the file.
        format_buttons = (
            (download_pdf_button, 'pdf'),
            (download_txt_button, 'txt'),
            (download_docx_button, 'docx'),
        )
        for format_button, file_type in format_buttons:
            format_button.click(
                lambda: gr.update(visible=True), inputs=None, outputs=downloading_text
            ).then(
                fn=download_book,
                inputs=[index_state, selected_book_id, gr.State(file_type)],
                outputs=download_output,
            ).then(
                lambda: (gr.update(visible=False), gr.update(visible=True)),
                inputs=None,
                outputs=[downloading_text, download_output],
            )

    demo.launch()
def load_data() -> tuple[list[list[str | int]], gr.Dropdown, gr.Dropdown]:
    """Load the book index and prepare the two filter dropdowns.

    Returns:
        A 3-tuple of the loaded index, an update for the category dropdown
        and an update for the author dropdown. Each update installs the
        available choices, selects the first one and makes the dropdown
        interactive.
    """
    book_index = load_index()
    category_choices = get_categories(book_index)
    author_choices = get_authors(book_index)

    category_update = gr.update(choices=category_choices, value=category_choices[0], interactive=True)
    author_update = gr.update(choices=author_choices, value=author_choices[0], interactive=True)
    return book_index, category_update, author_update
def load_index(path: str = 'index.tsv') -> list[list[str | int]]:
    """Read the tab-separated book index and augment every row.

    The first line of the file is treated as a header and skipped. Each
    remaining row is returned as
    ``[row_id, *original_columns, normalized_third_column]`` where ``row_id``
    is the 1-based position of the row and the last element is
    ``normalize_text`` applied to the row's third original column (the
    column the rest of this file displays as the book title).

    Args:
        path: Location of the TSV file. Defaults to ``'index.tsv'`` in the
            current working directory.

    Returns:
        The augmented rows, in file order.

    Raises:
        OSError: If the file cannot be opened.
        IndexError: If a data row has fewer than three columns.
    """
    # newline='' hands line-ending handling to the csv module, as its
    # documentation requires for files passed to csv.reader.
    with open(path, 'r', encoding='utf-8', newline='') as file:
        reader = csv.reader(file, delimiter='\t')
        next(reader, None)  # skip the header line (no-op on an empty file)
        return [
            [row_id, *row, normalize_text(row[2])]
            for row_id, row in enumerate(reader, start=1)
        ]
def get_categories(index: list[list[str | int]]) -> list[str]: | |
return [''] + sorted(set([row[1] for row in index if row[1]])) | |
def get_authors(index: list[list[str | int]]) -> list[str]: | |
return [''] + sorted(set([row[2] for row in index if row[2]])) | |
def handle_search(
    index: list[list[str | int]],
    title: str,
    category: str,
    author: str,
) -> tuple:
    """Fuzzy-search the index by title, optionally filtered by category/author.

    Args:
        index: Augmented index rows; column 1 is compared with ``category``,
            column 2 with ``author``, and the last column is the normalized
            text the query is matched against.
        title: Raw query text; it is normalized before matching.
        category: Exact category to keep, or an empty value for no filter.
        author: Exact author to keep, or an empty value for no filter.

    Returns:
        A six-element list: the table rows to display, the same rows for the
        results state, and four updates that hide the download buttons, the
        download label, the download file and the progress note. At most 100
        rows with a partial-ratio score above 50 are returned, best first.
    """
    query = normalize_text(title)

    # Every search hides the download widgets again.
    hide_buttons = gr.update(visible=False)
    hide_label = gr.update(visible=False)
    hide_file = gr.update(visible=False)
    hide_progress = gr.update(visible=False)

    def message_response(message: str) -> list:
        """Build a response whose table holds a single message row."""
        # NOTE(review): these rows have 5 cells while the results table is
        # declared with 7 headers — confirm the Dataframe renders them as intended.
        return [
            [['', message, '', '', '']],
            [['', message, '', '', '']],
            hide_buttons,
            hide_label,
            hide_file,
            hide_progress,
        ]

    if not query.strip():
        return message_response('يرجى إدخال عنوان للبحث.')

    # An empty/None filter value means "keep every row".
    candidates = [
        row
        for row in index
        if (not category or row[1] == category) and (not author or row[2] == author)
    ]

    matches = [
        (score, row)
        for row in candidates
        if (score := fuzz.partial_ratio(query, row[-1])) > 50
    ]
    if not matches:
        return message_response('لم يتم العثور على نتائج مطابقة.')

    # Tuples sort by score first, then by row contents; highest first.
    ranked = sorted(matches, reverse=True)
    result_table = [
        [row[0], row[3], row[4], row[5], row[2], row[1], score]
        for score, row in ranked[:100]
    ]
    return [result_table, result_table, hide_buttons, hide_label, hide_file, hide_progress]
def show_details(evt: gr.SelectData, results_data: list[list[str | int]]) -> tuple:
    """Reveal the download controls for the result row that was clicked.

    Args:
        evt: Selection event; ``evt.index[0]`` is the clicked row index.
        results_data: Rows currently shown in the results table. The first
            cell of a real result row is the book's integer id.

    Returns:
        A three-element list: an update for the download label, the selected
        book id, and an update for the download button row. When the clicked
        row is not a real result (empty table, or the placeholder message row
        whose id cell is an empty string) the controls are hidden and the id
        is ``None`` so no download can be attempted with an invalid id.
    """
    row_index = evt.index[0]
    book_id = None
    if results_data and 0 <= row_index < len(results_data):
        book_id = results_data[row_index][0]

    # Placeholder rows carry '' in the id cell; only integer ids are downloadable.
    if not isinstance(book_id, int):
        return [gr.update(visible=False), None, gr.update(visible=False)]

    return [gr.update(visible=True), book_id, gr.update(visible=True)]
def download_book(index_state: list[list[str | int]], selected_book_id: int, file_type: str) -> str:
    """Download a book's files in one format and return a local path to serve.

    A book with a single file is saved under the system temp directory; a
    book with several files is bundled into a zip inside a fresh temp
    directory. The returned file is scheduled for deletion shortly after.

    Args:
        index_state: Augmented index rows; columns 6, 7 and 8 hold the list
            of PDF, TXT and DOCX paths respectively, stored as text.
        selected_book_id: 1-based id of the book (its position in the index).
        file_type: One of ``'pdf'``, ``'txt'`` or ``'docx'``.

    Returns:
        Path of the downloaded file or of the zip archive.

    Raises:
        Exception: If no valid book is selected or the book has no file in
            the requested format.
        KeyError: If ``file_type`` is not one of the supported formats.
        requests.RequestException: If a download fails or the server answers
            with an HTTP error status.
    """
    base_url = 'https://huggingface.co/datasets/ieasybooks-org/waqfeya-library/resolve/main/'

    def fetch(path: str) -> bytes:
        """Download one dataset file and return its bytes."""
        # path[2:] drops the first two characters of the stored path
        # (presumably a leading './' — verify against the index file).
        response = requests.get(f'{base_url}{urllib.parse.quote(path[2:])}', timeout=60)
        # Without this an HTTP error page would be saved as if it were the book.
        response.raise_for_status()
        return response.content

    # Reject missing/placeholder ids; 0 would otherwise silently pick the last book.
    if not isinstance(selected_book_id, int) or not 1 <= selected_book_id <= len(index_state):
        raise Exception('الملف غير متوفر')

    selected_book = index_state[selected_book_id - 1]
    title = f'{selected_book[3]} - {selected_book[2]}'

    # The path list is stored with single quotes; convert it to JSON before parsing.
    # NOTE(review): this breaks if a path itself contains a quote character.
    column = 6 + {'pdf': 0, 'txt': 1, 'docx': 2}[file_type]
    file_paths = json.loads(selected_book[column].replace("'", '"'))
    if not file_paths:
        raise Exception('الملف غير متوفر')

    safe_title = sanitize_filename(title)

    if len(file_paths) == 1:
        ext = Path(file_paths[0]).suffix
        temp_file_name = os.path.join(tempfile.gettempdir(), f'{safe_title}{ext}')
        # Download first so a failed request leaves no empty file behind.
        content = fetch(file_paths[0])
        with open(temp_file_name, 'wb') as temp_file:
            temp_file.write(content)
        schedule_file_deletion(temp_file_name)
        return temp_file_name

    temp_dir = tempfile.mkdtemp()
    zip_path = os.path.join(temp_dir, f'{safe_title}.zip')
    try:
        with zipfile.ZipFile(zip_path, 'w') as zipf:
            for path in file_paths:
                # Write straight into the archive: no per-part temp files to clean up.
                zipf.writestr(os.path.basename(path), fetch(path))
    except Exception:
        # Do not leave a partial archive (or its directory) behind on failure.
        if os.path.exists(zip_path):
            os.remove(zip_path)
        os.rmdir(temp_dir)
        raise
    schedule_file_deletion(zip_path)
    return zip_path
def schedule_file_deletion(file_path: str, delay: int = 3) -> None:
    """Delete ``file_path`` on a background timer after ``delay`` seconds.

    The call returns immediately. A file that is already gone when the timer
    fires is ignored.

    Args:
        file_path: Path of the file to remove.
        delay: Seconds to wait before removing the file.
    """
    def delete_file() -> None:
        # Try the removal directly instead of checking exists() first: two
        # timers may target the same path, and the check-then-remove form
        # can race between the check and the removal.
        try:
            os.remove(file_path)
        except FileNotFoundError:
            pass

    threading.Timer(delay, delete_file).start()
def sanitize_filename(name: str) -> str:
    """Drop characters that are unsafe in file names and trim the result.

    The characters removed are ``\\ / : " * ? < > |``; leading and trailing
    whitespace is stripped afterwards.
    """
    forbidden = str.maketrans('', '', '\\/:"*?<>|')
    cleaned = name.translate(forbidden)
    return cleaned.strip()
def normalize_text(text: str) -> str:
    """Normalize Arabic text so spelling variants compare equal.

    The text is passed through ``strip_tashkeel`` and then a few letter
    variants are folded to one form: the alef forms أ / إ / آ become ا,
    ي becomes ى, and ة becomes ه.
    """
    # None of the replacement letters is itself a key, so one translate pass
    # gives the same result as applying the replacements one after another.
    letter_folds = str.maketrans({
        'أ': 'ا',
        'إ': 'ا',
        'آ': 'ا',
        'ي': 'ى',
        'ة': 'ه',
    })
    return strip_tashkeel(text).translate(letter_folds)
# Build and launch the app only when this file is run as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()