Spaces:
Paused
Paused
Update app.py
Browse files
app.py
CHANGED
@@ -152,30 +152,13 @@ import base64
|
|
152 |
import re
|
153 |
import mimetypes # Added for MIME type detection
|
154 |
|
155 |
-
def encode_image_to_base64(image_path
|
|
|
156 |
with open(image_path, "rb") as image_file:
|
157 |
return base64.b64encode(image_file.read()).decode('utf-8')
|
158 |
|
159 |
-
def get_image_media_type(image_path: str) -> str:
|
160 |
-
ext = image_path.split('.')[-1].lower()
|
161 |
-
if ext in ("jpg", "jpeg"):
|
162 |
-
return "image/jpeg"
|
163 |
-
elif ext == "png":
|
164 |
-
return "image/png"
|
165 |
-
elif ext == "gif":
|
166 |
-
return "image/gif"
|
167 |
-
elif ext == "webp":
|
168 |
-
return "image/webp"
|
169 |
-
else:
|
170 |
-
# Fallback to mimetypes detection
|
171 |
-
mime_type, _ = mimetypes.guess_type(image_path)
|
172 |
-
if mime_type and mime_type.startswith("image/"):
|
173 |
-
return mime_type
|
174 |
-
# Default fallback if type is unknown or not a recognized image type
|
175 |
-
return "application/octet-stream"
|
176 |
-
|
177 |
-
|
178 |
def process_interleaved_images(message: dict) -> list:
|
|
|
179 |
parts = re.split(r"(<image>)", message["text"])
|
180 |
|
181 |
final_content = []
|
@@ -189,16 +172,20 @@ def process_interleaved_images(message: dict) -> list:
|
|
189 |
final_content.append({"type": "text", "text": current_text.strip()})
|
190 |
current_text = ""
|
191 |
|
192 |
-
# Add the image as base64
|
193 |
-
|
194 |
-
|
195 |
-
|
196 |
-
|
197 |
-
|
198 |
-
|
199 |
-
|
200 |
-
|
201 |
-
|
|
|
|
|
|
|
|
|
202 |
else:
|
203 |
current_text += part
|
204 |
|
@@ -208,16 +195,14 @@ def process_interleaved_images(message: dict) -> list:
|
|
208 |
|
209 |
return final_content
|
210 |
|
211 |
-
|
212 |
-
|
213 |
-
if not message.get("files"): # Check if "files" key exists and is not empty
|
214 |
return [{"role": "user", "content": message["text"]}]
|
215 |
-
|
216 |
if message["files"][0].endswith(".mp4"):
|
|
|
217 |
text_message = {"role": "user", "content": message["text"]}
|
218 |
-
video_messages = process_video(message["files"][0])
|
219 |
return [text_message] + video_messages
|
220 |
-
|
221 |
if "<image>" in message["text"]:
|
222 |
# For interleaved text and images
|
223 |
content = process_interleaved_images(message)
|
@@ -226,17 +211,21 @@ def process_new_user_message(message: dict) -> list:
|
|
226 |
# For text with images appended
|
227 |
content = [{"type": "text", "text": message["text"]}]
|
228 |
for path in message["files"]:
|
229 |
-
|
230 |
-
|
231 |
-
|
232 |
-
|
233 |
-
|
234 |
-
|
235 |
-
|
|
|
|
|
|
|
|
|
|
|
236 |
|
237 |
return [{"role": "user", "content": content}]
|
238 |
|
239 |
-
|
240 |
def process_history(history: list[dict]) -> list[dict]:
|
241 |
messages = []
|
242 |
|
@@ -244,32 +233,31 @@ def process_history(history: list[dict]) -> list[dict]:
|
|
244 |
if item["role"] == "assistant":
|
245 |
messages.append({"role": "assistant", "content": item["content"]})
|
246 |
else: # user messages
|
247 |
-
|
248 |
-
if isinstance(
|
249 |
-
messages.append({"role": "user", "content":
|
250 |
-
|
251 |
-
|
252 |
-
|
253 |
-
|
254 |
-
|
255 |
-
|
256 |
-
|
257 |
-
|
258 |
-
|
259 |
-
|
260 |
-
|
261 |
-
|
262 |
-
|
263 |
-
|
264 |
-
|
265 |
-
|
266 |
-
|
267 |
-
|
268 |
-
|
269 |
-
|
270 |
return messages
|
271 |
|
272 |
-
|
273 |
def run(message: dict, history: list[dict]) -> Iterator[str]:
|
274 |
if not validate_media_constraints(message, history):
|
275 |
yield ""
|
|
|
152 |
import re
|
153 |
import mimetypes # Added for MIME type detection
|
154 |
|
155 |
+
def encode_image_to_base64(image_path):
|
156 |
+
import base64
|
157 |
with open(image_path, "rb") as image_file:
|
158 |
return base64.b64encode(image_file.read()).decode('utf-8')
|
159 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
160 |
def process_interleaved_images(message: dict) -> list:
|
161 |
+
import re
|
162 |
parts = re.split(r"(<image>)", message["text"])
|
163 |
|
164 |
final_content = []
|
|
|
172 |
final_content.append({"type": "text", "text": current_text.strip()})
|
173 |
current_text = ""
|
174 |
|
175 |
+
# Add the image as base64
|
176 |
+
image_path = message['files'][image_index]
|
177 |
+
image_base64 = encode_image_to_base64(image_path)
|
178 |
+
# Determine media type based on file extension
|
179 |
+
image_extension = image_path.split('.')[-1].lower()
|
180 |
+
media_type = f"image/{image_extension}"
|
181 |
+
if image_extension == 'jpg':
|
182 |
+
media_type = "image/jpeg"
|
183 |
+
|
184 |
+
final_content.append({
|
185 |
+
"type": "image_url",
|
186 |
+
"image_url": {"url": f"data:{media_type};base64,{image_base64}"}
|
187 |
+
})
|
188 |
+
image_index += 1
|
189 |
else:
|
190 |
current_text += part
|
191 |
|
|
|
195 |
|
196 |
return final_content
|
197 |
|
198 |
+
def process_new_user_message(message: dict):
|
199 |
+
if not message["files"]:
|
|
|
200 |
return [{"role": "user", "content": message["text"]}]
|
|
|
201 |
if message["files"][0].endswith(".mp4"):
|
202 |
+
# For video, return text message followed by frame messages
|
203 |
text_message = {"role": "user", "content": message["text"]}
|
204 |
+
video_messages = process_video(message["files"][0])
|
205 |
return [text_message] + video_messages
|
|
|
206 |
if "<image>" in message["text"]:
|
207 |
# For interleaved text and images
|
208 |
content = process_interleaved_images(message)
|
|
|
211 |
# For text with images appended
|
212 |
content = [{"type": "text", "text": message["text"]}]
|
213 |
for path in message["files"]:
|
214 |
+
# Convert image to base64
|
215 |
+
image_base64 = encode_image_to_base64(path)
|
216 |
+
# Determine media type based on file extension
|
217 |
+
image_extension = path.split('.')[-1].lower()
|
218 |
+
media_type = f"image/{image_extension}"
|
219 |
+
if image_extension == 'jpg':
|
220 |
+
media_type = "image/jpeg"
|
221 |
+
|
222 |
+
content.append({
|
223 |
+
"type": "image_url",
|
224 |
+
"image_url": {"url": f"data:{media_type};base64,{image_base64}"}
|
225 |
+
})
|
226 |
|
227 |
return [{"role": "user", "content": content}]
|
228 |
|
|
|
229 |
def process_history(history: list[dict]) -> list[dict]:
|
230 |
messages = []
|
231 |
|
|
|
233 |
if item["role"] == "assistant":
|
234 |
messages.append({"role": "assistant", "content": item["content"]})
|
235 |
else: # user messages
|
236 |
+
content = item["content"]
|
237 |
+
if isinstance(content, str):
|
238 |
+
messages.append({"role": "user", "content": content})
|
239 |
+
else: # image content
|
240 |
+
# Convert image to base64
|
241 |
+
image_path = content[0]
|
242 |
+
image_base64 = encode_image_to_base64(image_path)
|
243 |
+
# Determine media type based on file extension
|
244 |
+
image_extension = image_path.split('.')[-1].lower()
|
245 |
+
media_type = f"image/{image_extension}"
|
246 |
+
if image_extension == 'jpg':
|
247 |
+
media_type = "image/jpeg"
|
248 |
+
|
249 |
+
messages.append({
|
250 |
+
"role": "user",
|
251 |
+
"content": [
|
252 |
+
{
|
253 |
+
"type": "image_url",
|
254 |
+
"image_url": {"url": f"data:{media_type};base64,{image_base64}"}
|
255 |
+
}
|
256 |
+
]
|
257 |
+
})
|
258 |
+
|
259 |
return messages
|
260 |
|
|
|
261 |
def run(message: dict, history: list[dict]) -> Iterator[str]:
|
262 |
if not validate_media_constraints(message, history):
|
263 |
yield ""
|