"""Extract event schedules (dates and times) from free text.

The text is normalized and cleaned, date/time expressions of the form
``dd.mm.yyyy`` and ``HH:MM`` are replaced by labelled placeholders, and a
classifier downloaded from the Hugging Face Hub decides which of the found
expressions are actual event dates.
"""
import os
import re
from datetime import datetime

import joblib
import spacy
from dotenv import load_dotenv
from huggingface_hub import hf_hub_download, login
from spacy import Language
from spacy.tokenizer import Tokenizer
from spacy.util import compile_suffix_regex, compile_infix_regex

from src.resources.TEXTS import TEXTS
from src.utils.Event import Schedule
from src.utils.helpers import normalize_data
from src.utils.markdown_processing.CustomMarkdownAnalyzer.MarkdownAnalyzer import MarkdownAnalyzer

load_dotenv()
token = os.getenv("HUGGING_FACE_SPACES_TOKEN")
# Only log in when a token is configured: login(token=None) falls back to an
# interactive prompt, which would block a non-interactive import.
if token:
    login(token=token)

# Token that stands in for a matched expression inside the classifier input,
# keyed by the label of the pattern that matched.
placeholder = {
    "DATE_RANGE_TIME_RANGE": "[DATE] [TIME] - [DATE] [TIME]",
    "DATE_RANGE": "[DATE] - [DATE]",
    "DATE_TIME_RANGE": "[DATE] [TIME] - [TIME]",
    "TIME_RANGE": "[TIME] - [TIME]",
    "DATE_TIME": "[DATE] [TIME]",
    "DATE": "[DATE]",
    "TIME": "[TIME]"
}

_DATE_FORMAT = "%d.%m.%Y"
_TIME_FORMAT = "%H:%M"

# For every label: which Schedule field is read from which index of the regex
# match.  ``None`` means the match is a plain string (single capture group)
# and is used as a whole.  Key order is the order in which values are parsed.
_SCHEDULE_LAYOUT = {
    "DATE_RANGE_TIME_RANGE": {"start_date": 0, "end_date": 2, "start_time": 1, "end_time": 3},
    "DATE_RANGE": {"start_date": 0, "end_date": 1},
    "DATE_TIME_RANGE": {"start_date": 0, "start_time": 1, "end_time": 2},
    "TIME_RANGE": {"start_time": 0, "end_time": 1},
    "DATE_TIME": {"start_date": 0, "start_time": 1},
    "DATE": {"start_date": None},
    "TIME": {"start_time": None},
}

# Cleanup substitutions, applied in this order before pattern matching.
# NOTE(review): the word patterns ("bis", "und", "am", ...) have no word
# boundaries, so they also hit inside longer words.  Left untouched because
# the classifiers may have been trained on text cleaned exactly this way --
# confirm before tightening.
_CLEANUP_STEPS = tuple(
    (re.compile(pattern, flags), replacement)
    for pattern, replacement, flags in (
        (r"\*", " ", 0),
        (r"=", " ", 0),
        (r"#", " ", 0),
        (r"(-|—|–|bis)", "-", 0),
        (r"(und|sowie)", "+", 0),
        (r"(von|vom|am|um|ab)", " ", re.IGNORECASE),
        (r",", " ", 0),
        (r"\|", " ", 0),
        (r"\s+", " ", 0),
    )
)

_DATE = r"(\d{2}\.\d{2}\.\d{4})"
_TIME = r"(\d{2}:\d{2})"

# Ordered from most to least specific: each pattern is replaced by its
# placeholder before the next one runs, so a shorter pattern cannot consume
# part of a longer expression.
_SCHEDULE_PATTERNS = (
    ("DATE_RANGE_TIME_RANGE", re.compile(_DATE + r"\s*" + _TIME + r"\s*-\s*" + _DATE + r"\s*" + _TIME)),
    ("DATE_RANGE", re.compile(_DATE + r"\s*-\s*" + _DATE)),
    ("DATE_TIME_RANGE", re.compile(_DATE + r"\s*" + _TIME + r"\s*-\s*" + _TIME)),
    ("TIME_RANGE", re.compile(_TIME + r"\s*-\s*" + _TIME)),
    ("DATE_TIME", re.compile(_DATE + r"\s*" + _TIME)),
    ("DATE", re.compile(_DATE)),
    ("TIME", re.compile(_TIME)),
)


