|
import gradio as gr |
|
import tempfile |
|
from weaviate.client import Client |
|
import weaviate |
|
import time |
|
import pandas as pd |
|
from openpyxl import Workbook |
|
from openpyxl.utils.dataframe import dataframe_to_rows |
|
import tempfile |
|
from sentence_transformers import SentenceTransformer |
|
|
|
|
|
|
|
|
|
|
|
|
|
# Product records fetched from the Weaviate "Product" class. Each entry is a
# dict with at least a "name" key; the list stays empty until the settings
# have been loaded successfully.
g_product_details = []

# Shared Weaviate client instance, created by weaviate_client().
g_client = None

# Weaviate endpoint URL taken from the settings panel.
g_weaviate_url = ""

# Sentence-Transformers model name taken from the settings panel.
g_ui_model_name = ""
|
|
|
def update_global_variables(ui_action_dropdown, ui_model_name, ui_weaviate_url, ui_chatbot, ui_download_excel, ui_upload_excel):
    """Reload the settings and reset the widgets when the action type changes.

    The chat history is cleared, the model name and Weaviate URL globals are
    re-read from the settings fields, the Weaviate client is re-created and
    the product list is refreshed. Any failure is reported in the chat
    history instead of being raised, so the UI always receives a full set of
    outputs.

    Args:
        ui_action_dropdown: Selected action type ("Query" enables the upload
            button).
        ui_model_name: Sentence-Transformers model name; must not be empty.
        ui_weaviate_url: Weaviate endpoint URL.
        ui_chatbot: Chat history list; cleared and appended to in place.
        ui_download_excel: Ignored on input; replaced by a hidden update.
        ui_upload_excel: Ignored on input; replaced by an update.

    Returns:
        Tuple ``(ui_chatbot, ui_product_dropdown, ui_download_excel,
        ui_upload_excel)`` for the corresponding Gradio outputs.
    """
    global g_ui_model_name
    global g_weaviate_url

    # Start from a clean state so stale settings never survive a failed reload.
    g_ui_model_name = ""
    g_weaviate_url = ""
    ui_product_dropdown = gr.Dropdown.update(interactive=False)
    ui_download_excel = gr.File.update(visible=False, interactive=False)
    ui_upload_excel = gr.UploadButton.update(visible=False)
    ui_chatbot.clear()
    ui_chatbot.append((None, "Loading Parameters, API Key & Weaviate URL"))

    try:
        if ui_model_name != "":
            print('Setting g_ui_model_name - ' + ui_model_name)
            g_ui_model_name = ui_model_name
            ui_chatbot.append((None, "Updated SBert Model"))
        else:
            print("exception in function - update_global_variables")
            raise ValueError('Required Sbert Model Name')

        if ui_weaviate_url != "":
            print('Setting g_weaviate_url - ' + ui_weaviate_url)
            g_weaviate_url = ui_weaviate_url
            weaviate_client()
            ui_chatbot.append((None, "Updated Weaviate URL"))

            update_products_variable()
            if g_product_details:
                ui_product_dropdown = update_products_lov()
            else:
                # Without products there is nothing to select; keep the
                # dropdown disabled and tell the user why.
                print('No products found in Weaviate')
                ui_chatbot.append((None, "<b style='color:red'>No products found in Weaviate</b>"))
        else:
            print('Required Weaviate URL')
            ui_chatbot.append((None, "<b style='color:red'>Required Weaviate URL</b>"))

        if ui_action_dropdown == "Query":
            ui_upload_excel = gr.UploadButton.update(visible=True, interactive=True)
    except Exception as e:
        # Report instead of raising: the handler must still return its outputs.
        # (The original re-raised here, but a `return` inside `finally`
        # discarded that exception, so the observable behaviour is unchanged.)
        print('Exception in loading parameters - ' + str(e))
        ui_chatbot.append((None, "<b style='color:red'>Exception " + str(e) + "</b>"))

    return ui_chatbot, ui_product_dropdown, ui_download_excel, ui_upload_excel
