ETSIDocFinder / app.py
om4r932's picture
Removed comments + add specifications + add batch search
2cdd1ea
from io import StringIO
import numpy as np
import pandas as pd
import requests
from bs4 import BeautifulSoup
import json
import os
import traceback
import uuid
import zipfile
import io
import subprocess
import os
import re
import time
from datetime import datetime
import warnings
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from typing import Any, Dict, List, Literal, Optional
warnings.filterwarnings("ignore")
app = FastAPI(title="3GPP Document Finder API",
description="API to find 3GPP documents based on TSG document IDs")
app.mount("/static", StaticFiles(directory="static"), name="static")
origins = [
"*",
]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
class DocRequest(BaseModel):
doc_id: str
class DocResponse(BaseModel):
doc_id: str
url: str
release: Optional[str] = None
scope: Optional[str] = None
search_time: float
class MultiDocResponse(BaseModel):
doc_id: str
results: List[DocResponse]
search_time: float
class BatchDocRequest(BaseModel):
doc_ids: List[str]
release: Optional[int] = None
class BatchDocResponse(BaseModel):
results: Dict[str, str]
missing: List[str]
search_time: float
class DocFinder:
def __init__(self):
self.main_ftp_url = "https://docbox.etsi.org/SET"
self.session = requests.Session()
self.indexer_file = "indexed_docs.json"
self.indexer, self.last_indexer_date = self.load_indexer()
self.session.post("https://portal.etsi.org/ETSIPages/LoginEOL.ashx", verify=False, headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"}, data=json.dumps({"username": os.environ.get("EOL_USER"), "password": os.environ.get("EOL_PASSWORD")}))
def load_indexer(self):
if os.path.exists(self.indexer_file):
with open(self.indexer_file, "r", encoding="utf-8") as f:
x = json.load(f)
return x["docs"], x["last_indexed_date"]
return {}, None
def save_indexer(self):
today = datetime.today()
self.last_indexer_date = today.strftime("%d/%m/%Y-%H:%M:%S")
with open(self.indexer_file, "w", encoding="utf-8") as f:
output = {"docs": self.indexer, "last_indexed_date": self.last_indexer_date}
json.dump(output, f, indent=4, ensure_ascii=False)
def get_workgroup(self, doc: str):
main_tsg = "SET-WG-R" if any(doc.startswith(kw) for kw in ["SETREQ", "SCPREQ"]) else "SET-WG-T" if any(doc.startswith(kw) for kw in ["SETTEC", "SCPTEC"]) else "SET" if any(doc.startswith(kw) for kw in ["SET", "SCP"]) else None
if main_tsg is None:
return None, None, None
regex = re.search(r'\(([^)]+)\)', doc)
workgroup = "20" + regex.group(1)
return main_tsg, workgroup, doc
def find_workgroup_url(self, main_tsg, workgroup):
response = self.session.get(f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS", verify=False)
soup = BeautifulSoup(response.text, 'html.parser')
for item in soup.find_all("tr"):
link = item.find("a")
if link and workgroup in link.get_text():
return f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS/{link.get_text()}"
return f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS/{workgroup}"
def get_docs_from_url(self, url):
try:
response = self.session.get(url, verify=False, timeout=15)
soup = BeautifulSoup(response.text, "html.parser")
return [item.get_text() for item in soup.select("tr td a")]
except Exception as e:
print(f"Error accessing {url}: {e}")
return []
def search_document(self, doc_id: str):
original = doc_id
if original in self.indexer:
return self.indexer[original]
for doc in self.indexer:
if doc.startswith(original):
return self.indexer[doc]
main_tsg, workgroup, doc = self.get_workgroup(doc_id)
urls = []
if main_tsg:
wg_url = self.find_workgroup_url(main_tsg, workgroup)
if wg_url:
files = self.get_docs_from_url(wg_url)
for f in files:
if doc in f.lower() or original in f:
print(f)
doc_url = f"{wg_url}/{f}"
self.indexer[original] = doc_url
self.save_indexer()
urls.append(doc_url)
return urls[0] if len(urls) == 1 else urls[-2] if len(urls) > 1 else f"Document {doc_id} not found"
class SpecFinder:
def __init__(self):
self.main_url = "https://www.etsi.org/deliver/etsi_ts"
self.indexer_file = "indexed_specifications.json"
self.indexer, self.last_indexer_date = self.load_indexer()
self.headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"}
def load_indexer(self):
if os.path.exists(self.indexer_file):
with open(self.indexer_file, "r", encoding="utf-8") as f:
x = json.load(f)
return x["specs"], x["last_indexed_date"]
return {}, None
def save_indexer(self):
today = datetime.today()
self.last_indexer_date = today.strftime("%d/%m/%Y-%H:%M:%S")
with open(self.indexer_file, "w", encoding="utf-8") as f:
output = {"specs": self.indexer, "last_indexed_date": self.last_indexer_date}
json.dump(output, f, indent=4, ensure_ascii=False)
def get_spec_path(self, doc_id: str):
if "-" in doc_id:
position, part = doc_id.split("-")
else:
position, part = doc_id, None
position = position.replace(" ", "")
if part:
if len(part) == 1:
part = "0" + part
spec_folder = position + part if part is not None else position
return f"{int(position) - (int(position)%100)}_{int(position) - (int(position)%100) + 99}/{spec_folder}"
def get_docs_from_url(self, url):
try:
response = requests.get(url, verify=False, timeout=15)
soup = BeautifulSoup(response.text, "html.parser")
docs = [item.get_text() for item in soup.find_all("a")][1:]
return docs
except Exception as e:
print(f"Error accessing {url}: {e}")
return []
def search_document(self, doc_id: str):
# Example : 103 666[-2 opt]
original = doc_id
if original in self.indexer:
return self.indexer[original]
for doc in self.indexer:
if doc.startswith(original):
return self.indexer[doc]
url = f"{self.main_url}/{self.get_spec_path(original)}/"
releases = self.get_docs_from_url(url)
files = self.get_docs_from_url(url + releases[-1])
for f in files:
if f.endswith(".pdf"):
return url + releases[-1] + "/" + f
return f"Specification {doc_id} not found"
@app.get("/")
async def main_menu():
return FileResponse(os.path.join("templates", "index.html"))
finder_doc = DocFinder()
finder_spec = SpecFinder()
@app.post("/find")
def find_document(request: DocRequest):
start_time = time.time()
finder = finder_spec if request.doc_id[0].isnumeric() else finder_doc
result = finder.search_document(request.doc_id)
if "not found" not in result and "Could not" not in result and "Unable" not in result:
return DocResponse(
doc_id=request.doc_id,
url=result,
search_time=time.time() - start_time
) if not isinstance(result, list) else result
else:
raise HTTPException(status_code=404, detail=result)
@app.post("/batch", response_model=BatchDocResponse)
def find_documents_batch(request: BatchDocRequest):
start_time = time.time()
results = {}
missing = []
for doc_id in request.doc_ids:
finder = finder_doc if doc_id[0].isalpha() else finder_spec
result = finder.search_document(doc_id)
if "not found" not in result and "Could not" not in result and "Unable" not in result:
results[doc_id] = result
else:
missing.append(doc_id)
return BatchDocResponse(
results=results,
missing=missing,
search_time=time.time() - start_time
)