main.py
from Data_Preprocess import *
from DCkNN import *
from typing import Tuple
from matplotlib import pyplot as plt
from tabulate import tabulate
from sklearn.metrics import roc_curve, roc_auc_score, confusion_matrix
import numpy as np
import torch
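
# The star imports above are expected to provide, among others: load_dataloaders,
# calculate_activations_and_save, ActivationDataset, committee_kNN_from_all_files
# and majority_vote, all of which are referenced below.
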
def load_activation_dataloaders(calc_activations: bool,
calc_anomal_activations: bool,
anomalous_class: str,
regular_class: str,
train_size: int,
test_size: int,
one_vs_other: bool,
use_retinal_dataset: bool,
*args: int
) -> Tuple[ActivationDataset, ActivationDataset, ActivationDataset]:
"""
takes the original regular and anomalous dataloaders and converts them to dataloaders of activations
:param calc_activations: True iff you wish to calculate the regular class activations
:param calc_anomal_activations: True iff you wish to calculate the anomaly class activations
:param anomalous_class: anomaly class name
:param regular_class: regular class name
:param train_size: the size we wish to use as training ( = subset of the original training size)
:return: three activation dataloaders for each dataset
"""
reg_train_loader, reg_test_loader, anomal_test_loader = load_dataloaders(train_size, test_size, anomalous_class,
regular_class, one_vs_other,
use_retinal_dataset, args)
if calc_activations:
calculate_activations_and_save(reg_train_loader, 'train', regular_class, False)
calculate_activations_and_save(reg_test_loader, 'test', regular_class, False)
if calc_anomal_activations: # the anomalous is only used for testing
calculate_activations_and_save(anomal_test_loader, 'test', anomalous_class, True)
reg_train_activations_loader = ActivationDataset(regular_class, 'train', False)
reg_test_activations_loader = ActivationDataset(regular_class, 'test', False)
anomal_test_activations_loader = ActivationDataset(anomalous_class, 'test', True)
return reg_train_activations_loader, reg_test_activations_loader, anomal_test_activations_loader
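
# Usage sketch (illustrative values only: the class names, sizes and the trailing
# class-index integer are placeholders, not values this project necessarily uses):
#
#     reg_train, reg_test, anomal_test = load_activation_dataloaders(
#         True,       # calc_activations
#         True,       # calc_anomal_activations
#         'mnist',    # anomalous_class
#         'cifar10',  # regular_class
#         1000,       # train_size
#         1000,       # test_size
#         True,       # one_vs_other
#         False,      # use_retinal_dataset
#         3)          # extra int(s) collected by *args and forwarded to load_dataloaders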

def visualize_results(anomal_class: str, reg_class: str, k: int):
    """Scatter-plot the per-layer kNN distances of anomalous vs. regular test samples."""
    anomal_act, regular_act = load_activations(anomal_class, reg_class, True)
    for name, anomal in anomal_act.items():
        anomal = anomal.cpu().numpy()
        regular = regular_act[name].cpu().numpy()
        plt.scatter(range(len(anomal)), anomal, s=0.3)
        plt.scatter(range(len(regular)), regular, s=0.3)
        plt.title(name)
        plt.ylabel(f"Mean kNN distance from k={k} neighbours")
        plt.xlabel("# Sample")
        # the anomalous samples are plotted first, so the legend must list them first
        plt.legend([anomal_class, reg_class])
        plt.show()

def load_activations(anomal_class, reg_class, to_cpu: bool):
    """Load the saved per-layer kNN distance tensors for the anomalous and regular classes."""
    anomal_act, regular_act = {}, {}
    for act in [f"layer{j}_block{i}" for j in [1, 2, 3, 4] for i in [0, 1]] + ['avgpool']:
        regular_val = torch.load(f"predictions/{act}_{reg_class}_regular")
        anomal_val = torch.load(f"predictions/{act}_{anomal_class}_anomal")
        regular_act[act] = regular_val.cpu() if to_cpu else regular_val
        anomal_act[act] = anomal_val.cpu() if to_cpu else anomal_val
    return anomal_act, regular_act
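
# Note on the file layout assumed above: the tensors are read from a "predictions/"
# directory with names of the form
#     predictions/<layer>_<regular_class>_regular   and   predictions/<layer>_<anomalous_class>_anomal
# e.g. predictions/layer1_block0_cifar10_regular for a regular class named 'cifar10';
# they are presumably written by the kNN step (committee_kNN_from_all_files).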

def get_roc_curve(anomal_class: str, reg_class: str, use_knn_sum: bool):
    """Build ROC curves from the saved kNN distances and report AUC scores and confusion matrices."""
    anomal_act, regular_act = load_activations(anomal_class, reg_class, True)
    anomal_size, regular_size = anomal_act['layer1_block0'].shape, regular_act['layer1_block0'].shape
    # label 1 for anomalous samples, 0 for regular samples
    y_true = torch.cat((torch.ones(anomal_size), torch.zeros(regular_size)))
    layers_knn_dict = {}
for layer_name in [f"layer{j}_block{i}" for j in [1, 2, 3, 4] for i in [0, 1]] + ['avgpool']:
knn_dist = torch.cat((anomal_act[layer_name], regular_act[layer_name]))
layers_knn_dict[layer_name] = knn_dist
    if use_knn_sum:
        # sum the kNN distances over all layers and use the sum as a single anomaly score
        final_knn = sum(layers_knn_dict.values())
        final_fpr, final_tpr, final_thres = roc_curve(y_true, final_knn)
        # pick the threshold whose (FPR, TPR) point is closest to the ideal corner (0, 1)
        best_threshold = final_thres[np.argmin(final_fpr ** 2 + (1 - final_tpr) ** 2)]
        y_pred = torch.where(final_knn <= best_threshold, 0, 1)
        # note: this is the AUC of the thresholded (binary) predictions, not of the raw scores
        print(f"ROC score = {roc_auc_score(y_true, y_pred)}")
        return
prediction_dict = {}
tpr_dict = {}
fpr_dict = {}
    for layer_name, knn in layers_knn_dict.items():
        # sklearn's roc_curve accepts the raw distances as scores (no thresholded predictions needed)
        fpr, tpr, thresholds = roc_curve(y_true, knn)
        # The best point on the TPR-vs-FPR curve is (x, y) = (FPR, TPR) = (0, 1), so we choose the
        # threshold corresponding to the point closest to (0, 1), i.e. the point (x, y) for which
        # sqrt(x^2 + (y - 1)^2) is minimal (taking the square root is redundant for the argmin):
        best_threshold = thresholds[np.argmin(fpr ** 2 + (1 - tpr) ** 2)]
        # based on the threshold, generate a prediction: 1 = anomalous, 0 = regular
        prediction = torch.where(knn <= best_threshold, 0, 1)
        prediction_dict[layer_name] = prediction
        # also recalculate the ROC curve from each layer's thresholded prediction
        fpr, tpr, thresholds = roc_curve(y_true, prediction)
tpr_dict[layer_name] = tpr
fpr_dict[layer_name] = fpr
    # finally, take a majority vote over all per-layer predictions
    final_prediction = majority_vote(prediction_dict)
    final_fpr, final_tpr, final_thresholds = roc_curve(y_true, final_prediction)
    # plot all the resulting ROC curves (x axis = FPR, y axis = TPR)
for curr_fpr, curr_tpr in zip(fpr_dict.values(), tpr_dict.values()):
plt.plot(curr_fpr, curr_tpr)
plt.plot(final_fpr, final_tpr)
    plt.xlabel("FPR")
    plt.ylabel("TPR")
    plt.title("ROC Curve")
plt.legend([f"{layer_name} ({roc_auc_score(y_true, curr_pred):.4f})" for layer_name, curr_pred in
prediction_dict.items()] + [f'Final pred ({roc_auc_score(y_true, final_prediction):.4f})'])
plt.show()
for layer_name, pred in prediction_dict.items():
print(layer_name)
print(confusion_matrix(y_true, pred))
print('Final')
print(confusion_matrix(y_true, final_prediction))
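
# Illustrative sketch, not called anywhere in the pipeline: it demonstrates, on synthetic
# scores, the same threshold rule used in get_roc_curve (pick the ROC threshold whose
# (FPR, TPR) point is closest to the ideal corner (0, 1)). The Gaussian score
# distributions below are arbitrary placeholders.
def _demo_best_roc_threshold():
    rng = np.random.default_rng(0)
    # synthetic anomaly scores: regular samples around 0, anomalous samples around 2
    scores = np.concatenate([rng.normal(0.0, 1.0, 100), rng.normal(2.0, 1.0, 100)])
    labels = np.concatenate([np.zeros(100), np.ones(100)])
    fpr, tpr, thresholds = roc_curve(labels, scores)
    # threshold whose (FPR, TPR) point is closest to (0, 1)
    best = thresholds[np.argmin(fpr ** 2 + (1 - tpr) ** 2)]
    preds = (scores > best).astype(int)  # 1 = anomalous, 0 = regular
    print(f"demo threshold = {best:.3f}, AUC (raw scores) = {roc_auc_score(labels, scores):.3f}")
    print(confusion_matrix(labels, preds))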

def main(args):
    """Run the full activation / kNN / ROC anomaly-detection pipeline with the hard-coded setup below."""
k = 2
train_size = 1000
test_size = 1000
regular_class = 'mnistcls'
anomalous_class = 'mnist'
    use_mvtec_dataset = False  # forwarded to the 'use_retinal_dataset' parameter of load_activation_dataloaders
calc_reg_activation = True
calc_anomal_activation = True
use_one_vs_other = True
args = args if use_one_vs_other else None
calculate_knn = True
calculate_knn_anomalous = True
visualize = True
do_ROC = True
print(" Running with the following setup:")
print(tabulate([['Regular Data', regular_class],
['Anomalous Data', anomalous_class],
['One-vs-Other (for CIFAR10 only)', use_one_vs_other],
['Calculating Regular Activations', calc_reg_activation],
['Calculating Anomalous Activations', calc_anomal_activation],
['Calculating Regular kNN', calculate_knn],
['Calculating Anomalous kNN', calculate_knn_anomalous],
['Visualizing Data', visualize],
['Plotting ROC', do_ROC],
['Number of neighbours (k)', k],
['Regular class instance ', f"class #{args}"],
['Train Size', train_size],
['Test size', test_size]],
tablefmt="fancy_grid"))
reg_train, reg_test, anomal_test = load_activation_dataloaders(calc_reg_activation, calc_anomal_activation,
anomalous_class, regular_class, train_size,
test_size, use_one_vs_other, use_mvtec_dataset,
args)
if calculate_knn: committee_kNN_from_all_files(k, reg_train, reg_test, regular_class, False)
if calculate_knn_anomalous: committee_kNN_from_all_files(k, reg_train, anomal_test, anomalous_class, True)
if visualize: visualize_results(anomalous_class, regular_class, k)
if do_ROC: get_roc_curve(anomalous_class, regular_class, True)
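
# Running this file directly executes the full pipeline with the hard-coded configuration
# inside main(); the integer argument (3 below) appears to select the regular-class
# instance used when one-vs-other mode is enabled (see the setup table printed in main).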
if __name__ == '__main__':
main(3)