from io import StringIO
import numpy as np
import pandas as pd
import requests
from bs4 import BeautifulSoup
import json
import os
import traceback
import uuid
import zipfile
import io
import subprocess
import re
import time
from datetime import datetime
import warnings
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from typing import Any, Dict, List, Literal, Optional

warnings.filterwarnings("ignore")

app = FastAPI(title="3GPP Document Finder API",
              description="API to find 3GPP documents based on TSG document IDs")
app.mount("/static", StaticFiles(directory="static"), name="static")

origins = [
    "*",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

class DocRequest(BaseModel):
    doc_id: str


class DocResponse(BaseModel):
    doc_id: str
    url: str
    release: Optional[str] = None
    scope: Optional[str] = None
    search_time: float


class MultiDocResponse(BaseModel):
    doc_id: str
    results: List[DocResponse]
    search_time: float


class BatchDocRequest(BaseModel):
    doc_ids: List[str]
    release: Optional[int] = None


class BatchDocResponse(BaseModel):
    results: Dict[str, str]
    missing: List[str]
    search_time: float

class DocFinder:
    def __init__(self):
        self.main_ftp_url = "https://docbox.etsi.org/SET"
        self.session = requests.Session()
        self.indexer_file = "indexed_docs.json"
        self.indexer, self.last_indexer_date = self.load_indexer()
        self.session.post(
            "https://portal.etsi.org/ETSIPages/LoginEOL.ashx",
            verify=False,
            headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"},
            data=json.dumps({"username": os.environ.get("EOL_USER"), "password": os.environ.get("EOL_PASSWORD")}),
        )

    def load_indexer(self):
        if os.path.exists(self.indexer_file):
            with open(self.indexer_file, "r", encoding="utf-8") as f:
                x = json.load(f)
                return x["docs"], x["last_indexed_date"]
        return {}, None

    def save_indexer(self):
        today = datetime.today()
        self.last_indexer_date = today.strftime("%d/%m/%Y-%H:%M:%S")
        with open(self.indexer_file, "w", encoding="utf-8") as f:
            output = {"docs": self.indexer, "last_indexed_date": self.last_indexer_date}
            json.dump(output, f, indent=4, ensure_ascii=False)

    def get_workgroup(self, doc: str):
        main_tsg = "SET-WG-R" if any(doc.startswith(kw) for kw in ["SETREQ", "SCPREQ"]) else "SET-WG-T" if any(doc.startswith(kw) for kw in ["SETTEC", "SCPTEC"]) else "SET" if any(doc.startswith(kw) for kw in ["SET", "SCP"]) else None
        if main_tsg is None:
            return None, None, None
        regex = re.search(r'\(([^)]+)\)', doc)
        if regex is None:
            # Guard against doc IDs without a parenthesised group.
            return None, None, None
        workgroup = "20" + regex.group(1)
        return main_tsg, workgroup, doc
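    # Illustrative mapping, assuming the parenthesised group is a two-digit year as the
    # "20" prefix above implies: a hypothetical ID "SETTEC(25)000123" would yield
    # ("SET-WG-T", "2025", "SETTEC(25)000123").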

    def find_workgroup_url(self, main_tsg, workgroup):
        response = self.session.get(f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS", verify=False)
        soup = BeautifulSoup(response.text, 'html.parser')
        for item in soup.find_all("tr"):
            link = item.find("a")
            if link and workgroup in link.get_text():
                return f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS/{link.get_text()}"
        return f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS/{workgroup}"

    def get_docs_from_url(self, url):
        try:
            response = self.session.get(url, verify=False, timeout=15)
            soup = BeautifulSoup(response.text, "html.parser")
            return [item.get_text() for item in soup.select("tr td a")]
        except Exception as e:
            print(f"Error accessing {url}: {e}")
            return []

    def search_document(self, doc_id: str):
        original = doc_id
        if original in self.indexer:
            return self.indexer[original]
        for doc in self.indexer:
            if doc.startswith(original):
                return self.indexer[doc]
        main_tsg, workgroup, doc = self.get_workgroup(doc_id)
        urls = []
        if main_tsg:
            wg_url = self.find_workgroup_url(main_tsg, workgroup)
            if wg_url:
                files = self.get_docs_from_url(wg_url)
                for f in files:
                    if doc.lower() in f.lower() or original in f:
                        print(f)
                        doc_url = f"{wg_url}/{f}"
                        self.indexer[original] = doc_url
                        self.save_indexer()
                        urls.append(doc_url)
        return urls[0] if len(urls) == 1 else urls[-2] if len(urls) > 1 else f"Document {doc_id} not found"


class SpecFinder:
    def __init__(self):
        self.main_url = "https://www.etsi.org/deliver/etsi_ts"
        self.indexer_file = "indexed_specifications.json"
        self.indexer, self.last_indexer_date = self.load_indexer()
        self.headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"}

    def load_indexer(self):
        if os.path.exists(self.indexer_file):
            with open(self.indexer_file, "r", encoding="utf-8") as f:
                x = json.load(f)
                return x["specs"], x["last_indexed_date"]
        return {}, None

    def save_indexer(self):
        today = datetime.today()
        self.last_indexer_date = today.strftime("%d/%m/%Y-%H:%M:%S")
        with open(self.indexer_file, "w", encoding="utf-8") as f:
            output = {"specs": self.indexer, "last_indexed_date": self.last_indexer_date}
            json.dump(output, f, indent=4, ensure_ascii=False)

    def get_spec_path(self, doc_id: str):
        if "-" in doc_id:
            position, part = doc_id.split("-")
        else:
            position, part = doc_id, None
        position = position.replace(" ", "")
        if part:
            if len(part) == 1:
                part = "0" + part
        spec_folder = position + part if part is not None else position
        return f"{int(position) - (int(position) % 100)}_{int(position) - (int(position) % 100) + 99}/{spec_folder}"

    def get_docs_from_url(self, url):
        try:
            response = requests.get(url, verify=False, timeout=15)
            soup = BeautifulSoup(response.text, "html.parser")
            docs = [item.get_text() for item in soup.find_all("a")][1:]
            return docs
        except Exception as e:
            print(f"Error accessing {url}: {e}")
            return []

    def search_document(self, doc_id: str):
        # Expected format: "103 666" with an optional "-<part>" suffix, e.g. "103 666-2".
        original = doc_id
        if original in self.indexer:
            return self.indexer[original]
        for doc in self.indexer:
            if doc.startswith(original):
                return self.indexer[doc]
        url = f"{self.main_url}/{self.get_spec_path(original)}/"
        releases = self.get_docs_from_url(url)
        if not releases:
            return f"Specification {doc_id} not found"
        files = self.get_docs_from_url(url + releases[-1])
        for f in files:
            if f.endswith(".pdf"):
                return url + releases[-1] + "/" + f
        return f"Specification {doc_id} not found"


@app.get("/")  # route path assumed
async def main_menu():
    return FileResponse(os.path.join("templates", "index.html"))

finder_doc = DocFinder()
finder_spec = SpecFinder()


@app.post("/find")  # route path assumed
def find_document(request: DocRequest):
    start_time = time.time()
    finder = finder_spec if request.doc_id[0].isnumeric() else finder_doc
    result = finder.search_document(request.doc_id)
    if "not found" not in result and "Could not" not in result and "Unable" not in result:
        return DocResponse(
            doc_id=request.doc_id,
            url=result,
            search_time=time.time() - start_time
        ) if not isinstance(result, list) else result
    else:
        raise HTTPException(status_code=404, detail=result)


@app.post("/batch")  # route path assumed
def find_documents_batch(request: BatchDocRequest):
    start_time = time.time()
    results = {}
    missing = []
    for doc_id in request.doc_ids:
        finder = finder_doc if doc_id[0].isalpha() else finder_spec
        result = finder.search_document(doc_id)
        if "not found" not in result and "Could not" not in result and "Unable" not in result:
            results[doc_id] = result
        else:
            missing.append(doc_id)
    return BatchDocResponse(
        results=results,
        missing=missing,
        search_time=time.time() - start_time
    )
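

# Entrypoint sketch (an assumption, not in the source): run the API locally with uvicorn.
# Port 7860 is the usual Hugging Face Spaces convention and is assumed here.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)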