# Tip / app.py
# kfkas's picture
# z
# edb47bf
# raw
# history blame
# 43.7 kB
import os
import shutil
import cv2
import base64
import uuid
from flask import Flask
import gradio as gr
import re
import pandas as pd
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
from selenium.common.exceptions import NoSuchElementException, TimeoutException
import time
class GoogleReviewManager:
    """Crawl Google Maps reviews once and keep them as prompt-ready text.

    The crawl runs inside ``__init__``; the result is stored in
    ``reviews_text`` as one ``[<rating> stars] <text>`` line per review so it
    can be substituted into ``DEFAULT_PROMPT_TEMPLATE``.
    """

    def __init__(self, url, target_review_count=20):
        """Store the settings and immediately crawl the reviews.

        Args:
            url: Google Maps place URL to crawl.
            target_review_count: Maximum number of reviews to collect.
        """
        self.url = url
        self.target_review_count = target_review_count
        self.reviews_text = self.fetch_reviews_text()

    def fetch_reviews_text(self):
        """Return the crawled reviews as newline-separated text.

        Returns:
            One ``[<rating> stars] <review text>`` line per review, or a
            placeholder message when the crawl produced no reviews.
        """
        df_reviews = self.google_review_crawling(self.target_review_count, self.url)
        if df_reviews.empty:
            return "(구글 리뷰를 불러오지 못했습니다.)"
        lines = []
        for _, row in df_reviews.iterrows():
            rating = row["Rating"]
            # A rating that could not be parsed is stored as None/NaN; show
            # "N/A" instead of leaking "nan"/"None" into the prompt.
            rating_label = "N/A" if pd.isna(rating) else rating
            # Example: [4.5 stars] Excellent service and food.
            lines.append(f"[{rating_label} stars] {row['Review Text']}")
        return "\n".join(lines)

    @staticmethod
    def format_google_reviews(reviews_text):
        """Number each review line under a ``#### Google Review N ####`` header.

        Blank lines and lines that already contain ``####`` are dropped first,
        so only review content is numbered.
        """
        reviews = [line for line in reviews_text.split("\n") if line.strip() and "####" not in line]
        formatted_reviews = [
            f"#### Google Review {i} ####\n{review}"
            for i, review in enumerate(reviews, start=1)
        ]
        return "\n\n".join(formatted_reviews)

    @staticmethod
    def _create_driver():
        """Build the headless Chrome driver used for crawling."""
        service = Service(ChromeDriverManager().install())
        options = webdriver.ChromeOptions()
        options.add_argument("--headless=new")
        options.add_argument("--disable-gpu")
        options.add_argument("--window-size=600,600")
        # UI language; the selectors below look for English labels such as
        # "Reviews" and "Newest" (use "--lang=ko" for Korean reviews).
        options.add_argument("--lang=en")
        options.add_argument(
            "user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36")
        return webdriver.Chrome(service=service, options=options)

    @staticmethod
    def _reviews_to_dataframe(reviews_data, limit):
        """Convert up to ``limit`` collected review dicts into a DataFrame."""
        if not reviews_data:
            return pd.DataFrame()
        review_list = []
        for review in reviews_data[:limit]:
            rating_str = review['rating']
            rating_num = None
            if rating_str != "N/A":
                try:
                    # The first token of the aria-label is parsed as the score.
                    rating_num = float(rating_str.split()[0])
                except (AttributeError, IndexError, ValueError):
                    rating_num = None
            review_list.append({
                "Name": review['reviewer_name'],
                "Rating": rating_num,
                "Date / Time Ago": review['date'],
                "Review Text": review['text']
            })
        return pd.DataFrame(review_list)

    def google_review_crawling(self, TARGET_REVIEW_COUNT, url):
        """Crawl up to ``TARGET_REVIEW_COUNT`` reviews from ``url``.

        Returns:
            A DataFrame with the columns ``Name``, ``Rating``,
            ``Date / Time Ago`` and ``Review Text``. An empty DataFrame is
            returned when the driver cannot be created, when the crawl raises,
            or when no review text was found.
        """
        try:
            driver = self._create_driver()
            print("웹 드라이버 설정 완료 (헤드리스 모드).")
        except Exception as e:
            print(f"웹 드라이버 설정 중 오류 발생: {e}")
            # Report "no reviews" instead of terminating the whole process;
            # fetch_reviews_text already handles an empty DataFrame.
            return pd.DataFrame()

        reviews_data = []
        processed_keys = set()  # unique keys used to skip duplicate reviews
        try:
            driver.get(url)
            print("Google Maps 접속 완료.")
            time.sleep(3)
            zoom_level = 0.7
            driver.execute_script(f"document.body.style.zoom = '{zoom_level}'")

            # The overall rating is only logged; failing to find it is not fatal.
            try:
                overall_rating_selector = (By.CSS_SELECTOR, "div.F7nice span[aria-hidden='true']")
                overall_rating_element = WebDriverWait(driver, 10).until(
                    EC.visibility_of_element_located(overall_rating_selector)
                )
                overall_rating_text = overall_rating_element.text
                print(f"총 평점 찾음: {overall_rating_text}")
            except (TimeoutException, NoSuchElementException):
                print("총 평점 요소를 찾지 못했습니다.")

            # Switch to the reviews tab, trying several selectors in turn.
            review_tab_button = None
            possible_review_selectors = [
                (By.XPATH, "//button[@role='tab'][contains(., 'Reviews')]"),
                (By.XPATH, "//button[contains(text(), 'Reviews')]"),
                (By.CSS_SELECTOR, "button[aria-label*='Reviews']"),
            ]
            wait_for_review_tab = WebDriverWait(driver, 10)
            for selector in possible_review_selectors:
                try:
                    review_tab_button = wait_for_review_tab.until(
                        EC.element_to_be_clickable(selector)
                    )
                    print(f"리뷰 탭 버튼 찾음 (방식: {selector[0]}, 값: {selector[1]})")
                    break
                except TimeoutException:
                    continue
            if not review_tab_button:
                print("리뷰 탭 버튼을 찾을 수 없습니다.")
                raise NoSuchElementException("리뷰 탭 버튼 없음")
            review_tab_button.click()
            time.sleep(3)

            # Open the sort menu and pick "Newest"; on failure keep the default order.
            try:
                sort_button_selector = (By.XPATH,
                                        "//button[contains(@aria-label, '정렬 기준') or contains(@aria-label, 'Sort by') or .//span[contains(text(), 'Sort')]]")
                sort_button = WebDriverWait(driver, 10).until(
                    EC.element_to_be_clickable(sort_button_selector)
                )
                print("정렬 기준 버튼 찾음. 클릭 시도...")
                sort_button.click()
                time.sleep(1)
                newest_option_selector = (By.XPATH, "//div[@role='menuitemradio'][contains(., 'Newest')]")
                newest_option = WebDriverWait(driver, 10).until(
                    EC.element_to_be_clickable(newest_option_selector)
                )
                print("최신순 옵션 찾음. 클릭 시도...")
                newest_option.click()
                print("최신순으로 정렬 적용됨. 잠시 대기...")
                time.sleep(3)
            except (TimeoutException, NoSuchElementException) as e:
                print(f"정렬 적용 중 오류 발생: {e}. 기본 정렬 상태로 진행합니다.")
            time.sleep(3)

            scrollable_div_selector = (By.CSS_SELECTOR, "div.m6QErb.DxyBCb.kA9KIf.dS8AEf.XiKgde[tabindex='-1']")
            review_elements_selector = (By.CSS_SELECTOR, "div.jftiEf.fontBodyMedium")
            try:
                scrollable_div = WebDriverWait(driver, 15).until(
                    EC.presence_of_element_located(scrollable_div_selector)
                )
                print("리뷰 스크롤 영역 찾음.")
            except TimeoutException:
                print("리뷰 스크롤 영역을 찾을 수 없습니다.")
                scrollable_div = None
            time.sleep(5)

            def get_review_text(review):
                """Expand a truncated review if possible and return its text."""
                try:
                    more_button = review.find_element(By.CSS_SELECTOR, "button.w8nwRe.kyuRq")
                    driver.execute_script("arguments[0].scrollIntoView(true);", more_button)
                    time.sleep(0.3)
                    driver.execute_script("arguments[0].click();", more_button)
                    time.sleep(0.5)
                except Exception:
                    # Best effort: a review without a "more" button is read as-is.
                    pass
                try:
                    review_text = review.find_element(By.CSS_SELECTOR, "span.wiI7pd").text
                    return review_text.strip()
                except Exception:
                    return ""

            loop_count = 0
            max_loop = 50  # hard cap on scroll rounds
            while len(reviews_data) < TARGET_REVIEW_COUNT and loop_count < max_loop:
                loop_count += 1
                previous_count = len(reviews_data)
                all_reviews = driver.find_elements(*review_elements_selector)
                print(f"Loop {loop_count}: 총 {len(all_reviews)}개의 리뷰 요소 발견.")
                for review in all_reviews:
                    try:
                        reviewer_name = review.find_element(By.CSS_SELECTOR, "div.d4r55").text
                    except Exception:
                        reviewer_name = "N/A"
                    try:
                        review_date = review.find_element(By.CSS_SELECTOR, "span.rsqaWe").text
                    except Exception:
                        review_date = "N/A"
                    # Reviewer name + date identifies a review across scroll rounds.
                    unique_key = reviewer_name + review_date
                    if unique_key in processed_keys:
                        continue
                    processed_keys.add(unique_key)
                    review_text = get_review_text(review)
                    if not review_text:
                        continue
                    try:
                        rating_span = review.find_element(By.CSS_SELECTOR, "span.kvMYJc")
                        rating = rating_span.get_attribute("aria-label")
                    except Exception:
                        rating = "N/A"
                    reviews_data.append({
                        "reviewer_name": reviewer_name,
                        "rating": rating,
                        "date": review_date,
                        "text": review_text.replace('\n', ' ')
                    })
                    print(f"리뷰 추가: {reviewer_name}, {review_date}")
                    if len(reviews_data) >= TARGET_REVIEW_COUNT:
                        break
                if len(reviews_data) == previous_count:
                    print("새로운 리뷰가 추가되지 않아 스크롤을 중단합니다.")
                    break
                if not scrollable_div:
                    break
                # Scroll the review pane in small steps, then wait before re-reading it.
                for _ in range(20):
                    time.sleep(0.1)
                    scroll_amount = 1000
                    driver.execute_script('arguments[0].scrollBy(0, arguments[1]);', scrollable_div, scroll_amount)
                time.sleep(2)

            df_reviews = self._reviews_to_dataframe(reviews_data, TARGET_REVIEW_COUNT)
        except Exception as e:
            print(f"스크립트 실행 중 예기치 않은 오류 발생: {e}")
            df_reviews = pd.DataFrame()
        finally:
            driver.quit()
        return df_reviews
