Spaces:
Sleeping
Sleeping
File size: 5,106 Bytes
00887f1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
from transformers import AutoTokenizer
from rank_bm25 import BM25Okapi
import json
import re
tokenizer = AutoTokenizer.from_pretrained("deepvk/USER-bge-m3")
with open('search.json', 'r', encoding='utf-8') as file:
data = json.load(file)
def prepare_data(data=data):
for item in data:
corpus = [item['name'].lower()] + [x.lower() for x in item['synonyms']]
corpus = " ".join(corpus)
item['corpus'] = corpus
item['tokenized_corpus'] = tokenizer.tokenize(corpus)
return data
def clean_text(text):
"""
Очищает текст от специальных символов, чтобы можно было искать по содержимому.
"""
return re.sub(r'[^\w\s]', '', text).lower()
def search_bm25(query, data):
"""
Выполняет поиск по запросу с использованием BM25 + токенизатора.
query: строка, поисковый запрос.
data: список словарей, содержащих информацию о каждом элементе (name, id, synonyms, tokenized_corpus).
return: список словарей с уникальными name, id и score.
"""
tokenized_corpus = [item['tokenized_corpus'] for item in data]
bm25 = BM25Okapi(tokenized_corpus)
tokenized_query = tokenizer.tokenize(query)
doc_scores = bm25.get_scores(tokenized_query)
results = []
for idx in reversed(doc_scores.argsort()):
item = data[idx]
if item['name'] not in [result['name'] for result in results] and doc_scores[idx] > 0:
results.append({
'name': item['name'],
'id': item['id'],
'score': float(doc_scores[idx])
})
return results
def search_exact(query, data):
"""
Выполняет точный поиск подстроки в 'name' и 'synonyms'.
Слово либо является началом слова, либо целым словом.
"""
results = []
cleaned_query = clean_text(query)
query_regex = re.compile(r'\b' + re.escape(cleaned_query) + r'\b|\b' + re.escape(cleaned_query)) # Выбираем только целые слова или начала слов (чтобы не искать подстроки)
# Сначала ищем подстроку в 'name'
for item in data:
cleaned_name = clean_text(item['name'])
if query_regex.search(cleaned_name):
results.append({
'name': item['name'],
'id': item['id'],
'score': 1 # В данном случае score не важен, но можно оставить
})
# Затем ищем подстроку в 'synonyms'
for item in data:
for synonym in item['synonyms']:
cleaned_synonym = clean_text(synonym)
if query_regex.search(cleaned_synonym):
if not any(res['id'] == item['id'] for res in results): # Избегаем дублирования
results.append({
'name': item['name'],
'id': item['id'],
'score': 1
})
break # Достаточно найти одно совпадение среди синонимов
return results
#TODO: Добавить поиск подкатегорий
def merge_results(results1, results2):
"""
Объединяет два списка результатов с чередованием элементов, избегая дубликатов.
results1: первый список результатов (список словарей с ключами 'name', 'id', 'score')
results2: второй список результатов (список словарей с ключами 'name', 'id', 'score')
return: объединённый список результатов
"""
merged_results = []
seen_ids = set() # Набор для отслеживания уникальных id
# Чередуем элементы из двух списков
for res1, res2 in zip(results1, results2):
# Добавляем элемент из первого списка, если его id ещё не было
if res1['id'] not in seen_ids:
merged_results.append(res1)
seen_ids.add(res1['id'])
# Добавляем элемент из второго списка, если его id ещё не было
if res2['id'] not in seen_ids:
merged_results.append(res2)
seen_ids.add(res2['id'])
for res in results1[len(results2):]:
if res['id'] not in seen_ids:
merged_results.append(res)
seen_ids.add(res['id'])
for res in results2[len(results1):]:
if res['id'] not in seen_ids:
merged_results.append(res)
seen_ids.add(res['id'])
return merged_results |