import os
import re
import time
import random
import zipfile
import javalang
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch_geometric
from torch_geometric.data import Data, Dataset
from torch_geometric.loader import DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_recall_fscore_support
from tqdm import tqdm
import networkx as nx


# ---- Utility functions ----
def unzip_dataset(zip_path, extract_to):
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_to)


def normalize_java_code(code):
    # Remove single-line comments
    code = re.sub(r'//.*?\n', '', code)
    # Remove multi-line comments
    code = re.sub(r'/\*.*?\*/', '', code, flags=re.DOTALL)
    # Collapse extra whitespace and blank lines
    code = re.sub(r'\s+', ' ', code)
    return code.strip()


def safe_parse_java(code):
    try:
        tokens = list(javalang.tokenizer.tokenize(code))
        parser = javalang.parser.Parser(tokens)
        return parser.parse()
    except Exception:
        return None


def ast_to_graph(ast):
    graph = nx.DiGraph()

    def dfs(node, parent_id=None):
        node_id = len(graph)
        graph.add_node(node_id, label=type(node).__name__)
        if parent_id is not None:
            graph.add_edge(parent_id, node_id)
        for child in getattr(node, 'children', []):
            if isinstance(child, (list, tuple)):
                for item in child:
                    if isinstance(item, javalang.ast.Node):
                        dfs(item, node_id)
            elif isinstance(child, javalang.ast.Node):
                dfs(child, node_id)

    dfs(ast)
    return graph


def tokenize_java_code(code):
    try:
        tokens = list(javalang.tokenizer.tokenize(code))
        return [token.value for token in tokens]
    except Exception:
        return []


# ---- Data Preprocessing ----
class CloneDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        super().__init__()
        self.data_list = []
        self.labels = []
        self.skipped_files = 0
        self.max_tokens = 5000

        clone_dirs = {
            "Clone_Type1": 1,
            "Clone_Type2": 1,
            "Clone_Type3 - ST": 1,
            "Clone_Type3 - VST": 1,
            "Clone_Type3 - MT": 0  # Assuming MT = Not Clone
        }

        for clone_type, label in clone_dirs.items():
            clone_path = os.path.join(root_dir, 'Subject_CloneTypes_Directories', clone_type)
            for root, _, files in os.walk(clone_path):
                for file in files:
                    if not file.endswith(".java"):
                        continue
                    file_path = os.path.join(root, file)
                    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                        code = f.read()
                    code = normalize_java_code(code)
                    if len(code.split()) > self.max_tokens:
                        self.skipped_files += 1
                        continue
                    ast = safe_parse_java(code)
                    if ast is None:
                        self.skipped_files += 1
                        continue
                    graph = ast_to_graph(ast)
                    tokens = tokenize_java_code(code)
                    if not tokens:
                        self.skipped_files += 1
                        continue
                    self.data_list.append({
                        'graph': graph,
                        'tokens': tokens,
                        'label': label
                    })

    def len(self):
        return len(self.data_list)

    def get(self, idx):
        data_item = self.data_list[idx]
        graph = data_item['graph']
        tokens = data_item['tokens']
        label = data_item['label']

        # Graph processing: edge list as a [2, num_edges] tensor
        if graph.number_of_edges() > 0:
            edge_index = torch.tensor(list(graph.edges), dtype=torch.long).t().contiguous()
        else:
            edge_index = torch.empty((2, 0), dtype=torch.long)
        node_features = torch.arange(graph.number_of_nodes()).unsqueeze(1).float()

        # Token processing: hash tokens into a fixed-size vocabulary.
        # Note: Python's str hash is salted per process unless PYTHONHASHSEED is set,
        # so these indices are not stable across runs.
        token_indices = torch.tensor([hash(t) % 5000 for t in tokens], dtype=torch.long)

        return edge_index, node_features, token_indices, torch.tensor(label, dtype=torch.long)


