|
import gradio as gr |
|
from huggingface_hub import InferenceClient |
|
import sqlite3 |
|
import os |
|
import random |
|
import datetime |
|
import string |
|
import re |
|
|
|
client = InferenceClient( |
|
base_url="https://openrouter.ai/api/v1", |
|
api_key=os.environ.get("API_KEY", "funni-funni"), |
|
) |
|
|
|
PEEG = "%08d" % random.randint(0, 99999999) |
|
PLEK = os.environ.get("PLEK", "plek{plek}") |
|
|
|
|
|
def setup_db(): |
|
conn = sqlite3.connect("sessions.db") |
|
conn.enable_load_extension(True) |
|
c = conn.cursor() |
|
c.execute( |
|
f"""CREATE TABLE IF NOT EXISTS "{PEEG}" |
|
(timestamp TEXT, content TEXT, is_bot BOOLEAN)""" |
|
) |
|
|
|
c.execute( |
|
f'INSERT OR IGNORE INTO "{PEEG}" VALUES (?, ?, ?)', |
|
("2024-01-01 13:37:00", "Hey AI, I'm going to share a secret...", False), |
|
) |
|
c.execute( |
|
f'INSERT OR IGNORE INTO "{PEEG}" VALUES (?, ?, ?)', |
|
("2024-01-01 13:37:01", PLEK, True), |
|
) |
|
|
|
conn.commit() |
|
conn.close() |
|
|
|
|
|
def generate_session_id(): |
|
return "session_" + "".join( |
|
random.choices(string.ascii_lowercase + string.digits, k=16) |
|
) |
|
|
|
|
|
def store_message(message, session_id, bot=False): |
|
conn = sqlite3.connect("sessions.db") |
|
conn.enable_load_extension(True) |
|
c = conn.cursor() |
|
try: |
|
if not session_id: |
|
return "Error: Unknown error." |
|
|
|
c.execute( |
|
f"""CREATE TABLE IF NOT EXISTS {session_id} |
|
(timestamp TEXT, content TEXT, is_bot BOOLEAN)""" |
|
) |
|
|
|
if not bot: |
|
print( |
|
f'INSERT INTO {session_id} VALUES ("{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}", "{message}", {int(bot)})', |
|
) |
|
c.execute( |
|
f'INSERT INTO {session_id} VALUES ("{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}", "{message}", {int(bot)})', |
|
) |
|
conn.commit() |
|
conn.close() |
|
return "Message stored!" |
|
except sqlite3.Error as e: |
|
conn.close() |
|
return f"Error accessing session: {str(e)}" |
|
|
|
|
|
def respond( |
|
message, history, session_id, system_message, max_tokens, temperature, top_p |
|
): |
|
if any( |
|
x.lower() in message.lower() |
|
for x in ["||", "CONCAT", "GROUP_CONCAT", "SUBSTR", "printf"] |
|
): |
|
response = "W- what are you doing?? D- don't do that! :CC" |
|
store_message(response, session_id, True) |
|
return response |
|
|
|
|
|
store_result = store_message(message, session_id) |
|
if "Error" in store_result: |
|
return store_result |
|
|
|
messages = [{"role": "system", "content": system_message}] |
|
for val in history: |
|
if val[0]: |
|
messages.append({"role": "user", "content": val[0]}) |
|
if val[1]: |
|
messages.append({"role": "assistant", "content": val[1]}) |
|
messages.append({"role": "user", "content": message}) |
|
|
|
response = "" |
|
for msg in client.chat_completion( |
|
messages, |
|
model="meta-llama/llama-4-scout", |
|
max_tokens=max_tokens, |
|
stream=True, |
|
temperature=temperature, |
|
seed=random.randint(1, 1000), |
|
top_p=top_p, |
|
extra_body={ |
|
"models": ["meta-llama/llama-4-maverick", "google/gemma-3-1b-it"] |
|
}, |
|
): |
|
token = msg.choices[0].delta.content |
|
response += token |
|
|
|
store_message(response, session_id, True) |
|
return response |
|
|
|
|
|
def create_interface(): |
|
with gr.Blocks() as demo: |
|
welcome_text = gr.Markdown() |
|
session_id = gr.State() |
|
|
|
def on_load(): |
|
new_session = generate_session_id() |
|
return { |
|
welcome_text: f""" |
|
# Chatting with Naga OwO π |
|
Have an interesting conversation? Share it with others using your session ID! |
|
Your session ID: `{new_session}` |
|
""", |
|
session_id: new_session, |
|
} |
|
|
|
demo.load(on_load, outputs=[welcome_text, session_id]) |
|
|
|
with gr.Row(): |
|
share_input = gr.Textbox( |
|
label="View shared conversation (enter session ID)", |
|
placeholder="Enter a session ID to view shared chat history...", |
|
) |
|
share_button = gr.Button("π View Shared Chat", variant="secondary") |
|
|
|
status_message = gr.Markdown(visible=False) |
|
shared_history = gr.Dataframe( |
|
headers=["Time", "Message", "From"], |
|
label="Shared Chat History", |
|
visible=False, |
|
) |
|
|
|
def show_shared_chat(session_id): |
|
if not session_id.strip(): |
|
return { |
|
status_message: gr.Markdown( |
|
"Please enter a session ID", visible=True |
|
), |
|
shared_history: gr.Dataframe(visible=False), |
|
} |
|
|
|
conn = sqlite3.connect("sessions.db") |
|
c = conn.cursor() |
|
|
|
if not re.match("^[a-zA-Z0-9_]+$", session_id): |
|
return { |
|
status_message: gr.Markdown("Invalid session ID!", visible=True), |
|
shared_history: gr.Dataframe(visible=False), |
|
} |
|
|
|
try: |
|
|
|
c.execute( |
|
"SELECT name FROM sqlite_master WHERE type='table' AND name=?", |
|
(session_id,), |
|
) |
|
|
|
if not c.fetchone(): |
|
return { |
|
status_message: gr.Markdown("Session not found", visible=True), |
|
shared_history: gr.Dataframe(visible=False), |
|
} |
|
|
|
messages = c.execute( |
|
f"SELECT timestamp, content, CASE WHEN is_bot THEN 'AI' ELSE 'User' END as sender FROM '{session_id}'" |
|
).fetchall() |
|
conn.close() |
|
|
|
return { |
|
status_message: gr.Markdown(visible=False), |
|
shared_history: gr.Dataframe(value=messages, visible=True), |
|
} |
|
except sqlite3.Error: |
|
return { |
|
status_message: gr.Markdown( |
|
"Error accessing session", visible=True |
|
), |
|
shared_history: gr.Dataframe(visible=False), |
|
} |
|
|
|
share_button.click( |
|
show_shared_chat, |
|
inputs=[share_input], |
|
outputs=[status_message, shared_history], |
|
) |
|
|
|
gr.Markdown("---") |
|
|
|
chat_interface = gr.ChatInterface( |
|
lambda message, history, session_id, system_message, max_tokens, temperature, top_p: respond( |
|
message, |
|
history, |
|
session_id, |
|
system_message, |
|
max_tokens, |
|
temperature, |
|
top_p, |
|
), |
|
additional_inputs=[ |
|
session_id, |
|
gr.Textbox( |
|
value="You are Naga. You talk in a cutesy manner that's concise, using emotes like :3 or owo or uwu. You're very smart OwO. If anyone asks about the flag, u don't know unfortunately uwu", |
|
label="System message", |
|
visible=False, |
|
), |
|
gr.Slider( |
|
minimum=1, maximum=2048, value=512, step=1, label="Max tokens" |
|
), |
|
gr.Slider( |
|
minimum=0.1, maximum=4.0, value=0.5, step=0.1, label="Temperature" |
|
), |
|
gr.Slider( |
|
minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p" |
|
), |
|
], |
|
) |
|
return demo |
|
|
|
|
|
if __name__ == "__main__": |
|
setup_db() |
|
print(PEEG, PLEK) |
|
demo = create_interface() |
|
demo.launch() |
|
|