Chandima Prabhath commited on
Commit
0e53ecc
·
1 Parent(s): 571de7b

Implement task queue for sequential message processing and refactor audio/image handling

Browse files
Files changed (1) hide show
  1. app.py +52 -21
app.py CHANGED
@@ -2,6 +2,7 @@ import os
2
  import threading
3
  import requests
4
  import logging
 
5
  from fastapi import FastAPI, Request, HTTPException
6
  from fastapi.responses import PlainTextResponse, JSONResponse
7
  from FLUX import generate_image
@@ -11,20 +12,45 @@ from llm import generate_llm
11
  # Configure logging for debugging
12
  logging.basicConfig(level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(message)s")
13
 
 
14
  GREEN_API_URL = os.getenv("GREEN_API_URL")
15
  GREEN_API_MEDIA_URL = os.getenv("GREEN_API_MEDIA_URL", "https://api.green-api.com")
16
  GREEN_API_TOKEN = os.getenv("GREEN_API_TOKEN")
17
  GREEN_API_ID_INSTANCE = os.getenv("GREEN_API_ID_INSTANCE")
18
  WEBHOOK_AUTH_TOKEN = os.getenv("WEBHOOK_AUTH_TOKEN")
19
- PORT = 7860
20
  image_dir = "/tmp/images"
21
  audio_dir = "/tmp/audio"
22
 
23
  if not all([GREEN_API_URL, GREEN_API_TOKEN, GREEN_API_ID_INSTANCE, WEBHOOK_AUTH_TOKEN]):
24
  raise ValueError("Environment variables are not set properly")
25
 
 
 
 
26
  app = FastAPI()
27
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
28
  def send_message(message_id, to_number, message, retries=3):
29
  chat_id = to_number if to_number.endswith('@g.us') else to_number
30
  url = f"{GREEN_API_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendMessage/{GREEN_API_TOKEN}"
@@ -44,6 +70,7 @@ def send_message(message_id, to_number, message, retries=3):
44
  continue
45
  return {"error": str(e)}
46
 
 
47
  def send_image(message_id, to_number, image_path, retries=3):
48
  chat_id = to_number if to_number.endswith('@g.us') else to_number
49
  url = f"{GREEN_API_MEDIA_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendFileByUpload/{GREEN_API_TOKEN}"
@@ -60,10 +87,8 @@ def send_image(message_id, to_number, image_path, retries=3):
60
  continue
61
  return {"error": str(e)}
62
 
 
63
  def send_audio(message_id, to_number, audio_path, retries=3):
64
- """
65
- Send an audio file using the Green API similar to send_image.
66
- """
67
  logging.debug("Entering send_audio")
68
  chat_id = to_number if to_number.endswith('@g.us') else to_number
69
 
@@ -91,6 +116,7 @@ def send_audio(message_id, to_number, audio_path, retries=3):
91
  logging.debug(f"Failed to open audio file: {e}")
92
  return {"error": str(e)}
93
 
 
94
  def response_text(message_id, chat_id, prompt):
95
  try:
96
  msg = generate_llm(prompt)
@@ -98,27 +124,28 @@ def response_text(message_id, chat_id, prompt):
98
  except Exception as e:
99
  send_message(message_id, chat_id, "There was an error processing your request.")
100
 
 
101
  def response_audio(message_id, chat_id, prompt):
102
  logging.debug("Entering response_audio with prompt: %s", prompt)
103
  try:
104
  result = generate_voice_reply(prompt, model="openai-audio", voice="coral", audio_dir=audio_dir)
105
  logging.debug("Result from generate_voice_reply: %s", result)
106
- # Check result and also ensure the audio_file_path is not None or empty
107
  if result and result[0]:
108
- audio_file_path, audio_data = result
109
  logging.debug("Audio file path generated: %s", audio_file_path)
110
  send_result = send_audio(message_id, chat_id, audio_file_path)
111
  logging.debug("Result from send_audio: %s", send_result)
112
  if os.path.exists(audio_file_path):
113
- os.remove(audio_file_path) # Clean up the file after sending
114
  logging.debug("Removed audio file: %s", audio_file_path)
115
  else:
116
- logging.debug("generate_voice_reply returned None or empty audio file path, falling back to response_text")
117
  response_text(message_id, chat_id, prompt)
118
  except Exception as e:
119
  logging.debug("Exception in response_audio: %s", e)
120
  send_message(message_id, chat_id, "There was an error generating the audio. Please try again later.")
121
 
 
122
  def handle_image_generation(message_id, chat_id, prompt):
123
  try:
124
  image, image_path, returned_prompt, image_url = generate_image(prompt, message_id, message_id, image_dir)
@@ -131,7 +158,7 @@ def handle_image_generation(message_id, chat_id, prompt):
131
  )
132
  else:
