import functools
import html
import tempfile
import time

import gradio as gr
import pandas as pd
import weaviate
from openpyxl import Workbook
from openpyxl.utils.dataframe import dataframe_to_rows
from sentence_transformers import SentenceTransformer
from weaviate.client import Client

############################
### Variable Declaration ###
############################
# -- Global Variables
# List of product records as returned by the Weaviate "Product" query
# (each record is indexed by "name" in this file).
g_product_details = []
# Weaviate client created by weaviate_client(); None until a URL is supplied.
g_client = None
g_weaviate_url = ""
g_ui_model_name = ""


def update_global_variables(ui_action_dropdown, ui_model_name, ui_weaviate_url,
                            ui_chatbot, ui_download_excel, ui_upload_excel):
    """Reload the settings-driven globals and refresh the dependent widgets.

    Resets the model name and Weaviate URL globals, then re-populates them
    from the UI values, connects to Weaviate and reloads the product list.
    Every failure is reported as a chatbot message instead of being raised,
    so the caller always receives the four output values.

    Args:
        ui_action_dropdown: Selected action; "Query" enables the upload button.
        ui_model_name: SBert model name entered in the settings.
        ui_weaviate_url: Weaviate URL entered in the settings.
        ui_chatbot: Chat history list; cleared and appended to in place.
        ui_download_excel: Ignored on input; replaced by a hidden file widget.
        ui_upload_excel: Ignored on input; replaced by an upload button update.

    Returns:
        Tuple ``(ui_chatbot, ui_product_dropdown, ui_download_excel,
        ui_upload_excel)``.
    """
    global g_ui_model_name
    global g_weaviate_url

    # Reset values to defaults
    g_ui_model_name = ""
    g_weaviate_url = ""
    ui_product_dropdown = gr.Dropdown(interactive=False)
    ui_download_excel = gr.File(visible=False, interactive=False)
    ui_upload_excel = gr.UploadButton(visible=False)
    ui_chatbot.clear()

    # Loading global variables
    ui_chatbot.append((None, "Loading Parameters, API Key & Weaviate URL"))
    try:
        # Validation for Model Details
        if ui_model_name:
            print('Setting g_ui_model_name - ' + ui_model_name)
            g_ui_model_name = ui_model_name
            ui_chatbot.append((None, "Updated SBert Model"))
        else:
            print("exception in function - update_global_variables")
            raise ValueError('Required Sbert Model Name')

        # Validation for Weaviate URL
        if ui_weaviate_url:
            print('Setting g_weaviate_url - ' + ui_weaviate_url)
            g_weaviate_url = ui_weaviate_url
            weaviate_client()
            ui_chatbot.append((None, "Updated Weaviate URL"))
            # Load Product Details
            update_products_variable()
            if not g_product_details:
                ui_chatbot.append((None, "No products found in Weaviate"))
            ui_product_dropdown = update_products_lov()
        else:
            print('Required Weaviate URL')
            ui_chatbot.append((None, "Required Weaviate URL"))

        # If Action = Query, enable the Excel upload button
        if ui_action_dropdown == "Query":
            ui_upload_excel = gr.UploadButton(visible=True, interactive=True)
    except Exception as e:
        # Report the problem in the chat; the widgets keep their reset state.
        print('Exception in loading parameters - ' + str(e))
        ui_chatbot.append((None, "Exception " + str(e) + ""))
    return ui_chatbot, ui_product_dropdown, ui_download_excel, ui_upload_excel


############################
###### Generic Code #######
############################
def _html_cells(values, tag="td"):
    """Return one HTML-escaped ``<tag>`` cell per value, concatenated."""
    return "".join(
        f"<{tag}>{html.escape(str(value))}</{tag}>" for value in values
    )


