"""Gradio "submit" tab: pick example pages and digitize them with Dawsonia."""

import json
import logging
import os
from pathlib import Path
import time
import warnings

from PIL import Image
from dawsonia import io
from dawsonia import digitize
from dawsonia.ml import ml
from dawsonia.typing import Probability
import gradio as gr
from gradio_modal import Modal
import numpy as np
from numpy.typing import NDArray
import pandas as pd
import pooch
import yaml

from .visualizer import Page, TableCell

logger = logging.getLogger(__name__)

# Max number of images a user can upload at once
MAX_IMAGES = int(os.environ.get("MAX_IMAGES", 5))

# Setup the cache directory to point to the directory where the example images
# are located. The images must lay in the cache directory because otherwise they
# have to be reuploaded when drag-and-dropped to the input image widget.
GRADIO_CACHE = os.getenv("GRADIO_CACHE_DIR", ".gradio_cache")
DATA_CACHE = os.path.join(GRADIO_CACHE, "data")
EXAMPLES_DIRECTORY = os.path.join(os.getcwd(), "examples")

# Example books
PIPELINES: dict[str, dict[str, str]] = {
    "bjuröklubb": dict(
        url="https://git.smhi.se/ai-for-obs/data/-/raw/688c04f13e8e946962792fe4b4e0ded98800b154/raw_zarr/BJUR%C3%96KLUBB/DAGBOK_Bjur%C3%B6klubb_Station_Jan-Dec_1928.zarr.zip",
        known_hash="sha256:6d87b7f79836ae6373cfab11260fe28787d93fe16199fefede6697ccd750f71a",
    ),
    "härnösand": dict(
        url="https://git.smhi.se/ai-for-obs/data/-/raw/688c04f13e8e946962792fe4b4e0ded98800b154/raw_zarr/H%C3%84RN%C3%96SAND/DAGBOK_H%C3%A4rn%C3%B6sand_Station_1934.zarr.zip",
        known_hash="sha256:a58fdb6521214d0bd569c9325ce78d696738de28ce6ec869cde0d46616b697f2",
    ),
}


def run_dawsonia(
    table_fmt_config_override,
    first_page,
    last_page,
    prob_thresh,
    book,
    gallery,
    progress=gr.Progress(),
):
    """Digitize pages ``first_page`` .. ``last_page - 1`` of ``book``.

    This is a generator (it is wired to ``run_button.click``). It yields once,
    after all pages are processed: the list of ``Page`` objects that had at
    least one detected table, and ``gr.skip()`` so the gallery is left as is.

    A page is only sent through ``digitize.digitize_page_and_write_output``
    when its ``<page_number>.parquet`` file does not exist yet under
    ``output/<station_name>/``; otherwise the files already on disk are read.

    Parameters
    ----------
    table_fmt_config_override
        Contents of the table-format editor. Not used in this function.
    first_page, last_page
        Half-open page range to digitize.
    prob_thresh
        Probability threshold forwarded to the digitizer and to ``read_page``.
    book
        The book to digitize.
    gallery
        Gallery items; only used to bound the number of processed pages
        (iteration stops at the shorter of the page range and the gallery).
    progress
        Gradio progress tracker.

    Raises
    ------
    ValueError
        If ``book`` is ``None``.
    """
    if book is None:
        raise ValueError("You need to select / upload the pages to digitize")

    progress(0, desc="Dawsonia: starting")

    model_path = Path("data/models/dawsonia/2024-07-02")
    output_path = Path("output")

    print("Dawsonia: digitizing", book)
    table_fmt = book.table_format

    output_path_book = output_path / book.station_name
    output_path_book.mkdir(exist_ok=True, parents=True)
    (output_path_book / "probablities").mkdir(exist_ok=True)

    # One dict of empty object arrays per table listed in the table format
    # (keyed by column name, one slot per row).
    init_data: list[dict[str, NDArray]] = [
        {
            key: np.empty(len(table_fmt.rows), dtype="O")
            for key in table_fmt.columns[table_idx]
        }
        for table_idx in table_fmt.preproc.idx_tables_size_verify
    ]

    collection = []
    # Avoid dividing by zero when the page range is empty.
    n_pages = max(1, last_page - first_page)

    with warnings.catch_warnings():
        warnings.simplefilter("ignore", FutureWarning)
        # The gallery item itself is not used; zipping with the gallery only
        # caps the number of pages at the number of gallery images.
        for page_number, _im_from_gallery in zip(
            range(first_page, last_page), gallery
        ):
            output_path_page = output_path_book / str(page_number)
            gr.Info(f"Digitizing {page_number = }")

            if not output_path_page.with_suffix(".parquet").exists():
                digitize.digitize_page_and_write_output(
                    book,
                    init_data,
                    page_number=page_number,
                    date_str=f"0000-page-{page_number}",
                    model_path=model_path,
                    model_predict=ml.model_predict,
                    prob_thresh=prob_thresh,
                    output_path_page=output_path_page,
                    output_text_fmt=False,
                    debug=False,
                )

            progress_value = (page_number - first_page) / n_pages

            if results := read_page(
                output_path_book,
                str(page_number),
                prob_thresh,
                progress,
                progress_value,
            ):
                # The second element is an already-closed image handle, so
                # only the page is kept.
                page, _im = results
                collection.append(page)
            else:
                gr.Info(f"No tables detected in {page_number = }")

    gr.Info("Pages were succesfully digitized ✨")

    yield collection, gr.skip()


