"""Gradio JSON backend that tags spatial entities in free text.

The spaCy pipeline is extended with a ``spatial_pipeline`` component that
replaces the model's entities with the ones returned by ``llm_ent_extract``
and then widens each entity with the spatial expressions matched by the
``regex_spatial`` patterns.
"""

import re
import time

import gradio as gr
import spacy
from spacy.language import Language
from spacy.tokens import Doc, Span, Token
from spacy.util import filter_spans

import llm_ent_extract
import regex_spatial

colors = {'GPE': "#43c6fc", "LOC": "#fd9720", "RSE": "#a6e22d"}
options = {"ents": ['GPE', 'LOC', "RSE"], "colors": colors}

HTML_WRAPPER = """
{}
"""

BASE_URL = ""
model = ""
types = ""

nlp = spacy.load("en_core_web_md")

# Labels kept by set_selected_entities().
# NOTE(review): 'loc' and 'rse' are lower-case while every other place in this
# file uses 'LOC' / 'RSE', so only GPE entities currently pass the filter.
# Confirm whether that is intended before changing the values.
gpe_selected = 'GPE'
loc_selected = 'loc'
rse_selected = 'rse'

# Name of the custom extension attribute registered on Span / Doc / Token.
rse_id = "rse_id"

# Identifier accumulated by find_ent_by_regex() while one entity is processed.
# get_relative_entity() resets it to "" before every entity.
_current_rse_id = ""


def set_selected_entities(doc: Doc) -> Doc:
    """Keep only the entities whose label is one of the selected labels.

    :param doc: spaCy Doc whose ``ents`` are filtered in place.
    :return: the same ``doc`` object.
    """
    selected = (gpe_selected, loc_selected, rse_selected)
    doc.ents = [ent for ent in doc.ents if ent.label_ in selected]
    return doc


def update_entities(doc: Doc, entity_texts, replace: bool = True) -> None:
    """Label entities by their surface text and write them to ``doc.ents``.

    Only the first occurrence of each text in ``doc.text`` is labelled.

    :param doc: parsed spaCy Doc, modified in place.
    :param entity_texts: mapping of entity text -> entity label.
    :param replace: True replaces the existing entities, False keeps them and
        adds the new ones.
    """
    new_ents = [] if replace else list(doc.ents)
    for ent_text, ent_label in entity_texts.items():
        if not ent_text:
            continue  # an empty text would yield an empty span
        start_char = doc.text.find(ent_text)
        if start_char == -1:
            continue
        # Map character offsets to tokens through spaCy itself. Counting
        # whitespace-separated words does not match spaCy's tokenisation
        # (punctuation is split into its own tokens), which shifted the span.
        span = doc.char_span(
            start_char,
            start_char + len(ent_text),
            label=ent_label,
            alignment_mode="expand",
        )
        if span is not None:
            new_ents.append(span)
    # set_ents() rejects overlapping spans, so drop the overlaps first.
    doc.set_ents(filter_spans(new_ents))


def find_ent_by_regex(doc: Doc, sentence: Span, ent: Span, regex) -> Span:
    """Widen ``ent`` with the first token-aligned ``regex`` match in ``sentence``.

    The matched text is prepended to the module-level identifier as
    ``"<match>_<id>"``. The identifier is seeded with ``ent.text`` when empty.

    :param doc: Doc whose text is searched.
    :param sentence: span bounding the accepted match start offsets.
    :param ent: entity whose ``start_char`` / ``end_char`` are updated in place.
    :param regex: pattern handed to ``re.finditer``.
    :return: ``ent`` (mutated when a match was applied).
    """
    global _current_rse_id
    if _current_rse_id == "":
        _current_rse_id = ent.text
    for match in re.finditer(regex, doc.text):
        start, end = match.span()
        if not (sentence.start_char <= start <= sentence.end_char):
            continue
        span = doc.char_span(start, end)
        if span is None:
            continue  # match is not aligned to token boundaries
        _current_rse_id = span.text + "_" + _current_rse_id
        if start > ent.end_char:
            ent.end_char = end  # match lies after the entity: extend right
        else:
            ent.start_char = start  # otherwise move the left boundary
        return ent
    return ent


def set_extension() -> None:
    """Register the ``rse_id`` extension on Span, Doc and Token."""
    for target in (Span, Doc, Token):
        target.set_extension(rse_id, default="", force=True)


def get_level1(doc: Doc, sentence: Span, ent: Span) -> Span:
    """Apply ``regex_spatial.get_level1_regex()`` to ``ent``."""
    return find_ent_by_regex(doc, sentence, ent, regex_spatial.get_level1_regex())


def get_level2(doc: Doc, sentence: Span, ent: Span) -> Span:
    """Apply ``regex_spatial.get_level2_regex()`` to ``ent``."""
    return find_ent_by_regex(doc, sentence, ent, regex_spatial.get_level2_regex())


def get_level3(doc: Doc, sentence: Span, ent: Span) -> Span:
    """Apply ``regex_spatial.get_level3_regex()`` to ``ent``."""
    return find_ent_by_regex(doc, sentence, ent, regex_spatial.get_level3_regex())


