# (Removed: "Spaces: / Running / Running" — Hugging Face Spaces page-status
# banner accidentally captured with the source; not part of the program.)
import json | |
import logging | |
import os | |
from pathlib import Path | |
import time | |
import warnings | |
from PIL import Image | |
from dawsonia import io | |
from dawsonia import digitize | |
from dawsonia.ml import ml | |
from dawsonia.typing import Probability | |
import gradio as gr | |
from gradio_modal import Modal | |
import numpy as np | |
from numpy.typing import NDArray | |
import pandas as pd | |
import pooch | |
import yaml | |
from .visualizer import Page, TableCell | |
# Module-level logger, named after this module per logging convention.
logger = logging.getLogger(__name__)

# Max number of images a user can upload at once
MAX_IMAGES = int(os.environ.get("MAX_IMAGES", 5))

# Setup the cache directory to point to the directory where the example images
# are located. The images must lay in the cache directory because otherwise they
# have to be reuploaded when drag-and-dropped to the input image widget.
GRADIO_CACHE = os.getenv("GRADIO_CACHE_DIR", ".gradio_cache")
DATA_CACHE = os.path.join(GRADIO_CACHE, "data")
EXAMPLES_DIRECTORY = os.path.join(os.getcwd(), "examples")

# Example books: maps a station name to the keyword arguments passed to
# pooch.retrieve() — the download URL of the zipped zarr book archive and
# its expected SHA-256 checksum.
PIPELINES: dict[str, dict[str, str]] = {
    "bjuröklubb": dict(
        url="https://git.smhi.se/ai-for-obs/data/-/raw/688c04f13e8e946962792fe4b4e0ded98800b154/raw_zarr/BJUR%C3%96KLUBB/DAGBOK_Bjur%C3%B6klubb_Station_Jan-Dec_1928.zarr.zip",
        known_hash="sha256:6d87b7f79836ae6373cfab11260fe28787d93fe16199fefede6697ccd750f71a",
    ),
    "härnösand": dict(
        url="https://git.smhi.se/ai-for-obs/data/-/raw/688c04f13e8e946962792fe4b4e0ded98800b154/raw_zarr/H%C3%84RN%C3%96SAND/DAGBOK_H%C3%A4rn%C3%B6sand_Station_1934.zarr.zip",
        known_hash="sha256:a58fdb6521214d0bd569c9325ce78d696738de28ce6ec869cde0d46616b697f2",
    ),
}
def run_dawsonia(
    table_fmt_config_override,
    first_page,
    last_page,
    prob_thresh,
    book,
    gallery,
    progress=gr.Progress(),
):
    """Digitize pages ``first_page .. last_page - 1`` of *book* and yield results.

    Parameters
    ----------
    table_fmt_config_override:
        Unused here — the table format is taken from ``book.table_format``.
        Kept in the signature because the Gradio click handler passes it.
    first_page, last_page:
        Page range to digitize (``last_page`` exclusive via ``range``).
    prob_thresh:
        Minimum prediction probability forwarded to the digitizer.
    book:
        The ``io.Book`` previously loaded into the Gradio state.
    gallery:
        Gallery items, iterated in lockstep with the page numbers.
    progress:
        Gradio progress reporter (injected by Gradio).

    Yields
    ------
    tuple
        ``(collection, gr.skip())`` where *collection* is a list of ``Page``
        visualizations for the digitized pages.

    Raises
    ------
    ValueError
        If no book has been selected/uploaded.
    """
    if book is None:
        raise ValueError("You need to select / upload the pages to digitize")

    progress(0, desc="Dawsonia: starting")

    model_path = Path("data/models/dawsonia/2024-07-02")
    output_path = Path("output")

    logger.info("Dawsonia: digitizing %s", book)

    table_fmt = book.table_format
    output_path_book = output_path / book.station_name
    output_path_book.mkdir(exist_ok=True, parents=True)
    # NOTE(review): "probablities" is misspelled, but the same spelling is used
    # by read_page() below — renaming must happen in both places at once.
    (output_path_book / "probablities").mkdir(exist_ok=True)

    # One dict of empty object arrays per size-verified table, keyed by column.
    init_data: list[dict[str, NDArray]] = [
        {
            key: np.empty(len(table_fmt.rows), dtype="O")
            for key in table_fmt.columns[table_idx]
        }
        for table_idx in table_fmt.preproc.idx_tables_size_verify
    ]

    collection = []
    images = []

    with warnings.catch_warnings():
        warnings.simplefilter("ignore", FutureWarning)
        for page_number, im_from_gallery in zip(range(first_page, last_page), gallery):
            output_path_page = output_path_book / str(page_number)
            gr.Info(f"Digitizing {page_number = }")

            # Skip pages already digitized by a previous run (parquet exists).
            if not output_path_page.with_suffix(".parquet").exists():
                digitize.digitize_page_and_write_output(
                    book,
                    init_data,
                    page_number=page_number,
                    date_str=f"0000-page-{page_number}",
                    model_path=model_path,
                    model_predict=ml.model_predict,
                    prob_thresh=prob_thresh,
                    output_path_page=output_path_page,
                    output_text_fmt=False,
                    debug=False,
                )
            # max(1, ...) guards against division by zero for a 1-page range.
            progress_value = (page_number - first_page) / max(1, last_page - first_page)

            if results := read_page(
                output_path_book,
                str(page_number),
                prob_thresh,
                progress,
                progress_value,
            ):  # , im_from_gallery[0])
                page, im = results
                collection.append(page)
                images.append(im)
            else:
                gr.Info(f"No tables detected in {page_number = }")

    gr.Info("Pages were successfully digitized ✨")

    # yield collection, images
    yield collection, gr.skip()