# -- Generate Mapping HTML Table
def convert_mapping_data_to_html_table(table_data):
    """Render one mapping search result as a single-row HTML table.

    Args:
        table_data: Mapping with the keys ``input``, ``key``, ``description``
            and ``certainty``.

    Returns:
        HTML string with a header row and one data row. Values are escaped
        so they cannot inject markup.
    """
    # NOTE(review): plain <table> markup without styling; confirm it renders
    # as intended in the chatbot.
    columns = ("input", "key", "description", "certainty")
    header = _html_cells(("Input", "Key", "Description", "Certainty"), tag="th")
    row = _html_cells(table_data[column] for column in columns)
    return f"<table><tr>{header}</tr><tr>{row}</tr></table>"


# -- Generate Object Search HTML Table
def convert_object_id_data_to_html_table(table_data_items):
    """Render object-id search results as an HTML table, one row per item.

    Args:
        table_data_items: Iterable of mappings with the keys ``id``, ``key``
            and ``description``.

    Returns:
        HTML string, or an empty string when there are no items.
    """
    columns = ("id", "key", "description")
    rows = "".join(
        "<tr>" + _html_cells(item[column] for column in columns) + "</tr>"
        for item in table_data_items
    )
    if not rows:
        return ""
    header = _html_cells(("Object ID", "Key", "Description"), tag="th")
    return f"<table><tr>{header}</tr>{rows}</table>"


# -- Create Weaviate Connection
def weaviate_client():
    """Create the global Weaviate client for ``g_weaviate_url``.

    Raises:
        ValueError: If the client cannot be constructed.
    """
    global g_client
    global g_weaviate_url
    try:
        g_client = Client(url=g_weaviate_url, timeout_config=(3.05, 9.1))
        print("Weaviate client connected successfully!")
    except Exception as e:
        print("Failed to connect to the Weaviate instance." + str(e))
        raise ValueError('Failed to connect to the Weaviate instance.') from e


# -- Convert input to CamelCase
def convert_to_camel_case(string):
    """Convert ``snake_case`` text to ``CamelCase``.

    Each underscore-separated word is passed through ``str.capitalize``,
    so the rest of the word is lower-cased (``"my_APP"`` -> ``"MyApp"``).
    """
    return ''.join(word.capitalize() for word in string.split('_'))


@functools.lru_cache(maxsize=4)
def _load_sbert_model(model_name):
    """Load an SBert model once per name; loading it per query is slow."""
    return SentenceTransformer(model_name)


# -- Create Sbert Embedding
def creating_embeddings(sentences):
    """Encode ``sentences`` with the SBert model named in ``g_ui_model_name``.

    Returns:
        Whatever ``SentenceTransformer.encode`` returns for the input
        (presumably a numpy array; verify against the library version).
    """
    global g_ui_model_name
    model = _load_sbert_model(g_ui_model_name)
    return model.encode(sentences)


############################
## Update Product Details ##
############################
# -- Update Product LOV
def update_products_lov():
    """Build the product dropdown from ``g_product_details``.

    Returns:
        An interactive dropdown pre-selecting the first product, or a
        disabled empty dropdown when no products are loaded.
    """
    global g_product_details
    print("started function - update_products_lov")
    product_names = [d["name"] for d in g_product_details]
    if product_names:
        ui_product_dropdown = gr.Dropdown(
            choices=product_names,
            value=product_names[0],
            interactive=True,
        )
    else:
        # Nothing to select: avoid indexing into an empty list.
        ui_product_dropdown = gr.Dropdown(
            choices=[],
            value=None,
            interactive=False,
        )
    print("completed function - update_products_lov")
    return ui_product_dropdown


# -- Get Product global variable
def update_products_variable():
    """Reload ``g_product_details`` from the Weaviate ``Product`` class.

    Best effort: on any failure the error is logged and the product list is
    reset to empty so products from a previous connection are not shown.
    """
    global g_client
    global g_product_details
    print("started function - update_products_variable")
    try:
        api_response = g_client.query.get("Product", ["name", "description"]).do()
        print("Product API Response")
        print(api_response)
        products = api_response['data']['Get']['Product']
        product_names = [d["name"] for d in products]
        g_product_details = products
        print("Product API Response")
        print(product_names)
    except Exception as e:
        g_product_details = []
        print("Error getting Product Details - " + str(e))
    finally:
        print("completed function - update_products_variable")

