File size: 8,275 Bytes
aa48222
 
 
 
 
 
c9c01b7
bd710a8
b2c2ee0
2b79b4e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
aa48222
2b79b4e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e22c7fc
fe690f8
2b79b4e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fe690f8
2b79b4e
 
 
6107bcc
2b79b4e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c07d705
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2b79b4e
 
 
e22c7fc
 
4b6e8f7
444aed6
 
 
 
 
 
 
 
fb2d59a
e22c7fc
 
 
 
 
 
 
2b79b4e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# import gradio as gr
# import time  # 模拟处理耗时
# import os
# import spacy
# from spacy import displacy
# import pandas as pd



# nlp = spacy.load("en_core_web_md")
# def process_api(input_text):
#     # 这里编写实际的后端处理逻辑

#     return {
#         "status": "success",
#         # "result": f"Processed: {input_text.upper()}",
#         "result": f"Processed: {nlp(input_text).to_json()}",
#         "timestamp": time.time()
#     }

# # 设置API格式为JSON
# gr.Interface(
#     fn=process_api,
#     inputs="text",
#     outputs="json",
#     title="Backend API",
#     allow_flagging="never"
# ).launch()



import gradio as gr
import time
import spacy
from spacy.tokens import Span, Doc, Token
from spacy.language import Language
import llm_ent_extract
import regex_spatial
import re


# Display colours per entity label and an options dict built from them.
# Presumably intended for displaCy rendering; neither is referenced by the
# functions visible in this file.
colors = {'GPE': "#43c6fc", "LOC": "#fd9720", "RSE":"#a6e22d"}
options = {"ents": ['GPE', 'LOC', "RSE"], "colors": colors}
# HTML wrapper template; not referenced by the functions visible in this file.
HTML_WRAPPER = """<div style="overflow-x: auto; border: none solid #a6e22d; border-radius: 0.25rem; padding: 1rem">{}</div>"""
# Placeholder settings; left empty here and not read by the visible code.
BASE_URL = ""
model = ""
types = ""
# Shared spaCy pipeline used by every request.
nlp = spacy.load("en_core_web_md")

# Entity labels that set_selected_entities() keeps in its output.
# NOTE(review): 'loc' and 'rse' are lowercase, while the pipeline in this file
# assigns the label "RSE" (and `options` lists 'LOC'/'RSE'). As written, only
# 'GPE' entities can pass the filter. Confirm whether the lowercase values are
# a deliberate "disabled" switch or a typo.
gpe_selected = 'GPE'
loc_selected = 'loc'
rse_selected = 'rse'
# Name of the custom spaCy extension attribute registered by set_extension().
rse_id = "rse_id"
def set_selected_entities(doc):
    """Keep only the entities whose label is one of the selected labels.

    The selected labels are read from the module-level ``gpe_selected``,
    ``loc_selected`` and ``rse_selected`` values. ``doc.ents`` is overwritten
    in place and the same ``doc`` is returned.
    """
    wanted_labels = (gpe_selected, loc_selected, rse_selected)
    doc.ents = [span for span in doc.ents if span.label_ in wanted_labels]
    return doc

def update_entities(doc, entity_texts, replace=True):
    """
    Annotate entities by their surface text and write them to ``doc.ents``.

    Only the first occurrence of each text in ``doc.text`` is annotated.
    Character offsets are mapped to tokens with ``Doc.char_span`` so that the
    resulting span follows spaCy's own tokenization (punctuation, hyphens,
    etc.) instead of a whitespace-based token count.

    :param doc: spaCy ``Doc`` to modify in place
    :param entity_texts: dict mapping entity text -> entity label
    :param replace: if True, discard the existing entities; if False, keep
        them and add the new ones
    """
    candidates = [] if replace else list(doc.ents)

    for ent_text, ent_label in (entity_texts or {}).items():
        if not ent_text:
            # An empty string would "match" at offset 0 and yield an empty span.
            continue

        start_char = doc.text.find(ent_text)
        if start_char == -1:
            continue

        # "expand" snaps to the enclosing tokens when the text starts or ends
        # inside a token, so a found text always yields a usable span.
        span = doc.char_span(
            start_char,
            start_char + len(ent_text),
            label=ent_label,
            alignment_mode="expand",
        )
        if span is not None:
            candidates.append(span)

    # Doc.set_ents rejects overlapping spans; keep the longest of any overlap.
    doc.set_ents(spacy.util.filter_spans(candidates))