def _parse_date(value):
    """Parse a ``dd.mm.yyyy`` string into a ``date``."""
    return datetime.strptime(value, _DATE_FORMAT).date()


def _parse_time(value):
    """Parse an ``HH:MM`` string into a ``time``."""
    return datetime.strptime(value, _TIME_FORMAT).time()


def convert_to_schedule(date_time, label):
    """Build a ``Schedule`` from one regex match.

    Args:
        date_time: The match for ``label`` -- a tuple of strings for the
            multi-group labels, a plain string for ``DATE`` and ``TIME``.
        label: One of the keys of ``placeholder``.

    Returns:
        The ``Schedule``, or ``None`` when the label is unknown or the match
        cannot be converted (the error is printed).
    """
    print("Converting ", date_time, label)
    layout = _SCHEDULE_LAYOUT.get(label)
    if layout is None:
        return None
    fields = dict.fromkeys(("start_date", "end_date", "start_time", "end_time"))
    # Broad on purpose: besides strptime errors this also covers whatever the
    # project-defined Schedule constructor may raise; conversion is best effort.
    try:
        for field, index in layout.items():
            raw = date_time if index is None else date_time[index]
            parse = _parse_date if field.endswith("_date") else _parse_time
            fields[field] = parse(raw)
        return Schedule(admittance_time=None, **fields)
    except Exception as e:
        print(e)
        return None


def _load_classifier(repo_id, model_name):
    """Download ``<model_name>.pkl`` from the given Hub repo and unpickle it."""
    # SECURITY: joblib.load unpickles the file, which can execute arbitrary
    # code -- only point this at repositories you trust.
    return joblib.load(
        hf_hub_download(repo_id=repo_id, filename=model_name + ".pkl")
    )


def classify_date_time(date_times, label, text, window_size=5):
    """Return schedules for the matches of ``label`` classified as event dates.

    ``text`` must already contain ``[<label>]`` where the expressions were.
    For every match, each ``window_size``-token window that contains its
    placeholder is scored by ``date_classifier``; the match is kept when the
    summed ``EVENT_DATE`` score exceeds the summed ``OTHER`` score.

    Args:
        date_times: Regex matches for ``label``, in order of appearance.
        label: Pattern label (key of ``placeholder``).
        text: Cleaned text with placeholders substituted.
        window_size: Number of tokens per classification window.

    Returns:
        List of ``Schedule`` objects; matches that fail conversion are
        skipped.  Always empty for ``TIME``: stand-alone times are never
        turned into schedules, so they are not classified either.

    Raises:
        RuntimeError: If the date classifier could not be loaded.
    """
    if label == "TIME":
        return []
    if date_classifier is None:
        raise RuntimeError("date classifier is not loaded")

    # Split the text at the placeholder and rebuild it as a token list in
    # which every match is a single token whose index we remember.
    tokens = []
    positions = []
    for i, segment in enumerate(text.split(f"[{label}]")):
        tokens.extend(segment.split())
        if i < len(date_times):
            tokens.append(placeholder.get(label, "ERROR"))
            positions.append(len(tokens) - 1)

    # Last start index at which a full window still fits; 0 for texts shorter
    # than one window, so they are scored with a single (shorter) window.
    last_start = max(0, len(tokens) - window_size)

    schedules = []
    for date_time, position in zip(date_times, positions):
        # Scores are per match; they must not carry over to the next one.
        event_date_total = 0
        other_total = 0
        first_start = max(0, position - (window_size - 1))
        # Only windows that actually contain the placeholder are relevant.
        for start in range(first_start, min(position, last_start) + 1):
            scores = date_classifier(" ".join(tokens[start:start + window_size]))
            event_date_total += scores.get('EVENT_DATE', 0)
            other_total += scores.get('OTHER', 0)
        if event_date_total > other_total:
            schedule = convert_to_schedule(date_time, label)
            if schedule is not None:
                schedules.append(schedule)
    return schedules


