|
import gradio as gr |
|
import requests |
|
import json |
|
import os |
|
from datetime import datetime, timedelta |
|
from huggingface_hub import InferenceClient |
|
|
|
# Credentials come from the environment; each lookup yields None when the
# corresponding variable is not set.
API_KEY = os.environ.get("SERPHOUSE_API_KEY")

# Inference client for the model that translate_query() prompts.
hf_client = InferenceClient(
    "CohereForAI/c4ai-command-r-plus-08-2024",
    token=os.environ.get("HF_TOKEN"),
)
|
|
|
|
|
|
|
# Country display name -> two-letter country code.
# Insertion order matters: MAJOR_COUNTRIES (the dropdown choices) is built
# from these keys in this order.
COUNTRY_CODES = {
    "United States": "US",
    "United Kingdom": "GB",
    "Canada": "CA",
    "Australia": "AU",
    "Germany": "DE",
    "France": "FR",
    "Japan": "JP",
    "South Korea": "KR",
    "China": "CN",
    "Taiwan": "TW",
    "India": "IN",
    "Brazil": "BR",
    "Mexico": "MX",
    "Russia": "RU",
    "Italy": "IT",
    "Spain": "ES",
    "Netherlands": "NL",
    "Singapore": "SG",
    "Hong Kong": "HK",
    "Indonesia": "ID",
    "Malaysia": "MY",
    "Philippines": "PH",
    "Thailand": "TH",
    "Vietnam": "VN",
    "Belgium": "BE",
    "Denmark": "DK",
    "Finland": "FI",
    "Ireland": "IE",
    "Norway": "NO",
    "Poland": "PL",
    "Sweden": "SE",
    "Switzerland": "CH",
    "Austria": "AT",
    "Czech Republic": "CZ",
    "Greece": "GR",
    "Hungary": "HU",
    "Portugal": "PT",
    "Romania": "RO",
    "Turkey": "TR",
    "Israel": "IL",
    "Saudi Arabia": "SA",
    "United Arab Emirates": "AE",
    "South Africa": "ZA",
    "Argentina": "AR",
    "Chile": "CL",
    "Colombia": "CO",
    "Peru": "PE",
    "Venezuela": "VE",
    "New Zealand": "NZ",
    "Bangladesh": "BD",
    "Pakistan": "PK",
    "Egypt": "EG",
    "Morocco": "MA",
    "Nigeria": "NG",
    "Kenya": "KE",
    "Ukraine": "UA",
    "Croatia": "HR",
    "Slovakia": "SK",
    "Bulgaria": "BG",
    "Serbia": "RS",
    "Estonia": "EE",
    "Latvia": "LV",
    "Lithuania": "LT",
    "Slovenia": "SI",
    "Luxembourg": "LU",
    "Malta": "MT",
    "Cyprus": "CY",
    "Iceland": "IS",
}
|
|
|
|
|
def is_english(text):
    """Return True when every character of *text* is ASCII.

    This is a character-set test, not language detection: any text made up
    only of code points below 128 (letters, digits, punctuation, whitespace)
    is treated as English.  The empty string also counts as English.
    """
    return text.isascii()