def find_ent_by_regex(doc, sentence, ent, regex):
    """Extend ``ent`` to cover the first ``regex`` match starting in ``sentence``.

    The module-level ``id`` accumulates the matched texts: it is seeded with
    ``ent.text`` when empty, and each aligned match is prefixed to it with an
    underscore. ``ent`` is modified in place and returned.

    Only the first match whose start offset lies within ``sentence`` is
    considered; the search stops there even when that match does not align
    with token boundaries.
    """
    global id

    if id == "":
        id = ent.text

    for hit in re.finditer(regex, doc.text):
        hit_start, hit_end = hit.span()
        if not (sentence.start_char <= hit_start <= sentence.end_char):
            continue

        aligned = doc.char_span(hit_start, hit_end)
        if aligned is not None:
            id = "_".join((aligned.text, id))
            if hit_start > ent.end_char:
                # Match lies after the entity: grow the entity to the right.
                ent.end_char = hit_end
            else:
                # Otherwise grow the entity to the left.
                ent.start_char = hit_start
        break

    return ent
def set_extension():
    """Register the attribute named by ``rse_id`` on Span, Doc and Token.

    ``force=True`` makes repeated registration overwrite the previous one
    rather than raise.
    """
    for spacy_type in (Span, Doc, Token):
        spacy_type.set_extension(rse_id, default="", force=True)
def get_level1(doc, sentence, ent):
    """Apply the level-1 spatial regex from ``regex_spatial`` to ``ent``."""
    level1_pattern = regex_spatial.get_level1_regex()
    return find_ent_by_regex(doc, sentence, ent, level1_pattern)

def get_level2(doc, sentence, ent):
    """Apply the level-2 spatial regex from ``regex_spatial`` to ``ent``."""
    level2_pattern = regex_spatial.get_level2_regex()
    return find_ent_by_regex(doc, sentence, ent, level2_pattern)

def get_level3(doc, sentence, ent):
    """Apply the level-3 spatial regex from ``regex_spatial`` to ``ent``."""
    level3_pattern = regex_spatial.get_level3_regex()
    return find_ent_by_regex(doc, sentence, ent, level3_pattern)
def get_relative_entity(doc, sentence, ent):
    """Build the final entity span for ``ent``, extended by spatial keywords.

    The three regex levels are applied in order; each may widen ``ent`` in
    place and prefix the matched text to the module-level ``id``. If any
    level matched, the widened span is returned with label ``"RSE"``;
    otherwise a span with the entity's own bounds and label is returned.
    In both cases the accumulated ``id`` is stored in ``span._.rse_id``.

    :param doc: the spaCy ``Doc`` containing ``ent``
    :param sentence: span limiting where regex matches may start
    :param ent: the entity span to extend (modified in place)
    :return: a new ``Span`` on ``doc`` carrying the ``rse_id`` extension
    """
    global id

    id = ""
    original_text = ent.text  # captured before any level widens the span

    rel_entity = ent
    for apply_level in (get_level1, get_level2, get_level3):
        rel_entity = apply_level(doc, sentence, rel_entity)

    # With no match, `id` is exactly the entity's original text. Comparing
    # against it (rather than testing for "_") keeps entities whose own text
    # contains an underscore from being mislabelled as RSE.
    if id != original_text:
        source, label = rel_entity, "RSE"
    else:
        source, label = ent, ent.label_

    result = doc.char_span(source.start_char, source.end_char, label=label)
    if result is None:
        # char_span returns None when the offsets do not align with tokens;
        # fall back to the token bounds instead of failing on `None._`.
        result = Span(doc, source.start, source.end, label=label)

    result._.rse_id = id
    return result

