File size: 7,339 Bytes
c2e7c2a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
import numpy as np
import torch
import json
import pandas as pd
from tqdm import tqdm
from typing import List, Dict, Tuple, Set, Union, Optional
from langchain.docstore.document import Document
from langchain_community.vectorstores import FAISS
from langchain_community.vectorstores.faiss import DistanceStrategy
from langchain_core.embeddings.embeddings import Embeddings
from FlagEmbedding import BGEM3FlagModel
def setup_gpu_info() -> None:
print(f"Số lượng GPU khả dụng: {torch.cuda.device_count()}")
print(f"GPU hiện tại: {torch.cuda.current_device()}")
print(f"Tên GPU: {torch.cuda.get_device_name(0)}")
def load_model(model_name: str, use_fp16: bool = False) -> BGEM3FlagModel:
return BGEM3FlagModel(model_name, use_fp16=use_fp16)
def load_json_file(file_path: str) -> dict:
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
def load_jsonl_file(file_path: str) -> List[Dict]:
corpus = []
with open(file_path, "r", encoding="utf-8") as file:
for line in file:
data = json.loads(line.strip())
corpus.append(data)
return corpus
def extract_corpus_from_legal_documents(legal_data: dict) -> List[Dict]:
corpus = []
for document in legal_data:
for article in document['articles']:
chunk = {
"law_id": document['law_id'],
"article_id": article['article_id'],
"title": article['title'],
"text": article['title'] + '\n' + article['text']
}
corpus.append(chunk)
return corpus
def convert_corpus_to_documents(corpus: List[Dict[str, str]]) -> List[Document]:
documents = []
for i in tqdm(range(len(corpus)), desc="Converting corpus to documents"):
context = corpus[i]['text']
metadata = {
'law_id': corpus[i]['law_id'],
'article_id': corpus[i]['article_id'],
'title': corpus[i]['title']
}
documents.append(Document(page_content=context, metadata=metadata))
return documents
class CustomEmbedding(Embeddings):
"""Custom embedding class that uses the BGEM3FlagModel."""
def __init__(self, model: BGEM3FlagModel, batch_size: int = 1):
self.model = model
self.batch_size = batch_size
def embed_documents(self, texts: List[str]) -> List[List[float]]:
embeddings = []
for i in tqdm(range(0, len(texts), self.batch_size), desc="Embedding documents"):
batch_texts = texts[i:i+self.batch_size]
batch_embeddings = self._get_batch_embeddings(batch_texts)
embeddings.extend(batch_embeddings)
torch.cuda.empty_cache()
return np.vstack(embeddings)
def embed_query(self, text: str) -> List[float]:
embedding = self.model.encode(text, max_length=256)['dense_vecs']
return embedding
def _get_batch_embeddings(self, texts: List[str]) -> List[List[float]]:
with torch.no_grad():
outputs = self.model.encode(texts, batch_size=self.batch_size, max_length=2048)['dense_vecs']
batch_embeddings = outputs
del outputs
return batch_embeddings
class VectorDB:
"""Vector database for document retrieval."""
def __init__(
self,
documents: List[Document],
embedding: Embeddings,
vector_db=FAISS,
index_path: Optional[str] = None
) -> None:
self.vector_db = vector_db
self.embedding = embedding
self.index_path = index_path
self.db = self._build_db(documents)
def _build_db(self, documents: List[Document]):
if self.index_path:
db = self.vector_db.load_local(
self.index_path,
self.embedding,
allow_dangerous_deserialization=True
)
else:
db = self.vector_db.from_documents(
documents=documents,
embedding=self.embedding,
distance_strategy=DistanceStrategy.DOT_PRODUCT
)
return db
def get_retriever(self, search_type: str = "similarity", search_kwargs: dict = {"k": 10}):
retriever = self.db.as_retriever(search_type=search_type, search_kwargs=search_kwargs)
return retriever
def save_local(self, folder_path: str) -> None:
self.db.save_local(folder_path)
def process_sample(sample: dict, retriever) -> List[int]:
question = sample['question']
docs = retriever.invoke(question)
retrieved_article_full_ids = [
docs[i].metadata['law_id'] + "#" + docs[i].metadata['article_id']
for i in range(len(docs))
]
indexes = []
for article in sample['relevant_articles']:
article_full_id = article['law_id'] + "#" + article['article_id']
if article_full_id in retrieved_article_full_ids:
idx = retrieved_article_full_ids.index(article_full_id) + 1
indexes.append(idx)
else:
indexes.append(0)
return indexes
def calculate_metrics(all_indexes: List[List[int]], num_samples: int, selected_keys: Set[str]) -> Dict[str, float]:
count = [len(indexes) for indexes in all_indexes]
result = {}
for thres in [1, 3, 5, 10, 100]:
found = [[y for y in x if 0 < y <= thres] for x in all_indexes]
found_count = [len(x) for x in found]
acc = sum(1 for i in range(num_samples) if found_count[i] > 0) / num_samples
rec = sum(found_count[i] / count[i] for i in range(num_samples)) / num_samples
pre = sum(found_count[i] / thres for i in range(num_samples)) / num_samples
mrr = sum(1 / min(x) if x else 0 for x in found) / num_samples
if f"Accuracy@{thres}" in selected_keys:
result[f"Accuracy@{thres}"] = acc
if f"MRR@{thres}" in selected_keys:
result[f"MRR@{thres}"] = mrr
return result
def save_results(result: Dict[str, float], output_path: str) -> None:
with open(output_path, "w", encoding="utf-8") as f:
json.dump(result, f, indent=4, ensure_ascii=False)
print(f"Results saved to {output_path}")
def main():
setup_gpu_info()
model = load_model('AITeamVN/Vietnamese_Embedding', use_fp16=False)
samples = load_json_file('zalo_kaggle/train_question_answer.json')['items']
legal_data = load_json_file('zalo_kaggle/legal_corpus.json')
corpus = extract_corpus_from_legal_documents(legal_data)
documents = convert_corpus_to_documents(corpus)
embedding = CustomEmbedding(model, batch_size=1) # Increased batch size for efficiency time
vectordb = VectorDB(
documents=documents,
embedding=embedding,
vector_db=FAISS,
index_path=None
)
retriever = vectordb.get_retriever(search_type="similarity", search_kwargs={"k": 100})
all_indexes = []
for sample in tqdm(samples, desc="Processing samples"):
all_indexes.append(process_sample(sample, retriever))
selected_keys = {"Accuracy@1", "Accuracy@3", "Accuracy@5", "Accuracy@10", "MRR@10", "Accuracy@100"}
result = calculate_metrics(all_indexes, len(samples), selected_keys)
print(result)
save_results(result, "zalo_kaggle/Vietnamese_Embedding.json")
if __name__ == "__main__":
main() |