def read_page(
    output_path_book: Path,
    prefix: str,
    prob_thresh: float,
    progress,
    progress_value,
    im_path_from_gallery: str = "",
):
    """Load the digitization output of one page from ``output_path_book``.

    Reads ``statistics/<prefix>.json`` and reports it through ``progress``.
    When at least one table was detected, it also reads ``<prefix>.parquet``
    (values), ``probablities/<prefix>.parquet``, ``table_meta/<prefix>.json``
    and the size of ``pages/<prefix>.webp``.

    Returns
    -------
    tuple or None
        ``(Page, image)`` when tables were detected, otherwise ``None``.
        Only cells whose probability is strictly greater than ``prob_thresh``
        are included in the page. The page's image path is
        ``im_path_from_gallery`` if given, else the ``.webp`` path.

    NOTE(review): the returned image has already left its ``with`` block, so
    it is closed and its pixel data was never loaded. Callers should only rely
    on the ``Page``.
    """
    stats = digitize.Statistics.from_json(
        (output_path_book / "statistics" / prefix).with_suffix(".json")
    )
    print(stats)
    progress(progress_value, desc=f"Dawsonia: {stats!s:.50}")

    if stats.tables_detected > 0:
        values_df = pd.read_parquet((output_path_book / prefix).with_suffix(".parquet"))
        prob_df = pd.read_parquet(
            (output_path_book / "probablities" / prefix).with_suffix(".parquet")
        )
        table_meta = json.loads(
            (output_path_book / "table_meta" / prefix).with_suffix(".json").read_text()
        )
        # Only the dimensions are needed, so the file is closed right away.
        with Image.open(
            image_path := (output_path_book / "pages" / prefix).with_suffix(".webp")
        ) as im:
            width = im.width
            height = im.height

        # Flatten values and probabilities so they line up one-to-one with the
        # (n_cells, 4) array of bounding boxes.
        values_array = values_df.values.flatten()
        prob_array = prob_df.values.flatten()
        bbox_array = np.hstack(table_meta["table_positions"]).reshape(-1, 4)
        cells = [
            make_cell(value, bbox)
            for value, prob, bbox in zip(values_array, prob_array, bbox_array)
            if prob > prob_thresh
        ]

        return Page(width, height, cells, im_path_from_gallery or str(image_path)), im


