budu_search_new / search.py
etadevosyan's picture
First commit
00887f1
raw
history blame contribute delete
5.11 kB
from transformers import AutoTokenizer
from rank_bm25 import BM25Okapi
import json
import re
tokenizer = AutoTokenizer.from_pretrained("deepvk/USER-bge-m3")
with open('search.json', 'r', encoding='utf-8') as file:
data = json.load(file)
def prepare_data(data=data):
for item in data:
corpus = [item['name'].lower()] + [x.lower() for x in item['synonyms']]
corpus = " ".join(corpus)
item['corpus'] = corpus
item['tokenized_corpus'] = tokenizer.tokenize(corpus)
return data
def clean_text(text):
"""
Очищает текст от специальных символов, чтобы можно было искать по содержимому.
"""
return re.sub(r'[^\w\s]', '', text).lower()
def search_bm25(query, data):
"""
Выполняет поиск по запросу с использованием BM25 + токенизатора.
query: строка, поисковый запрос.
data: список словарей, содержащих информацию о каждом элементе (name, id, synonyms, tokenized_corpus).
return: список словарей с уникальными name, id и score.
"""
tokenized_corpus = [item['tokenized_corpus'] for item in data]
bm25 = BM25Okapi(tokenized_corpus)
tokenized_query = tokenizer.tokenize(query)
doc_scores = bm25.get_scores(tokenized_query)
results = []
for idx in reversed(doc_scores.argsort()):
item = data[idx]
if item['name'] not in [result['name'] for result in results] and doc_scores[idx] > 0:
results.append({
'name': item['name'],
'id': item['id'],
'score': float(doc_scores[idx])
})
return results
def search_exact(query, data):
"""
Выполняет точный поиск подстроки в 'name' и 'synonyms'.
Слово либо является началом слова, либо целым словом.
"""
results = []
cleaned_query = clean_text(query)
query_regex = re.compile(r'\b' + re.escape(cleaned_query) + r'\b|\b' + re.escape(cleaned_query)) # Выбираем только целые слова или начала слов (чтобы не искать подстроки)
# Сначала ищем подстроку в 'name'
for item in data:
cleaned_name = clean_text(item['name'])
if query_regex.search(cleaned_name):
results.append({
'name': item['name'],
'id': item['id'],
'score': 1 # В данном случае score не важен, но можно оставить
})
# Затем ищем подстроку в 'synonyms'
for item in data:
for synonym in item['synonyms']:
cleaned_synonym = clean_text(synonym)
if query_regex.search(cleaned_synonym):
if not any(res['id'] == item['id'] for res in results): # Избегаем дублирования
results.append({
'name': item['name'],
'id': item['id'],
'score': 1
})
break # Достаточно найти одно совпадение среди синонимов
return results
#TODO: Добавить поиск подкатегорий
def merge_results(results1, results2):
"""
Объединяет два списка результатов с чередованием элементов, избегая дубликатов.
results1: первый список результатов (список словарей с ключами 'name', 'id', 'score')
results2: второй список результатов (список словарей с ключами 'name', 'id', 'score')
return: объединённый список результатов
"""
merged_results = []
seen_ids = set() # Набор для отслеживания уникальных id
# Чередуем элементы из двух списков
for res1, res2 in zip(results1, results2):
# Добавляем элемент из первого списка, если его id ещё не было
if res1['id'] not in seen_ids:
merged_results.append(res1)
seen_ids.add(res1['id'])
# Добавляем элемент из второго списка, если его id ещё не было
if res2['id'] not in seen_ids:
merged_results.append(res2)
seen_ids.add(res2['id'])
for res in results1[len(results2):]:
if res['id'] not in seen_ids:
merged_results.append(res)
seen_ids.add(res['id'])
for res in results2[len(results1):]:
if res['id'] not in seen_ids:
merged_results.append(res)
seen_ids.add(res['id'])
return merged_results