broadfield-dev committed on
Commit
1684743
·
verified ·
1 Parent(s): ad2d527

Create parser.py

Browse files
Files changed (1) hide show
  1. parser.py +328 -0
parser.py ADDED
@@ -0,0 +1,328 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # parser.py
2
+ import ast
3
+
4
def get_category(node, parent=None):
    """Determine the category of an AST node or variable context, including variable roles.

    Args:
        node: The AST node to classify.
        parent: Optional AST node that contains ``node``. It is only consulted
            for the context-dependent roles (assignments, function arguments
            and returned names). Any non-AST value (e.g. ``None`` or a string)
            simply fails the context check.

    Returns:
        A category name as a string. Nodes that are not recognised, and
        context-dependent nodes whose parent does not match the expected
        context, are reported as ``'other'`` so the function never returns
        ``None``.
    """
    # Node types whose category does not depend on the parent.
    simple_categories = (
        ((ast.Import, ast.ImportFrom), 'import'),
        ((ast.FunctionDef, ast.AsyncFunctionDef), 'function'),
        (ast.ClassDef, 'class'),
        (ast.If, 'if'),
        (ast.While, 'while'),
        (ast.For, 'for'),
        (ast.Try, 'try'),
        (ast.Return, 'return'),
        (ast.Expr, 'expression'),
        (ast.ExceptHandler, 'except'),
    )
    for node_types, category in simple_categories:
        if isinstance(node, node_types):
            return category

    if isinstance(node, (ast.Assign, ast.AnnAssign, ast.AugAssign)):
        # Assignments only count as variables inside a function or control block.
        if isinstance(parent, (ast.FunctionDef, ast.AsyncFunctionDef, ast.If, ast.Try, ast.While, ast.For)):
            return 'assigned_variable'
    elif isinstance(node, ast.arg):  # Input variables in function definitions
        if isinstance(parent, (ast.FunctionDef, ast.AsyncFunctionDef)):
            return 'input_variable'
    elif isinstance(node, ast.Name):  # Returned variables in return statements
        if isinstance(parent, ast.Return):
            return 'returned_variable'

    # Default to 'other' for unrecognized nodes and for unmatched contexts.
    return 'other'
37
+
38
def create_vector(category, level, location, total_lines, parent_path):
    """Create a 6D vector optimized for role similarity, integrating variable roles into category_id.

    Args:
        category: Category name; names missing from the internal map get id 0.
        level: Nesting level, copied into the vector unchanged.
        location: ``(start_line, end_line)`` pair of line numbers.
        total_lines: Number of lines used to normalise position and span.
            Values of zero or less are treated as 1 so the normalisation
            cannot divide by zero.
        parent_path: Sequence of ancestor node ids such as ``"Function[1]"``.

    Returns:
        ``[category_id, level, center_pos, span, parent_depth, parent_weight]``.
    """
    category_map = {
        'import': 1, 'function': 2, 'async_function': 3, 'class': 4,
        'if': 5, 'while': 6, 'for': 7, 'try': 8, 'expression': 9, 'spacer': 10,
        'other': 11, 'elif': 12, 'else': 13, 'except': 14, 'finally': 15, 'return': 16,
        'assigned_variable': 17, 'input_variable': 18, 'returned_variable': 19
    }
    category_id = category_map.get(category, 0)  # Default to 0 for unknown categories
    start_line, end_line = location

    # Avoid ZeroDivisionError when the caller reports an empty source.
    line_count = total_lines if total_lines > 0 else 1
    span = (end_line - start_line + 1) / line_count
    center_pos = ((start_line + end_line) / 2) / line_count

    parent_depth = len(parent_path)
    # Each ancestor contributes its category id (taken from the text before
    # '[' in its node id), weighted by 1 / (position + 1); the sum is then
    # scaled by the number of known categories.
    parent_weight = sum(
        category_map.get(parent.split('[')[0].lower(), 0) * (1 / (i + 1))
        for i, parent in enumerate(parent_path)
    ) / max(1, len(category_map))
    return [category_id, level, center_pos, span, parent_depth, parent_weight]
