tdurzynski commited on
Commit
1a6c907
·
verified ·
1 Parent(s): 9d0a3d7

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +43 -54
app.py CHANGED
@@ -1,50 +1,62 @@
1
  import gradio as gr
2
  import cv2
3
  import numpy as np
4
- import requests
5
  from PIL import Image
6
- from autogen import AssistantAgent, GroupChat, GroupChatManager
 
7
  import os
8
  import time
9
- #import openai
10
- from ultralytics import YOLO
11
 
 
12
  model = YOLO("yolov8n.pt") # Nano model for speed, fine-tune on food data later
13
 
14
- # Agent Functions
15
  def recognize_foods(image):
16
  start = time.time()
17
- # Resize to 640x640 (YOLO default)
 
 
 
 
18
  pil_image = Image.fromarray(image).resize((640, 640))
19
  results = model(pil_image)
20
  foods = []
21
  for result in results:
22
  for cls in result.boxes.cls:
23
  label = model.names[int(cls)]
24
- if "food" in label.lower() or label in ["pasta", "rice", "tomato", "potato", "bread"]: # Expand this list
25
  conf = result.boxes.conf[result.boxes.cls == cls].item()
26
  foods.append((label, conf))
27
- print(f"Recognition took {time.time() - start:.2f}s")
28
  return list(set(foods)) # Remove duplicates
29
 
30
  def estimate_sizes(image, foods):
31
  start = time.time()
32
- img_cv = cv2.cvtColor(image, cv2.COLOR_RGB2BGR).resize((640, 640)) # Match YOLO size
 
 
 
 
33
  sizes = {}
34
  total_area = img_cv.shape[0] * img_cv.shape[1]
35
  for food, _ in foods:
36
- # Dummy: assume area proportion (refine with food-specific weights later)
37
  area = total_area / len(foods) # Even split for now
38
- grams = min(500, int(area / (640 * 640) * 100)) # 100g per ~640k pixels
39
  sizes[food] = grams
40
- print(f"Size estimation took {time.time() - start:.2f}s")
41
  return sizes
42
 
43
  def fetch_nutrition(foods_with_sizes, nutritionix_key):
 
44
  if not nutritionix_key:
 
45
  return "Please provide a Nutritionix API key for nutrition data."
 
 
 
46
 
47
- start = time.time()
48
  url = "https://trackapi.nutritionix.com/v2/natural/nutrients"
49
  headers = {
50
  "x-app-id": os.getenv("NUTRITIONIX_APP_ID"), # From HF Secrets
@@ -58,6 +70,7 @@ def fetch_nutrition(foods_with_sizes, nutritionix_key):
58
  try:
59
  response = requests.post(url, headers=headers, json=body, timeout=10)
60
  if response.status_code != 200:
 
61
  return f"Nutritionix API error: {response.text}"
62
 
63
  data = response.json().get("foods", [])
@@ -70,78 +83,56 @@ def fetch_nutrition(foods_with_sizes, nutritionix_key):
70
  "fat": item.get("nf_total_fat", 0),
71
  "carbs": item.get("nf_total_carbohydrate", 0)
72
  }
73
- print(f"Nutrition fetch took {time.time() - start:.2f}s")
74
  return nutrition_data
75
  except requests.Timeout:
 
76
  return "Nutritionix API timed out."
77
  except Exception as e:
 
78
  return f"Nutritionix error: {str(e)}"
79
 
80
- #def get_nutrition_advice(nutrition_data, llm_key):
81
- # if not llm_key:
82
- # return "No OpenAI/Grok key provided—skipping advice."
83
- # try:
84
- # openai.api_key = llm_key
85
- # prompt = "Given this nutritional data, suggest a dietary tip:\n"
86
- # for food, data in nutrition_data.items():
87
- # prompt += f"- {food}: {data['calories']} cal, {data['protein']}g protein, {data['fat']}g fat, {data['carbs']}g carbs\n"
88
- #
89
- # response = openai.Completion.create(
90
- # model="text-davinci-003", # Swap for Grok if xAI API is available
91
- # prompt=prompt,
92
- # max_tokens=50
93
- # )
94
- # return response.choices[0].text.strip()
95
- # except Exception as e:
96
- # return f"Error with LLM key: {str(e)}"
97
-
98
-
99
  # AutoGen Agent Definitions
100
  food_recognizer = AssistantAgent(
101
  name="FoodRecognizer",
102
- system_message="Identify all food items in the image and return a list of (label, probability) pairs.",
103
  function_map={"recognize_foods": recognize_foods}
104
  )
105
 
106
  size_estimator = AssistantAgent(
107
  name="SizeEstimator",
108
- system_message="Estimate portion sizes in grams for each recognized food based on the image.",
109
  function_map={"estimate_sizes": estimate_sizes}
110
  )
111
 
112
  nutrition_fetcher = AssistantAgent(
113
  name="NutritionFetcher",
114
- system_message="Fetch nutritional data from the Nutritionix API using the user's key.",
115
  function_map={"fetch_nutrition": fetch_nutrition}
116
  )