|
|
|
|
|
|
|
|
|
|
|
|
|
def convert_mapping_data_to_html_table(table_data):
    """Render one mapping search result as a single-row HTML table.

    Args:
        table_data: Mapping with the keys ``'input'``, ``'key'``,
            ``'description'`` and ``'certainty'``. Values may be ``None``.

    Returns:
        HTML markup for a bordered table with a header row and one data row.
        All values are converted with ``str`` and HTML-escaped, because the
        input is user-typed text and the other fields come from the database.
    """
    import html  # function-scope import keeps this block self-contained

    cell_style = "border: 1px solid black; text-align: center; padding: 8px;"
    columns = (
        ("Input", "input"),
        ("Key", "key"),
        ("Description", "description"),
        ("Certainty", "certainty"),
    )

    header_cells = "\n".join(
        f'<th style="{cell_style}">{title}</th>' for title, _ in columns
    )
    value_cells = "\n".join(
        f'<td style="{cell_style}">{html.escape(str(table_data[field]))}</td>'
        for _, field in columns
    )

    return f"""
<table style="border-collapse: collapse; width: 100%;">
<tr>
{header_cells}
</tr>
<tr>
{value_cells}
</tr>
</table><br><br>
"""
|
|
|
|
|
def convert_object_id_data_to_html_table(table_data_items):
    """Render object-id lookup results as a sequence of small HTML tables.

    Args:
        table_data_items: Iterable of mappings with the keys ``'id'``,
            ``'key'`` and ``'description'``. Values may be ``None``.

    Returns:
        One bordered HTML table per item, concatenated; an empty string when
        there are no items. All values are converted with ``str`` and
        HTML-escaped before being inserted into the markup.
    """
    import html  # function-scope import keeps this block self-contained

    cell_style = "border: 1px solid black; text-align: center; padding: 8px;"
    columns = (
        ("Object ID", "id"),
        ("Key", "key"),
        ("Description", "description"),
    )
    # The header row is identical for every table, so build it once.
    header_cells = "\n".join(
        f'<th style="{cell_style}">{title}</th>' for title, _ in columns
    )

    tables = []
    for table_data in table_data_items:
        value_cells = "\n".join(
            f'<td style="{cell_style}">{html.escape(str(table_data[field]))}</td>'
            for _, field in columns
        )
        tables.append(f"""
<table style="border-collapse: collapse; width: 100%;">
<tr>
{header_cells}
</tr>
<tr>
{value_cells}
</tr>
</table><br>
""")

    return "".join(tables)
|
|
|
|
|
def weaviate_client():
    """Create the shared Weaviate client for the configured URL.

    Builds a ``Client`` for ``g_weaviate_url`` with
    ``timeout_config=(3.05, 9.1)`` and stores it in ``g_client``.

    Raises:
        ValueError: If constructing the client raises any exception.
    """
    global g_client

    try:
        connection = Client(url=g_weaviate_url, timeout_config=(3.05, 9.1))
    except Exception as error:
        print(f"Failed to connect to the Weaviate instance.{error!s}")
        raise ValueError('Failed to connect to the Weaviate instance.')

    g_client = connection
    print("Weaviate client connected successfully!")
|
|
|
|
|
def convert_to_camel_case(string):
    """Join the underscore-separated parts of *string*, capitalizing each one.

    Each part is passed through ``str.capitalize`` (first character
    upper-cased, the rest lower-cased), so ``"my_product_um"`` becomes
    ``"MyProductUm"``.
    """
    return ''.join(map(str.capitalize, string.split('_')))
|
|
|
|
|
# Holds the most recently loaded SentenceTransformer, keyed by model name.
# Only the active model is kept so switching models does not pile up memory.
_g_embedding_model_cache = {}


