import os
import re
from datetime import datetime

import joblib
import spacy
from dotenv import load_dotenv
from huggingface_hub import hf_hub_download, login
from spacy import Language
from spacy.tokenizer import Tokenizer
from spacy.util import compile_suffix_regex, compile_infix_regex

from src.resources.TEXTS import TEXTS
from src.utils.Event import Schedule
from src.utils.helpers import normalize_data
from src.utils.markdown_processing.CustomMarkdownAnalyzer.MarkdownAnalyzer import MarkdownAnalyzer

load_dotenv()
token = os.getenv("HUGGING_FACE_SPACES_TOKEN")
login(token=token)

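# Placeholder strings that stand in for each kind of matched date/time
# expression in the cleaned text. The token windows around these
# placeholders are what the classifiers later see.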
placeholder = {
    "DATE_RANGE_TIME_RANGE": "[DATE] [TIME] - [DATE] [TIME]",
    "DATE_RANGE": "[DATE] - [DATE]",
    "DATE_TIME_RANGE": "[DATE] [TIME] - [TIME]",
    "TIME_RANGE": "[TIME] - [TIME]",
    "DATE_TIME": "[DATE] [TIME]",
    "DATE": "[DATE]",
    "TIME": "[TIME]"
}

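# All dates are expected as "dd.mm.YYYY" and all times as "HH:MM", matching
# the regular expressions used in extract_schedules below.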
def convert_to_schedule(date_time, label):
    """Build a Schedule from a regex match (group tuple or single string);
    returns None if the label is unknown or parsing fails."""
    print("Converting ", date_time, label)
    try:
        if label == "DATE_RANGE_TIME_RANGE":
            # match groups: (start date, start time, end date, end time)
            return Schedule(
                start_date=datetime.strptime(date_time[0], "%d.%m.%Y").date(),
                end_date=datetime.strptime(date_time[2], "%d.%m.%Y").date(),
                start_time=datetime.strptime(date_time[1], "%H:%M").time(),
                end_time=datetime.strptime(date_time[3], "%H:%M").time(),
                admittance_time=None
            )

        if label == "DATE_RANGE":
            return Schedule(
                start_date=datetime.strptime(date_time[0], "%d.%m.%Y").date(),
                end_date=datetime.strptime(date_time[1], "%d.%m.%Y").date(),
                start_time=None,
                end_time=None,
                admittance_time=None
            )

        if label == "DATE_TIME_RANGE":
            return Schedule(
                start_date=datetime.strptime(date_time[0], "%d.%m.%Y").date(),
                end_date=None,
                start_time=datetime.strptime(date_time[1], "%H:%M").time(),
                end_time=datetime.strptime(date_time[2], "%H:%M").time(),
                admittance_time=None
            )

        if label == "TIME_RANGE":
            return Schedule(
                start_date=None,
                end_date=None,
                start_time=datetime.strptime(date_time[0], "%H:%M").time(),
                end_time=datetime.strptime(date_time[1], "%H:%M").time(),
                admittance_time=None
            )

        if label == "DATE_TIME":
            return Schedule(
                start_date=datetime.strptime(date_time[0], "%d.%m.%Y").date(),
                end_date=None,
                start_time=datetime.strptime(date_time[1], "%H:%M").time(),
                end_time=None,
                admittance_time=None
            )

        if label == "DATE":
            return Schedule(
                start_date=datetime.strptime(date_time, "%d.%m.%Y").date(),
                end_date=None,
                start_time=None,
                end_time=None,
                admittance_time=None
            )

        if label == "TIME":
            return Schedule(
                start_date=None,
                end_date=None,
                start_time=datetime.strptime(date_time, "%H:%M").time(),
                end_time=None,
                admittance_time=None
            )
    except Exception as e:
        print(e)
        return None

def _load_classifier(repo_id, model_name):
    """Download a pickled classifier from the Hugging Face Hub and unpickle it."""
    return joblib.load(
        hf_hub_download(repo_id=repo_id, filename=model_name + ".pkl")
    )

def classify_date_time(date_times, label, text):
    """Decide for every match of `label` whether its surrounding context
    describes an actual event date, and convert accepted matches into Schedules."""
    # Rebuild the cleaned text as a token list in which every match of this
    # label is represented by a single placeholder token, and remember the
    # position of each placeholder.
    segments = text.split(f"[{label}]")
    tokens = []
    date_time_positions = []
    for i, segment in enumerate(segments):
        tokens.extend(segment.split())
        if i < len(date_times):
            tokens.append(placeholder.get(label, "ERROR"))
            date_time_positions.append(len(tokens) - 1)

    window_size = 5

    schedules = []
    for i, date_time in enumerate(date_times):
        # Reset the vote counters for every match so that one match's
        # context cannot tip the decision for another one.
        event_date_total = 0
        other_total = 0

        # Slide a window of `window_size` tokens over all positions that
        # contain this match's placeholder and let the classifier vote on
        # each window.
        start = max(0, date_time_positions[i] - (window_size - 1))
        while start <= date_time_positions[i] and start + window_size <= len(tokens):
            window = tokens[start:start + window_size]

            if label == "TIME":
                # The time classifier is evaluated, but its result is not
                # used yet: stand-alone TIME matches are never turned into
                # schedules here (see below).
                time_classifier(" ".join(window))
            else:
                date_class = date_classifier(" ".join(window))
                event_date_total += date_class.get('EVENT_DATE', 0)
                other_total += date_class.get('OTHER', 0)

            start += 1

        if label == "TIME":
            # Not implemented: stand-alone times are ignored for now.
            pass
        elif event_date_total > other_total:
            schedule = convert_to_schedule(date_time, label)
            if schedule:
                schedules.append(schedule)

    return schedules

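# The two classifiers are fetched from the Hugging Face Hub once at import
# time. classify_date_time assumes each loaded model is callable on a text
# window and returns a dict of label scores (e.g. 'EVENT_DATE' / 'OTHER').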
try:
    date_classifier = _load_classifier("adojode/date_classifier", "date_classifier")
    time_classifier = _load_classifier("adojode/time_classifier", "time_classifier")
except Exception as e:
    print("Error loading classifier models from hugging face: ", e)

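# Extraction pipeline: normalize the raw text, strip markdown noise and German
# filler words (von, bis, und, ...), replace date/time expressions with
# placeholders from the most specific pattern down to the least specific one,
# then let the classifiers decide which matches describe the actual event.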
def extract_schedules(text):
    try:
        normalized = normalize_data(text)

        # Remove markdown markup characters.
        cleaned = re.sub(r"\*", " ", normalized)
        cleaned = re.sub(r"=", " ", cleaned)
        cleaned = re.sub(r"#", " ", cleaned)

        # Unify range and enumeration connectors, e.g.
        # "10:00 bis 12:00" -> "10:00 - 12:00", "Samstag und Sonntag" -> "Samstag + Sonntag".
        # The \b boundaries keep "bis"/"und" inside longer words untouched.
        cleaned = re.sub(r"(-|—|–|\bbis\b)", "-", cleaned)
        cleaned = re.sub(r"\b(und|sowie)\b", "+", cleaned)

        # Drop German prepositions that typically precede dates and times
        # ("am 01.02.2025", "um 19:00", ...) as well as remaining separators.
        cleaned = re.sub(r"\b(von|vom|am|um|ab)\b", " ", cleaned, flags=re.IGNORECASE)
        cleaned = re.sub(r",", " ", cleaned)
        cleaned = re.sub(r"\|", " ", cleaned)
        cleaned = re.sub(r"\s+", " ", cleaned)

        matches = {}

        # Replace date/time expressions with placeholders, starting with the
        # most specific pattern so its parts are not consumed again by the
        # simpler patterns below.
        date_range_time_range_pattern = r"(\d{2}\.\d{2}\.\d{4})\s*(\d{2}:\d{2})\s*-\s*(\d{2}\.\d{2}\.\d{4})\s*(\d{2}:\d{2})"
        match = re.findall(date_range_time_range_pattern, cleaned)
        if match:
            matches["DATE_RANGE_TIME_RANGE"] = match
        cleaned = re.sub(date_range_time_range_pattern, "[DATE_RANGE_TIME_RANGE]", cleaned)

        date_range_pattern = r"(\d{2}\.\d{2}\.\d{4})\s*-\s*(\d{2}\.\d{2}\.\d{4})"
        match = re.findall(date_range_pattern, cleaned)
        if match:
            matches["DATE_RANGE"] = match
        cleaned = re.sub(date_range_pattern, "[DATE_RANGE]", cleaned)

        date_time_range_pattern = r"(\d{2}\.\d{2}\.\d{4})\s*(\d{2}:\d{2})\s*-\s*(\d{2}:\d{2})"
        match = re.findall(date_time_range_pattern, cleaned)
        if match:
            matches["DATE_TIME_RANGE"] = match
        cleaned = re.sub(date_time_range_pattern, "[DATE_TIME_RANGE]", cleaned)

        time_range_pattern = r"(\d{2}:\d{2})\s*-\s*(\d{2}:\d{2})"
        match = re.findall(time_range_pattern, cleaned)
        if match:
            matches["TIME_RANGE"] = match
        cleaned = re.sub(time_range_pattern, "[TIME_RANGE]", cleaned)

        date_time_pattern = r"(\d{2}\.\d{2}\.\d{4})\s*(\d{2}:\d{2})"
        match = re.findall(date_time_pattern, cleaned)
        if match:
            matches["DATE_TIME"] = match
        cleaned = re.sub(date_time_pattern, "[DATE_TIME]", cleaned)

        date_pattern = r"(\d{2}\.\d{2}\.\d{4})"
        match = re.findall(date_pattern, cleaned)
        if match:
            matches["DATE"] = match
        cleaned = re.sub(date_pattern, "[DATE]", cleaned)

        time_pattern = r"(\d{2}:\d{2})"
        match = re.findall(time_pattern, cleaned)
        if match:
            matches["TIME"] = match
        cleaned = re.sub(time_pattern, "[TIME]", cleaned)

        event_schedules = []

        # If only one kind of pattern was found, trust its first match
        # without consulting the classifiers.
        if len(matches) == 1:
            key, value = next(iter(matches.items()))
            schedule = convert_to_schedule(label=key, date_time=value[0])
            if schedule:
                event_schedules.append(schedule)
            return event_schedules

        for key, value in matches.items():
            schedules = classify_date_time(date_times=value, label=key, text=cleaned)
            if schedules:
                event_schedules.extend(schedules)

        if len(event_schedules) == 1:
            return event_schedules

        print(event_schedules)

        # Drop schedules that are contained in another schedule (relies on
        # Schedule supporting the `in` operator); keep the rest.
        unique_schedules = []
        for i, schedule in enumerate(event_schedules):
            if any(schedule in other for j, other in enumerate(event_schedules) if i != j):
                continue
            unique_schedules.append(schedule)
        return unique_schedules

    except Exception as ex:
        print(ex)

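# Manual smoke test: run the extractor over the bundled sample texts and
# print the resulting schedules.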
if __name__ == "__main__":
    for text in TEXTS:
        print(text)
        schedules = extract_schedules(text)
        print("*" * 100)
        print("EXTRACTED SCHEDULES: ")
        print(schedules)
        print("*" * 100)