from __future__ import annotations

import base64
import imghdr
import io
import json
import logging
import os
import random
import re
import shutil
import stat
import tempfile
import zipfile
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Tuple

import gradio as gr
import numpy as np
import pandas as pd
import yaml
from PIL import Image

# Optional dependencies: each is imported when available and falls back to None otherwise.
try:
    import cv2
except ImportError:
    cv2 = None
try:
    import imagehash
except ImportError:
    imagehash = None
try:
    import fastdup
except ImportError:
    fastdup = None
try:
    from ultralytics import YOLO
except ImportError:
    YOLO = None
try:
    from roboflow import Roboflow
except ImportError:
    Roboflow = None
try:
    from cleanlab.pruning import get_noise_indices
except ImportError:
    get_noise_indices = None

TMP_ROOT = Path(tempfile.gettempdir()) / "rf_datasets"
TMP_ROOT.mkdir(parents=True, exist_ok=True)
CPU_COUNT = int(os.getenv("QC_CPU", 1))
BATCH_SIZE = int(os.getenv("QC_BATCH", 4))
SAMPLE_LIMIT = int(os.getenv("QC_SAMPLE", 200))

# Default weights for the composite quality score (they sum to 1.0).
DEFAULT_W = {
    "Integrity": 0.25,
    "Class balance": 0.10,
    "Image quality": 0.15,
    "Duplicates": 0.10,
    "Model QA": 0.30,
    "Label issues": 0.10,
}

logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")

# Cache of loaded YOLO models, keyed by weights path.
_model_cache: dict[str, YOLO] = {}

# Monotonic counter used to give uploaded ZIPs unique extraction directories.
autoinc = 0
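
# The QC_* knobs above (and PORT, read at launch time at the bottom of the file) can be
# overridden from the environment; an illustrative invocation, with a placeholder script name:
#   QC_CPU=4 QC_BATCH=16 QC_SAMPLE=500 PORT=8080 python qc_app.py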

@dataclass
class QCConfig:
    """Runtime knobs for a single QC evaluation run."""

    blur_thr: float
    iou_thr: float
    conf_thr: float
    weights: str | None
    cpu_count: int = CPU_COUNT
    batch_size: int = BATCH_SIZE
    sample_limit: int = SAMPLE_LIMIT
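
# Minimal construction sketch; the numbers mirror the UI slider defaults further
# down and are illustrative, not recommendations:
#   cfg = QCConfig(blur_thr=100.0, iou_thr=0.5, conf_thr=0.25, weights=None)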

def load_yaml(path: Path) -> Dict:
    with path.open("r", encoding="utf-8") as f:
        return yaml.safe_load(f)

def parse_label_file(path: Path) -> list[tuple[int, float, float, float, float]]:
    """Parse a YOLO-format label file into (class_id, cx, cy, w, h) rows."""
    if not path or not path.exists() or path.stat().st_size == 0:
        return []
    try:
        arr = np.loadtxt(path, dtype=float)
        if arr.ndim == 1:
            arr = arr.reshape(1, -1)
        return [tuple(row) for row in arr]
    except Exception:
        return []
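
# Illustration of the label format parse_label_file() expects: one object per line,
# "<class_id> <x_center> <y_center> <width> <height>", coordinates normalised to [0, 1]:
#
#   0 0.512 0.430 0.210 0.180
#   2 0.250 0.700 0.100 0.120
#
# Because the file is read with dtype=float, rows come back as float tuples, e.g.
# (0.0, 0.512, 0.430, 0.210, 0.180); callers cast the class id with int().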

def guess_image_dirs(root: Path) -> List[Path]:
    candidates = [
        root / "images",
        root / "train" / "images",
        root / "valid" / "images",
        root / "val" / "images",
        root / "test" / "images",
    ]
    return [d for d in candidates if d.exists()]

def gather_dataset(root: Path, yaml_path: Path | None):
    if yaml_path is None:
        yamls = list(root.glob("*.yaml"))
        if not yamls:
            raise FileNotFoundError("Dataset YAML not found")
        yaml_path = yamls[0]
    meta = load_yaml(yaml_path)
    img_dirs = guess_image_dirs(root)
    if not img_dirs:
        raise FileNotFoundError("images/ directory missing")
    # imghdr sniffs file headers, so restrict the scan to regular files.
    imgs = [p for d in img_dirs for p in d.rglob("*.*") if p.is_file() and imghdr.what(p)]
    labels_roots = {d.parent / "labels" for d in img_dirs}
    lbls = [
        next((lr / f"{p.stem}.txt" for lr in labels_roots if (lr / f"{p.stem}.txt").exists()), None)
        for p in imgs
    ]
    return imgs, lbls, meta
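
# On-disk layout this toolkit expects (Roboflow "yolov8"-style export; illustrative):
#
#   dataset_root/
#       data.yaml
#       train/images/*.jpg   train/labels/*.txt
#       valid/images/*.jpg   valid/labels/*.txt
#       test/images/*.jpg    test/labels/*.txt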

def get_model(weights: str) -> YOLO | None:
    if not weights or YOLO is None:
        return None
    if weights not in _model_cache:
        _model_cache[weights] = YOLO(weights)
    return _model_cache[weights]

# Accepts universe.roboflow.com URLs of the form /<workspace>/<project>[/...], optionally
# ending in "dataset/<version>" or carrying a "?version=<n>" query parameter.
RF_RE = re.compile(r"https?://universe\.roboflow\.com/([^/?#]+)/([^/?#]+)(?:/([^?#]*))?")


def parse_roboflow_url(url: str) -> tuple[str | None, str | None, int | None]:
    """
    Return (workspace, project, version | None) - tolerates many RF URL flavours.
    Any non-positive or malformed version is treated as None.
    """
    m = RF_RE.match(url.strip())
    if not m:
        return None, None, None
    ws, proj, tail = m.groups()
    tail = tail or ""
    ver: int | None = None

    # Path-style version, e.g. .../my-project/dataset/3
    if tail.startswith("dataset/"):
        try:
            v = int(tail.split("dataset/", 1)[1])
            if v > 0:
                ver = v
        except ValueError:
            pass

    # Query-style version, e.g. ...?version=3
    if ver is None and "?version=" in url:
        try:
            v = int(url.split("?version=", 1)[1])
            if v > 0:
                ver = v
        except ValueError:
            pass

    return ws, proj, ver
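
# Example parses (hypothetical workspace/project slugs):
#   parse_roboflow_url("https://universe.roboflow.com/acme/widgets/dataset/3")  -> ("acme", "widgets", 3)
#   parse_roboflow_url("https://universe.roboflow.com/acme/widgets?version=2")  -> ("acme", "widgets", 2)
#   parse_roboflow_url("https://example.com/not-roboflow")                      -> (None, None, None)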

def get_latest_version(rf: Roboflow, ws: str, proj: str) -> str | None:
    try:
        p = rf.workspace(ws).project(proj)
        versions = p.versions()
        vnums = [int(getattr(v, "version_number", getattr(v, "number", 0))) for v in versions]
        return str(max(vnums)) if vnums else None
    except Exception as e:
        logging.warning(f"RF latest-version lookup failed: {e}")
        return None

def download_roboflow_dataset(
    url: str,
    rf_api_key: str,
    fmt: str = "yolov8",
) -> Tuple[Path, List[str], List[str]]:
    """Return (dataset_location, class_names, splits). Caches by folder name."""
    if Roboflow is None:
        raise RuntimeError("`roboflow` pip package not installed")

    ws, proj, ver = parse_roboflow_url(url)
    if not (ws and proj):
        raise ValueError(f"Bad Roboflow URL: {url!r}")

    rf = Roboflow(api_key=rf_api_key)

    # Resolve the latest version when the URL does not pin one.
    if not ver or ver <= 0:
        latest = get_latest_version(rf, ws, proj)
        if latest is None:
            raise RuntimeError("Could not resolve latest Roboflow version")
        try:
            ver = int(latest)
        except ValueError:
            raise RuntimeError(f"Invalid latest version returned: {latest!r}")

    # Serve from the local cache if this workspace/project/version was fetched before.
    ds_dir = TMP_ROOT / f"{ws}_{proj}_v{ver}"
    if ds_dir.exists():
        yaml_path = ds_dir / "data.yaml"
        class_names = load_yaml(yaml_path).get("names", []) if yaml_path.exists() else []
        splits = [s for s in ("train", "valid", "test") if (ds_dir / s).exists()]
        return ds_dir, class_names, splits

    ds_dir.mkdir(parents=True, exist_ok=True)
    rf.workspace(ws).project(proj).version(ver).download(fmt, location=str(ds_dir))

    yaml_path = ds_dir / "data.yaml"
    class_names = load_yaml(yaml_path).get("names", []) if yaml_path.exists() else []
    splits = [s for s in ("train", "valid", "test") if (ds_dir / s).exists()]
    return ds_dir, class_names, splits
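
# Usage sketch (URL and API key are placeholders):
#   ds_dir, class_names, splits = download_roboflow_dataset(
#       "https://universe.roboflow.com/acme/widgets/dataset/3",
#       rf_api_key="YOUR_ROBOFLOW_API_KEY",
#   )
#   # ds_dir lands under TMP_ROOT, e.g. .../rf_datasets/acme_widgets_v3;
#   # splits is the subset of ("train", "valid", "test") present on disk.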

def gather_class_counts(dataset_info_list, class_name_mapping):
    """Count boxes per merged class name across every loaded dataset split."""
    counts = Counter()
    for dloc, class_names, splits, _ in dataset_info_list:
        for split in splits:
            labels_dir = Path(dloc) / split / "labels"
            if not labels_dir.exists():
                continue
            for lp in labels_dir.rglob("*.txt"):
                for cls_id, *_ in parse_label_file(lp):
                    orig = class_names[int(cls_id)] if int(cls_id) < len(class_names) else None
                    if orig is None:
                        continue
                    merged = class_name_mapping.get(orig, orig)
                    counts[merged] += 1
    return dict(counts)
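
# Shape of the inputs shared by gather_class_counts() and merge_datasets()
# (values are illustrative):
#   dataset_info_list = [
#       ("/tmp/rf_datasets/acme_widgets_v3", ["bolt", "nut"], ["train", "valid"], "acme_widgets_v3"),
#   ]
#   class_name_mapping = {"bolt": "fastener", "nut": "fastener"}  # merge two originals into one class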

def _process_label_file(label_path: Path, class_names_dataset, class_name_mapping):
    """Return (image_file_name, merged class names present) for one label file."""
    im_name = label_path.stem + ".jpg"  # images are assumed to be .jpg siblings of the .txt labels
    img_classes = set()
    for cls_id, *_ in parse_label_file(label_path):
        if 0 <= cls_id < len(class_names_dataset):
            orig = class_names_dataset[int(cls_id)]
            new = class_name_mapping.get(orig, orig)
            img_classes.add(new)
    return im_name, img_classes

def merge_datasets(
    dataset_info_list: List[Tuple[str, List[str], List[str], str]],
    class_map_df: pd.DataFrame,
    out_dir: Path = Path("merged_dataset"),
    seed: int = 1234,
) -> Path:
    """Return path to merged dataset ready for training/eval."""
    random.seed(seed)
    if out_dir.exists():
        # Clear read-only flags (e.g. on Windows) before deleting a previous merge.
        shutil.rmtree(out_dir, onerror=lambda f, p, _: (os.chmod(p, stat.S_IWRITE), f(p)))
    (out_dir / "train/images").mkdir(parents=True, exist_ok=True)
    (out_dir / "train/labels").mkdir(parents=True, exist_ok=True)
    (out_dir / "valid/images").mkdir(parents=True, exist_ok=True)
    (out_dir / "valid/labels").mkdir(parents=True, exist_ok=True)

    # Original class name -> merged name ("__REMOVED__" drops the class entirely).
    class_name_mapping = {
        row["original_class"]: row["new_name"] if not row["remove"] else "__REMOVED__"
        for _, row in class_map_df.iterrows()
    }
    # Per merged class: cap on how many images may contain it.
    limits_per_merged = {
        row["new_name"]: int(row["max_images"])
        for _, row in class_map_df.iterrows()
        if not row["remove"]
    }
    active_classes = [c for c in sorted(set(class_name_mapping.values())) if c != "__REMOVED__"]
    id_map = {cls: idx for idx, cls in enumerate(active_classes)}

    image_to_classes: dict[str, set[str]] = {}
    image_to_label: dict[str, Path] = {}
    class_to_images: dict[str, set[str]] = {c: set() for c in active_classes}

    # Index every label file by the image it belongs to and the merged classes it contains.
    for dloc, class_names_dataset, splits, _ in dataset_info_list:
        for split in splits:
            labels_root = Path(dloc) / split / "labels"
            if not labels_root.exists():
                continue
            for lp in labels_root.rglob("*.txt"):
                im_name, cls_set = _process_label_file(lp, class_names_dataset, class_name_mapping)
                cls_set = {c for c in cls_set if c in active_classes}
                if not cls_set:
                    continue
                img_path = str(lp).replace("labels", "images").replace(".txt", ".jpg")
                image_to_classes[img_path] = cls_set
                image_to_label[img_path] = lp
                for c in cls_set:
                    class_to_images[c].add(img_path)

    # Greedy selection: shuffle all candidate images and keep one only if it does not
    # push any of its classes past that class's max_images cap.
    selected_images: set[str] = set()
    counters = {c: 0 for c in active_classes}
    shuffle_pool = [img for imgs in class_to_images.values() for img in imgs]
    random.shuffle(shuffle_pool)

    for img in shuffle_pool:
        cls_set = image_to_classes[img]
        if any(counters[c] >= limits_per_merged.get(c, 0) for c in cls_set):
            continue
        selected_images.add(img)
        for c in cls_set:
            counters[c] += 1

    # Copy the selected images into a fresh 90/10 train/valid split and rewrite
    # their label files with the merged class ids.
    for img in selected_images:
        split = "train" if random.random() < 0.9 else "valid"
        dst_img = out_dir / split / "images" / Path(img).name
        dst_img.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy(img, dst_img)

        lp_src = image_to_label[img]
        dst_label = out_dir / split / "labels" / Path(lp_src).name
        dst_label.parent.mkdir(parents=True, exist_ok=True)
        with open(lp_src, "r") as f:
            lines = f.readlines()
        new_lines = []
        for line in lines:
            parts = line.strip().split()
            if not parts:
                continue
            cid = int(parts[0])
            # Recover the class-name list of the source dataset this label file came from.
            src_class_names = next(
                (cl for dloc2, cl, _, _ in dataset_info_list if str(lp_src).startswith(dloc2)), None
            )
            if src_class_names is None:
                continue
            orig_cls_name = src_class_names[cid] if 0 <= cid < len(src_class_names) else None
            if orig_cls_name is None:
                continue
            merged_cls_name = class_name_mapping.get(orig_cls_name, orig_cls_name)
            if merged_cls_name not in active_classes:
                continue
            new_id = id_map[merged_cls_name]
            new_lines.append(" ".join([str(new_id)] + parts[1:]))
        if new_lines:
            with open(dst_label, "w") as f:
                f.write("\n".join(new_lines))
        else:
            # No boxes survived the remap; drop the copied image again.
            (out_dir / split / "images" / Path(img).name).unlink(missing_ok=True)

    data_yaml = {
        "path": str(out_dir.resolve()),
        "train": "train/images",
        "val": "valid/images",
        "nc": len(active_classes),
        "names": active_classes,
    }
    with open(out_dir / "data.yaml", "w") as f:
        yaml.safe_dump(data_yaml, f)

    return out_dir
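
# End-to-end sketch of a merge (class names, caps and dataset paths are placeholders):
#   class_map_df = pd.DataFrame({
#       "original_class": ["bolt", "nut", "scrap"],
#       "new_name":       ["fastener", "fastener", "scrap"],
#       "max_images":     [5000, 5000, 0],
#       "remove":         [False, False, True],
#   })
#   merged = merge_datasets(dataset_info_list, class_map_df)
#   # merged/data.yaml then lists the merged class set, here just ["fastener"].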

def zip_directory(folder: Path) -> bytes:
    """Zip a folder tree into an in-memory buffer and return the raw bytes."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
        for file in folder.rglob("*"):
            zf.write(file, arcname=file.relative_to(folder))
    buf.seek(0)
    return buf.getvalue()

with gr.Blocks(css="#classdf td{min-width:120px}") as demo:
    gr.Markdown("""
    # 🔹 **YOLO Dataset Toolkit**
    _Evaluate • Merge • Edit • Download_
    """)

    with gr.Tab("Evaluate"):
        with gr.Row():
            api_in = gr.Textbox(label="Roboflow API key", type="password")
            url_txt = gr.File(label=".txt of RF dataset URLs", file_types=['.txt'])
        with gr.Row():
            zip_in = gr.File(label="Dataset ZIP")
            path_in = gr.Textbox(label="Server path")
        with gr.Row():
            yaml_in = gr.File(label="Custom YAML", file_types=['.yaml'])
            weights_in = gr.File(label="YOLO weights (.pt)")
        blur_sl = gr.Slider(0.0, 500.0, value=100.0, label="Blur threshold")
        iou_sl = gr.Slider(0.0, 1.0, value=0.5, label="IOU threshold")
        conf_sl = gr.Slider(0.0, 1.0, value=0.25, label="Min detection confidence")
        run_dup = gr.Checkbox(label="Check duplicates (fastdup)")
        run_qa = gr.Checkbox(label="Run Model QA & cleanlab")
        run_eval = gr.Button("Run Evaluation")
        out_md = gr.Markdown()
        out_df = gr.Dataframe(label="Class distribution")

        def _evaluate_cb(api_key, url_txt, zip_file, server_path, yaml_file, weights,
                         blur_thr, iou_thr, conf_thr, run_dup, run_modelqa):
            return "Evaluation disabled in this trimmed snippet.", pd.DataFrame()

        run_eval.click(
            _evaluate_cb,
            [api_in, url_txt, zip_in, path_in, yaml_in, weights_in,
             blur_sl, iou_sl, conf_sl, run_dup, run_qa],
            [out_md, out_df],
        )

    with gr.Tab("Merge / Edit"):
        gr.Markdown("### 1️⃣ Load one or more datasets")
        rf_key = gr.Textbox(label="Roboflow API key", type="password")
        rf_urls = gr.File(label=".txt of RF URLs", file_types=['.txt'])
        zips_in = gr.Files(label="One or more dataset ZIPs")
        load_btn = gr.Button("Load datasets")
        load_log = gr.Markdown()
        ds_state = gr.State([])

        def _load_cb(rf_key, rf_urls_file, zip_files):
            global autoinc
            info_list = []
            log_lines = []

            # Roboflow URLs, one per line in the uploaded .txt file.
            if rf_urls_file is not None:
                for url in Path(rf_urls_file.name).read_text().splitlines():
                    if not url.strip():
                        continue
                    try:
                        ds, names, splits = download_roboflow_dataset(url, rf_key)
                        info_list.append((str(ds), names, splits, Path(ds).name))
                        log_lines.append(f"✔️ RF dataset **{Path(ds).name}** loaded ({len(names)} classes)")
                    except Exception as e:
                        log_lines.append(f"⚠️ RF load failed for {url!r}: {e}")

            # Uploaded dataset ZIPs: each is unpacked into its own temp directory.
            for f in zip_files or []:
                autoinc += 1
                tmp = TMP_ROOT / f"zip_{autoinc}"
                tmp.mkdir(parents=True, exist_ok=True)
                shutil.unpack_archive(f.name, tmp)
                yaml_path = next(tmp.rglob("*.yaml"), None)
                if yaml_path is None:
                    continue
                names = load_yaml(yaml_path).get("names", [])
                splits = [s for s in ("train", "valid", "test") if (tmp / s).exists()]
                info_list.append((str(tmp), names, splits, tmp.name))
                log_lines.append(f"✔️ ZIP **{tmp.name}** loaded")

            return info_list, "\n".join(log_lines) if log_lines else "No datasets loaded."

        load_btn.click(_load_cb, [rf_key, rf_urls, zips_in], [ds_state, load_log])

        gr.Markdown("### 2️⃣ Edit class mapping / limits / removal")
        class_df = gr.Dataframe(
            headers=["original_class", "new_name", "max_images", "remove"],
            datatype=["str", "str", "number", "bool"],
            interactive=True, elem_id="classdf"
        )
        refresh_btn = gr.Button("Build class table from loaded datasets")

        def _build_class_df(ds_info):
            class_names_all = []
            for _dloc, names, _spl, _ in ds_info:
                class_names_all.extend(names)
            class_names_all = sorted(set(class_names_all))
            df = pd.DataFrame({
                "original_class": class_names_all,
                "new_name": class_names_all,
                "max_images": [99999] * len(class_names_all),
                "remove": [False] * len(class_names_all),
            })
            return df

        refresh_btn.click(_build_class_df, [ds_state], [class_df])
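
        # The editable table built above looks like this for a dataset with classes
        # ["bolt", "nut"] (values illustrative); edit new_name to merge classes,
        # max_images to cap them, or tick remove to drop them:
        #
        #   original_class | new_name | max_images | remove
        #   bolt           | bolt     | 99999      | False
        #   nut            | nut      | 99999      | False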

        merge_btn = gr.Button("Merge datasets ✨")
        zip_out = gr.File(label="Download merged ZIP")
        merge_log = gr.Markdown()

        def _merge_cb(ds_info, class_df):
            if not ds_info:
                return None, "⚠️ Load datasets first."
            out_dir = merge_datasets(ds_info, class_df)
            zip_path = shutil.make_archive(str(out_dir), "zip", out_dir)
            return zip_path, (
                f"✅ Merged dataset created at **{out_dir}** with "
                f"{len(list(Path(out_dir).rglob('*.jpg')))} images."
            )

        merge_btn.click(_merge_cb, [ds_state, class_df], [zip_out, merge_log])

if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=int(os.getenv("PORT", 7860)))