|
import os |
|
import shutil |
|
import cv2 |
|
import base64 |
|
import uuid |
|
from flask import Flask |
|
import gradio as gr |
|
import re |
|
import pandas as pd |
|
from selenium import webdriver |
|
from selenium.webdriver.common.by import By |
|
from selenium.webdriver.chrome.service import Service |
|
from selenium.webdriver.support.ui import WebDriverWait |
|
from selenium.webdriver.support import expected_conditions as EC |
|
from webdriver_manager.chrome import ChromeDriverManager |
|
from selenium.common.exceptions import NoSuchElementException, TimeoutException |
|
import time |
|
|
|
class GoogleReviewManager:
    """Crawl Google Maps reviews once and keep them as prompt-ready text.

    Constructing an instance immediately crawls ``url`` with headless Chrome
    and caches the result in ``reviews_text``: one line per review in the
    form ``[<rating> stars] <review text>``. ``format_google_reviews`` turns
    such text into the numbered sections substituted into the prompt template.
    """

    def __init__(self, url, target_review_count=20):
        """Store the crawl parameters and fetch the reviews right away.

        Args:
            url: Google Maps place URL to crawl.
            target_review_count: Maximum number of reviews to collect.
        """
        self.url = url
        self.target_review_count = target_review_count
        self.reviews_text = self.fetch_reviews_text()

    def fetch_reviews_text(self):
        """Crawl the reviews and return them as newline-separated text.

        Returns:
            One ``[<rating> stars] <text>`` line per review, or a fixed
            placeholder message when no review could be collected.
        """
        df_reviews = self.google_review_crawling(self.target_review_count, self.url)
        if df_reviews.empty:
            return "(구글 리뷰를 불러오지 못했습니다.)"
        lines = []
        for _, row in df_reviews.iterrows():
            rating = row["Rating"]
            # An unparsable rating is stored as None/NaN; show it as N/A
            # instead of leaking "nan"/"None" into the prompt.
            rating_label = "N/A" if pd.isna(rating) else rating
            lines.append(f"[{rating_label} stars] {row['Review Text']}")
        return "\n".join(lines)

    @staticmethod
    def format_google_reviews(reviews_text):
        """Number each review line under a ``#### Google Review N ####`` header.

        Blank lines and lines containing ``####`` are dropped first, so
        feeding already formatted text back in yields the same result.

        Args:
            reviews_text: Newline-separated review lines.

        Returns:
            The formatted sections joined by blank lines ("" for no reviews).
        """
        reviews = [
            line for line in reviews_text.split("\n")
            if line.strip() and "####" not in line
        ]
        return "\n\n".join(
            f"#### Google Review {i} ####\n{review}"
            for i, review in enumerate(reviews, start=1)
        )

    def google_review_crawling(self, TARGET_REVIEW_COUNT, url):
        """Scrape up to ``TARGET_REVIEW_COUNT`` reviews from a Google Maps page.

        Args:
            TARGET_REVIEW_COUNT: Maximum number of reviews to return.
            url: Google Maps place URL.

        Returns:
            A DataFrame with the columns ``Name``, ``Rating``,
            ``Date / Time Ago`` and ``Review Text``. An empty DataFrame is
            returned when the driver cannot be started, when the crawl fails,
            or when no review text was found.
        """
        driver = self._create_driver()
        if driver is None:
            # Previously this path called exit(), terminating the whole
            # application; callers already handle an empty result.
            return pd.DataFrame()

        try:
            driver.get(url)
            print("Google Maps 접속 완료.")
            time.sleep(3)
            # Zoom out so more of the page fits into the small window.
            driver.execute_script("document.body.style.zoom = '0.7'")

            self._log_overall_rating(driver)
            self._open_reviews_tab(driver)
            self._sort_by_newest(driver)
            time.sleep(3)
            scrollable_div = self._find_scroll_area(driver)
            time.sleep(5)

            reviews_data = self._collect_reviews(driver, scrollable_div, TARGET_REVIEW_COUNT)
            df_reviews = self._to_dataframe(reviews_data[:TARGET_REVIEW_COUNT])
        except Exception as e:
            # Top-level boundary of the crawl: any failure yields no reviews.
            print(f"스크립트 실행 중 예기치 않은 오류 발생: {e}")
            df_reviews = pd.DataFrame()
        finally:
            driver.quit()

        return df_reviews

    @staticmethod
    def _create_driver():
        """Return a headless Chrome driver, or None when setup fails."""
        try:
            service = Service(ChromeDriverManager().install())
            options = webdriver.ChromeOptions()
            for argument in (
                "--headless=new",
                "--disable-gpu",
                "--window-size=600,600",
                # The selectors below match English UI labels.
                "--lang=en",
                "user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
            ):
                options.add_argument(argument)
            driver = webdriver.Chrome(service=service, options=options)
        except Exception as e:
            print(f"웹 드라이버 설정 중 오류 발생: {e}")
            return None
        print("웹 드라이버 설정 완료 (헤드리스 모드).")
        return driver

    @staticmethod
    def _log_overall_rating(driver):
        """Print the place's overall rating if it can be located (best effort)."""
        selector = (By.CSS_SELECTOR, "div.F7nice span[aria-hidden='true']")
        try:
            element = WebDriverWait(driver, 10).until(
                EC.visibility_of_element_located(selector)
            )
            print(f"총 평점 찾음: {element.text}")
        except (TimeoutException, NoSuchElementException):
            print("총 평점 요소를 찾지 못했습니다.")

    @staticmethod
    def _open_reviews_tab(driver):
        """Click the "Reviews" tab; raise NoSuchElementException if not found."""
        possible_review_selectors = [
            (By.XPATH, "//button[@role='tab'][contains(., 'Reviews')]"),
            (By.XPATH, "//button[contains(text(), 'Reviews')]"),
            (By.CSS_SELECTOR, "button[aria-label*='Reviews']"),
        ]
        wait = WebDriverWait(driver, 10)
        review_tab_button = None
        for selector in possible_review_selectors:
            try:
                review_tab_button = wait.until(EC.element_to_be_clickable(selector))
            except TimeoutException:
                continue
            print(f"리뷰 탭 버튼 찾음 (방식: {selector[0]}, 값: {selector[1]})")
            break
        if not review_tab_button:
            print("리뷰 탭 버튼을 찾을 수 없습니다.")
            raise NoSuchElementException("리뷰 탭 버튼 없음")
        review_tab_button.click()
        time.sleep(3)

    @staticmethod
    def _sort_by_newest(driver):
        """Switch the review list to "Newest"; keep the default order on failure."""
        sort_button_selector = (
            By.XPATH,
            "//button[contains(@aria-label, '정렬 기준') or contains(@aria-label, 'Sort by') or .//span[contains(text(), 'Sort')]]",
        )
        newest_option_selector = (By.XPATH, "//div[@role='menuitemradio'][contains(., 'Newest')]")
        try:
            sort_button = WebDriverWait(driver, 10).until(
                EC.element_to_be_clickable(sort_button_selector)
            )
            print("정렬 기준 버튼 찾음. 클릭 시도...")
            sort_button.click()
            time.sleep(1)
            newest_option = WebDriverWait(driver, 10).until(
                EC.element_to_be_clickable(newest_option_selector)
            )
            print("최신순 옵션 찾음. 클릭 시도...")
            newest_option.click()
            print("최신순으로 정렬 적용됨. 잠시 대기...")
            time.sleep(3)
        except (TimeoutException, NoSuchElementException) as e:
            print(f"정렬 적용 중 오류 발생: {e}. 기본 정렬 상태로 진행합니다.")

    @staticmethod
    def _find_scroll_area(driver):
        """Return the scrollable review container, or None if it never appears."""
        selector = (By.CSS_SELECTOR, "div.m6QErb.DxyBCb.kA9KIf.dS8AEf.XiKgde[tabindex='-1']")
        try:
            scrollable_div = WebDriverWait(driver, 15).until(
                EC.presence_of_element_located(selector)
            )
        except TimeoutException:
            print("리뷰 스크롤 영역을 찾을 수 없습니다.")
            return None
        print("리뷰 스크롤 영역 찾음.")
        return scrollable_div

    @staticmethod
    def _safe_text(element, css_selector, default="N/A"):
        """Return the text of a child element, or ``default`` if lookup fails."""
        try:
            return element.find_element(By.CSS_SELECTOR, css_selector).text
        except Exception:
            # Best effort: a missing or stale child must not abort the crawl.
            return default

    @staticmethod
    def _read_review_text(driver, review):
        """Expand a review via its "more" button if present and return its text."""
        try:
            more_button = review.find_element(By.CSS_SELECTOR, "button.w8nwRe.kyuRq")
            driver.execute_script("arguments[0].scrollIntoView(true);", more_button)
            time.sleep(0.3)
            driver.execute_script("arguments[0].click();", more_button)
            time.sleep(0.5)
        except Exception:
            # Short reviews have no "more" button; nothing to expand.
            pass
        try:
            return review.find_element(By.CSS_SELECTOR, "span.wiI7pd").text.strip()
        except Exception:
            return ""

    def _collect_reviews(self, driver, scrollable_div, target_count):
        """Scroll the review list and gather raw review dicts.

        Stops when ``target_count`` reviews were gathered, when a pass adds
        nothing new, when there is no scroll container, or after 50 passes.
        """
        review_elements_selector = (By.CSS_SELECTOR, "div.jftiEf.fontBodyMedium")
        reviews_data = []
        processed_keys = set()
        max_loop = 50

        for loop_count in range(1, max_loop + 1):
            if len(reviews_data) >= target_count:
                break
            previous_count = len(reviews_data)
            all_reviews = driver.find_elements(*review_elements_selector)
            print(f"Loop {loop_count}: 총 {len(all_reviews)}개의 리뷰 요소 발견.")

            for review in all_reviews:
                reviewer_name = self._safe_text(review, "div.d4r55")
                review_date = self._safe_text(review, "span.rsqaWe")
                # Reviewer name plus date identifies a review across passes.
                unique_key = reviewer_name + review_date
                if unique_key in processed_keys:
                    continue
                processed_keys.add(unique_key)

                review_text = self._read_review_text(driver, review)
                if not review_text:
                    continue
                try:
                    rating = review.find_element(
                        By.CSS_SELECTOR, "span.kvMYJc"
                    ).get_attribute("aria-label")
                except Exception:
                    rating = "N/A"
                reviews_data.append({
                    "reviewer_name": reviewer_name,
                    "rating": rating,
                    "date": review_date,
                    "text": review_text.replace('\n', ' '),
                })
                print(f"리뷰 추가: {reviewer_name}, {review_date}")
                if len(reviews_data) >= target_count:
                    break

            if len(reviews_data) == previous_count:
                print("새로운 리뷰가 추가되지 않아 스크롤을 중단합니다.")
                break
            if not scrollable_div:
                break
            # Scroll in small steps so lazily loaded reviews get rendered.
            for _ in range(20):
                time.sleep(0.1)
                driver.execute_script(
                    'arguments[0].scrollBy(0, arguments[1]);', scrollable_div, 1000
                )
            time.sleep(2)

        return reviews_data

    @staticmethod
    def _parse_rating(rating_str):
        """Turn an aria-label such as "5 stars" into 5.0; None if unparsable."""
        if rating_str == "N/A":
            return None
        try:
            return float(rating_str.split()[0])
        except (AttributeError, IndexError, ValueError):
            return None

    @classmethod
    def _to_dataframe(cls, reviews_data):
        """Convert raw review dicts into the DataFrame returned to callers."""
        if not reviews_data:
            return pd.DataFrame()
        return pd.DataFrame([
            {
                "Name": review['reviewer_name'],
                "Rating": cls._parse_rating(review['rating']),
                "Date / Time Ago": review['date'],
                "Review Text": review['text'],
            }
            for review in reviews_data
        ])
