# manaviel85370
# create new date extractor, optimize testing and pipelines
# 58c260c
import os
import re
from datetime import datetime
import joblib
import spacy
from dotenv import load_dotenv
from huggingface_hub import hf_hub_download, login
from spacy import Language
from spacy.tokenizer import Tokenizer
from spacy.util import compile_suffix_regex, compile_infix_regex
from src.resources.TEXTS import TEXTS
from src.utils.Event import Schedule
from src.utils.helpers import normalize_data
from src.utils.markdown_processing.CustomMarkdownAnalyzer.MarkdownAnalyzer import MarkdownAnalyzer
# Load variables from a local .env file (if present) into the environment
# before the Hugging Face token is read.
load_dotenv()
token = os.getenv("HUGGING_FACE_SPACES_TOKEN")
# Authenticate only when a token is configured. os.getenv returns None for a
# missing variable, and login(token=None) falls back to prompting for a token,
# which stalls non-interactive runs.
if token:
    login(token=token)
else:
    print("HUGGING_FACE_SPACES_TOKEN is not set; skipping Hugging Face login.")
# For each match label, the token that stands in for the matched expression
# when classify_date_time rebuilds the text as a token list.
placeholder = dict(
    DATE_RANGE_TIME_RANGE="[DATE] [TIME] - [DATE] [TIME]",
    DATE_RANGE="[DATE] - [DATE]",
    DATE_TIME_RANGE="[DATE] [TIME] - [TIME]",
    TIME_RANGE="[TIME] - [TIME]",
    DATE_TIME="[DATE] [TIME]",
    DATE="[DATE]",
    TIME="[TIME]",
)
def convert_to_schedule(date_time, label):
    """Build a ``Schedule`` from the raw strings of one date/time match.

    Args:
        date_time: For the labels ``"DATE"`` and ``"TIME"`` the matched string
            itself; for every other label an indexable sequence of matched
            strings. Dates are parsed as ``dd.mm.yyyy``, times as ``HH:MM``.
        label: Name of the pattern that produced ``date_time``, e.g.
            ``"DATE_TIME_RANGE"``.

    Returns:
        A ``Schedule`` in which every field not covered by ``label`` is
        ``None`` (``admittance_time`` is always ``None``), or ``None`` when
        ``label`` is unknown or the conversion fails for any reason.
    """
    print("Converting ", date_time, label)

    # Sentinel: ``date_time`` itself is the string to parse (no indexing).
    whole = object()

    # Per label, where each field lives inside ``date_time``, given as
    # (start_date, end_date, start_time, end_time). ``None`` = field absent.
    layouts = (
        ("DATE_RANGE_TIME_RANGE", (0, 2, 1, 3)),
        ("DATE_RANGE", (0, 1, None, None)),
        ("DATE_TIME_RANGE", (0, None, 1, 2)),
        ("TIME_RANGE", (None, None, 0, 1)),
        ("DATE_TIME", (0, None, 1, None)),
        ("DATE", (whole, None, None, None)),
        ("TIME", (None, None, whole, None)),
    )

    def raw(slot):
        return date_time if slot is whole else date_time[slot]

    def parse_date(slot):
        if slot is None:
            return None
        return datetime.strptime(raw(slot), "%d.%m.%Y").date()

    def parse_time(slot):
        if slot is None:
            return None
        return datetime.strptime(raw(slot), "%H:%M").time()

    try:
        for name, (start_d, end_d, start_t, end_t) in layouts:
            if label != name:
                continue
            # Parse in field order so the first malformed value is the one
            # that gets reported.
            start_date = parse_date(start_d)
            end_date = parse_date(end_d)
            start_time = parse_time(start_t)
            end_time = parse_time(end_t)
            return Schedule(
                start_date=start_date,
                end_date=end_date,
                start_time=start_time,
                end_time=end_time,
                admittance_time=None,
            )
    except Exception as e:
        # Best effort: a malformed match must not abort the whole extraction.
        print(e)
        return None

    # Unknown label: nothing to convert.
    return None
