-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathDPI.py
More file actions
92 lines (74 loc) · 3.07 KB
/
DPI.py
File metadata and controls
92 lines (74 loc) · 3.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import pandas as pd
import numpy as np
import sys
def add_counter_label(frame):
"""
Adds the ID column to the panda frame
:param frame: a panda frame
:return: a fixed panda frame with an additional id column
"""
frame_len = len(frame)
frame = frame.assign(ID=np.arange(0, frame_len))
return frame
class ResultsDPI:
"""Class that represents the DPI checks"""
def __init__(self):
self.df_smalltrain = pd.read_csv("CsvFiles/AlgoTest.csv")
self.df_stest = pd.read_csv("CsvFiles/AlgoSmallTest.csv")
self.df_results = pd.read_csv("CsvFiles/Results.csv")
self.min_max = []
def stage2(self):
""" STAGE 2 - MIN <-> MAX [[min,max],[min,max]....,[min,max]] """
grouped = self.df_smalltrain.groupby('num_class')
normal = grouped.get_group(1) # normal
normal_max_list = list(normal.max())
normal_min_list = list(normal[normal > 0].min())
normal_min_list = list(map(int, normal_min_list))
self.min_max = map(list, zip(normal_min_list, normal_max_list))
print("=======================================")
print("STAGE 2")
self.min_max = list(self.min_max)
print(self.min_max)
def stage3(self):
""" STAGE 3 - adding ID column to the test and result panda frames """
self.df_stest = add_counter_label(self.df_stest)
self.df_results = add_counter_label(self.df_results)
print("=======================================")
print("STAGE 3")
print(f"{self.df_stest} \n {self.df_results} ")
def stage4(self):
"""
Stage 4 - DPI , compare between the two lists ID and check if each field is between
the min and max of each attribute
(taking anomaly from results (anomaly = 0))
"""
print("=======================================")
print("STAGE 4")
self.df_stest = self.df_stest[self.df_stest.num_class != 1]
# merges the prediction with their values
check_pd = pd.merge(self.df_stest, self.df_results, on='ID')
check_pd_fix = check_pd.drop(["num_class_x", "num_class_y", "p_type_y", "ID"], axis=1)
check_pd_fix.to_csv("CsvFiles/Check.csv")
print(check_pd_fix)
columns = check_pd_fix.columns.tolist()
sys.stdout = open("CsvFiles/log.txt", "w")
for index, row in check_pd_fix.iterrows():
lst = row.tolist()
print("-------------------------------------------")
print(lst)
for i in range(len(self.min_max) - 1): # -1 so it won't run on the class
if lst[i] != 0:
if lst[i] < self.min_max[i][0]:
print(f"Lower than {self.min_max[i][0]} in {columns[i]}")
elif lst[i] > self.min_max[i][1]:
print(f"Higher than {self.min_max[i][1]} in {columns[i]}")
sys.stdout.close()
def initialize(self):
self.stage2()
self.stage3()
self.stage4()
def main():
dpi = ResultsDPI()
dpi.initialize()
if __name__ == "__main__":
main()