amaye15 commited on
Commit
fbb85b5
·
1 Parent(s): 527a73c
Files changed (1) hide show
  1. app/main.py +79 -113
app/main.py CHANGED
@@ -63,83 +63,68 @@ app.include_router(api_router, prefix="/api")
63
  async def make_api_request(method: str, endpoint: str, **kwargs):
64
  async with httpx.AsyncClient() as client:
65
  url = f"{API_BASE_URL}{endpoint}"
66
- try:
67
- response = await client.request(method, url, **kwargs)
68
- response.raise_for_status()
69
- return response.json()
70
- except httpx.RequestError as e:
71
- logger.error(f"HTTP Request failed: {e.request.method} {e.request.url} - {e}")
72
- return {"error": f"Network error contacting API: {e}"}
73
  except httpx.HTTPStatusError as e:
74
  logger.error(f"HTTP Status error: {e.response.status_code} - {e.response.text}")
75
  try: detail = e.response.json().get("detail", e.response.text)
76
  except json.JSONDecodeError: detail = e.response.text
77
  return {"error": f"API Error: {detail}"}
78
- except Exception as e:
79
- logger.error(f"Unexpected error during API call: {e}")
80
- return {"error": f"An unexpected error occurred: {str(e)}"}
81
 
82
- # --- WebSocket handling within Gradio ---
83
- # <<< MODIFIED: Accept trigger state object >>>
84
  async def listen_to_websockets(token: str, notification_list_state: gr.State, notification_trigger_state: gr.State):
85
  """Connects to WS and updates state list and trigger when a message arrives."""
86
  ws_listener_id = f"WSListener-{os.getpid()}-{asyncio.current_task().get_name()}"
87
  logger.info(f"[{ws_listener_id}] Starting WebSocket listener task.")
88
 
89
  if not token:
90
- logger.warning(f"[{ws_listener_id}] No token provided. Listener task exiting.")
91
- # <<< Return original state values >>>
92
- return notification_list_state.value, notification_trigger_state.value
93
 
94
  ws_url_base = API_BASE_URL.replace("http", "ws")
95
  ws_url = f"{ws_url_base}/ws/{token}"
96
- logger.info(f"[{ws_listener_id}] Attempting to connect to WebSocket: {ws_url}")
97
 
98
  try:
99
  async with websockets.connect(ws_url, open_timeout=15.0) as websocket:
100
- logger.info(f"[{ws_listener_id}] WebSocket connected successfully to {ws_url}")
101
  while True:
102
  try:
103
  message_str = await asyncio.wait_for(websocket.recv(), timeout=300.0)
104
- logger.info(f"[{ws_listener_id}] Received raw message: {message_str}")
105
  try:
106
  message_data = json.loads(message_str)
107
- logger.info(f"[{ws_listener_id}] Parsed message data: {message_data}")
108
-
109
  if message_data.get("type") == "new_user":
110
  notification = schemas.Notification(**message_data)
111
- logger.info(f"[{ws_listener_id}] Processing 'new_user' notification: {notification.message}")
112
-
113
- # <<< Modify state objects' values >>>
114
- current_list = notification_list_state.value.copy()
115
  current_list.insert(0, notification.message)
116
  if len(current_list) > 10: current_list.pop()
117
- notification_list_state.value = current_list
118
- logger.info(f"[{ws_listener_id}] State list updated via state object. New length: {len(notification_list_state.value)}. Content: {notification_list_state.value[:5]}")
119
-
120
- # <<< Update trigger state object's value >>>
121
- notification_trigger_state.value += 1
122
- logger.info(f"[{ws_listener_id}] Incremented notification trigger to {notification_trigger_state.value}")
123
-
124
  else:
125
- logger.warning(f"[{ws_listener_id}] Received message of unknown type: {message_data.get('type')}")
126
  # ... (error handling for parsing) ...