# Bind the names up front so a failed download leaves them as None instead of
# undefined.
date_classifier = None
time_classifier = None
try:
    date_classifier = _load_classifier("adojode/date_classifier", "date_classifier")
    time_classifier = _load_classifier("adojode/time_classifier", "time_classifier")
except Exception as e:
    print("Error loading classifier models from hugging face: ", e)


def _clean_text(normalized):
    """Apply the cleanup substitutions in order and return the result."""
    cleaned = normalized
    for pattern, replacement in _CLEANUP_STEPS:
        cleaned = pattern.sub(replacement, cleaned)
    return cleaned


def _replace_with_placeholders(cleaned):
    """Collect matches per label and replace them by ``[<label>]`` in the text.

    Returns:
        ``(matches, text)`` where ``matches`` maps each label that matched to
        its ``findall`` result, in pattern order.
    """
    matches = {}
    for label, pattern in _SCHEDULE_PATTERNS:
        found = pattern.findall(cleaned)
        if found:
            matches[label] = found
            cleaned = pattern.sub(f"[{label}]", cleaned)
    return matches, cleaned


def _drop_contained(schedules):
    """Drop every schedule that is contained in another one of the list.

    Relies on ``Schedule`` supporting the ``in`` operator (defined in
    ``src.utils.Event``).  When two schedules contain each other they are
    treated as duplicates and the earliest one is kept, instead of both being
    dropped.
    """
    unique = []
    for i, schedule in enumerate(schedules):
        redundant = False
        for j, other in enumerate(schedules):
            if i == j or schedule not in other:
                continue
            if i < j and other in schedule:
                continue  # duplicate pair: the later copy is the one dropped
            redundant = True
            break
        if not redundant:
            unique.append(schedule)
    return unique


def extract_schedules(text):
    """Extract the event schedules mentioned in ``text``.

    Args:
        text: Raw input text; it is passed through ``normalize_data`` first.

    Returns:
        List of ``Schedule`` objects (possibly empty).  Any error during
        extraction is printed and results in an empty list.
    """
    try:
        cleaned = _clean_text(normalize_data(text))
        matches, cleaned = _replace_with_placeholders(cleaned)

        event_schedules = []

        # Only one kind of expression found: take it without classification.
        # NOTE(review): only the first match of that label is converted, any
        # further matches of the same label are ignored -- confirm intent.
        if len(matches) == 1:
            key, value = next(iter(matches.items()))
            schedule = convert_to_schedule(label=key, date_time=value[0])
            if schedule is not None:
                event_schedules.append(schedule)
            return event_schedules

        for key, value in matches.items():
            schedules = classify_date_time(date_times=value, label=key, text=cleaned)
            if schedules:
                event_schedules.extend(s for s in schedules if s is not None)

        if len(event_schedules) == 1:
            return event_schedules

        print(event_schedules)
        return _drop_contained(event_schedules)
    except Exception as ex:
        print(ex)
        return []


# Sample input for manual testing:
# TEXTS = ["\n\nTermin für öffentliche Besichtigung\n=================================== \n\n07.01.2025\n\n * Am 07.01.2025\n* Von 18:00 bis 19:00 Uhr\n* Tasköprüstraße 10 (ehemalige Selgros-Markthalle)\n* Termin im Kalender speichern\n"]

# NOTE(review): this demo runs on every import of the module; consider moving
# it under an ``if __name__ == "__main__":`` guard.
for text in TEXTS:
    print(text)
    schedules = extract_schedules(text)
    print("*" * 100)
    print("EXTRACTED SCHEDULES: ")
    print(schedules)
    print("*" * 100)