def creating_embeddings(sentences):
    """Encode *sentences* with the configured Sentence-Transformers model.

    The model named by ``g_ui_model_name`` is loaded on first use and then
    reused; constructing ``SentenceTransformer`` on every call (as happens
    once per row during an Excel upload) is needlessly expensive.

    Args:
        sentences: Text (or texts) passed straight to ``model.encode``.

    Returns:
        Whatever ``SentenceTransformer.encode`` returns for *sentences*.
    """
    model = _g_embedding_model_cache.get(g_ui_model_name)
    if model is None:
        model = SentenceTransformer(g_ui_model_name)
        # Drop any previously cached model before storing the new one.
        _g_embedding_model_cache.clear()
        _g_embedding_model_cache[g_ui_model_name] = model

    return model.encode(sentences)
|
|
|
|
|
|
|
|
|
|
|
|
|
def update_products_lov():
    """Build the Gradio update that fills the product dropdown.

    The choices are the ``"name"`` values of ``g_product_details``. The first
    product is pre-selected; when no products are loaded the dropdown is
    returned empty and non-interactive instead of raising ``IndexError``.

    Returns:
        The result of ``gr.Dropdown.update`` for the product dropdown.
    """
    print("started function - update_products_lov")

    product_names = [d["name"] for d in g_product_details]
    ui_product_dropdown = gr.Dropdown.update(
        choices=product_names,
        value=product_names[0] if product_names else None,
        interactive=bool(product_names),
    )

    print("completed function - update_products_lov")
    return ui_product_dropdown
|
|
|
|
|
def update_products_variable():
    """Refresh ``g_product_details`` from the Weaviate "Product" class.

    Queries the ``name`` and ``description`` properties of every ``Product``
    object through ``g_client``. This is best effort: on any failure the
    error is logged and the product list is reset to empty, so products from
    an earlier, successful load are never shown for a broken connection.
    """
    global g_product_details

    print("started function - update_products_variable")

    try:
        api_response = g_client.query.get("Product", ["name", "description"]).do()
        print("Product API Response")
        print(api_response)

        # `or []` keeps the global iterable even if the payload is null.
        g_product_details = api_response['data']['Get']['Product'] or []

        print("Product Names")
        print([d["name"] for d in g_product_details])
    except Exception as e:
        g_product_details = []
        print("Error getting Product Details - " + str(e))
    finally:
        print("completed function - update_products_variable")
|
|
|
|
|
|
|
|
|
|
|
def search_um(ui_search_text, ui_product_dropdown):
    """Return the closest user-manual passage for *ui_search_text*.

    The search text is embedded and used for a nearest-vector query
    (limit 1) against the ``<Product>Um`` Weaviate class of the selected
    product.

    Args:
        ui_search_text: Text to search for.
        ui_product_dropdown: Selected product name; may be ``None`` or empty.

    Returns:
        The ``content`` of the best hit, truncated at the first
        ``'\\nResult : '`` marker; ``"No results from User Manual"`` when the
        query yields no hit; ``"Please select product name to proceed"`` when
        no product is selected.

    Raises:
        ValueError: If embedding or querying fails.
    """
    print("started function - search_um")
    # f-string: the product may be None, which plain concatenation rejects.
    print(f"Product Selected -->{ui_product_dropdown}")

    try:
        if not ui_product_dropdown:
            return "Please select product name to proceed"

        class_name = convert_to_camel_case(ui_product_dropdown + "_um")
        vector = {"vector": creating_embeddings(ui_search_text)}

        response = (
            g_client.query
            .get(class_name, ["content", "_additional {certainty}"])
            .with_near_vector(vector)
            .with_limit(1)
            .do()
        )

        hits = response['data']['Get'][class_name] if response else None
        if not hits:
            # An empty result list is not an error; report it as "no results".
            return "No results from User Manual"

        return hits[0]['content'].split('\nResult : ')[0]
    except Exception as e:
        raise ValueError(str(e)) from e
    finally:
        print("completed function - search_um")