def read_page(
    output_path_book: Path,
    prefix: str,
    prob_thresh: float,
    progress,
    progress_value,
    im_path_from_gallery: str = "",
):
    """Load one digitized page's outputs from disk and build its visualization.

    Reads the statistics JSON for page *prefix*; if at least one table was
    detected, additionally reads the values/probabilities parquet files, the
    table metadata JSON, and the page image, and returns a ``(Page, Image)``
    pair. Returns ``None`` when no tables were detected (callers test this
    with a walrus ``if results := read_page(...)``).

    Parameters
    ----------
    output_path_book:
        Book-level output directory containing the per-page artifacts.
    prefix:
        Page identifier (the page number as a string).
    prob_thresh:
        Cells whose prediction probability is not above this are dropped.
    progress, progress_value:
        Gradio progress reporter and the fraction to report.
    im_path_from_gallery:
        Optional gallery path to prefer over the on-disk page image path.
    """
    stats = digitize.Statistics.from_json(
        (output_path_book / "statistics" / prefix).with_suffix(".json")
    )
    logger.info("%s", stats)
    progress(progress_value, desc=f"Dawsonia: {stats!s:.50}")
    if stats.tables_detected > 0:
        values_df = pd.read_parquet((output_path_book / prefix).with_suffix(".parquet"))
        # NOTE(review): "probablities" spelling must match run_dawsonia().
        prob_df = pd.read_parquet(
            (output_path_book / "probablities" / prefix).with_suffix(".parquet")
        )
        table_meta = json.loads(
            (output_path_book / "table_meta" / prefix).with_suffix(".json").read_text()
        )
        with Image.open(
            image_path := (output_path_book / "pages" / prefix).with_suffix(".webp")
        ) as im:
            width = im.width
            height = im.height

        values_array = values_df.values.flatten()
        prob_array = prob_df.values.flatten()
        # Each table position is a (y, x, h, w) box; stack them into rows.
        bbox_array = np.hstack(table_meta["table_positions"]).reshape(-1, 4)

        cells = [
            make_cell(value, bbox)
            for value, prob, bbox in zip(values_array, prob_array, bbox_array)
            if prob > prob_thresh
        ]
        return Page(width, height, cells, im_path_from_gallery or str(image_path)), im
    return None  # explicit: no tables detected on this page