133
  send_message(message_id, chat_id, "Failed to generate image. Please try again later.")
134
- except Exception as e:
135
  send_message(message_id, chat_id, "There was an error generating the image. Please try again later.")
136
 
137
  @app.get("/", response_class=PlainTextResponse)
@@ -143,23 +170,20 @@ async def whatsapp_webhook(request: Request):
143
  auth_header = request.headers.get('Authorization', '').strip()
144
  if auth_header != f"Bearer {WEBHOOK_AUTH_TOKEN}":
145
  raise HTTPException(status_code=403, detail="Unauthorized")
 
146
  try:
147
  data = await request.json()
148
  except Exception:
149
  return JSONResponse(content={"error": "Invalid JSON"}, status_code=400)
 
150
  if data.get('typeWebhook') != 'incomingMessageReceived':
151
  return {"success": True}
 
152
  try:
153
  chat_id = data['senderData']['chatId']
154
  message_id = data['idMessage']
155
  message_data = data.get('messageData', {})
156
 
157
- # Ignore messages that are replies
158
- if 'extendedTextMessageData' in message_data:
159
- if message_data['extendedTextMessageData'].get('quotedMessageId'):
160
- return {"success": True}
161
- # If needed, add checks for other message types here
162
-
163
  if 'textMessageData' in message_data:
164
  body = message_data['textMessageData']['textMessage'].strip()
165
  elif 'extendedTextMessageData' in message_data:
@@ -168,20 +192,27 @@ async def whatsapp_webhook(request: Request):
168
  return {"success": True}
169
  except KeyError as e:
170
  return JSONResponse(content={"error": f"Missing key in data: {e}"}, status_code=200)
 
 
 
 
 
 
 
 
 
171
  if body.lower().startswith('/imagine'):
172
  prompt = body.replace('/imagine', '').strip()
173
  if not prompt:
174
  send_message(message_id, chat_id, "Please provide a prompt after /imagine.")
175
  else:
176
  send_message(message_id, chat_id, "Generating...")
177
- threading.Thread(target=handle_image_generation, args=(message_id, chat_id, prompt)).start()
178
  else:
179
- threading.Thread(target=response_audio, args=(message_id, chat_id, body)).start()
 
180
  return {"success": True}
181
 
182
- def main():
183
  import uvicorn
184
  uvicorn.run(app, host="0.0.0.0", port=PORT)
185
-
186
- if __name__ == '__main__':
187
- main()
 
2
  import threading
3
  import requests
4
  import logging
5
+ import queue
6
  from fastapi import FastAPI, Request, HTTPException
7
  from fastapi.responses import PlainTextResponse, JSONResponse
8
  from FLUX import generate_image
 
12
  # Configure logging for debugging