|
|
|
|
|
|
|
|
|
|
|
def search_mapping_data(ui_search_text, ui_product_dropdown):
    """Find the mapping entry that is semantically closest to the search text.

    The search text is embedded and used for a nearest-vector query
    (limit 1) against the ``<Product>Mapping`` Weaviate class of the selected
    product.

    Args:
        ui_search_text: Text to search for.
        ui_product_dropdown: Selected product name; may be ``None`` or empty.

    Returns:
        A dict with the keys ``'input'``, ``'key'``, ``'description'`` and
        ``'certainty'``. The last three are ``None`` when no product is
        selected or nothing was found, so callers can always subscript the
        result.

    Raises:
        ValueError: If embedding or querying fails.
    """
    print("started function - search_mapping_data")
    # f-string: the product may be None, which plain concatenation rejects.
    print(f"Product Selected -->{ui_product_dropdown}")

    result = {
        'input': ui_search_text,
        'key': None,
        'description': None,
        'certainty': None,
    }

    try:
        print("Performing Semantic Search")
        if not ui_product_dropdown:
            return result

        class_name = convert_to_camel_case(ui_product_dropdown + "_mapping")
        vector = {"vector": creating_embeddings(ui_search_text)}

        response = (
            g_client.query
            .get(class_name, ["key", "description", "_additional {certainty}"])
            .with_near_vector(vector)
            .with_limit(1)
            .do()
        )

        mapping = response['data']['Get'].get(class_name) if response else None
        if not mapping:
            print("Mapping has no data.")
            return result

        # The query is limited to one hit, so only the first item matters.
        best = mapping[0]
        result['key'] = best['key']
        result['description'] = best['description']
        result['certainty'] = best['_additional']['certainty']

        print("Key:", result['key'])
        print("Description:", result['description'])
        print("Certainty:", result['certainty'])

        return result
    except Exception as e:
        raise ValueError(str(e)) from e
    finally:
        print("completed function - search_mapping_data")
|
|
|
def search_and_get_object_id_by_key(ui_search_text, ui_product_dropdown):
    """Look up mapping objects whose ``key`` equals the search text.

    Runs an ``Equal`` filter on the ``key`` property of the
    ``<Product>Mapping`` Weaviate class (limit 5) and collects the object id,
    key and description of every hit.

    Args:
        ui_search_text: Exact key to look for.
        ui_product_dropdown: Selected product name; may be ``None`` or empty.

    Returns:
        A list of dicts with the keys ``'input'``, ``'id'``, ``'key'`` and
        ``'description'``. When the query finds nothing the list holds one
        placeholder entry whose ``id``/``key``/``description`` are ``None``;
        when no product is selected the list is empty.

    Raises:
        ValueError: If the query fails. (Previously a ``return`` inside
            ``finally`` swallowed this error and an empty list was returned.)
    """
    items = []

    print("started function - search_and_get_object_id_by_key")
    # f-string: the product may be None, which plain concatenation rejects.
    print(f"Product Selected -->{ui_product_dropdown}")

    try:
        print("Performing Normal Search")
        if not ui_product_dropdown:
            return items

        product_name = convert_to_camel_case(ui_product_dropdown + "_mapping")
        where_filter = {
            "path": ["key"],
            "operator": "Equal",
            "valueString": ui_search_text,
        }
        response = (
            g_client.query
            .get(product_name, ["key", "description"])
            .with_where(where_filter)
            .with_limit(5)
            .with_additional(["id"])
            .do()
        )
        print(response)

        if not response:
            return items

        mapping = response['data']['Get'].get(product_name)
        if not mapping:
            print("Mapping has no data.")
            items.append({
                'input': ui_search_text,
                'id': None,
                'key': None,
                'description': None,
            })
            return items

        for record in mapping:
            object_id = record['_additional']['id']
            key = record['key']
            description = record['description']

            print("Id:", object_id)
            print("Key:", key)
            print("Description:", description)

            items.append({
                'input': ui_search_text,
                'id': object_id,
                'key': key,
                'description': description,
            })
            print("Added Item")

        return items
    except Exception as e:
        print("Error - " + str(e))
        raise ValueError(str(e)) from e
    finally:
        print("completed function - search_and_get_object_id_by_key")
