1
1
from typing import List
2
2
from scipy .stats import betabinom
3
3
import numpy as np
4
- from csle_tolerance .dao .intrusion_recovery_pomdp_config import (
5
- IntrusionRecoveryPomdpConfig ,
6
- )
7
- from csle_tolerance .dao .intrusion_recovery_game_config import (
8
- IntrusionRecoveryGameConfig ,
9
- )
4
+ from csle_tolerance .dao .intrusion_recovery_pomdp_config import IntrusionRecoveryPomdpConfig
5
+ from csle_tolerance .dao .intrusion_recovery_game_config import IntrusionRecoveryGameConfig
10
6
11
7
12
8
class IntrusionRecoveryPomdpUtil :
@@ -197,9 +193,9 @@ def transition_function_game(s: int, s_prime: int, a1: int, a2: int, p_a: float,
197
193
elif s_prime == 0 and a1 == 0 and a2 == 1 and s == 0 :
198
194
return (1 - p_a ) * (1 - p_c_1 )
199
195
elif (
200
- (s_prime == 0 and a2 == 0 and s == 0 )
201
- or (s_prime == 0 and s == 1 and a1 == 1 )
202
- or (s_prime == 1 and s == 1 and a1 == 0 )
196
+ (s_prime == 0 and a2 == 0 and s == 0 )
197
+ or (s_prime == 0 and s == 1 and a1 == 1 )
198
+ or (s_prime == 1 and s == 1 and a1 == 0 )
203
199
):
204
200
return 1 - p_c_1
205
201
elif s_prime == 1 and s == 0 and a2 == 1 :
@@ -344,7 +340,7 @@ def bayes_filter(s_prime: int, o: int, a: int, b: List[float], states: List[int]
344
340
345
341
for s in states :
346
342
temp += (
347
- observation_tensor [s_prime ][o ] * transition_tensor [a ][s ][s_prime ] * b [s ]
343
+ observation_tensor [s_prime ][o ] * transition_tensor [a ][s ][s_prime ] * b [s ]
348
344
)
349
345
b_prime_s_prime = temp / norm
350
346
if round (b_prime_s_prime , 2 ) > 1 :
@@ -372,9 +368,9 @@ def p_o_given_b_a1_a2(o: int, b: List[float], a: int, states: List[int], transit
372
368
for s in states :
373
369
for s_prime in states :
374
370
prob += (
375
- b [s ]
376
- * transition_tensor [a ][s ][s_prime ]
377
- * observation_tensor [s_prime ][o ]
371
+ b [s ]
372
+ * transition_tensor [a ][s ][s_prime ]
373
+ * observation_tensor [s_prime ][o ]
378
374
)
379
375
assert prob < 1
380
376
return prob
@@ -447,7 +443,7 @@ def pomdp_solver_file(config: IntrusionRecoveryPomdpConfig) -> str:
447
443
for o in config .observations :
448
444
c = config .cost_tensor [a ][s ]
449
445
file_str = (
450
- file_str + f"R: { a } : { s } : { s_prime } : { o } { c :.80f} \n "
446
+ file_str + f"R: { a } : { s } : { s_prime } : { o } { c :.80f} \n "
451
447
)
452
448
return file_str
453
449
@@ -501,15 +497,13 @@ def generate_os_posg_game_file(game_config: IntrusionRecoveryGameConfig) -> str:
501
497
"""
502
498
num_partitions = 1
503
499
transitions = IntrusionRecoveryPomdpUtil .generate_transitions (
504
- game_config = game_config
505
- )
500
+ game_config = game_config )
506
501
rewards = IntrusionRecoveryPomdpUtil .generate_rewards (game_config = game_config )
507
502
game_description = (
508
503
f"{ len (game_config .states )} { num_partitions } { len (game_config .actions )} "
509
504
f"{ len (game_config .actions )} "
510
505
f"{ len (game_config .observations )} { len (transitions )} "
511
- f"{ len (rewards )} { game_config .discount_factor } "
512
- )
506
+ f"{ len (rewards )} { game_config .discount_factor } " )
513
507
state_desriptions = []
514
508
for s in game_config .states :
515
509
state_desriptions .append (f"{ s } { 0 } " )
@@ -518,22 +512,16 @@ def generate_os_posg_game_file(game_config: IntrusionRecoveryGameConfig) -> str:
518
512
519
513
player_2_legal_actions = []
520
514
for _ in game_config .states :
521
- player_2_legal_actions .append (
522
- " " .join (list (map (lambda x : str (x ), game_config .actions )))
523
- )
515
+ player_2_legal_actions .append (" " .join (list (map (lambda x : str (x ), game_config .actions ))))
524
516
525
517
player_1_legal_actions = []
526
- player_1_legal_actions .append (
527
- " " .join (list (map (lambda x : str (x ), game_config .actions )))
528
- )
518
+ player_1_legal_actions .append (" " .join (list (map (lambda x : str (x ), game_config .actions ))))
529
519
530
520
obs_desriptions = []
531
521
for i , o in enumerate (game_config .observations ):
532
522
obs_desriptions .append (f"o_{ o } " )
533
523
534
- initial_belief_str = (
535
- f"{ 0 } { ' ' .join (list (map (lambda x : str (x ), game_config .b1 )))} "
536
- )
524
+ initial_belief_str = f"{ 0 } { ' ' .join (list (map (lambda x : str (x ), game_config .b1 )))} "
537
525
game_file_str = ""
538
526
game_file_str = game_file_str + game_description + "\n "
539
527
game_file_str = game_file_str + "\n " .join (state_desriptions ) + "\n "
0 commit comments