Spaces:
Sleeping
Sleeping
import openai | |
from langchain_community.document_loaders import PyPDFDirectoryLoader | |
from langchain_text_splitters import RecursiveCharacterTextSplitter | |
from langchain_openai import ChatOpenAI, OpenAIEmbeddings | |
from langchain_chroma import Chroma | |
import chromadb | |
import uuid | |
from docx import Document | |
from datetime import datetime | |
from docx.shared import Pt, RGBColor | |
from docx import Document | |
from docx.shared import Pt, RGBColor | |
from docx.oxml import OxmlElement, ns | |
from datetime import datetime | |
import os | |
import re | |
import uuid | |
import tempfile | |
import shutil | |
import time | |
import gradio as gr | |
# Panel Kontrolny # | |
HFS_vs_GoogleColab = 1 | |
# Access | |
if HFS_vs_GoogleColab == 0: | |
from google.colab import drive | |
drive.mount('/content/drive') | |
class CFG: | |
BASE_PATH = r'/content/drive/MyDrive/Colab Notebooks/MyProjects/Asystent_Analityka/' if HFS_vs_GoogleColab == 0 else "./" | |
nazwa_projektu_HF = "asystent_maklera" # zrobić automatyczny przełącznik GC vs HFS - asystent vs chatbot | |
rola = "Jesteś asystentem maklera" | |
kolekcja_bd = "pkobp" | |
# jeszcze można dodać nazwy i opisy Interface i ChatBota | |
model_llm = "gpt-4o-mini" # gpt-4o-mini, gpt-4o, o1-mini, gpt-4o, claude-3-opus-20240229, speakleash/Bielik-11B-v2.3-Instruct | |
temperature = 0.6 # od 0.1 do 0.6 | |
model_embeddings = "text-embedding-3-small" # "text-embedding-ada-002", text-embedding-3-small, text-embedding-3-large, ipipan/silver-retriever-base-v1.1 (razem z Bielkiem) | |
dimensions_embeddings = 1536 | |
chunk_size = 3200 # / 500 / 1500 | |
chunk_overlap = 500 # / 200 / 100 / 35 / 200 | |
# No_ReRanking SMALL | |
retriever_num_base_results = 5 | |
reranked_num_results = 3 | |
if HFS_vs_GoogleColab == 1: | |
# Hugging Face Secrets | |
openai_api_key = os.getenv("OPENAI_API_KEY") | |
openai.api_key = openai_api_key | |
# na HFS dodać do Secrets w Settings | |
else: | |
# Google Colab Secrets | |
from google.colab import userdata | |
os.environ["OPENAI_API_KEY"] = userdata.get('Elephant-key') | |
# client = OpenAI(api_key = userdata.get('Elephant-key')) | |
# client = Anthropic(api_key = userdata.get('anthropic-key')) | |
# Ścieżki do konkretnych katalogów dla konkretnych spółek | |
DATA_PATH = os.path.join(CFG.BASE_PATH, f"data_{CFG.kolekcja_bd}") | |
CHROMA_PATH = os.path.join(CFG.BASE_PATH, f"chroma_db_{CFG.kolekcja_bd}") | |
TEMP_PATH = os.path.join(CFG.BASE_PATH, f"answers_{CFG.kolekcja_bd}") | |
# Create the DATA directory if it doesn't exist | |
os.makedirs(DATA_PATH, exist_ok=True) | |
# Create the CHROMA directory if it doesn't exist | |
os.makedirs(CHROMA_PATH, exist_ok=True) | |
# Create the TEMP directory if it doesn't exist | |
os.makedirs(TEMP_PATH, exist_ok=True) | |
def initiate_embeding_model(model_embeddings=CFG.model_embeddings, model_dimensions=CFG.dimensions_embeddings): | |
# initiate the embeddings model | |
embeddings_model = OpenAIEmbeddings( | |
model = model_embeddings, | |
dimensions =model_dimensions | |
) | |
return embeddings_model | |
embeddings_model = initiate_embeding_model(CFG.model_embeddings, CFG.dimensions_embeddings) | |
# !!! | |
# To pewnie należy zmienić, żeby najpierw sprawdzał, czy istnieje taka baza - czyli żeby tylko inicjował ją a nie tworzył | |
# !!! | |
def create_vector_store(embeddings_model, CHROMA_PATH): | |
# Tworzenie pustej bazy ChromaDB - (!) natywnie, nie przez LangChain | |
client = chromadb.PersistentClient(path=CHROMA_PATH) | |
# initiate the vector store | |
vector_store = Chroma( | |
embedding_function = embeddings_model, | |
persist_directory = CHROMA_PATH, | |
) | |
# Sprawdzenie, czy baza jest pusta (powinna być) | |
print("Dostępne kolekcje:", client.list_collections()) | |
return vector_store, client | |
vector_store, client = create_vector_store(embeddings_model, CHROMA_PATH) | |
def read_data_from_chroma(collection_name): | |
# Set up the vectorstore to be the retriever | |
# 📌 1. Inicjalizacja kolekcji | |
vector_store = Chroma( | |
collection_name=collection_name, | |
embedding_function=embeddings_model, | |
persist_directory=CHROMA_PATH, | |
) | |
# Określenie liczby chunków zwracanych przez retriever | |
base_num_results = CFG.retriever_num_base_results | |
# Chroma nie obsługuje rerankingu, więc nie działa "fetch_k" i trzeba albo samemu zrobić reranking | |
# przy pomocy LLM (poniżej) albo użyć jakieś inne biblioteki robiącej to automatycznie (MultiQueryRetriever) | |
#rr_num_results = CFG.reranked_num_results | |
retriever = vector_store.as_retriever(search_kwargs={"k": base_num_results}) | |
return retriever | |
# usatw kolekcję do odczytu | |
collection_name = CFG.kolekcja_bd | |
retriever = read_data_from_chroma(collection_name) | |
# initiate the model | |
llm = ChatOpenAI(temperature=CFG.temperature, model=CFG.model_llm) | |
def response(query, historia=None): | |
# NO_ReRanking chunków | |
relevant_chunks = retriever.invoke(query) | |
# add all the chunks to 'knowledge' | |
knowledge = "" | |
sources_markdown = "" | |
zrodla = "" # ✅ Źródła dla pliku Word | |
cytaty = "" | |
nazwa_projektu = CFG.nazwa_projektu_HF | |
if HFS_vs_GoogleColab == 1: | |
# Dynamiczny link do GFS | |
# Pobranie nazwy aktywnego projektu Spaces (jeśli uruchomiony na HF) | |
HF_SPACE_ID = os.getenv("SPACE_ID", "smolinski/test") # Domyślnie "test", jeśli brak zmiennej | |
# Tworzenie dynamicznego BASE_URL dla aktualnego projektu | |
BASE_URL = f"https://huggingface.co/spaces/{HF_SPACE_ID}/resolve/main/data_{CFG.kolekcja_bd}/" | |
else: | |
BASE_URL = DATA_PATH + "/" | |
for relevant_chunk in relevant_chunks: | |
# Tworzenie linku do źródła | |
full_source = relevant_chunk.metadata.get("source", "Nieznane źródło") # Pobiera źródło | |
file_name = os.path.basename(full_source) # Usuwa ścieżkę, zostawia nazwę pliku | |
file_link = f"{BASE_URL}{file_name}" | |
file_link = file_link.replace(' ', '%20') # na wypadek spacji w nazwie – kodujemy je do URL | |
#page_number = relevant_chunk.metadata.get("page_number", "nieznana strona") # Pobranie numeru strony | |
page_number_raw = relevant_chunk.metadata.get("page_number", None) | |
try: | |
page_number = int(page_number_raw) + 1 | |
page_number = str(page_number) | |
except (ValueError, TypeError): | |
page_number = "nieznana strona" | |
# ✅ Linki zapisane w Markdownie dla Gradio | |
sources_markdown += f"\n- {file_name}, str. {page_number}: [otwórz]({file_link})" | |
# ✅ Chunki w osobnej zmiennej | |
cytaty += f"Cytat z {file_name}, strona {page_number}:\n\n{relevant_chunk.page_content}\n\n---\n\n" | |
knowledge += relevant_chunk.page_content + "\n\n---\n\n" | |
# print(cytaty) | |
# dodajemy historię do prompta (jeśli istnieje) | |
historia_text = "" | |
if historia: | |
for i, (q, a) in enumerate(historia[-5:], 1): | |
historia_text += f"\nPoprzednia rozmowa {i}:\nPytanie: {q}\nOdpowiedź: {a}\n" | |
# make the call to the LLM (including prompt) | |
if query is not None: | |
rag_prompt = f""" | |
{CFG.rola}, który szczegółowo i dokładnie odpowiada na pytania w oparciu o przekazaną wiedzę. | |
Dziel się wszystkimi posiadanymi informacjami na dany temat. | |
Na pytania o aktualne notowania odpowiadaj: Aktualne notowania dostępne są na stronie GPW. | |
Do podkreślenia lub wypunktowania najważniejszych rzeczy używaj pogrubionej czcionki, a do ciekawostek i dodatkowych rzeczy kursywy. | |
Podczas udzielania odpowiedzi korzystaj wyłącznie z poniższych informacji zawartych w sekcji „Wiedza”. | |
Bądź miły i uprzejmy, ale rzeczowy. Przykładaj większą wagę do nowszych informacji. | |
Jeśli pytanie jest zbyt ogólne nie odpowiadaj na nie, lecz poproś o doprecyzowanie. | |
Jeśli nie znasz odpowiedzi, napisz: Niestety nie posiadam informacji na ten temat. NIE WYMYŚLAJ NICZEGO.\n\n | |
{historia_text} | |
Pytanie: {query} | |
Wiedza:\n {knowledge} | |
""" | |
# the response to the Gradio App | |
response = llm(rag_prompt) | |
return response.content if response and response.content else "Brak odpowiedzi.", sources_markdown, cytaty | |
def zaktualizuj_historie(pytanie, odpowiedz, historia): | |
historia.append((pytanie, odpowiedz)) | |
return historia[-5:] | |
def wyczysc_formularz(): | |
return "", "", "", [] | |
# ✅ Funkcja do dodania cytatów do odpowiedzi | |
def dodaj_cytaty(odpowiedz, cytaty): | |
return f"{odpowiedz}\n\n---\n**Cytaty ze śródeł**\n---\n\n{cytaty}" if cytaty else odpowiedz | |
# 1. Inicjalizacja unikalnego folderu sesji | |
def init_user_session(): | |
cleanup_old_sessions(base_path=tempfile.gettempdir(), max_age_days=1) | |
session_id = str(uuid.uuid4()) | |
user_temp_dir = os.path.join(tempfile.gettempdir(), f"asystent_{session_id}") | |
os.makedirs(user_temp_dir, exist_ok=True) | |
return user_temp_dir | |
# 2. Czyszczenie katalogów starszych niż 1 dzień | |
def cleanup_old_sessions(base_path, max_age_days=1): | |
now = time.time() | |
for folder in os.listdir(base_path): | |
if folder.startswith("asystent_"): | |
folder_path = os.path.join(base_path, folder) | |
if os.path.isdir(folder_path): | |
folder_age = now - os.path.getctime(folder_path) | |
if folder_age > max_age_days * 86400: | |
shutil.rmtree(folder_path) | |
print(f"Usunięto stary folder sesji: {folder_path}") | |
# 3. Zapis odpowiedzi do pliku .docx | |
def zapisz_odpowiedz(odpowiedz, pytanie, sources, user_path): | |
if not odpowiedz or odpowiedz.strip() == "" or not pytanie.strip(): | |
print("Błąd: Odpowiedź lub pytanie są puste!") | |
return None | |
date_str = datetime.now().strftime("%Y-%m-%d") | |
file_name = "".join(c if c.isalnum() or c in (" ", "_", "-") else "_" for c in pytanie)[:50] | |
file_path = os.path.join(user_path, f"{file_name}_{date_str}.docx") | |
try: | |
doc = Document() | |
def formatuj_naglowek(paragraph, text, font_size=14, color=(0, 0, 0), bold=True): | |
run = paragraph.add_run(text) | |
run.bold = bold | |
run.font.size = Pt(font_size) | |
run.font.color.rgb = RGBColor(*color) | |
run.font.name = "Calibri" | |
paragraph.paragraph_format.line_spacing = 1.25 | |
paragraph.paragraph_format.space_before = Pt(5) | |
paragraph.paragraph_format.space_after = Pt(0) | |
def formatuj_paragraf(paragraph): | |
for run in paragraph.runs: | |
run.font.name = "Calibri" | |
run.font.size = Pt(12) | |
paragraph.paragraph_format.line_spacing = 1.25 | |
paragraph.paragraph_format.space_before = Pt(0) | |
paragraph.paragraph_format.space_after = Pt(5) | |
def dodaj_markdown_tekst(doc, text): | |
lines = text.splitlines() | |
for line in lines: | |
# Lista wypunktowana | |
if line.strip().startswith(("- ", "* ")): | |
para = doc.add_paragraph(style='List Bullet') | |
content = line.strip()[2:] | |
# Cytat | |
elif line.strip().startswith("> "): | |
para = doc.add_paragraph() | |
para.paragraph_format.left_indent = Pt(20) | |
content = line.strip()[2:] | |
else: | |
para = doc.add_paragraph() | |
content = line | |
# Markdown inline: **bold**, *italic*, ~~strikethrough~~ | |
pattern = r"(\*\*.*?\*\*|\*.*?\*|~~.*?~~|[^*~]+)" | |
parts = re.findall(pattern, content) | |
for part in parts: | |
clean = part.replace("**", "").replace("*", "").replace("~~", "") | |
run = para.add_run(clean) | |
if part.startswith("**") and part.endswith("**"): | |
run.bold = True | |
elif part.startswith("*") and part.endswith("*"): | |
run.italic = True | |
elif part.startswith("~~") and part.endswith("~~"): | |
run.font.strike = True | |
run.font.name = "Calibri" | |
run.font.size = Pt(12) | |
p1 = doc.add_paragraph() | |
formatuj_naglowek(p1, "Pytanie:") | |
p1 = doc.add_paragraph(pytanie) | |
formatuj_paragraf(p1) | |
doc.add_paragraph(" ") | |
p2 = doc.add_paragraph() | |
formatuj_naglowek(p2, "Odpowiedź:") | |
dodaj_markdown_tekst(doc, odpowiedz) | |
doc.add_paragraph(" ") | |
if sources and sources.strip(): | |
p3 = doc.add_paragraph() | |
formatuj_naglowek(p3, "Źródła:") | |
p3 = doc.add_paragraph(re.sub(r":.*", "", sources)) | |
formatuj_paragraf(p3) | |
doc.save(file_path) | |
print(f"Plik zapisany: {file_path}") | |
return file_path if os.path.exists(file_path) else None | |
except Exception as e: | |
print(f"Błąd podczas zapisu pliku: {e}") | |
return None | |
# 4. Lista plików użytkownika | |
def lista_plikow(user_path): | |
pliki = [os.path.join(user_path, f) for f in os.listdir(user_path) if f.endswith(".docx")] | |
pliki.sort(key=os.path.getctime, reverse=True) | |
return pliki if pliki else None | |
# 5. Czyszczenie folderu użytkownika | |
def wyczysc_folder(user_path): | |
if os.path.exists(user_path): | |
shutil.rmtree(user_path) | |
# call this function for every message added to the chatbot | |
def stream_response(query, history): | |
"""Obsługuje strumieniowanie i poprawnie czyści pole tekstowe.""" | |
history = history or [] # Inicjalizacja pustej historii, jeśli brak danych | |
# Pobranie pasujących fragmentów wiedzy | |
relevant_chunks = retriever.invoke(query) | |
knowledge = "\n\n---\n\n".join([relevant_chunk.page_content for relevant_chunk in relevant_chunks]) | |
# Tworzenie promptu dla modelu LLM | |
rag_prompt = f""" | |
{CFG.rola}, który szczegółowo i dokładnie odpowiada na pytania w oparciu o przekazaną wiedzę. | |
Dziel się wszystkimi posiadanymi informacjami na dany temat, tak by Twoje odpowiedzi były wyczerpujące. | |
Na pytania o aktualne notowania odpowiadaj: Aktualne notowania dostępne są na stronie GPW. | |
Do podkreślenia lub wypunktowania najważniejszych rzeczy używaj pogrubionej czcionki, a do ciekawostek i dodatkowych rzeczy kursywy. | |
Podczas udzielania odpowiedzi korzystaj wyłącznie z poniższych informacji zawartych w sekcji „Wiedza”. | |
Bądź miły i uprzejmy, ale rzeczowy. Przykładaj większą wagę do nowszych informacji. | |
Jeśli pytanie jest zbyt ogólne nie odpowiadaj na nie, lecz poproś o doprecyzowanie. | |
Jeśli nie znasz odpowiedzi, napisz: Niestety nie posiadam informacji na ten temat. NIE WYMYŚLAJ NICZEGO.\n\n | |
Pytanie: {query}\n\n | |
Historia rozmowy:\n {history} | |
Wiedza:\n {knowledge} | |
""" | |
print("Prompt:\n", rag_prompt) | |
print("Odpowiedź:") | |
# Strumieniowanie odpowiedzi do Gradio | |
partial_message = "" | |
for response in llm.stream(rag_prompt): | |
partial_message += response.content | |
yield history + [(query, partial_message)], query # **Tymczasowo zwracamy query, by pole nie było puste** | |
# Po zakończeniu strumieniowania dodajemy pełną wiadomość do historii i czyścimy input_text | |
history.append((query, partial_message)) | |
yield history, "" # **Finalnie zwracamy pusty string, by wyczyścić pole tekstowe** | |
with gr.Blocks(css=""" | |
.button_wyczysc-color { | |
background-color: #A9A9A9 !important; | |
color: white !important; | |
} | |
#markdown_odpowiedz { | |
border: 1px solid #ccc; | |
border-radius: 6px; | |
padding: 12px; | |
background-color: #f9f9f9; | |
margin-top: 6px; | |
min-height: 8em; | |
} | |
""") as gui: | |
session_dir = gr.State(value=init_user_session) | |
historia_formularza = gr.State([]) | |
gr.Markdown("# Asystent Maklera") | |
gr.Markdown("### Odpowiadam na pytania dotyczące Biura Maklerskiego.") | |
gr.Markdown("###### Pamiętaj jestem tylko chatbotem i czasami się mylę, a moje odpowiedzi nie mogą być traktowane jako rekomendacje inwestycyjne!") | |
with gr.Tabs(): | |
# ChatBot | |
with gr.TabItem("💬 Chat"): | |
chatbot = gr.Chatbot() | |
input_text_chat = gr.Textbox(placeholder="Napisz tutaj pytanie...", container=False, autoscroll=True, scale=7) | |
input_text_chat.submit(fn=stream_response, inputs=[input_text_chat, chatbot], outputs=[chatbot, input_text_chat]) | |
# Formularz | |
with gr.TabItem("📝 Formularz"): | |
input_text_form = gr.Textbox(label="Zadaj pytanie:", placeholder="Napisz tutaj pytanie...", lines=2, interactive=True) | |
with gr.Row(): | |
with gr.Column(scale=1): | |
submit_button = gr.Button("Wyślij pytanie") | |
with gr.Column(scale=1): | |
clear_answer_button = gr.Button("Wyczyść formularz", elem_classes="button_wyczysc-color") | |
with gr.Column(scale=7): | |
gr.Markdown("") | |
gr.Markdown("### Odpowiedź:") | |
output_answer = gr.Markdown( | |
value="", | |
elem_id="markdown_odpowiedz" | |
) | |
output_cytaty = gr.State("") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
zacytuj_button = gr.Button("Przytocz źródła") | |
with gr.Column(scale=8): | |
gr.Markdown("") | |
gr.Markdown("### Źródła:") | |
output_sources = gr.Markdown() | |
gr.Markdown("### Pobierz odpowiedzi:") | |
download_files = gr.File(label="Pliki do pobrania", interactive=False, file_types=[".docx"]) | |
# Logika przycisków | |
submit_button.click( | |
response, | |
inputs=[input_text_form, historia_formularza], | |
outputs=[output_answer, output_sources, output_cytaty] | |
).then( | |
zaktualizuj_historie, | |
inputs=[input_text_form, output_answer, historia_formularza], | |
outputs=historia_formularza | |
).then( | |
zapisz_odpowiedz, | |
inputs=[output_answer, input_text_form, output_sources, session_dir], | |
outputs=None | |
).then( | |
lista_plikow, | |
inputs=session_dir, | |
outputs=download_files | |
) | |
clear_answer_button.click( | |
wyczysc_formularz, | |
inputs=[], | |
outputs=[output_answer, input_text_form, output_sources, historia_formularza] | |
) | |
zacytuj_button.click( | |
dodaj_cytaty, | |
inputs=[output_answer, output_cytaty], | |
outputs=output_answer | |
).then( | |
zapisz_odpowiedz, | |
inputs=[output_answer, input_text_form, output_sources, session_dir], | |
outputs=None | |
).then( | |
lista_plikow, | |
inputs=session_dir, | |
outputs=download_files | |
) | |
gui.launch() |