|
|
|
# Country display name -> language code.
# The code is used as the target language in translate_query()'s prompt and
# as the "lang" value of the search request.  Countries that are absent here
# are searched with "en" and their queries are not translated.
COUNTRY_LANGUAGES = {
    "South Korea": "ko",
    "Japan": "ja",
    "China": "zh",
    "Taiwan": "zh-tw",
    "Russia": "ru",
    "France": "fr",
    "Germany": "de",
    "Spain": "es",
    "Italy": "it",
    "Netherlands": "nl",
    "Portugal": "pt",
    "Thailand": "th",
    "Vietnam": "vi",
    "Indonesia": "id",
    "Malaysia": "ms",
    "Saudi Arabia": "ar",
    "United Arab Emirates": "ar",
    "Egypt": "ar",
    "Morocco": "ar",
    "Greece": "el",
    "Poland": "pl",
    "Czech Republic": "cs",
    "Hungary": "hu",
    "Turkey": "tr",
    "Romania": "ro",
    "Bulgaria": "bg",
    "Croatia": "hr",
    "Serbia": "sr",
    "Slovakia": "sk",
    "Slovenia": "sl",
    "Estonia": "et",
    "Latvia": "lv",
    "Lithuania": "lt",
    "Ukraine": "uk",
    "Israel": "he",
    "Bangladesh": "bn",
    "Pakistan": "ur",
    "Finland": "fi",
    "Denmark": "da",
    "Norway": "no",
    "Sweden": "sv",
    "Iceland": "is",
    "Philippines": "fil",
    "Brazil": "pt-br",
    "Argentina": "es-ar",
    "Chile": "es-cl",
    "Colombia": "es-co",
    "Peru": "es-pe",
    "Venezuela": "es-ve",
}
|
|
|
|
|
def translate_query(query, country):
    """Translate *query* into the language associated with *country*.

    The query is returned untranslated (capped at 255 characters) when it is
    empty, when it is pure ASCII, when the country is South Korea, or when
    the country has no entry in ``COUNTRY_LANGUAGES``.  Otherwise its first
    100 characters are sent to the hosted model for translation.

    Translation is best-effort: if the model call raises or produces an
    empty string, the original query is used so that a search can still run.

    Args:
        query: Search text entered by the user (``None`` is treated as empty).
        country: Country display name, e.g. ``"Japan"``.

    Returns:
        The text to send to the search API, never longer than 255 characters.
    """
    max_query_len = 255      # cap applied to whatever is returned
    max_translate_len = 100  # cap on the text sent to the model

    if not query:
        return ""

    try:
        if is_english(query):
            print(f"English query detected, using original: {query}")
            return query[:max_query_len]

        # NOTE(review): South Korea is passed through untranslated, presumably
        # because non-ASCII input is expected to already be Korean - confirm.
        if country == "South Korea" or country not in COUNTRY_LANGUAGES:
            return query[:max_query_len]

        source_text = query[:max_translate_len]
        target_lang = COUNTRY_LANGUAGES[country]
        prompt = (
            f"Translate this text to {target_lang} language.\n"
            "For Japanese, use Kanji and Kana.\n"
            "For Chinese (China), use Simplified Chinese.\n"
            "For Chinese (Taiwan), use Traditional Chinese.\n"
            "For Korean, use Hangul.\n"
            "Only output the translated text without any explanation.\n"
            f"Text to translate: {source_text}"
        )

        translated = hf_client.text_generation(
            prompt,
            max_new_tokens=50,
            temperature=0.1,
        )
        translated = translated.strip()[:max_query_len]
        print(f"Original query: {source_text}")
        print(f"Translated query: {translated}")

        # An empty completion would turn into an empty search; keep the
        # user's own text instead.
        if not translated:
            return query[:max_query_len]
        return translated
    except Exception as e:
        # Deliberate catch-all: any failure of the remote model must degrade
        # to an untranslated search rather than break the request.
        print(f"Translation error: {e}")
        return query[:max_query_len]
|
|
|
|
|
|
|
|
|
# Country names offered in the UI dropdown, in COUNTRY_CODES insertion order.
MAJOR_COUNTRIES = list(COUNTRY_CODES)
|
|
|
|
|
def search_serphouse(query, country, page=1, num_result=10):
    """Translate *query* for *country* and run it through the SERPHouse API.

    Args:
        query: Search text as entered by the user.
        country: Country display name; sent as the ``loc`` field and used to
            pick the ``lang`` field (``"en"`` when the country has no entry
            in ``COUNTRY_LANGUAGES``).
        page: Result page to request.
        num_result: Number of results to request.

    Returns:
        On success ``{"results": <decoded JSON body>, "translated_query":
        <query actually sent>}``.  On any request failure ``{"error":
        <message>, "translated_query": <the original query>}``.
    """
    url = "https://api.serphouse.com/serp/live"
    timeout_seconds = 30  # never let a stalled connection hang the UI

    translated_query = translate_query(query, country)
    print(f"Original query: {query}")
    print(f"Translated query: {translated_query}")

    payload = {
        "data": {
            "q": translated_query,
            "domain": "google.com",
            "loc": country,
            "lang": COUNTRY_LANGUAGES.get(country, "en"),
            "device": "desktop",
            # NOTE(review): format_results_from_raw() reads the "news"
            # section of the response - confirm "web" is the intended type.
            "serp_type": "web",
            "page": str(page),
            "verbatim": "0",
            "gfilter": "0",
            "num_result": str(num_result),
        }
    }

    headers = {
        "accept": "application/json",
        "content-type": "application/json",
        "authorization": f"Bearer {API_KEY}",
    }

    # Bound before the try so the handlers can tell "no response at all"
    # (connection error, timeout) apart from "got a bad response".
    response = None
    try:
        response = requests.post(
            url, json=payload, headers=headers, timeout=timeout_seconds
        )
        print("Request payload:", json.dumps(payload, indent=2, ensure_ascii=False))
        print("Response status:", response.status_code)
        print("Response content:", response.text[:500])

        response.raise_for_status()
        return {"results": response.json(), "translated_query": translated_query}
    except requests.RequestException as e:
        error_msg = f"Error: {e}"
        if response is not None:
            error_msg += f"\nResponse content: {response.text}"
        return {"error": error_msg, "translated_query": query}
    except ValueError as e:
        # Older versions of requests raise a plain ValueError from .json()
        # when the body is not valid JSON.
        error_msg = f"Error: {e}"
        if response is not None:
            error_msg += f"\nResponse content: {response.text}"
        return {"error": error_msg, "translated_query": query}
