|
|
|
import os |
|
import glob |
|
import base64 |
|
import time |
|
import shutil |
|
import pandas as pd |
|
import torch |
|
import torch.nn as nn |
|
import torch.nn.functional as F |
|
from transformers import AutoModelForCausalLM, AutoTokenizer, AutoModel |
|
from diffusers import StableDiffusionPipeline |
|
from torch.utils.data import Dataset, DataLoader |
|
import csv |
|
import fitz |
|
import requests |
|
from PIL import Image |
|
import cv2 |
|
import numpy as np |
|
import logging |
|
import asyncio |
|
import aiofiles |
|
from io import BytesIO |
|
from dataclasses import dataclass |
|
from typing import Optional, Tuple |
|
import zipfile |
|
import math |
|
import random |
|
import re |
|
import gradio as gr |
|
|
|
class LogCaptureHandler(logging.Handler):
    """Logging handler that keeps every record it receives in ``log_records``."""

    def emit(self, record):
        """Append *record* to the module-level ``log_records`` list."""
        log_records.append(record)


# Root logging configuration: INFO level, "time - level - message" lines.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)

# In-memory store filled by LogCaptureHandler; it is never trimmed here.
log_records = []

# Module logger; everything sent through it is also captured in log_records.
logger = logging.getLogger(__name__)
logger.addHandler(LogCaptureHandler())
|
|
|
|
|
@dataclass
class ModelConfig:
    """Settings describing one causal language model build."""

    name: str  # model identifier; also the last component of ``model_path``
    base_model: str  # checkpoint the model is built from
    size: str
    domain: Optional[str] = None
    model_type: str = "causal_lm"

    @property
    def model_path(self):
        """Return the directory for this model, ``models/<name>``."""
        return "models/{}".format(self.name)
|
|
|
@dataclass
class DiffusionConfig:
    """Settings describing one diffusion model build."""

    name: str  # model identifier; also the last component of ``model_path``
    base_model: str  # checkpoint the pipeline is built from
    size: str
    domain: Optional[str] = None

    @property
    def model_path(self):
        """Return the directory for this model, ``diffusion_models/<name>``."""
        return "diffusion_models/{}".format(self.name)