54
+
55
def is_blank_or_comment(line):
    """Return True for a whitespace-only line or one whose first visible character is '#'."""
    text = line.strip()
    if not text:
        return True
    return text.startswith('#')
59
+
60
def parse_node(node, lines, prev_end, level=0, total_lines=None, parent_path=None, counters=None, processed_lines=None):
    """Flatten an AST node and its nested statements into a list of part dicts.

    For the given node this emits, in order: spacer parts for blank/comment
    lines between ``prev_end`` and the node, a header part for the node's
    first line, and then parts for the children found under ``body``,
    ``orelse``, ``handlers`` and ``finalbody`` (recursing into each child).

    Args:
        node: AST node to process.
        lines: Source lines; line number ``n`` is read as ``lines[n - 1]``.
        prev_end: Line number of the last line handled before this node.
        level: Nesting level recorded on the parts emitted for this node.
        total_lines: Line count passed to ``create_vector``; defaults to
            ``len(lines)``.
        parent_path: List of ancestor node_id strings; defaults to ``[]``.
        counters: Per-category counters used to number node ids; a fresh
            dict is created when omitted.
        processed_lines: Set of line numbers already emitted. It is mutated
            in place and shared with the recursive calls.

    Returns:
        ``(parts, category_sequence)``: the list of part dicts and the list
        of category names collected along the way. When any line of the
        node's span is already in ``processed_lines`` the result is
        ``([], [])``.

    NOTE(review): the nesting of this function was reconstructed from a
    rendering that had lost its indentation -- confirm against the original
    file.
    """
    if total_lines is None:
        total_lines = len(lines)
    if parent_path is None:
        parent_path = []
    if counters is None:
        counters = {cat: 0 for cat in ['import', 'function', 'async_function', 'class', 'if', 'while', 'for', 'try', 'return', 'expression', 'other', 'spacer', 'elif', 'else', 'except', 'finally', 'assigned_variable', 'input_variable', 'returned_variable']}
    if processed_lines is None:
        processed_lines = set()

    parts = []
    # Nodes without position info are assumed to start right after prev_end
    # and to span a single line.
    start_line = getattr(node, 'lineno', prev_end + 1)
    end_line = getattr(node, 'end_lineno', start_line)

    # Skip if any lines are already processed
    if any(line in processed_lines for line in range(start_line, end_line + 1)):
        return parts, []

    # Get category, default to 'other' if None
    # NOTE(review): the "parent" handed to get_category is the last node_id
    # string of parent_path, not an AST node -- confirm this is intended.
    category = get_category(node, parent_path[-1] if parent_path else None) or 'other'
    if category not in counters:
        category = 'other'
    counters[category] += 1
    node_id = f"{category.capitalize()}[{counters[category]}]"

    # Spacer before node (only for blank lines or comments)
    if start_line > prev_end + 1:
        spacer_lines = lines[prev_end:start_line - 1]
        spacer_lines_set = set(range(prev_end + 1, start_line))
        if not spacer_lines_set.issubset(processed_lines):
            # i is the 1-based line number of each candidate spacer line.
            for i, line in enumerate(spacer_lines, prev_end + 1):
                if i not in processed_lines and is_blank_or_comment(line):
                    counters['spacer'] += 1
                    spacer_node_id = f"Spacer[{counters['spacer']}]"
                    parts.append({
                        'category': 'spacer',
                        'source': line,
                        'location': (i, i),
                        'level': level,
                        'vector': create_vector('spacer', level, (i, i), total_lines, parent_path),
                        'parent_path': ' -> '.join(parent_path) if parent_path else 'Top-Level',
                        'node_id': spacer_node_id
                    })
                    processed_lines.add(i)

    # Current node's header (e.g., 'def', 'if', 'try')
    # The vector is built from current_path (which includes this node), while
    # the reported parent_path string excludes it.
    current_path = parent_path + [node_id]
    if start_line not in processed_lines and not is_blank_or_comment(lines[start_line - 1]):
        parts.append({
            'category': category,
            'source': lines[start_line - 1],
            'location': (start_line, start_line),
            'level': level,
            'vector': create_vector(category, level, (start_line, start_line), total_lines, current_path),
            'parent_path': ' -> '.join(parent_path) if parent_path else 'Top-Level',
            'node_id': node_id
        })
        processed_lines.add(start_line)

    # Handle variables in function definitions (input variables)
    category_sequence = [category]
    if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)) and node.args.args:
        for arg in node.args.args:
            var_start = start_line  # Assume args are on the same line as function def for simplicity
            # NOTE(review): start_line is added to processed_lines when the
            # header part is emitted above, so this guard only passes if the
            # header was skipped -- confirm that arguments are meant to be
            # emitted at all.
            if var_start not in processed_lines:
                arg_category = get_category(arg, node) or 'input_variable'
                if arg_category not in counters:
                    arg_category = 'input_variable'
                counters[arg_category] += 1
                var_node_id = f"InputVariable[{counters[arg_category]}]"
                parts.append({
                    'category': arg_category,
                    'source': f" {arg.arg},",  # Indented as part of function
                    'location': (var_start, var_start),
                    'level': level + 1,
                    'vector': create_vector(arg_category, level + 1, (var_start, var_start), total_lines, current_path),
                    'parent_path': f"{current_path[0]} -> {var_node_id}",
                    'node_id': var_node_id
                })
                processed_lines.add(var_start)
                category_sequence.append(arg_category)

    # Process nested bodies
    nested_prev_end = start_line
    for attr in ('body', 'orelse', 'handlers', 'finalbody'):
        if hasattr(node, attr) and getattr(node, attr):
            for child in getattr(node, attr):
                child_start = getattr(child, 'lineno', nested_prev_end + 1)
                child_end = getattr(child, 'end_lineno', child_start)
                # Children whose span overlaps already-emitted lines are ignored.
                if not any(line in processed_lines for line in range(child_start, child_end + 1)):
                    if attr == 'orelse' and isinstance(node, ast.If) and child_start != start_line:
                        # The label is chosen by a substring test on the child's
                        # first line: 'elif' if that text contains "elif",
                        # otherwise 'else'.
                        sub_category = 'elif' if 'elif' in lines[child_start - 1] else 'else'
                        if child_start not in processed_lines and not is_blank_or_comment(lines[child_start - 1]):
                            counters[sub_category] += 1
                            sub_node_id = f"{sub_category.capitalize()}[{counters[sub_category]}]"
                            parts.append({
                                'category': sub_category,
                                'source': lines[child_start - 1],
                                'location': (child_start, child_start),
                                'level': level,
                                'vector': create_vector(sub_category, level, (child_start, child_start), total_lines, current_path),
                                'parent_path': ' -> '.join(parent_path) if parent_path else 'Top-Level',
                                'node_id': sub_node_id
                            })
                            processed_lines.add(child_start)
                            category_sequence.append(sub_category)
                        # NOTE(review): if child_start was just marked as
                        # processed, the recursive call hits the skip check at
                        # the top of this function and returns nothing --
                        # confirm that is intended.
                        child_parts, child_seq = parse_node(child, lines, child_start, level + 1, total_lines, current_path, counters, processed_lines)
                        parts.extend(child_parts)
                        category_sequence.extend(child_seq)
                        nested_prev_end = max(nested_prev_end, child_parts[-1]['location'][1] if child_parts else child_start)
                    elif attr == 'handlers' and isinstance(child, ast.ExceptHandler):
                        # Emit the handler's first line as an 'except' part.
                        if child_start not in processed_lines and not is_blank_or_comment(lines[child_start - 1]):
                            counters['except'] += 1
                            sub_node_id = f"Except[{counters['except']}]"
                            parts.append({
                                'category': 'except',
                                'source': lines[child_start - 1],
                                'location': (child_start, child_start),
                                'level': level,
                                'vector': create_vector('except', level, (child_start, child_start), total_lines, current_path),
                                'parent_path': ' -> '.join(parent_path) if parent_path else 'Top-Level',
                                'node_id': sub_node_id
                            })
                            processed_lines.add(child_start)
                            category_sequence.append('except')
                        child_parts, child_seq = parse_node(child, lines, child_start, level + 1, total_lines, current_path, counters, processed_lines)
                        parts.extend(child_parts)
                        category_sequence.extend(child_seq)
                        nested_prev_end = max(nested_prev_end, child_parts[-1]['location'][1] if child_parts else child_start)
                    elif attr == 'finalbody':
                        # The first line of each statement in finalbody is
                        # emitted as a 'finally' part.
                        if child_start not in processed_lines and not is_blank_or_comment(lines[child_start - 1]):
                            counters['finally'] += 1
                            sub_node_id = f"Finally[{counters['finally']}]"
                            parts.append({
                                'category': 'finally',
                                'source': lines[child_start - 1],
                                'location': (child_start, child_start),
                                'level': level,
                                'vector': create_vector('finally', level, (child_start, child_start), total_lines, current_path),
                                'parent_path': ' -> '.join(parent_path) if parent_path else 'Top-Level',
                                'node_id': sub_node_id
                            })
                            processed_lines.add(child_start)
                            category_sequence.append('finally')
                        child_parts, child_seq = parse_node(child, lines, child_start, level + 1, total_lines, current_path, counters, processed_lines)
                        parts.extend(child_parts)
                        category_sequence.extend(child_seq)
                        nested_prev_end = max(nested_prev_end, child_parts[-1]['location'][1] if child_parts else child_start)
                    else:
                        # Handle assignments and returns for variable detection
                        if isinstance(child, (ast.Assign, ast.AnnAssign, ast.AugAssign)):
                            # Handle different target structures
                            if isinstance(child, ast.Assign):
                                # Only plain-name targets are recorded; the line is
                                # marked processed after the first one, so further
                                # targets on the same line are skipped.
                                for target in child.targets:
                                    if isinstance(target, ast.Name):
                                        var_start = child.lineno
                                        if var_start not in processed_lines and not is_blank_or_comment(lines[var_start - 1]):
                                            counters['assigned_variable'] += 1
                                            var_node_id = f"AssignedVariable[{counters['assigned_variable']}]"
                                            parts.append({
                                                'category': 'assigned_variable',
                                                'source': lines[var_start - 1],
                                                'location': (var_start, var_start),
                                                'level': level + 1,
                                                'vector': create_vector('assigned_variable', level + 1, (var_start, var_start), total_lines, current_path),
                                                'parent_path': f"{current_path[0]} -> {var_node_id}",
                                                'node_id': var_node_id
                                            })
                                            processed_lines.add(var_start)
                                            category_sequence.append('assigned_variable')
                            else:  # AnnAssign or AugAssign
                                target = child.target
                                if isinstance(target, ast.Name):
                                    var_start = child.lineno
                                    if var_start not in processed_lines and not is_blank_or_comment(lines[var_start - 1]):
                                        counters['assigned_variable'] += 1
                                        var_node_id = f"AssignedVariable[{counters['assigned_variable']}]"
                                        parts.append({
                                            'category': 'assigned_variable',
                                            'source': lines[var_start - 1],
                                            'location': (var_start, var_start),
                                            'level': level + 1,
                                            'vector': create_vector('assigned_variable', level + 1, (var_start, var_start), total_lines, current_path),
                                            'parent_path': f"{current_path[0]} -> {var_node_id}",
                                            'node_id': var_node_id
                                        })
                                        processed_lines.add(var_start)
                                        category_sequence.append('assigned_variable')
                        elif isinstance(child, ast.Return):
                            # Any Name found anywhere inside the return statement
                            # triggers a 'returned_variable' part for the line.
                            for value in ast.walk(child):
                                if isinstance(value, ast.Name):
                                    var_start = child.lineno
                                    if var_start not in processed_lines and not is_blank_or_comment(lines[var_start - 1]):
                                        counters['returned_variable'] += 1
                                        var_node_id = f"ReturnedVariable[{counters['returned_variable']}]"
                                        parts.append({
                                            'category': 'returned_variable',
                                            'source': lines[var_start - 1],
                                            'location': (var_start, var_start),
                                            'level': level + 1,
                                            'vector': create_vector('returned_variable', level + 1, (var_start, var_start), total_lines, current_path),
                                            'parent_path': f"{current_path[0]} -> {var_node_id}",
                                            'node_id': var_node_id
                                        })
                                        processed_lines.add(var_start)
                                        category_sequence.append('returned_variable')
                        # Recurse for every remaining child; the call returns
                        # nothing when the child's lines were marked above.
                        child_parts, child_seq = parse_node(child, lines, nested_prev_end, level + 1, total_lines, current_path, counters, processed_lines)
                        parts.extend(child_parts)
                        category_sequence.extend(child_seq)
                        nested_prev_end = child_parts[-1]['location'][1] if child_parts else nested_prev_end

    # Update end_line and source of the parent node if its body extends it
    # NOTE(review): this rewrites parts[-1], i.e. the most recently appended
    # part, which may not be this node's header; it also only runs when
    # start_line was never marked processed -- confirm the intended condition.
    if nested_prev_end > start_line and start_line not in processed_lines:
        final_end = nested_prev_end
        if start_line not in processed_lines:
            parts[-1]['location'] = (start_line, final_end)
            parts[-1]['source'] = ''.join(lines[start_line - 1:final_end])
            parts[-1]['vector'] = create_vector(category, level, (start_line, final_end), total_lines, current_path)
            processed_lines.update(range(start_line, final_end + 1))

    return parts, category_sequence