def _load_classifier(repo_id, model_name):
    """Download ``<model_name>.pkl`` from a Hugging Face repo and load it.

    Args:
        repo_id: Hugging Face repository id, e.g. ``"adojode/date_classifier"``.
        model_name: File name of the model inside the repo, without ``.pkl``.

    Returns:
        Whatever object ``joblib.load`` reconstructs from the downloaded file.
    """
    model_path = hf_hub_download(repo_id=repo_id, filename=model_name + ".pkl")
    # NOTE(security): joblib.load unpickles the file, and unpickling can run
    # arbitrary code. Only point this at repositories you control or trust.
    return joblib.load(model_path)
def classify_date_time(date_times, label, text):
    """Keep the matches of one label that the classifier rates as event dates.

    The text is split at the ``[<label>]`` markers and rebuilt as a token
    list in which each marker becomes a single placeholder token. For every
    match, 5-token windows are passed to ``date_classifier`` and the
    ``'EVENT_DATE'`` and ``'OTHER'`` values of its results are summed; the
    match is converted to a schedule when the ``'EVENT_DATE'`` sum is larger.

    Args:
        date_times: Matches for ``label``, in the order they occur in ``text``.
        label: Pattern label whose ``[<label>]`` markers are present in ``text``.
        text: Cleaned text in which every match was replaced by its marker.

    Returns:
        A list of schedules as returned by ``convert_to_schedule``; matches
        that fail to convert are left out. Always empty for ``"TIME"``.
    """
    # Time-only matches are not turned into schedules yet: the TIME branch only
    # ever computed a score that was never used, so skip the classifier work.
    if label == "TIME":
        return []

    # Split the text at the marker and re-insert one placeholder token per
    # match, remembering the token index of each placeholder.
    tokens = []
    date_time_positions = []
    for i, segment in enumerate(text.split(f"[{label}]")):
        tokens.extend(segment.split())
        if i < len(date_times):
            date_time_positions.append(len(tokens))
            tokens.append(placeholder.get(label, "ERROR"))

    window_size = 5
    schedules = []
    # zip() pairs each match with its placeholder; a match without a marker in
    # the text is skipped instead of raising IndexError.
    for date_time, position in zip(date_times, date_time_positions):
        # Scores are per match: they must not carry over from the previous one.
        event_date_total = 0
        other_total = 0

        # First window is the one that ends on the placeholder.
        start = max(0, position - (window_size - 1))
        # NOTE(review): the window keeps sliding to the end of the text, so
        # windows that no longer contain this placeholder are scored as well.
        # Kept as found; confirm whether it should stop at ``position``.
        while start + window_size <= len(tokens):
            window = tokens[start:start + window_size]
            date_class = date_classifier(" ".join(window))
            event_date_total += date_class.get('EVENT_DATE', 0)
            other_total += date_class.get('OTHER', 0)
            start += 1

        if event_date_total > other_total:
            schedule = convert_to_schedule(date_time, label)
            # convert_to_schedule returns None on failure; do not pass that on.
            if schedule is not None:
                schedules.append(schedule)
    return schedules
# Load both classifiers once, at import time. If a download or load fails the
# error is printed and the name(s) not yet assigned stay undefined, so later
# classification calls will fail.
try:
    date_classifier = _load_classifier("adojode/date_classifier", "date_classifier")
    time_classifier = _load_classifier("adojode/time_classifier", "time_classifier")
except Exception as e:
    print("Error loading classifier models from hugging face: ", e)
_DATE = r"(\d{2}\.\d{2}\.\d{4})"
_TIME = r"(\d{2}:\d{2})"

# Ordered from most to least specific. Each pattern's matches are replaced by
# a "[LABEL]" marker before the next pattern runs, so a looser pattern cannot
# re-match part of an expression that was already consumed.
_SCHEDULE_PATTERNS = (
    ("DATE_RANGE_TIME_RANGE", fr"{_DATE}\s*{_TIME}\s*-\s*{_DATE}\s*{_TIME}"),
    ("DATE_RANGE", fr"{_DATE}\s*-\s*{_DATE}"),
    ("DATE_TIME_RANGE", fr"{_DATE}\s*{_TIME}\s*-\s*{_TIME}"),
    ("TIME_RANGE", fr"{_TIME}\s*-\s*{_TIME}"),
    ("DATE_TIME", fr"{_DATE}\s*{_TIME}"),
    ("DATE", _DATE),
    ("TIME", _TIME),
)


