|
import re |
|
import os |
|
from dotenv import load_dotenv |
|
|
|
from src.utils.Event import Schedule |
|
from huggingface_hub import hf_hub_download |
|
import joblib |
|
from huggingface_hub import login |
|
from datetime import datetime |
|
|
|
from src.utils.helpers import normalize_data |
|
|
|
# Load variables from a local .env file (if present) into the process
# environment so the Hugging Face token does not have to be hard-coded.
load_dotenv()

token = os.getenv("HUGGING_FACE_SPACES_TOKEN")

# Authenticate only when a token is actually configured. Calling login() with
# token=None makes huggingface_hub ask for the token interactively, which
# blocks or fails when this module is imported in a headless environment.
if token:
    login(token=token)
else:
    print("HUGGING_FACE_SPACES_TOKEN is not set; skipping Hugging Face login.")
|
|
|
|
|
|
|
|
|
|
|
class ScheduleExtractorV3:
    """Extract event schedules (dates and times) from free text.

    Expressions in the ``DD.MM.YYYY`` and ``HH:MM`` formats are located with
    regular expressions and replaced by ``[LABEL]`` placeholders. When only
    one kind of expression occurs it is converted directly; otherwise the
    date classifier scores sliding token windows to decide which matches
    describe the event.
    """

    # (pattern, replacement, flags) rules applied in order by _clean_text.
    # NOTE(review): the word alternations ("bis", "und", "von", "am", ...) have
    # no word boundaries, so they also rewrite those letters inside longer
    # words (e.g. "und" inside "Stunde"). Left unchanged because the cleaned
    # text is what the classifiers receive - confirm before tightening.
    _CLEANUP_RULES = (
        (r"\*", " ", 0),
        (r"=", " ", 0),
        (r"#", " ", 0),
        (r"(-|—|–|bis)", "-", 0),
        (r"(und|sowie)", "+", 0),
        (r"(von|vom|am|um|ab)", " ", re.IGNORECASE),
        (r",", " ", 0),
        (r"\|", " ", 0),
        (r"\s+", " ", 0),
    )

    # (label, pattern) pairs, ordered from most to least specific: each match
    # is replaced by its placeholder before the next pattern runs, so a longer
    # expression is consumed before its parts can match on their own.
    _MATCH_PATTERNS = (
        (
            "DATE_RANGE_TIME_RANGE",
            re.compile(
                r"(\d{2}\.\d{2}\.\d{4})\s*(\d{2}:\d{2})\s*-\s*(\d{2}\.\d{2}\.\d{4})\s*(\d{2}:\d{2})"
            ),
        ),
        ("DATE_RANGE", re.compile(r"(\d{2}\.\d{2}\.\d{4})\s*-\s*(\d{2}\.\d{2}\.\d{4})")),
        (
            "DATE_TIME_RANGE",
            re.compile(r"(\d{2}\.\d{2}\.\d{4})\s*(\d{2}:\d{2})\s*-\s*(\d{2}:\d{2})"),
        ),
        ("TIME_RANGE", re.compile(r"(\d{2}:\d{2})\s*-\s*(\d{2}:\d{2})")),
        ("DATE_TIME", re.compile(r"(\d{2}\.\d{2}\.\d{4})\s*(\d{2}:\d{2})")),
        ("DATE", re.compile(r"(\d{2}\.\d{2}\.\d{4})")),
        ("TIME", re.compile(r"(\d{2}:\d{2})")),
    )

    # Schedule fields that must not collide when two schedules are merged.
    _SCHEDULE_FIELDS = (
        "start_date",
        "end_date",
        "start_time",
        "end_time",
        "admittance_time",
    )

    def __init__(self):
        """Download and load the date and time classifier models.

        Loading is best-effort: a failure is printed and the classifiers that
        could not be loaded stay ``None``.
        """
        # Define both attributes up front so a failed download leaves the
        # instance in a well-defined state instead of without the attributes.
        self.date_classifier = None
        self.time_classifier = None
        try:
            self.date_classifier = self._load_classifier(
                "adojode/date_classifier", "date_classifier"
            )
            self.time_classifier = self._load_classifier(
                "adojode/time_classifier", "time_classifier"
            )
        except Exception as e:
            print("Error loading classifier models from hugging face: ", e)

    def _load_classifier(self, repo_id, model_name):
        """Download ``<model_name>.pkl`` from the Hub repo and load it.

        NOTE(security): ``joblib.load`` unpickles the file, and unpickling can
        execute arbitrary code - only point this at repositories you trust.
        """
        model_path = hf_hub_download(repo_id=repo_id, filename=model_name + ".pkl")
        return joblib.load(model_path)

    def _clean_text(self, text):
        """Apply the cleanup rules to *text* in their declared order."""
        cleaned = text
        for pattern, replacement, flags in self._CLEANUP_RULES:
            cleaned = re.sub(pattern, replacement, cleaned, flags=flags)
        return cleaned

    def _collect_matches(self, cleaned):
        """Find all date/time expressions and swap them for placeholders.

        Returns ``(matches, text)``: ``matches`` maps each label that occurred
        to its ``re.findall`` result (tuples of groups, or plain strings for
        the single-group ``DATE`` and ``TIME`` patterns); ``text`` has every
        match replaced by ``[LABEL]``.
        """
        matches = {}
        for label, pattern in self._MATCH_PATTERNS:
            found = pattern.findall(cleaned)
            if found:
                matches[label] = found
            cleaned = pattern.sub(f"[{label}]", cleaned)
        return matches, cleaned

    def extract(self, text):
        """Return the schedules found in *text*.

        Returns a list of ``Schedule`` objects (empty when nothing usable was
        found). Any unexpected error is printed and ``None`` is returned.
        """
        try:
            cleaned = self._clean_text(normalize_data(text))
            matches, cleaned = self._collect_matches(cleaned)

            # Only one kind of expression: convert its first match directly,
            # without consulting the classifiers.
            if len(matches) == 1:
                label, found = next(iter(matches.items()))
                schedule = self.convert_to_schedule(label=label, date_time=found[0])
                # A match that fails to parse (e.g. "31.02.2024") yields None;
                # do not hand that to callers as a schedule.
                return [schedule] if schedule is not None else []

            event_schedules = []
            for label, found in matches.items():
                schedules = self.classify_date_time(
                    date_times=found, label=label, text=cleaned
                )
                if schedules:
                    event_schedules.extend(schedules)

            if len(event_schedules) == 1:
                return event_schedules

            # Drop every schedule that is contained in another one. This relies
            # on Schedule supporting the `in` operator.
            # NOTE(review): if `a in b` is also true for equal schedules, two
            # identical entries would remove each other - verify in Schedule.
            unique_schedules = [
                schedule
                for i, schedule in enumerate(event_schedules)
                if not any(
                    schedule in other
                    for j, other in enumerate(event_schedules)
                    if i != j
                )
            ]

            if len(unique_schedules) == 2:
                first, second = unique_schedules
                print("Versuche Schedules zu mergen....", first, second)
                # Merge only when the two schedules are complementary, i.e. no
                # field is set on both of them.
                complementary = all(
                    not getattr(first, name) or not getattr(second, name)
                    for name in self._SCHEDULE_FIELDS
                )
                if complementary:
                    merged = Schedule(
                        **{
                            name: getattr(first, name) or getattr(second, name)
                            for name in self._SCHEDULE_FIELDS
                        }
                    )
                    print("Merged:", merged)
                    return [merged]

            return unique_schedules

        except Exception as ex:
            print(ex)
            return None

    def classify_date_time(self, date_times, label, text):
        """Return schedules for the *label* matches classified as event dates.

        Args:
            date_times: ``re.findall`` results for *label*, in text order.
            label: Kind of expression, e.g. ``"DATE_RANGE"``.
            text: Cleaned text in which each match is a ``[label]`` placeholder.

        Returns:
            A list of ``Schedule`` objects; matches that fail to convert are
            skipped.
        """
        # Standalone times are not turned into schedules yet. The previous code
        # ran the time classifier, discarded its result and then crashed by
        # reading the date classifier's (never assigned) result, which made
        # extract() fail for every text with a standalone time.
        if label == "TIME":
            return []

        # Rebuild the token list, inserting the expanded placeholder for each
        # match and remembering its token index.
        tokens = []
        date_time_positions = []
        for i, segment in enumerate(text.split(f"[{label}]")):
            tokens.extend(segment.split())
            if i < len(date_times):
                tokens.append(placeholder.get(label, "ERROR"))
                date_time_positions.append(len(tokens) - 1)

        window_size = 5
        # NOTE(review): the totals are never reset, so the decision for a later
        # match also includes the scores collected for earlier ones. Texts with
        # fewer than `window_size` tokens produce no window and therefore no
        # schedule. Both kept as-is - confirm whether intended.
        event_date_total = 0
        other_total = 0

        schedules = []
        for i, date_time in enumerate(date_times):
            # First window is the one that ends on the match; the window then
            # slides one token at a time until it reaches the end of the text.
            start = max(0, date_time_positions[i] - (window_size - 1))
            while start + window_size <= len(tokens):
                window = tokens[start:start + window_size]
                date_class = self.date_classifier(" ".join(window))
                event_date_total += date_class.get('EVENT_DATE', 0)
                other_total += date_class.get('OTHER', 0)
                start += 1

            if event_date_total > other_total:
                schedule = self.convert_to_schedule(date_time, label)
                # Skip matches that could not be parsed instead of storing None.
                if schedule is not None:
                    schedules.append(schedule)

        return schedules

    def convert_to_schedule(self, date_time, label):
        """Build a ``Schedule`` from one regex match.

        Args:
            date_time: The matched groups - a tuple for the multi-part labels,
                a plain string for ``"DATE"`` and ``"TIME"``.
            label: Kind of expression; decides how the groups are interpreted.

        Returns:
            The ``Schedule``, or ``None`` when the label is unknown or a value
            cannot be parsed (the error is printed).
        """

        def parse_date(value):
            return datetime.strptime(value, "%d.%m.%Y").date()

        def parse_time(value):
            return datetime.strptime(value, "%H:%M").time()

        # Every field defaults to None; each label fills in only its own parts.
        fields = dict.fromkeys(self._SCHEDULE_FIELDS)
        try:
            if label == "DATE_RANGE_TIME_RANGE":
                fields["start_date"] = parse_date(date_time[0])
                fields["end_date"] = parse_date(date_time[2])
                fields["start_time"] = parse_time(date_time[1])
                fields["end_time"] = parse_time(date_time[3])
            elif label == "DATE_RANGE":
                fields["start_date"] = parse_date(date_time[0])
                fields["end_date"] = parse_date(date_time[1])
            elif label == "DATE_TIME_RANGE":
                fields["start_date"] = parse_date(date_time[0])
                fields["start_time"] = parse_time(date_time[1])
                fields["end_time"] = parse_time(date_time[2])
            elif label == "TIME_RANGE":
                fields["start_time"] = parse_time(date_time[0])
                fields["end_time"] = parse_time(date_time[1])
            elif label == "DATE_TIME":
                fields["start_date"] = parse_date(date_time[0])
                fields["start_time"] = parse_time(date_time[1])
            elif label == "DATE":
                fields["start_date"] = parse_date(date_time)
            elif label == "TIME":
                fields["start_time"] = parse_time(date_time)
            else:
                return None
            return Schedule(**fields)
        except Exception as e:
            print(e)
            return None
|
|
|
# Maps each match label to the token text that classify_date_time inserts
# into the classifier input in place of the matched expression.
placeholder = dict(
    DATE_RANGE_TIME_RANGE="[DATE] [TIME] - [DATE] [TIME]",
    DATE_RANGE="[DATE] - [DATE]",
    DATE_TIME_RANGE="[DATE] [TIME] - [TIME]",
    TIME_RANGE="[TIME] - [TIME]",
    DATE_TIME="[DATE] [TIME]",
    DATE="[DATE]",
    TIME="[TIME]",
)
|
|
|
|
|
|
|
|