############################
#### Search User Manual ####
############################
def _get_class_results(response, class_name):
    """Return the object list for ``class_name`` from a Weaviate Get response.

    Returns an empty list when the response or the class entry is empty.

    Raises:
        ValueError: If the response carries an ``errors`` entry.
    """
    if not response:
        return []
    # NOTE(review): assumes failed queries are reported under an "errors"
    # key - confirm against the Weaviate client version in use.
    if response.get('errors'):
        raise ValueError(str(response['errors']))
    get_section = (response.get('data') or {}).get('Get') or {}
    return get_section.get(class_name) or []


def search_um(ui_search_text, ui_product_dropdown):
    """Return the best user-manual match for the search text.

    Args:
        ui_search_text: Text to embed and search for.
        ui_product_dropdown: Product name; ``<product>_um`` in CamelCase is
            used as the Weaviate class name.

    Returns:
        The matched content up to the first ``"\\nResult : "`` marker, a
        "no results" message, or a prompt to select a product.

    Raises:
        ValueError: Wrapping any error raised while searching.
    """
    global g_client
    um_data = "No results from User Manual"
    print("started function - search_um")
    print(f"Product Selected -->{ui_product_dropdown}")
    try:
        if not ui_product_dropdown:
            return "Please select product name to proceed"
        class_name = convert_to_camel_case(ui_product_dropdown + "_um")
        input_embedding = creating_embeddings(ui_search_text)
        response = (
            g_client.query
            .get(class_name, ["content", "_additional {certainty}"])
            .with_near_vector({"vector": input_embedding})
            .with_limit(1)
            .do()
        )
        matches = _get_class_results(response, class_name)
        if matches:
            um_data = matches[0]['content'].split('\nResult : ')[0]
        return um_data
    except Exception as e:
        raise ValueError(str(e)) from e
    finally:
        print("completed function - search_um")


############################
#### Search Mapping Data ###
############################
def search_mapping_data(ui_search_text, ui_product_dropdown):
    """Semantic search for the closest mapping entry of a product.

    Args:
        ui_search_text: Text to embed and search for.
        ui_product_dropdown: Product name; ``<product>_mapping`` in CamelCase
            is used as the Weaviate class name.

    Returns:
        Dict with ``input``, ``key``, ``description`` and ``certainty``.
        The last three are ``None`` when no product is selected or nothing
        matched, so callers can always index the result.

    Raises:
        ValueError: Wrapping any error raised while searching.
    """
    global g_client
    print("started function - search_mapping_data")
    print(f"Product Selected -->{ui_product_dropdown}")
    no_match = {
        'input': ui_search_text,
        'key': None,
        'description': None,
        'certainty': None,
    }
    try:
        print("Performing Semantic Search")
        if not ui_product_dropdown:
            return no_match
        class_name = convert_to_camel_case(ui_product_dropdown + "_mapping")
        input_embedding = creating_embeddings(ui_search_text)
        response = (
            g_client.query
            .get(class_name, ["key", "description", "_additional {certainty}"])
            .with_near_vector({"vector": input_embedding})
            .with_limit(1)
            .do()
        )
        mapping = _get_class_results(response, class_name)
        if not mapping:
            print("Mapping has no data.")
            return no_match
        # The query is limited to one object, so the first is the best match.
        best = mapping[0]
        key = best['key']
        description = best['description']
        certainty = best['_additional']['certainty']
        print("Key:", key)
        print("Description:", description)
        print("Certainty:", certainty)
        return {
            'input': ui_search_text,
            'key': key,
            'description': description,
            'certainty': certainty,
        }
    except Exception as e:
        raise ValueError(str(e)) from e
    finally:
        print("completed function - search_mapping_data")