# --- Config class (Gemma and GPT-4o removed; only Qwen is used) ---
class Config:
    """Application settings and constants.

    Class attributes hold the menu, the default API key, the prompt template
    and the CSS. Creating an instance additionally crawls Google reviews and
    stores them, formatted, in ``GOOGLE_REVIEWS``.
    """

    # Menu shown in the UI: display name, image path and unit price.
    FOOD_ITEMS = [
        {"name": "짜장면", "image": "images/food1.jpg", "price": 7.00},
        {"name": "짬뽕", "image": "images/food2.jpg", "price": 8.50},
        {"name": "탕수육", "image": "images/food3.jpg", "price": 15.00},
        {"name": "볶음밥", "image": "images/food4.jpg", "price": 7.50},
        {"name": "깐풍기", "image": "images/food5.jpg", "price": 18.00},
        {"name": "마파두부", "image": "images/food6.jpg", "price": 12.00},
        {"name": "콜라", "image": "images/food6.jpg", "price": 12.00},
        {"name": "사이다", "image": "images/food6.jpg", "price": 12.00},
    ]

    # Alibaba Qwen API key (empty by default).
    QWEN_API_KEY = ""

    # str.format template. Fields: calculated_subtotal, star_rating,
    # user_review, google_reviews. The doubled braces around caption_text
    # survive formatting as "{caption_text}" for a later substitution.
    DEFAULT_PROMPT_TEMPLATE = (
        "### Persona ###\n"
        "You are an expert tip calculation assistant focusing on service quality observed in a video, and you also consider the user's review, star rating, and recent Google reviews. "
        "Your role is to evaluate all these aspects evenly by assigning each a score out of 100, then calculate the overall average score to determine the appropriate tip percentage.\n\n"
        "### Task ###\n"
        "1. **Video Analysis**: Analyze the video frames as described in the 'Video Caption' to observe the staff's actions and interactions. "
        "Assign a score out of 100 based on the observed service quality.\n\n"
        "2. **Bill Amount Determination**: Determine the bill amount by following these steps:\n"
        " - If an explicit bill amount is mentioned in the 'Video Caption', use it.\n"
        " - Otherwise, use the 'Calculated Subtotal' provided.\n"
        " - If neither is available, assume a default value of $50.\n\n"
        "3. **Overall Service Quality Evaluation**:\n"
        " Evaluate the service quality by evenly scoring the following four components, each out of 100:\n"
        " a) **Video Service Score**: Service quality observed in the video (from Video Caption).\n"
        " b) **Google Review Score**: Insights from the Recent Google Reviews (note if any racist, extremely negative comments, or quality issues are reported).\n"
        " c) **User Review Score**: The user's review (if the review mentions improvements or enhanced service, especially if it indicates that previously reported issues like racism have been resolved, adjust the negative impact accordingly).\n"
        " d) **Star Rating Score**: The user's star rating, interpreted on a scale where 5/5 corresponds to 100 points.\n"
        " - Calculate the overall average score by taking the mean of these four scores.\n\n"
        "4. **Service Quality Classification and Tip Guidelines**:\n"
        " - Based on the overall average score, classify the service quality as follows:\n"
        " - Poor Service: Overall average score < 60 (Tip range: 0% ~ 5% of the bill)\n"
        " - Average Service: Overall average score ≥ 60 and < 80 (Tip range: 10% ~ 15% of the bill)\n"
        " - Good Service: Overall average score ≥ 80 (Tip range: 15% ~ 20% of the bill)\n"
        " - Select a specific tip percentage within the appropriate range.\n"
        " - Calculate the tip amount by multiplying the determined bill amount by the chosen tip percentage (round to two decimal places).\n"
        " - Calculate the final total bill by adding the tip amount to the subtotal (round to two decimal places).\n\n"
        "5. **Review Prioritization and Score Adjustment**:\n"
        " - Even though all four factors are evaluated equally, if the user's review explicitly indicates improvements (e.g., stating 'There's no racism!' or similar phrases), then increase the Google Review score by adjusting or restoring it to a value higher than the score initially reduced due to negative comments. In other words, if improvements are noted, add a positive increment (using a '+' adjustment) to the Google Review score, ensuring it is higher than the negatively impacted value.\n"
        " - Ensure that the user's review score takes priority when there is a conflict with the negative tone of the Google Reviews.\n\n"
        "### User Context ###\n"
        " - Current Country: USA\n"
        " - Restaurant Name: The Golden Spoon (Assumed)\n"
        " - Calculated Subtotal: ${calculated_subtotal:.2f}\n"
        " - User Star Rating: {star_rating} / 5\n"
        " - Currently User Review: {user_review}\n\n"
        "### Recent Google Review ###\n\n"
        "{google_reviews}\n\n"
        "#### Recent Google Review 3 ####\n"
        "[1.0 stars] It was a very racist remark, 매우 인종차별적인 발언을 겪었어요. 모욕적입니다,,\n\n"
        "### Input ###\n"
        "Video Caption:\n{{caption_text}}\n\n"
        "### Output ###\n"
        "Return your answer in the exact format below:\n"
        "Video Text Analysis: [Summary of the observed actions and interactions of the staff in the video along with the assigned score (out of 100) based on video analysis.]\n"
        "Recent Google Review Analysis: [Summary of the insights from the Google Reviews, including any negative or racist comments, with the assigned score (out of 100).]\n"
        "User Review Analysis: [Summary of the user's review including any improvements or enhanced service mentions, with the assigned score (out of 100).]\n"
        "Star Rating Analysis: [Interpret the user's star rating (e.g., converting 5/5 to 100 points) and include the assigned score (out of 100).]\n"
        "Overall Analysis: [Step-by-step explanation detailing:\n"
        " - How the bill amount was determined;\n"
        " - How each of the four components was scored and how any negative scores (e.g., for racism) were adjusted based on improvement indications in the user's review;\n"
        " - How the overall average score was calculated;\n"
        " - The reasoning for the final service quality classification based on the average score;\n"
        " - How the tip percentage was chosen within the guideline range and the detailed calculation for the tip amount and final total bill.]\n\n"
        "### Example Output Indicators (for reference only) ###\n"
        "**Final Tip Percentage**: 12.5%\n"
        "**Final Tip Amount**: $6.25\n"
        "**Final Total Bill**: $56.25\n\n"
        "### Output Indicators ###\n"
        "**Final Tip Percentage**: [X]% (only floating point)\n"
        "**Final Tip Amount**: $[Calculated Tip]\n"
        "**Final Total Bill**: $[Subtotal + Tip]\n"
    )

    # Stylesheet passed to gr.Blocks: menu grid and Qwen button colours.
    CUSTOM_CSS = (
        "\n"
        "#food-container {\n"
        "display: grid;\n"
        "grid-template-columns: repeat(3, 1fr);\n"
        "gap: 10px;\n"
        "overflow-y: auto;\n"
        "height: 600px;\n"
        "}\n"
        "#qwen-button {\n"
        "background-color: #8A2BE2 !important;\n"
        "color: white !important;\n"
        "border-color: #8A2BE2 !important;\n"
        "}\n"
        "#qwen-button:hover {\n"
        "background-color: #7722CC !important;\n"
        "}\n"
    )

    def __init__(self):
        """Crawl and format the Google reviews, then warn about missing images."""
        review_url = "https://www.google.com/maps/place/Wolfgang%E2%80%99s+Steakhouse/data=!3m1!4b1!4m6!3m5!1s0x357ca4778cdd1105:0x27d5ead252b66bfd!8m2!3d37.5244965!4d127.0414635!16s%2Fg%2F11c3pwpp26?hl=en&entry=ttu&g_ep=EgoyMDI1MDQwMi4xIKXMDSoASAFQAw%3D%3D"
        manager = GoogleReviewManager(review_url, target_review_count=2)
        self.google_review_manager = manager
        # Pre-format the crawled reviews once through the static helper.
        self.GOOGLE_REVIEWS = GoogleReviewManager.format_google_reviews(manager.reviews_text)

        # Image assets are optional: only warn when they are absent.
        if not os.path.exists("images"):
            print("경고: 'images' 폴더를 찾을 수 없습니다. 음식 이미지가 표시되지 않을 수 있습니다.")
        missing_images = [
            entry["image"] for entry in self.FOOD_ITEMS if not os.path.exists(entry["image"])
        ]
        for image_path in missing_images:
            print(f"경고: 이미지 파일을 찾을 수 없습니다 - {image_path}")