127
- except json.JSONDecodeError: logger.error(f"[{ws_listener_id}] Failed to decode JSON: {message_str}")
128
- except Exception as parse_err: logger.error(f"[{ws_listener_id}] Error processing message: {parse_err}")
129
  # ... (error handling for websocket recv/connection) ...
130
  except asyncio.TimeoutError: logger.debug(f"[{ws_listener_id}] WebSocket recv timed out."); continue
131
- except websockets.ConnectionClosedOK: logger.info(f"[{ws_listener_id}] WebSocket connection closed normally."); break
132
- except websockets.ConnectionClosedError as e: logger.error(f"[{ws_listener_id}] WebSocket connection closed with error: {e}"); break
133
- except Exception as e: logger.error(f"[{ws_listener_id}] Error in listener receive loop: {e}"); await asyncio.sleep(1); break # Break on unknown errors too
134
  # ... (error handling for websocket connect) ...
135
- except asyncio.TimeoutError: logger.error(f"[{ws_listener_id}] WebSocket initial connection timed out: {ws_url}")
136
- except websockets.exceptions.InvalidURI: logger.error(f"[{ws_listener_id}] Invalid WebSocket URI: {ws_url}")
137
  except websockets.exceptions.WebSocketException as e: logger.error(f"[{ws_listener_id}] WebSocket connection failed: {e}")
138
- except Exception as e: logger.error(f"[{ws_listener_id}] Unexpected error in WebSocket listener task: {e}")
139
 
140
  logger.info(f"[{ws_listener_id}] Listener task finished.")
141
- # <<< Return original state values >>>
142
- return notification_list_state.value, notification_trigger_state.value
143
 
144
  # --- Gradio Interface ---
145
  with gr.Blocks(theme=gr.themes.Soft()) as demo:
@@ -148,8 +133,9 @@ with gr.Blocks(theme=gr.themes.Soft()) as demo:
148
  user_info = gr.State(None)
149
  notification_list = gr.State([])
150
  websocket_task = gr.State(None)
151
- # <<< Add Dummy State >>>
152
- notification_trigger = gr.State(0) # Simple counter
 
153
 
154
  # --- UI Components ---
155
  with gr.Tabs() as tabs:
@@ -175,129 +161,109 @@ with gr.Blocks(theme=gr.themes.Soft()) as demo:
175
  logout_button = gr.Button("Logout")
176
  gr.Markdown("---")
177
  gr.Markdown("## Real-time Notifications")
178
- notification_display = gr.Textbox(
179
- label="New User Alerts",
180
- lines=5,
181
- max_lines=10,
182
- interactive=False,
183
- # <<< REMOVE every=1 >>>
184
  )
185
- # <<< Add Dummy Component (Hidden) >>>
186
- dummy_trigger_output = gr.Textbox(label="trigger", visible=False)
187
 
188
 
189
  # --- Event Handlers ---
190
 
191
  # Registration Logic (remains the same)
192
  async def handle_register(email, password, confirm_password):
193
- if not email or not password or not confirm_password: return gr.update(value="Please fill in all fields.")
194
- if password != confirm_password: return gr.update(value="Passwords do not match.")
195
- if len(password) < 8: return gr.update(value="Password must be at least 8 characters long.")
196
  payload = {"email": email, "password": password}
197
  result = await make_api_request("post", "/register", json=payload)
198
- if "error" in result: return gr.update(value=f"Registration failed: {result['error']}")
199
- else: return gr.update(value=f"Registration successful for {result.get('email')}! Please log in.")
200
-
201
  reg_button.click(handle_register, inputs=[reg_email, reg_password, reg_confirm_password], outputs=[reg_status])
202
 
203
  # Login Logic
204
- # <<< MODIFIED: Pass trigger state, update outputs >>>
205
- async def handle_login(email, password, current_task, current_trigger_val):
206
- # --- Output state needs to align with return values ---
207
- # Returns: login_status, auth_token, user_info, tabs, welcome_tab, welcome_message, websocket_task, notification_trigger
208
- outputs_tuple = (
209
- gr.update(value="Please enter email and password."), # login_status
210
- None, None, None, gr.update(visible=False), None, # auth_token, user_info, tabs, welcome_tab, welcome_message
211
- current_task, # websocket_task (no change)
212
- current_trigger_val # notification_trigger (no change)
213
- )
214
- if not email or not password: return outputs_tuple
215
 