117
 
118
- ##advice_agent = AssistantAgent(
119
- ## name="NutritionAdvisor",
120
- ## system_message="Provide basic nutrition advice using the user's OpenAI/Grok key."
121
- ##)
122
-
123
  orchestrator = AssistantAgent(
124
  name="Orchestrator",
125
- system_message="Coordinate the workflow, format the output, and return the final result as text.",
126
  function_map={}
127
  )
128
 
129
- # Custom speaker selection function (no LLM needed)
130
  def custom_select_speaker(last_speaker, groupchat):
131
  """Select the next speaker in a fixed order: FoodRecognizer → SizeEstimator → NutritionFetcher → Orchestrator."""
132
  if last_speaker is None:
133
- return "FoodRecognizer" # Start with FoodRecognizer
134
  order = [food_recognizer, size_estimator, nutrition_fetcher, orchestrator]
135
  current_index = order.index(last_speaker)
136
  next_index = (current_index + 1) % len(order)
137
  return order[next_index]
138
 
139
- # Group Chat for Agent Coordination (no LLM, custom speaker selection)
140
  group_chat = GroupChat(
141
  agents=[food_recognizer, size_estimator, nutrition_fetcher, orchestrator],
142
  messages=[],
143
- max_round=4,
144
- speaker_selection_method=custom_select_speaker # Use custom speaker selection instead of LLM
145
  )
146
  manager = GroupChatManager(groupchat=group_chat)
147
 
@@ -157,7 +148,7 @@ def orchestrate_workflow(image, nutritionix_key):
157
  max_turns=10
158
  )
159
 
160
- # Extract and format the final response from the ChatResult
161
  if hasattr(response, 'chat_history') and response.chat_history:
162
  # Get the last message from chat history
163
  last_message = response.chat_history[-1]
@@ -169,21 +160,19 @@ def orchestrate_workflow(image, nutritionix_key):
169
  result = result.get("text", "No text output from agents.")
170
  print(f"Total time: {time.time() - start:.2f}s")
171
  return result
172
-
173
  # Gradio Interface
174
  interface = gr.Interface(
175
  fn=orchestrate_workflow,
176
  inputs=[
177
  gr.Image(type="numpy", label="Upload a Food Photo"),
178
- gr.Textbox(type="password", label="Your Nutritionix API Key (required)"),
179
- #gr.Textbox(type="password", label="Your OpenAI/Grok API Key (optional for advice)")
180
  ],
181
  outputs=[
182
- gr.Textbox(label="Nutrition Breakdown"),
183
- #gr.Textbox(label="Nutrition Advice")
184
  ],
185
  title="Food Nutrition Analyzer",
186
- description="Upload a food photo and provide your Nutritionix API key. Add an OpenAI/Grok key for advice."
187
  )
188
 
189
  if __name__ == "__main__":
 
1
  import gradio as gr
2
  import cv2
3
  import numpy as np
 
4
  from PIL import Image
5
+ from ultralytics import YOLO
6
+ import requests
7
  import os
8
  import time
9
+ from autogen import AssistantAgent, GroupChat, GroupChatManager
 
10
 
11
+ # Initialize YOLOv8 for multi-label food detection
12
  model = YOLO("yolov8n.pt") # Nano model for speed, fine-tune on food data later
13
 
14
+ # Agent Functions (registered with AutoGen)
15
  def recognize_foods(image):
16
  start = time.time()
17
+ # Check if image is valid (not all 255s or empty)
18
+ if image is None or np.all(image == 255):
19
+ print("Warning: Invalid or empty image detected.")
20
+ return [] # Return empty list for invalid images
21
+ # Resize to 640x640 (YOLO default) to reduce load and match model input
22
  pil_image = Image.fromarray(image).resize((640, 640))
23
  results = model(pil_image)
24
  foods = []
25
  for result in results:
26
  for cls in result.boxes.cls:
27
  label = model.names[int(cls)]
28
+ if "food" in label.lower() or label in ["pasta", "rice", "tomato", "potato", "bread", "curry"]: # Expand this list
29
  conf = result.boxes.conf[result.boxes.cls == cls].item()
30
  foods.append((label, conf))
31
+ print(f"Recognition took {time.time() - start:.2f}s: Found foods {foods}")
32
  return list(set(foods)) # Remove duplicates
33
 
34
  def estimate_sizes(image, foods):
35
  start = time.time()
36
+ if not foods:
37
+ print("Warning: No foods to estimate sizes for.")
38
+ return {}
39
+ # Resize to match YOLO output for consistency
40
+ img_cv = cv2.cvtColor(image, cv2.COLOR_RGB2BGR).resize((640, 640))
41
  sizes = {}
42
  total_area = img_cv.shape[0] * img_cv.shape[1]
43
  for food, _ in foods:
44
+ # Dummy: assume area proportion (refine with food-specific weights or bounding boxes later)
45
  area = total_area / len(foods) # Even split for now
46
+ grams = min(500, int(area / (640 * 640) * 100)) # 100g per ~640k pixels, capped at 500g
47
  sizes[food] = grams
