# pdf_processor.py
import json
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_ollama import OllamaEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_ollama import OllamaLLM
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from ibm_watsonx_ai.metanames import EmbedTextParamsMetaNames
from langchain_ibm import WatsonxLLM, WatsonxEmbeddings
from langchain_huggingface import HuggingFaceEndpoint, HuggingFaceEmbeddings
from ibm_watsonx_ai import APIClient, Credentials
from utils import AI_MODELS, TRANSLATIONS
import chromadb
import requests
import os
from dotenv import load_dotenv
import re
from sklearn.cluster import KMeans
from sklearn.metrics.pairwise import cosine_similarity
OLLAMA_LLM = "granite3.1-dense"
OLLAMA_EMBEDDINGS = "granite-embedding:278m"
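# Both Ollama models must already be pulled into the local Ollama instance, e.g.:
#   ollama pull granite3.1-dense
#   ollama pull granite-embedding:278m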
load_dotenv()
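# Expected .env keys: ENVIRONMENT, HUGGINGFACE_TOKEN, WATSONX_APIKEY,
# WATSONX_PROJECT_ID (the watsonx endpoint itself is fixed below).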
ENVIRONMENT = os.getenv("ENVIRONMENT")
HUGGINGFACE_TOKEN = os.getenv("HUGGINGFACE_TOKEN")
api_key_watsonx = os.getenv('WATSONX_APIKEY')
projectid_watsonx = os.getenv('WATSONX_PROJECT_ID')
endpoint_watsonx = "https://us-south.ml.cloud.ibm.com"
def set_up_watsonx():
token_watsonx = authenticate_watsonx(api_key_watsonx)
    if token_watsonx is None:
return None
parameters = {
"max_new_tokens": 1500,
"min_new_tokens": 1,
"temperature": 0.7,
"top_k": 50,
"top_p": 1,
}
embed_params = {
EmbedTextParamsMetaNames.TRUNCATE_INPUT_TOKENS: 1,
EmbedTextParamsMetaNames.RETURN_OPTIONS: {"input_text": True},
}
    credentials = Credentials(
        url=endpoint_watsonx,
        api_key=api_key_watsonx,
    )
client = APIClient(credentials, project_id=projectid_watsonx)
client.set_token(token_watsonx)
    watsonx_llm = WatsonxLLM(
        model_id="ibm/granite-3-2-8b-instruct",
        watsonx_client=client,
        params=parameters,
    )
watsonx_embedding = WatsonxEmbeddings(
model_id="ibm/granite-embedding-278m-multilingual",
url=endpoint_watsonx,
project_id=projectid_watsonx,
params=embed_params,
)
return watsonx_llm, watsonx_embedding
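# Usage sketch: set_up_watsonx() yields an (llm, embeddings) pair on success and
# None when IAM authentication fails, so callers should check before unpacking:
#   setup = set_up_watsonx()
#   if setup is not None:
#       llm, embedder = setup
#       vector = embedder.embed_query("hola")  # -> list[float]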
def authenticate_watsonx(api_key):
url = "https://iam.cloud.ibm.com/identity/token"
headers = {
"Content-Type": "application/x-www-form-urlencoded"
}
data = {
"grant_type": "urn:ibm:params:oauth:grant-type:apikey",
"apikey": api_key
}
    # A timeout keeps a stalled IAM endpoint from hanging the whole app
    response = requests.post(url, headers=headers, data=data, timeout=30)
if response.status_code == 200:
token = response.json().get('access_token')
os.environ["WATSONX_TOKEN"] = token
return token
else:
print("Authentication failed. Status code:", response.status_code)
print("Response:", response.text)
return None
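# The exchange above mirrors:
#   POST https://iam.cloud.ibm.com/identity/token
#   grant_type=urn:ibm:params:oauth:grant-type:apikey&apikey=<API_KEY>
# and a 200 response carries JSON like
#   {"access_token": "...", "token_type": "Bearer", "expires_in": 3600}.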
class PDFProcessor:
def __init__(self):
self.language = list(TRANSLATIONS.keys())[0]
def set_language(self, language):
self.language = language
def set_llm(self, ai_model, type_model, api_key, project_id_watsonx):
        if ai_model == "Open AI / GPT-4o-mini":
            current_llm = ChatOpenAI(
                model="gpt-4o-mini",  # match the model named in the option label
                temperature=0.5,
                max_tokens=None,
                timeout=None,
                max_retries=2,
                api_key=api_key,
            )
            embedding_model = OpenAIEmbeddings(
                model="text-embedding-3-small",
                api_key=api_key,
            )
        elif ai_model == "IBM Granite3.1 dense / Ollama local":
            if type_model == "Local":
                try:
                    # Verify that Ollama is running and the models are available
                    current_llm = OllamaLLM(model=OLLAMA_LLM)
                    # Run a test embedding to fail fast if the model is missing
                    test_embedding = OllamaEmbeddings(model=OLLAMA_EMBEDDINGS)
                    test_embedding.embed_query("test")
                    embedding_model = test_embedding
                except Exception as e:
                    print(f"Error with Ollama: {e}")
                    # Surface a clear setup error instead of failing later
                    raise Exception("Please ensure Ollama is running and the models are pulled: \n" +
                                    f"ollama pull {OLLAMA_LLM}\n" +
                                    f"ollama pull {OLLAMA_EMBEDDINGS}")
            else:
                watsonx_setup = set_up_watsonx()
                if watsonx_setup is None:
                    # set_up_watsonx returns None when IAM authentication fails;
                    # fail fast instead of trying to unpack None below.
                    raise Exception("watsonx authentication failed; check WATSONX_APIKEY and WATSONX_PROJECT_ID")
                current_llm, embedding_model = watsonx_setup
        else:
            if ENVIRONMENT != "dev":
                print("HUGGINGFACE accessing")
                current_llm = HuggingFaceEndpoint(
                    repo_id=AI_MODELS[ai_model],
                    temperature=0.2,
                    huggingfacehub_api_token=HUGGINGFACE_TOKEN,
                )
            else:
                current_llm = HuggingFaceEndpoint(
                    repo_id=AI_MODELS[ai_model],
                    temperature=0.2,
                )
            embedding_model = HuggingFaceEmbeddings(
                model_name="ibm-granite/granite-embedding-278m-multilingual",
            )
        return current_llm, embedding_model
def process_pdf(self, vectorstore, pdf_file, chunk_size, chunk_overlap, ai_model, type_model, api_key, project_id_watsonx):
defined_chunk_size = 1000
defined_chunk_overlap = 150
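        # These fixed values override the chunk_size / chunk_overlap arguments.
        # With 1000/150, consecutive chunks share ~150 characters: a ~2,700-character
        # page splits into roughly 0-1000, 850-1850, 1700-2700 (approximate, since
        # the splitter prefers to break on separators rather than exact offsets).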
        if ai_model == "Open AI / GPT-4o-mini" and api_key == "":  # or (ai_model == "IBM Granite3.1 dense / Ollama local" and type_model == "Api Key" and (api_key == "" or project_id_watsonx == ""))
            return TRANSLATIONS[self.language]["api_key_required"]
if pdf_file is not None:
loader = PyPDFLoader(file_path=pdf_file.name)
documents = loader.load()
            # Drop pages whose extracted text is empty
            documents = [doc for doc in documents if doc.page_content]
            # Use explicit paragraph/newline separators for the hosted (API-key)
            # variants of the OpenAI and IBM models; otherwise rely on the
            # splitter's default separators.
            if ai_model in ("Open AI / GPT-4o-mini", "IBM Granite3.1 dense / Ollama local") and type_model == "Api Key":
                text_splitter = RecursiveCharacterTextSplitter(
                    chunk_size=defined_chunk_size,
                    chunk_overlap=defined_chunk_overlap,
                    separators=["\n\n", "\n"]
                )
            else:
                text_splitter = RecursiveCharacterTextSplitter(
                    chunk_size=defined_chunk_size,
                    chunk_overlap=defined_chunk_overlap,
                )
texts = text_splitter.split_documents(documents)
_, embeddings = self.set_llm(ai_model, type_model, api_key, project_id_watsonx)
print("vectorstore: ", vectorstore)
            # Clear any previous collection so the new PDF starts from a fresh store
if vectorstore:
vectorstore.delete_collection()
chromadb.api.client.SharedSystemClient.clear_system_cache()
new_client = chromadb.EphemeralClient()
vectorstore = Chroma.from_documents(
documents=texts,
embedding=embeddings,
client=new_client,
collection_name="pdf_collection"
#persist_directory="./chroma_db"
)
print("vectorstore: ", vectorstore)
            return TRANSLATIONS[self.language]["pdf_processed"], vectorstore
else:
return TRANSLATIONS[self.language]["load_pdf_first"], None
def get_qa_response(self, vectorstore, message, history, ai_model, type_model, api_key, project_id_watsonx, k=4):
current_llm, _ = self.set_llm(ai_model, type_model, api_key, project_id_watsonx)
if not vectorstore:
return TRANSLATIONS[self.language]["load_pdf_first"]
retriever = vectorstore.as_retriever(search_kwargs={"k": k})
qa_chain = RetrievalQA.from_chain_type(
llm=current_llm,
chain_type="stuff",
retriever=retriever,
return_source_documents=True,
)
        # NOTE: the grounding/language instruction is appended to the retrieval
        # query itself, so it is embedded along with the question when fetching
        # the top-k chunks.
        result = qa_chain.invoke({"query": f"{message}.\n You must answer it in {self.language}. Remember not to mention anything that is not in the text. Do not extend information that is not provided in the text. "})
        # PyPDFLoader stores a page_label in each chunk's metadata; fall back to "?" defensively
        unique_page_labels = {doc.metadata.get("page_label", "?") for doc in result["source_documents"]}
        page_labels_text = " & ".join(f"Page: {page}" for page in sorted(unique_page_labels))
return result["result"] + "\n\nSources: " + page_labels_text
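    # Minimal usage sketch (hypothetical values):
    #   answer = processor.get_qa_response(store, "What is the delivery deadline?",
    #                                      [], "IBM Granite3.1 dense / Ollama local",
    #                                      "Local", "", "")
    #   -> "...answer...\n\nSources: Page: 2 & Page: 5"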
    def summarizer_by_k_means(self, vectorstore, ai_model, type_model, api_key, project_id_watsonx, k, summary_prompt, just_get_documents=False):
print("Summarizer by k means in language: ", self.language)
if not vectorstore:
return TRANSLATIONS[self.language]["load_pdf_first"]
current_llm, _ = self.set_llm(ai_model, type_model, api_key, project_id_watsonx)
# Get all the documents from the vectorstore
documents = vectorstore.get(include=["embeddings", "documents"])
documentsByIds = documents["ids"]
documentsByEmbeddings = documents["embeddings"]
documentsByDocuments = documents["documents"]
print("documents length: ", len(documentsByEmbeddings))
        # Choose how many chunks each cluster should represent: the divisor grows
        # with corpus size (<=16 chunks -> 2, <=64 -> 4, <=128 -> 8, larger -> 12),
        # so the cluster count grows sub-linearly with document length.
        chunks_per_cluster = 2
        if len(documentsByEmbeddings) <= 16:
            chunks_per_cluster = 2
        elif len(documentsByEmbeddings) <= 64:
            chunks_per_cluster = 4
        elif len(documentsByEmbeddings) <= 128:
            chunks_per_cluster = 8
        else:
            chunks_per_cluster = 12
        num_clusters = max(1, len(documentsByEmbeddings) // chunks_per_cluster)
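        # Worked example: 100 chunk embeddings -> divisor 8 -> max(1, 100 // 8) = 12
        # clusters, so the final summary stitches together 12 representative chunks.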
print("num_clusters: ", num_clusters)
kmeans = KMeans(n_clusters=num_clusters, random_state=42)
kmeans.fit(documentsByEmbeddings)
summary_documents = []
map_ids_documents = {}
        # For each cluster, pick the document whose embedding is most similar to the
        # centroid (sklearn cosine similarity) and keep a map of the chosen ids.
for i in range(num_clusters):
# Get the indices of the documents in the cluster
cluster_indices = [j for j, label in enumerate(kmeans.labels_) if label == i]
if not cluster_indices: # If there are no documents in this cluster, continue
continue
# Get the embeddings of the documents in this cluster
cluster_embeddings = [documentsByEmbeddings[j] for j in cluster_indices]
# Calculate the similarity with the centroid
centroid = kmeans.cluster_centers_[i]
similarities = [cosine_similarity([embedding], [centroid])[0][0] for embedding in cluster_embeddings]
# Find the most similar document to the centroid
most_similar_index = cluster_indices[similarities.index(max(similarities))]
# Add the most similar document to the summary list
summary_documents.append(documentsByDocuments[most_similar_index])
map_ids_documents[most_similar_index] = documentsByIds[most_similar_index]
print("map_ids_documents: ", map_ids_documents)
# Join the summary documents into a single string
summary_text = "\n".join(summary_documents)
print("summary_documents: ", summary_text)
        if just_get_documents:
            return summary_text
summary_chain = summary_prompt | current_llm
final_summary = summary_chain.invoke({"texts": summary_text, "language": self.language})
return final_summary
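    # Note: `summary_prompt` must be a prompt template exposing {texts} and
    # {language}, since the chain above is invoked with exactly those keys
    # (the k argument is currently unused); get_summary below builds such a prompt.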
    def get_summary(self, vectorstore, ai_model, type_model, api_key, project_id_watsonx, just_get_documents=False, k=10):
final_summary_prompt = PromptTemplate(
input_variables=["texts", "language"],
template="""
Combine the following texts into a cohesive and structured summary:
------------
{texts}
------------
Preserve the original meaning without adding external information or interpretations.
Ensure clarity, logical flow, and coherence between the combined points.
The summary must be in {language}.
The output must be in markdown format.
Summary:
"""
)
        return self.summarizer_by_k_means(vectorstore, ai_model, type_model, api_key, project_id_watsonx, k, final_summary_prompt, just_get_documents)
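    # e.g. get_summary(store, ai_model, type_model, api_key, project_id, just_get_documents=True)
    # skips the LLM call and returns only the representative chunks chosen by k-means.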
def get_specialist_opinion(self, vectorstore, ai_model, type_model, api_key, project_id_watsonx, specialist_prompt):
questions_prompt = PromptTemplate(
input_variables=["text", "specialist_prompt", "language"],
template="""
            * Act as a specialist, following the instructions and behaviour described below:
------------
{specialist_prompt}
------------
            * Based on your role as a specialist, create several distinct, concise, and synthesized aspects to ask the document's knowledge base about the following text:
------------
{text}
------------
* The key aspects and questions must be provided in JSON format with the following structure:
{{
"aspects": [
"Aspect 1",
"Aspect 2",
"Aspect 3",
"Aspect 4",
"Aspect 5",
"Aspect 6",
"Aspect 7",
"Aspect 8",
"Aspect 9",
"Aspect 10",
]
}}
------------
            * Example of valid output:
{{
"aspects": [
"Finished date of the project",
"Payment of the project",
"Project extension"
]
}}
------------
            * The aspects must be written in {language}.
            * Follow the given structure strictly: output only the "aspects" key with its list, and do not add any other key.
            * Generate up to 10 different aspects.
------------
Answer:
"""
)
if not vectorstore:
return TRANSLATIONS[self.language]["load_pdf_first"]
print(ai_model)
print(type_model)
current_llm, _ = self.set_llm(ai_model, type_model, api_key, project_id_watsonx)
summary_text = self.get_summary(vectorstore, ai_model, type_model, api_key, project_id_watsonx, True, 10)
questions_chain = questions_prompt | current_llm
questions = questions_chain.invoke({"text": summary_text, "specialist_prompt": specialist_prompt, "language": self.language})
print(questions)
        # Use a regular expression to extract the JSON object from the response
match = re.search(r'\{.*\}', questions, re.DOTALL)
if match:
questions = match.group(0)
else:
raise ValueError("No valid JSON found in the response")
questions = questions.strip()
questions = json.loads(questions)
print(questions)
        # Keep at most 15 aspects even if the model over-generates
        if len(questions["aspects"]) > 15:
            questions["aspects"] = questions["aspects"][:15]
aspects_text = "\n".join([f"* {aspect}: {self.get_qa_response(vectorstore, aspect, [], ai_model, type_model, api_key, project_id_watsonx, 2)}" for aspect in questions["aspects"]])
return aspects_text
""" Act煤a como un abogado altamente experimentado en derecho civil y contractual.
Examina si existen cl谩usulas abusivas, desproporcionadas o contrarias a la normativa vigente, y expl铆calas con claridad.
Basa tu an谩lisis en principios relevantes del derecho civil y contractual.
Ofrece un argumento estructurado y recomendaciones pr谩cticas.
Si hay m煤ltiples interpretaciones posibles, pres茅ntalas de manera objetiva.
Mant茅n un tono profesional, preciso y fundamentado.
Basado en lo que analices, proporciona una evaluaci贸n legal detallada """
""" Eres profesional en gerencia de proyectos y tienes una amplia experiencia en la creaci贸n, direcci贸n y ejecuci贸n de proyectos de tecnologia.
- Basa tu analisis en los objetivos el proyecto, el nicho en que se enfocan y su propuesta de valor.
- Ofrece un argumento estructurado y recomendaciones pr谩cticas en base a otros posibles nichos y soluciones relacionadas.
- Mant茅n un tono profesional, preciso y fundamentado.
Basado en el documento y tu experiencia, proporciona una evaluaci贸n detallada de los proyectos y actividades que se analizaron.
"""
""" Act煤a como un psicologo experto en recursos humanos, con amplia experiencia en el mejoramiento de hoas de vida de aspirantes a empleados.
Basado en el siguiente texto que detalla una vacante de trabajo, proporciona una evaluaci贸n detallada de c贸mo esa persona puede mejorar su perfil para ser contratada.
Descripci贸n de la vacante:
"""
""" Act煤a como un asesor e ingeniero financiero experto en lectura de reportes y an谩lisis de datos.
Basado en los datos y conclusiones del reporte, proporciona una evaluaci贸n financiera detallada y posibles escenarios tanto negativos como positivos que se puedan presentar.
Establece el riesgo que se corre en cada escenario, la probabilidad de ocurrencia de cada uno y la magnitud del impacto en el recurso.
Si hay m煤ltiples interpretaciones posibles, pres茅ntalas de manera objetiva.
Realiza una hip贸tesis que pronostique el futuro de la situaci贸n o recurso analizado, teniendo en cuenta los datos y conclusiones del reporte.
Presenta tus hipotesis en 3 aspectos, corto, mediano y largo plazo.
Mant茅n un tono profesional, preciso y fundamentado.
Basado en lo que analices, proporciona una evaluaci贸n en detalle sobre los activos, reportes y/o recursos que se analizaron"""
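if __name__ == "__main__":
    # Minimal local smoke test: a sketch, not part of the app. It assumes Ollama
    # is running with both granite models pulled, and that "sample.pdf" (a
    # hypothetical file) sits next to this script; process_pdf only needs an
    # object exposing a `.name` attribute that points at the PDF.
    from types import SimpleNamespace

    processor = PDFProcessor()
    status, store = processor.process_pdf(
        None, SimpleNamespace(name="sample.pdf"), 1000, 150,
        "IBM Granite3.1 dense / Ollama local", "Local", "", "",
    )
    print(status)
    if store is not None:
        print(processor.get_qa_response(
            store, "What is this document about?", [],
            "IBM Granite3.1 dense / Ollama local", "Local", "", "",
        ))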