|
|
|
class SFTDataset(Dataset):
    """Prompt/response pairs tokenised for supervised fine-tuning.

    Each item is a dict with ``input_ids``, ``attention_mask`` and ``labels``.
    Label positions that should not contribute to the loss are set to -100.

    Args:
        data: Sequence of mappings with ``"prompt"`` and ``"response"`` keys.
        tokenizer: Callable tokenizer returning ``input_ids`` and
            ``attention_mask`` tensors when called with ``return_tensors="pt"``.
        max_length: Length every example is padded or truncated to.
    """

    def __init__(self, data, tokenizer, max_length=128):
        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of prompt/response pairs."""
        return len(self.data)

    def __getitem__(self, idx):
        """Tokenise example *idx* and build its loss-masked labels."""
        record = self.data[idx]
        prompt = record["prompt"]
        response = record["response"]
        full_text = f"{prompt} {response}"

        full_encoding = self.tokenizer(
            full_text,
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )
        # Tokenised on its own only to measure how many leading tokens are prompt.
        prompt_encoding = self.tokenizer(
            prompt,
            max_length=self.max_length,
            padding=False,
            truncation=True,
            return_tensors="pt",
        )

        # squeeze(0) drops only the batch axis, so max_length == 1 stays 1-D.
        input_ids = full_encoding["input_ids"].squeeze(0)
        attention_mask = full_encoding["attention_mask"].squeeze(0)

        labels = input_ids.clone()
        # Padding must not contribute to the loss.
        labels[attention_mask == 0] = -100

        prompt_len = prompt_encoding["input_ids"].shape[1]
        # Mask the prompt so only the response is learned. When the prompt
        # fills the whole window the labels are left as they are.
        # NOTE(review): assumes the tokenizer pads on the right — confirm.
        if prompt_len < self.max_length:
            labels[:prompt_len] = -100

        return {"input_ids": input_ids, "attention_mask": attention_mask, "labels": labels}
|
|
|
class TinyUNet(nn.Module):
    """Small encoder/decoder CNN conditioned on a scalar timestep.

    Args:
        in_channels: Channels of the input image tensor.
        out_channels: Channels of the predicted output tensor.
    """

    def __init__(self, in_channels=3, out_channels=3):
        super().__init__()
        self.down1 = nn.Conv2d(in_channels, 32, 3, padding=1)
        self.down2 = nn.Conv2d(32, 64, 3, padding=1, stride=2)
        self.mid = nn.Conv2d(64, 128, 3, padding=1)
        self.up1 = nn.ConvTranspose2d(128, 64, 3, stride=2, padding=1, output_padding=1)
        self.up2 = nn.Conv2d(64 + 32, 32, 3, padding=1)
        self.out = nn.Conv2d(32, out_channels, 3, padding=1)
        self.time_embed = nn.Linear(1, 64)

    def forward(self, x, t):
        """Run the network on a batch of images and their timesteps.

        Args:
            x: Float tensor of shape ``(batch, in_channels, H, W)``.
            t: Float tensor of shape ``(batch,)`` with one timestep per sample.

        Returns:
            Tensor of shape ``(batch, out_channels, H, W)``.
        """
        t_embed = F.relu(self.time_embed(t.unsqueeze(-1)))
        t_embed = t_embed.view(t_embed.size(0), t_embed.size(1), 1, 1)

        x1 = F.relu(self.down1(x))
        # The embedding has 64 channels, so it is injected where the feature
        # map also has 64 channels. Adding it to the 128-channel ``mid``
        # output cannot broadcast and raised on every call.
        x2 = F.relu(self.down2(x1)) + t_embed
        x_mid = F.relu(self.mid(x2))
        x_up1 = F.relu(self.up1(x_mid))

        # For odd H or W the transposed conv overshoots by one pixel; crop so
        # the skip connection can be concatenated.
        x_up1 = x_up1[..., : x1.size(-2), : x1.size(-1)]

        x_up2 = F.relu(self.up2(torch.cat([x_up1, x1], dim=1)))
        return self.out(x_up2)
|
|
|
class TinyDiffusion:
    """Minimal noise-prediction diffusion trainer and sampler that runs on CPU.

    Args:
        model: Module called as ``model(x, t)`` that returns predicted noise.
        timesteps: Number of steps in the linear beta schedule.
    """

    def __init__(self, model, timesteps=100):
        self.model = model
        self.timesteps = timesteps
        self.beta = torch.linspace(0.0001, 0.02, timesteps)
        self.alpha = 1 - self.beta
        self.alpha_cumprod = torch.cumprod(self.alpha, dim=0)

    def train(self, images, epochs=50):
        """Fit the model to predict the noise added to *images*.

        Args:
            images: Images accepted by ``TinyDiffusionDataset``.
            epochs: Number of passes over the images.

        Returns:
            ``self``, so calls can be chained.
        """
        dataset = TinyDiffusionDataset(images)
        if len(dataset) == 0:
            # Nothing to fit; this also avoids a zero division in the average.
            logger.warning("No images supplied; skipping diffusion training")
            return self

        dataloader = DataLoader(dataset, batch_size=1, shuffle=True)
        optimizer = torch.optim.Adam(self.model.parameters(), lr=1e-4)
        device = torch.device("cpu")
        self.model.to(device)

        for epoch in range(epochs):
            total_loss = 0
            for x in dataloader:
                x = x.to(device)
                # One random timestep per sample: float for the model,
                # long for indexing the schedule.
                t = torch.randint(0, self.timesteps, (x.size(0),), device=device).float()
                noise = torch.randn_like(x)
                alpha_t = self.alpha_cumprod[t.long()].view(-1, 1, 1, 1)
                x_noisy = torch.sqrt(alpha_t) * x + torch.sqrt(1 - alpha_t) * noise
                pred_noise = self.model(x_noisy, t)
                loss = F.mse_loss(pred_noise, noise)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                total_loss += loss.item()
            logger.info(f"Epoch {epoch + 1}/{epochs}, Loss: {total_loss / len(dataloader):.4f}")
        return self

    def generate(self, size=(64, 64), steps=100):
        """Sample one RGB image by iteratively denoising Gaussian noise.

        Args:
            size: ``(height, width)`` of the generated image.
            steps: Number of reverse steps; capped at ``self.timesteps``
                because the schedule has no entries beyond that.

        Returns:
            A ``PIL.Image.Image`` built from the final tensor.
        """
        device = torch.device("cpu")
        if steps > self.timesteps:
            logger.warning(f"steps={steps} exceeds schedule length {self.timesteps}; clamping")
            steps = self.timesteps

        x = torch.randn(1, 3, size[0], size[1], device=device)
        # Sampling needs no gradients; without no_grad every step would extend
        # an autograd graph that is never used.
        with torch.no_grad():
            for t in reversed(range(steps)):
                t_tensor = torch.full((1,), t, device=device, dtype=torch.float32)
                alpha_t = self.alpha_cumprod[t].view(-1, 1, 1, 1)
                pred_noise = self.model(x, t_tensor)
                x = (x - (1 - self.alpha[t]) / torch.sqrt(1 - alpha_t) * pred_noise) / torch.sqrt(self.alpha[t])
                if t > 0:
                    # Fresh noise is added on every step except the last.
                    x = x + torch.sqrt(self.beta[t]) * torch.randn_like(x)

        x = torch.clamp(x * 255, 0, 255).byte()
        return Image.fromarray(x.squeeze(0).permute(1, 2, 0).cpu().numpy())
|
|
|
class TinyDiffusionDataset(Dataset):
    """Dataset of images held in memory as channel-first float tensors in [0, 1]."""

    def __init__(self, images):
        """Convert every image in *images* once, up front."""
        converted = []
        for picture in images:
            converted.append(self._to_tensor(picture))
        self.images = converted

    @staticmethod
    def _to_tensor(picture):
        """Turn one image into a ``(3, H, W)`` float32 tensor scaled by 1/255."""
        rgb = np.array(picture.convert("RGB"))
        channels_first = rgb.transpose(2, 0, 1)
        return torch.tensor(channels_first, dtype=torch.float32) / 255.0

    def __len__(self):
        """Return how many images are stored."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return the tensor stored at position *idx*."""
        return self.images[idx]
