"""Extract `Schedule` objects from markdown text using a spaCy pipeline and
classifiers downloaded from the Hugging Face Hub."""
import os
import re
from datetime import datetime

import joblib
import spacy
from dotenv import load_dotenv
from huggingface_hub import hf_hub_download, login
from spacy import Language
from spacy.tokenizer import Tokenizer
from spacy.util import compile_suffix_regex, compile_infix_regex

from src.utils.Event import Schedule
from src.utils.helpers import normalize_data
from src.utils.markdown_processing.CustomMarkdownAnalyzer.MarkdownAnalyzer import MarkdownAnalyzer

load_dotenv()
# Authenticate against the Hugging Face Hub so the classifier downloads work.
token = os.getenv("HUGGING_FACE_SPACES_TOKEN")
login(token=token)


class NLPProcessor:
    """Thin wrapper around a blank spaCy pipeline tuned for date/time detection."""

    def __init__(self, language="de"):
        self.nlp = spacy.blank(language)
        self._configure_pipeline()

    def _configure_pipeline(self):
        # Sentence splitting on ".", "!" and "?", refined by the custom boundary rule below.
        self.nlp.add_pipe("sentencizer", config={"punct_chars": [".", "!", "?"]})
        self.nlp.add_pipe("custom_sentence_boundary", after="sentencizer")
        self._setup_tokenizer()
        self._setup_entity_ruler()

    def _setup_tokenizer(self):
        # Split trailing "." and "," off tokens, and break digits around "." and ":"
        # so that "12.05.2025" and "18:30" tokenize into their parts.
        suffixes = list(self.nlp.Defaults.suffixes) + [r"\.", r"\,"]
        infixes = list(self.nlp.Defaults.infixes) + [r"(?<=\d)\.(?=\d)", r"(?<=\d)\:(?=\d)"]

        suffix_re = compile_suffix_regex(suffixes)
        infix_re = compile_infix_regex(infixes)

        self.nlp.tokenizer = Tokenizer(
            self.nlp.vocab,
            suffix_search=suffix_re.search,
            infix_finditer=infix_re.finditer,
        )

    def _setup_entity_ruler(self):
        # Token-shape patterns: "12.05.2025" -> DATE, "18:30" -> TIME
        # (the custom infix rules above split these strings into the required tokens).
        ruler = self.nlp.add_pipe("entity_ruler")
        ruler.add_patterns([
            {"label": "DATE",
             "pattern": [{"SHAPE": "dd"}, {"ORTH": "."}, {"SHAPE": "dd"}, {"ORTH": "."}, {"SHAPE": "dddd"}]},
            {"label": "TIME", "pattern": [{"SHAPE": "dd"}, {"ORTH": ":"}, {"SHAPE": "dd"}]}
        ])

    @staticmethod
    @Language.component("custom_sentence_boundary")
    def custom_sentence_boundary(doc):
        # Keep dates such as "12.05.2025" in one sentence: never start a new
        # sentence directly after a "."-token when the next token is a digit.
        for token in doc[:-1]:
            if token.text.endswith(".") and token.nbor(1).is_digit:
                doc[token.i + 1].is_sent_start = False
        return doc


class ScheduleFragment:
    """A text fragment with its date/time entities; tokens are labelled and re-classified on construction."""

    def __init__(self, text, entities):
        self.text: str = text
        self.entities = entities

        try:
            self.date_classifier = self._load_classifier("adojode/date_classifier", "date_classifier")
            self.time_classifier = self._load_classifier("adojode/time_classifier", "time_classifier")
        except Exception as e:
            print(f"Exception loading classifiers: {e}")
        self._tokens = self.__tokenize()

    def __repr__(self):
        return f"ScheduleFragment(text={self.text!r}, entities={self.entities}, tokens={self._tokens})"

    @staticmethod
    def _load_classifier(repo_id, model_name):
        # Download "<model_name>.pkl" from the Hugging Face Hub and unpickle it.
        return joblib.load(
            hf_hub_download(repo_id=repo_id, filename=model_name + ".pkl")
        )

    def __tokenize(self, window_size=4):
        """Split the text on whitespace and label every token.

        Tokens are first labelled NEUTRAL, CONNECTOR ("+"/"-"), or with the
        entity label found by the spaCy pipeline; DATE/TIME tokens are then
        re-classified with the downloaded models using a sliding context window.
        """
        if not self.text:
            return []

        tokens = self.text.split()
        token_objects = []

        for token in tokens:
            token_label = "NEUTRAL"

            if token in ["+", "-"]:
                token_label = "CONNECTOR"
            else:
                for entity in self.entities:
                    if entity.text in token:
                        token_label = entity.label_
                        break

            token_objects.append(Token(token.strip(), token_label))

        processed_tokens = token_objects.copy()

        for i in range(len(token_objects)):
            if token_objects[i].label in ["TIME", "DATE"]:
                # Replace entity tokens with placeholders so the classifiers see
                # the surrounding context rather than the concrete value.
                window_text = [
                    "[TIME]" if t.label == "TIME" else "[DATE]" if t.label == "DATE" else t.text
                    for t in token_objects
                ]
                start = max(i - (window_size - 1), 0)
                end = min(start + window_size, len(token_objects) - 1)
                classification_sum = {}
                num_iterations = 0

                # Slide the window over the token and average the classifier scores.
                while start <= i and end <= len(token_objects):
                    window = window_text[start:end]

                    classification = (
                        self.time_classifier(" ".join(window))
                        if token_objects[i].label == "TIME"
                        else self.date_classifier(" ".join(window))
                    )

                    for label, score in classification.items():
                        classification_sum[label] = classification_sum.get(label, 0) + score

                    start += 1
                    end += 1
                    num_iterations += 1

                if num_iterations > 0:
                    for label in classification_sum:
                        classification_sum[label] /= num_iterations

                new_label, max_value = max(classification_sum.items(), key=lambda x: x[1])

                # Fall back to NEUTRAL when the averaged confidence is low.
                if max_value < 0.8:
                    new_label = "NEUTRAL"
                processed_tokens[i] = Token(token_objects[i].text, new_label)

        return processed_tokens

    @property
    def tokens(self):
        return self._tokens


class Token:
    """A whitespace-delimited token together with its assigned label."""

    def __init__(self, text, label):
        self.text = text
        self.label = label

    def __repr__(self):
        return f"Token(text={self.text!r}, label={self.label!r})"


class ScheduleExtractor(NLPProcessor):
    """Extracts `Schedule` objects from markdown event descriptions."""

    def __init__(self):
        super().__init__()

    def extract(self, md):
        analyzer = MarkdownAnalyzer(md)
        md_elements = analyzer.identify_all().get("block_elements")

        # Collect every sentence that contains at least one DATE/TIME entity.
        schedule_fragments = []
        for element in md_elements:
            text = self.__preprocess(element.text)
            doc = self.nlp(text)
            for sent in doc.sents:
                doc_small = self.nlp(sent.text)
                if doc_small.ents:
                    schedule_fragment = ScheduleFragment(text=doc_small.text, entities=doc_small.ents)
                    schedule_fragments.append(schedule_fragment)

        # Assemble schedules: a value followed by "-" is a range start, a value
        # preceded by "-" is a range end; otherwise fill whichever slot is empty.
        date_times = []
        for fragment in schedule_fragments:
            filtered_tokens = [token for token in fragment.tokens if token.label != "NEUTRAL"]

            start_date = end_date = start_time = end_time = admittance_time = None

            for i, token in enumerate(filtered_tokens):
                if token.label == "EVENT_DATE":
                    if (i + 1 < len(filtered_tokens) and filtered_tokens[i + 1].text == "-") or start_date is None:
                        start_date = datetime.strptime(token.text, "%d.%m.%Y")
                    elif (i - 1 >= 0 and filtered_tokens[i - 1].text == "-") or end_date is None:
                        end_date = datetime.strptime(token.text, "%d.%m.%Y")

                elif token.label == "EVENT_TIME":
                    if (i + 1 < len(filtered_tokens) and filtered_tokens[i + 1].text == "-") or start_time is None:
                        start_time = datetime.combine(datetime.today().date(),
                                                      datetime.strptime(token.text, "%H:%M").time())
                    elif (i - 1 >= 0 and filtered_tokens[i - 1].text == "-") or end_time is None:
                        end_time = datetime.combine(datetime.today().date(),
                                                    datetime.strptime(token.text, "%H:%M").time())

                elif token.label == "ADMITTANCE_TIME":
                    admittance_time = datetime.combine(datetime.today().date(),
                                                       datetime.strptime(token.text, "%H:%M").time())

                # Once every slot is filled, emit a schedule and start a new one.
                if start_date and end_date and start_time and end_time and admittance_time:
                    date_times.append(Schedule(start_date, end_date, start_time, end_time, admittance_time))
                    start_date = end_date = start_time = end_time = admittance_time = None

            # Keep whatever was collected for this fragment, even if incomplete.
            date_times.append(Schedule(start_date, end_date, start_time, end_time, admittance_time))
        date_times = self.__remove_subsets(date_times)
        return list(set(date_times))

    def __remove_subsets(self, date_times):
        # Drop schedules whose non-None values are all contained in another
        # schedule (e.g. a date-only entry that duplicates a full entry).
        filtered = []
        for dt in date_times:
            # Skip dt if it is a subset of something already kept.
            if any(
                all(
                    v1 == v2 or v1 is None
                    for v1, v2 in zip(dt.__dict__.values(), other.__dict__.values())
                )
                for other in filtered
            ):
                continue

            # Remove previously kept schedules that are subsets of dt.
            filtered = [
                other for other in filtered if not all(
                    v2 == v1 or v2 is None
                    for v1, v2 in zip(dt.__dict__.values(), other.__dict__.values())
                )
            ]

            filtered.append(dt)
        return filtered

    def __preprocess(self, text):
        text = normalize_data(text)
        # Map German connectors to symbols the tokenizer understands:
        # "und" -> "+", "bis" -> "-", drop filler words and commas.
        replacements = {" und ": " + ", " bis ": " - ", " um ": " ", " ab ": " ", " zum ": " ", ",": ""}
        for old, new in replacements.items():
            text = text.replace(old, new)

        # Pad dates (dd.mm.yyyy) and times (hh:mm) with spaces so they become separate tokens.
        text = re.sub(r"(?<!\d)(\d{2}\.\d{2}\.\d{4})(?!\d)", r" \1 ", text)
        text = re.sub(r"(?<!\d)(\d{2}:\d{2})(?!\d)", r" \1 ", text)

        # Collapse repeated whitespace.
        text = re.sub(r"\s+", " ", text).strip()

        return text
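

if __name__ == "__main__":
    # Minimal usage sketch: the markdown snippet below is a made-up example;
    # any markdown string accepted by MarkdownAnalyzer would work. Running it
    # requires the Hugging Face token and classifier downloads configured above.
    sample_md = (
        "## Konzert im Stadtpark\n\n"
        "Das Konzert findet am 12.07.2025 statt. Einlass ab 18:00, Beginn um 19:00 bis 22:00 Uhr.\n"
    )
    extractor = ScheduleExtractor()
    for schedule in extractor.extract(sample_md):
        print(schedule)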