@Language.component("spatial_pipeline")
def get_spatial_ent(doc):
    """spaCy component: replace ``doc.ents`` with LLM-derived spatial entities.

    Steps:
      1. Register the custom extension attributes.
      2. Ask ``llm_ent_extract`` for GPE entities in ``doc.text`` and write
         them to ``doc.ents`` (replacing what was there).
      3. For each entity, choose a token window and let
         ``get_relative_entity`` widen the entity within it.
      4. Store the resulting spans as ``doc.ents``.
    """
    set_extension()

    # Raw LLM output, then the parsed text -> label mapping.
    raw_llm_output = llm_ent_extract.extract_GPE(doc.text)
    print(doc.text, 'llmin')
    print(raw_llm_output, 'llout')

    gpe_entities = llm_ent_extract.extract(raw_llm_output, 'GPE')
    print(gpe_entities, 'llmout2')
    update_entities(doc, gpe_entities, True)

    llm_ents = doc.ents
    print(llm_ents, 'eee')

    spatial_ents = []
    window_end = None
    for ent in llm_ents:
        if ent.end == len(doc):
            # Entity is the last thing in the doc: window is its sentence
            # start up to the entity end.
            window_start = ent.sent.start
            window_end = ent.end
        else:
            following = doc[ent.end]
            # The window starts where the previous one ended, or at the
            # sentence start for the first entity.
            window_start = ent.sent.start if window_end is None else window_end
            # NOTE(review): `following.i` equals `ent.end`, so both branches
            # yield the same window end. Confirm whether the keyword token
            # was meant to be included in the window.
            if following.text.lower() in regex_spatial.get_keywords():
                window_end = following.i
            else:
                window_end = ent.end

        window = Span(doc, window_start, window_end)
        spatial_ents.append(get_relative_entity(doc, window, ent))

    doc.ents = spatial_ents
    return doc
# Insert the "spatial_pipeline" component into the shared pipeline, positioned
# directly after the component named "ner".
nlp.add_pipe("spatial_pipeline", after="ner")
def extract_spatial_entities(text):
    """Run spatial entity extraction sentence by sentence and return JSON data.

    The text is first split into sentences; each sentence is then run through
    the full pipeline on its own, filtered by ``set_selected_entities``, and
    its entities are mapped back onto the full text.

    :param text: input text
    :return: dict with keys
        ``"text"``     - the full text,
        ``"ents"``     - list of ``{"start", "end", "label"}`` (char offsets),
        ``"tokens"``   - list of ``{"id", "start", "end"}`` (char offsets),
        ``"ents_ext"`` - like ``"ents"`` plus the ``"rse_id"`` extension value
    """
    # The first pass is only needed for tokens and sentence boundaries, and
    # its entities are never read. Skip the spatial component here so the
    # LLM is not called for a result that is thrown away.
    doc = nlp(text, disable=["spatial_pipeline"])
    sentences = list(doc.sents)

    mapped_ents = []
    rse_ids = {}  # (token start, token end) -> rse_id of that entity
    last_end = 0
    for sent in sentences:
        sent_doc = set_selected_entities(nlp(sent.text))  # per-sentence run

        for ent in sent_doc.ents:
            # Map via character offsets rather than token offsets: a sentence
            # tokenized on its own is not guaranteed to yield the same token
            # count as the same text inside the full document.
            span = doc.char_span(
                sent.start_char + ent.start_char,
                sent.start_char + ent.end_char,
                label=ent.label_,
                alignment_mode="expand",
            )
            if span is None or span.start < last_end:
                # Unmappable, or overlaps the previous entity (set_ents
                # would reject overlapping spans).
                continue
            mapped_ents.append(span)
            rse_ids[(span.start, span.end)] = ent._.rse_id
            last_end = span.end

    # Build a fresh Doc with the same tokens to carry the merged entities.
    final_doc = Doc(
        nlp.vocab,
        words=[token.text for token in doc],
        spaces=[bool(token.whitespace_) for token in doc],
    )
    for sent in sentences:  # mark sentence starts manually
        final_doc[sent.start].is_sent_start = True

    final_doc.set_ents(mapped_ents)
    for ent in final_doc.ents:
        ent._.rse_id = rse_ids.get((ent.start, ent.end), "")

    ents_ext = [
        {
            "start": ent.start_char,
            "end": ent.end_char,
            "label": ent.label_,
            "rse_id": ent._.rse_id,  # custom extension field
        }
        for ent in final_doc.ents
    ]

    return {
        "text": final_doc.text,
        "ents": [
            {"start": item["start"], "end": item["end"], "label": item["label"]}
            for item in ents_ext
        ],
        "tokens": [
            {"id": i, "start": token.idx, "end": token.idx + len(token)}
            for i, token in enumerate(final_doc)
        ],
        "ents_ext": ents_ext,
    }



def process_api(input_text):
    """Gradio handler: return the spatial-entity payload for ``input_text``."""
    payload = extract_spatial_entities(input_text)
    return payload

# Expose process_api as a text-in / JSON-out Gradio interface and start the
# server. This runs at import time because it is a top-level statement.
gr.Interface(
    fn=process_api,
    inputs="text",
    outputs="json",
    title="Backend API",
    allow_flagging="never"
).launch()