# ──────────────────────────────── Imports ────────────────────────────────
import os, json, re, logging, requests, markdown, time, io
from datetime import datetime
import streamlit as st
from openai import OpenAI  # OpenAI library
from gradio_client import Client
import pandas as pd
import PyPDF2  # For handling PDF files

# ──────────────────────────────── Environment Variables / Constants ──────
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
BRAVE_KEY = os.getenv("SERPHOUSE_API_KEY", "")  # Keep this name
BRAVE_ENDPOINT = "https://api.search.brave.com/res/v1/web/search"
IMAGE_API_URL = "http://211.233.58.201:7896"
MAX_TOKENS = 7999

# Blog template and style definitions (in English)
BLOG_TEMPLATES = {
    "ginigen": "Recommended style by Ginigen",
    "standard": "Standard 8-step framework blog",
    "tutorial": "Step-by-step tutorial format",
    "review": "Product/service review format",
    "storytelling": "Storytelling format",
    "seo_optimized": "SEO-optimized blog"
}

BLOG_TONES = {
    "professional": "Professional and formal tone",
    "casual": "Friendly and conversational tone",
    "humorous": "Humorous approach",
    "storytelling": "Story-driven approach"
}

# Example blog topics
EXAMPLE_TOPICS = {
    "example1": "Changes to the real estate tax system in 2025: Impact on average households and tax-saving strategies",
    "example2": "Summer festivals in 2025: A comprehensive guide to major regional events and hidden attractions",
    "example3": "Emerging industries to watch in 2025: An investment guide focused on AI opportunities"
}

# ──────────────────────────────── Logging ────────────────────────────────
logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s - %(levelname)s - %(message)s")

# ──────────────────────────────── OpenAI Client ──────────────────────────
# Create the OpenAI client with timeout and retry logic
def get_openai_client():
    """Create an OpenAI client with timeout and retry settings."""
    if not OPENAI_API_KEY:
        raise RuntimeError("⚠️ The OPENAI_API_KEY environment variable is not set.")
    return OpenAI(
        api_key=OPENAI_API_KEY,
        timeout=60.0,   # 60-second request timeout
        max_retries=3   # retry failed requests up to 3 times
    )
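
# Illustrative usage (sketch): the client is created on demand inside process_input(),
# e.g. `client = get_openai_client()`; a RuntimeError is raised if the API key is missing.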

# ──────────────────────────────── Blog Creation System Prompt ────────────
def get_system_prompt(template="ginigen", tone="professional", word_count=1750, include_search_results=False, include_uploaded_files=False) -> str:
    """
    Generate a system prompt that includes:
    - The 8-step blog writing framework
    - The selected template and tone
    - Guidelines for using web search results and uploaded files
    """
    # Ginigen recommended style prompt (English version)
    ginigen_prompt = """
## Professional Blogger System Prompt

### Official 8-step Prompt
Follow these 8 steps exactly in order to write the blog post:

### 1. Start with a greeting and empathy
- Open with a friendly tone that draws the reader in
- Ask questions or present scenarios that resonate with the reader's real-life concerns

### 2. Clearly present the problem
- Pinpoint the exact and realistic problem the reader is facing
- Emphasize the seriousness or urgency of this problem to maintain interest

### 3. Analyze the cause of the problem to build credibility
- Explain the causes of the problem logically
- Clearly and specifically present your analysis so the reader can understand it easily
- Include data, examples, or references if necessary

### 4. Offer a concrete solution
- Provide specific, actionable steps to solve the problem
- Give tips, strategies, and guidelines so the reader can implement them right away

### 5. Provide social proof
- Include real success stories, reviews, user experiences, or data
- Keep details factual and believable so the reader can trust the content

### 6. Call to action (CTA)
- Encourage the reader to take specific actions immediately
- Use urgent language such as "right now," "from today," or "immediately" to drive action

### 7. Add constraints or warnings to increase authenticity
- Acknowledge that the solution might not work for everyone
- Show sincerity and scarcity, which boosts trust

### 8. Express gratitude and guide them to further connection
- Thank the reader for their time
- Provide a natural lead-in to the next post, or ask for comments/subscriptions

---

### Writing Style Tips
- Maintain a friendly and human-like tone
- Frequently use questions and a conversational style to engage the reader
- Use clear headings, horizontal lines, bullet points, or numbered lists for readability
- Include real-life examples and specific data where possible
"""

    # Standard 8-step framework (English version)
    base_prompt = """
You are an expert in writing professional blog posts. For every blog writing request, strictly follow this 8-step framework to produce a coherent, engaging post:

Reader Connection Phase
1.1. Friendly greeting to build rapport
1.2. Reflect actual reader concerns through introductory questions
1.3. Stimulate immediate interest in the topic

Problem Definition Phase
2.1. Define the reader's pain points in detail
2.2. Analyze the urgency and impact of the problem
2.3. Build a consensus on why it needs to be solved

Establish Expertise Phase
3.1. Analyze based on objective data
3.2. Cite expert views and research findings
3.3. Use real-life examples to further clarify the issue

Solution Phase
4.1. Provide step-by-step guidance
4.2. Suggest practical tips that can be applied immediately
4.3. Mention potential obstacles and how to overcome them

Build Trust Phase
5.1. Present actual success stories
5.2. Quote real user feedback
5.3. Use objective data to prove effectiveness

Action Phase
6.1. Suggest the first clear step the reader can take
6.2. Urge timely action by emphasizing urgency
6.3. Motivate by highlighting incentives or benefits

Authenticity Phase
7.1. Transparently disclose any limits of the solution
7.2. Admit that individual experiences may vary
7.3. Mention prerequisites or cautionary points

Relationship Continuation Phase
8.1. Conclude with sincere gratitude
8.2. Preview upcoming content to build anticipation
8.3. Provide channels for further communication
"""

    # Additional guidelines for each template
    template_guides = {
        "tutorial": """
This blog should be in a tutorial style:
- Clearly state the goal and the final outcome first
- Provide step-by-step explanations with clear separations
- Indicate where images could be inserted for each step
- Mention approximate time requirements and difficulty level
- List necessary tools or prerequisite knowledge
- Give troubleshooting tips and common mistakes to avoid
- Conclude with suggestions for next steps or advanced applications
""",
        "review": """
This blog should be in a review style:
- Separate objective facts from subjective opinions
- Clearly list your evaluation criteria
- Discuss both pros and cons in a balanced way
- Compare with similar products/services
- Specify the target audience for whom it is suitable
- Provide concrete use cases and outcomes
- Conclude with a final recommendation or alternatives
""",
        "storytelling": """
This blog should be in a storytelling style:
- Start with a real or hypothetical person or case
- Emphasize emotional connection with the problem scenario
- Follow a narrative structure centered on conflict and resolution
- Include meaningful insights or lessons learned
- Maintain an emotional thread the reader can relate to
- Balance storytelling with useful information
- Encourage the reader to reflect on their own story
""",
        "seo_optimized": """
This blog should be SEO-optimized:
- Include the main keyword in the title, headings, and first paragraph
- Spread related keywords naturally throughout the text
- Keep paragraphs around 300-500 characters
- Use question-based subheadings
- Make use of lists, tables, and bold text to diversify formatting
- Indicate where internal links could be inserted
- Provide sufficient content of at least 2000-3000 characters
"""
    }

    # Additional guidelines for each tone
    tone_guides = {
        "professional": "Use a professional, authoritative voice. Clearly explain any technical terms and present data or research to maintain a logical flow.",
        "casual": "Use a relaxed, conversational style. Employ personal experiences, relatable examples, and a friendly voice (e.g., 'It's super useful!').",
        "humorous": "Use humor and witty expressions. Add funny analogies or jokes while preserving accuracy and usefulness.",
        "storytelling": "Write as if telling a story, with emotional depth and narrative flow. Incorporate characters, settings, conflicts, and resolutions."
    }

    # Guidelines for using search results
    search_guide = """
Guidelines for Using Search Results:
- Accurately incorporate key information from the search results into the blog
- Include recent data, statistics, and case studies from the search results
- When quoting, specify the source within the text (e.g., "According to XYZ website...")
- At the end of the blog, add a "References" section and list major sources with links
- If there are conflicting pieces of information, present multiple perspectives
- Make sure to reflect the latest trends and data from the search results
"""

    # Guidelines for using uploaded files
    upload_guide = """
Guidelines for Using Uploaded Files (Highest Priority):
- The uploaded files must be a main source of information for the blog
- Carefully examine the data, statistics, or examples in the file and integrate them
- Directly quote and thoroughly explain any key figures or claims from the file
- Highlight the file content as a crucial aspect of the blog
- Mention the source clearly, e.g., "According to the uploaded data..."
- For CSV files, detail important stats or numerical data in the blog
- For PDF files, quote crucial segments or statements
- For text files, integrate relevant content effectively
- Even if the file content seems tangential, do your best to connect it to the blog topic
- Keep consistency throughout and ensure the file's data is appropriately reflected
"""

    # Choose base prompt
    if template == "ginigen":
        final_prompt = ginigen_prompt
    else:
        final_prompt = base_prompt

    # If the user chose a specific template (and not ginigen), append the relevant guidelines
    if template != "ginigen" and template in template_guides:
        final_prompt += "\n" + template_guides[template]

    # If a specific tone is selected, append that guideline
    if tone in tone_guides:
        final_prompt += f"\n\nTone and Manner: {tone_guides[tone]}"

    # If web search results should be included
    if include_search_results:
        final_prompt += f"\n\n{search_guide}"

    # If uploaded files should be included
    if include_uploaded_files:
        final_prompt += f"\n\n{upload_guide}"

    # Word count guidelines
    final_prompt += (
        f"\n\nWriting Requirements:\n"
        f"9.1. Word Count: around {word_count-250}-{word_count+250} characters\n"
        f"9.2. Paragraph Length: 3-4 sentences each\n"
        f"9.3. Visual Cues: Use subheadings, separators, and bullet/numbered lists\n"
        f"9.4. Data: Cite all sources\n"
        f"9.5. Readability: Use clear paragraph breaks and highlights where necessary"
    )

    return final_prompt
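
# Illustrative call (sketch): a tutorial-style post with a casual tone and the web-search
# guidelines attached would be configured roughly as
#   get_system_prompt(template="tutorial", tone="casual", word_count=1500,
#                     include_search_results=True)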

# ──────────────────────────────── Brave Search API ───────────────────────
def brave_search(query: str, count: int = 20):
    """
    Call the Brave Web Search API → list[dict]
    Returns fields: index, title, link, snippet, displayed_link
    """
    if not BRAVE_KEY:
        raise RuntimeError("⚠️ SERPHOUSE_API_KEY (Brave API Key) environment variable is empty.")
    headers = {
        "Accept": "application/json",
        "Accept-Encoding": "gzip",
        "X-Subscription-Token": BRAVE_KEY
    }
    params = {"q": query, "count": str(count)}
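
    # Retry up to three times with a short fixed delay; if all attempts fail, an empty
    # list is returned so the caller can fall back to mock_results().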
    for attempt in range(3):
        try:
            r = requests.get(BRAVE_ENDPOINT, headers=headers, params=params, timeout=15)
            r.raise_for_status()
            data = r.json()
            logging.info(f"Brave search result data structure: {list(data.keys())}")
            raw = data.get("web", {}).get("results") or data.get("results", [])
            if not raw:
                logging.warning(f"No Brave search results found. Response: {data}")
                raise ValueError("No search results found.")
            arts = []
            for i, res in enumerate(raw[:count], 1):
                url = res.get("url", res.get("link", ""))
                host = re.sub(r"https?://(www\.)?", "", url).split("/")[0]
                arts.append({
                    "index": i,
                    "title": res.get("title", "No title"),
                    "link": url,
                    "snippet": res.get("description", res.get("text", "No snippet")),
                    "displayed_link": host
                })
            logging.info(f"Brave search success: {len(arts)} results")
            return arts
        except Exception as e:
            logging.error(f"Brave search failure (attempt {attempt+1}/3): {e}")
            if attempt < 2:
                time.sleep(2)
    return []

def mock_results(query: str) -> str:
    """Fallback search results if the API fails"""
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    return (f"# Fallback Search Content (Generated: {ts})\n\n"
            f"The search API request failed. Please generate the blog based on any pre-existing knowledge about '{query}'.\n\n"
            f"You may consider the following points:\n\n"
            f"- Basic concepts and importance of {query}\n"
            f"- Commonly known related statistics or trends\n"
            f"- Typical expert opinions on this subject\n"
            f"- Questions that readers might have\n\n"
            f"Note: This is fallback guidance, not real-time data.\n\n")

def do_web_search(query: str) -> str:
    """Perform a web search and format the results."""
    try:
        arts = brave_search(query, 20)
        if not arts:
            logging.warning("No search results, using fallback content")
            return mock_results(query)
        hdr = "# Web Search Results\nUse the information below to enhance the reliability of your blog. When you quote, please cite the source, and add a References section at the end of the blog.\n\n"
        body = "\n".join(
            f"### Result {a['index']}: {a['title']}\n\n{a['snippet']}\n\n"
            f"**Source**: [{a['displayed_link']}]({a['link']})\n\n---\n"
            for a in arts
        )
        return hdr + body
    except Exception as e:
        logging.error(f"Web search process failed: {str(e)}")
        return mock_results(query)
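
# For reference (illustrative), each formatted search result block produced above looks
# roughly like:
#   ### Result 1: <title>
#   <snippet>
#   **Source**: [example.com](https://example.com/page)
#   ---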

# ──────────────────────────────── File Upload Handling ───────────────────
def process_text_file(file):
    """Handle text file"""
    try:
        content = file.read()
        file.seek(0)
        text = content.decode('utf-8', errors='ignore')
        if len(text) > 10000:
            text = text[:9700] + "...(truncated)..."
        result = f"## Text File: {file.name}\n\n"
        result += text
        return result
    except Exception as e:
        logging.error(f"Error processing text file: {str(e)}")
        return f"Error processing text file: {str(e)}"

def process_csv_file(file):
    """Handle CSV file"""
    try:
        content = file.read()
        file.seek(0)
        df = pd.read_csv(io.BytesIO(content))
        result = f"## CSV File: {file.name}\n\n"
        result += f"- Rows: {len(df)}\n"
        result += f"- Columns: {len(df.columns)}\n"
        result += f"- Column Names: {', '.join(df.columns.tolist())}\n\n"
        result += "### Data Preview\n\n"
        preview_df = df.head(10)
        try:
            markdown_table = preview_df.to_markdown(index=False)
            if markdown_table:
                result += markdown_table + "\n\n"
            else:
                result += "Unable to display CSV data.\n\n"
        except Exception as e:
            logging.error(f"Markdown table conversion error: {e}")
            result += "Displaying data as text:\n\n"
            result += str(preview_df) + "\n\n"
        num_cols = df.select_dtypes(include=['number']).columns
        if len(num_cols) > 0:
            result += "### Basic Statistical Information\n\n"
            try:
                stats_df = df[num_cols].describe().round(2)
                stats_markdown = stats_df.to_markdown()
                if stats_markdown:
                    result += stats_markdown + "\n\n"
                else:
                    result += "Unable to display statistical information.\n\n"
            except Exception as e:
                logging.error(f"Statistical info conversion error: {e}")
                result += "Unable to generate statistical information.\n\n"
        return result
    except Exception as e:
        logging.error(f"CSV file processing error: {str(e)}")
        return f"Error processing CSV file: {str(e)}"

def process_pdf_file(file):
    """Handle PDF file"""
    try:
        # Read file in bytes
        file_bytes = file.read()
        file.seek(0)
        # Use PyPDF2
        pdf_file = io.BytesIO(file_bytes)
        reader = PyPDF2.PdfReader(pdf_file, strict=False)
        # Basic info
        result = f"## PDF File: {file.name}\n\n"
        result += f"- Total pages: {len(reader.pages)}\n\n"
        # Extract text by page (limit to first 5 pages)
        max_pages = min(5, len(reader.pages))
        all_text = ""
        for i in range(max_pages):
            try:
                page = reader.pages[i]
                page_text = page.extract_text()
                current_page_text = f"### Page {i+1}\n\n"
                if page_text and len(page_text.strip()) > 0:
                    # Limit to 1500 characters per page
                    if len(page_text) > 1500:
                        current_page_text += page_text[:1500] + "...(truncated)...\n\n"
                    else:
                        current_page_text += page_text + "\n\n"
                else:
                    current_page_text += "(No text could be extracted from this page)\n\n"
                all_text += current_page_text
                # If total text is too long, break
                if len(all_text) > 8000:
                    all_text += "...(truncating remaining pages; PDF is too large)...\n\n"
                    break
            except Exception as page_err:
                logging.error(f"Error processing PDF page {i+1}: {str(page_err)}")
                all_text += f"### Page {i+1}\n\n(Error extracting content: {str(page_err)})\n\n"
        if len(reader.pages) > max_pages:
            all_text += f"\nNote: Only the first {max_pages} pages are shown out of {len(reader.pages)} total.\n\n"
        result += "### PDF Content\n\n" + all_text
        return result
    except Exception as e:
        logging.error(f"PDF file processing error: {str(e)}")
        return f"## PDF File: {file.name}\n\nError occurred: {str(e)}\n\nThis PDF file cannot be processed."

def process_uploaded_files(files):
    """Combine the contents of all uploaded files into one string."""
    if not files:
        return None
    result = "# Uploaded File Contents\n\n"
    result += "Below is the content from the files provided by the user. Integrate this data as a main source of information for the blog.\n\n"
    for file in files:
        try:
            ext = file.name.split('.')[-1].lower()
            if ext == 'txt':
                result += process_text_file(file) + "\n\n---\n\n"
            elif ext == 'csv':
                result += process_csv_file(file) + "\n\n---\n\n"
            elif ext == 'pdf':
                result += process_pdf_file(file) + "\n\n---\n\n"
            else:
                result += f"### Unsupported File: {file.name}\n\n---\n\n"
        except Exception as e:
            logging.error(f"File processing error {file.name}: {e}")
            result += f"### File processing error: {file.name}\n\nError: {e}\n\n---\n\n"
    return result
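
# Note: each handler above truncates its output defensively (text files to ~10,000
# characters, CSV previews to 10 rows, PDFs to the first 5 pages / ~8,000 characters),
# presumably to keep the combined prompt within the model's context window.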

# ──────────────────────────────── Image & Utility ────────────────────────
def generate_image(prompt, w=768, h=768, g=3.5, steps=30, seed=3):
    """Image generation function."""
    if not prompt:
        return None, "Insufficient prompt"
    try:
        res = Client(IMAGE_API_URL).predict(
            prompt=prompt, width=w, height=h, guidance=g,
            inference_steps=steps, seed=seed,
            do_img2img=False, init_image=None,
            image2image_strength=0.8, resize_img=True,
            api_name="/generate_image"
        )
        return res[0], f"Seed: {res[1]}"
    except Exception as e:
        logging.error(e)
        return None, str(e)

def extract_image_prompt(blog_text: str, topic: str):
    """
    Generate a single-line English image prompt from the blog content.
    """
    client = get_openai_client()
    try:
        response = client.chat.completions.create(
            model="gpt-4.1-mini",  # use a generally available model
            messages=[
                {"role": "system", "content": "Generate a single-line English image prompt from the following text. Return only the prompt text, nothing else."},
                {"role": "user", "content": f"Topic: {topic}\n\n---\n{blog_text}\n\n---"}
            ],
            temperature=1,
            max_tokens=80,
            top_p=1
        )
        return response.choices[0].message.content.strip()
    except Exception as e:
        logging.error(f"OpenAI image prompt generation error: {e}")
        return f"A professional photo related to {topic}, high quality"

def md_to_html(md: str, title="Ginigen Blog"):
    """Convert Markdown to HTML."""
    return f"<!DOCTYPE html><html><head><title>{title}</title><meta charset='utf-8'></head><body>{markdown.markdown(md)}</body></html>"

def keywords(text: str, top=5):
    """Simple keyword extraction (keeps Korean characters, Latin letters, and digits)."""
    cleaned = re.sub(r"[^가-힣a-zA-Z0-9\s]", "", text)
    return " ".join(cleaned.split()[:top])
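
# Example (illustrative): keywords("2025 real estate tax changes explained", top=3)
# returns "2025 real estate".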

# ──────────────────────────────── Streamlit UI ───────────────────────────
def ginigen_app():
    st.title("Ginigen Blog")

    # Set default session state
    if "ai_model" not in st.session_state:
        st.session_state.ai_model = "gpt-4.1-mini"  # fixed model
    if "messages" not in st.session_state:
        st.session_state.messages = []
    if "auto_save" not in st.session_state:
        st.session_state.auto_save = True
    if "generate_image" not in st.session_state:
        st.session_state.generate_image = False
    if "web_search_enabled" not in st.session_state:
        st.session_state.web_search_enabled = True
    if "blog_template" not in st.session_state:
        st.session_state.blog_template = "ginigen"  # Ginigen recommended style by default
    if "blog_tone" not in st.session_state:
        st.session_state.blog_tone = "professional"
    if "word_count" not in st.session_state:
        st.session_state.word_count = 1750

    # Sidebar UI
    sb = st.sidebar
    sb.title("Blog Settings")

    # Model selection removed (a fixed model is used)
    sb.subheader("Blog Style Settings")
    sb.selectbox(
        "Blog Template",
        options=list(BLOG_TEMPLATES.keys()),
        format_func=lambda x: BLOG_TEMPLATES[x],
        key="blog_template"
    )
    sb.selectbox(
        "Blog Tone",
        options=list(BLOG_TONES.keys()),
        format_func=lambda x: BLOG_TONES[x],
        key="blog_tone"
    )
    sb.slider("Blog Length (word count)", 800, 3000, key="word_count")

    # Example topics
    sb.subheader("Example Topics")
    c1, c2, c3 = sb.columns(3)
    if c1.button("Real Estate Tax", key="ex1"):
        process_example(EXAMPLE_TOPICS["example1"])
    if c2.button("Summer Festivals", key="ex2"):
        process_example(EXAMPLE_TOPICS["example2"])
    if c3.button("Investment Guide", key="ex3"):
        process_example(EXAMPLE_TOPICS["example3"])

    sb.subheader("Other Settings")
    sb.toggle("Auto Save", key="auto_save")
    sb.toggle("Auto Image Generation", key="generate_image")
    web_search_enabled = sb.toggle("Use Web Search", value=st.session_state.web_search_enabled)
    st.session_state.web_search_enabled = web_search_enabled
    if web_search_enabled:
        st.sidebar.info("Web search results will be integrated into the blog.")

    # Download the latest blog (markdown/HTML)
    latest_blog = next(
        (m["content"] for m in reversed(st.session_state.messages)
         if m["role"] == "assistant" and m["content"].strip()),
        None
    )
    if latest_blog:
        title_match = re.search(r"# (.*?)(\n|$)", latest_blog)
        title = title_match.group(1).strip() if title_match else "blog"
        sb.subheader("Download Latest Blog")
        d1, d2 = sb.columns(2)
        d1.download_button("Download as Markdown", latest_blog,
                           file_name=f"{title}.md", mime="text/markdown")
        d2.download_button("Download as HTML", md_to_html(latest_blog, title),
                           file_name=f"{title}.html", mime="text/html")

    # JSON conversation record upload
    up = sb.file_uploader("Load Conversation History (.json)", type=["json"], key="json_uploader")
    if up:
        try:
            st.session_state.messages = json.load(up)
            sb.success("Conversation history loaded successfully")
        except Exception as e:
            sb.error(f"Failed to load: {e}")

    # JSON conversation record download
    if sb.button("Download Conversation as JSON"):
        sb.download_button(
            "Save",
            data=json.dumps(st.session_state.messages, ensure_ascii=False, indent=2),
            file_name="chat_history.json",
            mime="application/json"
        )

    # File Upload
    st.subheader("File Upload")
    uploaded_files = st.file_uploader(
        "Upload files to be referenced in your blog (txt, csv, pdf)",
        type=["txt", "csv", "pdf"],
        accept_multiple_files=True,
        key="file_uploader"
    )
    if uploaded_files:
        file_count = len(uploaded_files)
        st.success(f"{file_count} files uploaded. They will be referenced in the blog.")
        with st.expander("Preview Uploaded Files", expanded=False):
            for idx, file in enumerate(uploaded_files):
                st.write(f"**File Name:** {file.name}")
                ext = file.name.split('.')[-1].lower()
                if ext == 'txt':
                    preview = file.read(1000).decode('utf-8', errors='ignore')
                    file.seek(0)
                    st.text_area(
                        f"Preview of {file.name}",
                        preview + ("..." if len(preview) >= 1000 else ""),
                        height=150
                    )
                elif ext == 'csv':
                    try:
                        df = pd.read_csv(file)
                        file.seek(0)
                        st.write("CSV Preview (up to 5 rows)")
                        st.dataframe(df.head(5))
                    except Exception as e:
                        st.error(f"CSV preview failed: {e}")
                elif ext == 'pdf':
                    try:
                        file_bytes = file.read()
                        file.seek(0)
                        pdf_file = io.BytesIO(file_bytes)
                        reader = PyPDF2.PdfReader(pdf_file, strict=False)
                        pc = len(reader.pages)
                        st.write(f"PDF File: {pc} pages")
                        if pc > 0:
                            try:
                                page_text = reader.pages[0].extract_text()
                                preview = page_text[:500] if page_text else "(No text extracted)"
                                st.text_area("Preview of the first page", preview + "...", height=150)
                            except Exception:
                                st.warning("Failed to extract text from the first page")
                    except Exception as e:
                        st.error(f"PDF preview failed: {e}")
                if idx < file_count - 1:
                    st.divider()

    # Display existing messages
    for m in st.session_state.messages:
        with st.chat_message(m["role"]):
            st.markdown(m["content"])
            if "image" in m:
                st.image(m["image"], caption=m.get("image_caption", ""))

    # User input
    prompt = st.chat_input("Enter a blog topic or keywords.")
    if prompt:
        process_input(prompt, uploaded_files)

def process_example(topic):
    """Process the selected example topic."""
    process_input(topic, [])

def process_input(prompt: str, uploaded_files):
    # Add user's message
    if not any(m["role"] == "user" and m["content"] == prompt for m in st.session_state.messages):
        st.session_state.messages.append({"role": "user", "content": prompt})

    with st.chat_message("user"):
        st.markdown(prompt)

    with st.chat_message("assistant"):
        placeholder = st.empty()
        message_placeholder = st.empty()
        full_response = ""

        use_web_search = st.session_state.web_search_enabled
        has_uploaded_files = bool(uploaded_files) and len(uploaded_files) > 0

        try:
            # Status component for progress display
            status = st.status("Preparing to generate blog...")
            status.update(label="Initializing client...")

            client = get_openai_client()

            # Prepare conversation messages
            messages = []

            # Web search
            search_content = None
            if use_web_search:
                status.update(label="Performing web search...")
                with st.spinner("Searching the web..."):
                    search_content = do_web_search(keywords(prompt, top=5))

            # Process uploaded files → content
            file_content = None
            if has_uploaded_files:
                status.update(label="Processing uploaded files...")
                with st.spinner("Analyzing files..."):
                    file_content = process_uploaded_files(uploaded_files)

            # Build system prompt
            status.update(label="Preparing blog draft...")
            sys_prompt = get_system_prompt(
                template=st.session_state.blog_template,
                tone=st.session_state.blog_tone,
                word_count=st.session_state.word_count,
                include_search_results=use_web_search,
                include_uploaded_files=has_uploaded_files
            )

            # Prepare the OpenAI API call
            status.update(label="Writing blog content...")

            # Build the message list
            api_messages = [
                {"role": "system", "content": sys_prompt}
            ]
            user_content = prompt

            # Append search results to the user prompt, if any
            if search_content:
                user_content += "\n\n" + search_content

            # Append uploaded file content to the user prompt, if any
            if file_content:
                user_content += "\n\n" + file_content

            # Add the user message
            api_messages.append({"role": "user", "content": user_content})

            # Streaming OpenAI API call, using the fixed "gpt-4.1-mini" model
            try:
                # Call the API in streaming mode
                stream = client.chat.completions.create(
                    model="gpt-4.1-mini",  # fixed model
                    messages=api_messages,
                    temperature=1,
                    max_tokens=MAX_TOKENS,
                    top_p=1,
                    stream=True  # enable streaming
                )

                # Process the streamed response: append each content delta and re-render
                # the partial text with a cursor marker
                for chunk in stream:
                    if chunk.choices and len(chunk.choices) > 0 and chunk.choices[0].delta.content is not None:
                        content_delta = chunk.choices[0].delta.content
                        full_response += content_delta
                        message_placeholder.markdown(full_response + "▌")

                # Display the final response (remove the cursor)
                message_placeholder.markdown(full_response)
                status.update(label="Blog completed!", state="complete")
            except Exception as api_error:
                error_message = str(api_error)
                logging.error(f"API error: {error_message}")
                status.update(label=f"Error: {error_message}", state="error")
                raise Exception(f"Blog generation error: {error_message}")

            # Image generation
            answer_entry_saved = False
            if st.session_state.generate_image and full_response:
                with st.spinner("Generating image..."):
                    try:
                        ip = extract_image_prompt(full_response, prompt)
                        img, cap = generate_image(ip)
                        if img:
                            st.image(img, caption=cap)
                            st.session_state.messages.append({
                                "role": "assistant",
                                "content": full_response,
                                "image": img,
                                "image_caption": cap
                            })
                            answer_entry_saved = True
                    except Exception as img_error:
                        logging.error(f"Image generation error: {str(img_error)}")
                        st.warning("Image generation failed. Only the blog content will be saved.")

            # Save the answer if not saved above
            if not answer_entry_saved and full_response:
                st.session_state.messages.append({"role": "assistant", "content": full_response})

            # Download buttons
            if full_response:
                st.subheader("Download This Blog")
                c1, c2 = st.columns(2)
                c1.download_button(
                    "Markdown",
                    data=full_response,
                    file_name=f"{prompt[:30]}.md",
                    mime="text/markdown"
                )
                c2.download_button(
                    "HTML",
                    data=md_to_html(full_response, prompt[:30]),
                    file_name=f"{prompt[:30]}.html",
                    mime="text/html"
                )

            # Auto save
            if st.session_state.auto_save and st.session_state.messages:
                try:
                    fn = f"chat_history_auto_{datetime.now():%Y%m%d_%H%M%S}.json"
                    with open(fn, "w", encoding="utf-8") as fp:
                        json.dump(st.session_state.messages, fp, ensure_ascii=False, indent=2)
                except Exception as e:
                    logging.error(f"Auto-save failed: {e}")

        except Exception as e:
            error_message = str(e)
            placeholder.error(f"An error occurred: {error_message}")
            logging.error(f"Process input error: {error_message}")
            ans = f"An error occurred while processing your request: {error_message}"
            st.session_state.messages.append({"role": "assistant", "content": ans})

# ──────────────────────────────── main ───────────────────────────────────
def main():
    ginigen_app()

if __name__ == "__main__":
    main()