dxdcx committed on
Commit
dd0f3c2
·
verified ·
1 Parent(s): 5969439

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +75 -40
app.py CHANGED
@@ -147,14 +147,32 @@ def process_video(video_path: str) -> list[dict]:
147
  })
148
 
149
  return image_messages
 
 
 
 
150
 
151
-
152
- def encode_image_to_base64(image_path):
153
- import mimetypes
154
- mime_type, _ = mimetypes.guess_type(image_path)
155
  with open(image_path, "rb") as image_file:
156
- encoded = base64.b64encode(image_file.read()).decode("utf-8")
157
- return f"data:{mime_type};base64,{encoded}"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
158
 
159
 
160
  def process_interleaved_images(message: dict) -> list:
@@ -166,72 +184,89 @@ def process_interleaved_images(message: dict) -> list:
166
 
167
  for part in parts:
168
  if part == "<image>":
 
169
  if current_text.strip():
170
  final_content.append({"type": "text", "text": current_text.strip()})
171
  current_text = ""
172
- encoded_image = encode_image_to_base64(message["files"][image_index])
173
- final_content.append({
174
- "type": "image_url",
175
- "image_url": {"url": encoded_image}
176
- })
177
- image_index += 1
 
 
 
 
 
178
  else:
179
  current_text += part
180
-
 
181
  if current_text.strip():
182
  final_content.append({"type": "text", "text": current_text.strip()})
183
-
184
  return final_content
185
 
186
 
187
- def process_new_user_message(message: dict):
188
- if not message["files"]:
189
  return [{"role": "user", "content": message["text"]}]
190
 
191
  if message["files"][0].endswith(".mp4"):
192
  text_message = {"role": "user", "content": message["text"]}
193
- video_messages = process_video(message["files"][0])
194
  return [text_message] + video_messages
195
 
196
  if "<image>" in message["text"]:
 
197
  content = process_interleaved_images(message)
198
  return [{"role": "user", "content": content}]
199
 
200
  # For text with images appended
201
  content = [{"type": "text", "text": message["text"]}]
202
  for path in message["files"]:
203
- encoded_image = encode_image_to_base64(path)
204
- content.append({
205
- "type": "image_url",
206
- "image_url": {"url": encoded_image}
207
- })
 
 
208
 
209
  return [{"role": "user", "content": content}]
210
 
211
 
212
  def process_history(history: list[dict]) -> list[dict]:
213
  messages = []
214
-
215
  for item in history:
216
  if item["role"] == "assistant":
217
  messages.append({"role": "assistant", "content": item["content"]})
218
- else:
219
- content = item["content"]
220
- if isinstance(content, str):
221
- messages.append({"role": "user", "content": content})
222
- else:
223
- # Assume content[0] is a file path
224
- encoded_image = encode_image_to_base64(content[0])
225
- messages.append({
226
- "role": "user",
227
- "content": [
228
- {
229
- "type": "image_url",
230
- "image_url": {"url": encoded_image}
231
- }
232
- ]
233
- })
234
-
 
 
 
 
 
 
 
235
  return messages
236
 
237
 
 
147
  })
148
 
149
  return image_messages
150
+
151
+ import base64
152
+ import re
153
+ import mimetypes # Added for MIME type detection
154
 
