from fastapi import FastAPI, Depends
from fastapi_health import health
import torch
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModel
import re
import logging
import sys
from pydantic import BaseModel

class TextPayload(BaseModel):
    text: str


logger = logging.getLogger(__name__)

logging.basicConfig(
    level=logging.INFO,
    handlers=[logging.StreamHandler(sys.stdout)],
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logging.info('Logging module started')

def get_session():
    # Placeholder dependency: no real database is attached, so the session is always reported as available.
    return True

def is_database_online(session: bool = Depends(get_session)):
    # Health-check dependency used by the /healthz route.
    return session

app = FastAPI()
app.add_api_route("/healthz", health([is_database_online]))


class EmbeddingModels:
    def __init__(self):
        # Select GPU when available, otherwise fall back to CPU.
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.text_model_ID = 'Alibaba-NLP/gte-large-en-v1.5'
        self.text_model, self.text_tokenizer = self.get_text_model_info(self.text_model_ID)
    

    def get_text_model_info(self, model_ID):
        if self.device == 'cuda':
            logging.info('Using Device CUDA')
            tokenizer = AutoTokenizer.from_pretrained(model_ID)
            model = AutoModel.from_pretrained(
                model_ID,
                trust_remote_code=True,
                unpad_inputs=True,
                use_memory_efficient_attention=True,
            ).to(self.device)
            logging.info('xformers-based memory_efficient_attention enabled as device is CUDA')
        else:
            logging.info('Using Device CPU')
            tokenizer = AutoTokenizer.from_pretrained(model_ID)
            model = AutoModel.from_pretrained(
                model_ID,
                trust_remote_code=True,
            ).to(self.device)
            logging.info('memory_efficient_attention is not supported as device is CPU')
        
        return model, tokenizer
    

    def preprocessing_patent_data(self, text):
        """Strip common patent section headings, XML tags, bracketed references and chemical formulas from raw patent text before embedding."""
        # Remove common section headings found in patent text.
        pattern0 =  r'\b(SUBSTITUTE SHEET RULE 2 SUMMARY OF THE INVENTION|BRIEF DESCRIPTION OF PREFERRED EMBODIMENTS|BRIEF DESCRIPTION OF THE DRAWINGS/FIGURES|BEST MODE FOR CARRYING OUT THE INVENTION|BACKGROUND AND SUMMARY OF THE INVENTION|FIELD AND BACKGROUND OF THE INVENTION|BACKGROUND OF THE PRESENT INVENTION|FIELD AND BACKGROUND OF INVENTION|STAND DER TECHNIK- BACKGROUND ART|BRIEF DESCRIPTION OF THE DRAWINGS|DESCRIPTION OF THE RELATED ART|BRIEF SUMMARY OF THE INVENTION|UTILITY MODEL CLAIMS A CONTENT|DESCRIPTION OF BACKGROUND ART|BRIEF DESCRIPTION OF DRAWINGS|BACKGROUND OF THE INVENTION|BACKGROUND TO THE INVENTION|TÉCNICA ANTERIOR- PRIOR ART|DISCLOSURE OF THE INVENTION|BRIEF SUMMARY OF INVENTION|BACKGROUND OF RELATED ART|SUMMARY OF THE DISCLOSURE|SUMMARY OF THE INVENTIONS|SUMMARY OF THE INVENTION|OBJECTS OF THE INVENTION|THE CONTENT OF INVENTION|DISCLOSURE OF INVENTION|Disclosure of Invention|Complete Specification|RELATED BACKGROUND ART|BACKGROUND INFORMATION|BACKGROUND TECHNOLOGY|DETAILED DESCRIPTION|SUMMARY OF INVENTION|DETAILED DESCRIPTION|PROBLEM TO BE SOLVED|EFFECT OF INVENTION|WHAT IS CLAIMED IS|What is claimed is|What is Claim is|SUBSTITUTE SHEET|SELECTED DRAWING|BACK GROUND ART|BACKGROUND ART|Background Art|JPO&INPIT|CONSTITUTION|DEFINITIONS|Related Art|BACKGROUND|JPO&INPIT|JPO&NCIPI|COPYRIGHT|SOLUTION|SUMMARY)\b'
        text = re.sub(pattern0, '[SEP]', text, flags=re.IGNORECASE)
        text = ' '.join(text.split())
        # Removing all tags between Heading to /Heading and id= 
        regex = r'<\s*heading[^>]*>(.*?)<\s*/\s*heading>|<[^<]+>|id=\"p-\d+\"|:'
        result = re.sub(regex, '[SEP]', text, flags=re.IGNORECASE)
        # Find chemical formula tokens in the text so they can be excluded from the bracket-removal logic below.
        chemical_list = []
        pattern1 = r'\b((?:(?:H|He|Li|Be|B|C|N|O|F|Ne|Na|Mg|Al|Si|P|S|Cl|Ar|K|Ca|Sc|Ti|V|Cr|Mn|Fe|Co|Ni|Cu|Zn|Ga|Ge|As|Se|Br|Kr|Rb|Sr|Y|Zr|Nb|Mo|Tc|Ru|Rh|Pd|Ag|Cd|In|Sn|Sb|Te|I|Xe|Cs|Ba|La|Hf|Ta|W|Re|Os|Ir|Pt|Au|Hg|Tl|Pb|Bi|Po|At|Rn|Fr|Ra|Ac|Rf|Db|Sg|Bh|Hs|Mt|Ds|Rg|Cn|Nh|Fl|Mc|Lv|Ts|Og|Ce|Pr|Nd|Pm|Sm|Eu|Gd|Tb|Dy|Ho|Er|Tm|Yb|Lu|Th|Pa|U|Np|Pu|Am|Cm|Bk|Cf|Es|Fm|Md|No|Lr)\d*)+)\b'
        
        formula_names = re.findall(pattern1, result)
        for formula in formula_names:
            if len(formula) >= 2:
                chemical_list.append(formula)

        # Remove numbers and alphanum inside brackets excluding chemical forms
        pattern2 = r"\((?![A-Za-z]+\))[\w\d\s,-]+\)|\([A-Za-z]\)"
        def keep_strings(text):
            matched = text.group(0)
            if any(item in matched for item in chemical_list):
                return matched
            return ' '
        cleaned_text = re.sub(pattern2, keep_strings, result)
        cleaned_text = ' '.join(cleaned_text.split())
        cleaned_text = re.sub(r"(\[SEP\]+\s*)+", ' ', cleaned_text, flags=re.IGNORECASE)
        # Remove chemical formulas (e.g. polymerizable compounds) after splitting tokens on bond/dash characters.
        p_text2 = re.sub(r'[—\-═=]', ' ', cleaned_text)
        pattern1 = r'\b((?:(?:H|He|Li|Be|B|C|N|O|F|Ne|Na|Mg|Al|Si|P|S|Cl|Ar|K|Ca|Sc|Ti|V|Cr|Mn|Fe|Co|Ni|Cu|Zn|Ga|Ge|As|Se|Br|Kr|Rb|Sr|Y|Zr|Nb|Mo|Tc|Ru|Rh|Pd|Ag|Cd|In|Sn|Sb|Te|I|Xe|Cs|Ba|La|Hf|Ta|W|Re|Os|Ir|Pt|Au|Hg|Tl|Pb|Bi|Po|At|Rn|Fr|Ra|Ac|Rf|Db|Sg|Bh|Hs|Mt|Ds|Rg|Cn|Nh|Fl|Mc|Lv|Ts|Og|Ce|Pr|Nd|Pm|Sm|Eu|Gd|Tb|Dy|Ho|Er|Tm|Yb|Lu|Th|Pa|U|Np|Pu|Am|Cm|Bk|Cf|Es|Fm|Md|No|Lr)\d*)+)\b'
        cleaned_text = re.sub(pattern1, "", p_text2)
        cleaned_text = re.sub('  ,+|,  +', ' ', cleaned_text)
        cleaned_text = re.sub(' +', ' ', cleaned_text)
        cleaned_text = re.sub(r'\.+', '.', cleaned_text)
        cleaned_text = re.sub('[0-9] [0-9] +', ' ', cleaned_text)
        cleaned_text = cleaned_text.strip()
        return cleaned_text
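
    # Sketch (not part of the original file): preprocessing_patent_data does not
    # touch self, so it can be exercised without loading the model weights, e.g.
    #   cleaned = EmbeddingModels.preprocessing_patent_data(None, raw_patent_text)
    # where raw_patent_text is any hypothetical patent body string.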

    def get_text_embedding(self, text):
        input_texts = []
        text = self.preprocessing_patent_data(text)
        logging.info('Input text processed')
        input_texts.append(text)
        batch_dict = self.text_tokenizer(input_texts, max_length=1024, padding=True, truncation=True, return_tensors='pt').to(self.device)
        if self.device == 'cuda':
            # Run inference under fp16 autocast on GPU.
            with torch.autocast(device_type="cuda", dtype=torch.float16):
                with torch.inference_mode():
                    outputs = self.text_model(**batch_dict)
        else:
            with torch.inference_mode():
                outputs = self.text_model(**batch_dict)
        # CLS-token pooling followed by L2 normalization.
        embeddings = outputs.last_hidden_state[:, 0]
        embeddings = F.normalize(embeddings, p=2, dim=1)
        logging.info('Embedding normalized')
        values = embeddings[0].tolist()
        logging.info('Embedding created')
        return values

model = EmbeddingModels()
logging.info('Model Loaded!')

@app.post("/embed-text-gb/")
async def embed_text(payload: TextPayload):
    try:
        # Compute the embedding for the submitted text.
        embeddings = model.get_text_embedding(payload.text)
        return embeddings
    except Exception as e:
        logging.error(f'Error: {e}')
        return {"error": str(e)}