import os
import logging
import zipfile

import joblib
import requests

# Configure logging
logging.basicConfig(level=logging.INFO)
# Get model URLs from environment variables
DIABETES_MODEL_URL = os.getenv("DIABETES_MODEL_URL")
SCALER_URL = os.getenv("SCALER_URL")
MULTI_MODEL_URL = os.getenv("MULTI_MODEL_URL")
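
# Optional sanity check (a sketch added here, not part of the original script):
# warn early if any of the URL environment variables above is unset, so the
# failure is visible before the download loop runs.
for _env_name in ("DIABETES_MODEL_URL", "SCALER_URL", "MULTI_MODEL_URL"):
    if not os.getenv(_env_name):
        logging.warning(f"{_env_name} is not set; downloading that model will fail.")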

# Local paths for downloaded models
MODEL_PATHS = {
    "DIABETES_MODEL": "finaliseddiabetes_model.zip",
    "SCALER": "finalisedscaler.zip",
    "MULTI_MODEL": "nodiabetes.zip",
}

# Extracted model names
EXTRACTED_MODELS = {
    "DIABETES_MODEL": "finaliseddiabetes_model.joblib",
    "SCALER": "finalisedscaler.joblib",
    "MULTI_MODEL": "nodiabetes.joblib",
}

BASE_DIR = os.getcwd()  # Current working directory

def download_model(url, zip_filename):
    """Downloads the model zip file from the given URL and saves it locally."""
    zip_path = os.path.join(BASE_DIR, zip_filename)
    if not url:
        logging.error(f"URL for {zip_filename} is missing!")
        return False
    try:
        # Timeout added so a stalled connection doesn't hang the app indefinitely
        response = requests.get(url, allow_redirects=True, timeout=60)
        if response.status_code == 200:
            with open(zip_path, 'wb') as f:
                f.write(response.content)
            logging.info(f"Downloaded {zip_filename} successfully.")
            return True
        else:
            logging.error(f"Failed to download {zip_filename}. HTTP status: {response.status_code}")
            return False
    except Exception as e:
        logging.error(f"Error downloading {zip_filename}: {e}")
        return False
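
# Variation (sketch only, not used by the script below): for large archives a
# streamed download avoids holding the whole response body in memory. The
# chunk size is an arbitrary assumption.
def download_model_streamed(url, zip_filename, chunk_size=1 << 20):
    zip_path = os.path.join(BASE_DIR, zip_filename)
    try:
        with requests.get(url, stream=True, allow_redirects=True, timeout=60) as response:
            response.raise_for_status()
            with open(zip_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    f.write(chunk)
        logging.info(f"Downloaded {zip_filename} (streamed).")
        return True
    except Exception as e:
        logging.error(f"Streamed download of {zip_filename} failed: {e}")
        return False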

def extract_if_needed(zip_filename, extracted_filename):
    """Extracts the model file from the zip if not already extracted."""
    zip_path = os.path.join(BASE_DIR, zip_filename)
    extracted_path = os.path.join(BASE_DIR, extracted_filename)
    if os.path.exists(extracted_path):
        logging.info(f"{extracted_filename} already exists. Skipping extraction.")
        return True
    if not os.path.exists(zip_path):
        logging.error(f"Zip file missing: {zip_path}")
        return False
    try:
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(BASE_DIR)
            extracted_files = zip_ref.namelist()
        logging.info(f"Extracted {zip_filename}, contents: {extracted_files}")
        return True
    except Exception as e:
        logging.error(f"Error extracting {zip_filename}: {e}")
        return False

def load_model(model_filename):
    """Loads a model from the given filename."""
    model_path = os.path.join(BASE_DIR, model_filename)
    if not os.path.exists(model_path):
        logging.error(f"Model file not found: {model_path}")
        return None
    try:
        model = joblib.load(model_path)
        logging.info(f"Loaded {model_filename} successfully.")
        return model
    except Exception as e:
        logging.error(f"Error loading {model_filename}: {e}")
        return None

# Main execution
for model_key, zip_filename in MODEL_PATHS.items():
    extracted_filename = EXTRACTED_MODELS[model_key]

    # Step 1: Download the zip archive if it is not already present.
    # The globals() lookup resolves e.g. "DIABETES_MODEL" -> DIABETES_MODEL_URL defined above.
    if not os.path.exists(os.path.join(BASE_DIR, zip_filename)):
        download_model(globals()[f"{model_key}_URL"], zip_filename)

    # Step 2: Extract the model file from the archive
    extract_if_needed(zip_filename, extracted_filename)
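
# Optional check (a sketch, not in the original script): list any expected
# .joblib files that are still missing after the download/extract loop, so a
# partial failure is reported in one place.
missing = [name for name in EXTRACTED_MODELS.values()
           if not os.path.exists(os.path.join(BASE_DIR, name))]
if missing:
    logging.error(f"Missing extracted model files: {missing}")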

# Step 3: Load models
diabetes_model = load_model(EXTRACTED_MODELS["DIABETES_MODEL"])
scaler = load_model(EXTRACTED_MODELS["SCALER"])
multi_model = load_model(EXTRACTED_MODELS["MULTI_MODEL"])

# Final check: compare against None explicitly, since load_model returns None on failure
if all(m is not None for m in (diabetes_model, scaler, multi_model)):
    logging.info("All models loaded successfully!")
else:
    logging.error("Some models failed to load. Check logs for details.")
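
# Usage sketch (not part of the original script): how the loaded artifacts would
# typically be combined for a single prediction. The feature vector below is
# purely hypothetical; its length and order depend on how the models were
# trained, so this is illustration only and is guarded so it cannot crash.
if diabetes_model is not None and scaler is not None:
    try:
        import numpy as np
        sample = np.array([[2, 120, 70, 20, 85, 28.0, 0.5, 33]])  # hypothetical features
        prediction = diabetes_model.predict(scaler.transform(sample))
        logging.info(f"Sample prediction: {prediction}")
    except Exception as e:
        logging.warning(f"Usage sketch skipped (feature layout is an assumption): {e}")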