|
|
|
class ModelBuilder:
    """Load, fine-tune, save and query a causal language model."""

    def __init__(self):
        self.config = None
        self.model = None
        self.tokenizer = None
        self.sft_data = None

    def load_model(self, model_path: str, config: Optional["ModelConfig"] = None):
        """Load the model and tokenizer from *model_path*.

        Args:
            model_path: Identifier or directory passed to ``from_pretrained``.
            config: Optional configuration to remember on the builder.

        Returns:
            ``self``, so calls can be chained.
        """
        self.model = AutoModelForCausalLM.from_pretrained(model_path)
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        # Padding is needed for batching; fall back to EOS when no pad token is set.
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token
        if config:
            self.config = config
        self.model.to("cuda" if torch.cuda.is_available() else "cpu")
        return self

    def fine_tune_sft(self, csv_path: str, epochs: int = 3, batch_size: int = 4):
        """Fine-tune the loaded model on prompt/response pairs from a CSV file.

        Args:
            csv_path: CSV file with ``prompt`` and ``response`` columns.
            epochs: Number of passes over the data.
            batch_size: Examples per optimisation step.

        Returns:
            ``self``, so calls can be chained.
        """
        # newline="" is what the csv module expects; utf-8-sig also accepts
        # files that start with a byte-order mark.
        with open(csv_path, "r", newline="", encoding="utf-8-sig") as f:
            self.sft_data = [
                {"prompt": row["prompt"], "response": row["response"]}
                for row in csv.DictReader(f)
            ]
        if not self.sft_data:
            # No rows means nothing to train on; the loss average below
            # would otherwise divide by zero.
            logger.warning(f"No training rows found in {csv_path}; skipping fine-tuning")
            return self

        dataset = SFTDataset(self.sft_data, self.tokenizer)
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
        optimizer = torch.optim.AdamW(self.model.parameters(), lr=2e-5)
        self.model.train()
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model.to(device)

        for epoch in range(epochs):
            total_loss = 0
            for batch in dataloader:
                optimizer.zero_grad()
                input_ids = batch["input_ids"].to(device)
                attention_mask = batch["attention_mask"].to(device)
                labels = batch["labels"].to(device)
                outputs = self.model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
                loss = outputs.loss
                loss.backward()
                optimizer.step()
                total_loss += loss.item()
            logger.info(f"Epoch {epoch + 1} completed. Average loss: {total_loss / len(dataloader):.4f}")
        return self

    def save_model(self, path: str):
        """Write the model and tokenizer into directory *path*.

        The directory itself is created. Creating only its parent failed for
        a bare name such as ``"my-model"``, whose parent is the empty string.
        """
        os.makedirs(path, exist_ok=True)
        self.model.save_pretrained(path)
        self.tokenizer.save_pretrained(path)

    def evaluate(self, prompt: str):
        """Generate a sampled continuation of *prompt* and return the decoded text.

        The prompt is truncated to 128 tokens and up to 50 new tokens are sampled.
        """
        self.model.eval()
        with torch.no_grad():
            inputs = self.tokenizer(prompt, return_tensors="pt", max_length=128, truncation=True).to(self.model.device)
            outputs = self.model.generate(**inputs, max_new_tokens=50, do_sample=True, top_p=0.95, temperature=0.7)
            return self.tokenizer.decode(outputs[0], skip_special_tokens=True)
