Spaces:
dxdcx
/
Running on CPU Upgrade

dxdcx committed on
Commit
6f932dc
·
verified ·
1 Parent(s): 09013ea

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +125 -77
app.py CHANGED
@@ -153,29 +153,45 @@ import re
153
  import mimetypes # Added for MIME type detection
154
  import requests
155
  from typing import List, Dict, Union, Any
 
 
 
156
 
157
  def encode_image_to_base64(image_path: str) -> str:
158
- """Convert an image file to base64 encoding."""
159
  with open(image_path, "rb") as image_file:
160
  return base64.b64encode(image_file.read()).decode('utf-8')
161
 
162
- def get_mime_type(file_path: str) -> str:
163
- """Determine MIME type based on file extension."""
164
- extension = file_path.split('.')[-1].lower()
165
- if extension == 'jpg' or extension == 'jpeg':
166
- return "image/jpeg"
167
- elif extension == 'png':
 
 
 
 
168
  return "image/png"
169
- elif extension == 'gif':
170
  return "image/gif"
171
- elif extension == 'webp':
172
  return "image/webp"
173
- else:
174
- return f"image/{extension}"
 
 
 
 
 
 
 
175
 
176
- def process_interleaved_images(message: Dict[str, Any]) -> List[Dict[str, Any]]:
177
- """Process a message with interleaved text and image tags."""
178
- parts = re.split(r"(<image>)", message["text"])
 
 
179
 
180
  final_content = []
181
  current_text = ""
@@ -183,96 +199,128 @@ def process_interleaved_images(message: Dict[str, Any]) -> List[Dict[str, Any]]:
183
 
184
  for part in parts:
185
  if part == "<image>":
186
- # Add accumulated text if any
187
  if current_text.strip():
188
  final_content.append({"type": "text", "text": current_text.strip()})
189
  current_text = ""
190
 
191
- # Process and add the image as base64
192
- if image_index < len(message.get('files', [])):
193
- image_path = message['files'][image_index]
194
- base64_data = encode_image_to_base64(image_path)
195
- mime_type = get_mime_type(image_path)
196
-
197
- final_content.append({
198
- "type": "image_url",
199
- "image_url": {"url": f"data:{mime_type};base64,{base64_data}"}
200
- })
 
 
 
 
 
 
 
 
201
  image_index += 1
 
 
 
202
  else:
203
  current_text += part
204
 
205
- # Add any remaining text
206
  if current_text.strip():
207
  final_content.append({"type": "text", "text": current_text.strip()})
208
 
209
  return final_content
210
 
211
- def process_video(video_path: str) -> List[Dict[str, Any]]:
212
- """Placeholder for video processing function."""
213
- # Implementation depends on requirements for video processing
214
- pass
215
 
216
- def process_new_user_message(message: Dict[str, Any]) -> List[Dict[str, Any]]:
217
- """Process user message with potential images or videos."""
218
- # If no files, just return the text
219
- if not message.get("files", []):
220
- return [{"role": "user", "content": message["text"]}]
221
-
222
- # Handle video files
223
- if message["files"] and message["files"][0].endswith(".mp4"):
224
- text_message = {"role": "user", "content": message["text"]}
225
- video_messages = process_video(message["files"][0])
226
  return [text_message] + video_messages
227
-
228
- # Handle interleaved images
229
- if "<image>" in message["text"]:
230
- content = process_interleaved_images(message)
231
  return [{"role": "user", "content": content}]
232
 
233
- # Handle text with images appended
234
- content = [{"type": "text", "text": message["text"]}]
235
- for file_path in message.get("files", []):
236
- base64_data = encode_image_to_base64(file_path)
237
- mime_type = get_mime_type(file_path)
238
-
239
- content.append({
240
- "type": "image_url",
241
- "image_url": {"url": f"data:{mime_type};base64,{base64_data}"}
242
- })
 
 
 
 
 
 
 
 
 
 
 
 
 
243
 
 
 
 
244
  return [{"role": "user", "content": content}]
245
 
