|
|
|
|
|
import gradio as gr |
|
import json |
|
import pandas as pd |
|
from transformers import AutoTokenizer, AutoModelForQuestionAnswering |
|
import torch |
|
import re |
|
from pymorphy2 import MorphAnalyzer |
|
import logging |
|
|
|
|
|
# Dialog log configuration.
# The file handler is created explicitly so the log is always written as
# UTF-8: the logged messages are Cyrillic, and relying on the platform's
# default encoding can garble them or raise encoding errors on some systems.
# FileHandler appends by default, matching basicConfig(filename=...).
logging.basicConfig(
    handlers=[logging.FileHandler("dialog_logs.log", encoding="utf-8")],
    level=logging.INFO,
    format="%(asctime)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
|
|
|
|
|
def log_dialog(user_input, model_answer):
    """Write one user/assistant exchange to the log at INFO level.

    Nothing is logged when either the user text or the answer is empty or
    consists only of whitespace.
    """
    if user_input.strip() and model_answer.strip():
        logging.info(f"User: {user_input}")
        logging.info(f"Model: {model_answer}")
|
|
|
|
|
# Extractive question-answering checkpoint, loaded once at import time.
model_name = "timpal0l/mdeberta-v3-base-squad2"

# Tokenizer and QA model are both loaded from the same checkpoint name.
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForQuestionAnswering.from_pretrained(model_name)
|
|
|
|
|
def load_questions_from_json(file_path="questions.json"):
    """Read the UTF-8 JSON file at *file_path* and return the parsed content.

    Errors from opening the file or decoding the JSON propagate to the caller.
    """
    with open(file_path, encoding="utf-8") as source:
        raw_text = source.read()
    return json.loads(raw_text)
|
|
|
|
|
def find_answer_in_json(questions_data, message):
    """Return the answer of the first stored question that contains *message*.

    The comparison is a case-insensitive substring test of the message
    against each item's ``"question"`` text.

    Args:
        questions_data: Iterable of dicts with ``"question"`` and ``"answer"``
            keys.
        message: The user's text.

    Returns:
        The matching item's ``"answer"``, or ``None`` when nothing matches or
        the message is empty/whitespace-only.
    """
    needle = (message or "").strip().lower()
    # An empty string is a substring of every question, so without this guard
    # a blank message would always return the first stored answer.
    if not needle:
        return None

    for item in questions_data:
        if needle in item["question"].lower():
            return item["answer"]
    return None
|
|
|
|
|
def load_real_estate_data(file_path="real_estate.csv"):
    """Load the semicolon-separated listings CSV into a DataFrame.

    Any failure while reading (including a file that parses to zero rows) is
    reported with ``print`` and replaced by an empty DataFrame that carries
    the expected listing columns, so this function never raises.
    """
    fallback_columns = [
        "item_id",
        "address",
        "metro",
        "met_range",
        "price",
        "description",
        "type",
        "area",
        "RENT",
    ]
    try:
        listings = pd.read_csv(file_path, sep=";", encoding="utf-8")
        if listings.empty:
            raise ValueError("Файл данных пустой или поврежден.")
    except Exception as error:
        print(f"Ошибка при загрузке файла данных: {error}")
        return pd.DataFrame(columns=fallback_columns)
    return listings
|
|
|
|
|
# Shared morphological analyzer; analyze_query uses it to lemmatize query words.
morph = MorphAnalyzer()

# Property types that analyze_query looks for in a user query.
base_types = [
    "офис",
    "квартира",
    "апартаменты",
    "свободное назначение",
    "студия",
    "дом",
    "ОСЗ",
]
|
|
|
|
|
# Magnitude suffixes accepted after a price, mapped to their multiplier.
_PRICE_MULTIPLIERS = {
    **dict.fromkeys(
        ["млн", "млнруб", "млн.руб", "мн", "миллионов", "милионов", "лямов", "лимонов"],
        1_000_000,
    ),
    **dict.fromkeys(
        ["тр", "т.р", "т.р.", "тыс", "тысруб", "тыс.руб", "тыр", "тыр.", "тыщ", "тыш", "тысяч"],
        1_000,
    ),
}

# Longest alternatives first so that e.g. "тысяч" is preferred over "тыс".
_PRICE_UNITS = "|".join(
    re.escape(unit) for unit in sorted(_PRICE_MULTIPLIERS, key=len, reverse=True)
)

# "руб", "руб.", "рубль", "рубля", "рублей", ... or a bare "р" / "р.".
_CURRENCY = r"(?:руб(?:л\w*)?|р\.?)(?!\w)"

# <operator> <number> followed by a unit, a currency word, or both.
# A bare number with neither is ignored: it cannot be told apart from an area.
_PRICE_PATTERN = re.compile(
    r"(?P<op>(?<!\w)(?:до|от)|[<>])\s*"
    r"(?P<number>\d+(?:\s\d{3})*\s*[.,]?\d*)\s*"
    r"(?:"
    rf"(?P<unit_a>{_PRICE_UNITS})\.?\s*{_CURRENCY}"
    rf"|(?P<unit_b>{_PRICE_UNITS})(?!\w)"
    rf"|{_CURRENCY}"
    r")"
)

# Area unit spellings. They are interpolated unescaped, so "." stays a regex
# wildcard and "кв.м" also matches spellings such as "кв,м".
_AREA_UNITS = "|".join(
    ["квм", "кв.м", "кв метров", "кв м", "кв.м.", "квадратов", "квадрата", "м2", "метров", "м²"]
)

_RENT_WORDS = ["аренда", "снять", "аренду", "арендовать", "сниму"]
_SALE_WORDS = ["продажа", "купить", "покупке", "покупки", "продаю", "продажи"]


def _lemmatize(text):
    """Return the normal forms of the word tokens in *text* (punctuation dropped)."""
    return [morph.parse(token)[0].normal_form for token in re.findall(r"\w+", text.lower())]


def _contains_phrase(lemmas, phrase):
    """Return True if the list *phrase* occurs as a contiguous run inside *lemmas*."""
    size = len(phrase)
    if size == 0:
        return False
    return any(lemmas[i:i + size] == phrase for i in range(len(lemmas) - size + 1))


def _parse_price(number_text, unit):
    """Convert a matched number and optional magnitude unit to whole rubles."""
    compact = re.sub(r"\s+", "", number_text).replace(",", ".")
    try:
        amount = float(compact)
    except ValueError:
        return None
    # round() avoids losing a ruble to float error (e.g. 4.35 * 1_000_000).
    return int(round(amount * _PRICE_MULTIPLIERS.get(unit, 1)))


def _extract_area(query):
    """Return the ``area_min`` / ``area_max`` filters found in a lower-cased query."""
    area = {}
    span = (
        # "50-100 м2" (hyphen or dash)
        re.search(rf"(\d+)\s*[-–—]\s*(\d+)\s*(?:{_AREA_UNITS})", query)
        # "50 100 м2" / "50..100 м2"
        or re.search(rf"(\d+)\s*[.\s]+\s*(\d+)\s*(?:{_AREA_UNITS})", query)
        # "от 50 до 100 кв м"
        or re.search(rf"\bот\s*(\d+)\s*(?:{_AREA_UNITS})?\s*до\s*(\d+)\s*(?:{_AREA_UNITS})", query)
    )
    if span:
        area["area_min"] = int(span.group(1))
        area["area_max"] = int(span.group(2))
        return area

    lower = re.search(rf"\bот\s*(\d+)\s*(?:{_AREA_UNITS})", query)
    upper = re.search(rf"\bдо\s*(\d+)\s*(?:{_AREA_UNITS})", query)
    if lower:
        area["area_min"] = int(lower.group(1))
    if upper:
        area["area_max"] = int(upper.group(1))
    return area


def analyze_query(query):
    """Extract structured search filters from a free-text Russian query.

    Args:
        query: The user's message.

    Returns:
        A dict that may contain:
        ``"type"`` (list of lower-cased entries of ``base_types`` found in the
        query), ``"rent"`` (``"Аренда"`` or ``"Продажа"``),
        ``"price_min"`` / ``"price_max"`` (int, rubles) and
        ``"area_min"`` / ``"area_max"`` (int). Keys are absent when the
        corresponding information was not recognised.
    """
    query = query.lower()
    filters = {}

    # Property types: both the query and each base type are lemmatized, so
    # inflected forms, multi-word types and upper-case entries can match.
    # Types are stored lower-cased because the listing search compares them
    # against a lower-cased column.
    query_lemmas = _lemmatize(query)
    types_found = []
    for base_type in base_types:
        normalized_type = base_type.lower()
        if _contains_phrase(query_lemmas, _lemmatize(normalized_type)):
            types_found.append(normalized_type)
    if types_found:
        filters["type"] = types_found

    # Deal kind: plain substring checks, rent takes precedence over sale.
    if any(word in query for word in _RENT_WORDS):
        filters["rent"] = "Аренда"
    elif any(word in query for word in _SALE_WORDS):
        filters["rent"] = "Продажа"

    # Prices: every match is inspected so "от ... до ..." yields both bounds;
    # the first occurrence of each bound wins.
    for match in _PRICE_PATTERN.finditer(query):
        unit = match.group("unit_a") or match.group("unit_b")
        price = _parse_price(match.group("number"), unit)
        if price is None:
            continue
        bound = "price_max" if match.group("op") in ("до", "<") else "price_min"
        filters.setdefault(bound, price)

    filters.update(_extract_area(query))
    return filters
|
|
|
|
|
def search_real_estate(dataframe, filters):
    """Return the listings in *dataframe* that satisfy every given filter.

    Args:
        dataframe: Listings with ``type``, ``RENT``, ``price`` and ``area``
            columns (only the columns named by the active filters are read).
        filters: Dict with any of ``"type"`` (list of type names, or a single
            string), ``"rent"``, ``"price_min"``, ``"price_max"``,
            ``"area_min"``, ``"area_max"``.

    Returns:
        A list of row dicts (``DataFrame.to_dict(orient="records")``); empty
        when *filters* is empty or nothing matches.
    """
    if not filters:
        return []

    # Build the mask on the frame's own index so boolean alignment works even
    # when the frame does not have a default 0..n-1 index.
    mask = pd.Series(True, index=dataframe.index)

    if "type" in filters:
        wanted = filters["type"]
        if isinstance(wanted, str):
            wanted = [wanted]
        # The column is lower-cased, so the requested values must be too;
        # otherwise an upper-case type such as "ОСЗ" could never match.
        wanted = {str(value).lower() for value in wanted}
        mask &= dataframe["type"].astype(str).str.lower().isin(wanted)

    if "rent" in filters:
        mask &= dataframe["RENT"].astype(str).str.lower() == str(filters["rent"]).lower()

    for column in ("price", "area"):
        lower = filters.get(f"{column}_min")
        upper = filters.get(f"{column}_max")
        if lower is None and upper is None:
            continue
        # Values read as text (or unparsable cells) become NaN, which fails
        # every comparison instead of raising.
        values = pd.to_numeric(dataframe[column], errors="coerce")
        if lower is not None:
            mask &= values >= lower
        if upper is not None:
            mask &= values <= upper

    return dataframe[mask].to_dict(orient="records")
|
|
|
|
|
def create_context(dataframe, filters):
    """Build the text passage given to the QA model.

    When *filters* contains both ``"type"`` and ``"rent"``, only listings
    matching both are used; otherwise every listing is used.

    Returns:
        A single string: all descriptions, then all types, then all
        addresses, space-separated (missing values skipped).
    """
    if "type" in filters and "rent" in filters:
        wanted = filters["type"]
        if isinstance(wanted, str):
            wanted = [wanted]
        # Compare lower-case to lower-case so upper-case types still match.
        wanted = {str(value).lower() for value in wanted}
        type_match = dataframe["type"].astype(str).str.lower().isin(wanted)
        rent_match = dataframe["RENT"].astype(str).str.lower() == str(filters["rent"]).lower()
        relevant_data = dataframe[type_match & rent_match]
    else:
        relevant_data = dataframe

    # astype(str) keeps str.join from failing on non-text cells.
    sections = [
        " ".join(relevant_data[column].dropna().astype(str))
        for column in ("description", "type", "address")
    ]
    return " ".join(sections)
|
|
|
|
|
def generate_answer(question, context):
    """Extract an answer span for *question* from *context* with the QA model.

    The most likely start and end token positions are taken independently
    from the model's logits and the tokens between them are decoded.

    Returns:
        The decoded answer text, or a fixed fallback message when the
        predicted span is empty, inverted, or contains only special tokens.
    """
    fallback = "Не удалось найти точный ответ."

    inputs = tokenizer(question, context, return_tensors="pt", truncation=True, padding=True)
    with torch.no_grad():
        outputs = model(**inputs)

    answer_start = int(torch.argmax(outputs.start_logits))
    answer_end = int(torch.argmax(outputs.end_logits)) + 1
    # Start and end are chosen independently, so the end may precede the start.
    if answer_end <= answer_start:
        return fallback

    answer_ids = inputs["input_ids"][0][answer_start:answer_end]
    # skip_special_tokens drops every special token (not just a lone "[CLS]"),
    # so markers such as "[SEP]" cannot leak into the reply.
    answer = tokenizer.decode(answer_ids, skip_special_tokens=True).strip()
    if not answer:
        return fallback
    return answer
|
|
|
|
|
|
|
def _format_listing(result):
    """Render one listing record as the multi-line text block shown to the user."""
    return (
        f"ID: {result.get('item_id', 'Не указано')}\n"
        f"Тип: {result.get('type', 'Не указано')}\n"
        f"Адрес: {result.get('address', 'Не указано')}\n"
        f"Цена: {result.get('price', 'Не указано')} руб.\n"
        f"Площадь: {result.get('area', 'Не указано')} м²\n"
        f"Метро: {result.get('metro', 'Не указано')}\n"
        f"До метро: {result.get('met_range', 'Не указано')}\n"
        f"Описание: {result.get('description', 'Не указано')}\n"
        f"Фото: {result.get('foto', 'Не указано')}\n"
        f"Записаться на просмотр: {result.get('order', 'Не указано')}\n"
        f"---\n"
    )


def predict(message, history=None):
    """Answer one chat message.

    Resolution order:
      1. a stored answer from the questions JSON whose question contains the
         message;
      2. listings matching the filters parsed from the message;
      3. an answer extracted by the QA model from the listings text.

    Args:
        message: The user's text.
        history: Previous chat turns; only logged.

    Returns:
        A one-element list holding an assistant message dict
        (``{"role": "assistant", "content": ...}``).
    """
    if history is None:
        history = []

    logging.info("Получено сообщение: %s", message)
    logging.info("История диалога: %s", history)

    # Both data files are re-read on every message.
    questions_data = load_questions_from_json()
    real_estate_data = load_real_estate_data()

    json_answer = find_answer_in_json(questions_data, message)
    if json_answer:
        log_dialog(message, json_answer)
        return [{"role": "assistant", "content": json_answer}]

    filters = analyze_query(message)

    filtered_results = search_real_estate(real_estate_data, filters)
    if filtered_results:
        response = "".join(_format_listing(result) for result in filtered_results).strip()
        log_dialog(message, response)
        return [{"role": "assistant", "content": response}]

    context = create_context(real_estate_data, filters)
    # The file defines generate_answer; the previously called
    # get_model_response does not exist and raised NameError on this path.
    model_answer = generate_answer(message, context)
    log_dialog(message, model_answer)
    return [{"role": "assistant", "content": model_answer}]
|
|
|
|
|
# Chat UI: every submitted message is handled by predict().
demo = gr.ChatInterface(
    fn=predict,
    title="ИИ-ассистент по недвижимости",
    description="Задайте вопрос о недвижимости, и я помогу вам найти подходящий объект!",
)


# Start the web app only when the file is run as a script.
if __name__ == "__main__":
    demo.launch()