|
|
|
|
|
|
|
|
|
|
|
def update_mapping_by_object_id(ui_search_text, ui_product_dropdown):
    """Replace the description (and vector) of one mapping object.

    Args:
        ui_search_text: Text of the form ``"<object id>, <description>"``.
            Only the first comma separates the two parts, so the description
            itself may contain commas.
        ui_product_dropdown: Selected product name; selects the
            ``<Product>Mapping`` Weaviate class.

    Raises:
        ValueError: If the input is not in the expected form, or if
            embedding or the Weaviate update fails.
    """
    print("started function - update_mapping_by_object_id")

    try:
        # partition() splits on the first comma only; the old split(", ")
        # broke with an unpacking error on descriptions containing ", ".
        object_id, separator, description = ui_search_text.partition(",")
        object_id = object_id.strip()
        description = description.strip()
        if not separator or not object_id or not description:
            raise ValueError("Expected input in the form '<object id>, <description>'")

        embedding = creating_embeddings(description)
        product_name = convert_to_camel_case(ui_product_dropdown + "_mapping")

        data_object = {
            "description": description
        }
        g_client.data_object.update(
            data_object,
            class_name=product_name,
            uuid=object_id,
            consistency_level=weaviate.data.replication.ConsistencyLevel.ALL,
            vector=embedding,
        )
    except Exception as e:
        print("Update Error - " + str(e))
        raise ValueError(str(e)) from e
    finally:
        print("completed function - update_mapping_by_object_id")
|
|
|
|
|
|
|
|
|
|
|
def delete_mapping_by_object_id(ui_search_text, ui_product_dropdown):
    """Delete one mapping object from the selected product's mapping class.

    Args:
        ui_search_text: Object id of the mapping entry to delete; surrounding
            whitespace is ignored.
        ui_product_dropdown: Selected product name; selects the
            ``<Product>Mapping`` Weaviate class.

    Raises:
        ValueError: If the Weaviate delete call (or building the class name)
            fails.
    """
    print("started function - delete_mapping_by_object_id")

    try:
        product_name = convert_to_camel_case(ui_product_dropdown + "_mapping")
        g_client.data_object.delete(
            ui_search_text.strip(),
            class_name=product_name,
            consistency_level=weaviate.data.replication.ConsistencyLevel.ALL,
        )
    except Exception as e:
        print("Delete Error - " + str(e))
        raise ValueError(str(e)) from e
    finally:
        print("completed function - delete_mapping_by_object_id")
|
|
|
|
|
|
|
|
|
|
|
def text_search(ui_action_dropdown, ui_product_dropdown, ui_search_text, ui_chatbot):
    """Handle a submitted text message according to the selected action.

    Supported actions are ``'Query'``, ``'Get Object ID'``, ``'Update'`` and
    ``'Delete'``. Progress, results and errors are appended to the chat
    history; nothing is raised to the caller.

    Args:
        ui_action_dropdown: Selected action type; may be ``None``.
        ui_product_dropdown: Selected product name; may be ``None`` or empty.
        ui_search_text: Text entered by the user.
        ui_chatbot: Chat history list; appended to in place.

    Returns:
        The same ``ui_chatbot`` list with the new messages appended.
    """
    print("started function - text_search")

    try:
        if ui_action_dropdown not in ('Query', 'Get Object ID', 'Update', 'Delete'):
            # Previously an unset action was ignored without any feedback.
            ui_chatbot.append((None, "<b style='color:red'>Please select an action type</b>"))
        elif not ui_product_dropdown:
            # Every action derives its Weaviate class from the product name.
            ui_chatbot.append((None, "<b style='color:red'>Please select product name to proceed</b>"))
        elif ui_action_dropdown == 'Query':
            print("Starting to Query")
            ui_chatbot.append(("Searching: " + ui_search_text, None))
            um_search_results = search_um(ui_search_text, ui_product_dropdown)
            mapping_search_results = search_mapping_data(ui_search_text, ui_product_dropdown)

            ui_chatbot.append((
                None,
                "<b style='color:green'>Mapping Results: </b><br>"
                + convert_mapping_data_to_html_table(mapping_search_results)
                + "<b style='color:green'>User Manual Search Results: </b><br>"
                + um_search_results,
            ))
        elif ui_action_dropdown == 'Get Object ID':
            print("Starting to Query Object ID")
            ui_chatbot.append(("Searching Object ID: " + ui_search_text, None))
            search_results = search_and_get_object_id_by_key(ui_search_text, ui_product_dropdown)
            ui_chatbot.append((
                None,
                "<b style='color:green'>Object ID Results: </b><br>"
                + convert_object_id_data_to_html_table(search_results),
            ))
        elif ui_action_dropdown == 'Update':
            print("Starting to Update")
            ui_chatbot.append(("Updating: " + ui_search_text, None))
            update_mapping_by_object_id(ui_search_text, ui_product_dropdown)
            # Confirm success so the user can tell the update went through.
            ui_chatbot.append((None, "<b style='color:green'>Update completed</b>"))
        else:
            print("Starting to Delete")
            ui_chatbot.append(("Deleting: " + ui_search_text, None))
            delete_mapping_by_object_id(ui_search_text, ui_product_dropdown)
            ui_chatbot.append((None, "<b style='color:green'>Delete completed</b>"))
    except Exception as e:
        print('Exception ' + str(e))
        ui_chatbot.append((None, "<b style='color:red'>Exception " + str(e) + "</b>"))
    finally:
        print("completed function - text_search")

    return ui_chatbot