|
|
|
class DiffusionBuilder:
    """Load, save and run a Stable Diffusion pipeline on CPU."""

    def __init__(self):
        self.config = None
        self.pipeline = None

    def load_model(self, model_path: str, config: Optional["DiffusionConfig"] = None):
        """Load the pipeline from *model_path* in float32 and place it on CPU.

        Args:
            model_path: Identifier or directory passed to ``from_pretrained``.
            config: Optional configuration to remember on the builder.

        Returns:
            ``self``, so calls can be chained.
        """
        self.pipeline = StableDiffusionPipeline.from_pretrained(model_path, torch_dtype=torch.float32).to("cpu")
        if config:
            self.config = config
        return self

    def save_model(self, path: str):
        """Write the loaded pipeline into directory *path*.

        Mirrors ``ModelBuilder.save_model`` so both builders can be saved the
        same way; ``build_model`` calls this method on whichever builder it made.
        """
        os.makedirs(path, exist_ok=True)
        self.pipeline.save_pretrained(path)

    def generate(self, prompt: str):
        """Return the first image produced for *prompt* using 20 inference steps."""
        return self.pipeline(prompt, num_inference_steps=20).images[0]
|
|
|
|
|
def generate_filename(sequence, ext="png"):
    """Return ``<sequence>_<DDMMYYYYHHMMSS>.<ext>`` using the current local time.

    Args:
        sequence: Prefix placed before the timestamp.
        ext: File extension without the leading dot.
    """
    # The previous format string read "%HM%S", which printed a literal "M"
    # where the minutes belong.
    timestamp = time.strftime("%d%m%Y%H%M%S")
    return f"{sequence}_{timestamp}.{ext}"
|
|
|
def pdf_url_to_filename(url):
    """Derive a ``.pdf`` file name from *url*.

    Each of the characters ``< > : " / \\ | ? *`` is replaced with an
    underscore and ``.pdf`` is appended to the result.
    """
    unsafe_chars = re.compile(r'[<>:"/\\|?*]')
    cleaned = unsafe_chars.sub('_', url)
    return cleaned + ".pdf"
|
|
|
def get_gallery_files(file_types=("png", "pdf")):
    """Return the sorted, de-duplicated files in the working directory by extension.

    Args:
        file_types: Iterable of extensions without the dot. The default is a
            tuple so no mutable object is shared between calls.

    Returns:
        Sorted list of file names matching ``*.<ext>`` for any given extension.
    """
    matches = set()
    for ext in file_types:
        matches.update(glob.glob(f"*.{ext}"))
    return sorted(matches)
|
|
|
def download_pdf(url, output_path):
    """Download *url* to *output_path* in 8 KiB chunks.

    Args:
        url: Address to fetch; the request times out after 10 seconds.
        output_path: File the response body is written to.

    Returns:
        True when the server answered 200 and the body was written, False on
        any other status, request error, or file-system error.
    """
    started_writing = False
    try:
        # The context manager releases the streamed connection on every path.
        with requests.get(url, stream=True, timeout=10) as response:
            if response.status_code != 200:
                logger.error(f"Failed to download {url}: HTTP {response.status_code}")
                return False
            with open(output_path, "wb") as f:
                started_writing = True
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        return True
    except (requests.RequestException, OSError) as e:
        logger.error(f"Failed to download {url}: {e}")
        if started_writing:
            # Do not leave a truncated file behind looking like a finished download.
            try:
                os.remove(output_path)
            except OSError:
                pass
        return False