# ---- Models ----
class GNNEncoder(nn.Module):
    def __init__(self, in_channels=1, hidden_dim=64):
        super().__init__()
        self.conv1 = torch_geometric.nn.GCNConv(in_channels, hidden_dim)
        self.conv2 = torch_geometric.nn.GCNConv(hidden_dim, hidden_dim)

    def forward(self, x, edge_index):
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = self.conv2(x, edge_index)
        x = F.relu(x)
        # Graph-level embedding: mean over node embeddings
        return torch.mean(x, dim=0)


class RNNEncoder(nn.Module):
    def __init__(self, vocab_size=5000, embedding_dim=64, hidden_dim=64):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)

    def forward(self, tokens):
        embeds = self.embedding(tokens)
        _, (hidden, _) = self.lstm(embeds)
        return hidden.squeeze(0)  # [batch, hidden_dim]


class HybridClassifier(nn.Module):
    def __init__(self):
        super().__init__()
        self.gnn = GNNEncoder()
        self.rnn = RNNEncoder()
        self.fc = nn.Linear(128, 2)

    def forward(self, edge_index, node_features, tokens):
        gnn_out = self.gnn(node_features, edge_index)  # [hidden_dim]
        rnn_out = self.rnn(tokens)                     # [1, hidden_dim]
        combined = torch.cat([gnn_out.unsqueeze(0), rnn_out], dim=-1)
        return self.fc(combined)


# ---- Training and Evaluation ----
def train(model, optimizer, loader, device):
    model.train()
    total_loss = 0
    for edge_index, node_features, tokens, labels in loader:
        # Drop the batch dimension added by the loader (batch_size is 1)
        edge_index = edge_index.squeeze(0).to(device)
        node_features = node_features.squeeze(0).to(device)
        tokens = tokens.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(edge_index, node_features, tokens)  # [1, 2]
        loss = F.cross_entropy(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(loader)


def evaluate(model, loader, device):
    model.eval()
    preds, labels_all = [], []
    with torch.no_grad():
        for edge_index, node_features, tokens, labels in loader:
            edge_index = edge_index.squeeze(0).to(device)
            node_features = node_features.squeeze(0).to(device)
            tokens = tokens.to(device)
            labels = labels.to(device)

            outputs = model(edge_index, node_features, tokens)
            pred = outputs.argmax(dim=-1)
            preds.append(pred.cpu().numpy())
            labels_all.append(labels.cpu().numpy())
    preds = np.concatenate(preds)
    labels_all = np.concatenate(labels_all)
    precision, recall, f1, _ = precision_recall_fscore_support(labels_all, preds, average='binary')
    return precision, recall, f1


# ---- Main Execution ----
if __name__ == "__main__":
    dataset_root = '/content/dataset/archive (1)'
    unzip_dataset('/content/dataset/archive (1).zip', dataset_root)

    dataset = CloneDataset(dataset_root)
    print(f"Total valid samples: {dataset.len()}")
    print(f"Total skipped files: {dataset.skipped_files}")

    indices = list(range(dataset.len()))
    train_idx, temp_idx = train_test_split(indices, test_size=0.2, random_state=42)
    val_idx, test_idx = train_test_split(temp_idx, test_size=0.5, random_state=42)

    train_set = torch.utils.data.Subset(dataset, train_idx)
    val_set = torch.utils.data.Subset(dataset, val_idx)
    test_set = torch.utils.data.Subset(dataset, test_idx)

    batch_size = 1  # small because of variable graph sizes
    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_set, batch_size=batch_size)
    test_loader = DataLoader(test_set, batch_size=batch_size)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = HybridClassifier().to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    epochs = 5
    start_time = time.time()
    for epoch in range(epochs):
        train_loss = train(model, optimizer, train_loader, device)
        precision, recall, f1 = evaluate(model, val_loader, device)
        print(f"Epoch {epoch+1}: Loss={train_loss:.4f}, Precision={precision:.4f}, Recall={recall:.4f}, F1={f1:.4f}")

    precision, recall, f1 = evaluate(model, test_loader, device)
    total_time = time.time() - start_time
    print(f"Test Precision: {precision:.4f}, Recall: {recall:.4f}, F1: {f1:.4f}")
    print(f"Total execution time: {total_time:.2f} seconds")