import streamlit as st from docx import Document import os from langchain_core.prompts import PromptTemplate from transformers import AutoTokenizer, AutoModelForCausalLM import torch import time from sentence_transformers import SentenceTransformer from langchain.vectorstores import Chroma from langchain.docstore.document import Document as Document2 from langchain_community.embeddings import HuggingFaceEmbeddings import cohere from langchain_core.prompts import PromptTemplate # Load token from environment variable token = os.getenv("HF_TOKEN") print("my token is ", token) # Save the token to Hugging Face's system directory docs_folder = "./converted_docs" # Function to load .docx files from Google Drive folder def load_docx_files_from_drive(drive_folder): docx_files = [f for f in os.listdir(drive_folder) if f.endswith(".docx")] documents = [] for file_name in docx_files: file_path = os.path.join(drive_folder, file_name) doc = Document(file_path) content = "\n".join([p.text for p in doc.paragraphs if p.text.strip()]) documents.append(content) return documents # Load .docx files from Google Drive folder documents = load_docx_files_from_drive(docs_folder) def split_extracted_text_into_chunks(documents): print("Splitting text into chunks") # List to hold all chunks chunks = [] for doc_text in documents: # Split the document text into lines lines = doc_text.splitlines() # Initialize variables for splitting current_chunk = [] for line in lines: # Check if the line starts with "File Name:" if line.startswith("File Name:"): # If there's a current chunk, save it before starting a new one if current_chunk: chunks.append("\n".join(current_chunk)) current_chunk = [] # Reset the current chunk # Add the line to the current chunk current_chunk.append(line) # Add the last chunk for the current document if current_chunk: chunks.append("\n".join(current_chunk)) return chunks # Split the extracted documents into chunks chunks = split_extracted_text_into_chunks(documents) def save_chunks_to_file(chunks, output_file_path): print("Saving chunks to file") # Open the file in write mode with open(output_file_path, "w", encoding="utf-8") as file: for i, chunk in enumerate(chunks, start=1): # Write each chunk with a header for easy identification file.write(f"Chunk {i}:\n") file.write(chunk) file.write("\n" + "=" * 50 + "\n") # Path to save the chunks file output_file_path = "./chunks_output.txt" # Split the extracted documents into chunks chunks = split_extracted_text_into_chunks(documents) # Save the chunks to the file save_chunks_to_file(chunks, output_file_path) # Step 1: Load the model through LangChain's wrapper embedding_model = HuggingFaceEmbeddings( model_name="Omartificial-Intelligence-Space/Arabic-Triplet-Matryoshka-V2" ) print("#0") # Step 2: Embed the chunks (now simplified) def embed_chunks(chunks): status_text = st.empty() progress_bar = st.progress(0) results = [] total_chunks = len(chunks) for i, chunk in enumerate(chunks): result = { "chunk": chunk, "embedding": embedding_model.embed_query(chunk) } results.append(result) progress = (i + 1) / total_chunks progress_bar.progress(progress) status_text.text(f"Processed {i+1}/{total_chunks} chunks ({progress:.0%})") progress_bar.progress(1.0) status_text.text("Embedding complete!") return results embeddings = embed_chunks(chunks) print("#1") # Step 3: Prepare documents (unchanged) def prepare_documents_for_chroma(embeddings): status_text = st.empty() progress_bar = st.progress(0) documents = [] total_entries = len(embeddings) for i, entry in enumerate(embeddings, start=1): doc = Document2( page_content=entry["chunk"], metadata={"chunk_index": i} ) documents.append(doc) progress = i / total_entries progress_bar.progress(progress) status_text.text(f"Processing document {i}/{total_entries} ({progress:.0%})") progress_bar.progress(1.0) status_text.text(f"✅ Successfully prepared {total_entries} documents") return documents print("#2") documents = prepare_documents_for_chroma(embeddings) print("Creating the vectore store") # Step 4: Create Chroma store (fixed) vectorstore = Chroma.from_documents( documents=documents, embedding=embedding_model, # Proper embedding object persist_directory="./chroma_db", # Optional persistence ) class RAGPipeline: def __init__(self, vectorstore, api_key, model_name="c4ai-aya-expanse-8b", k=3): print("Initializing RAG Pipeline") self.vectorstore = vectorstore self.model_name = model_name self.k = k self.api_key = api_key self.client = cohere.Client(api_key) # Initialize the Cohere client self.retriever = self.vectorstore.as_retriever( search_type="mmr", search_kwargs={"k": 3} ) self.prompt_template = PromptTemplate.from_template(self._get_template()) def _get_template(self): return """[INST] <> أنت مساعد مفيد يقدم إجابات باللغة العربية بناءً على السياق المقدم. - أجب فقط باللغة العربية - إذا لم تجد إجابة في السياق، قل أنك لا تعرف - كن دقيقاً وواضحاً في إجاباتك -جاوب من السياق حصريا <> السياق: {context} السؤال: {question} الإجابة: [/INST]\ """ def generate_response(self, question): retrieved_docs = self._retrieve_documents(question) prompt = self._create_prompt(retrieved_docs, question) response = self._generate_response_cohere(prompt) return response def _retrieve_documents(self, question): retrieved_docs = self.retriever.invoke(question) # print("\n=== المستندات المسترجعة ===") # for i, doc in enumerate(retrieved_docs): # print(f"المستند {i+1}: {doc.page_content}") # print("==========================\n") # دمج النصوص المسترجعة في سياق واحد return " ".join([doc.page_content for doc in retrieved_docs]) def _create_prompt(self, docs, question): return self.prompt_template.format(context=docs, question=question) def _generate_response_cohere(self, prompt): # Call Cohere's generate API response = self.client.generate( model=self.model_name, prompt=prompt, max_tokens=2000, # Adjust token limit based on requirements temperature=0.3, # Control creativity stop_sequences=None, ) if response.generations: return response.generations[0].text.strip() else: raise Exception("No response generated by Cohere API.") st.title("Simple Text Generator") api_key = os.getenv("API_KEY") s = api_key[:5] print("KEY: ", s) rag_pipeline = RAGPipeline(vectorstore=vectorstore, api_key=api_key) print("Enter your question Here: ") question = st.text_input("أدخل سؤالك هنا") if st.button("Generate Answer"): response = rag_pipeline.generate_response(question) st.write(response) print("Question: ", question) print("Response: ", response)