|
|
|
async def process_pdf_snapshot(pdf_path, mode="single"):
    """Render a PNG snapshot of a PDF and return the files that were written.

    Args:
        pdf_path: PDF file to open.
        mode: ``"single"`` renders the first page at 2x zoom. Any other value
            renders nothing.

    Returns:
        List of PNG file names created; empty when *mode* is not ``"single"``.

    Note:
        The body contains no ``await``, so the rendering itself runs
        synchronously inside the coroutine.
    """
    output_files = []
    doc = fitz.open(pdf_path)
    try:
        if mode == "single":
            page = doc[0]
            pix = page.get_pixmap(matrix=fitz.Matrix(2.0, 2.0))
            output_file = generate_filename("single", "png")
            pix.save(output_file)
            output_files.append(output_file)
    finally:
        # Close the document even when rendering or saving fails.
        doc.close()
    return output_files
|
|
|
|
|
def update_gallery(history):
    """Summarise the gallery files and record the refresh in *history*.

    Args:
        history: List of log strings; one entry is appended in place.

    Returns:
        Tuple of a bulleted listing of at most the first five files, and the
        same *history* list.
    """
    files = get_gallery_files()
    preview_lines = []
    for name in files[:5]:
        preview_lines.append(f"- {name}")
    history.append(f"Gallery updated: {len(files)} files")
    return "\n".join(preview_lines), history
|
|
|
def camera_snap(image, history):
    """Save a captured image to a timestamped file and log it in *history*.

    Args:
        image: Object with a ``save(filename)`` method, or None when nothing
            was captured.
        history: List of log strings; one entry is appended on success.

    Returns:
        Tuple of a status message and the same *history* list.
    """
    if image is None:
        return "No image captured", history

    filename = generate_filename("cam")
    image.save(filename)
    history.append(f"Snapshot saved: {filename}")
    return f"Image saved as {filename}", history
|
|
|
def download_pdfs(urls, history):
    """Download every URL listed in *urls*, one per line.

    Args:
        urls: Newline-separated URLs. Blank lines are skipped and surrounding
            whitespace, including the carriage return of CRLF input, is
            stripped from each line. None is treated as empty input.
        history: List of log strings; one entry is appended per saved PDF.

    Returns:
        Tuple of a status message with the number of PDFs saved, and the same
        *history* list.
    """
    downloaded = []
    for line in (urls or "").splitlines():
        url = line.strip()
        if not url:
            continue
        output_path = pdf_url_to_filename(url)
        if download_pdf(url, output_path):
            downloaded.append(output_path)
            history.append(f"Downloaded PDF: {output_path}")
    return f"Downloaded {len(downloaded)} PDFs", history
|
|
|
def build_model(model_type, base_model, model_name, domain, history):
    """Load a base model, save it under its configured path and log the build.

    Args:
        model_type: ``"Causal LM"`` selects ``ModelConfig`` and
            ``ModelBuilder``; any other value selects the diffusion pair.
        base_model: Checkpoint passed to the builder's ``load_model``.
        model_name: Name stored in the config; it determines the save path.
        domain: Domain label stored in the config.
        history: List of log strings; one entry is appended in place.

    Returns:
        Tuple of the builder, a status message and the same *history* list.
    """
    is_causal_lm = model_type == "Causal LM"
    config_cls = ModelConfig if is_causal_lm else DiffusionConfig
    config = config_cls(name=model_name, base_model=base_model, size="small", domain=domain)

    builder = ModelBuilder() if is_causal_lm else DiffusionBuilder()
    builder.load_model(base_model, config)

    if hasattr(builder, "save_model"):
        builder.save_model(config.model_path)
    else:
        # A builder without save_model would raise AttributeError here, so
        # the pipeline is saved directly instead.
        os.makedirs(config.model_path, exist_ok=True)
        builder.pipeline.save_pretrained(config.model_path)

    history.append(f"Built {model_type} model: {model_name}")
    return builder, f"Model saved to {config.model_path}", history