48
+ print(f"Size estimation took {time.time() - start:.2f}s: Estimated sizes {sizes}")
49
  return sizes
50
 
51
  def fetch_nutrition(foods_with_sizes, nutritionix_key):
52
+ start = time.time()
53
  if not nutritionix_key:
54
+ print("Warning: No Nutritionix API key provided.")
55
  return "Please provide a Nutritionix API key for nutrition data."
56
+ if not foods_with_sizes:
57
+ print("Warning: No foods to fetch nutrition for.")
58
+ return {}
59
 
 
60
  url = "https://trackapi.nutritionix.com/v2/natural/nutrients"
61
  headers = {
62
  "x-app-id": os.getenv("NUTRITIONIX_APP_ID"), # From HF Secrets
 
70
  try:
71
  response = requests.post(url, headers=headers, json=body, timeout=10)
72
  if response.status_code != 200:
73
+ print(f"Nutritionix API error: {response.text}")
74
  return f"Nutritionix API error: {response.text}"
75
 
76
  data = response.json().get("foods", [])
 
83
  "fat": item.get("nf_total_fat", 0),
84
  "carbs": item.get("nf_total_carbohydrate", 0)
85
  }
86
+ print(f"Nutrition fetch took {time.time() - start:.2f}s: Fetched nutrition {nutrition_data}")
87
  return nutrition_data
88
  except requests.Timeout:
89
+ print("Nutritionix API timed out.")
90
  return "Nutritionix API timed out."
91
  except Exception as e:
92
+ print(f"Nutritionix error: {str(e)}")
93
  return f"Nutritionix error: {str(e)}"
94
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
95
  # AutoGen Agent Definitions
96
  food_recognizer = AssistantAgent(
97
  name="FoodRecognizer",
98
+ system_message="Identify all food items in the image and return a list of (label, probability) pairs. Call recognize_foods with the image.",
99
  function_map={"recognize_foods": recognize_foods}
100
  )
101
 
102
  size_estimator = AssistantAgent(
103
  name="SizeEstimator",
104
+ system_message="Estimate portion sizes in grams for each recognized food based on the image. Call estimate_sizes with the image and list of foods.",
105
  function_map={"estimate_sizes": estimate_sizes}
106
  )
107
 
108
  nutrition_fetcher = AssistantAgent(
109
  name="NutritionFetcher",
110
+ system_message="Fetch nutritional data from the Nutritionix API using the user's key. Call fetch_nutrition with the foods and sizes dictionary and Nutritionix key.",
111
  function_map={"fetch_nutrition": fetch_nutrition}
112
  )
113
 
 
 
 
 
 
114
  orchestrator = AssistantAgent(
115
  name="Orchestrator",
116
+ system_message="Coordinate the workflow, format the output, and return the final result as text. Start by asking FoodRecognizer to process the image, then SizeEstimator, then NutritionFetcher, and finally format the results.",
117
  function_map={}
118
  )
119
 
120
+ # Custom speaker selection function (no LLM needed, updated for AutoGen 0.7.6)
121
  def custom_select_speaker(last_speaker, groupchat):
122
  """Select the next speaker in a fixed order: FoodRecognizer → SizeEstimator → NutritionFetcher → Orchestrator."""
123
  if last_speaker is None:
124
+ return food_recognizer # Return the Agent object, not the name
125
  order = [food_recognizer, size_estimator, nutrition_fetcher, orchestrator]
126
  current_index = order.index(last_speaker)
127
  next_index = (current_index + 1) % len(order)
128
  return order[next_index]
129
 
130
+ # Group Chat for Agent Coordination (no LLM, custom speaker selection method)
131
  group_chat = GroupChat(
132
  agents=[food_recognizer, size_estimator, nutrition_fetcher, orchestrator],
133
  messages=[],
134
+ max_round=4, # Limit rounds to match agent order
135
+ speaker_selection_method=custom_select_speaker # Use correct parameter for AutoGen 0.7.6
136
  )
137
  manager = GroupChatManager(groupchat=group_chat)
138
 
 
148
  max_turns=10
149
  )
150
 
151
+ # Extract and format the final response from the ChatResult
152
  if hasattr(response, 'chat_history') and response.chat_history:
153
  # Get the last message from chat history
154
  last_message = response.chat_history[-1]
 
160
  result = result.get("text", "No text output from agents.")
161
  print(f"Total time: {time.time() - start:.2f}s")
162
  return result
163
+
164
  # Gradio Interface
165
  interface = gr.Interface(
166
  fn=orchestrate_workflow,
167
  inputs=[
168
  gr.Image(type="numpy", label="Upload a Food Photo"),
169
+ gr.Textbox(type="password", label="Your Nutritionix API Key (required)")
 
170
  ],
171
  outputs=[
172
+ gr.Textbox(label="Nutrition Breakdown")
 
173
  ],
174
  title="Food Nutrition Analyzer",
175
+ description="Upload a food photo and provide your Nutritionix API key for nutrition data."
176
  )
177
 
178
  if __name__ == "__main__":