|
|
|
|
|
class Config:
    """Application settings and constants.

    Class attributes hold the menu, the Qwen API key, the prompt template and
    the UI stylesheet. Creating an instance additionally crawls Google reviews
    (stored formatted in ``GOOGLE_REVIEWS``) and warns about missing images.
    """

    # Menu entries: display name, image path and unit price in dollars.
    FOOD_ITEMS = [
        {"name": "짜장면", "image": "images/food1.jpg", "price": 7.00},
        {"name": "짬뽕", "image": "images/food2.jpg", "price": 8.50},
        {"name": "탕수육", "image": "images/food3.jpg", "price": 15.00},
        {"name": "볶음밥", "image": "images/food4.jpg", "price": 7.50},
        {"name": "깐풍기", "image": "images/food5.jpg", "price": 18.00},
        {"name": "마파두부", "image": "images/food6.jpg", "price": 12.00},
        {"name": "콜라", "image": "images/food6.jpg", "price": 12.00},
        {"name": "사이다", "image": "images/food6.jpg", "price": 12.00},
    ]

    # Default key handed to the Qwen client; the UI can supply another one.
    QWEN_API_KEY = ""

    # str.format template. Placeholders: calculated_subtotal, star_rating,
    # user_review, google_reviews. "{{caption_text}}" is escaped so that it
    # survives formatting as "{caption_text}" and is substituted later.
    # NOTE(review): a fixed "Recent Google Review 3" entry is appended after
    # the crawled reviews — confirm this is intentional and not test data.
    DEFAULT_PROMPT_TEMPLATE = (
        "### Persona ###\n"
        "You are an expert tip calculation assistant focusing on service quality observed in a video, and you also consider the user's review, star rating, and recent Google reviews. "
        "Your role is to evaluate all these aspects evenly by assigning each a score out of 100, then calculate the overall average score to determine the appropriate tip percentage.\n\n"

        "### Task ###\n"
        "1. **Video Analysis**: Analyze the video frames as described in the 'Video Caption' to observe the staff's actions and interactions. "
        "Assign a score out of 100 based on the observed service quality.\n\n"

        "2. **Bill Amount Determination**: Determine the bill amount by following these steps:\n"
        " - If an explicit bill amount is mentioned in the 'Video Caption', use it.\n"
        " - Otherwise, use the 'Calculated Subtotal' provided.\n"
        " - If neither is available, assume a default value of $50.\n\n"

        "3. **Overall Service Quality Evaluation**:\n"
        " Evaluate the service quality by evenly scoring the following four components, each out of 100:\n"
        " a) **Video Service Score**: Service quality observed in the video (from Video Caption).\n"
        " b) **Google Review Score**: Insights from the Recent Google Reviews (note if any racist, extremely negative comments, or quality issues are reported).\n"
        " c) **User Review Score**: The user's review (if the review mentions improvements or enhanced service, especially if it indicates that previously reported issues like racism have been resolved, adjust the negative impact accordingly).\n"
        " d) **Star Rating Score**: The user's star rating, interpreted on a scale where 5/5 corresponds to 100 points.\n"
        " - Calculate the overall average score by taking the mean of these four scores.\n\n"

        "4. **Service Quality Classification and Tip Guidelines**:\n"
        " - Based on the overall average score, classify the service quality as follows:\n"
        " - Poor Service: Overall average score < 60 (Tip range: 0% ~ 5% of the bill)\n"
        " - Average Service: Overall average score ≥ 60 and < 80 (Tip range: 10% ~ 15% of the bill)\n"
        " - Good Service: Overall average score ≥ 80 (Tip range: 15% ~ 20% of the bill)\n"
        " - Select a specific tip percentage within the appropriate range.\n"
        " - Calculate the tip amount by multiplying the determined bill amount by the chosen tip percentage (round to two decimal places).\n"
        " - Calculate the final total bill by adding the tip amount to the subtotal (round to two decimal places).\n\n"

        "5. **Review Prioritization and Score Adjustment**:\n"
        " - Even though all four factors are evaluated equally, if the user's review explicitly indicates improvements (e.g., stating 'There's no racism!' or similar phrases), then increase the Google Review score by adjusting or restoring it to a value higher than the score initially reduced due to negative comments. In other words, if improvements are noted, add a positive increment (using a '+' adjustment) to the Google Review score, ensuring it is higher than the negatively impacted value.\n"
        " - Ensure that the user's review score takes priority when there is a conflict with the negative tone of the Google Reviews.\n\n"

        "### User Context ###\n"
        " - Current Country: USA\n"
        " - Restaurant Name: The Golden Spoon (Assumed)\n"
        " - Calculated Subtotal: ${calculated_subtotal:.2f}\n"
        " - User Star Rating: {star_rating} / 5\n"
        " - Currently User Review: {user_review}\n\n"

        "### Recent Google Review ###\n\n"
        "{google_reviews}\n\n"
        "#### Recent Google Review 3 ####\n"
        "[1.0 stars] It was a very racist remark, 매우 인종차별적인 발언을 겪었어요. 모욕적입니다,,\n\n"

        "### Input ###\n"
        "Video Caption:\n{{caption_text}}\n\n"

        "### Output ###\n"
        "Return your answer in the exact format below:\n"
        "Video Text Analysis: [Summary of the observed actions and interactions of the staff in the video along with the assigned score (out of 100) based on video analysis.]\n"
        "Recent Google Review Analysis: [Summary of the insights from the Google Reviews, including any negative or racist comments, with the assigned score (out of 100).]\n"
        "User Review Analysis: [Summary of the user's review including any improvements or enhanced service mentions, with the assigned score (out of 100).]\n"
        "Star Rating Analysis: [Interpret the user's star rating (e.g., converting 5/5 to 100 points) and include the assigned score (out of 100).]\n"
        "Overall Analysis: [Step-by-step explanation detailing:\n"
        " - How the bill amount was determined;\n"
        " - How each of the four components was scored and how any negative scores (e.g., for racism) were adjusted based on improvement indications in the user's review;\n"
        " - How the overall average score was calculated;\n"
        " - The reasoning for the final service quality classification based on the average score;\n"
        " - How the tip percentage was chosen within the guideline range and the detailed calculation for the tip amount and final total bill.]\n\n"

        "### Example Output Indicators (for reference only) ###\n"
        "**Final Tip Percentage**: 12.5%\n"
        "**Final Tip Amount**: $6.25\n"
        "**Final Total Bill**: $56.25\n\n"

        "### Output Indicators ###\n"
        "**Final Tip Percentage**: [X]% (only floating point)\n"
        "**Final Tip Amount**: $[Calculated Tip]\n"
        "**Final Total Bill**: $[Subtotal + Tip]\n"
    )

    # Stylesheet passed to gr.Blocks: menu grid and Qwen button colours.
    CUSTOM_CSS = """
    #food-container {
        display: grid;
        grid-template-columns: repeat(3, 1fr);
        gap: 10px;
        overflow-y: auto;
        height: 600px;
    }
    #qwen-button {
        background-color: #8A2BE2 !important;
        color: white !important;
        border-color: #8A2BE2 !important;
    }
    #qwen-button:hover {
        background-color: #7722CC !important;
    }
    """

    def __init__(self):
        """Crawl the Google reviews and warn about missing image assets."""
        review_url = "https://www.google.com/maps/place/Wolfgang%E2%80%99s+Steakhouse/data=!3m1!4b1!4m6!3m5!1s0x357ca4778cdd1105:0x27d5ead252b66bfd!8m2!3d37.5244965!4d127.0414635!16s%2Fg%2F11c3pwpp26?hl=en&entry=ttu&g_ep=EgoyMDI1MDQwMi4xIKXMDSoASAFQAw%3D%3D"
        # The manager crawls the page inside its constructor.
        manager = GoogleReviewManager(review_url, target_review_count=2)
        self.google_review_manager = manager
        self.GOOGLE_REVIEWS = GoogleReviewManager.format_google_reviews(manager.reviews_text)

        if not os.path.exists("images"):
            print("경고: 'images' 폴더를 찾을 수 없습니다. 음식 이미지가 표시되지 않을 수 있습니다.")
        missing_images = [
            entry["image"] for entry in self.FOOD_ITEMS
            if not os.path.exists(entry["image"])
        ]
        for image_path in missing_images:
            print(f"경고: 이미지 파일을 찾을 수 없습니다 - {image_path}")
