Update app.py

app.py CHANGED
@@ -7,6 +7,7 @@ import requests
 import os
 import time
 from autogen import AssistantAgent, GroupChat, GroupChatManager
+import openai
 
 # Initialize YOLOv8 for multi-label food detection
 model = YOLO("yolov8n.pt")  # Nano model for speed, fine-tune on food data later
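A note on the stock weights: yolov8n.pt is trained on COCO, whose 80 labels include only a handful of foods (banana, apple, sandwich, orange, broccoli, carrot, hot dog, pizza, donut, cake), so filter entries like "pasta" or "curry" in the code below can never match until the model is fine-tuned on food data. A quick sketch, assuming only the ultralytics package, to inspect what the loaded weights actually expose:

    from ultralytics import YOLO

    model = YOLO("yolov8n.pt")
    # model.names maps class index -> label for whatever weights were loaded.
    available = set(model.names.values())
    print(sorted(available))  # the 80 COCO labels for this stock checkpoint
    print([f for f in ["pasta", "rice", "tomato", "potato", "bread", "curry"] if f in available])  # [] on stock COCO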
@@ -14,20 +15,31 @@ model = YOLO("yolov8n.pt")  # Nano model for speed, fine-tune on food data later
 # Agent Functions (registered with AutoGen)
 def recognize_foods(image):
     start = time.time()
-    # Check if image is valid (not
-    if image is None or
+    # Check if image is valid (not None or empty)
+    if image is None or image.size == 0:
         print("Warning: Invalid or empty image detected.")
         return []  # Return empty list for invalid images
-
-
+
+    # Convert to RGB and resize to 640x640
+    try:
+        pil_image = Image.fromarray(image).convert('RGB').resize((640, 640))
+    except Exception as e:
+        print(f"Error processing image: {str(e)}")
+        return []  # Return empty list on preprocessing failure
+
+    # Run YOLOv8 detection
     results = model(pil_image)
     foods = []
+    detected = False
     for result in results:
         for cls in result.boxes.cls:
             label = model.names[int(cls)]
             if "food" in label.lower() or label in ["pasta", "rice", "tomato", "potato", "bread", "curry"]:  # Expand this list
                 conf = result.boxes.conf[result.boxes.cls == cls].item()
                 foods.append((label, conf))
+                detected = True
+    if not detected:
+        print("Warning: No food items detected in the image.")
     print(f"Recognition took {time.time() - start:.2f}s: Found foods {foods}")
     return list(set(foods))  # Remove duplicates
 
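One subtlety in recognize_foods: list(set(foods)) removes only exact duplicate (label, conf) tuples, so the same label detected at two confidences comes back twice, and result.boxes.conf[result.boxes.cls == cls].item() raises once a class matches more than one box. A hedged alternative, not part of this commit, that keeps the best confidence per label:

    def dedupe_by_max_conf(foods):
        # Collapse repeated labels to their highest confidence.
        best = {}
        for label, conf in foods:
            best[label] = max(conf, best.get(label, 0.0))
        return list(best.items())

    print(dedupe_by_max_conf([("pizza", 0.81), ("pizza", 0.64), ("carrot", 0.42)]))
    # -> [('pizza', 0.81), ('carrot', 0.42)]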
@@ -36,15 +48,36 @@ def estimate_sizes(image, foods):
     if not foods:
         print("Warning: No foods to estimate sizes for.")
         return {}
+
     # Resize to match YOLO output for consistency
-
+    try:
+        img_cv = cv2.resize(cv2.cvtColor(image, cv2.COLOR_RGB2BGR), (640, 640))
+    except Exception as e:
+        print(f"Error resizing image for size estimation: {str(e)}")
+        return {}
+
     sizes = {}
     total_area = img_cv.shape[0] * img_cv.shape[1]
-
-
-
-
-
+
+    # Use YOLO bounding boxes for more accurate sizing (if available)
+    pil_image = Image.fromarray(image).convert('RGB').resize((640, 640))
+    results = model(pil_image)
+    for result in results:
+        for box, cls in zip(result.boxes.xyxy, result.boxes.cls):
+            label = model.names[int(cls)]
+            if label in [food for food, _ in foods]:
+                box_area = (box[2] - box[0]) * (box[3] - box[1])  # Width * Height
+                # Simple heuristic: scale box area to grams (tune this based on data)
+                grams = min(500, int((box_area / (640 * 640)) * 500))  # Cap at 500g
+                sizes[label] = grams
+
+    # Fallback: even split if no boxes found
+    if not sizes:
+        for food, _ in foods:
+            area = total_area / len(foods)  # Even split for now
+            grams = min(500, int(area / (640 * 640) * 100))  # 100g per ~640k pixels, capped at 500g
+            sizes[food] = grams
+
     print(f"Size estimation took {time.time() - start:.2f}s: Estimated sizes {sizes}")
     return sizes
 
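To make the box-to-grams heuristic concrete: a 320x320 px box in the 640x640 frame covers a quarter of the image and maps to min(500, int(0.25 * 500)) = 125 g. A standalone sketch of the same arithmetic (the helper name is mine, not the commit's):

    def box_to_grams(x1, y1, x2, y2, frame=640, cap=500, scale=500):
        # Same heuristic as above: fraction of frame area, scaled to grams.
        box_area = (x2 - x1) * (y2 - y1)
        return min(cap, int(box_area / (frame * frame) * scale))

    print(box_to_grams(160, 160, 480, 480))  # 320x320 box -> 125
    print(box_to_grams(0, 0, 640, 640))      # full frame  -> 500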
@@ -92,56 +125,93 @@ def fetch_nutrition(foods_with_sizes, nutritionix_key):
         print(f"Nutritionix error: {str(e)}")
         return f"Nutritionix error: {str(e)}"
 
+def get_nutrition_advice(nutrition_data, openai_key):
+    start = time.time()
+    if not openai_key:
+        print("Warning: No OpenAI API key provided—skipping advice.")
+        return "No OpenAI key provided—skipping advice."
+    if not nutrition_data:
+        print("Warning: No nutrition data to advise on.")
+        return "No nutrition data available for advice."
+
+    try:
+        openai.api_key = openai_key
+        prompt = "Given this nutritional data, suggest a short dietary tip (max 50 words):\n" + "\n".join(
+            [f"- {food}: {data['calories']} cal, {data['protein']}g protein, {data['fat']}g fat, {data['carbs']}g carbs"
+             for food, data in nutrition_data.items()]
+        )
+        response = openai.Completion.create(
+            model="text-davinci-003",
+            prompt=prompt,
+            max_tokens=50,
+            temperature=0.7,
+            timeout=5
+        )
+        advice = response.choices[0].text.strip()
+        print(f"Advice took {time.time() - start:.2f}s: {advice}")
+        return advice
+    except Exception as e:
+        print(f"LLM error: {str(e)}")
+        return f"Error with OpenAI key: {str(e)}"
+
 # AutoGen Agent Definitions
 food_recognizer = AssistantAgent(
     name="FoodRecognizer",
-    system_message="Identify all food items in the image and return a list of (label, probability) pairs. Call recognize_foods with the image.",
+    system_message="Identify all food items in the image and return a list of (label, probability) pairs. Call recognize_foods with the image provided in the message.",
     function_map={"recognize_foods": recognize_foods}
 )
 
 size_estimator = AssistantAgent(
     name="SizeEstimator",
-    system_message="Estimate portion sizes in grams for each recognized food based on the image. Call estimate_sizes with the image and list of foods.",
+    system_message="Estimate portion sizes in grams for each recognized food based on the image. Call estimate_sizes with the image and list of foods from the previous message.",
     function_map={"estimate_sizes": estimate_sizes}
 )
 
 nutrition_fetcher = AssistantAgent(
     name="NutritionFetcher",
-    system_message="Fetch nutritional data from the Nutritionix API using the user's key. Call fetch_nutrition with the foods and sizes dictionary and Nutritionix key.",
+    system_message="Fetch nutritional data from the Nutritionix API using the user's key. Call fetch_nutrition with the foods and sizes dictionary from the previous message and the Nutritionix key from the initial message.",
     function_map={"fetch_nutrition": fetch_nutrition}
 )
 
+advice_agent = AssistantAgent(
+    name="NutritionAdvisor",
+    system_message="Provide basic nutrition advice based on the food data using the user's OpenAI key. Call get_nutrition_advice with the nutrition data from the previous message and the OpenAI key from the initial message.",
+    function_map={"get_nutrition_advice": get_nutrition_advice}
+)
+
 orchestrator = AssistantAgent(
     name="Orchestrator",
-    system_message="Coordinate the workflow, format the output, and return the final result as text. Start by asking FoodRecognizer to process the image, then SizeEstimator, then NutritionFetcher, and finally format the results.",
+    system_message="Coordinate the workflow, format the output, and return the final result as text. Start by asking FoodRecognizer to process the image, then SizeEstimator, then NutritionFetcher, then NutritionAdvisor (if OpenAI key provided), and finally format the results into 'Food Analysis:\\n- food1 (size1g, prob1% confidence): calories1 cal, protein1g protein, fat1g fat, carbs1g carbs\\n...' for each food, followed by '\\nNutrition Advice:\\n' and the advice if available.",
     function_map={}
 )
 
 # Custom speaker selection function (no LLM needed, updated for AutoGen 0.7.6)
 def custom_select_speaker(last_speaker, groupchat):
-    """Select the next speaker in a fixed order: FoodRecognizer → SizeEstimator → NutritionFetcher → Orchestrator."""
+    """Select the next speaker in a fixed order: FoodRecognizer → SizeEstimator → NutritionFetcher → NutritionAdvisor → Orchestrator."""
     if last_speaker is None:
         return food_recognizer  # Return the Agent object, not the name
-    order = [food_recognizer, size_estimator, nutrition_fetcher, orchestrator]
+    order = [food_recognizer, size_estimator, nutrition_fetcher, advice_agent, orchestrator]
     current_index = order.index(last_speaker)
     next_index = (current_index + 1) % len(order)
     return order[next_index]
 
-# Group Chat for Agent Coordination (no LLM, custom speaker selection method)
+# Group Chat for Agent Coordination (no LLM for selection, custom speaker selection method)
 group_chat = GroupChat(
-    agents=[food_recognizer, size_estimator, nutrition_fetcher, orchestrator],
+    agents=[food_recognizer, size_estimator, nutrition_fetcher, advice_agent, orchestrator],
     messages=[],
-    max_round=
+    max_round=5,  # Increase for advice agent
    speaker_selection_method=custom_select_speaker  # Use correct parameter for AutoGen 0.7.6
 )
 manager = GroupChatManager(groupchat=group_chat)
 
 # Orchestrator Logic (via AutoGen chat)
-def orchestrate_workflow(image, nutritionix_key):
+def orchestrate_workflow(image, nutritionix_key, openai_key=None):
     start = time.time()
 
-    # Initiate chat with Orchestrator, passing image and
+    # Initiate chat with Orchestrator, passing image and keys as message
     message = f"Process this image: {image} with Nutritionix key: {nutritionix_key}"
+    if openai_key:
+        message += f" and OpenAI key: {openai_key}"
     response = manager.initiate_chat(
         orchestrator,
         message=message,
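A compatibility note on get_nutrition_advice: openai.Completion.create and text-davinci-003 belong to the pre-1.0 SDK surface, and that model has since been retired, so this call fails on current openai installs. A rough sketch of the equivalent request against the >=1.0 client, with gpt-4o-mini as an assumed stand-in model:

    from openai import OpenAI

    def get_nutrition_advice_v1(prompt, openai_key):
        # Assumed modern equivalent of the legacy Completion call above.
        client = OpenAI(api_key=openai_key, timeout=5)
        response = client.chat.completions.create(
            model="gpt-4o-mini",  # assumed stand-in; any available chat model works
            messages=[{"role": "user", "content": prompt}],
            max_tokens=50,
            temperature=0.7,
        )
        return response.choices[0].message.content.strip()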
@@ -158,21 +228,33 @@ def orchestrate_workflow(image, nutritionix_key):
 
     if isinstance(result, dict):
         result = result.get("text", "No text output from agents.")
+
+    # Split result into nutrition and advice if OpenAI key was provided
+    if openai_key and isinstance(result, str) and "\nNutrition Advice:\n" in result:
+        parts = result.split("\nNutrition Advice:\n", 1)
+        nutrition = parts[0] if parts[0] else "No nutrition data."
+        advice = parts[1] if len(parts) > 1 else "No advice available."
+    else:
+        nutrition = result if result != "No output from agents." else "No nutrition data."
+        advice = "No advice available (OpenAI key required)."
+
     print(f"Total time: {time.time() - start:.2f}s")
-    return
+    return nutrition, advice
 
 # Gradio Interface
 interface = gr.Interface(
     fn=orchestrate_workflow,
     inputs=[
         gr.Image(type="numpy", label="Upload a Food Photo"),
-        gr.Textbox(type="password", label="Your Nutritionix API Key (required)")
+        gr.Textbox(type="password", label="Your Nutritionix API Key (required)"),
+        gr.Textbox(type="password", label="Your OpenAI API Key (optional for advice)")
     ],
     outputs=[
-        gr.Textbox(label="Nutrition Breakdown")
+        gr.Textbox(label="Nutrition Breakdown"),
+        gr.Textbox(label="Nutrition Advice")
     ],
     title="Food Nutrition Analyzer",
-    description="Upload a food photo and provide your Nutritionix API key for nutrition
+    description="Upload a food photo and provide your Nutritionix API key. Add an OpenAI key for nutrition advice."
 )
 
 if __name__ == "__main__":
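The diff cuts off at the __main__ guard, presumably followed by interface.launch(). Note also that orchestrate_workflow interpolates the raw numpy array into the chat message via the f-string, so the agents receive its repr rather than pixel data. A minimal smoke test, assumed rather than taken from the commit, exercising the two-string return contract that gr.Interface expects from fn:

    import numpy as np

    if __name__ == "__main__":
        # Placeholder key; the real Nutritionix call will fail without a valid one.
        nutrition, advice = orchestrate_workflow(
            np.zeros((480, 640, 3), dtype=np.uint8),  # blank test frame
            nutritionix_key="demo-key",
        )
        print(nutrition)
        print(advice)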