Spaces:
Sleeping
Sleeping
from transformers import AutoTokenizer | |
from rank_bm25 import BM25Okapi | |
import json | |
import re | |
tokenizer = AutoTokenizer.from_pretrained("deepvk/USER-bge-m3") | |
with open('search.json', 'r', encoding='utf-8') as file: | |
data = json.load(file) | |
def prepare_data(data=data): | |
for item in data: | |
corpus = [item['name'].lower()] + [x.lower() for x in item['synonyms']] | |
corpus = " ".join(corpus) | |
item['corpus'] = corpus | |
item['tokenized_corpus'] = tokenizer.tokenize(corpus) | |
return data | |
def clean_text(text): | |
""" | |
Очищает текст от специальных символов, чтобы можно было искать по содержимому. | |
""" | |
return re.sub(r'[^\w\s]', '', text).lower() | |
def search_bm25(query, data): | |
""" | |
Выполняет поиск по запросу с использованием BM25 + токенизатора. | |
query: строка, поисковый запрос. | |
data: список словарей, содержащих информацию о каждом элементе (name, id, synonyms, tokenized_corpus). | |
return: список словарей с уникальными name, id и score. | |
""" | |
tokenized_corpus = [item['tokenized_corpus'] for item in data] | |
bm25 = BM25Okapi(tokenized_corpus) | |
tokenized_query = tokenizer.tokenize(query) | |
doc_scores = bm25.get_scores(tokenized_query) | |
results = [] | |
for idx in reversed(doc_scores.argsort()): | |
item = data[idx] | |
if item['name'] not in [result['name'] for result in results] and doc_scores[idx] > 0: | |
results.append({ | |
'name': item['name'], | |
'id': item['id'], | |
'score': float(doc_scores[idx]) | |
}) | |
return results | |
def search_exact(query, data): | |
""" | |
Выполняет точный поиск подстроки в 'name' и 'synonyms'. | |
Слово либо является началом слова, либо целым словом. | |
""" | |
results = [] | |
cleaned_query = clean_text(query) | |
query_regex = re.compile(r'\b' + re.escape(cleaned_query) + r'\b|\b' + re.escape(cleaned_query)) # Выбираем только целые слова или начала слов (чтобы не искать подстроки) | |
# Сначала ищем подстроку в 'name' | |
for item in data: | |
cleaned_name = clean_text(item['name']) | |
if query_regex.search(cleaned_name): | |
results.append({ | |
'name': item['name'], | |
'id': item['id'], | |
'score': 1 # В данном случае score не важен, но можно оставить | |
}) | |
# Затем ищем подстроку в 'synonyms' | |
for item in data: | |
for synonym in item['synonyms']: | |
cleaned_synonym = clean_text(synonym) | |
if query_regex.search(cleaned_synonym): | |
if not any(res['id'] == item['id'] for res in results): # Избегаем дублирования | |
results.append({ | |
'name': item['name'], | |
'id': item['id'], | |
'score': 1 | |
}) | |
break # Достаточно найти одно совпадение среди синонимов | |
return results | |
#TODO: Добавить поиск подкатегорий | |
def merge_results(results1, results2): | |
""" | |
Объединяет два списка результатов с чередованием элементов, избегая дубликатов. | |
results1: первый список результатов (список словарей с ключами 'name', 'id', 'score') | |
results2: второй список результатов (список словарей с ключами 'name', 'id', 'score') | |
return: объединённый список результатов | |
""" | |
merged_results = [] | |
seen_ids = set() # Набор для отслеживания уникальных id | |
# Чередуем элементы из двух списков | |
for res1, res2 in zip(results1, results2): | |
# Добавляем элемент из первого списка, если его id ещё не было | |
if res1['id'] not in seen_ids: | |
merged_results.append(res1) | |
seen_ids.add(res1['id']) | |
# Добавляем элемент из второго списка, если его id ещё не было | |
if res2['id'] not in seen_ids: | |
merged_results.append(res2) | |
seen_ids.add(res2['id']) | |
for res in results1[len(results2):]: | |
if res['id'] not in seen_ids: | |
merged_results.append(res) | |
seen_ids.add(res['id']) | |
for res in results2[len(results1):]: | |
if res['id'] not in seen_ids: | |
merged_results.append(res) | |
seen_ids.add(res['id']) | |
return merged_results |