216
  payload = {"email": email, "password": password}
217
  result = await make_api_request("post", "/login", json=payload)
218
 
219
- if "error" in result:
220
- outputs_tuple = (gr.update(value=f"Login failed: {result['error']}"), None, None, None, gr.update(visible=False), None, current_task, current_trigger_val)
221
- return outputs_tuple
222
  else:
223
  token = result.get("access_token")
224
  user_data = await dependencies.get_optional_current_user(token)
225
- if not user_data:
226
- outputs_tuple = (gr.update(value="Login succeeded but failed to fetch user data."), None, None, None, gr.update(visible=False), None, current_task, current_trigger_val)
227
- return outputs_tuple
228
 
229
  if current_task and not current_task.done():
230
  current_task.cancel()
231
  try: await current_task
232
  except asyncio.CancelledError: logger.info("Previous WebSocket task cancelled.")
233
 
234
- # <<< Pass both state objects to listener >>>
235
  new_task = asyncio.create_task(listen_to_websockets(token, notification_list, notification_trigger))
236
 
237
  welcome_msg = f"Welcome, {user_data.email}!"
238
- # --- Ensure number of return values matches outputs ---
239
  return (
240
- gr.update(value="Login successful!"), # login_status
241
- token, # auth_token state
242
- user_data.model_dump(), # user_info state
243
- gr.update(selected="welcome_tab"), # Switch Tabs
244
- gr.update(visible=True), # Make welcome tab visible
245
- gr.update(value=welcome_msg), # Update welcome message markdown
246
- new_task, # websocket_task state
247
- 0 # Reset notification_trigger state to 0 on login
248
  )
249
 
250
- # <<< MODIFIED: Add notification_trigger to inputs/outputs >>>
251
  login_button.click(
252
  handle_login,
253
- inputs=[login_email, login_password, websocket_task, notification_trigger],
254
- outputs=[login_status, auth_token, user_info, tabs, welcome_tab, welcome_message, websocket_task, notification_trigger]
255
  )
256
 
257
- # Function to update the notification display based on the list state
258
- # <<< Triggered by dummy component now >>>
259
- def update_notification_ui(notif_list): # Takes the list value directly now
260
- log_msg = f"UI Update Triggered via Dummy. State List Length: {len(notif_list)}. Content: {notif_list[:5]}"
261
- logger.info(log_msg) # Use info level to ensure visibility
262
- new_value = "\n".join(notif_list)
263
- return gr.update(value=new_value)
264
-
265
- # <<< Add Event handler for the dummy trigger >>>
266
- # When the dummy_trigger_output *would* change (because notification_trigger state changed)...
267
- # ...call the update_notification_ui function.
268
- # Pass the *current* value of notification_list state as input to the function.
269
- dummy_trigger_output.change(
270
- fn=update_notification_ui,
271
- inputs=[notification_list], # Input is the list state we want to display
272
- outputs=[notification_display] # Output updates the real display textbox
273
- )
274
 
275
- # <<< Link the dummy trigger state to the dummy output component >>>
276
- # This makes the dummy_trigger_output.change event fire when notification_trigger changes
277
- notification_trigger.change(lambda x: x, inputs=notification_trigger, outputs=dummy_trigger_output, queue=False) # Use queue=False for immediate trigger if possible
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
278
 
279
 
280
  # Logout Logic
281
- # <<< MODIFIED: Add notification_trigger to outputs >>>
282
  async def handle_logout(current_task):
283
  if current_task and not current_task.done():
284
  current_task.cancel()
285
  try: await current_task
286
  except asyncio.CancelledError: logger.info("WebSocket task cancelled on logout.")
