"""
The interface to load log datasets. The datasets currently supported include
HDFS and BGL.
Authors:
LogPAI Team
"""
import pandas as pd
import os
import numpy as np
import re
from sklearn.utils import shuffle
from collections import OrderedDict


def _split_data(x_data, y_data=None, train_ratio=0, split_type='uniform'):
    if split_type == 'uniform' and y_data is not None:
        pos_idx = y_data > 0
        x_pos = x_data[pos_idx]
        y_pos = y_data[pos_idx]
        x_neg = x_data[~pos_idx]
        y_neg = y_data[~pos_idx]
        train_pos = int(train_ratio * x_pos.shape[0])
        train_neg = int(train_ratio * x_neg.shape[0])
        x_train = np.hstack([x_pos[0:train_pos], x_neg[0:train_neg]])
        y_train = np.hstack([y_pos[0:train_pos], y_neg[0:train_neg]])
        x_test = np.hstack([x_pos[train_pos:], x_neg[train_neg:]])
        y_test = np.hstack([y_pos[train_pos:], y_neg[train_neg:]])
    elif split_type == 'sequential':
        num_train = int(train_ratio * x_data.shape[0])
        x_train = x_data[0:num_train]
        x_test = x_data[num_train:]
        if y_data is None:
            y_train = None
            y_test = None
        else:
            y_train = y_data[0:num_train]
            y_test = y_data[num_train:]
    # Random shuffle
    indexes = shuffle(np.arange(x_train.shape[0]))
    x_train = x_train[indexes]
    if y_train is not None:
        y_train = y_train[indexes]
    return (x_train, y_train), (x_test, y_test)
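

# A hedged toy illustration of _split_data (the arrays below are hypothetical and not
# part of the original module). With split_type='uniform', positive and negative
# instances are split by train_ratio separately, and the training half is then shuffled:
#
#     x = np.array(['s1', 's2', 's3', 's4'])
#     y = np.array([1, 0, 1, 0])
#     (x_tr, y_tr), (x_te, y_te) = _split_data(x, y, train_ratio=0.5, split_type='uniform')
#     # x_tr holds one positive and one negative instance; x_te holds the rest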


def load_HDFS(log_file, label_file=None, window='session', train_ratio=0.5, split_type='sequential',
              save_csv=False, window_size=0):
    """ Load HDFS structured log into train and test data

    Arguments
    ---------
        log_file: str, the file path of the structured log.
        label_file: str, the file path of anomaly labels; None for unlabeled data.
        window: str, the window option; only `session` (default) is supported.
        train_ratio: float, the ratio of training data for the train/test split.
        split_type: `uniform` or `sequential`, which determines how to split the dataset.
            `uniform` means to split positive and negative samples with the same ratio,
            which requires label_file. `sequential` means to split the data sequentially
            without label_file, i.e., the first part is used for training and the second
            part for testing.
        save_csv: bool, whether to save the session instances to `data_instances.csv`.
        window_size: int, if greater than 0, sessions are further sliced into fixed-size
            windows with slice_hdfs().

    Returns
    -------
        (x_train, y_train): the training data
        (x_test, y_test): the testing data
    """
    print('====== Input data summary ======')

    if log_file.endswith('.npz'):
        # Split training and validation set in a class-uniform way
        data = np.load(log_file)
        x_data = data['x_data']
        y_data = data['y_data']
        (x_train, y_train), (x_test, y_test) = _split_data(x_data, y_data, train_ratio, split_type)

    elif log_file.endswith('.csv'):
        assert window == 'session', "Only window=session is supported for HDFS dataset."
        print("Loading", log_file)
        struct_log = pd.read_csv(log_file, engine='c', na_filter=False, memory_map=True)
        data_dict = OrderedDict()
        for idx, row in struct_log.iterrows():
            blkId_list = re.findall(r'(blk_-?\d+)', row['Content'])
            blkId_set = set(blkId_list)
            for blk_Id in blkId_set:
                if blk_Id not in data_dict:
                    data_dict[blk_Id] = []
                data_dict[blk_Id].append(row['EventId'])
        data_df = pd.DataFrame(list(data_dict.items()), columns=['BlockId', 'EventSequence'])

        if label_file:
            # Split training and validation set in a class-uniform way
            label_data = pd.read_csv(label_file, engine='c', na_filter=False, memory_map=True)
            label_data = label_data.set_index('BlockId')
            label_dict = label_data['Label'].to_dict()
            data_df['Label'] = data_df['BlockId'].apply(lambda x: 1 if label_dict[x] == 'Anomaly' else 0)

            # Split train and test data
            (x_train, y_train), (x_test, y_test) = _split_data(data_df['EventSequence'].values,
                                                               data_df['Label'].values, train_ratio, split_type)
            print(y_train.sum(), y_test.sum())

        if save_csv:
            data_df.to_csv('data_instances.csv', index=False)

        if window_size > 0:
            x_train, window_y_train, y_train = slice_hdfs(x_train, y_train, window_size)
            x_test, window_y_test, y_test = slice_hdfs(x_test, y_test, window_size)
            log = "{} {} windows ({}/{} anomaly), {}/{} normal"
            print(log.format("Train:", x_train.shape[0], y_train.sum(), y_train.shape[0],
                             (1 - y_train).sum(), y_train.shape[0]))
            print(log.format("Test:", x_test.shape[0], y_test.sum(), y_test.shape[0],
                             (1 - y_test).sum(), y_test.shape[0]))
            return (x_train, window_y_train, y_train), (x_test, window_y_test, y_test)

        if label_file is None:
            if split_type == 'uniform':
                split_type = 'sequential'
                print('Warning: Only split_type=sequential is supported if label_file=None.')
            # Split training and validation set sequentially
            x_data = data_df['EventSequence'].values
            (x_train, _), (x_test, _) = _split_data(x_data, train_ratio=train_ratio, split_type=split_type)
            print('Total: {} instances, train: {} instances, test: {} instances'.format(
                x_data.shape[0], x_train.shape[0], x_test.shape[0]))
            return (x_train, None), (x_test, None), data_df
    else:
        raise NotImplementedError('load_HDFS() only supports csv and npz files!')

    num_train = x_train.shape[0]
    num_test = x_test.shape[0]
    num_total = num_train + num_test
    num_train_pos = sum(y_train)
    num_test_pos = sum(y_test)
    num_pos = num_train_pos + num_test_pos

    print('Total: {} instances, {} anomaly, {} normal'.format(num_total, num_pos, num_total - num_pos))
    print('Train: {} instances, {} anomaly, {} normal'.format(num_train, num_train_pos, num_train - num_train_pos))
    print('Test: {} instances, {} anomaly, {} normal\n'.format(num_test, num_test_pos, num_test - num_test_pos))

    return (x_train, y_train), (x_test, y_test)
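

# A hedged usage sketch (the file names are hypothetical placeholders for a parsed HDFS
# log and its label file). When window_size > 0, each split additionally carries the
# next-event labels produced by slice_hdfs() below, alongside the session labels:
#
#     (x_tr, next_tr, sess_tr), (x_te, next_te, sess_te) = load_HDFS(
#         'HDFS.log_structured.csv', label_file='anomaly_label.csv',
#         train_ratio=0.5, split_type='uniform', window_size=10)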


def slice_hdfs(x, y, window_size):
    results_data = []
    print("Slicing {} sessions, with window {}".format(x.shape[0], window_size))
    for idx, sequence in enumerate(x):
        seqlen = len(sequence)
        i = 0
        while (i + window_size) < seqlen:
            slice = sequence[i: i + window_size]
            results_data.append([idx, slice, sequence[i + window_size], y[idx]])
            i += 1
        else:
            # The else branch runs once after the while loop finishes: the remaining
            # events form the last window, padded with "#Pad" up to window_size, and
            # its next-event label is "#Pad".
            slice = sequence[i: i + window_size]
            slice += ["#Pad"] * (window_size - len(slice))
            results_data.append([idx, slice, "#Pad", y[idx]])
    results_df = pd.DataFrame(results_data, columns=["SessionId", "EventSequence", "Label", "SessionLabel"])
    print("Slicing done, {} windows generated".format(results_df.shape[0]))
    return results_df[["SessionId", "EventSequence"]], results_df["Label"], results_df["SessionLabel"]
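

# A hedged toy example of slice_hdfs (the session below is hypothetical): five events
# with window_size=3 yield three windows; the final window keeps the remaining events
# (padded with "#Pad" when shorter than window_size) and its next-event label is "#Pad":
#
#     x = pd.Series([['e1', 'e2', 'e3', 'e4', 'e5']])
#     y = pd.Series([0])
#     windows, next_events, session_labels = slice_hdfs(x, y, window_size=3)
#     # windows:      [e1,e2,e3], [e2,e3,e4], [e3,e4,e5]
#     # next_events:  e4,         e5,         #Pad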


def load_BGL(log_file, label_file=None, window='sliding', time_interval=60, stepping_size=60,
             train_ratio=0.8):
    """ TODO
    """


def bgl_preprocess_data(para, raw_data, event_mapping_data):
    """ Split logs into sliding windows, build an event count matrix and get the corresponding labels

    Args:
    --------
        para: the parameters dictionary
        raw_data: list of (label, time)
        event_mapping_data: a list of event indices, where each row index corresponds to a log line

    Returns:
    --------
        event_count_matrix: event count matrix, where each row is an instance (log sequence vector)
        labels: a list of labels, 1 represents anomaly
    """
    # create the directory for saving the sliding windows (start_index, end_index),
    # which can be directly loaded in future runs
    if not os.path.exists(para['save_path']):
        os.mkdir(para['save_path'])
    log_size = raw_data.shape[0]
    sliding_file_path = para['save_path'] + 'sliding_' + str(para['window_size']) + 'h_' + str(para['step_size']) + 'h.csv'

    #=============divide into sliding windows=========#
    start_end_index_list = []  # list of tuples; each tuple holds the start and end index of a sliding time window
    label_data, time_data = raw_data[:, 0], raw_data[:, 1]
    if not os.path.exists(sliding_file_path):
        # split into sliding windows
        start_time = time_data[0]
        start_index = 0
        end_index = 0

        # get the first start index, end index, and end time
        for cur_time in time_data:
            if cur_time < start_time + para['window_size']*3600:
                end_index += 1
                end_time = cur_time
            else:
                start_end_pair = tuple((start_index, end_index))
                start_end_index_list.append(start_end_pair)
                break

        # move the start and end index until the next sliding window
        while end_index < log_size:
            start_time = start_time + para['step_size']*3600
            end_time = end_time + para['step_size']*3600
            for i in range(start_index, end_index):
                if time_data[i] < start_time:
                    i += 1
                else:
                    break
            for j in range(end_index, log_size):
                if time_data[j] < end_time:
                    j += 1
                else:
                    break
            start_index = i
            end_index = j
            start_end_pair = tuple((start_index, end_index))
            start_end_index_list.append(start_end_pair)
        inst_number = len(start_end_index_list)
        print('there are %d instances (sliding windows) in this dataset\n' % inst_number)
        np.savetxt(sliding_file_path, start_end_index_list, delimiter=',', fmt='%d')
    else:
        print('Loading start_end_index_list from file')
        start_end_index_list = pd.read_csv(sliding_file_path, header=None).values
        inst_number = len(start_end_index_list)
        print('there are %d instances (sliding windows) in this dataset' % inst_number)

    # get all the log indexes in each time window by ranging from start_index to end_index
    expanded_indexes_list = []
    for t in range(inst_number):
        index_list = []
        expanded_indexes_list.append(index_list)
    for i in range(inst_number):
        start_index = start_end_index_list[i][0]
        end_index = start_end_index_list[i][1]
        for l in range(start_index, end_index):
            expanded_indexes_list[i].append(l)

    event_mapping_data = [row[0] for row in event_mapping_data]
    event_num = len(list(set(event_mapping_data)))
    print('There are %d log events' % event_num)

    #=============get labels and event count of each sliding window =========#
    labels = []
    event_count_matrix = np.zeros((inst_number, event_num))
    for j in range(inst_number):
        label = 0  # 0 represents success, 1 represents failure
        for k in expanded_indexes_list[j]:
            event_index = event_mapping_data[k]
            event_count_matrix[j, event_index] += 1
            if label_data[k]:
                label = 1
                continue
        labels.append(label)
    assert inst_number == len(labels)
    print("Among all instances, %d are anomalies" % sum(labels))
    assert event_count_matrix.shape[0] == len(labels)

    return event_count_matrix, labels
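

if __name__ == '__main__':
    # Minimal illustrative run of bgl_preprocess_data on toy data. This block is a
    # hedged sketch, not part of the original interface, and every value below is
    # hypothetical. raw_data holds one (label, unix_timestamp) pair per log line, and
    # event_mapping_data maps each log line to its event template index. Note that a
    # sliding-window index cache file is written to para['save_path'].
    para = {'save_path': './', 'window_size': 1, 'step_size': 1}  # sizes in hours
    raw_data = np.array([[0, 0], [0, 1800], [1, 3600], [0, 5400], [0, 7200]])
    event_mapping_data = [[0], [1], [0], [2], [1]]
    event_count_matrix, labels = bgl_preprocess_data(para, raw_data, event_mapping_data)
    print(event_count_matrix.shape, sum(labels))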