def get_relative_entity(doc: Doc, sentence: Span, ent: Span) -> Span:
    """Run the three regex levels on ``ent`` and return the resulting span.

    When the accumulated identifier contains ``"_"`` the span is labelled
    ``"RSE"``; otherwise the entity keeps its own label. In both cases the
    identifier is stored in the span's ``rse_id`` extension.

    :param doc: Doc the entity belongs to.
    :param sentence: search window passed to the regex levels.
    :param ent: entity to widen.
    :return: a new span created with ``doc.char_span``.
    """
    global _current_rse_id
    _current_rse_id = ""
    rel_entity = get_level1(doc, sentence, ent)
    rel_entity = get_level2(doc, sentence, rel_entity)
    rel_entity = get_level3(doc, sentence, rel_entity)
    if "_" in _current_rse_id:
        rel_entity = doc.char_span(rel_entity.start_char, rel_entity.end_char, "RSE")
    else:
        rel_entity = doc.char_span(ent.start_char, ent.end_char, ent.label_)
    rel_entity._.rse_id = _current_rse_id
    return rel_entity


@Language.component("spatial_pipeline")
def get_spatial_ent(doc: Doc) -> Doc:
    """Pipeline component: replace ``doc.ents`` with LLM-derived spatial entities.

    :param doc: Doc produced by the preceding pipeline components.
    :return: the same ``doc`` with its entities replaced.
    """
    set_extension()
    new_ents = []

    # Entities produced by the LLM.
    gpe = llm_ent_extract.extract_GPE(doc.text)
    print(doc.text, 'llmin')
    print(gpe, 'llout')
    gpe = llm_ent_extract.extract(gpe, 'GPE')
    print(gpe, 'llmout2')
    update_entities(doc, gpe, True)

    ents = doc.ents
    print(ents, 'eee')

    end = None
    for ent in ents:
        if ent.end != len(doc):
            next_token = doc[ent.end]
            # The window starts where the previous entity's window ended.
            start = end if end is not None else ent.sent.start
            # NOTE(review): next_token.i equals ent.end, so both branches
            # assign the same value — confirm the intended window end.
            if next_token.text.lower() in regex_spatial.get_keywords():
                end = next_token.i
            else:
                end = ent.end
        else:
            start = ent.sent.start
            end = ent.end
        rsi_ent = get_relative_entity(doc, Span(doc, start, end), ent)
        new_ents.append(rsi_ent)

    doc.ents = new_ents
    return doc


nlp.add_pipe("spatial_pipeline", after="ner")


def extract_spatial_entities(text: str) -> dict:
    """Run the pipeline sentence by sentence and return a JSON-ready dict.

    :param text: raw input text.
    :return: dict with the keys ``text``, ``ents``, ``tokens`` and
        ``ents_ext`` (``ents`` plus the ``rse_id`` of every entity).
    """
    doc = nlp(text)

    sent_ents = []
    sent_rse_id = []
    offset = 0  # token offset of the current sentence within ``doc``
    sent_start_positions = [0]

    for sent in doc.sents:
        # Process each sentence on its own, then filter its entities.
        sent_doc = set_selected_entities(nlp(sent.text))
        for ent in sent_doc.ents:
            sent_rse_id.append(ent._.rse_id)
            # Shift the entity indices so they address the full text.
            sent_ents.append(
                Span(doc, ent.start + offset, ent.end + offset, label=ent.label_)
            )
        offset += len(sent)
        sent_start_positions.append(sent_start_positions[-1] + len(sent))

    # Build a fresh Doc carrying the same tokens and whitespace.
    final_doc = Doc(
        nlp.vocab,
        words=[token.text for token in doc],
        spaces=[token.whitespace_ for token in doc],
    )
    # Mark the sentence starts manually.
    for position in sent_start_positions:
        if position < len(final_doc):
            final_doc[position].is_sent_start = True

    final_doc.set_ents(sent_ents)
    for ent, ent_rse_id in zip(final_doc.ents, sent_rse_id):
        ent._.rse_id = ent_rse_id

    doc = final_doc

    ents_ext = [
        {
            "start": ent.start_char,
            "end": ent.end_char,
            "label": ent.label_,
            "rse_id": ent._.rse_id,
        }
        for ent in doc.ents
    ]

    return {
        "text": doc.text,
        "ents": [
            {"start": ent.start_char, "end": ent.end_char, "label": ent.label_}
            for ent in doc.ents
        ],
        "tokens": [
            {"id": i, "start": token.idx, "end": token.idx + len(token)}
            for i, token in enumerate(doc)
        ],
        "ents_ext": ents_ext,
    }


def process_api(input_text: str) -> dict:
    """Gradio handler: return the spatial entities found in ``input_text``."""
    return extract_spatial_entities(input_text)


# Serve the API with JSON output.
gr.Interface(
    fn=process_api,
    inputs="text",
    outputs="json",
    title="Backend API",
    allow_flagging="never"
).launch()