287
- # --- Ensure number of return values matches outputs ---
288
  return ( None, None, [], None, # auth_token, user_info, notification_list, websocket_task
289
  gr.update(selected="login_tab"), gr.update(visible=False), # tabs, welcome_tab
290
  gr.update(value=""), gr.update(value=""), # welcome_message, login_status
291
- 0 ) # notification_trigger (reset to 0)
292
 
293
- # <<< MODIFIED: Add notification_trigger to outputs >>>
294
  logout_button.click(
295
  handle_logout,
296
  inputs=[websocket_task],
297
  outputs=[
298
  auth_token, user_info, notification_list, websocket_task,
299
  tabs, welcome_tab, welcome_message, login_status,
300
- notification_trigger # Add trigger to outputs
301
  ]
302
  )
303
 
 
63
  async def make_api_request(method: str, endpoint: str, **kwargs):
64
  async with httpx.AsyncClient() as client:
65
  url = f"{API_BASE_URL}{endpoint}"
66
+ try: response = await client.request(method, url, **kwargs); response.raise_for_status(); return response.json()
67
+ except httpx.RequestError as e: logger.error(f"HTTP Request failed: {e.request.method} {e.request.url} - {e}"); return {"error": f"Network error: {e}"}
 
 
 
 
 
68
  except httpx.HTTPStatusError as e:
69
  logger.error(f"HTTP Status error: {e.response.status_code} - {e.response.text}")
70
  try: detail = e.response.json().get("detail", e.response.text)
71
  except json.JSONDecodeError: detail = e.response.text
72
  return {"error": f"API Error: {detail}"}
73
+ except Exception as e: logger.error(f"Unexpected error during API call: {e}"); return {"error": f"Unexpected error: {str(e)}"}
 
 
74
 
75
+ # --- WebSocket handling ---
76
+ # <<< Pass state objects by reference >>>
77
  async def listen_to_websockets(token: str, notification_list_state: gr.State, notification_trigger_state: gr.State):
78
  """Connects to WS and updates state list and trigger when a message arrives."""
79
  ws_listener_id = f"WSListener-{os.getpid()}-{asyncio.current_task().get_name()}"
80
  logger.info(f"[{ws_listener_id}] Starting WebSocket listener task.")
81
 
82
  if not token:
83
+ logger.warning(f"[{ws_listener_id}] No token provided. Task exiting.")
84
+ return # Just exit, don't need to return state values
 
85
 
86
  ws_url_base = API_BASE_URL.replace("http", "ws")
87
  ws_url = f"{ws_url_base}/ws/{token}"
88
+ logger.info(f"[{ws_listener_id}] Attempting to connect: {ws_url}")
89
 
90
  try:
91
  async with websockets.connect(ws_url, open_timeout=15.0) as websocket:
92
+ logger.info(f"[{ws_listener_id}] WebSocket connected successfully.")
93
  while True:
94
  try:
95
  message_str = await asyncio.wait_for(websocket.recv(), timeout=300.0)
96
+ logger.info(f"[{ws_listener_id}] Received: {message_str[:100]}...")
97
  try:
98
  message_data = json.loads(message_str)
 
 
99
  if message_data.get("type") == "new_user":
100
  notification = schemas.Notification(**message_data)
101
+ logger.info(f"[{ws_listener_id}] Processing 'new_user': {notification.message}")
102
+ # --- Modify state values directly ---
103
+ current_list = notification_list_state.value.copy() # Operate on copy
 
104
  current_list.insert(0, notification.message)
105
  if len(current_list) > 10: current_list.pop()
106
+ notification_list_state.value = current_list # Assign back modified copy
107
+ notification_trigger_state.value += 1 # Increment trigger
108
+ # --- Log the update ---
109
+ logger.info(f"[{ws_listener_id}] States updated: list len={len(notification_list_state.value)}, trigger={notification_trigger_state.value}")
 
 
 
110
  else:
111
+ logger.warning(f"[{ws_listener_id}] Unknown message type: {message_data.get('type')}")
112
  # ... (error handling for parsing) ...