246
- def process_history(history: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
247
- """Process conversation history, encoding images as base64."""
248
  messages = []
249
 
250
  for item in history:
251
  if item["role"] == "assistant":
252
  messages.append({"role": "assistant", "content": item["content"]})
253
  else: # user messages
254
- content = item.get("content")
255
- if isinstance(content, str):
256
- messages.append({"role": "user", "content": content})
257
- elif isinstance(content, list) and len(content) > 0 and isinstance(content[0], str):
258
- # Image file path
259
- image_path = content[0]
260
- base64_data = encode_image_to_base64(image_path)
261
- mime_type = get_mime_type(image_path)
262
-
263
- messages.append({
264
- "role": "user",
265
- "content": [
266
- {
267
- "type": "image_url",
268
- "image_url": {"url": f"data:{mime_type};base64,{base64_data}"}
269
- }
270
- ]
271
- })
 
 
 
 
 
 
 
 
 
 
 
 
272
  else:
273
- # Already properly formatted content
274
- messages.append({"role": "user", "content": content})
275
-
276
  return messages
277
 
278
  def run(message: dict, history: list[dict]) -> Iterator[str]:
 
153
  import mimetypes # Added for MIME type detection
154
  import requests
155
  from typing import List, Dict, Union, Any
156
+ import base64
157
+ import re
158
+ import mimetypes # For fallback MIME type detection
159
 
160
def encode_image_to_base64(image_path: str) -> str:
    """Read a local image file and return its bytes as base64 text.

    Args:
        image_path: Path of the file to read (opened in binary mode).

    Returns:
        The base64 encoding of the whole file, decoded to a ``str``.
    """
    with open(image_path, "rb") as handle:
        raw_bytes = handle.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode('utf-8')
164
 
165
+ def get_image_media_type(image_path: str) -> str | None:
166
+ """
167
+ Determines the media type for an image file.
168
+ Returns the MIME string (e.g., "image/jpg") or None if not a recognized image type.
169
+ """
170
+ ext = image_path.split('.')[-1].lower()
171
+
172
+ if ext in ("jpg", "jpeg"):
173
+ return "image/jpg" # Align with the example snippet's "image/jpg"
174
+ elif ext == "png":
175
  return "image/png"
176
+ elif ext == "gif":
177
  return "image/gif"
178
+ elif ext == "webp":
179
  return "image/webp"
180
+
181
+ # Fallback to mimetypes for other potential image types
182
+ mime_type, _ = mimetypes.guess_type(image_path)
183
+ if mime_type and mime_type.startswith("image/"):
184
+ if mime_type == "image/jpeg": # If mimetypes returns image/jpeg, use image/jpg
185
+ return "image/jpg"
186
+ return mime_type
187
+
188
+ return None # Not a recognized/supported image type
189
 
190
def process_interleaved_images(message: dict) -> list:
    """Build a content list from text containing interleaved ``<image>`` tags.

    The text is split on ``<image>``. Each run of text becomes a
    ``{"type": "text"}`` part (stripped, empty runs dropped) and each tag is
    paired, in order, with the next entry of ``message["files"]``, which is
    embedded as a base64 ``data:`` URL in a ``{"type": "image_url"}`` part.

    Image handling is best effort: a missing file, an unreadable file, an
    unrecognized file type, or a tag without a matching file only prints a
    warning and produces no image part.

    Args:
        message: Mapping with optional "text" and "files" keys. Entries of
            "files" are assumed to be local path strings - TODO confirm
            against the caller.

    Returns:
        The ordered list of text and image parts.
    """
    # `or` (rather than a .get default) also covers keys explicitly set to None,
    # which would otherwise crash re.split / len below.
    user_text = message.get("text") or ""
    files = message.get("files") or []
    parts = re.split(r"(<image>)", user_text)

    final_content = []
    current_text = ""
    image_index = 0  # index of the next file to pair with an <image> tag

    for part in parts:
        if part != "<image>":
            current_text += part
            continue

        # Flush the text accumulated before this tag.
        if current_text.strip():
            final_content.append({"type": "text", "text": current_text.strip()})
            current_text = ""

        if image_index >= len(files):
            # More <image> tags than files provided
            print("Warning: <image> tag found but no corresponding file path in 'files' list.")
            continue

        image_path = files[image_index]
        image_index += 1  # the file is consumed even if it cannot be embedded

        media_type = get_image_media_type(image_path)
        if not media_type:
            print(f"Warning: File {image_path} is not a recognized image type or <image> tag mismatch.")
            continue

        try:
            base64_image = encode_image_to_base64(image_path)
        except FileNotFoundError:
            print(f"Warning: Image file not found: {image_path}")
        except Exception as e:  # deliberate best effort: one bad image must not abort the message
            print(f"Warning: Could not process image {image_path}: {e}")
        else:
            final_content.append({
                "type": "image_url",
                "image_url": {"url": f"data:{media_type};base64,{base64_image}"},
            })

    # Add any text left after the last tag.
    if current_text.strip():
        final_content.append({"type": "text", "text": current_text.strip()})

    return final_content
235
 
236
def process_new_user_message(message: dict) -> list:
    """Convert a new user message into a list of chat messages.

    Handling, in order:
      1. No files: a single user message whose content is the plain text.
      2. First file is an ``.mp4``: the text message followed by whatever
         ``process_video`` returns for that file.
      3. Text contains ``<image>`` tags: delegated to
         ``process_interleaved_images``.
      4. Otherwise: the text part (if non-blank) followed by every non-mp4
         file that is a recognized image, embedded as a base64 data URL.

    Args:
        message: Mapping with optional "text" and "files" keys. Entries of
            "files" are assumed to be local path strings - TODO confirm
            against the caller.

    Returns:
        A list of ``{"role": "user", "content": ...}`` messages (plus any
        video messages in case 2).
    """
    # `or` also normalizes keys that are present but set to None.
    user_text = message.get("text") or ""
    files = message.get("files") or []

    if not files:
        return [{"role": "user", "content": user_text}]

    # Extension checks are case-insensitive so "CLIP.MP4" is treated as video too.
    if files[0].lower().endswith(".mp4"):
        text_message = {"role": "user", "content": user_text}
        # NOTE(review): process_video is not defined in the visible part of this
        # file and must be provided elsewhere. A None result is treated as
        # "no video messages" instead of raising on concatenation.
        video_messages = process_video(files[0]) or []
        return [text_message, *video_messages]

    if "<image>" in user_text:
        content = process_interleaved_images(message)  # needs the whole message dict
        return [{"role": "user", "content": content}]

    # Text with images appended.
    content = []
    if user_text.strip():  # Add text part only if there's text
        content.append({"type": "text", "text": user_text})

    for path in files:
        if path.lower().endswith(".mp4"):
            continue  # videos are only handled when they are the first file
        media_type = get_image_media_type(path)
        if not media_type:
            continue  # not a recognized image, skip
        try:
            base64_image = encode_image_to_base64(path)
        except FileNotFoundError:
            print(f"Warning: Image file not found during append: {path}")
        except Exception as e:  # deliberate best effort: skip the image, keep the message
            print(f"Warning: Could not process image {path} during append: {e}")
        else:
            content.append({
                "type": "image_url",
                "image_url": {"url": f"data:{media_type};base64,{base64_image}"},
            })

    if not content:
        # Only non-image files and no text: send empty content rather than nothing.
        return [{"role": "user", "content": ""}]

    return [{"role": "user", "content": content}]
281
 
282
+ def process_history(history: list[dict]) -> list[dict]:
283
+ """Processes chat history, converting file:// image URLs to base64 data URLs."""
284
  messages = []
285
 
286
  for item in history:
287
  if item["role"] == "assistant":
288
  messages.append({"role": "assistant", "content": item["content"]})
289
  else: # user messages
290
+ current_content = item.get("content")
291
+ if isinstance(current_content, str):
292
+ messages.append({"role": "user", "content": current_content})
293
+ elif isinstance(current_content, list): # Multimodal content (list of dicts)
294
+ processed_content_parts = []
295
+ for part in current_content:
296
+ if part.get("type") == "image_url" and \
297
+ part.get("image_url", {}).get("url", "").startswith("file://"):
298
+ image_path = part["image_url"]["url"][7:] # Remove "file://"
299
+ media_type = get_image_media_type(image_path)
300
+ if media_type: # Proceed only if it's a recognized image type
301
+ try:
302
+ base64_image = encode_image_to_base64(image_path)
303
+ processed_content_parts.append({
304
+ "type": "image_url",
305
+ "image_url": {"url": f"data:{media_type};base64,{base64_image}"}
306
+ })
307
+ except FileNotFoundError:
308
+ print(f"Warning: History image file not found: {image_path}")
309
+ processed_content_parts.append(part) # Keep original part if file missing
310
+ except Exception as e:
311
+ print(f"Warning: Could not process history image {image_path}: {e}")
312
+ processed_content_parts.append(part) # Keep original part on other errors
313
+ else:
314
+ # Was a file:// URL but not a recognized image or path issue
315
+ print(f"Warning: History file {image_path} is not a recognized image type.")
316
+ processed_content_parts.append(part) # Keep original part
317
+ else:
318
+ processed_content_parts.append(part)
319
+ messages.append({"role": "user", "content": processed_content_parts})
320
  else:
321
+ # Content is not a string or list, pass as is or log warning
322
+ messages.append({"role": "user", "content": current_content if current_content is not None else ""})
323
+
324
  return messages
325
 
326
  def run(message: dict, history: list[dict]) -> Iterator[str]: