import re
import spacy
from spacy import Language
from spacy.tokenizer import Tokenizer
from spacy.util import compile_suffix_regex, compile_infix_regex
import os
from dotenv import load_dotenv
from src.utils.Event import Schedule
from huggingface_hub import hf_hub_download, login
import joblib
from datetime import datetime
from src.utils.helpers import normalize_data
from src.utils.markdown_processing.CustomMarkdownAnalyzer.MarkdownAnalyzer import MarkdownAnalyzer
load_dotenv()
token = os.getenv("HUGGING_FACE_SPACES_TOKEN")
login(token=token)
class NLPProcessor:
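    """Blank spaCy pipeline (German by default) with a sentencizer, custom
    sentence boundaries, digit-aware tokenizer rules and an entity ruler
    that tags dd.mm.yyyy dates and hh:mm times."""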
def __init__(self, language="de"):
self.nlp = spacy.blank(language)
self._configure_pipeline()
def _configure_pipeline(self):
self.nlp.add_pipe("sentencizer", config={"punct_chars": [".", "!", "?"]})
self.nlp.add_pipe("custom_sentence_boundary", after="sentencizer")
self._setup_tokenizer()
self._setup_entity_ruler()
def _setup_tokenizer(self):
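        # Treat "." and "," as suffixes and split "." / ":" between digits so
        # that dd.mm.yyyy and hh:mm break into the digit/punctuation tokens
        # the entity ruler patterns expect.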
suffixes = list(self.nlp.Defaults.suffixes) + [r"\.", r"\,"]
infixes = list(self.nlp.Defaults.infixes) + [r"(?<=\d)\.(?=\d)", r"(?<=\d)\:(?=\d)"]
suffix_re = compile_suffix_regex(suffixes)
infix_re = compile_infix_regex(infixes)
self.nlp.tokenizer = Tokenizer(self.nlp.vocab, suffix_search=suffix_re.search, infix_finditer=infix_re.finditer)
def _setup_entity_ruler(self):
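        # Rule-based entities: dd.mm.yyyy -> DATE, hh:mm -> TIME.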
ruler = self.nlp.add_pipe("entity_ruler")
ruler.add_patterns([
{"label": "DATE",
"pattern": [{"SHAPE": "dd"}, {"ORTH": "."}, {"SHAPE": "dd"}, {"ORTH": "."}, {"SHAPE": "dddd"}]},
{"label": "TIME", "pattern": [{"SHAPE": "dd"}, {"ORTH": ":"}, {"SHAPE": "dd"}]}
])
@staticmethod
@Language.component("custom_sentence_boundary")
def custom_sentence_boundary(doc):
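        """Prevent a sentence start right after a token ending in "." when the
        next token is a digit, so dotted dates do not split sentences."""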
for token in doc[:-1]:
if token.text.endswith(".") and token.nbor(1).is_digit:
doc[token.i + 1].is_sent_start = False
return doc
class ScheduleFragment:
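    """A text fragment containing DATE/TIME entities whose tokens are labelled
    with the date/time classifiers downloaded from the Hugging Face Hub."""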
def __init__(self, text, entities):
self.text: str = text
self.entities = entities
# self.label: str = None
try:
self.date_classifier = self._load_classifier("adojode/date_classifier", "date_classifier")
self.time_classifier = self._load_classifier("adojode/time_classifier", "time_classifier")
except Exception as e:
print(f"Exception loading classifiers: {e}")
self._tokens = self.__tokenize()
def __repr__(self):
return f"ScheduleFragment(text={self.text!r}, entities={self.entities}, tokens={self._tokens})"
@staticmethod
def _load_classifier(repo_id, model_name):
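        """Download a pickled classifier from the Hugging Face Hub and load it with joblib."""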
return joblib.load(
hf_hub_download(repo_id=repo_id, filename=model_name + ".pkl")
)
def __tokenize(self, window_size=4):
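        """Label whitespace tokens as CONNECTOR ("+"/"-"), DATE/TIME (via the
        entity ruler matches) or NEUTRAL, then refine DATE/TIME tokens with the
        classifiers over a sliding window of `window_size` tokens; labels whose
        averaged score stays below 0.8 fall back to NEUTRAL."""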
if not self.text:
return []
tokens = self.text.split()
token_objects = []
for token in tokens:
            token_label = "NEUTRAL"  # default label
            # Check for a connector token ("+" or "-")
if token in ["+", "-"]:
token_label = "CONNECTOR"
else:
                # Check whether the token matches an entity (TIME, DATE)
for entity in self.entities:
if entity.text in token:
token_label = entity.label_
break
            token_objects.append(Token(token.strip(), token_label))  # create the Token object
processed_tokens = token_objects.copy()
        # Sliding-window classification for TIME and DATE tokens
for i in range(len(token_objects)):
if token_objects[i].label in ["TIME", "DATE"]:
window_text = [
"[TIME]" if t.label == "TIME" else "[DATE]" if t.label == "DATE" else t.text
for t in token_objects
]
start = max(i - (window_size - 1), 0)
                end = min(start + window_size, len(token_objects) - 1)  # window ends at the entity itself
classification_sum = {}
                num_iterations = 0  # counter used for normalization
while start <= i and end <= len(token_objects):
window = window_text[start:end]
                    # Run the classifier on the current window
classification = (
self.time_classifier(" ".join(window))
if token_objects[i].label == "TIME"
else self.date_classifier(" ".join(window))
)
                    # Accumulate the classification scores
for label, score in classification.items():
classification_sum[label] = classification_sum.get(label, 0) + score
start += 1
end += 1
num_iterations += 1
                # Normalize over all windows
if num_iterations > 0:
for label in classification_sum:
classification_sum[label] /= num_iterations
                # Assign the best-scoring label
new_label, max_value = max(classification_sum.items(), key=lambda x: x[1])
if max_value < 0.8:
new_label = "NEUTRAL"
                processed_tokens[i] = Token(token_objects[i].text, new_label)  # new Token with the updated label
return processed_tokens
@property
def tokens(self):
return self._tokens
class Token:
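    """A single whitespace token together with its assigned label."""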
def __init__(self, text, label):
self.text = text
self.label = label
def __repr__(self):
return f"Token(text={self.text!r}, label={self.label!r})"
class ScheduleExtractor(NLPProcessor):
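    """Extracts Schedule objects (start/end date, start/end time, admittance
    time) from markdown text."""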
def __init__(self):
super().__init__()
def extract(self, md):
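        """Split the markdown into block elements, keep sentences that contain
        DATE/TIME entities, and build Schedule objects from their labelled
        tokens (EVENT_DATE, EVENT_TIME, ADMITTANCE_TIME)."""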
analyzer = MarkdownAnalyzer(md)
md_elements = analyzer.identify_all().get("block_elements")
schedule_fragments = []
for element in md_elements:
text = self.__preprocess(element.text)
doc = self.nlp(text)
for sent in doc.sents:
doc_small = self.nlp(sent.text)
if doc_small.ents:
schedule_fragment = ScheduleFragment(text=doc_small.text, entities=doc_small.ents)
schedule_fragments.append(schedule_fragment)
date_times = []
for fragment in schedule_fragments:
filtered_tokens = [token for token in fragment.tokens if token.label != "NEUTRAL"]
start_date = end_date = start_time = end_time = admittance_time = None
for i, token in enumerate(filtered_tokens):
if token.label == "EVENT_DATE":
                    if (i + 1 < len(filtered_tokens) and filtered_tokens[i + 1].text == "-") or start_date is None:
start_date = datetime.strptime(token.text, "%d.%m.%Y") # DD.MM.YYYY
                    elif (i - 1 >= 0 and filtered_tokens[i - 1].text == "-") or end_date is None:
end_date = datetime.strptime(token.text, "%d.%m.%Y") # DD.MM.YYYY
elif token.label == "EVENT_TIME":
                    if (i + 1 < len(filtered_tokens) and filtered_tokens[i + 1].text == "-") or start_time is None:
start_time = datetime.combine(datetime.today().date(),
datetime.strptime(token.text, "%H:%M").time())
                    elif (i - 1 >= 0 and filtered_tokens[i - 1].text == "-") or end_time is None:
end_time = datetime.combine(datetime.today().date(),
datetime.strptime(token.text, "%H:%M").time())
elif token.label == "ADMITTANCE_TIME":
admittance_time = datetime.combine(datetime.today().date(),
datetime.strptime(token.text, "%H:%M").time())
if start_date and end_date and start_time and end_time and admittance_time:
date_times.append(Schedule(start_date, end_date, start_time, end_time, admittance_time))
start_date = end_date = start_time = end_time = admittance_time = None
date_times.append(Schedule(start_date, end_date, start_time, end_time, admittance_time))
date_times = self.__remove_subsets(date_times)
return list(set(date_times))
    def __remove_subsets(self, date_times):
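        """Drop Schedule objects whose non-None fields are all contained in
        another Schedule, keeping only the most complete entries."""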
filtered = []
for dt in date_times:
            # If dt is fully contained in another object, skip it
if any(
all(
                    v1 == v2 or v1 is None  # ignore None values
for v1, v2 in zip(dt.__dict__.values(), other.__dict__.values())
)
for other in filtered
):
continue
            # Remove all objects already in filtered that are covered by dt
filtered = [
other for other in filtered if not all(
                    v2 == v1 or v2 is None  # checks whether `other` is contained in `dt`
for v1, v2 in zip(dt.__dict__.values(), other.__dict__.values())
)
]
filtered.append(dt)
return filtered
def __preprocess(self, text):
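        """Normalize the text, map German connectors ("und" -> "+", "bis" -> "-"),
        drop filler words, and pad dates (dd.mm.yyyy) and times (hh:mm) with
        spaces before collapsing duplicate whitespace."""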
text = normalize_data(text)
replacements = {" und ": " + ", " bis ": " - ", " um ": " ", " ab ": " ", " zum ": " ", ",": ""}
for old, new in replacements.items():
text = text.replace(old, new)
        # Add spaces around dates in dd.mm.yyyy format
text = re.sub(r"(?<!\d)(\d{2}\.\d{2}\.\d{4})(?!\d)", r" \1 ", text)
        # Add spaces around times in hh:mm format
text = re.sub(r"(?<!\d)(\d{2}:\d{2})(?!\d)", r" \1 ", text)
        # Collapse duplicate spaces introduced by the replacements
text = re.sub(r"\s+", " ", text).strip()
return text