|
|
|
def format_results_from_raw(response_data):
    """Turn a ``search_serphouse`` result into ``(message, articles)``.

    Args:
        response_data: Dict with either an ``"error"`` key, or ``"results"``
            (decoded API body) and ``"translated_query"`` keys.

    Returns:
        A ``(message, articles)`` tuple.  ``message`` is an empty string on
        success and a user-facing notice otherwise (request error, no
        results, or malformed data).  ``articles`` is a list of dicts with
        the keys ``index`` (1-based), ``title``, ``link``, ``snippet``,
        ``channel``, ``time``, ``image_url`` and ``translated_query``; it is
        empty whenever ``message`` is not.
    """
    if "error" in response_data:
        return f"Error: {response_data['error']}", []

    def first_value(item, keys, default):
        """Return the first non-empty value of *item* among *keys*."""
        for key in keys:
            value = item.get(key)
            if value:
                return value
        return default

    try:
        translated_query = response_data["translated_query"]

        # Walk results -> results -> results -> news, treating a missing or
        # null level as "nothing found" rather than as a failure.
        node = response_data["results"]
        for key in ("results", "results", "news"):
            node = (node or {}).get(key)

        # Ignore entries that are not objects so one bad item cannot discard
        # the whole result set.
        news_results = [entry for entry in (node or []) if isinstance(entry, dict)]
        if not news_results:
            return "검색 결과가 없습니다.", []

        articles = [
            {
                "index": idx,
                "title": first_value(result, ("title",), "제목 없음"),
                "link": first_value(result, ("url", "link"), "#"),
                "snippet": first_value(result, ("snippet",), "내용 없음"),
                "channel": first_value(result, ("channel", "source"), "알 수 없음"),
                "time": first_value(result, ("time", "date"), "알 수 없는 시간"),
                "image_url": first_value(result, ("img", "thumbnail"), ""),
                "translated_query": translated_query,
            }
            for idx, result in enumerate(news_results, 1)
        ]
        return "", articles
    except (AttributeError, KeyError, TypeError) as e:
        # Unexpected response shape: report it instead of raising into the UI.
        return f"결과 처리 중 오류 발생: {e}", []
|
|
|
def serphouse_search(query, country):
    """Search for *query* in *country* and return ``(message, articles)``.

    Thin convenience wrapper: the raw result of ``search_serphouse`` is
    handed straight to ``format_results_from_raw``.
    """
    return format_results_from_raw(search_serphouse(query, country))
