Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
Update app.py
Browse files
app.py
CHANGED
@@ -151,14 +151,33 @@ def process_video(video_path: str) -> list[dict]:
|
|
151 |
import base64
|
152 |
import re
|
153 |
import mimetypes # Added for MIME type detection
|
|
|
|
|
|
|
|
|
|
|
154 |
|
155 |
-
def encode_image_to_base64(image_path):
|
156 |
-
|
157 |
with open(image_path, "rb") as image_file:
|
158 |
return base64.b64encode(image_file.read()).decode('utf-8')
|
159 |
|
160 |
-
def
|
161 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
162 |
parts = re.split(r"(<image>)", message["text"])
|
163 |
|
164 |
final_content = []
|
@@ -167,25 +186,22 @@ def process_interleaved_images(message: dict) -> list:
|
|
167 |
|
168 |
for part in parts:
|
169 |
if part == "<image>":
|
170 |
-
#
|
171 |
if current_text.strip():
|
172 |
final_content.append({"type": "text", "text": current_text.strip()})
|
173 |
current_text = ""
|
174 |
|
175 |
-
#
|
176 |
-
|
177 |
-
|
178 |
-
|
179 |
-
|
180 |
-
media_type = f"image/{image_extension}"
|
181 |
-
if image_extension == 'jpg':
|
182 |
-
media_type = "image/jpeg"
|
183 |
|
184 |
-
|
185 |
-
|
186 |
-
|
187 |
-
|
188 |
-
|
189 |
else:
|
190 |
current_text += part
|
191 |
|
@@ -195,66 +211,70 @@ def process_interleaved_images(message: dict) -> list:
|
|
195 |
|
196 |
return final_content
|
197 |
|
198 |
-
def
|
199 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
200 |
return [{"role": "user", "content": message["text"]}]
|
201 |
-
|
202 |
-
|
|
|
203 |
text_message = {"role": "user", "content": message["text"]}
|
204 |
video_messages = process_video(message["files"][0])
|
205 |
return [text_message] + video_messages
|
|
|
|
|
206 |
if "<image>" in message["text"]:
|
207 |
-
# For interleaved text and images
|
208 |
content = process_interleaved_images(message)
|
209 |
return [{"role": "user", "content": content}]
|
210 |
|
211 |
-
#
|
212 |
content = [{"type": "text", "text": message["text"]}]
|
213 |
-
for
|
214 |
-
|
215 |
-
|
216 |
-
|
217 |
-
image_extension = path.split('.')[-1].lower()
|
218 |
-
media_type = f"image/{image_extension}"
|
219 |
-
if image_extension == 'jpg':
|
220 |
-
media_type = "image/jpeg"
|
221 |
-
|
222 |
content.append({
|
223 |
"type": "image_url",
|
224 |
-
"image_url": {"url": f"data:{
|
225 |
})
|
226 |
|
227 |
return [{"role": "user", "content": content}]
|
228 |
|
229 |
-
def process_history(history:
|
|
|
230 |
messages = []
|
231 |
|
232 |
for item in history:
|
233 |
if item["role"] == "assistant":
|
234 |
messages.append({"role": "assistant", "content": item["content"]})
|
235 |
else: # user messages
|
236 |
-
content = item
|
237 |
if isinstance(content, str):
|
238 |
messages.append({"role": "user", "content": content})
|
239 |
-
|
240 |
-
#
|
241 |
image_path = content[0]
|
242 |
-
|
243 |
-
|
244 |
-
|
245 |
-
media_type = f"image/{image_extension}"
|
246 |
-
if image_extension == 'jpg':
|
247 |
-
media_type = "image/jpeg"
|
248 |
-
|
249 |
messages.append({
|
250 |
"role": "user",
|
251 |
"content": [
|
252 |
{
|
253 |
"type": "image_url",
|
254 |
-
"image_url": {"url": f"data:{
|
255 |
}
|
256 |
]
|
257 |
})
|
|
|
|
|
|
|
258 |
|
259 |
return messages
|
260 |
|
|
|
151 |
import base64
|
152 |
import re
|
153 |
import mimetypes # Added for MIME type detection
|
154 |
+
import base64
|
155 |
+
import re
|
156 |
+
import os
|
157 |
+
import requests
|
158 |
+
from typing import List, Dict, Union, Any
|
159 |
|
160 |
+
def encode_image_to_base64(image_path: str) -> str:
|
161 |
+
"""Convert an image file to base64 encoding."""
|
162 |
with open(image_path, "rb") as image_file:
|
163 |
return base64.b64encode(image_file.read()).decode('utf-8')
|
164 |
|
165 |
+
def get_mime_type(file_path: str) -> str:
|
166 |
+
"""Determine MIME type based on file extension."""
|
167 |
+
extension = file_path.split('.')[-1].lower()
|
168 |
+
if extension == 'jpg' or extension == 'jpeg':
|
169 |
+
return "image/jpeg"
|
170 |
+
elif extension == 'png':
|
171 |
+
return "image/png"
|
172 |
+
elif extension == 'gif':
|
173 |
+
return "image/gif"
|
174 |
+
elif extension == 'webp':
|
175 |
+
return "image/webp"
|
176 |
+
else:
|
177 |
+
return f"image/{extension}"
|
178 |
+
|
179 |
+
def process_interleaved_images(message: Dict[str, Any]) -> List[Dict[str, Any]]:
|
180 |
+
"""Process a message with interleaved text and image tags."""
|
181 |
parts = re.split(r"(<image>)", message["text"])
|
182 |
|
183 |
final_content = []
|
|
|
186 |
|
187 |
for part in parts:
|
188 |
if part == "<image>":
|
189 |
+
# Add accumulated text if any
|
190 |
if current_text.strip():
|
191 |
final_content.append({"type": "text", "text": current_text.strip()})
|
192 |
current_text = ""
|
193 |
|
194 |
+
# Process and add the image as base64
|
195 |
+
if image_index < len(message.get('files', [])):
|
196 |
+
image_path = message['files'][image_index]
|
197 |
+
base64_data = encode_image_to_base64(image_path)
|
198 |
+
mime_type = get_mime_type(image_path)
|
|
|
|
|
|
|
199 |
|
200 |
+
final_content.append({
|
201 |
+
"type": "image_url",
|
202 |
+
"image_url": {"url": f"data:{mime_type};base64,{base64_data}"}
|
203 |
+
})
|
204 |
+
image_index += 1
|
205 |
else:
|
206 |
current_text += part
|
207 |
|
|
|
211 |
|
212 |
return final_content
|
213 |
|
214 |
+
def process_video(video_path: str) -> List[Dict[str, Any]]:
|
215 |
+
"""Placeholder for video processing function."""
|
216 |
+
# Implementation depends on requirements for video processing
|
217 |
+
pass
|
218 |
+
|
219 |
+
def process_new_user_message(message: Dict[str, Any]) -> List[Dict[str, Any]]:
|
220 |
+
"""Process user message with potential images or videos."""
|
221 |
+
# If no files, just return the text
|
222 |
+
if not message.get("files", []):
|
223 |
return [{"role": "user", "content": message["text"]}]
|
224 |
+
|
225 |
+
# Handle video files
|
226 |
+
if message["files"] and message["files"][0].endswith(".mp4"):
|
227 |
text_message = {"role": "user", "content": message["text"]}
|
228 |
video_messages = process_video(message["files"][0])
|
229 |
return [text_message] + video_messages
|
230 |
+
|
231 |
+
# Handle interleaved images
|
232 |
if "<image>" in message["text"]:
|
|
|
233 |
content = process_interleaved_images(message)
|
234 |
return [{"role": "user", "content": content}]
|
235 |
|
236 |
+
# Handle text with images appended
|
237 |
content = [{"type": "text", "text": message["text"]}]
|
238 |
+
for file_path in message.get("files", []):
|
239 |
+
base64_data = encode_image_to_base64(file_path)
|
240 |
+
mime_type = get_mime_type(file_path)
|
241 |
+
|
|
|
|
|
|
|
|
|
|
|
242 |
content.append({
|
243 |
"type": "image_url",
|
244 |
+
"image_url": {"url": f"data:{mime_type};base64,{base64_data}"}
|
245 |
})
|
246 |
|
247 |
return [{"role": "user", "content": content}]
|
248 |
|
249 |
+
def process_history(history: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
250 |
+
"""Process conversation history, encoding images as base64."""
|
251 |
messages = []
|
252 |
|
253 |
for item in history:
|
254 |
if item["role"] == "assistant":
|
255 |
messages.append({"role": "assistant", "content": item["content"]})
|
256 |
else: # user messages
|
257 |
+
content = item.get("content")
|
258 |
if isinstance(content, str):
|
259 |
messages.append({"role": "user", "content": content})
|
260 |
+
elif isinstance(content, list) and len(content) > 0 and isinstance(content[0], str):
|
261 |
+
# Image file path
|
262 |
image_path = content[0]
|
263 |
+
base64_data = encode_image_to_base64(image_path)
|
264 |
+
mime_type = get_mime_type(image_path)
|
265 |
+
|
|
|
|
|
|
|
|
|
266 |
messages.append({
|
267 |
"role": "user",
|
268 |
"content": [
|
269 |
{
|
270 |
"type": "image_url",
|
271 |
+
"image_url": {"url": f"data:{mime_type};base64,{base64_data}"}
|
272 |
}
|
273 |
]
|
274 |
})
|
275 |
+
else:
|
276 |
+
# Already properly formatted content
|
277 |
+
messages.append({"role": "user", "content": content})
|
278 |
|
279 |
return messages
|
280 |
|