|
|
|
|
|
|
|
|
|
|
|
def _collect_excel_recommendations(file_path, ui_product_dropdown):
    """Run the mapping search for every non-empty 'Input' cell of every sheet."""
    items = []

    # The context manager closes the workbook handle even if a search fails.
    with pd.ExcelFile(file_path) as xls:
        for sheet_name in xls.sheet_names:
            df = pd.read_excel(xls, sheet_name=sheet_name)
            if 'Input' not in df.columns:
                print(f"Skipping sheet without 'Input' column - {sheet_name}")
                continue

            # Empty cells are read as NaN and cannot be searched; skip them.
            for input_value in df['Input'].dropna():
                match = search_mapping_data(str(input_value), ui_product_dropdown) or {}
                item = {
                    'sheet': sheet_name,
                    'input': input_value,
                    'key': match.get('key'),
                    'description': match.get('description'),
                    'certainty': match.get('certainty'),
                }

                # f-strings: any of these values may be None or non-string.
                for field in ('key', 'sheet', 'input', 'description', 'certainty'):
                    print(f"{field}: {item[field]}")

                items.append(item)

    return items


def _write_recommendations_workbook(items):
    """Write the items to a temporary .xlsx file (one worksheet per source sheet) and return its path."""
    workbook = Workbook()
    # Remove the default worksheet first so a source sheet that happens to
    # share its title keeps its own name.
    workbook.remove(workbook.active)

    worksheets = {}
    for item in items:
        sheet = worksheets.get(item['sheet'])
        if sheet is None:
            sheet = workbook.create_sheet(title=item['sheet'])
            sheet.append(['input', 'key', 'description', 'certainty'])
            worksheets[item['sheet']] = sheet
        sheet.append([item['input'], item['key'], item['description'], item['certainty']])

    # Reserve a unique path, then close the handle before openpyxl writes to
    # it; saving onto a still-open handle fails on some platforms.
    with tempfile.NamedTemporaryFile(delete=False, suffix='.xlsx') as temp_file:
        output_path = temp_file.name
    workbook.save(output_path)

    return output_path