|
|
|
|
|
|
|
class ModelClients:
    """Owns the OpenAI-compatible client used to reach the Qwen models."""

    def __init__(self, config: Config):
        """Create the Qwen client from ``config.QWEN_API_KEY``.

        Args:
            config: Application configuration providing the API key.
        """
        self.config = config
        # Imported here rather than at module level, as in the original.
        from openai import OpenAI as QwenOpenAI

        client_options = {
            "api_key": config.QWEN_API_KEY,
            "base_url": "https://dashscope-intl.aliyuncs.com/compatible-mode/v1",
        }
        self.qwen_client = QwenOpenAI(**client_options)

    def encode_video_qwen(self, video_path):
        """Return the file at ``video_path`` as a base64 text string."""
        with open(video_path, "rb") as handle:
            raw_bytes = handle.read()
        encoded = base64.b64encode(raw_bytes)
        return encoded.decode("utf-8")
|
|
|
|
|
|
|
class VideoProcessor:
    """Extracts still frames from a video and removes temporary artifacts."""

    def extract_video_frames(self, video_path, output_folder=None, fps=1):
        """Save sampled frames of a video as JPEG files.

        Args:
            video_path: Path of the video; a falsy value yields no frames.
            output_folder: Destination folder. When None, a unique folder
                under ``frames_list/`` is created.
            fps: Approximate number of frames to keep per second of video.

        Returns:
            ``(frame_paths, output_folder)`` on success, ``([], None)`` when
            the video cannot be opened or no frame could be saved.
        """
        if not video_path:
            return [], None
        auto_folder = output_folder is None
        if auto_folder:
            output_folder = f"frames_list/frames_{uuid.uuid4().hex}"
        os.makedirs(output_folder, exist_ok=True)

        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                print(f"오류: 비디오 파일을 열 수 없습니다 - {video_path}")
                if auto_folder:
                    # Do not leave the freshly created, empty folder behind.
                    shutil.rmtree(output_folder, ignore_errors=True)
                return [], None
            frame_paths = self._save_sampled_frames(cap, output_folder, fps)
        finally:
            # Release the capture on every path, including errors.
            cap.release()

        if not frame_paths:
            print("경고: 프레임 추출 실패.")
            if os.path.exists(output_folder):
                shutil.rmtree(output_folder)
            return [], None
        return frame_paths, output_folder

    @staticmethod
    def _save_sampled_frames(cap, output_folder, fps):
        """Write every ``frame_interval``-th frame of ``cap`` and return the paths."""
        frame_rate = cap.get(cv2.CAP_PROP_FPS)
        if not frame_rate:
            print("경고: FPS를 읽을 수 없습니다, 기본값 4으로 설정합니다.")
            frame_rate = 4.0
        # Keep one frame out of every `frame_interval`; never less than 1.
        frame_interval = max(int(frame_rate / fps), 1) if fps > 0 else 1

        frame_paths = []
        frame_count = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            if frame is None:
                print(f"경고: {frame_count}번째 프레임이 비어있습니다.")
                frame_count += 1
                continue
            if frame_count % frame_interval == 0:
                # Files are numbered by how many frames were saved so far.
                frame_path = os.path.join(output_folder, f"frame_{len(frame_paths)}.jpg")
                try:
                    if cv2.imwrite(frame_path, frame):
                        frame_paths.append(frame_path)
                    else:
                        print(f"경고: {frame_path} 저장 실패.")
                except Exception as e:
                    print(f"경고: 프레임 저장 오류 ({frame_path}): {e}")
            frame_count += 1
        return frame_paths

    def cleanup_temp_files(self, video_path, frame_folder):
        """Delete a temporary video copy and/or a frame folder if they exist.

        The video is removed only when its path contains ``temp_video_``, so
        files that are not temporary copies are never touched. Deletion
        errors are reported and otherwise ignored.

        Args:
            video_path: Path of the temporary video, or a falsy value.
            frame_folder: Folder holding extracted frames, or a falsy value.
        """
        if video_path and "temp_video_" in video_path and os.path.exists(video_path):
            try:
                os.remove(video_path)
                print(f"임시 비디오 파일 삭제: {video_path}")
            except OSError as e:
                print(f"임시 비디오 파일 삭제 오류: {e}")
        if frame_folder and os.path.exists(frame_folder):
            try:
                shutil.rmtree(frame_folder)
                print(f"프레임 폴더 삭제: {frame_folder}")
            except OSError as e:
                print(f"프레임 폴더 삭제 오류: {e}")