def make_cell(value: str, bbox: NDArray[np.int64]):
    """Build a ``TableCell`` for *value* from a centre-form bounding box.

    *bbox* is ``(y, x, h, w)``: the cell's centre coordinates followed by its
    height and width. The polygon is a closed ring (first vertex repeated).
    """
    center_y, center_x, height, width = bbox
    half_h = height // 2
    half_w = width // 2
    left = center_x - half_w
    right = center_x + half_w
    top = center_y - half_h
    bottom = center_y + half_h
    ring = (
        (left, top),
        (right, top),
        (right, bottom),
        (left, bottom),
        (left, top),
    )
    # Text is anchored a quarter-width left of centre, vertically centred.
    return TableCell(ring, text_x=center_x - width // 4, text_y=center_y, text=value)
def all_example_images() -> list[str]:
    """Return the path of one example PNG per pipeline, in pipeline order."""
    return [
        os.path.join(EXAMPLES_DIRECTORY, f"{pipeline}.png")
        for pipeline in PIPELINES
    ]
def get_selected_example_image(
    first_page, last_page, event: gr.SelectData
) -> tuple[list, io.Book, str, str] | None:
    """
    Load the pipeline that corresponds to the selected example image.

    The example image file is named after its pipeline (e.g. ``bjuröklubb.png``),
    so the pipeline is looked up from the selected file's original name.

    Returns
    -------
    tuple | None
        ``(page images, book, local book path, table-format TOML text)`` —
        one value per bound output widget — or ``None`` (implicitly) when the
        selected image does not match a known pipeline.
        (Fixed: the previous annotation claimed a 3-tuple ``tuple[str, io.Book, str]``.)

    Raises
    ------
    ValueError
        If the requested page range exceeds ``MAX_IMAGES``.
    """
    # for name, details in PIPELINES.items():
    name, _ext = event.value["image"]["orig_name"].split(".")
    station_tf = Path("table_formats", name).with_suffix(".toml")

    if (last_page - first_page) > MAX_IMAGES:
        raise ValueError(f"Maximum images you can digitize is set to: {MAX_IMAGES}")

    if name in PIPELINES:
        # Download the book archive (or reuse the cached copy) into DATA_CACHE.
        book_path = pooch.retrieve(**PIPELINES[name], path=DATA_CACHE)
        first, last, book = io.read_book(book_path)
        book._name = name
        book.size_cell = [1.0, 1.0, 1.0, 1.0]
        return (
            [book.read_image(pg) for pg in range(first_page, last_page)],
            book,
            book_path,
            station_tf.read_text(),
        )
    return None  # selection is not one of the bundled examples
def overwrite_table_format_file(book: io.Book, book_path, table_fmt: str):
    """Write *table_fmt* to the station's TOML file and reload it onto *book*.

    Returns the updated *book* so the Gradio state stays in sync.
    """
    name = book.station_name
    table_fmt_dir = Path("table_formats")
    target_file = (table_fmt_dir / name).with_suffix(".toml")
    target_file.write_text(table_fmt)
    # Re-parse the freshly written format so the book uses it immediately.
    book.table_format = io.read_specific_table_format(table_fmt_dir, Path(book_path))
    gr.Info(f"Overwritten table format file for {name}")
    return book
# Top-level Gradio UI: builds the "submit" Blocks layout and wires its events.
# (Indentation reconstructed to conventional Gradio nesting — the source had
# its whitespace stripped; verify against the running app.)
with gr.Blocks() as submit:
    gr.Markdown(
        "🛈 Select or upload the image you want to transcribe. You can upload up to five images at a time."
    )

    # Cross-callback state: the loaded book, its local archive path, and the
    # digitized Page collection produced by run_dawsonia().
    batch_book_state = gr.State()
    batch_book_path_state = gr.State()
    collection_submit_state = gr.State()

    with gr.Group():
        with gr.Row(equal_height=True):
            with gr.Column(scale=5):
                batch_image_gallery = gr.Gallery(
                    # file_types=[".pdf", ".zarr.zip"],
                    label="Book to digitize (should be a .pdf or .zarr.zip file)",
                    interactive=True,
                    object_fit="scale-down",
                    scale=1.0,
                )
            with gr.Column(scale=2):
                first_page = gr.Number(3, label="First page of the book", precision=0)
                last_page = gr.Number(5, label="Last page of the book", precision=0)
                examples = gr.Gallery(
                    all_example_images(),
                    label="Examples",
                    interactive=False,
                    allow_preview=False,
                    object_fit="scale-down",
                    min_width=250,
                )
                upload_button = gr.UploadButton(min_width=200)

    # Modal dialog for editing the table-format TOML of the loaded book.
    with Modal(visible=False) as edit_table_fmt_modal:
        with gr.Column():
            gr.Markdown(
                "## Table format configuration\n"
                "Write a custom table format, overriding the default one. "
                "Click on the **Save** button when you are done."
            )
            save_tf_button = gr.Button(
                "Save", variant="primary", scale=0, min_width=200
            )
            gr.HTML(
                (
                    "<a href='https://dawsonia.readthedocs.io/en/latest/user_guide/misc.html#table-formats' target='_blank'>"
                    "Read the docs for the table-formats spec"
                    "</a>. "
                ),
                padding=False,
                elem_classes="pipeline-help",
            )
            table_fmt_config_override = gr.Code("", language="python")

    with gr.Row():
        prob_thresh = gr.Slider(
            minimum=0.0,
            maximum=1.0,
            value=0.75,
            step=0.05,
            label="Prediction probability threshold",
        )

    with gr.Row():
        run_button = gr.Button("Digitize", variant="primary", scale=0, min_width=200)
        edit_table_fmt_button = gr.Button(
            "Edit table format", variant="secondary", scale=0, min_width=200
        )

    # All events interactions below

    # Clicking an example image loads the book, fills the gallery with its
    # pages and shows the station's table format in the code editor.
    examples.select(
        get_selected_example_image,
        (first_page, last_page),
        (
            batch_image_gallery,
            batch_book_state,
            batch_book_path_state,
            table_fmt_config_override,
        ),
        trigger_mode="always_last",
    )

    # NOTE(review): validate_images is defined but not visibly wired to any
    # event in this chunk (presumably intended for batch_image_gallery.upload
    # or upload_button) — confirm against the rest of the file.
    def validate_images(images):
        print(images)
        if len(images) > MAX_IMAGES:
            gr.Warning(f"Maximum images you can upload is set to: {MAX_IMAGES}")
            return gr.update(value=None)
        gr.Warning(
            "Digitizing uploaded images is not implemented yet! Work in progress!"
        )
        raise NotImplementedError("WIP")
        # NOTE(review): unreachable — the raise above always fires.
        return images

    # Digitize button: streams (collection, gallery-skip) from run_dawsonia.
    run_button.click(
        fn=run_dawsonia,
        inputs=(
            table_fmt_config_override,
            first_page,
            last_page,
            prob_thresh,
            batch_book_state,
            batch_image_gallery,
        ),
        outputs=(collection_submit_state, batch_image_gallery),
    )

    ## Table formats modal dialog box
    edit_table_fmt_button.click(lambda: Modal(visible=True), None, edit_table_fmt_modal)
    save_tf_button.click(
        overwrite_table_format_file,
        (batch_book_state, batch_book_path_state, table_fmt_config_override),
        (batch_book_state,),
    )