|
from fastapi import FastAPI, Query
|
|
import torch
|
|
import torch.nn.functional as F
|
|
from transformers import AutoTokenizer, AutoModel
|
|
import re
|
|
from fastapi import FastAPI, Depends
|
|
from fastapi_health import health
|
|
import logging
|
|
import sys
|
|
from pydantic import BaseModel
|
|
|
|
class TextPayload(BaseModel):
|
|
text: str
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
logging.basicConfig(
|
|
level=logging.getLevelName("INFO"),
|
|
handlers=[logging.StreamHandler(sys.stdout)],
|
|
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
|
|
logging.info('Logging module started')
|
|
|
|
def get_session():
|
|
return True
|
|
|
|
def is_database_online(session: bool = Depends(get_session)):
|
|
return session
|
|
|
|
app = FastAPI()
|
|
app.add_api_route("/healthz", health([is_database_online]))
|
|
|
|
|
|
class EmbeddingModels:
|
|
def __init__(self):
|
|
device="cuda" if torch.cuda.is_available() else "cpu"
|
|
self.device = device
|
|
|
|
self.text_model_ID = 'Alibaba-NLP/gte-large-en-v1.5'
|
|
self.text_model, self.text_tokenizer = self.get_text_model_info(self.text_model_ID)
|
|
|
|
|
|
def get_text_model_info(self, model_ID):
|
|
if self.device == 'cuda':
|
|
logging.info('Using Device CUDA')
|
|
tokenizer = AutoTokenizer.from_pretrained(model_ID)
|
|
model = AutoModel.from_pretrained(
|
|
model_ID,
|
|
trust_remote_code=True,
|
|
unpad_inputs=True,
|
|
use_memory_efficient_attention=True,
|
|
).to(self.device)
|
|
logging.info('xfomrer based memory_efficient_attention enabled as device is cuda')
|
|
else:
|
|
logging.info('Using Device CPU')
|
|
tokenizer = AutoTokenizer.from_pretrained(model_ID)
|
|
model = AutoModel.from_pretrained(
|
|
model_ID,
|
|
trust_remote_code=True,
|
|
).to(self.device)
|
|
logging.info(' memory_efficient_attention is not supported as device is cpu')
|
|
|
|
return model, tokenizer
|
|
|
|
|
|
def preprocessing_patent_data(self,text):
|
|
|
|
pattern0 = r'\b(SUBSTITUTE SHEET RULE 2 SUMMARY OF THE INVENTION|BRIEF DESCRIPTION OF PREFERRED EMBODIMENTS|BRIEF DESCRIPTION OF THE DRAWINGS/FIGURES|BEST MODE FOR CARRYING OUT THE INVENTION|BACKGROUND AND SUMMARY OF THE INVENTION|FIELD AND BACKGROUND OF THE INVENTION|BACKGROUND OF THE PRESENT INVENTION|FIELD AND BACKGROUND OF INVENTION|STAND DER TECHNIK- BACKGROUND ART|BRIEF DESCRIPTION OF THE DRAWINGS|DESCRIPTION OF THE RELATED ART|BRIEF SUMMARY OF THE INVENTION|UTILITY MODEL CLAIMS A CONTENT|DESCRIPTION OF BACKGROUND ART|BRIEF DESCRIPTION OF DRAWINGS|BACKGROUND OF THE INVENTION|BACKGROUND TO THE INVENTION|TÉCNICA ANTERIOR- PRIOR ART|DISCLOSURE OF THE INVENTION|BRIEF SUMMARY OF INVENTION|BACKGROUND OF RELATED ART|SUMMARY OF THE DISCLOSURE|SUMMARY OF THE INVENTIONS|SUMMARY OF THE INVENTION|OBJECTS OF THE INVENTION|THE CONTENT OF INVENTION|DISCLOSURE OF INVENTION|Disclosure of Invention|Complete Specification|RELATED BACKGROUND ART|BACKGROUND INFORMATION|BACKGROUND TECHNOLOGY|DETAILED DESCRIPTION|SUMMARY OF INVENTION|DETAILED DESCRIPTION|PROBLEM TO BE SOLVED|EFFECT OF INVENTION|WHAT IS CLAIMED IS|What is claimed is|What is Claim is|SUBSTITUTE SHEET|SELECTED DRAWING|BACK GROUND ART|BACKGROUND ART|Background Art|JPO&INPIT|CONSTITUTION|DEFINITIONS|Related Art|BACKGROUND|JPO&INPIT|JPO&NCIPI|COPYRIGHT|SOLUTION|SUMMARY)\b'
|
|
text = re.sub(pattern0, '[SEP]', text, flags=re.IGNORECASE)
|
|
text = ' '.join(text.split())
|
|
|
|
regex = r'<\s*heading[^>]*>(.*?)<\s*/\s*heading>|<[^<]+>|id=\"p-\d+\"|:'
|
|
result = re.sub(regex, '[SEP]', text, flags=re.IGNORECASE)
|
|
|
|
chemical_list = []
|
|
pattern1 = r'\b((?:(?:H|He|Li|Be|B|C|N|O|F|Ne|Na|Mg|Al|Si|P|S|Cl|Ar|K|Ca|Sc|Ti|V|Cr|Mn|Fe|Co|Ni|Cu|Zn|Ga|Ge|As|Se|Br|Kr|Rb|Sr|Y|Zr|Nb|Mo|Tc|Ru|Rh|Pd|Ag|Cd|In|Sn|Sb|Te|I|Xe|Cs|Ba|La|Hf|Ta|W|Re|Os|Ir|Pt|Au|Hg|Tl|Pb|Bi|Po|At|Rn|Fr|Ra|Ac|Rf|Db|Sg|Bh|Hs|Mt|Ds|Rg|Cn|Nh|Fl|Mc|Lv|Ts|Og|Ce|Pr|Nd|Pm|Sm|Eu|Gd|Tb|Dy|Ho|Er|Tm|Yb|Lu|Th|Pa|U|Np|Pu|Am|Cm|Bk|Cf|Es|Fm|Md|No|Lr)\d*)+)\b'
|
|
|
|
formula_names = re.findall(pattern1, result)
|
|
for formula in formula_names:
|
|
if len(formula)>=2:
|
|
chemical_list.append(formula)
|
|
|
|
|
|
|
|
pattern2 = r"\((?![A-Za-z]+\))[\w\d\s,-]+\)|\([A-Za-z]\)"
|
|
def keep_strings(text):
|
|
matched = text.group(0)
|
|
if any(item in matched for item in chemical_list):
|
|
return matched
|
|
return ' '
|
|
cleaned_text = re.sub(pattern2, keep_strings, result)
|
|
cleaned_text = ' '.join(cleaned_text.split())
|
|
cleaned_text= re.sub("(\[SEP\]+\s*)+", ' ', cleaned_text, flags=re.IGNORECASE)
|
|
|
|
p_text2=re.sub('[\—\-\═\=]', ' ', cleaned_text)
|
|
pattern1 = r'\b((?:(?:H|He|Li|Be|B|C|N|O|F|Ne|Na|Mg|Al|Si|P|S|Cl|Ar|K|Ca|Sc|Ti|V|Cr|Mn|Fe|Co|Ni|Cu|Zn|Ga|Ge|As|Se|Br|Kr|Rb|Sr|Y|Zr|Nb|Mo|Tc|Ru|Rh|Pd|Ag|Cd|In|Sn|Sb|Te|I|Xe|Cs|Ba|La|Hf|Ta|W|Re|Os|Ir|Pt|Au|Hg|Tl|Pb|Bi|Po|At|Rn|Fr|Ra|Ac|Rf|Db|Sg|Bh|Hs|Mt|Ds|Rg|Cn|Nh|Fl|Mc|Lv|Ts|Og|Ce|Pr|Nd|Pm|Sm|Eu|Gd|Tb|Dy|Ho|Er|Tm|Yb|Lu|Th|Pa|U|Np|Pu|Am|Cm|Bk|Cf|Es|Fm|Md|No|Lr)\d*)+)\b'
|
|
cleaned_text = re.sub(pattern1, "", p_text2)
|
|
cleaned_text = re.sub(' ,+|, +', ' ', cleaned_text)
|
|
cleaned_text = re.sub(' +', ' ', cleaned_text)
|
|
cleaned_text = re.sub('\.+', '.', cleaned_text)
|
|
cleaned_text = re.sub('[0-9] [0-9] +', ' ', cleaned_text)
|
|
cleaned_text = re.sub('( )', ' ', cleaned_text)
|
|
cleaned_text=cleaned_text.strip()
|
|
return cleaned_text
|
|
|
|
def get_text_embedding(self, text):
|
|
input_texts = []
|
|
text = self.preprocessing_patent_data(text)
|
|
logging.info('Input Text Processed')
|
|
input_texts.append(text)
|
|
batch_dict = self.text_tokenizer (input_texts, max_length=1024, padding=True, truncation=True, return_tensors='pt').to(self.device)
|
|
if self.device == 'cuda':
|
|
with torch.autocast(device_type="cuda", dtype=torch.float16):
|
|
with torch.inference_mode():
|
|
outputs = self.text_model(**batch_dict)
|
|
else:
|
|
with torch.inference_mode():
|
|
outputs = self.text_model(**batch_dict)
|
|
embeddings = outputs.last_hidden_state[:, 0]
|
|
embeddings = F.normalize(embeddings, p=2, dim=1)
|
|
logging.info('Embd Normalized')
|
|
values = embeddings[0].tolist()
|
|
logging.info('Embd Created')
|
|
return values
|
|
|
|
model = EmbeddingModels()
|
|
logging.info('Model Loaded!')
|
|
|
|
@app.post("/embed-text-gb/")
|
|
async def embed_text(payload: TextPayload):
|
|
try:
|
|
|
|
embeddings = model.get_text_embedding(payload.text)
|
|
return embeddings
|
|
except Exception as e:
|
|
logging.info(f'Error: {e}')
|
|
return {"error": str(e)} |