155
def encode_image_to_base64(image_path: str) -> str:
    """Return the contents of the file at ``image_path`` as base64 text.

    The file is read in binary mode and the whole payload is encoded at once.
    Only the bare base64 string is returned; no ``data:`` prefix or media
    type is attached here.

    Args:
        image_path: Path of the file to read.

    Returns:
        The base64 encoding of the file's bytes, decoded to ``str``.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    with open(image_path, "rb") as handle:
        raw_bytes = handle.read()
    return base64.b64encode(raw_bytes).decode('utf-8')
158
+
159
# Extensions that are mapped directly, without consulting ``mimetypes``.
_EXTENSION_MEDIA_TYPES = {
    "jpg": "image/jpeg",
    "jpeg": "image/jpeg",
    "png": "image/png",
    "gif": "image/gif",
    "webp": "image/webp",
}


def get_image_media_type(image_path: str) -> str:
    """Return the media (MIME) type to advertise for ``image_path``.

    The extension of the final path component is matched case-insensitively
    against a small table of common image formats. Anything else is handed to
    ``mimetypes.guess_type``; its answer is used only when it is an
    ``image/*`` type.

    Args:
        image_path: Path or file name of the image. The file is not opened.

    Returns:
        An ``image/*`` media type, or ``"application/octet-stream"`` when the
        type cannot be determined or is not an image type.
    """
    import os  # local import: only needed to isolate the file name

    # Look at the file name only, and require an actual dot, so that a bare
    # name such as "png" is not mistaken for a file with a ".png" extension.
    file_name = os.path.basename(image_path)
    _, dot, extension = file_name.rpartition(".")
    if dot:
        known_type = _EXTENSION_MEDIA_TYPES.get(extension.lower())
        if known_type is not None:
            return known_type

    # Fallback to mimetypes detection for every other extension.
    guessed_type, _ = mimetypes.guess_type(image_path)
    if guessed_type and guessed_type.startswith("image/"):
        return guessed_type

    # Default when the type is unknown or not a recognized image type.
    return "application/octet-stream"
176
 
177
 
178
  def process_interleaved_images(message: dict) -> list:
 
184
 
185
  for part in parts:
186
  if part == "<image>":
187
+ # If we have accumulated text, add it first
188
  if current_text.strip():
189
  final_content.append({"type": "text", "text": current_text.strip()})
190
  current_text = ""
191
+
192
+ # Add the image as base64 data URL
193
+ if image_index < len(message['files']):
194
+ image_path = message['files'][image_index]
195
+ base64_image = encode_image_to_base64(image_path)
196
+ media_type = get_image_media_type(image_path)
197
+ final_content.append({
198
+ "type": "image_url",
199
+ "image_url": {"url": f"data:{media_type};base64,{base64_image}"}
200
+ })
201
+ image_index += 1
202
  else:
203
  current_text += part
204
+
205
+ # Add any remaining text
206
  if current_text.strip():
207
  final_content.append({"type": "text", "text": current_text.strip()})
208
+
209
  return final_content
210
 
211
 
212
def process_new_user_message(message: dict) -> list:
    """Convert a newly submitted user message into a list of chat messages.

    Four cases are handled, in this order:

    1. No files attached: a single plain-text user message.
    2. The first file is an ``.mp4``: the text message followed by whatever
       ``process_video`` returns for that file.
    3. The text contains ``<image>`` placeholders: one user message whose
       content is built by ``process_interleaved_images``.
    4. Otherwise: one user message with the text first and every non-video
       file appended as a base64 ``data:`` URL image part.

    Args:
        message: Dict with a ``"text"`` entry and an optional ``"files"``
            entry holding file paths.

    Returns:
        A list of message dicts, each with ``"role"`` and ``"content"`` keys.
    """
    files = message.get("files")  # a missing key and an empty list both mean "no files"
    if not files:
        return [{"role": "user", "content": message["text"]}]

    # Compare extensions case-insensitively so "clip.MP4" is still routed to
    # the video path instead of being base64-encoded as an image.
    if files[0].lower().endswith(".mp4"):
        text_message = {"role": "user", "content": message["text"]}
        video_messages = process_video(files[0])
        return [text_message] + video_messages

    if "<image>" in message["text"]:
        # For interleaved text and images
        content = process_interleaved_images(message)
        return [{"role": "user", "content": content}]

    # For text with images appended
    content = [{"type": "text", "text": message["text"]}]
    for path in files:
        if path.lower().endswith(".mp4"):
            # Videos are not sent as images; skip them here.
            continue
        base64_image = encode_image_to_base64(path)
        media_type = get_image_media_type(path)
        content.append({
            "type": "image_url",
            "image_url": {"url": f"data:{media_type};base64,{base64_image}"},
        })

    return [{"role": "user", "content": content}]
238
 
239
 
_FILE_URL_PREFIX = "file://"


def _inline_local_image(part):
    """Return ``part`` with a ``file://`` image URL swapped for a base64 data URL.

    Anything that is not an ``image_url`` dict pointing at a ``file://`` URL is
    returned unchanged, as is a part whose file cannot be read.
    """
    if not isinstance(part, dict) or part.get("type") != "image_url":
        return part

    image_url = part.get("image_url")
    url = image_url.get("url") if isinstance(image_url, dict) else None
    if not isinstance(url, str) or not url.startswith(_FILE_URL_PREFIX):
        return part

    image_path = url[len(_FILE_URL_PREFIX):]
    try:
        base64_image = encode_image_to_base64(image_path)
    except OSError:
        # Best effort: a missing or unreadable file keeps the original part
        # instead of aborting the whole history conversion.
        return part

    media_type = get_image_media_type(image_path)
    return {
        "type": "image_url",
        "image_url": {"url": f"data:{media_type};base64,{base64_image}"},
    }


def process_history(history: list[dict]) -> list[dict]:
    """Convert stored chat history into a list of role/content messages.

    Assistant items are copied through unchanged. Every other item is emitted
    with role ``"user"``: string content is passed through as-is, and list
    content is copied part by part, with ``file://`` image URLs replaced by
    base64 ``data:`` URLs.

    Args:
        history: Items with ``"role"`` and ``"content"`` keys.

    Returns:
        The converted messages, in the same order as ``history``.
    """
    messages = []

    for item in history:
        if item["role"] == "assistant":
            messages.append({"role": "assistant", "content": item["content"]})
            continue

        # Everything that is not an assistant item is treated as a user message.
        current_content = item["content"]
        if isinstance(current_content, str):
            messages.append({"role": "user", "content": current_content})
        elif isinstance(current_content, list):
            # Multimodal content: a list of parts.
            processed_content_parts = [
                _inline_local_image(part) for part in current_content
            ]
            messages.append({"role": "user", "content": processed_content_parts})
        # NOTE(review): content that is neither str nor list (e.g. a tuple)
        # produces no message at all - confirm that dropping it is intended.

    return messages
271
 
272