def search_and_get_object_id_by_key(ui_search_text, ui_product_dropdown):
    """Look up mapping objects whose ``key`` equals the search text.

    Returns:
        List of dicts with ``input``, ``id``, ``key`` and ``description``
        (at most five). A single all-``None`` placeholder is returned when
        the query ran but matched nothing; the list is empty when no
        product is selected.

    Raises:
        ValueError: Wrapping any error raised while searching.
    """
    global g_client
    items = []
    print("started function - search_and_get_object_id_by_key")
    print(f"Product Selected -->{ui_product_dropdown}")
    try:
        print("Performing Normal Search")
        if ui_product_dropdown:
            product_name = convert_to_camel_case(ui_product_dropdown + "_mapping")
            where_filter = {
                "path": ["key"],
                "operator": "Equal",
                "valueString": ui_search_text,
            }
            response = (
                g_client.query
                .get(product_name, ["key", "description"])
                .with_where(where_filter)
                .with_limit(5)
                .with_additional(["id"])
                .do()
            )
            print(response)
            if response:
                mapping = _get_class_results(response, product_name)
                if mapping:
                    for entry in mapping:
                        object_id = entry['_additional']['id']
                        key = entry['key']
                        description = entry['description']
                        print("Id:", object_id)
                        print("Key:", key)
                        print("Description:", description)
                        items.append({
                            'input': ui_search_text,
                            'id': object_id,
                            'key': key,
                            'description': description,
                        })
                        print("Added Item")
                else:
                    print("Mapping has no data.")
                    items.append({
                        'input': ui_search_text,
                        'id': None,
                        'key': None,
                        'description': None,
                    })
    except Exception as e:
        print("Error - " + str(e))
        raise ValueError(str(e)) from e
    finally:
        print("completed function - search_and_get_object_id_by_key")
    return items


############################
#### Update Mapping Data ###
############################
def update_mapping_by_object_id(ui_search_text, ui_product_dropdown):
    """Update the description (and vector) of one mapping object.

    Args:
        ui_search_text: ``"<object id>, <new description>"``. Only the first
            ``", "`` separates the two parts, so the description may itself
            contain commas.
        ui_product_dropdown: Product name used to derive the class name.

    Raises:
        ValueError: If the input is malformed or the update fails.
    """
    global g_client
    print("started function - update_mapping_by_object_id")
    try:
        object_id, separator, description = ui_search_text.partition(", ")
        object_id = object_id.strip()
        if not separator or not object_id or not description.strip():
            raise ValueError(
                "Update input must be '<object id>, <new description>'"
            )
        embedding = creating_embeddings(description)
        product_name = convert_to_camel_case(ui_product_dropdown + "_mapping")
        data_object = {"description": description}
        g_client.data_object.update(
            data_object,
            class_name=product_name,
            uuid=object_id,
            consistency_level=weaviate.data.replication.ConsistencyLevel.ALL,
            vector=embedding,
        )
    except Exception as e:
        print("Update Error - " + str(e))
        raise ValueError(str(e)) from e
    finally:
        print("completed function - update_mapping_by_object_id")


############################
#### Delete Mapping Data ###
############################
def delete_mapping_by_object_id(ui_search_text, ui_product_dropdown):
    """Delete the mapping object whose id is given in ``ui_search_text``.

    Raises:
        ValueError: If no id is given or the delete fails.
    """
    global g_client
    print("started function - delete_mapping_by_object_id")
    try:
        object_id = (ui_search_text or "").strip()
        if not object_id:
            raise ValueError("Object id is required to delete")
        product_name = convert_to_camel_case(ui_product_dropdown + "_mapping")
        g_client.data_object.delete(
            object_id,
            class_name=product_name,
            consistency_level=weaviate.data.replication.ConsistencyLevel.ALL,
        )
    except Exception as e:
        print("Delete Error - " + str(e))
        raise ValueError(str(e)) from e
    finally:
        print("completed function - delete_mapping_by_object_id")