# --- ModelClients (only the Alibaba Qwen API is used) ---
class ModelClients:
    """Holds the OpenAI-compatible client used to talk to Qwen."""

    def __init__(self, config: Config):
        """Create the Qwen client from ``config.QWEN_API_KEY``."""
        self.config = config
        # Imported here, as in the rest of this file, rather than at module top.
        from openai import OpenAI as QwenOpenAI
        client = QwenOpenAI(
            api_key=config.QWEN_API_KEY,
            base_url="https://dashscope-intl.aliyuncs.com/compatible-mode/v1",
        )
        self.qwen_client = client

    def encode_video_qwen(self, video_path):
        """Return the file at ``video_path`` as a base64-encoded UTF-8 string."""
        with open(video_path, "rb") as stream:
            raw_bytes = stream.read()
        encoded = base64.b64encode(raw_bytes)
        return encoded.decode("utf-8")
# --- VideoProcessor: video frame extraction ---
class VideoProcessor:
    """Extracts frames from a video and removes temporary artefacts."""

    def extract_video_frames(self, video_path, output_folder=None, fps=1):
        """Save frames of ``video_path`` as JPEG files.

        Args:
            video_path: Path of the video; a falsy value yields ``([], None)``.
            output_folder: Target directory; a unique folder under
                ``frames_list/`` is generated when omitted.
            fps: Desired number of saved frames per second of video.

        Returns:
            ``(frame_paths, output_folder)``, or ``([], None)`` when the video
            cannot be opened or no frame could be saved.
        """
        if not video_path:
            return [], None

        # Open the video before creating anything on disk, so a file that
        # cannot be opened leaves no empty folder behind.
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            cap.release()
            print(f"오류: 비디오 파일을 열 수 없습니다 - {video_path}")
            return [], None

        if output_folder is None:
            output_folder = f"frames_list/frames_{uuid.uuid4().hex}"

        frame_paths = []
        try:
            os.makedirs(output_folder, exist_ok=True)
            frame_rate = cap.get(cv2.CAP_PROP_FPS)
            if not frame_rate or frame_rate == 0:
                print("경고: FPS를 읽을 수 없습니다, 기본값 4으로 설정합니다.")
                frame_rate = 4.0
            # Keep one frame out of every `frame_interval` decoded frames.
            frame_interval = int(frame_rate / fps) if fps > 0 else 1
            if frame_interval <= 0:
                frame_interval = 1

            frame_count = 0
            saved_frame_count = 0
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                if frame is None:
                    print(f"경고: {frame_count}번째 프레임이 비어있습니다.")
                    frame_count += 1
                    continue
                if frame_count % frame_interval == 0:
                    frame_path = os.path.join(output_folder, f"frame_{saved_frame_count}.jpg")
                    try:
                        if cv2.imwrite(frame_path, frame):
                            frame_paths.append(frame_path)
                            saved_frame_count += 1
                        else:
                            print(f"경고: {frame_path} 저장 실패.")
                    except Exception as e:
                        print(f"경고: 프레임 저장 오류 ({frame_path}): {e}")
                frame_count += 1
        finally:
            # Release the capture even when reading or writing raises.
            cap.release()

        if not frame_paths:
            print("경고: 프레임 추출 실패.")
            if os.path.exists(output_folder):
                shutil.rmtree(output_folder)
            return [], None
        return frame_paths, output_folder

    def cleanup_temp_files(self, video_path, frame_folder):
        """Delete the temporary video copy and the frame folder, if present.

        Only files whose path contains ``temp_video_`` are removed, so a path
        without that marker is never deleted. Deletion errors are printed, not
        raised.
        """
        if video_path and "temp_video_" in video_path and os.path.exists(video_path):
            try:
                os.remove(video_path)
                print(f"임시 비디오 파일 삭제: {video_path}")
            except OSError as e:
                print(f"임시 비디오 파일 삭제 오류: {e}")
        if frame_folder and os.path.exists(frame_folder):
            try:
                shutil.rmtree(frame_folder)
                print(f"프레임 폴더 삭제: {frame_folder}")
            except OSError as e:
                print(f"프레임 폴더 삭제 오류: {e}")