281
+
282
def parse_python_code(code):
    """Parse Python source text into a flat list of parts and their categories.

    Each top-level statement is handed to ``parse_node``; blank or comment
    lines that remain after the last emitted part are appended as spacer
    parts.

    A single counters dict is shared by every ``parse_node`` call and by the
    trailing-spacer pass, so node ids such as ``Function[2]`` or
    ``Spacer[3]`` keep counting across the whole source instead of
    restarting at 1 for each top-level statement.

    Args:
        code: Python source as a string.

    Returns:
        ``(parts, category_sequence)``. If the source does not parse, a
        single part with category ``'error'`` and the sequence ``['error']``
        are returned instead.
    """
    lines = code.splitlines(keepends=True)
    total_lines = len(lines)
    try:
        tree = ast.parse(code)
    except SyntaxError:
        return ([{'category': 'error', 'source': 'Invalid Python code', 'location': (1, 1), 'level': 0, 'vector': [0, 0, 1.0, 0.0, 0, 0], 'parent_path': 'Top-Level', 'node_id': 'Error[1]'}], ['error'])

    # Same category keys that parse_node expects in its counters dict.
    counters = {cat: 0 for cat in ['import', 'function', 'async_function', 'class', 'if', 'while', 'for', 'try', 'return', 'expression', 'other', 'spacer', 'elif', 'else', 'except', 'finally', 'assigned_variable', 'input_variable', 'returned_variable']}

    parts = []
    prev_end = 0
    processed_lines = set()
    category_sequence = []

    for stmt in tree.body:
        stmt_parts, stmt_seq = parse_node(stmt, lines, prev_end, total_lines=total_lines, counters=counters, processed_lines=processed_lines)
        parts.extend(stmt_parts)
        category_sequence.extend(stmt_seq)
        # Continue from the end of the last part this statement produced.
        if stmt_parts:
            prev_end = stmt_parts[-1]['location'][1]

    # Trailing blank/comment lines after the last emitted part become spacers.
    # The slice is empty when prev_end already covers the whole source.
    for i, line in enumerate(lines[prev_end:], prev_end + 1):
        if i in processed_lines or not is_blank_or_comment(line):
            continue
        counters['spacer'] += 1
        spacer_node_id = f"Spacer[{counters['spacer']}]"
        parts.append({
            'category': 'spacer',
            'source': line,
            'location': (i, i),
            'level': 0,
            'vector': create_vector('spacer', 0, (i, i), total_lines, []),
            'parent_path': 'Top-Level',
            'node_id': spacer_node_id
        })
        processed_lines.add(i)
        category_sequence.append('spacer')

    return parts, category_sequence
324
+
325
def is_blank_or_comment(line):
    """Tell whether *line*, once stripped, is empty or starts with '#'."""
    content = line.strip()
    return True if not content else content.startswith('#')