############################
##### Search User Input ####
############################
def text_search(ui_action_dropdown, ui_product_dropdown, ui_search_text, ui_chatbot):
    """Run the selected action for the submitted text and log it in the chat.

    Supported actions are 'Query', 'Get Object ID', 'Update' and 'Delete';
    any other value leaves the chat unchanged. Errors are appended to the
    chat as an "Exception ..." message rather than raised.

    Returns:
        The (mutated) ``ui_chatbot`` history list.
    """
    print("started function - text_search")
    try:
        if ui_action_dropdown == 'Query':
            print("Starting to Query")
            ui_chatbot.append(("Searching: " + ui_search_text, None))
            um_search_results = search_um(ui_search_text, ui_product_dropdown)
            mapping_search_results = search_mapping_data(ui_search_text, ui_product_dropdown)
            ui_chatbot.append((
                None,
                "Mapping Results:<br>"
                + convert_mapping_data_to_html_table(mapping_search_results)
                + "User Manual Search Results:<br>"
                + um_search_results,
            ))
        elif ui_action_dropdown == 'Get Object ID':
            print("Starting to Query Object ID")
            ui_chatbot.append(("Searching Object ID: " + ui_search_text, None))
            search_results = search_and_get_object_id_by_key(ui_search_text, ui_product_dropdown)
            ui_chatbot.append((
                None,
                "Object ID Results:<br>"
                + convert_object_id_data_to_html_table(search_results),
            ))
        elif ui_action_dropdown == 'Update':
            print("Starting to Update")
            ui_chatbot.append(("Updating: " + ui_search_text, None))
            update_mapping_by_object_id(ui_search_text, ui_product_dropdown)
        elif ui_action_dropdown == 'Delete':
            print("Starting to Delete")
            ui_chatbot.append(("Deleting: " + ui_search_text, None))
            delete_mapping_by_object_id(ui_search_text, ui_product_dropdown)
    except Exception as e:
        print('Exception ' + str(e))
        ui_chatbot.append((None, "Exception " + str(e) + ""))
    finally:
        print("completed function - text_search")
    return ui_chatbot


############################
##### Upload User Input ####
############################
def excel_file_search(ui_product_dropdown, ui_excel_upload, ui_chatbot):
    """Run a mapping search for every ``Input`` cell of an uploaded workbook.

    Each sheet must have an ``Input`` column; empty cells are skipped. The
    results are written to a new temporary ``.xlsx`` file with one sheet per
    source sheet and the columns input, key, description and certainty.

    Args:
        ui_product_dropdown: Product name passed to ``search_mapping_data``.
        ui_excel_upload: Uploaded file; either an object with a ``name``
            attribute holding the path, or the path itself.
        ui_chatbot: Chat history list; appended to in place.

    Returns:
        Tuple ``(ui_chatbot, output_file)`` where ``output_file`` is a
        visible ``gr.File`` for the result, or ``None`` when processing
        failed (the error is appended to the chat).
    """
    print("started function - excel_file_search")
    items = []
    output_file = None
    try:
        file_path = getattr(ui_excel_upload, "name", ui_excel_upload)
        if not file_path:
            raise ValueError("No file uploaded")
        print(f"Uploaded xlsx location - {file_path}")

        # Read every sheet; the context manager closes the workbook handle.
        with pd.ExcelFile(file_path) as xls:
            for sheet_name in xls.sheet_names:
                df = pd.read_excel(xls, sheet_name=sheet_name)
                if 'Input' not in df.columns:
                    raise ValueError(f"Sheet '{sheet_name}' has no 'Input' column")
                for input_value in df['Input'].dropna():
                    mapping_search_results = search_mapping_data(
                        str(input_value), ui_product_dropdown
                    )
                    item = {
                        'sheet': sheet_name,
                        'input': input_value,
                        'key': mapping_search_results['key'],
                        'description': mapping_search_results['description'],
                        'certainty': mapping_search_results['certainty'],
                    }
                    # f-strings: key/description are None for unmatched rows.
                    print(f"key: {item['key']}")
                    print(f"sheet: {item['sheet']}")
                    print(f"input: {item['input']}")
                    print(f"description: {item['description']}")
                    print(f"certainty: {item['certainty']}")
                    items.append(item)

        if not items:
            raise ValueError("No values found in the 'Input' column of the uploaded file")

        df_items = pd.DataFrame(items)
        workbook = Workbook()
        # Drop the default sheet first so an uploaded sheet that is itself
        # titled "Sheet" keeps its name.
        workbook.remove(workbook.active)
        output_columns = ['input', 'key', 'description', 'certainty']
        for sheet_name in df_items['sheet'].unique():
            df_sheet = df_items.loc[df_items['sheet'] == sheet_name, output_columns]
            sheet = workbook.create_sheet(title=sheet_name)
            for row in dataframe_to_rows(df_sheet, index=False, header=True):
                sheet.append(row)

        # Reserve a unique path, close the handle, then let openpyxl write it.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.xlsx') as temp_file:
            output_path = temp_file.name
        workbook.save(output_path)

        print("File Processing Completed - " + output_path)
        output_file = gr.File(visible=True, value=output_path, interactive=True)
        ui_chatbot.append((None, "File Processing Completed - " + output_path))
    except Exception as e:
        print('Exception ' + str(e))
        ui_chatbot.append((None, "Exception " + str(e) + ""))
    finally:
        print("completed function - excel_file_search")
    return ui_chatbot, output_file