# --- TipCalculator (tip calculation through the Alibaba Qwen API) ---
class TipCalculator:
    """Turns a service video plus user feedback into a tip recommendation."""

    def __init__(self, config: Config, model_clients: ModelClients, video_processor: VideoProcessor):
        self.config = config
        self.model_clients = model_clients
        self.video_processor = video_processor

    def parse_llm_output(self, output_text):
        """Extract the analysis text and the tip figures from an LLM answer.

        Returns:
            ``(analysis, tip_percentage, tip_amount, output_text)``. The
            numeric fields stay ``0.0`` when their ``**Final ...**`` marker is
            missing, and ``analysis`` falls back to the whole output when no
            ``Analysis:`` section is found.
        """
        analysis = "Analysis not found."
        tip_percentage = 0.0
        tip_amount = 0.0

        # Analysis: everything after "Analysis:" up to "**Final Tip Percentage**",
        # or up to the end of the text when that marker is absent.
        analysis_match = re.search(r"Analysis:\s*(.*?)\*\*Final Tip Percentage\*\*", output_text,
                                   re.DOTALL | re.IGNORECASE)
        if analysis_match is None:
            analysis_match = re.search(r"Analysis:\s*(.*)", output_text, re.DOTALL | re.IGNORECASE)
        if analysis_match:
            analysis = analysis_match.group(1).strip()

        # The patterns below only capture plain decimal numbers, so float()
        # cannot fail on a captured group.
        # Example: **Final Tip Percentage**: 2.00%
        percentage_match = re.search(r"\*\*Final Tip Percentage\*\*:\s*([0-9]+(?:\.[0-9]+)?)%", output_text,
                                     re.DOTALL | re.IGNORECASE)
        if percentage_match:
            tip_percentage = float(percentage_match.group(1))

        # Example: **Final Tip Amount**: $1.44
        tip_match = re.search(r"\*\*Final Tip Amount\*\*:\s*\$?\s*([0-9]+(?:\.[0-9]+)?)", output_text, re.IGNORECASE)
        if tip_match:
            tip_amount = float(tip_match.group(1))
        else:
            print(f"경고: 출력에서 Tip Amount를 찾을 수 없습니다:\n{output_text}")

        if analysis == "Analysis not found.":
            analysis = output_text
        return analysis, tip_percentage, tip_amount, output_text

    @staticmethod
    def _collect_stream_text(stream):
        """Join the ``reasoning_content`` and ``content`` deltas of a stream.

        Chunks with no choices are skipped, so every chunk can be visited and
        no content-bearing chunk is discarded.
        """
        reasoning_parts = []
        answer_parts = []
        for chunk in stream:
            if not chunk.choices:
                continue
            delta = chunk.choices[0].delta
            reasoning = getattr(delta, "reasoning_content", None)
            if reasoning:
                reasoning_parts.append(reasoning)
            if delta.content:
                answer_parts.append(delta.content)
        return "".join(reasoning_parts), "".join(answer_parts)

    def _build_prompt(self, custom_prompt, calculated_subtotal, star_rating, user_review):
        """Fill the custom prompt, or the default template if that is not possible."""
        fields = {
            "calculated_subtotal": calculated_subtotal,
            "star_rating": star_rating,
            "user_review": user_review,
            # The default template has a {google_reviews} field; leaving it
            # out made every default/fallback format call raise KeyError.
            "google_reviews": getattr(self.config, "GOOGLE_REVIEWS", ""),
        }
        if custom_prompt and custom_prompt.strip():
            try:
                return custom_prompt.format(**fields)
            except (LookupError, ValueError, AttributeError, TypeError) as e:
                # A user-edited prompt may contain stray braces or unknown fields.
                print(f"경고: 사용자 프롬프트 포맷 실패 ({e}). 기본 프롬프트를 사용합니다.")
        return self.config.DEFAULT_PROMPT_TEMPLATE.format(**fields)

    def process_tip_qwen(self, video_file_path, star_rating, user_review, calculated_subtotal, custom_prompt=None):
        """Caption the video with Qwen-Omni, then ask Qwen-VL for the tip.

        Args:
            video_file_path: Path of the video to analyse.
            star_rating: The user's star rating.
            user_review: The user's review text (may be empty or ``None``).
            calculated_subtotal: Bill subtotal inserted into the prompt.
            custom_prompt: Optional prompt template; the default template is
                used when it is missing, blank, or cannot be formatted.

        Returns:
            ``(analysis, tip_percentage, tip_amount, [], None, output_text)``.
            For a missing video file the first element is an ``Error:``
            message and the remaining values are empty.
        """
        if not video_file_path or not os.path.exists(video_file_path):
            return "Error: 비디오 파일 경로가 유효하지 않습니다.", 0.0, 0.0, [], None, ""

        base64_video = self.model_clients.encode_video_qwen(video_file_path)
        omni_caption_prompt = (
            "\n"
            "Task 1: Describe the waiters' actions in these restaurant video frames. Please check for mistakes or negative behaviors.\n"
            "Task 2: Provide a short chronological summary of the entire scene.\n"
        )
        omni_result = self.model_clients.qwen_client.chat.completions.create(
            model="qwen2.5-omni-7b",
            messages=[
                {
                    "role": "system",
                    "content": [{"type": "text", "text": "You are a helpful assistant."}],
                },
                {
                    "role": "user",
                    "content": [
                        {"type": "video_url", "video_url": {"url": f"data:;base64,{base64_video}"}},
                        {"type": "text", "text": omni_caption_prompt},
                    ],
                },
            ],
            modalities=["text"],
            stream=True,
            stream_options={"include_usage": True},
        )
        _, caption_text = self._collect_stream_text(omni_result)
        if not caption_text.strip():
            caption_text = "(No caption from Omni)"

        user_review = user_review.strip() if user_review else "(No user review)"
        prompt = self._build_prompt(custom_prompt, calculated_subtotal, star_rating, user_review)
        # Formatting turned "{{caption_text}}" into "{caption_text}"; fill it now.
        final_prompt = prompt.replace("{caption_text}", caption_text)

        qvq_result = self.model_clients.qwen_client.chat.completions.create(
            model="qwen2.5-vl-32b-instruct",
            messages=[
                {"role": "system", "content": [{"type": "text", "text": "You are a helpful assistant."}]},
                {"role": "user", "content": [{"type": "text", "text": final_prompt}]},
            ],
            modalities=["text"],
            stream=True,
        )
        final_reasoning, final_answer = self._collect_stream_text(qvq_result)
        if final_answer:
            print("\n" + "=" * 20 + "Complete Response" + "=" * 20 + "\n")

        final_text = final_reasoning + "\n" + final_answer
        analysis, tip_percentage, tip_amount, output_text = self.parse_llm_output(final_text)
        return analysis, tip_percentage, tip_amount, [], None, output_text

    def calculate_manual_tip(self, tip_percent, subtotal):
        """Compute a fixed-percentage tip.

        Returns:
            ``(analysis, tip_text, total_bill_text)`` as display strings.
        """
        tip_amount = subtotal * (tip_percent / 100)
        total_bill = subtotal + tip_amount
        analysis_output = f"Manual calculation using fixed tip percentage of {tip_percent}%."
        tip_output = f"${tip_amount:.2f} ({tip_percent:.1f}%)"
        total_bill_output = f"${total_bill:.2f}"
        return analysis_output, tip_output, total_bill_output