|
|
|
|
|
|
|
class TipCalculator:
    """Computes tips, either via the Qwen models or from a fixed percentage."""

    def __init__(self, config: Config, model_clients: ModelClients, video_processor: VideoProcessor):
        """Keep references to the shared configuration and helpers."""
        self.config = config
        self.model_clients = model_clients
        self.video_processor = video_processor

    @staticmethod
    def _search_float(pattern, text, flags):
        """Return the first captured group of ``pattern`` as float, else None."""
        match = re.search(pattern, text, flags)
        return float(match.group(1)) if match else None

    def parse_llm_output(self, output_text):
        """Extract the analysis text and tip figures from an LLM answer.

        Args:
            output_text: Raw text returned by the model.

        Returns:
            ``(analysis, tip_percentage, tip_amount, output_text)``. Figures
            that cannot be found are 0.0; when no ``Analysis:`` section
            exists, the whole ``output_text`` is used as the analysis.
        """
        flags = re.DOTALL | re.IGNORECASE
        analysis = "Analysis not found."
        # Prefer the text up to the final indicators; otherwise take the rest.
        analysis_match = (
            re.search(r"Analysis:\s*(.*?)\*\*Final Tip Percentage\*\*", output_text, flags)
            or re.search(r"Analysis:\s*(.*)", output_text, flags)
        )
        if analysis_match:
            analysis = analysis_match.group(1).strip()

        tip_percentage = self._search_float(
            r"\*\*Final Tip Percentage\*\*:\s*([0-9]+(?:\.[0-9]+)?)%", output_text, flags
        )
        if tip_percentage is None:
            tip_percentage = 0.0

        tip_amount = self._search_float(
            r"\*\*Final Tip Amount\*\*:\s*\$?\s*([0-9]+(?:\.[0-9]+)?)", output_text, re.IGNORECASE
        )
        if tip_amount is None:
            print(f"경고: 출력에서 Tip Amount를 찾을 수 없습니다:\n{output_text}")
            tip_amount = 0.0

        # The model's "Final Total Bill" is not parsed: callers recompute the
        # total from the subtotal and the tip amount.
        if analysis == "Analysis not found.":
            analysis = output_text

        return analysis, tip_percentage, tip_amount, output_text

    def _build_prompt(self, custom_prompt, calculated_subtotal, star_rating, user_review):
        """Fill the custom prompt, falling back to the default template."""
        fields = {
            "calculated_subtotal": calculated_subtotal,
            "star_rating": star_rating,
            "user_review": user_review,
            # The default template requires this placeholder; leaving it out
            # raised KeyError. GOOGLE_REVIEWS is set on Config instances.
            "google_reviews": getattr(self.config, "GOOGLE_REVIEWS", ""),
        }
        if custom_prompt is not None:
            try:
                return custom_prompt.format(**fields)
            except (KeyError, IndexError, ValueError, AttributeError, TypeError):
                # User-edited text may contain stray braces or unknown
                # placeholders; use the default template instead.
                pass
        return self.config.DEFAULT_PROMPT_TEMPLATE.format(**fields)

    def process_tip_qwen(self, video_file_path, star_rating, user_review, calculated_subtotal, custom_prompt=None):
        """Caption the video with Qwen-Omni, then ask Qwen-VL for the tip.

        Args:
            video_file_path: Path of the service video.
            star_rating: User's star rating inserted into the prompt.
            user_review: User's review text (may be empty or None).
            calculated_subtotal: Bill subtotal inserted into the prompt.
            custom_prompt: Optional prompt template; the default is used when
                it is None or cannot be formatted.

        Returns:
            ``(analysis, tip_percentage, tip_amount, [], None, output_text)``.
            For a missing video file, ``analysis`` starts with ``Error:`` and
            the figures are 0.0.
        """
        if not os.path.exists(video_file_path):
            return "Error: 비디오 파일 경로가 유효하지 않습니다.", 0.0, 0.0, [], None, ""

        base64_video = self.model_clients.encode_video_qwen(video_file_path)
        omni_caption_prompt = (
            "\n"
            "Task 1: Describe the waiters' actions in these restaurant video frames. Please check for mistakes or negative behaviors.\n"
            "Task 2: Provide a short chronological summary of the entire scene.\n"
        )
        omni_result = self.model_clients.qwen_client.chat.completions.create(
            model="qwen2.5-omni-7b",
            messages=[
                {
                    "role": "system",
                    "content": [{"type": "text", "text": "You are a helpful assistant."}],
                },
                {
                    "role": "user",
                    "content": [
                        {"type": "video_url", "video_url": {"url": f"data:;base64,{base64_video}"}},
                        {"type": "text", "text": omni_caption_prompt},
                    ],
                },
            ],
            modalities=["text"],
            stream=True,
            stream_options={"include_usage": True},
        )
        caption_parts = []
        # Every chunk is inspected; chunks without choices (such as a
        # usage-only chunk) are skipped, so no content is dropped.
        for chunk in omni_result:
            if not chunk.choices:
                continue
            if chunk.choices[0].delta.content:
                caption_parts.append(chunk.choices[0].delta.content)
        caption_text = "".join(caption_parts)
        if not caption_text.strip():
            caption_text = "(No caption from Omni)"

        user_review = user_review.strip() if user_review else "(No user review)"
        prompt = self._build_prompt(custom_prompt, calculated_subtotal, star_rating, user_review)
        # Formatting turned "{{caption_text}}" into "{caption_text}".
        final_prompt = prompt.replace("{caption_text}", caption_text)

        qvq_result = self.model_clients.qwen_client.chat.completions.create(
            model="qwen2.5-vl-32b-instruct",
            messages=[
                {"role": "system", "content": [{"type": "text", "text": "You are a helpful assistant."}]},
                {"role": "user", "content": [{"type": "text", "text": final_prompt}]},
            ],
            modalities=["text"],
            stream=True,
        )
        reasoning_parts = []
        answer_parts = []
        for chunk in qvq_result:
            if not chunk.choices:
                continue
            delta = chunk.choices[0].delta
            reasoning = getattr(delta, "reasoning_content", None)
            if reasoning:
                reasoning_parts.append(reasoning)
            if delta.content:
                if not answer_parts:
                    print("\n" + "=" * 20 + "Complete Response" + "=" * 20 + "\n")
                answer_parts.append(delta.content)

        final_text = "".join(reasoning_parts) + "\n" + "".join(answer_parts)
        analysis, tip_percentage, tip_amount, output_text = self.parse_llm_output(final_text)
        return analysis, tip_percentage, tip_amount, [], None, output_text

    def calculate_manual_tip(self, tip_percent, subtotal):
        """Compute tip and total for a fixed percentage.

        Args:
            tip_percent: Tip percentage, e.g. ``15`` for 15%.
            subtotal: Bill subtotal in dollars.

        Returns:
            ``(analysis_text, tip_text, total_bill_text)`` as display strings.
        """
        tip_amount = subtotal * (tip_percent / 100)
        total_bill = subtotal + tip_amount
        analysis_output = f"Manual calculation using fixed tip percentage of {tip_percent}%."
        tip_output = f"${tip_amount:.2f} ({tip_percent:.1f}%)"
        total_bill_output = f"${total_bill:.2f}"
        return analysis_output, tip_output, total_bill_output