|
|
|
def test_model(builder, prompt, history): |
|
if builder is None: |
|
return "No model loaded", history |
|
if isinstance(builder, ModelBuilder): |
|
result = builder.evaluate(prompt) |
|
history.append(f"Tested Causal LM: {prompt} -> {result}") |
|
return result, history |
|
elif isinstance(builder, DiffusionBuilder): |
|
image = builder.generate(prompt) |
|
output_file = generate_filename("diffusion_test") |
|
image.save(output_file) |
|
history.append(f"Tested Diffusion: {prompt} -> {output_file}") |
|
return output_file, history |
|
|
|
|
|
# Base-model options offered for each model type in the "Build Titan" tab.
_CAUSAL_LM_BASE_MODELS = ["HuggingFaceTB/SmolLM-135M", "Qwen/Qwen1.5-0.5B-Chat"]
_DIFFUSION_BASE_MODELS = ["OFA-Sys/small-stable-diffusion-v0", "stabilityai/stable-diffusion-2-base"]


def _base_model_choices(selected_type):
    """Return a dropdown update listing the base models for *selected_type*."""
    choices = _CAUSAL_LM_BASE_MODELS if selected_type == "Causal LM" else _DIFFUSION_BASE_MODELS
    return gr.update(choices=choices, value=choices[0])


def _render_history(entries):
    """Join the history entries into the text shown in the History box."""
    return "\n".join(entries)


# NOTE(review): the trailing "π…" glyphs in the labels below look like
# mis-decoded emoji; they are kept as found — restore the originals if known.
with gr.Blocks(title="AI Vision & SFT Titans π") as demo:
    gr.Markdown("# AI Vision & SFT Titans π")
    history = gr.State(value=[])
    builder = gr.State(value=None)

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("## Captured Files π")
            gallery_output = gr.Textbox(label="Gallery", lines=5)
            gallery_button = gr.Button("Update Gallery")

        with gr.Column(scale=3):
            with gr.Tabs():
                with gr.TabItem("Camera Snap π·"):
                    camera_input = gr.Image(type="pil", label="Take a Picture")
                    snap_output = gr.Textbox(label="Status")
                    snap_button = gr.Button("Capture")

                with gr.TabItem("Download PDFs π₯"):
                    url_input = gr.Textbox(label="Enter PDF URLs (one per line)", lines=5)
                    pdf_output = gr.Textbox(label="Status")
                    download_button = gr.Button("Download")

                with gr.TabItem("Build Titan π±"):
                    # An explicit default keeps the two dropdowns consistent
                    # from the first render.
                    model_type = gr.Dropdown(["Causal LM", "Diffusion"], value="Causal LM", label="Model Type")
                    base_model = gr.Dropdown(
                        choices=_CAUSAL_LM_BASE_MODELS,
                        value=_CAUSAL_LM_BASE_MODELS[0],
                        label="Base Model"
                    )
                    model_name = gr.Textbox(label="Model Name", value=f"tiny-titan-{int(time.time())}")
                    domain = gr.Textbox(label="Domain", value="general")
                    build_output = gr.Textbox(label="Status")
                    build_button = gr.Button("Build")

                with gr.TabItem("Test Titan π§ͺ"):
                    test_prompt = gr.Textbox(label="Test Prompt", value="What is AI?")
                    test_output = gr.Textbox(label="Result")
                    test_button = gr.Button("Test")

    with gr.Row():
        gr.Markdown("## History π")
        history_output = gr.Textbox(value="\n".join(history.value), label="History", lines=5, interactive=False)

    # The base-model list was previously computed once, while the page was
    # being built, so it never followed the selected model type.
    model_type.change(_base_model_choices, inputs=[model_type], outputs=[base_model])

    def _wire(button, handler, inputs, outputs):
        """Attach *handler* to *button* and refresh the History box afterwards."""
        button.click(handler, inputs=inputs, outputs=outputs).then(
            _render_history, inputs=[history], outputs=[history_output]
        )

    # Events are wired after every component exists so each one can refresh
    # the History box, which is created last.
    _wire(gallery_button, update_gallery, [history], [gallery_output, history])
    _wire(snap_button, camera_snap, [camera_input, history], [snap_output, history])
    _wire(download_button, download_pdfs, [url_input, history], [pdf_output, history])
    _wire(
        build_button,
        build_model,
        [model_type, base_model, model_name, domain, history],
        [builder, build_output, history],
    )
    _wire(test_button, test_model, [builder, test_prompt, history], [test_output, history])

demo.launch()