# --- UIHandler: Gradio event handlers (including the Alibaba API key input) ---
class UIHandler:
    """Callbacks wired to the Gradio components."""

    def __init__(self, config: Config, tip_calculator: TipCalculator, video_processor: VideoProcessor):
        self.config = config
        self.tip_calculator = tip_calculator
        self.video_processor = video_processor

    def update_subtotal_and_prompt(self, *args):
        """Recompute the subtotal and rebuild the prompt from the inputs.

        Args:
            *args: One quantity per menu item, followed by the star rating and
                the review text.

        Returns:
            ``(calculated_subtotal, updated_prompt)``.
        """
        num_food_items = len(self.config.FOOD_ITEMS)
        quantities = args[:num_food_items]
        star_rating = args[num_food_items]
        user_review = args[num_food_items + 1]

        # A cleared number field arrives as None; count it as zero.
        calculated_subtotal = sum(
            (item['price'] * (qty or 0) for item, qty in zip(self.config.FOOD_ITEMS, quantities)),
            0.0,
        )
        user_review_text = user_review.strip() if user_review and user_review.strip() else "(No user review provided)"
        # Format the Google review text through the static helper.
        formatted_google_reviews = GoogleReviewManager.format_google_reviews(self.config.GOOGLE_REVIEWS)
        updated_prompt = self.config.DEFAULT_PROMPT_TEMPLATE.format(
            calculated_subtotal=calculated_subtotal,
            star_rating=star_rating,
            user_review=user_review_text,
            google_reviews=formatted_google_reviews
        )
        # Re-escape the caption placeholder so the prompt can be formatted again later.
        updated_prompt = updated_prompt.replace("{caption_text}", "{{caption_text}}")
        return calculated_subtotal, updated_prompt

    def compute_tip(self, alibaba_key, video_file_obj, subtotal, star_rating, user_review, custom_prompt_text):
        """Run the Qwen tip calculation for an uploaded video.

        Returns:
            ``(analysis, tip_text, total_bill_text, prompt, video_update)``;
            the last element clears the video component.
        """
        subtotal = subtotal or 0.0  # a missing subtotal is treated as zero
        analysis_output = "계산을 시작합니다..."
        tip_percentage = 0.0
        tip_output = "$0.00"
        total_bill_output = f"${subtotal:.2f}"
        if video_file_obj is None:
            return "오류: 비디오 파일을 업로드해주세요.", "$0.00", total_bill_output, custom_prompt_text, gr.update(value=None)

        try:
            if alibaba_key and alibaba_key.strip():
                # A key typed into the UI replaces the client built at start-up.
                from openai import OpenAI as QwenOpenAI
                self.tip_calculator.model_clients.qwen_client = QwenOpenAI(
                    api_key=alibaba_key.strip(),
                    base_url="https://dashscope-intl.aliyuncs.com/compatible-mode/v1"
                )
            # Work on a private copy so the upload can be removed afterwards.
            temp_video_path = f"temp_video_{uuid.uuid4().hex}.mp4"
            original_path = video_file_obj.name if hasattr(video_file_obj, 'name') else video_file_obj
            shutil.copyfile(original_path, temp_video_path)
            print(f"임시 비디오 파일 생성: {temp_video_path}")
        except Exception as e:
            print(f"임시 비디오 파일 생성 오류: {e}")
            return f"오류: 비디오 파일을 처리할 수 없습니다: {e}", "$0.00", total_bill_output, custom_prompt_text, None

        frame_folder = None
        try:
            analysis, tip_percentage, tip_amount, _, _, output_text = self.tip_calculator.process_tip_qwen(
                temp_video_path, star_rating, user_review, subtotal, custom_prompt_text
            )
            # process_tip_qwen reports failures with an "Error:" prefix; an LLM
            # analysis that merely mentions the word must not zero the tip.
            if analysis.startswith("Error:"):
                analysis_output = analysis
                tip_amount = 0.0
            else:
                analysis_output = f"Tip Percentage: {tip_percentage:.1f}%\n\n{output_text}"
            tip_output = f"${tip_amount:.2f} ({tip_percentage:.1f}%)"
            total_bill = subtotal + tip_amount
            total_bill_output = f"${total_bill:.2f}"
        except Exception as e:
            print(f"팁 계산 중 오류 발생 (qwen): {e}")
            analysis_output = f"오류 발생: {e}"
            tip_output = "$0.00"
            total_bill_output = f"${subtotal:.2f}"
        finally:
            self.video_processor.cleanup_temp_files(temp_video_path, frame_folder)
        return analysis_output, tip_output, total_bill_output, custom_prompt_text, gr.update(value=None)

    def auto_tip_and_invoice(self, alibaba_key, video_file_obj, subtotal, star_rating, review, prompt, *quantities):
        """Compute the AI tip and append the refreshed invoice to the result."""
        analysis, tip_disp, total_bill_disp, prompt_out, vid_out = self.compute_tip(
            alibaba_key, video_file_obj, subtotal, star_rating, review, prompt
        )
        invoice = self.update_invoice_summary(*quantities, tip_disp, total_bill_disp)
        return analysis, tip_disp, total_bill_disp, prompt_out, vid_out, invoice

    def update_invoice_summary(self, *args):
        """Build the invoice text.

        Args:
            *args: One quantity per menu item, optionally followed by the tip
                and total-bill display strings (both default to ``"$0.00"``).
        """
        num_items = len(self.config.FOOD_ITEMS)
        quantities = args[:num_items]
        if len(args) >= num_items + 2:
            tip_str = args[num_items]
            total_bill_str = args[num_items + 1]
        else:
            tip_str = "$0.00"
            total_bill_str = "$0.00"

        summary = ""
        for i, q in enumerate(quantities):
            try:
                q_val = float(q)
            except (TypeError, ValueError):
                # Empty or non-numeric quantities count as "not ordered".
                q_val = 0
            if q_val > 0:
                item = self.config.FOOD_ITEMS[i]
                total_price = item['price'] * q_val
                summary += f"{item['name']} x{int(q_val)} : ${total_price:.2f}\n"
        if summary == "":
            summary = "주문한 메뉴가 없습니다."
        summary += f"\nTip: {tip_str}\nTotal Bill: {total_bill_str}"
        return summary

    def manual_tip_and_invoice(self, tip_percent, subtotal, *quantities):
        """Apply a fixed tip percentage and return the result plus the invoice."""
        analysis, tip_disp, total_bill_disp = self.tip_calculator.calculate_manual_tip(tip_percent, subtotal)
        invoice = self.update_invoice_summary(*quantities, tip_disp, total_bill_disp)
        return analysis, tip_disp, total_bill_disp, invoice

    def process_payment(self, total_bill):
        """Return the payment confirmation message for ``total_bill``."""
        return f"{total_bill} 결제되었습니다."