def make_cell(value: str, bbox: NDArray[np.int64]):
    """Build a ``TableCell`` from a cell value and its bounding box.

    ``bbox`` is unpacked as ``(y, x, h, w)`` and ``(x, y)`` is used as the
    centre of the rectangle. The polygon is the closed rectangle outline and
    the text anchor is placed a quarter of the width left of the centre.
    """
    y, x, h, w = bbox
    xmin, ymin = x - w // 2, y - h // 2
    xmax, ymax = x + w // 2, y + h // 2
    polygon = (xmin, ymin), (xmax, ymin), (xmax, ymax), (xmin, ymax), (xmin, ymin)
    return TableCell(polygon, text_x=x - w // 4, text_y=y, text=value)


def all_example_images() -> list[str]:
    """
    Get paths to all example images.

    One ``<pipeline>.png`` path inside ``EXAMPLES_DIRECTORY`` per entry of
    ``PIPELINES``.
    """
    examples = [
        os.path.join(EXAMPLES_DIRECTORY, f"{pipeline}.png") for pipeline in PIPELINES
    ]
    return examples


def get_selected_example_image(
    first_page, last_page, event: gr.SelectData
) -> tuple[list, io.Book, str, str] | None:
    """
    Get the book that corresponds to the selected example image.

    The pipeline name is the selected image's original file name without its
    extension. For a known pipeline the book is downloaded (or taken from the
    cache) with ``pooch`` and read.

    Returns
    -------
    tuple or None
        ``(page images, book, book path, table format text)`` for pages
        ``first_page`` .. ``last_page - 1``, or ``None`` when the name is not
        in ``PIPELINES``.

    Raises
    ------
    ValueError
        If the requested page range is larger than ``MAX_IMAGES``.
    """
    # Use the stem rather than ``split(".")`` so names with several dots (or
    # none) do not break tuple unpacking.
    name = Path(event.value["image"]["orig_name"]).stem

    station_tf = Path("table_formats", name).with_suffix(".toml")

    if (last_page - first_page) > MAX_IMAGES:
        raise ValueError(f"Maximum images you can digitize is set to: {MAX_IMAGES}")

    if name in PIPELINES:
        book_path = pooch.retrieve(**PIPELINES[name], path=DATA_CACHE)
        _first, _last, book = io.read_book(book_path)
        book._name = name
        book.size_cell = [1.0, 1.0, 1.0, 1.0]
        return (
            [book.read_image(pg) for pg in range(first_page, last_page)],
            book,
            book_path,
            # TOML is UTF-8 by specification; do not depend on the locale.
            station_tf.read_text(encoding="utf-8"),
        )


def overwrite_table_format_file(book: io.Book, book_path, table_fmt: str):
    """Save ``table_fmt`` as the station's table format and reload it.

    Writes ``table_formats/<station_name>.toml``, re-reads the table format
    into ``book.table_format`` and returns the updated book.

    Raises
    ------
    ValueError
        If no book has been selected yet (``book`` is ``None``).
    """
    if book is None:
        raise ValueError("You need to select a book before saving a table format")

    name = book.station_name
    table_fmt_dir = Path("table_formats")
    # TOML is UTF-8 by specification; do not depend on the locale.
    (table_fmt_dir / name).with_suffix(".toml").write_text(table_fmt, encoding="utf-8")
    book.table_format = io.read_specific_table_format(table_fmt_dir, Path(book_path))
    gr.Info(f"Overwritten table format file for {name}")
    return book


with gr.Blocks() as submit:
    gr.Markdown(
        f"🛈 Select or upload the image you want to transcribe. You can upload up to {MAX_IMAGES} images at a time."
    )

    batch_book_state = gr.State()
    batch_book_path_state = gr.State()
    collection_submit_state = gr.State()

    with gr.Group():
        with gr.Row(equal_height=True):
            with gr.Column(scale=5):
                batch_image_gallery = gr.Gallery(
                    # file_types=[".pdf", ".zarr.zip"],
                    label="Book to digitize (should be a .pdf or .zarr.zip file)",
                    interactive=True,
                    object_fit="scale-down",
                    scale=1.0,
                )

            with gr.Column(scale=2):
                first_page = gr.Number(3, label="First page of the book", precision=0)
                last_page = gr.Number(5, label="Last page of the book", precision=0)
                examples = gr.Gallery(
                    all_example_images(),
                    label="Examples",
                    interactive=False,
                    allow_preview=False,
                    object_fit="scale-down",
                    min_width=250,
                )

                upload_button = gr.UploadButton(min_width=200)

    with Modal(visible=False) as edit_table_fmt_modal:
        with gr.Column():
            gr.Markdown(
                "## Table format configuration\n"
                "Write a custom table format, overriding the default one. "
                "Click on the **Save** button when you are done."
            )
            save_tf_button = gr.Button(
                "Save", variant="primary", scale=0, min_width=200
            )
            gr.HTML(
                (
                    ""
                    "Read the docs for the table-formats spec"
                    ". "
                ),
                padding=False,
                elem_classes="pipeline-help",
            )
            table_fmt_config_override = gr.Code("", language="python")

    with gr.Row():
        prob_thresh = gr.Slider(
            minimum=0.0,
            maximum=1.0,
            value=0.75,
            step=0.05,
            label="Prediction probability threshold",
        )

    with gr.Row():
        run_button = gr.Button("Digitize", variant="primary", scale=0, min_width=200)
        edit_table_fmt_button = gr.Button(
            "Edit table format", variant="secondary", scale=0, min_width=200
        )

    # All events interactions below

    examples.select(
        get_selected_example_image,
        (first_page, last_page),
        (
            batch_image_gallery,
            batch_book_state,
            batch_book_path_state,
            table_fmt_config_override,
        ),
        trigger_mode="always_last",
    )

    @batch_image_gallery.upload(
        inputs=batch_image_gallery,
        outputs=[batch_image_gallery],
    )
    def validate_images(images):
        """Reject uploads: too many images clears the gallery, the rest is WIP."""
        print(images)
        if len(images) > MAX_IMAGES:
            gr.Warning(f"Maximum images you can upload is set to: {MAX_IMAGES}")
            return gr.update(value=None)

        gr.Warning(
            "Digitizing uploaded images is not implemented yet! Work in progress!"
        )
        # TODO: return ``images`` here once uploaded images can be digitized.
        raise NotImplementedError("WIP")

    run_button.click(
        fn=run_dawsonia,
        inputs=(
            table_fmt_config_override,
            first_page,
            last_page,
            prob_thresh,
            batch_book_state,
            batch_image_gallery,
        ),
        outputs=(collection_submit_state, batch_image_gallery),
    )

    ## Table formats modal dialog box
    edit_table_fmt_button.click(lambda: Modal(visible=True), None, edit_table_fmt_modal)
    save_tf_button.click(
        overwrite_table_format_file,
        (batch_book_state, batch_book_path_state, table_fmt_config_override),
        (batch_book_state,),
    )