2
2
from scipy .stats import betabinom
3
3
import numpy as np
4
4
from csle_tolerance .dao .intrusion_recovery_pomdp_config import IntrusionRecoveryPomdpConfig
5
+ from csle_tolerance .dao .intrusion_recovery_game_config import IntrusionRecoveryGameConfig
5
6
6
7
7
8
class IntrusionRecoveryPomdpUtil :
@@ -26,7 +27,7 @@ def initial_belief(p_a: float) -> List[float]:
26
27
:param p_a: the attack probability
27
28
:return: the initial belief state
28
29
"""
29
- return [1 - p_a , p_a , 0 ]
30
+ return [1 , 0 , 0 ]
30
31
31
32
@staticmethod
32
33
def action_space () -> List [int ]:
@@ -165,6 +166,33 @@ def transition_function(s: int, s_prime: int, a: int, p_a: float, p_c_1: float,
165
166
else :
166
167
return 0
167
168
169
@staticmethod
def transition_function_game(s: int, s_prime: int, a1: int, a2: int, p_a: float, p_c_1: float) -> float:
    """
    The transition function of the POSG

    State 2 is absorbing. From states 0 and 1 the next state is 2 with probability p_c_1; the remaining
    probability mass (1 - p_c_1) is distributed over states 0 and 1 depending on the actions.
    For every (s, a1, a2) with s in {0, 1, 2} and a1, a2 in {0, 1} the probabilities over s_prime sum to 1.

    :param s: the state
    :param s_prime: the next state
    :param a1: the defender action
    :param a2: the attacker action
    :param p_a: the intrusion probability
    :param p_c_1: the crash probability
    :return: P(s_prime | s, a1, a2)
    """
    if s == 2:
        # Absorbing state: it can only transition to itself
        return 1.0 if s_prime == 2 else 0.0
    if s not in (0, 1):
        # Unknown states have no outgoing probability mass
        return 0.0
    if s_prime == 2:
        return p_c_1

    no_crash = 1 - p_c_1
    if s == 0:
        if a2 == 1:
            # The attacker action moves the state to 1 with probability p_a. The complementary mass stays
            # in state 0 for EVERY defender action; previously it was only assigned when a1 == 0, which left
            # the row for (s=0, a1=1, a2=1) summing to less than 1.
            if s_prime == 0:
                return (1 - p_a) * no_crash
            if s_prime == 1:
                return no_crash * p_a
            return 0.0
        if a2 == 0 and s_prime == 0:
            return no_crash
        return 0.0

    # s == 1: defender action 1 moves the state back to 0, defender action 0 keeps it in 1
    if (a1 == 1 and s_prime == 0) or (a1 == 0 and s_prime == 1):
        return no_crash
    return 0.0
195
+
168
196
@staticmethod
169
197
def transition_tensor (states : List [int ], actions : List [int ], p_a : float , p_c_1 : float , p_c_2 : float , p_u : float ) \
170
198
-> List [List [List [float ]]]:
@@ -187,10 +215,39 @@ def transition_tensor(states: List[int], actions: List[int], p_a: float, p_c_1:
187
215
for s_prime in states :
188
216
s_a_transitions .append (IntrusionRecoveryPomdpUtil .transition_function (
189
217
s = s , s_prime = s_prime , a = a , p_a = p_a , p_c_1 = p_c_1 , p_c_2 = p_c_2 , p_u = p_u ))
218
+ assert round (sum (s_a_transitions ), 2 ) == 1.0
190
219
a_transitions .append (s_a_transitions )
191
220
transition_tensor .append (a_transitions )
192
221
return transition_tensor
193
222
223
@staticmethod
def transition_tensor_game(states: List[int], defender_actions: List[int], attacker_actions: List[int],
                           p_a: float, p_c_1: float) -> List[List[List[List[float]]]]:
    """
    Builds the transition tensor of the POSG as nested lists.

    The nesting order is defender action, attacker action, state, next state, i.e. the entries are laid out
    as tensor[a1][a2][s][s_prime] following the order of the given lists.

    :param states: the list of states
    :param defender_actions: the list of defender actions
    :param attacker_actions: the list of attacker actions
    :param p_a: the intrusion probability
    :param p_c_1: the crash probability
    :return: the transition tensor
    """
    transition_prob = IntrusionRecoveryPomdpUtil.transition_function_game
    return [
        [
            [
                [
                    transition_prob(s=s, s_prime=s_prime, a1=a1, a2=a2, p_a=p_a, p_c_1=p_c_1)
                    for s_prime in states
                ]
                for s in states
            ]
            for a2 in attacker_actions
        ]
        for a1 in defender_actions
    ]
250
+
194
251
@staticmethod
195
252
def sample_initial_state (b1 : List [float ]) -> int :
196
253
"""
@@ -217,6 +274,20 @@ def sample_next_observation(observation_tensor: List[List[float]], s_prime: int,
217
274
o = np .random .choice (np .arange (0 , len (observations )), p = observation_probs )
218
275
return int (o )
219
276
277
@staticmethod
def sample_next_state_game(transition_tensor: List[List[List[List[float]]]], s: int, a1: int, a2: int) -> int:
    """
    Samples the next state of the POSG

    :param transition_tensor: the transition tensor, indexed as [a1][a2][s][s_prime]
    :param s: the current state
    :param a1: the defender action
    :param a2: the attacker action
    :return: the sampled next state
    """
    # Distribution over next states for the given state and action pair
    next_state_probs = transition_tensor[a1][a2][s]
    candidate_states = np.arange(0, len(next_state_probs))
    sampled_state = np.random.choice(candidate_states, p=next_state_probs)
    # Convert the numpy integer to a plain Python int
    return int(sampled_state)
290
+
220
291
@staticmethod
221
292
def bayes_filter (s_prime : int , o : int , a : int , b : List [float ], states : List [int ], observations : List [int ],
222
293
observation_tensor : List [List [float ]], transition_tensor : List [List [List [float ]]]) -> float :
@@ -342,3 +413,92 @@ def pomdp_solver_file(config: IntrusionRecoveryPomdpConfig) -> str:
342
413
c = config .cost_tensor [a ][s ]
343
414
file_str = file_str + f"R: { a } : { s } : { s_prime } : { o } { c :.80f} \n "
344
415
return file_str
416
+
417
@staticmethod
def generate_transitions(game_config: IntrusionRecoveryGameConfig) -> List[str]:
    """
    Generates the transition rows of the POSG config file of HSVI

    Each row has the form "s a1 a2 o s_prime prob", where o is the index of the observation and prob is the
    product of the transition probability transition_tensor[a1][a2][s][s_prime] and the observation
    probability observation_tensor[a2][o]. Rows whose probability is not positive are omitted.

    :param game_config: the game configuration
    :return: list of transition rows
    """
    states = game_config.states
    actions = game_config.actions
    rows = []
    for s in states:
        for a1 in actions:
            for a2 in actions:
                for s_prime in states:
                    for obs_idx, _ in enumerate(game_config.observations):
                        joint_prob = (game_config.transition_tensor[a1][a2][s][s_prime]
                                      * game_config.observation_tensor[a2][obs_idx])
                        if not joint_prob > 0:
                            # Only transitions with positive probability are written to the file
                            continue
                        rows.append(f"{s} {a1} {a2} {obs_idx} {s_prime} {joint_prob}")
    return rows
439
+
440
@staticmethod
def generate_rewards(game_config: IntrusionRecoveryGameConfig) -> List[str]:
    """
    Generates the reward rows of the POSG config file of HSVI

    Each row has the form "s a1 a2 r", where r is the negated cost cost_tensor[a1][s]. The reward does not
    depend on a2, so the same value is emitted for every a2. Rows with zero reward are omitted.

    :param game_config: the game configuration
    :return: list of reward rows
    """
    reward_rows = []
    for s in game_config.states:
        for a1 in game_config.actions:
            # The reward is the negated cost and is independent of the second player's action
            reward = -game_config.cost_tensor[a1][s]
            if reward != 0:
                reward_rows.extend(f"{s} {a1} {a2} {reward}" for a2 in game_config.actions)
    return reward_rows
457
+
458
@staticmethod
def generate_os_posg_game_file(game_config: IntrusionRecoveryGameConfig) -> str:
    """
    Generates the POSG game file for HSVI

    The file consists of the following newline-separated sections, in order: the game description, the state
    descriptions, the action names of player 1, the action names of player 2, the observation names, the
    legal actions of player 2 (one line per state), the legal actions of player 1 (one line for the single
    partition), the transition rows, the reward rows, and the initial belief.

    Besides returning the contents, the function also writes them to the file 'recovery_game.txt'
    (relative to the current working directory).

    :param game_config: the game configuration
    :return: a string with the contents of the config file
    """
    num_partitions = 1
    transition_rows = IntrusionRecoveryPomdpUtil.generate_transitions(game_config=game_config)
    reward_rows = IntrusionRecoveryPomdpUtil.generate_rewards(game_config=game_config)

    # Header: #states #partitions #actions(player 1) #actions(player 2) #observations #transitions #rewards discount
    header = " ".join([
        f"{len(game_config.states)}",
        f"{num_partitions}",
        f"{len(game_config.actions)}",
        f"{len(game_config.actions)}",
        f"{len(game_config.observations)}",
        f"{len(transition_rows)}",
        f"{len(reward_rows)}",
        f"{game_config.discount_factor}",
    ])

    # Every state is assigned to partition 0
    state_lines = [f"{s} {0}" for s in game_config.states]
    defender_action_names = ["WAIT", "RECOVER"]
    attacker_action_names = ["FALSEALARM", "ATTACK"]

    # All actions are legal: one line per state for player 2 and a single line for player 1
    all_actions_line = " ".join(map(str, game_config.actions))
    attacker_legal_lines = [all_actions_line for _ in game_config.states]
    defender_legal_lines = [all_actions_line]

    observation_names = [f"o_{o}" for o in game_config.observations]

    # The initial belief is prefixed with the index of the initial partition
    belief_line = f"{0} {' '.join(map(str, game_config.b1))}"

    sections = [
        header,
        "\n".join(state_lines),
        "\n".join(defender_action_names),
        "\n".join(attacker_action_names),
        "\n".join(observation_names),
        "\n".join(attacker_legal_lines),
        "\n".join(defender_legal_lines),
        "\n".join(transition_rows),
        "\n".join(reward_rows),
        belief_line,
    ]
    game_file_str = "\n".join(sections)
    with open('recovery_game.txt', 'w') as f:
        f.write(game_file_str)
    return game_file_str
0 commit comments