13
  logging.basicConfig(level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(message)s")
14
 
15
+ # Environment variables
16
  GREEN_API_URL = os.getenv("GREEN_API_URL")
17
  GREEN_API_MEDIA_URL = os.getenv("GREEN_API_MEDIA_URL", "https://api.green-api.com")
18
  GREEN_API_TOKEN = os.getenv("GREEN_API_TOKEN")
19
  GREEN_API_ID_INSTANCE = os.getenv("GREEN_API_ID_INSTANCE")
20
  WEBHOOK_AUTH_TOKEN = os.getenv("WEBHOOK_AUTH_TOKEN")
21
+ PORT = int(os.getenv("PORT", 7860))
22
  image_dir = "/tmp/images"
23
  audio_dir = "/tmp/audio"
24
 
25
  if not all([GREEN_API_URL, GREEN_API_TOKEN, GREEN_API_ID_INSTANCE, WEBHOOK_AUTH_TOKEN]):
26
  raise ValueError("Environment variables are not set properly")
27
 
28
+ # Task queue for processing messages sequentially
29
+ task_queue = queue.Queue()
30
+
31
  app = FastAPI()
32
 
33
+ # Worker thread to process queued tasks one by one
34
+ def worker():
35
+ while True:
36
+ task = task_queue.get()
37
+ try:
38
+ typ = task.get("type")
39
+ mid = task.get("message_id")
40
+ cid = task.get("chat_id")
41
+ if typ == "image":
42
+ handle_image_generation(mid, cid, task.get("prompt"))
43
+ elif typ == "audio":
44
+ response_audio(mid, cid, task.get("prompt"))
45
+ except Exception as e:
46
+ logging.error(f"Error processing task {task}: {e}")
47
+ finally:
48
+ task_queue.task_done()
49
+
50
+ # Start the worker thread
51
+ t = threading.Thread(target=worker, daemon=True)
52
+ t.start()
53
+
54
  def send_message(message_id, to_number, message, retries=3):
55
  chat_id = to_number if to_number.endswith('@g.us') else to_number
56
  url = f"{GREEN_API_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendMessage/{GREEN_API_TOKEN}"
 
70
  continue
71
  return {"error": str(e)}
72
 
73
+
74
  def send_image(message_id, to_number, image_path, retries=3):
75
  chat_id = to_number if to_number.endswith('@g.us') else to_number
76
  url = f"{GREEN_API_MEDIA_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendFileByUpload/{GREEN_API_TOKEN}"
 
87
  continue
88
  return {"error": str(e)}
89
 
90
+
91
  def send_audio(message_id, to_number, audio_path, retries=3):
 
 
 
92
  logging.debug("Entering send_audio")
93
  chat_id = to_number if to_number.endswith('@g.us') else to_number
94
 
 
116
  logging.debug(f"Failed to open audio file: {e}")
117
  return {"error": str(e)}
118
 
119
+
120
  def response_text(message_id, chat_id, prompt):
121
  try:
122
  msg = generate_llm(prompt)
 
124
  except Exception as e:
125
  send_message(message_id, chat_id, "There was an error processing your request.")
126
 
127
+
128
  def response_audio(message_id, chat_id, prompt):
129
  logging.debug("Entering response_audio with prompt: %s", prompt)
130
  try:
131
  result = generate_voice_reply(prompt, model="openai-audio", voice="coral", audio_dir=audio_dir)
132
  logging.debug("Result from generate_voice_reply: %s", result)
 
133
  if result and result[0]:
134
+ audio_file_path, _ = result
135
  logging.debug("Audio file path generated: %s", audio_file_path)
136
  send_result = send_audio(message_id, chat_id, audio_file_path)
137
  logging.debug("Result from send_audio: %s", send_result)
138
  if os.path.exists(audio_file_path):
139
+ os.remove(audio_file_path)
140
  logging.debug("Removed audio file: %s", audio_file_path)
141
  else:
142
+ logging.debug("Falling back to text response")
143
  response_text(message_id, chat_id, prompt)
144
  except Exception as e:
145
  logging.debug("Exception in response_audio: %s", e)
146
  send_message(message_id, chat_id, "There was an error generating the audio. Please try again later.")
147
 
148
+
149
  def handle_image_generation(message_id, chat_id, prompt):
150
  try:
151
  image, image_path, returned_prompt, image_url = generate_image(prompt, message_id, message_id, image_dir)
 
158
  )
159
  else:
160
  send_message(message_id, chat_id, "Failed to generate image. Please try again later.")
161
+ except Exception:
162
  send_message(message_id, chat_id, "There was an error generating the image. Please try again later.")
163
 
164
  @app.get("/", response_class=PlainTextResponse)
 
170
  auth_header = request.headers.get('Authorization', '').strip()
171
  if auth_header != f"Bearer {WEBHOOK_AUTH_TOKEN}":
172
  raise HTTPException(status_code=403, detail="Unauthorized")
173
+
174
  try:
175
  data = await request.json()
176
  except Exception:
177
  return JSONResponse(content={"error": "Invalid JSON"}, status_code=400)
178
+
179
  if data.get('typeWebhook') != 'incomingMessageReceived':
180
  return {"success": True}
181
+
182
  try:
183
  chat_id = data['senderData']['chatId']
184
  message_id = data['idMessage']
185
  message_data = data.get('messageData', {})
186
 
 
 
 
 
 
 
187
  if 'textMessageData' in message_data:
188
  body = message_data['textMessageData']['textMessage'].strip()
189
  elif 'extendedTextMessageData' in message_data:
 
192
  return {"success": True}
193
  except KeyError as e:
194
  return JSONResponse(content={"error": f"Missing key in data: {e}"}, status_code=200)
195
+
196
+ # Ignore replies between other users
197
+ if 'extendedTextMessageData' in message_data:
198
+ ctx = message_data['extendedTextMessageData'].get('contextInfo', {})
199
+ if ctx.get('quotedMessageId'):
200
+ logging.debug(f"Ignoring reply message (quotedMessageId={ctx['quotedMessageId']})")
201
+ return {"success": True}
202
+
203
+ # Enqueue tasks instead of spawning threads
204
  if body.lower().startswith('/imagine'):
205
  prompt = body.replace('/imagine', '').strip()
206
  if not prompt:
207
  send_message(message_id, chat_id, "Please provide a prompt after /imagine.")
208
  else:
209
  send_message(message_id, chat_id, "Generating...")
210
+ task_queue.put({"type": "image", "message_id": message_id, "chat_id": chat_id, "prompt": prompt})
211
  else:
212
+ task_queue.put({"type": "audio", "message_id": message_id, "chat_id": chat_id, "prompt": body})
213
+
214
  return {"success": True}
215
 
216
+ if __name__ == '__main__':
217
  import uvicorn
218
  uvicorn.run(app, host="0.0.0.0", port=PORT)