|
|
|
|
|
|
|
class UIHandler:
    """Callbacks that connect the Gradio widgets to the tip calculation."""

    def __init__(self, config: Config, tip_calculator: TipCalculator, video_processor: VideoProcessor):
        """Keep references to the shared configuration and helpers."""
        self.config = config
        self.tip_calculator = tip_calculator
        self.video_processor = video_processor

    @staticmethod
    def _to_quantity(value):
        """Convert a widget value to a number; cleared or invalid input is 0."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return 0

    def update_subtotal_and_prompt(self, *args):
        """Recompute the subtotal and rebuild the prompt from the user input.

        Args:
            *args: One quantity per entry of ``config.FOOD_ITEMS``, followed
                by the star rating and the review text.

        Returns:
            ``(calculated_subtotal, updated_prompt)``. The prompt keeps
            ``{{caption_text}}`` escaped so it can be formatted again later.
        """
        num_food_items = len(self.config.FOOD_ITEMS)
        quantities = args[:num_food_items]
        star_rating = args[num_food_items]
        user_review = args[num_food_items + 1]

        calculated_subtotal = 0.0
        for item, quantity in zip(self.config.FOOD_ITEMS, quantities):
            # A cleared number field arrives as None; count it as zero, the
            # same way update_invoice_summary does.
            calculated_subtotal += item['price'] * self._to_quantity(quantity)

        user_review_text = user_review.strip() if user_review and user_review.strip() else "(No user review provided)"

        formatted_google_reviews = GoogleReviewManager.format_google_reviews(self.config.GOOGLE_REVIEWS)

        updated_prompt = self.config.DEFAULT_PROMPT_TEMPLATE.format(
            calculated_subtotal=calculated_subtotal,
            star_rating=star_rating,
            user_review=user_review_text,
            google_reviews=formatted_google_reviews
        )
        # Restore the escaped placeholder that format() just unescaped.
        updated_prompt = updated_prompt.replace("{caption_text}", "{{caption_text}}")

        return calculated_subtotal, updated_prompt

    def compute_tip(self, alibaba_key, video_file_obj, subtotal, star_rating, user_review, custom_prompt_text):
        """Run the Qwen-based tip calculation for an uploaded video.

        Args:
            alibaba_key: API key typed by the user; when non-blank it replaces
                the Qwen client of the shared model clients.
            video_file_obj: Uploaded video (a path or an object with ``name``).
            subtotal: Bill subtotal in dollars.
            star_rating: User's star rating.
            user_review: User's review text.
            custom_prompt_text: Prompt currently shown in the UI.

        Returns:
            ``(analysis_text, tip_text, total_bill_text, prompt, video_update)``.
        """
        analysis_output = "계산을 시작합니다..."
        tip_percentage = 0.0
        tip_output = "$0.00"
        total_bill_output = f"${subtotal:.2f}"
        if video_file_obj is None:
            return "오류: 비디오 파일을 업로드해주세요.", "$0.00", total_bill_output, custom_prompt_text, gr.update(value=None)
        try:
            if alibaba_key and alibaba_key.strip():
                from openai import OpenAI as QwenOpenAI
                self.tip_calculator.model_clients.qwen_client = QwenOpenAI(
                    api_key=alibaba_key,
                    base_url="https://dashscope-intl.aliyuncs.com/compatible-mode/v1"
                )
            # Work on a private copy so cleanup can delete it afterwards.
            temp_video_path = f"temp_video_{uuid.uuid4().hex}.mp4"
            original_path = video_file_obj.name if hasattr(video_file_obj, 'name') else video_file_obj
            shutil.copyfile(original_path, temp_video_path)
            print(f"임시 비디오 파일 생성: {temp_video_path}")
        except Exception as e:
            print(f"임시 비디오 파일 생성 오류: {e}")
            return f"오류: 비디오 파일을 처리할 수 없습니다: {e}", "$0.00", total_bill_output, custom_prompt_text, None
        frame_folder = None
        try:
            analysis, tip_percentage, tip_amount, _, _, output_text = self.tip_calculator.process_tip_qwen(
                temp_video_path, star_rating, user_review, subtotal, custom_prompt_text
            )
            # Only the calculator's own "Error: ..." result marks a failure;
            # a model answer that merely mentions "Error" must not zero the tip.
            if analysis.startswith("Error:"):
                analysis_output = analysis
                tip_amount = 0.0
            else:
                analysis_output = f"Tip Percentage: {tip_percentage:.1f}%\n\n{output_text}"
            tip_output = f"${tip_amount:.2f} ({tip_percentage:.1f}%)"
            total_bill = subtotal + tip_amount
            total_bill_output = f"${total_bill:.2f}"
        except Exception as e:
            # UI boundary: report the failure instead of crashing the callback.
            print(f"팁 계산 중 오류 발생 (qwen): {e}")
            analysis_output = f"오류 발생: {e}"
            tip_output = "$0.00"
            total_bill_output = f"${subtotal:.2f}"
        finally:
            self.video_processor.cleanup_temp_files(temp_video_path, frame_folder)
        return analysis_output, tip_output, total_bill_output, custom_prompt_text, gr.update(value=None)

    def auto_tip_and_invoice(self, alibaba_key, video_file_obj, subtotal, star_rating, review, prompt, *quantities):
        """Compute the AI tip and append the refreshed invoice text."""
        analysis, tip_disp, total_bill_disp, prompt_out, vid_out = self.compute_tip(
            alibaba_key, video_file_obj, subtotal, star_rating, review, prompt
        )
        invoice = self.update_invoice_summary(*quantities, tip_disp, total_bill_disp)
        return analysis, tip_disp, total_bill_disp, prompt_out, vid_out, invoice

    def update_invoice_summary(self, *args):
        """Build the invoice text from the quantities and optional totals.

        Args:
            *args: One quantity per entry of ``config.FOOD_ITEMS``, optionally
                followed by the tip text and the total bill text.

        Returns:
            One line per ordered item (or a "nothing ordered" message),
            followed by the tip and total bill lines.
        """
        num_items = len(self.config.FOOD_ITEMS)
        quantities = args[:num_items]
        if len(args) >= num_items + 2:
            tip_str = args[num_items]
            total_bill_str = args[num_items + 1]
        else:
            tip_str = "$0.00"
            total_bill_str = "$0.00"

        item_lines = []
        for item, quantity in zip(self.config.FOOD_ITEMS, quantities):
            q_val = self._to_quantity(quantity)
            if q_val > 0:
                total_price = item['price'] * q_val
                item_lines.append(f"{item['name']} x{int(q_val)} : ${total_price:.2f}\n")
        summary = "".join(item_lines) or "주문한 메뉴가 없습니다."
        return summary + f"\nTip: {tip_str}\nTotal Bill: {total_bill_str}"

    def manual_tip_and_invoice(self, tip_percent, subtotal, *quantities):
        """Compute a fixed-percentage tip and append the refreshed invoice text."""
        analysis, tip_disp, total_bill_disp = self.tip_calculator.calculate_manual_tip(tip_percent, subtotal)
        invoice = self.update_invoice_summary(*quantities, tip_disp, total_bill_disp)
        return analysis, tip_disp, total_bill_disp, invoice

    def process_payment(self, total_bill):
        """Return the payment confirmation message for ``total_bill``."""
        return f"{total_bill} 결제되었습니다."
|
|
|
|
|
|
|
class App:
    """Builds all components and exposes the Gradio (and Flask) entry points."""

    # Percentages offered by the fixed-tip buttons, in display order.
    MANUAL_TIP_PERCENTAGES = (5, 10, 15, 20, 25)

    def __init__(self):
        """Create the configuration, model clients, handlers and Flask app."""
        self.config = Config()
        self.model_clients = ModelClients(self.config)
        self.video_processor = VideoProcessor()
        self.tip_calculator = TipCalculator(self.config, self.model_clients, self.video_processor)
        self.ui_handler = UIHandler(self.config, self.tip_calculator, self.video_processor)
        self.flask_app = Flask(__name__)

    def _manual_tip_handler(self, percent):
        """Return a click callback bound to one fixed tip percentage."""
        def handler(sub, *qty):
            return self.ui_handler.manual_tip_and_invoice(percent, sub, *qty)
        return handler

    def create_gradio_blocks(self):
        """Build the Gradio interface and wire up all event handlers.

        Returns:
            The ``gr.Blocks`` interface, ready to be launched.
        """
        with gr.Blocks(title="Video Tip Calculation Interface", theme=gr.themes.Soft(),
                       css=self.config.CUSTOM_CSS) as interface:
            gr.Markdown("## Video Tip Calculation Interface (Structured)")
            quantity_inputs = []
            # Hidden numeric subtotal; the visible textbox mirrors it as text.
            subtotal_display = gr.Number(label="Subtotal ($)", value=0.0, interactive=False, visible=False)
            manual_buttons = []
            with gr.Row():
                with gr.Column(scale=2):
                    gr.Markdown("### 1. Select Food Items")
                    with gr.Column(elem_id="food-container"):
                        for item in self.config.FOOD_ITEMS:
                            with gr.Column():
                                gr.Image(
                                    value=item["image"],
                                    label=None,
                                    show_label=False,
                                    width=150,
                                    height=150,
                                    interactive=False
                                )
                                gr.Markdown(f"**{item['name']}**")
                                gr.Markdown(f"Price: ${item['price']:.2f}")
                                q_input = gr.Number(
                                    label="Qty",
                                    value=0,
                                    minimum=0,
                                    step=1,
                                    elem_id=f"qty_{item['name'].replace(' ', '_')}"
                                )
                                quantity_inputs.append(q_input)
                    subtotal_visible_display = gr.Textbox(label="Subtotal", value="$0.00", interactive=False)
                    gr.Markdown("### 2. Service Feedback")
                    review_input = gr.Textbox(label="Review", placeholder="서비스 리뷰를 작성해주세요.", lines=3)
                    rating_input = gr.Radio(choices=[1, 2, 3, 4, 5], value=3, label="⭐Star Rating (1-5)⭐", type="value")
                    gr.Markdown("### 3. Calculate Tip")
                    with gr.Row():
                        for percent in self.MANUAL_TIP_PERCENTAGES:
                            manual_buttons.append((percent, gr.Button(f"{percent}%")))
                    with gr.Row():
                        qwen_btn = gr.Button("Alibaba-Qwen", variant="tertiary", elem_id="qwen-button")
                    gr.Markdown("### 4. Results")
                    tip_display = gr.Textbox(label="Calculated Tip", value="$0.00", interactive=False)
                    total_bill_display = gr.Textbox(label="Total Bill (Subtotal + Tip)", value="$0.00", interactive=False)
                    payment_btn = gr.Button("결제하기")
                    payment_result = gr.Textbox(label="Payment Result", value="", interactive=False)
                with gr.Column(scale=1):
                    gr.Markdown("### 5. Upload & Prompt")
                    alibaba_key_input = gr.Textbox(label="Alibaba API Key", placeholder="Enter your Alibaba API Key", lines=1)
                    video_input = gr.Video(label="Upload Service Video")
                    prompt_display = gr.Textbox(
                        label="Tip Calculation Prompt (Review/Rating/Subtotal 반영)",
                        lines=20,
                        max_lines=30,
                        interactive=True,
                        # Keep "{{caption_text}}" escaped so the prompt can be
                        # formatted once more when the caption is known.
                        value=self.config.DEFAULT_PROMPT_TEMPLATE.format(
                            calculated_subtotal=0.0,
                            star_rating=3,
                            user_review="(No user review provided)",
                            google_reviews=GoogleReviewManager.format_google_reviews(self.config.GOOGLE_REVIEWS)
                        ).replace("{caption_text}", "{{caption_text}}")
                    )
                    gr.Markdown("### 6. AI Analysis")
                    analysis_display = gr.Textbox(label="AI Analysis", lines=10, max_lines=15, interactive=True)
                    gr.Markdown("### 7. 청구서")
                    order_summary_display = gr.Textbox(label="청구서", value="주문한 메뉴가 없습니다.", interactive=True)

            subtotal_display.change(
                fn=lambda x: f"${x:.2f}",
                inputs=[subtotal_display],
                outputs=[subtotal_visible_display]
            )

            # Any change to a quantity, the rating or the review refreshes
            # both the subtotal and the prompt.
            inputs_for_prompt_update = quantity_inputs + [rating_input, review_input]
            outputs_for_prompt_update = [subtotal_display, prompt_display]
            for comp in inputs_for_prompt_update:
                comp.change(
                    fn=self.ui_handler.update_subtotal_and_prompt,
                    inputs=inputs_for_prompt_update,
                    outputs=outputs_for_prompt_update
                )
            for comp in quantity_inputs:
                comp.change(
                    fn=self.ui_handler.update_invoice_summary,
                    inputs=quantity_inputs,
                    outputs=order_summary_display
                )

            compute_inputs = [alibaba_key_input, video_input, subtotal_display, rating_input, review_input, prompt_display] + quantity_inputs
            compute_outputs = [
                analysis_display, tip_display, total_bill_display, prompt_display, video_input, order_summary_display
            ]
            qwen_btn.click(
                fn=self.ui_handler.auto_tip_and_invoice,
                inputs=compute_inputs,
                outputs=compute_outputs
            )

            manual_inputs = [subtotal_display] + quantity_inputs
            manual_outputs = [analysis_display, tip_display, total_bill_display, order_summary_display]
            for percent, button in manual_buttons:
                # A factory binds `percent` per button; a lambda defined in
                # this loop would see only the last value.
                button.click(
                    fn=self._manual_tip_handler(percent),
                    inputs=manual_inputs,
                    outputs=manual_outputs
                )

            payment_btn.click(
                fn=self.ui_handler.process_payment,
                inputs=[total_bill_display],
                outputs=[payment_result]
            )
        return interface

    def run_gradio(self):
        """Build the interface and launch it with a public share link."""
        interface = self.create_gradio_blocks()
        interface.launch(share=True)

    def run_flask(self, debug=False):
        """Serve the Flask app on all interfaces, port 5000.

        Args:
            debug: Enable Flask's debug mode. Off by default because the
                server listens on 0.0.0.0 and the interactive debugger lets
                anyone who can reach it execute code.
        """
        @self.flask_app.route("/")
        def index():
            return "Hello Flask"

        self.flask_app.run(host="0.0.0.0", port=5000, debug=debug)
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the application and start the Gradio UI.
    App().run_gradio()
|
|