def excel_file_search(ui_product_dropdown, ui_excel_upload, ui_chatbot):
    """Look up a mapping recommendation for every 'Input' value of an uploaded workbook.

    Each sheet of the uploaded file that has an ``Input`` column is
    processed; the results are written to a temporary ``.xlsx`` file with
    the columns ``input``, ``key``, ``description`` and ``certainty``, one
    worksheet per source sheet. Errors are reported in the chat history
    rather than raised.

    Args:
        ui_product_dropdown: Selected product name.
        ui_excel_upload: Uploaded file object; its ``name`` attribute is the
            path that is read.
        ui_chatbot: Chat history list; appended to in place.

    Returns:
        Tuple ``(ui_chatbot, file_update)``. ``file_update`` is a
        ``gr.File.update`` that shows the result file on success and keeps
        the download widget hidden on failure.
    """
    print("started function - excel_file_search")

    # Default for every failure path: keep the download widget hidden.
    output_file_path = gr.File.update(visible=False)

    try:
        if not ui_product_dropdown:
            raise ValueError("Please select product name to proceed")

        file_path = ui_excel_upload.name
        print("Uploaded xlsx location - " + file_path)

        items = _collect_excel_recommendations(file_path, ui_product_dropdown)
        if not items:
            raise ValueError("No values found in an 'Input' column of the uploaded file")

        saved_path = _write_recommendations_workbook(items)

        print("File Processing Completed - " + saved_path)
        output_file_path = gr.File.update(
            visible=True,
            value=saved_path,
            interactive=True,
        )
        ui_chatbot.append((None, "File Processing Completed - " + saved_path))
    except Exception as e:
        print('Exception ' + str(e))
        ui_chatbot.append((None, "<b style='color:red'>Exception " + str(e) + "</b>"))
    finally:
        print("completed function - excel_file_search")

    return ui_chatbot, output_file_path
|
|
|
|
|
|
|
|
|
|
|
|
|
def main():
    """Build the Gradio interface, wire its event handlers and start the server.

    The server listens on ``0.0.0.0:8080``.
    """
    # NOTE(review): the nesting of the layout below was reconstructed from a
    # source whose indentation was lost - confirm it against the intended UI.
    print("\nStarted Knowledge Base Chat Application")

    with gr.Blocks() as demo:
        with gr.Accordion("Settings"):
            ui_model_name = gr.Textbox(
                placeholder="Semantic Search Model, https://www.sbert.net/docs/pretrained_models.html#semantic-search",
                label="Semantic Search Model",
            )
            ui_weaviate_url = gr.Textbox(
                placeholder="Weaviate URL, https://weaviate.xxx",
                label="Weaviate URL",
                type="password",
            )

        ui_chatbot = gr.Chatbot([], elem_id="chatbot").style(height=450)

        with gr.Row():
            with gr.Column(scale=0.2, min_width=0):
                ui_action_dropdown = gr.Dropdown(
                    ["Query", "Update", "Delete", "Get Object ID"],
                    label="Action Type",
                )
            with gr.Column(scale=0.2, min_width=0):
                ui_product_dropdown = gr.Dropdown(
                    [],
                    interactive=False,
                    label="Select Product",
                )
            with gr.Column(scale=0.6):
                ui_search_text = gr.Textbox(
                    show_label=False,
                    placeholder="Message me, I am your migration assistance",
                )

        # Gradio expects file extensions in the form ".ext"; the glob-style
        # "*.xlsx" is not a recognised filter.
        ui_upload_excel = gr.UploadButton("Upload Mapping File", file_types=[".xlsx"])
        ui_download_excel = gr.File(label="Download Recommendations", interactive=False, visible=False)

        # Changing the action reloads the settings and resets the widgets.
        ui_action_dropdown.change(
            fn=update_global_variables,
            inputs=[ui_action_dropdown, ui_model_name, ui_weaviate_url, ui_chatbot, ui_download_excel, ui_upload_excel],
            outputs=[ui_chatbot, ui_product_dropdown, ui_download_excel, ui_upload_excel],
        )

        # The handlers report their own runtime errors in the chat history.
        # Wrapping these registrations in try/except only guarded the wiring
        # call, and the old except branch called `append` on the Chatbot
        # component itself, so it was removed.
        ui_search_text.submit(
            fn=text_search,
            inputs=[ui_action_dropdown, ui_product_dropdown, ui_search_text, ui_chatbot],
            outputs=[ui_chatbot],
        )

        ui_upload_excel.upload(
            fn=excel_file_search,
            inputs=[ui_product_dropdown, ui_upload_excel, ui_chatbot],
            outputs=[ui_chatbot, ui_download_excel],
        )

    demo.launch(server_name="0.0.0.0", server_port=8080)


if __name__ == '__main__':
    main()