113
+ except json.JSONDecodeError: logger.error(f"[{ws_listener_id}] JSON Decode Error: {message_str}")
114
+ except Exception as parse_err: logger.error(f"[{ws_listener_id}] Message Processing Error: {parse_err}")
115
  # ... (error handling for websocket recv/connection) ...
116
  except asyncio.TimeoutError: logger.debug(f"[{ws_listener_id}] WebSocket recv timed out."); continue
117
+ except websockets.ConnectionClosedOK: logger.info(f"[{ws_listener_id}] WebSocket closed normally."); break
118
+ except websockets.ConnectionClosedError as e: logger.error(f"[{ws_listener_id}] WebSocket closed with error: {e}"); break
119
+ except Exception as e: logger.error(f"[{ws_listener_id}] Listener loop error: {e}"); await asyncio.sleep(1); break
120
  # ... (error handling for websocket connect) ...
121
+ except asyncio.TimeoutError: logger.error(f"[{ws_listener_id}] WebSocket initial connection timed out.")
122
+ except websockets.exceptions.InvalidURI: logger.error(f"[{ws_listener_id}] Invalid WebSocket URI.")
123
  except websockets.exceptions.WebSocketException as e: logger.error(f"[{ws_listener_id}] WebSocket connection failed: {e}")
124
+ except Exception as e: logger.error(f"[{ws_listener_id}] Unexpected error in listener task: {e}")
125
 
126
  logger.info(f"[{ws_listener_id}] Listener task finished.")
127
+ # No need to return state values when task ends
 
128
 
129
  # --- Gradio Interface ---
130
  with gr.Blocks(theme=gr.themes.Soft()) as demo:
 
133
  user_info = gr.State(None)
134
  notification_list = gr.State([])
135
  websocket_task = gr.State(None)
136
+ # Add trigger states
137
+ notification_trigger = gr.State(0)
138
+ last_polled_trigger = gr.State(0) # State to track last seen trigger
139
 
140
  # --- UI Components ---
141
  with gr.Tabs() as tabs:
 
161
  logout_button = gr.Button("Logout")
162
  gr.Markdown("---")
163
  gr.Markdown("## Real-time Notifications")
164
+ notification_display = gr.Textbox( # Visible display
165
+ label="New User Alerts", lines=5, max_lines=10, interactive=False
 
 
 
 
166
  )
167
+ # <<< Hidden component for polling >>>
168
+ dummy_poller = gr.Number(label="poller", value=0, visible=False, every=1)
169
 
170
 
171
  # --- Event Handlers ---
172
 
173
  # Registration Logic (remains the same)
174
  async def handle_register(email, password, confirm_password):
175
+ if not email or not password or not confirm_password: return gr.update(value="Please fill fields.")
176
+ if password != confirm_password: return gr.update(value="Passwords mismatch.")
177
+ if len(password) < 8: return gr.update(value="Password >= 8 chars.")
178
  payload = {"email": email, "password": password}
179
  result = await make_api_request("post", "/register", json=payload)
180
+ if "error" in result: return gr.update(value=f"Register failed: {result['error']}")
181
+ else: return gr.update(value=f"Register success: {result.get('email')}! Log in.")
 
182
  reg_button.click(handle_register, inputs=[reg_email, reg_password, reg_confirm_password], outputs=[reg_status])
183
 
184
  # Login Logic
185
+ # <<< MODIFIED: Pass/Reset both trigger states >>>
186
+ async def handle_login(email, password, current_task, current_trigger_val, current_last_poll_val):
187
+ # Define failure outputs matching the outputs list
188
+ fail_outputs = (gr.update(value="..."), None, None, None, gr.update(visible=False), None, current_task, current_trigger_val, current_last_poll_val)
189
+
190
+ if not email or not password: return fail_outputs[:1] + (gr.update(value="Enter email/password"),) + fail_outputs[2:]
 
 
 
 
 
191
 
192
  payload = {"email": email, "password": password}