|
|
|
# Stylesheet handed to gr.Blocks(css=...): hides the page's footer element.
css = "\nfooter {visibility: hidden;}\n"
|
|
|
# Build the UI: a query box and country dropdown, a line showing the query
# that was searched, and 100 pre-built (initially hidden) article slots that
# the click handler fills in.
# NOTE(review): the source's indentation was lost; which widgets sit inside
# the Column was reconstructed from statement order - confirm the layout.
with gr.Blocks(theme="Nymbo/Nymbo_Theme", css=css, title="NewsAI 서비스") as iface:
    gr.Markdown(
        "검색어를 입력하고 원하는 국가를 선택하면, "
        "해당 국가의 언어로 번역된 검색어로 뉴스를 검색합니다."
    )

    with gr.Column():
        with gr.Row():
            query = gr.Textbox(label="검색어")
            country = gr.Dropdown(MAJOR_COUNTRIES, label="국가", value="South Korea")

        # Shows the original and/or translated query after a search.
        translated_display = gr.Markdown(visible=True)

        search_button = gr.Button("검색", variant="primary")

    status_message = gr.Markdown(visible=False)
    articles_state = gr.State([])

    # One slot per displayable article; each slot is a hidden group holding
    # the title, image, snippet and source/time widgets.
    article_components = []
    for i in range(100):
        with gr.Group(visible=False) as article_group:
            title = gr.Markdown()
            image = gr.Image(width=200, height=150)
            snippet = gr.Markdown()
            info = gr.Markdown()

        article_components.append({
            'group': article_group,
            'title': title,
            'image': image,
            'snippet': snippet,
            'info': info,
            'index': i,
        })

    def search_and_display(query, country, articles_state, progress=gr.Progress()):
        """Run a search and return one update per component in search_outputs.

        Args:
            query: Text from the search box.
            country: Country name chosen in the dropdown.
            articles_state: Previous article list (not read; it is replaced
                by the new list in the returned outputs).
            progress: Progress tracker supplied by Gradio.

        Returns:
            A list ordered as: query-display update, error-message update,
            five updates (group, title, image, snippet, info) for each of the
            article slots, the new article list, and a status-message update.
        """
        def hidden_slot():
            # Hide the group and leave its four child widgets untouched.
            return [
                gr.update(visible=False), gr.update(), gr.update(),
                gr.update(), gr.update(),
            ]

        progress(0, desc="검색 시작...")
        progress(0.2, desc="검색 중...")

        # search_serphouse() translates the query itself; reuse its result so
        # the displayed translation is exactly what was searched and the
        # model is only called once.
        response_data = search_serphouse(query, country)
        error_message, articles = format_results_from_raw(response_data)
        translated_query = response_data.get("translated_query", query)

        if is_english(query):
            translated_display_text = f"영어 검색어: {query}"
        elif country != "South Korea" and translated_query != query:
            translated_display_text = (
                f"원본 검색어: {query}\n번역된 검색어: {translated_query}"
            )
        else:
            translated_display_text = f"검색어: {query}"

        outputs = [gr.update(value=translated_display_text, visible=True)]

        if error_message:
            outputs.append(gr.update(value=error_message, visible=True))
            articles = []
        else:
            outputs.append(gr.update(value="", visible=False))

        total_articles = len(articles)
        for idx in range(len(article_components)):
            if idx >= total_articles:
                outputs.extend(hidden_slot())
                continue

            # Progress is reported only for real articles so the fraction
            # stays within 0..1.
            progress(
                (idx + 1) / total_articles,
                desc=f"결과 표시 중... {idx + 1}/{total_articles}",
            )

            article = articles[idx]
            image_url = article['image_url']
            # Inline data: URIs and non-string values are not shown.
            show_image = (
                isinstance(image_url, str)
                and bool(image_url)
                and not image_url.startswith('data:image')
            )
            if show_image:
                image_update = gr.update(value=image_url, visible=True)
            else:
                image_update = gr.update(value=None, visible=False)

            outputs.extend([
                gr.update(visible=True),
                gr.update(value=f"### [{article['title']}]({article['link']})"),
                image_update,
                gr.update(value=f"**요약:** {article['snippet']}"),
                gr.update(value=f"**출처:** {article['channel']} | **시간:** {article['time']}"),
            ])

        progress(1.0, desc="완료!")
        outputs.append(articles)
        outputs.append(gr.update(visible=False))
        return outputs

    # Receives the error / "no results" message (second handler output).
    error_display = gr.Markdown(visible=False)

    # Must list components in exactly the order search_and_display returns
    # its updates.
    search_outputs = [translated_display, error_display]
    for comp in article_components:
        search_outputs.extend([
            comp['group'],
            comp['title'],
            comp['image'],
            comp['snippet'],
            comp['info'],
        ])
    search_outputs.extend([articles_state, status_message])

    search_button.click(
        fn=search_and_display,
        inputs=[query, country, articles_state],
        outputs=search_outputs,
        show_progress=True,
    )

iface.launch()