import csv
import json
import os
from datetime import datetime

import gradio as gr
from huggingface_hub import HfApi


def serialize_docs(docs: list) -> list:
    """Convert retrieved documents into plain dictionaries.

    Args:
        docs (list): Objects exposing ``page_content`` and ``metadata``
            attributes.

    Returns:
        list: One ``{"page_content": ..., "metadata": ...}`` dict per document,
        in the same order as ``docs``.
    """
    return [
        {"page_content": doc.page_content, "metadata": doc.metadata}
        for doc in docs
    ]


## AZURE LOGGING - DEPRECATED

# def log_on_azure(file, logs, share_client):
#     """Log data to Azure Blob Storage.
#
#     Args:
#         file (str): Name of the file to store logs
#         logs (dict): Log data to store
#         share_client: Azure share client instance
#     """
#     logs = json.dumps(logs)
#     file_client = share_client.get_file_client(file)
#     file_client.upload_file(logs)


# def log_interaction_to_azure(history, output_query, sources, docs, share_client, user_id):
#     """Log chat interaction to Azure and Hugging Face.
#
#     Args:
#         history (list): Chat message history
#         output_query (str): Processed query
#         sources (list): Knowledge base sources used
#         docs (list): Retrieved documents
#         share_client: Azure share client instance
#         user_id (str): User identifier
#     """
#     try:
#         # Log interaction to Azure if not in local environment
#         if os.getenv("GRADIO_ENV") != "local":
#             timestamp = str(datetime.now().timestamp())
#             prompt = history[1]["content"]
#             logs = {
#                 "user_id": str(user_id),
#                 "prompt": prompt,
#                 "query": prompt,
#                 "question": output_query,
#                 "sources": sources,
#                 "docs": serialize_docs(docs),
#                 "answer": history[-1].content,
#                 "time": timestamp,
#             }
#             # Log to Azure
#             log_on_azure(f"{timestamp}.json", logs, share_client)
#     except Exception as e:
#         print(f"Error logging on Azure Blob Storage: {e}")
#         error_msg = f"ClimateQ&A Error: {str(e)[:100]} - The error has been noted, try another question and if the error remains, you can contact us :)"
#         raise gr.Error(error_msg)


# def log_drias_interaction_to_azure(query, sql_query, data, share_client, user_id):
#     """Log Drias data interaction to Azure and Hugging Face.
#
#     Args:
#         query (str): User query
#         sql_query (str): SQL query used
#         data: Retrieved data
#         share_client: Azure share client instance
#         user_id (str): User identifier
#     """
#     try:
#         # Log interaction to Azure if not in local environment
#         if os.getenv("GRADIO_ENV") != "local":
#             timestamp = str(datetime.now().timestamp())
#             logs = {
#                 "user_id": str(user_id),
#                 "query": query,
#                 "sql_query": sql_query,
#                 "time": timestamp,
#             }
#             log_on_azure(f"drias_{timestamp}.json", logs, share_client)
#             print(f"Logged Drias interaction to Azure Blob Storage: {logs}")
#         else:
#             print("share_client or user_id is None, or GRADIO_ENV is local")
#     except Exception as e:
#         print(f"Error logging Drias interaction on Azure Blob Storage: {e}")
#         error_msg = f"Drias Error: {str(e)[:100]} - The error has been noted, try another question and if the error remains, you can contact us :)"
#         raise gr.Error(error_msg)


## HUGGING FACE LOGGING

def _message_content(message):
    """Return the ``content`` of a chat message given as an object or a mapping.

    The chat history is read both by attribute and by key in this module, so
    both shapes are accepted here instead of assuming one per position.
    """
    if hasattr(message, "content"):
        return message.content
    return message["content"]