193
  result = await make_api_request("post", "/login", json=payload)
194
 
195
+ if "error" in result: return (gr.update(value=f"Login failed: {result['error']}"),) + fail_outputs[1:]
 
 
196
  else:
197
  token = result.get("access_token")
198
  user_data = await dependencies.get_optional_current_user(token)
199
+ if not user_data: return (gr.update(value="Login ok but user fetch failed."),) + fail_outputs[1:]
 
 
200
 
201
  if current_task and not current_task.done():
202
  current_task.cancel()
203
  try: await current_task
204
  except asyncio.CancelledError: logger.info("Previous WebSocket task cancelled.")
205
 
206
+ # <<< Pass state objects to listener >>>
207
  new_task = asyncio.create_task(listen_to_websockets(token, notification_list, notification_trigger))
208
 
209
  welcome_msg = f"Welcome, {user_data.email}!"
210
+ # Reset triggers on successful login
211
  return (
212
+ gr.update(value="Login successful!"), token, user_data.model_dump(), # status, token, user_info
213
+ gr.update(selected="welcome_tab"), gr.update(visible=True), gr.update(value=welcome_msg), # UI changes
214
+ new_task, 0, 0 # websocket_task, notification_trigger, last_polled_trigger
 
 
 
 
 
215
  )
216
 
217
+ # <<< MODIFIED: Add last_polled_trigger to inputs/outputs >>>
218
  login_button.click(
219
  handle_login,
220
+ inputs=[login_email, login_password, websocket_task, notification_trigger, last_polled_trigger],
221
+ outputs=[login_status, auth_token, user_info, tabs, welcome_tab, welcome_message, websocket_task, notification_trigger, last_polled_trigger]
222
  )
223
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
224
 
225
+ # <<< Polling function >>>
226
+ def poll_and_update(current_trigger_value, last_known_trigger, current_notif_list):
227
+ """ Checks trigger state change and updates UI if needed. """
228
+ if current_trigger_value != last_known_trigger:
229
+ logger.info(f"Polling detected trigger change ({last_known_trigger} -> {current_trigger_value}). Updating UI.")
230
+ new_value = "\n".join(current_notif_list)
231
+ # Return new display value AND update last_known_trigger state value
232
+ return gr.update(value=new_value), current_trigger_value
233
+ else:
234
+ # No change, return NoUpdate for display AND existing last_known_trigger value
235
+ return gr.NoUpdate(), last_known_trigger
236
+
237
+ # <<< Attach polling function to the dummy component's change event >>>
238
+ dummy_poller.change(
239
+ fn=poll_and_update,
240
+ inputs=[notification_trigger, last_polled_trigger, notification_list],
241
+ outputs=[notification_display, last_polled_trigger], # Update display & last polled state
242
+ queue=False # Try immediate update
243
+ )
244
 
245
 
246
  # Logout Logic
247
+ # <<< MODIFIED: Reset both trigger states >>>
248
  async def handle_logout(current_task):
249
  if current_task and not current_task.done():
250
  current_task.cancel()
251
  try: await current_task
252
  except asyncio.CancelledError: logger.info("WebSocket task cancelled on logout.")
253
+ # Reset all relevant states
254
  return ( None, None, [], None, # auth_token, user_info, notification_list, websocket_task
255
  gr.update(selected="login_tab"), gr.update(visible=False), # tabs, welcome_tab
256
  gr.update(value=""), gr.update(value=""), # welcome_message, login_status
257
+ 0, 0 ) # notification_trigger, last_polled_trigger (reset)
258
 
259
+ # <<< MODIFIED: Add last_polled_trigger to outputs >>>
260
  logout_button.click(
261
  handle_logout,
262
  inputs=[websocket_task],
263
  outputs=[
264
  auth_token, user_info, notification_list, websocket_task,
265
  tabs, welcome_tab, welcome_message, login_status,
266
+ notification_trigger, last_polled_trigger # Add trigger states here
267
  ]
268
  )
269