def _clean_schedule_text(normalized):
    """Strip markup and filler words so date/time expressions become adjacent."""
    cleaned = re.sub(r"[*=#]", " ", normalized)
    # Unify every range separator (dashes and "bis") to a plain hyphen.
    cleaned = re.sub(r"(-|—|–|bis)", "-", cleaned)
    cleaned = re.sub(r"(und|sowie)", "+", cleaned)
    # NOTE(review): these word patterns have no word boundaries, so they also
    # fire inside longer words (e.g. the "am" in "Hamburg"). Left unchanged
    # because this text is what the classifiers receive; confirm against the
    # training data before tightening.
    cleaned = re.sub(r"(von|vom|am|um|ab)", " ", cleaned, flags=re.IGNORECASE)
    cleaned = re.sub(r"[,|]", " ", cleaned)
    return re.sub(r"\s+", " ", cleaned)


def _collect_matches(cleaned):
    """Find each pattern in order and mask its matches with a ``[LABEL]`` marker.

    Returns a ``(matches, masked_text)`` pair; ``matches`` maps only the labels
    that occurred to their ``re.findall`` results.
    """
    matches = {}
    for label, pattern in _SCHEDULE_PATTERNS:
        found = re.findall(pattern, cleaned)
        if found:
            matches[label] = found
            cleaned = re.sub(pattern, f"[{label}]", cleaned)
    return matches, cleaned


def extract_schedules(text):
    """Extract the event schedules mentioned in ``text``.

    The text is normalized and cleaned, and all date/time expressions are
    matched and masked. If only one kind of expression occurs, its first match
    is converted directly; otherwise every match is passed through
    ``classify_date_time`` and the accepted schedules are deduplicated.

    Args:
        text: Raw text to search, passed to ``normalize_data`` first.

    Returns:
        A list of schedules (possibly empty), or ``None`` if any step raised;
        the error is printed in that case.
    """
    try:
        cleaned = _clean_schedule_text(normalize_data(text))
        matches, masked = _collect_matches(cleaned)

        # Only one kind of expression found: take its first match as the event
        # date without asking the classifier.
        if len(matches) == 1:
            key, value = next(iter(matches.items()))
            schedule = convert_to_schedule(label=key, date_time=value[0])
            return [] if schedule is None else [schedule]

        event_schedules = []
        for key, value in matches.items():
            schedules = classify_date_time(date_times=value, label=key, text=masked)
            if schedules:
                # Failed conversions come back as None and would break the
                # containment check below.
                event_schedules.extend(s for s in schedules if s is not None)

        if len(event_schedules) == 1:
            return event_schedules
        print(event_schedules)

        # Drop every schedule that is contained in another one.
        # NOTE(review): relies on Schedule supporting the ``in`` operator;
        # its exact semantics are not visible here.
        return [
            schedule
            for i, schedule in enumerate(event_schedules)
            if not any(
                schedule in other
                for j, other in enumerate(event_schedules)
                if i != j
            )
        ]
    except Exception as ex:
        # Best effort: report the problem and signal failure with None.
        print(ex)
        return None
# Example override for a quick manual run:
# TEXTS = ["\n\nTermin für öffentliche Besichtigung\n=================================== \n\n07.01.2025\n\n * Am 07.01.2025\n* Von 18:00 bis 19:00 Uhr\n* Tasköprüstraße 10 (ehemalige Selgros-Markthalle)\n* Termin im Kalender speichern\n"]

# Run the demo only when the file is executed directly, so that importing the
# module for its functions does not process and print every sample text.
if __name__ == "__main__":
    for text in TEXTS:
        print(text)
        schedules = extract_schedules(text)
        print("*" * 100)
        print("EXTRACTED SCHEDULES: ")
        print(schedules)
        print("*" * 100)