LiamKhoaLe commited on
Commit
03ab354
·
1 Parent(s): 33151ff

Update navigation with visited path and unreachable target flagged

Browse files
Files changed (1) hide show
  1. app.py +49 -49
app.py CHANGED
@@ -170,62 +170,59 @@ def highlight_water_mask_on_frame(frame, binary_mask, color=(255, 0, 0), alpha=0
170
 
171
  # ── A* and KNN over binary water grid ─────────────────────────────────
172
  def astar(start, goal, occ):
173
- h = lambda a,b: abs(a[0]-b[0])+abs(a[1]-b[1])
174
- N8 = [(-1,-1),(-1,0),(-1,1),(0,-1),(0,1),(1,-1),(1,0),(1,1)]
175
- openq=[(0,start)]; g={start:0}; came={}
 
 
 
 
176
  while openq:
177
- _,cur=heapq.heappop(openq)
178
- if cur==goal:
179
- p=[cur]; # reconstruct
180
- while cur in came: cur=came[cur]; p.append(cur)
 
 
181
  return p[::-1]
182
- for dx,dy in N8:
183
- nx,ny=cur[0]+dx,cur[1]+dy
184
- # out-of-bounds / blocked
185
- if not (0<=nx<640 and 0<=ny<640) or occ[ny,nx]==0: continue
186
- # if diagonal, ensure both orthogonals are free
187
- if abs(dx)==1 and abs(dy)==1:
188
- if occ[cur[1]+dy, cur[0]]==0 or occ[cur[1], cur[0]+dx]==0:
 
 
 
189
  continue
190
- ng=g[cur]+1
191
- if (nx,ny) not in g or ng<g[(nx,ny)]:
192
- g[(nx,ny)]=ng
193
- f=ng+h((nx,ny),goal)
194
- heapq.heappush(openq,(f,(nx,ny)))
195
- came[(nx,ny)]=cur
 
 
196
  return []
 
197
  # KNN fit optimal path
198
  def knn_path(start, targets, occ):
199
  todo = targets[:]; path=[]
200
  cur = tuple(start)
201
- while todo:
202
- # KNN follow a Greedy approach, which may not guarantee shortest path, hence only use A*
203
- # nbrs = NearestNeighbors(n_neighbors=1).fit(todo)
204
- # _,idx = nbrs.kneighbors([cur]); nxt=tuple(todo[idx[0][0]])
205
- # seg = astar(cur, nxt, occ)
206
- # if seg:
207
- # if path and seg[0]==path[-1]: seg=seg[1:]
208
- # path.extend(seg)
209
- # cur = nxt; todo.remove(list(nxt))
210
- best = None
211
- best_len = float('inf')
212
- best_seg = []
213
- # Try A* to each target, find shortest actual path
214
- for t in todo:
215
- seg = astar(cur, tuple(t), occ)
216
- if seg and len(seg) < best_len:
217
- best = tuple(t)
218
- best_len = len(seg)
219
- best_seg = seg
220
- if not best:
221
- print("⚠️ Some garbage unreachable")
222
- break # stop if no reachable targets left
223
- if path and path[-1] == best_seg[0]:
224
- best_seg = best_seg[1:] # avoid duplicate point
225
- path.extend(best_seg)
226
- cur = best
227
- todo.remove(list(best))
228
- return path
229
 
230
 
231
  # ── Robot sprite/class -──────────────────────────────────────────────────
@@ -487,7 +484,10 @@ def _pipeline(uid,img_path):
487
  robot = Robot(SPRITE)
488
  # Robot will be spawn on the closest movable mask to top-left
489
  robot.pos = [spawn_x, spawn_y]
490
- path = knn_path(robot.pos, centres, movable_mask)
 
 
 
491
 
492
  # 4- Video synthesis
493
  out_tmp=f"{OUTPUT_DIR}/{uid}_tmp.mp4"
 
170
 
171
  # ── A* and KNN over binary water grid ─────────────────────────────────
172
  def astar(start, goal, occ):
173
+ h = lambda a, b: abs(a[0] - b[0]) + abs(a[1] - b[1])
174
+ N8 = [(-1,-1), (-1,0), (-1,1), (0,-1), (0,1), (1,-1), (1,0), (1,1)]
175
+ openq = [(0, start)]
176
+ g = {start: 0}
177
+ came = {}
178
+ visited = set()
179
+ H, W = occ.shape
180
  while openq:
181
+ _, cur = heapq.heappop(openq)
182
+ if cur == goal:
183
+ p = [cur]
184
+ while cur in came:
185
+ cur = came[cur]
186
+ p.append(cur)
187
  return p[::-1]
188
+ if cur in visited:
189
+ continue
190
+ visited.add(cur)
191
+ for dx, dy in N8:
192
+ nx, ny = cur[0] + dx, cur[1] + dy
193
+ if not (0 <= nx < W and 0 <= ny < H): continue
194
+ if occ[ny, nx] == 0: continue
195
+ # prevent corner-cutting on diagonals
196
+ if abs(dx) == 1 and abs(dy) == 1:
197
+ if occ[cur[1] + dy, cur[0]] == 0 or occ[cur[1], cur[0] + dx] == 0:
198
  continue
199
+ next_node = (nx, ny)
200
+ ng = g[cur] + 1
201
+ if next_node not in g or ng < g[next_node]:
202
+ g[next_node] = ng
203
+ f = ng + h(next_node, goal)
204
+ heapq.heappush(openq, (f, next_node))
205
+ came[next_node] = cur
206
+ print(f"[A*] No path from {start} to {goal}")
207
  return []
208
+
209
  # KNN fit optimal path
210
  def knn_path(start, targets, occ):
211
  todo = targets[:]; path=[]
212
  cur = tuple(start)
213
+ reachable = []; unreachable = []
214
+ for t in todo:
215
+ seg = astar(cur, tuple(t), occ)
216
+ if seg:
217
+ if path and path[-1] == seg[0]:
218
+ seg = seg[1:]
219
+ path.extend(seg)
220
+ reachable.append(t)
221
+ cur = tuple(t)
222
+ else:
223
+ unreachable.append(t)
224
+ print(f"⚠️ Unreachable garbage at {t}")
225
+ return path, reachable
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
226
 
227
 
228
  # ── Robot sprite/class -──────────────────────────────────────────────────
 
484
  robot = Robot(SPRITE)
485
  # Robot will be spawn on the closest movable mask to top-left
486
  robot.pos = [spawn_x, spawn_y]
487
+ path, unreachable_targets = knn_path(robot.pos, centres, movable_mask)
488
+ objs = [{"pos": p, "col": False, "unreachable": False} for p in centres if p not in unreachable_targets]
489
+ objs += [{"pos": p, "col": False, "unreachable": True} for p in unreachable_targets]
490
+
491
 
492
  # 4- Video synthesis
493
  out_tmp=f"{OUTPUT_DIR}/{uid}_tmp.mp4"