# --- App: wires every component together and runs the Gradio interface ---
class App:
    """Builds the configuration, model client, calculators and UI."""

    # Fixed percentages offered as one-click manual tip buttons.
    MANUAL_TIP_PERCENTAGES = (5, 10, 15, 20, 25)

    def __init__(self):
        self.config = Config()
        self.model_clients = ModelClients(self.config)
        self.video_processor = VideoProcessor()
        self.tip_calculator = TipCalculator(self.config, self.model_clients, self.video_processor)
        self.ui_handler = UIHandler(self.config, self.tip_calculator, self.video_processor)
        self.flask_app = Flask(__name__)

    def _manual_tip_handler(self, tip_percent):
        """Return a click callback that applies the fixed ``tip_percent``."""
        def handler(subtotal, *quantities):
            return self.ui_handler.manual_tip_and_invoice(tip_percent, subtotal, *quantities)
        return handler

    def create_gradio_blocks(self):
        """Build the Gradio layout, register all events, and return the Blocks."""
        with gr.Blocks(title="Video Tip Calculation Interface", theme=gr.themes.Soft(),
                       css=self.config.CUSTOM_CSS) as interface:
            gr.Markdown("## Video Tip Calculation Interface (Structured)")
            quantity_inputs = []
            # Hidden numeric subtotal; the visible textbox mirrors it as "$x.xx".
            subtotal_display = gr.Number(label="Subtotal ($)", value=0.0, interactive=False, visible=False)
            with gr.Row():
                with gr.Column(scale=2):
                    gr.Markdown("### 1. Select Food Items")
                    with gr.Column(elem_id="food-container"):
                        for item in self.config.FOOD_ITEMS:
                            with gr.Column():
                                gr.Image(
                                    # Config only warns about missing images, so
                                    # show an empty image instead of a bad path.
                                    value=item["image"] if os.path.exists(item["image"]) else None,
                                    label=None,
                                    show_label=False,
                                    width=150,
                                    height=150,
                                    interactive=False
                                )
                                gr.Markdown(f"**{item['name']}**")
                                gr.Markdown(f"Price: ${item['price']:.2f}")
                                q_input = gr.Number(
                                    label="Qty",
                                    value=0,
                                    minimum=0,
                                    step=1,
                                    elem_id=f"qty_{item['name'].replace(' ', '_')}"
                                )
                                quantity_inputs.append(q_input)
                    subtotal_visible_display = gr.Textbox(label="Subtotal", value="$0.00", interactive=False)
                    gr.Markdown("### 2. Service Feedback")
                    review_input = gr.Textbox(label="Review", placeholder="서비스 리뷰를 작성해주세요.", lines=3)
                    rating_input = gr.Radio(choices=[1, 2, 3, 4, 5], value=3, label="⭐Star Rating (1-5)⭐", type="value")
                    gr.Markdown("### 3. Calculate Tip")
                    with gr.Row():
                        tip_buttons = [
                            (percent, gr.Button(f"{percent}%"))
                            for percent in self.MANUAL_TIP_PERCENTAGES
                        ]
                    with gr.Row():
                        qwen_btn = gr.Button("Alibaba-Qwen", variant="tertiary", elem_id="qwen-button")
                    gr.Markdown("### 4. Results")
                    tip_display = gr.Textbox(label="Calculated Tip", value="$0.00", interactive=False)
                    total_bill_display = gr.Textbox(label="Total Bill (Subtotal + Tip)", value="$0.00", interactive=False)
                    payment_btn = gr.Button("결제하기")
                    payment_result = gr.Textbox(label="Payment Result", value="", interactive=False)
                with gr.Column(scale=1):
                    gr.Markdown("### 5. Upload & Prompt")
                    alibaba_key_input = gr.Textbox(label="Alibaba API Key", placeholder="Enter your Alibaba API Key", lines=1)
                    video_input = gr.Video(label="Upload Service Video")
                    prompt_display = gr.Textbox(
                        label="Tip Calculation Prompt (Review/Rating/Subtotal 반영)",
                        lines=20,
                        max_lines=30,
                        interactive=True,
                        value=self.config.DEFAULT_PROMPT_TEMPLATE.format(
                            calculated_subtotal=0.0,
                            star_rating=3,
                            user_review="(No user review provided)",
                            google_reviews=GoogleReviewManager.format_google_reviews(self.config.GOOGLE_REVIEWS)
                        ).replace("{caption_text}", "{{caption_text}}")
                    )
                    gr.Markdown("### 6. AI Analysis")
                    analysis_display = gr.Textbox(label="AI Analysis", lines=10, max_lines=15, interactive=True)
                    gr.Markdown("### 7. 청구서")
                    order_summary_display = gr.Textbox(label="청구서", value="주문한 메뉴가 없습니다.", interactive=True)

            # Mirror the hidden numeric subtotal into the visible textbox.
            subtotal_display.change(
                fn=lambda x: f"${x:.2f}",
                inputs=[subtotal_display],
                outputs=[subtotal_visible_display]
            )

            # Any quantity, rating or review change refreshes subtotal and prompt.
            inputs_for_prompt_update = quantity_inputs + [rating_input, review_input]
            outputs_for_prompt_update = [subtotal_display, prompt_display]
            for comp in inputs_for_prompt_update:
                comp.change(
                    fn=self.ui_handler.update_subtotal_and_prompt,
                    inputs=inputs_for_prompt_update,
                    outputs=outputs_for_prompt_update
                )
            for comp in quantity_inputs:
                comp.change(
                    fn=self.ui_handler.update_invoice_summary,
                    inputs=quantity_inputs,
                    outputs=order_summary_display
                )

            compute_inputs = [alibaba_key_input, video_input, subtotal_display, rating_input, review_input,
                              prompt_display] + quantity_inputs
            compute_outputs = [
                analysis_display, tip_display, total_bill_display, prompt_display, video_input, order_summary_display
            ]
            qwen_btn.click(
                fn=self.ui_handler.auto_tip_and_invoice,
                inputs=compute_inputs,
                outputs=compute_outputs
            )

            # All fixed-percentage buttons share the same inputs and outputs.
            manual_inputs = [subtotal_display] + quantity_inputs
            manual_outputs = [analysis_display, tip_display, total_bill_display, order_summary_display]
            for percent, button in tip_buttons:
                button.click(
                    fn=self._manual_tip_handler(percent),
                    inputs=manual_inputs,
                    outputs=manual_outputs
                )

            payment_btn.click(
                fn=self.ui_handler.process_payment,
                inputs=[total_bill_display],
                outputs=[payment_result]
            )
        return interface

    def run_gradio(self):
        """Build the interface and launch it with a public share link."""
        interface = self.create_gradio_blocks()
        interface.launch(share=True)

    def run_flask(self):
        """Serve a minimal Flask index page on port 5000."""
        @self.flask_app.route("/")
        def index():
            return "Hello Flask"
        # The server listens on every interface, so the interactive debugger
        # (which can run arbitrary code) must stay disabled.
        self.flask_app.run(host="0.0.0.0", port=5000, debug=False)
if __name__ == "__main__":
    # Script entry point: assemble the application and serve the Gradio UI.
    app = App()
    app.run_gradio()