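"""Retrieval-augmented QA and summarization pipeline (GenraPipeline).

Documents are embedded and indexed with Haystack, candidate passages are retrieved
via sentence-transformer embeddings, validated and summarized with an
instruction-tuned LLM (SOLAR, Mistral, or Phi-3), and the per-candidate rankings
are fused with PyFLAGR rank aggregation.
"""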
import spaces
import torch
import numpy as np
import pandas as pd

# Import the PyFLAGR modules for rank aggregation
import pyflagr.Linear as Linear
import pyflagr.Majoritarian as Majoritarian

from operator import itemgetter

from haystack import Document
# from haystack.pipeline import Pipeline
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.embedders import SentenceTransformersTextEmbedder, SentenceTransformersDocumentEmbedder
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever

from sentence_transformers import SentenceTransformer

from tqdm import tqdm
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoModelForCausalLM, AutoTokenizer, AutoModel, pipeline


class LLMGenerator:
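    """Wraps a causal LLM and its tokenizer for validation and summarization prompts."""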
    def __init__(self, llm_model, tokenizer, llm_name):
        self.llm_model = llm_model
        self.tokenizer = tokenizer
        self.llm_name = llm_name

    # @spaces.GPU(duration=120)
    def generate_answer(self, texts, query, mode='validate'):
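        """Prompt the LLM with the given documents and query.

        Modes: 'validate' returns True/False on whether the document answers the query,
        'summarize' answers the query from the documents, and 'h_summarize' writes a
        brief paragraph summary of a developing disaster event.
        """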
        # number the documents so the prompt can refer to each one
        template_texts = ""
        for i, text in enumerate(texts):
            template_texts += f'{i+1}. {text} \n'

        if mode == 'validate':
            conversation = [ {'role': 'user', 'content': f'Given the following query: "{query}"\nIs the following document relevant to answering this query?\n{template_texts} \nResponse: Yes / No'} ]
        elif mode == 'summarize':
            conversation = [ {'role': 'user', 'content': f'For the following query and documents, try to answer the given query based on the documents.\nQuery: {query} \nDocuments: {template_texts}.'} ]
        elif mode == 'h_summarize':
            conversation = [ {'role': 'user', 'content': f'The documents below describe a developing disaster event. Based on these documents, write a brief summary in the form of a paragraph, highlighting the most crucial information. \nDocuments: {template_texts}'} ]

        prompt = self.tokenizer.apply_chat_template(conversation, tokenize=False, add_generation_prompt=True)
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.llm_model.device)
        outputs = self.llm_model.generate(**inputs, use_cache=True, max_length=4096, do_sample=True, temperature=0.7, top_p=0.95, top_k=10, repetition_penalty=1.1)
        output_text = self.tokenizer.decode(outputs[0])
        if self.llm_name == "solar":
            # SOLAR's chat template prefixes the model turn with an "Assistant:" marker
            assistant_response = output_text.split("Assistant:")[1]
        elif self.llm_name == "phi3mini":
            # Phi-3 marks the model turn with "<|assistant|>"; drop the trailing "<|end|>" token
            assistant_response = output_text.split("<|assistant|>")[1]
            assistant_response = assistant_response[:-7]
        else:
            # Mistral-style templates close the user turn with "[/INST]"
            assistant_response = output_text.split("[/INST]")[1]
        if mode == 'validate':
            return 'Yes' in assistant_response
        elif mode == 'summarize':
            return assistant_response
        elif mode == 'h_summarize':
            return assistant_response


class QAIndexer:
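    """Embeds documents with a SentenceTransformers model and writes them into a Haystack document store."""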
    def __init__(self, index_type, emb_model):
        # @spaces.GPU(duration=1000)
        self.document_embedder = SentenceTransformersDocumentEmbedder(model=emb_model)
        self.document_embedder.warm_up()
        if index_type == 'in_memory':
            self.document_store = InMemoryDocumentStore(embedding_similarity_function="cosine")


    def index(self, docs_to_index):
        documents_with_embeddings = self.document_embedder.run(docs_to_index)
        self.document_store.write_documents(documents_with_embeddings['documents'])

    
    def index_dataframe(self, data):
        docs_to_index = self.read_dataframe(data)
        self.index(docs_to_index)
    
    def index_stream(self, stream_data):
        docs_to_index = self.read_stream(stream_data)
        self.index(docs_to_index)
    
    def read_dataframe(self, data):
        # convert DataFrame rows to Haystack Documents for the DocumentStore
        docs_to_index = [Document(content=row['text'], id=str(row['order'])) for idx, row in data.iterrows()]
        return docs_to_index
    
    def read_stream(self, stream_data):
        # the stream consists of a single document for now
        docs_to_index = [Document(content=doc['text'], id=str(doc['id'])) for doc in [stream_data]]
        return docs_to_index


class QARetriever:
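    """Thin wrapper around Haystack's InMemoryEmbeddingRetriever for embedding-based retrieval."""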
    def __init__(self, document_store):
        self.retriever = InMemoryEmbeddingRetriever(document_store=document_store)

    def retrieve(self, query, topk):
        retrieval_results = self.retriever.run(query_embedding=query, top_k=topk)
        documents = [x.to_dict() for x in retrieval_results["documents"]]
        return documents


def rank_aggregation(aggregator, lists, k):
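    """Fuse the rankings in `lists` (the path to a PyFLAGR CSV input file) with CombSUM or the
    outranking approach, and return the top-k fused documents per query as dicts with
    qid, docid, rank, and score."""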
    if aggregator == 'linear':
        csum = Linear.CombSUM(norm='score')
        df_out, df_eval = csum.aggregate(input_file=lists)
    elif aggregator == 'outrank':
        outrank = Majoritarian.OutrankingApproach(eval_pts=7)
        df_out, df_eval = outrank.aggregate(input_file=lists)

    df_out['query_ids'] = df_out.index
    queries = list(df_out['query_ids'].unique())
    results = []
    for query in queries:
        df_query = df_out[df_out['query_ids'] == query][:k]
        rank = 0
        for index, r_q in df_query.iterrows():
            rank += 1
            doc_id = r_q['Voter']
            score_doc = r_q['Score']
            results.append({'qid': query, 'docid': doc_id, 'rank': rank, 'score': score_doc})
    return results


class GenraPipeline:
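    """End-to-end pipeline combining indexing, embedding retrieval, LLM validation and
    summarization, and PyFLAGR rank aggregation; per-question results are kept in answers_store."""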
    #@spaces.GPU(duration=120)
    def __init__(self, llm_name, emb_model, aggregator, contexts):
        self.qa_indexer = QAIndexer('in_memory', emb_model)
        self.qa_retriever = QARetriever(self.qa_indexer.document_store)
        self.encoder = SentenceTransformer(emb_model)
        self.contexts = contexts
        self.aggregator = aggregator
        self.answers_store = {}
        if llm_name == 'solar':
            self.tokenizer = AutoTokenizer.from_pretrained("Upstage/SOLAR-10.7B-Instruct-v1.0", use_fast=True)
            self.llm_model = AutoModelForCausalLM.from_pretrained(
                "Upstage/SOLAR-10.7B-Instruct-v1.0",
                device_map="auto",  # or device_map="cuda"
                # torch_dtype=torch.float16,
            )
            self.llm_generator = LLMGenerator(self.llm_model, self.tokenizer, llm_name)
        elif llm_name == 'mistral':
            self.tokenizer = AutoTokenizer.from_pretrained("mistralai/Mistral-7B-Instruct-v0.2", use_fast=True)
            self.llm_model = AutoModelForCausalLM.from_pretrained(
                "mistralai/Mistral-7B-Instruct-v0.2",
                device_map="auto",  # or device_map="cuda"
                # torch_dtype=torch.float16,
            )
            self.llm_generator = LLMGenerator(self.llm_model, self.tokenizer, llm_name)
        elif llm_name == 'phi3mini':
            self.tokenizer = AutoTokenizer.from_pretrained("microsoft/Phi-3-mini-128k-instruct", use_fast=True)
            self.llm_model = AutoModelForCausalLM.from_pretrained(
                "microsoft/Phi-3-mini-128k-instruct",
                device_map="auto",
                torch_dtype="auto",
                trust_remote_code=True,
            )
            self.llm_generator = LLMGenerator(self.llm_model, self.tokenizer, llm_name)
    
    def retrieval(self, batch_number, queries, topk, summarize_results=True):
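        """For each query: retrieve with the averaged context embedding, keep up to five
        LLM-validated candidate passages, build one ranking per validated candidate, fuse the
        rankings with PyFLAGR, optionally summarize the fused results, and store them per batch."""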
        for qid, question in tqdm(queries[['id', 'query']].values):
            # if no contexts were provided, seed the context list with the question itself
            if len(self.contexts) < 1:
                self.contexts.append(question)
            all_emb_c = []
            for c in self.contexts:
                c_emb = self.encoder.encode([c], convert_to_numpy=True)[0]
                all_emb_c.append(np.array(c_emb))
            all_emb_c = np.array(all_emb_c)
            avg_emb_c = np.mean(all_emb_c, axis=0)
            avg_emb_c = avg_emb_c.reshape((1, len(avg_emb_c)))
            # we want a list of floats for haystack retrievers
            hits = self.qa_retriever.retrieve(avg_emb_c[0].tolist(), 20) # topk or more?
            hyde_texts = []
            candidate_texts = []
            hit_count = 0
            while len(candidate_texts) < 5:
                if hit_count < len(hits):
                    json_doc = hits[hit_count]
                    doc_text = json_doc['content'] 
                    if self.llm_generator.generate_answer([doc_text], question, mode='validate'):
                        candidate_texts.append(doc_text) #candidate_texts.append(doc_text[0])
                    hit_count += 1
                else:
                    break
            if len(candidate_texts) < 1:
                # no answerable result for this query
                results = []
            else:
                all_emb_c = []
                all_hits = []
                for i, c in enumerate(candidate_texts):
                    c_emb = self.encoder.encode([c], convert_to_numpy=True)[0]
                    c_emb = c_emb.reshape((1, len(c_emb)))
                    c_hits = self.qa_retriever.retrieve(c_emb[0].tolist(), topk)
                    rank = 0
                    # each validated candidate acts as a voter contributing one ranking in PyFLAGR format
                    for hit in c_hits:
                        rank += 1
                        all_hits.append({'qid': qid, 'voter': i, 'docid': hit['id'], 'rank': rank, 'score': hit['score']})
                # write the PyFLAGR rank-aggregation input file (one CSV row per vote)
                tempfile = 'temp_rankings_file'
                with open(tempfile, 'w') as f:
                    for res in all_hits:
                        f.write(f"{res['qid']},V{res['voter']},{res['docid']},{res['score']},test\n")
                # run aggregation
                results = rank_aggregation(self.aggregator, tempfile, topk)
            
                # enhance each result with doc info
                for res in results:
                    res['document'] = self.qa_indexer.document_store.filter_documents(filters={'id':str(res['docid'])})[0].content

            # summarize_results() falls back to "N/A" when there are no results; use the same
            # fallback when summarization is disabled so store_results always receives a value
            summary = self.summarize_results(question, results, candidate_texts) if summarize_results else "N/A"

            self.store_results(batch_number, question, results, summary)


    def store_results(self, batch_number, question, results, summary):
        if results:
            tweets = [t['document'] for t in results]
            if question in self.answers_store:
                self.answers_store[question].append({'batch_number':batch_number, 'tweets':tweets, 'summary':summary})
            else:
                self.answers_store[question] = [{'batch_number':batch_number, 'tweets':tweets, 'summary':summary}]
                


    def summarize_results(self, question, results, candidate_texts):
        if results:
            texts = [t['document'] for t in results] #+ candidate_texts
            summary = self.llm_generator.generate_answer(texts, question, mode='summarize')
        else:
            summary = "N/A"
        return summary

    def summarize_history(self, queries):
        h_per_q = []
        for qid,question in tqdm(queries[['id','query']].values):
            if question in self.answers_store:
                q_history = self.answers_store[question]
                q_hist_docs = self.order_history(q_history)
                h_per_q.extend(q_hist_docs)
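        # the historical summary is generated once, prompted with the last question iterated above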
        historical_summary = self.llm_generator.generate_answer(h_per_q, question, mode='h_summarize')
        return historical_summary
    
    def order_history(self, query_history):
        ordered_history = sorted(query_history, key=itemgetter('batch_number'))
        ordered_docs = [hist['summary'] for hist in ordered_history]
        return ordered_docs