"""YOLOv8 Dataset Quality Evaluator (Gradio app).

Runs integrity, class-balance, image-quality, duplicate, model-QA and
label-issue checks over YOLO-format datasets and aggregates the results
into a single weighted score.
"""
from __future__ import annotations

import imghdr  # deprecated since Python 3.11; used only for cheap image sniffing
import json
import os
import re
import shutil
import tempfile
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Tuple

import gradio as gr
import numpy as np
import pandas as pd
import yaml
from PIL import Image
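
# Optional dependencies: each name falls back to None so the corresponding
# check can be skipped gracefully when the library is not installed.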
try:
    import cv2
except ImportError:
    cv2 = None
try:
    import imagehash
except ImportError:
    imagehash = None
try:
    import fastdup
except ImportError:
    fastdup = None
try:
    from ultralytics import YOLO
except ImportError:
    YOLO = None
try:
    from roboflow import Roboflow
except ImportError:
    Roboflow = None
try:
    from cleanlab.pruning import get_noise_indices
except ImportError:
    get_noise_indices = None

TMP_ROOT = Path(tempfile.gettempdir()) / "rf_datasets"
TMP_ROOT.mkdir(parents=True, exist_ok=True)

# Runtime limits, overridable via environment variables.
CPU_COUNT = int(os.getenv("QC_CPU", "1"))
BATCH_SIZE = int(os.getenv("QC_BATCH", "4"))
SAMPLE_LIMIT = int(os.getenv("QC_SAMPLE", "200"))
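
# Weights for combining the per-check scores into the final 0-100 score
# (they sum to 1.0).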
DEFAULT_W = {
    "Integrity": 0.25,
    "Class balance": 0.10,
    "Image quality": 0.15,
    "Duplicates": 0.10,
    "Model QA": 0.30,
    "Label issues": 0.10,
}

# Cache loaded YOLO models so repeated evaluations reuse the same weights.
_model_cache: dict[str, YOLO] = {}

@dataclass
class QCConfig:
    """Thresholds and runtime limits shared by all quality checks."""
    blur_thr: float
    iou_thr: float
    conf_thr: float
    weights: str | None
    cpu_count: int = CPU_COUNT
    batch_size: int = BATCH_SIZE
    sample_limit: int = SAMPLE_LIMIT

def load_yaml(path: Path) -> Dict:
    with path.open("r", encoding="utf-8") as f:
        return yaml.safe_load(f)

def parse_label_file(path: Path) -> list[tuple[int, float, float, float, float]]:
    """Parse a YOLO label file into (class, x, y, w, h) rows; [] on any failure."""
    if not path or not path.exists() or path.stat().st_size == 0:
        return []
    try:
        arr = np.loadtxt(path, dtype=float)
        if arr.ndim == 1:
            arr = arr.reshape(1, -1)
        return [tuple(row) for row in arr]
    except Exception:
        return []

def guess_image_dirs(root: Path) -> List[Path]:
    candidates = [
        root / "images",
        root / "train" / "images",
        root / "valid" / "images",
        root / "val" / "images",
        root / "test" / "images",
    ]
    return [d for d in candidates if d.exists()]

def gather_dataset(root: Path, yaml_path: Path | None):
    """Locate the dataset YAML, its images, and the matching label files."""
    if yaml_path is None:
        yamls = list(root.glob("*.yaml"))
        if not yamls:
            raise FileNotFoundError("Dataset YAML not found")
        yaml_path = yamls[0]
    meta = load_yaml(yaml_path)
    img_dirs = guess_image_dirs(root)
    if not img_dirs:
        raise FileNotFoundError("images/ directory missing")
    imgs = [p for d in img_dirs for p in d.rglob("*.*") if imghdr.what(p)]
    labels_roots = {d.parent / "labels" for d in img_dirs}
    lbls = [
        next((lr / f"{p.stem}.txt" for lr in labels_roots if (lr / f"{p.stem}.txt").exists()), None)
        for p in imgs
    ]
    return imgs, lbls, meta

def get_model(weights: str) -> YOLO | None:
    """Load YOLO weights once; later calls reuse the cached model."""
    if not weights or YOLO is None:
        return None
    if weights not in _model_cache:
        _model_cache[weights] = YOLO(weights)
    return _model_cache[weights]
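
# Per-image probes, executed in worker threads by the checks below.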
def _quality_stat_args(args: Tuple[Path, float]) -> Tuple[Path, bool, bool, bool]:
    """Return (path, is_blurry, is_dark, is_bright) for one image."""
    path, thr = args
    if cv2 is None:
        return path, False, False, False
    im = cv2.imread(str(path))
    if im is None:
        return path, False, False, False
    gray = cv2.cvtColor(im, cv2.COLOR_BGR2GRAY)
    # Variance of the Laplacian is a standard sharpness measure: low values
    # mean few edges, i.e. blur. Mean intensity flags under-/over-exposure.
    lap = cv2.Laplacian(gray, cv2.CV_64F).var()
    mean = gray.mean()
    return path, lap < thr, mean < 25, mean > 230

def _is_corrupt(path: Path) -> bool:
    try:
        with Image.open(path) as im:
            im.verify()
        return False
    except Exception:
        return True
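
# ---- Quality checks ----
# Each qc_* function returns {"name": ..., "score": 0-100, "details": ...}.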

def qc_integrity(imgs: List[Path], lbls: List[Path], cfg: QCConfig) -> Dict:
    missing = [i for i, l in zip(imgs, lbls) if l is None]
    corrupt = []
    sample = imgs[:cfg.sample_limit]
    with ThreadPoolExecutor(max_workers=cfg.cpu_count) as ex:
        fut = {ex.submit(_is_corrupt, p): p for p in sample}
        for f in as_completed(fut):
            if f.result():
                corrupt.append(fut[f])
    score = 100 - (len(missing) + len(corrupt)) / max(len(imgs), 1) * 100
    return {
        "name": "Integrity",
        "score": max(score, 0),
        "details": {
            "missing_label_files": [str(p) for p in missing],
            "corrupt_images": [str(p) for p in corrupt],
        },
    }

def qc_class_balance(lbls: List[Path], cfg: QCConfig) -> Dict:
    counts, boxes = Counter(), []
    for l in lbls[:cfg.sample_limit]:
        bs = parse_label_file(l) if l else []
        boxes.append(len(bs))
        counts.update(int(b[0]) for b in bs)  # class ids as ints, not floats
    if not counts:
        return {"name": "Class balance", "score": 0, "details": "No labels"}
    # Ratio of the rarest class to the most common one, scaled to 0-100.
    bal = min(counts.values()) / max(counts.values()) * 100
    return {
        "name": "Class balance",
        "score": bal,
        "details": {
            "class_counts": dict(counts),
            "boxes_per_image": {
                "min": min(boxes),
                "max": max(boxes),
                "mean": float(np.mean(boxes)),
            },
        },
    }

def qc_image_quality(imgs: List[Path], cfg: QCConfig) -> Dict:
    if cv2 is None:
        return {"name": "Image quality", "score": 100, "details": "cv2 missing"}
    blurry, dark, bright = [], [], []
    sample = imgs[:cfg.sample_limit]
    with ThreadPoolExecutor(max_workers=cfg.cpu_count) as ex:
        args = [(p, cfg.blur_thr) for p in sample]
        for p, is_blurry, is_dark, is_bright in ex.map(_quality_stat_args, args):
            if is_blurry:
                blurry.append(p)
            if is_dark:
                dark.append(p)
            if is_bright:
                bright.append(p)
    bad = len({*blurry, *dark, *bright})
    score = 100 - bad / max(len(sample), 1) * 100
    return {
        "name": "Image quality",
        "score": score,
        "details": {
            "blurry": [str(p) for p in blurry],
            "dark": [str(p) for p in dark],
            "bright": [str(p) for p in bright],
        },
    }

def qc_duplicates(imgs: List[Path], cfg: QCConfig) -> Dict:
    if fastdup is not None and len(imgs) > 50:
        try:
            fd = fastdup.create(
                input_dir=str(Path(imgs[0]).parent.parent),
                work_dir=str(TMP_ROOT / "fastdup"),
            )
            fd.run()
            try:
                cc = fd.connected_components_grouped(sort_by="comp_size", ascending=False)
                if "files" in cc.columns:
                    clusters = cc["files"].tolist()
                else:
                    # fastdup builds without a "files" column: group the
                    # filenames by their component id instead.
                    clusters = (
                        cc.groupby("component")["filename"]
                        .apply(list)
                        .tolist()
                    )
            except Exception:
                # Best-effort fallback to the raw connected-components output.
                clusters = fd.connected_components()
            dup = sum(len(c) - 1 for c in clusters)
            score = max(0.0, 100 - dup / len(imgs) * 100)
            return {
                "name": "Duplicates",
                "score": score,
                "details": {"groups": clusters[:50]},
            }
        except Exception as e:
            return {
                "name": "Duplicates",
                "score": 100.0,
                "details": {"fastdup_error": str(e)},
            }
    return {"name": "Duplicates", "score": 100.0, "details": {"note": "skipped"}}

def qc_model_qa(imgs: List[Path], lbls: List[Path], cfg: QCConfig) -> Dict:
    model = get_model(cfg.weights)
    if model is None:
        return {"name": "Model QA", "score": 100, "details": "skipped"}
    ious, mism = [], []
    sample = imgs[:cfg.sample_limit]
    for i in range(0, len(sample), cfg.batch_size):
        batch = sample[i:i + cfg.batch_size]
        batch_lbls = lbls[i:i + cfg.batch_size]
        results = model.predict([str(p) for p in batch], verbose=False, half=True)
        for p, lbl, res in zip(batch, batch_lbls, results):
            gt = parse_label_file(lbl) if lbl else []
            for cls, x, y, w, h in gt:
                best = 0.0
                # Compare against normalized predictions (xywhn) so both
                # boxes live in the label files' 0-1 coordinate space.
                for b, c, conf in zip(
                    res.boxes.xywhn.cpu().numpy(),
                    res.boxes.cls.cpu().numpy(),
                    res.boxes.conf.cpu().numpy(),
                ):
                    if conf < cfg.conf_thr or int(c) != int(cls):
                        continue
                    best = max(best, _rel_iou((x, y, w, h), tuple(b)))
                ious.append(best)
                if best < cfg.iou_thr:
                    mism.append(str(p))
    miou = float(np.mean(ious)) if ious else 1.0
    return {
        "name": "Model QA",
        "score": miou * 100,
        "details": {"mean_iou": miou, "mismatches": mism[:50]},
    }

def qc_label_issues(imgs: List[Path], lbls: List[Path], cfg: QCConfig) -> Dict:
    if get_noise_indices is None:
        return {"name": "Label issues", "score": 100, "details": "skipped"}
    labels, idxs = [], []
    sample = imgs[:cfg.sample_limit]
    for i, p in enumerate(sample):
        bs = parse_label_file(lbls[i]) if lbls[i] else []
        for cls, *_ in bs:
            labels.append(int(cls))
            idxs.append(i)
    if not labels:
        return {"name": "Label issues", "score": 100, "details": "no GT"}
    labels_arr = np.array(labels)
    uniq = sorted(set(labels_arr))
    # One-hot "probabilities" built from the labels themselves are a
    # placeholder: cleanlab can only flag real issues once predicted class
    # probabilities from a model are plugged in here.
    probs = np.eye(len(uniq))[np.searchsorted(uniq, labels_arr)]
    noise = get_noise_indices(labels_arr, probs)
    flags = sorted({idxs[n] for n in noise})
    files = [str(sample[i]) for i in flags]
    score = 100 - len(flags) / len(labels) * 100
    return {
        "name": "Label issues",
        "score": score,
        "details": {"files": files[:50]},
    }

def _rel_iou(b1, b2):
    """IoU of two center-format (x, y, w, h) boxes given in the same units."""
    x1, y1, w1, h1 = b1
    x2, y2, w2, h2 = b2
    # Convert centers to corner coordinates.
    xa1, ya1 = x1 - w1 / 2, y1 - h1 / 2
    xa2, ya2 = x1 + w1 / 2, y1 + h1 / 2
    xb1, yb1 = x2 - w2 / 2, y2 - h2 / 2
    xb2, yb2 = x2 + w2 / 2, y2 + h2 / 2
    ix1, iy1 = max(xa1, xb1), max(ya1, yb1)
    ix2, iy2 = min(xa2, xb2), min(ya2, yb2)
    inter = max(ix2 - ix1, 0) * max(iy2 - iy1, 0)
    union = w1 * h1 + w2 * h2 - inter
    return inter / union if union else 0.0

def aggregate(results: List[Dict]) -> float:
    """Weighted sum of the per-check scores using DEFAULT_W."""
    return sum(DEFAULT_W[r["name"]] * r["score"] for r in results)
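
# Matches Roboflow Universe dataset URLs, e.g. (hypothetical workspace/project)
# https://universe.roboflow.com/my-workspace/my-project/dataset/3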
RF_RE = re.compile(r"https?://universe\.roboflow\.com/([^/]+)/([^/]+)/dataset/(\d+)")

def download_rf_dataset(url: str, rf_api: Roboflow, dest: Path) -> Path:
    """Download a Roboflow dataset in YOLOv8 format, reusing any cached copy."""
    m = RF_RE.match(url.strip())
    if not m:
        raise ValueError(f"Bad RF URL: {url}")
    if rf_api is None:
        raise ValueError("A Roboflow API key is required for dataset URLs")
    ws, proj, ver = m.groups()
    ds_dir = dest / f"{ws}_{proj}_v{ver}"
    if ds_dir.exists():
        return ds_dir
    pr = rf_api.workspace(ws).project(proj)
    pr.version(int(ver)).download("yolov8", location=str(ds_dir))
    return ds_dir

def run_quality(
    root: Path,
    yaml_file: Path | None,
    weights: Path | None,
    cfg: QCConfig,
    run_dup: bool,
    run_modelqa: bool,
) -> Tuple[str, pd.DataFrame]:
    """Run all checks on one dataset; return (markdown report, class-count df)."""
    imgs, lbls, meta = gather_dataset(root, yaml_file)
    results = [
        qc_integrity(imgs, lbls, cfg),
        qc_class_balance(lbls, cfg),
        qc_image_quality(imgs, cfg),
        qc_duplicates(imgs, cfg) if run_dup else {"name": "Duplicates", "score": 100, "details": "skipped"},
        qc_model_qa(imgs, lbls, cfg) if run_modelqa else {"name": "Model QA", "score": 100, "details": "skipped"},
        qc_label_issues(imgs, lbls, cfg) if run_modelqa else {"name": "Label issues", "score": 100, "details": "skipped"},
    ]
    final = aggregate(results)

    md = [f"## **{meta.get('name', root.name)}** – Score {final:.1f}/100"]
    for r in results:
        md.append(f"### {r['name']}  {r['score']:.1f}")
        md.append("<details><summary>details</summary>\n\n```json")
        md.append(json.dumps(r["details"], indent=2))
        md.append("```\n</details>\n")

    cb = next(r for r in results if r["name"] == "Class balance")["details"]
    if isinstance(cb, dict):
        df = pd.DataFrame.from_dict(cb["class_counts"], orient="index", columns=["count"])
    else:
        # No labels found: return an empty table instead of crashing.
        df = pd.DataFrame(columns=["count"])
    df.index.name = "class"
    return "\n".join(md), df
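
# ---- Gradio UI ----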
with gr.Blocks(title="YOLO Dataset Quality Evaluator v3") as demo:
    gr.Markdown("""
    # YOLOv8 Dataset Quality Evaluator v3

    * Configurable blur, IoU & confidence thresholds
    * Optional duplicate detection (fastdup)
    * Optional Model QA & cleanlab label-issue detection
    * Model caching for speed
    """)
    with gr.Row():
        api_in = gr.Textbox(label="Roboflow API key", type="password")
        url_txt = gr.File(label=".txt of RF dataset URLs", file_types=[".txt"])
    with gr.Row():
        zip_in = gr.File(label="Dataset ZIP")
        path_in = gr.Textbox(label="Server path")
    with gr.Row():
        yaml_in = gr.File(label="Custom YAML", file_types=[".yaml"])
        weights_in = gr.File(label="YOLO weights (.pt)")
    with gr.Row():
        blur_sl = gr.Slider(0.0, 500.0, value=100.0, label="Blur threshold")
        iou_sl = gr.Slider(0.0, 1.0, value=0.5, label="IoU threshold")
        conf_sl = gr.Slider(0.0, 1.0, value=0.25, label="Min detection confidence")
    with gr.Row():
        run_dup = gr.Checkbox(label="Check duplicates (fastdup)", value=False)
        run_modelqa = gr.Checkbox(label="Run Model QA & cleanlab", value=False)
    run_btn = gr.Button("Evaluate")
    out_md = gr.Markdown()
    out_df = gr.Dataframe()

    def evaluate(
        api_key, url_txt, zip_file, server_path, yaml_file, weights,
        blur_thr, iou_thr, conf_thr, run_dup, run_modelqa,
    ):
        """Evaluate every provided dataset source and merge the reports."""
        reports, dfs = [], []
        cfg = QCConfig(
            blur_thr, iou_thr, conf_thr,
            weights.name if weights else None,
        )
        rf = Roboflow(api_key) if api_key and Roboflow else None

        # Roboflow Universe URLs, one per line.
        if url_txt:
            for line in Path(url_txt.name).read_text().splitlines():
                if not line.strip():
                    continue
                try:
                    ds = download_rf_dataset(line, rf, TMP_ROOT)
                    md, df = run_quality(
                        ds, None,
                        Path(weights.name) if weights else None,
                        cfg, run_dup, run_modelqa,
                    )
                    reports.append(md)
                    dfs.append(df)
                except Exception as e:
                    reports.append(f"### {line}\n⚠️ {e}")

        # Uploaded ZIP archive.
        if zip_file:
            tmp = Path(tempfile.mkdtemp())
            shutil.unpack_archive(zip_file.name, tmp)
            md, df = run_quality(
                tmp,
                Path(yaml_file.name) if yaml_file else None,
                Path(weights.name) if weights else None,
                cfg, run_dup, run_modelqa,
            )
            reports.append(md)
            dfs.append(df)
            shutil.rmtree(tmp, ignore_errors=True)

        # Dataset already on the server's filesystem.
        if server_path:
            ds = Path(server_path)
            md, df = run_quality(
                ds,
                Path(yaml_file.name) if yaml_file else None,
                Path(weights.name) if weights else None,
                cfg, run_dup, run_modelqa,
            )
            reports.append(md)
            dfs.append(df)

        summary = "\n---\n".join(reports)
        combined = pd.concat(dfs).groupby(level=0).sum() if dfs else pd.DataFrame()
        return summary, combined

    run_btn.click(
        evaluate,
        inputs=[api_in, url_txt, zip_in, path_in, yaml_in, weights_in,
                blur_sl, iou_sl, conf_sl, run_dup, run_modelqa],
        outputs=[out_md, out_df],
    )


if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=int(os.getenv("PORT", "7860")))