def log_on_huggingface(log_filename: str, logs: dict) -> None:
    """Upload a log record as a JSON file to a Hugging Face dataset repository.

    The target repository is read from ``HF_DATASET_REPO`` (default
    ``"timeki/climateqa_logs"``) and the token from ``HF_LOGS_TOKEN``. When the
    token is missing nothing is uploaded. Failures are printed and swallowed so
    that a logging problem never interrupts the caller.

    Args:
        log_filename (str): Path of the file inside the dataset repository.
        logs (dict): Log data to store. It is not modified; a ``"timestamp"``
            entry is added to the uploaded copy only.
    """
    hf_token = os.getenv("HF_LOGS_TOKEN")
    if not hf_token:
        print("HF_LOGS_TOKEN not found in environment variables")
        return

    repo_id = os.getenv("HF_DATASET_REPO", "timeki/climateqa_logs")

    try:
        api = HfApi(token=hf_token)

        # Stamp a copy so the caller's dictionary is left untouched.
        payload = {
            **logs,
            "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S_%f"),
        }

        # Upload straight from memory; no temporary file is written.
        api.upload_file(
            path_or_fileobj=json.dumps(payload).encode("utf-8"),
            path_in_repo=log_filename,
            repo_id=repo_id,
            repo_type="dataset",
        )
    except Exception as e:
        # Deliberately best-effort: report the failure and carry on.
        print(f"Error logging to Hugging Face: {e}")


def log_interaction_to_huggingface(history, output_query, sources, docs, share_client, user_id):
    """Log a chat interaction to Hugging Face.

    Nothing is logged when the ``GRADIO_ENV`` environment variable equals
    ``"local"``.

    Args:
        history (list): Chat message history. The entry at index 1 is used as
            the prompt and the last entry as the answer; each entry may be a
            mapping with a ``"content"`` key or an object with a ``content``
            attribute.
        output_query (str): Processed query.
        sources (list): Knowledge base sources used.
        docs (list): Retrieved documents (see ``serialize_docs``).
        share_client: Unused; kept so existing callers keep working.
        user_id (str): User identifier.

    Raises:
        gr.Error: If the log record cannot be built (for example when
            ``history`` has fewer than two entries).
    """
    if os.getenv("GRADIO_ENV") == "local":
        return

    try:
        timestamp = str(datetime.now().timestamp())
        prompt = _message_content(history[1])
        logs = {
            "user_id": str(user_id),
            "prompt": prompt,
            "query": prompt,
            "question": output_query,
            "sources": sources,
            "docs": serialize_docs(docs),
            "answer": _message_content(history[-1]),
            "time": timestamp,
        }
        log_on_huggingface(f"chat/{timestamp}.json", logs)
    except Exception as e:
        print(f"Error logging to Hugging Face: {e}")
        error_msg = f"ClimateQ&A Error: {str(e)[:100]} - The error has been noted, try another question and if the error remains, you can contact us :)"
        raise gr.Error(error_msg) from e


def log_drias_interaction_to_huggingface(query, sql_query, user_id):
    """Log a Drias data interaction to Hugging Face.

    Nothing is uploaded when the ``GRADIO_ENV`` environment variable equals
    ``"local"``.

    Args:
        query (str): User query.
        sql_query (str): SQL query used.
        user_id (str): User identifier.

    Raises:
        gr.Error: If building or submitting the log record raises.
    """
    if os.getenv("GRADIO_ENV") == "local":
        print("Skipping Drias logging: GRADIO_ENV is local")
        return

    try:
        timestamp = str(datetime.now().timestamp())
        logs = {
            "user_id": str(user_id),
            "query": query,
            "sql_query": sql_query,
            "time": timestamp,
        }
        log_on_huggingface(f"drias/drias_{timestamp}.json", logs)
        print(f"Logged Drias interaction to Hugging Face: {logs}")
    except Exception as e:
        print(f"Error logging Drias interaction to Hugging Face: {e}")
        error_msg = f"Drias Error: {str(e)[:100]} - The error has been noted, try another question and if the error remains, you can contact us :)"
        raise gr.Error(error_msg) from e