import re

import gradio as gr
import spacy
from spacy.tokens import Span, Doc, Token
from spacy.language import Language

import llm_ent_extract
import regex_spatial
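
# The two local modules above are project-specific; this file assumes they expose
# the helpers used below:
#   llm_ent_extract.extract_GPE(text)    -> raw LLM output (e.g. '[###Pyrmont###, ###Glebe###]')
#   llm_ent_extract.extract(raw, label)  -> {entity_text: label} mapping
#   regex_spatial.get_level1_regex() / get_level2_regex() / get_level3_regex()
#   regex_spatial.get_keywords()         -> lowercase spatial keywords (relative terms)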
# displaCy styling config (not used by the JSON API below).
colors = {'GPE': "#43c6fc", "LOC": "#fd9720", "RSE": "#a6e22d"}
options = {"ents": ['GPE', 'LOC', "RSE"], "colors": colors}
HTML_WRAPPER = """<div style="overflow-x: auto; border: none solid #a6e22d; border-radius: 0.25rem; padding: 1rem">{}</div>"""

BASE_URL = ""
model = ""
types = ""

nlp = spacy.load("en_core_web_md")

# Selected entity labels (spaCy labels are uppercase: 'GPE', 'LOC', 'RSE').
gpe_selected = 'GPE'
loc_selected = 'LOC'
rse_selected = 'RSE'

rse_id = "rse_id"  # name of the custom extension attribute registered in set_extension()
rel_id = ""        # accumulator for the relative-entity id chain built in find_ent_by_regex()
def set_selected_entities(doc):
    """Keep only entities whose label is one of the selected labels (GPE / LOC / RSE)."""
    ents = [ent for ent in doc.ents if ent.label_ in (gpe_selected, loc_selected, rse_selected)]
    doc.ents = ents
    return doc
def update_entities(doc, entity_texts, replace=True):
    """
    Mark entities given their surface text and write them to doc.ents.

    :param doc: parsed spaCy Doc
    :param entity_texts: dict mapping entity text to its entity label
    :param replace: if True, replace existing entities; if False, keep them and add the new ones
    """
    new_ents = list(doc.ents) if not replace else []
    for ent_text, ent_label in entity_texts.items():
        start = doc.text.find(ent_text)  # only the first occurrence in the full text is used
        if start != -1:
            # char_span aligns character offsets to token boundaries; "expand" snaps
            # outward so entities that start or end mid-token still map to a span.
            span = doc.char_span(start, start + len(ent_text), label=ent_label, alignment_mode="expand")
            if span is not None:
                new_ents.append(span)
    doc.set_ents(new_ents)
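
# Illustrative call (the entity strings here are hypothetical; in production the
# dict comes from llm_ent_extract.extract):
#   update_entities(doc, {"Pyrmont": "GPE", "Glebe": "GPE"}, replace=True)
# leaves doc.ents holding exactly those two GPE spans, provided both strings occur
# in doc.text.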
def find_ent_by_regex(doc, sentence, ent, regex):
    """Search `sentence` for a regex match, prepend it to the rel_id chain, and widen `ent` toward it."""
    global rel_id
    if rel_id == "":
        rel_id = ent.text
    for match in re.finditer(regex, doc.text):
        start, end = match.span()
        if sentence.start_char <= start <= sentence.end_char:
            span = doc.char_span(start, end)
            if span is not None:
                rel_id = span.text + "_" + rel_id
                # Widen the entity's character boundaries toward the match.
                if start > ent.end_char:
                    ent.end_char = end
                else:
                    ent.start_char = start
                return ent
    return ent
def set_extension():
    """Register the custom rse_id extension on Span, Doc and Token (idempotent thanks to force=True)."""
    Span.set_extension(rse_id, default="", force=True)
    Doc.set_extension(rse_id, default="", force=True)
    Token.set_extension(rse_id, default="", force=True)
def get_level1(doc, sentence, ent):
    return find_ent_by_regex(doc, sentence, ent, regex_spatial.get_level1_regex())

def get_level2(doc, sentence, ent):
    return find_ent_by_regex(doc, sentence, ent, regex_spatial.get_level2_regex())

def get_level3(doc, sentence, ent):
    return find_ent_by_regex(doc, sentence, ent, regex_spatial.get_level3_regex())
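
# The three levels are applied in sequence by get_relative_entity(); each pass may
# widen the entity span toward a regex match in the same sentence and prepend the
# matched text to the accumulated rel_id chain.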
def get_relative_entity(doc, sentence, ent):
    """Build a relative spatial entity (RSE) around `ent` by chaining level 1-3 regex matches."""
    global rel_id
    rel_id = ""
    rel_entity = get_level1(doc, sentence, ent)
    rel_entity = get_level2(doc, sentence, rel_entity)
    rel_entity = get_level3(doc, sentence, rel_entity)

    if "_" in rel_id:
        # At least one relative term was chained on: promote the span to an RSE entity.
        rel_entity = doc.char_span(rel_entity.start_char, rel_entity.end_char, "RSE")
        rel_entity._.rse_id = rel_id
        return rel_entity

    # No relative terms found: keep the original entity label.
    rel_entity = doc.char_span(ent.start_char, ent.end_char, ent.label_)
    rel_entity._.rse_id = rel_id
    return rel_entity
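
# Illustrative outcome (exact strings depend on regex_spatial's patterns): starting
# from the entity "Pyrmont", each matched relative term is prepended with "_", so a
# phrase like "north of Pyrmont" could yield an RSE span with rse_id "north of_Pyrmont".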
@Language.component("spatial_pipeline")
def get_spatial_ent(doc):
    """Custom pipeline component: replace doc.ents with LLM-extracted GPEs, then grow each into a relative spatial entity (RSE)."""
    set_extension()
    new_ents = []
    # Entities extracted by the LLM, e.g. '[###Pyrmont###, ###Glebe###]'
    GPE = llm_ent_extract.extract_GPE(doc.text)
    print("LLM input:", doc.text)
    print("LLM raw output:", GPE)
    GPE = llm_ent_extract.extract(GPE, 'GPE')
    print("LLM parsed entities:", GPE)
    update_entities(doc, GPE, True)

    ents = doc.ents
    end = None
    for ent in ents:
        if ent.end != len(doc):
            next_token = doc[ent.end]
            # Search window: from the previous window's end (or the sentence start)
            # up to the token following the entity.
            if end is not None:
                start = end
            else:
                start = ent.sent.start
            if next_token.text.lower() in regex_spatial.get_keywords():
                end = next_token.i
            else:
                end = ent.end
        else:
            start = ent.sent.start
            end = ent.end
        rsi_ent = get_relative_entity(doc, Span(doc, start, end), ent)
        new_ents.append(rsi_ent)

    doc.ents = new_ents
    return doc

nlp.add_pipe("spatial_pipeline", after="ner")
def extract_spatial_entities(text):
    doc = nlp(text)

    # Process the text sentence by sentence.
    sent_ents = []
    sent_texts = []
    sent_rse_id = []
    offset = 0                  # running token offset of the current sentence in the full doc
    sent_start_positions = [0]  # token indices at which sentences start

    for sent in doc.sents:
        sent_doc = nlp(sent.text)                   # re-run the pipeline on the single sentence
        sent_doc = set_selected_entities(sent_doc)  # keep only the selected labels
        sent_texts.append(sent_doc.text)
        for ent in sent_doc.ents:
            sent_rse_id.append(ent._.rse_id)
        # Shift each entity's token indices so they match the full text.
        for ent in sent_doc.ents:
            new_ent = Span(doc, ent.start + offset, ent.end + offset, label=ent.label_)
            sent_ents.append(new_ent)
        offset += len(sent)  # advance the token offset
        sent_start_positions.append(sent_start_positions[-1] + len(sent))  # record the next sentence start

    # Build a fresh Doc and mark sentence starts manually.
    final_doc = Doc(nlp.vocab, words=[token.text for token in doc], spaces=[token.whitespace_ for token in doc])
    for i in sent_start_positions:
        if i < len(final_doc):
            final_doc[i].is_sent_start = True

    # Attach the shifted entities, then restore their rse_id extension values.
    final_doc.set_ents(sent_ents)
    for i, rid in enumerate(sent_rse_id):
        final_doc.ents[i]._.rse_id = rid
    doc = final_doc

    ents_ext = []
    for ent in doc.ents:
        ents_ext.append({
            "start": ent.start_char,
            "end": ent.end_char,
            "label": ent.label_,
            "rse_id": ent._.rse_id,  # custom extension field
        })

    return {
        "text": doc.text,
        "ents": [{"start": ent.start_char, "end": ent.end_char, "label": ent.label_} for ent in doc.ents],
        "tokens": [{"id": i, "start": token.idx, "end": token.idx + len(token)} for i, token in enumerate(doc)],
        "ents_ext": ents_ext,  # entities with the rse_id extension included
    }
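
# Shape of the returned payload (values shown are placeholders):
#   {
#     "text": "...full input text...",
#     "ents":     [{"start": 0, "end": 7, "label": "GPE"}, ...],
#     "tokens":   [{"id": 0, "start": 0, "end": 7}, ...],
#     "ents_ext": [{"start": 0, "end": 7, "label": "GPE", "rse_id": ""}, ...]
#   }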
def process_api(input_text):
    # All backend processing happens in the spaCy pipeline above.
    return extract_spatial_entities(input_text)

# Expose the function as a JSON API.
gr.Interface(
    fn=process_api,
    inputs="text",
    outputs="json",
    title="Backend API",
    allow_flagging="never",
).launch()
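
# Example client call (the Space URL is hypothetical; gradio_client is the standard
# client shipped alongside Gradio):
#   from gradio_client import Client
#   client = Client("https://<your-space>.hf.space")
#   result = client.predict("Sydney is north of Melbourne.", api_name="/predict")
#   print(result["ents_ext"])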