############################
####### Main Program #######
############################
# -- Start of Program - Main
def main():
    """Build the Gradio UI, wire the event handlers and launch the server."""
    print("\nStarted Knowledge Base Chat Application")
    # NOTE(review): widget nesting below is the presumed layout - confirm.
    with gr.Blocks() as demo:
        with gr.Accordion("Settings"):
            ui_model_name = gr.Textbox(
                placeholder="Semantic Search Model, https://www.sbert.net/docs/pretrained_models.html#semantic-search",
                label="Semantic Search Model",
            )
            ui_weaviate_url = gr.Textbox(
                placeholder="Weaviate URL, https://weaviate.xxx",
                label="Weaviate URL",
                type="password",
            )
        ui_chatbot = gr.Chatbot([], elem_id="chatbot")
        with gr.Row():
            # Integer scales in the same 1:1:3 ratio as the former 0.2/0.2/0.6.
            with gr.Column(scale=1, min_width=0):
                ui_action_dropdown = gr.Dropdown(
                    ["Query", "Update", "Delete", "Get Object ID"],
                    label="Action Type",
                )
            with gr.Column(scale=1, min_width=0):
                ui_product_dropdown = gr.Dropdown(
                    [],
                    interactive=False,
                    label="Select Product",
                )
            with gr.Column(scale=3):
                ui_search_text = gr.Textbox(
                    show_label=False,
                    placeholder="Message me, I am your migration assistance",
                )
        ui_upload_excel = gr.UploadButton("Upload Mapping File", file_types=[".xlsx"])
        ui_download_excel = gr.File(
            label="Download Recommendations",
            interactive=False,
            visible=False,
        )

        # Loading global variables
        ui_action_dropdown.change(
            fn=update_global_variables,
            inputs=[ui_action_dropdown, ui_model_name, ui_weaviate_url,
                    ui_chatbot, ui_download_excel, ui_upload_excel],
            outputs=[ui_chatbot, ui_product_dropdown, ui_download_excel, ui_upload_excel],
        )

        # Search Text
        ui_search_text.submit(
            fn=text_search,
            inputs=[ui_action_dropdown, ui_product_dropdown, ui_search_text, ui_chatbot],
            outputs=[ui_chatbot],
        )

        # Upload Mapping
        ui_upload_excel.upload(
            fn=excel_file_search,
            inputs=[ui_product_dropdown, ui_upload_excel, ui_chatbot],
            outputs=[ui_chatbot, ui_download_excel],
        )

    # Result files are written to the temp directory, which is not always /tmp.
    demo.launch(
        server_name="0.0.0.0",
        allowed_paths=sorted({"/tmp", tempfile.gettempdir()}),
    )


# -- Calling Main Function
if __name__ == '__main__':
    main()