@@ -245,6 +245,41 @@ def replace_target(self, node: KCFG.Node) -> KCFG.Edge:
245
245
assert node .id == self .target .id
246
246
return KCFG .Edge (self .source , node , self .depth , self .rules )
247
247
248
+ @final
249
+ @dataclass (frozen = True )
250
+ class MergedEdge (EdgeLike ):
251
+ """Merged edge is a collection of edges that have been merged into a single edge."""
252
+
253
+ source : KCFG .Node
254
+ target : KCFG .Node
255
+ edges : tuple [KCFG .Edge , ...]
256
+
257
+ def to_dict (self ) -> dict [str , Any ]:
258
+ return {
259
+ 'source' : self .source .id ,
260
+ 'target' : self .target .id ,
261
+ 'edges' : [edge .to_dict () for edge in self .edges ],
262
+ }
263
+
264
+ @staticmethod
265
+ def from_dict (dct : dict [str , Any ], nodes : Mapping [int , KCFG .Node ]) -> KCFG .Successor :
266
+ return KCFG .MergedEdge (
267
+ nodes [dct ['source' ]],
268
+ nodes [dct ['target' ]],
269
+ tuple (KCFG .Edge .from_dict (edge , nodes ) for edge in dct ['edges' ]),
270
+ )
271
+
272
+ def replace_source (self , node : KCFG .Node ) -> KCFG .Successor :
273
+ assert node .id == self .source .id
274
+ return KCFG .MergedEdge (node , self .target , self .edges )
275
+
276
+ def replace_target (self , node : KCFG .Node ) -> KCFG .Successor :
277
+ assert node .id == self .target .id
278
+ return KCFG .MergedEdge (self .source , node , self .edges )
279
+
280
+ def to_rule (self , label : str , claim : bool = False , priority : int | None = None ) -> KRuleLike :
281
+ return KCFG .Edge (self .source , self .target , 1 , ()).to_rule (label , claim , priority )
282
+
248
283
@final
249
284
@dataclass (frozen = True )
250
285
class Cover (EdgeLike ):
@@ -389,6 +424,7 @@ def replace_target(self, node: KCFG.Node) -> KCFG.NDBranch:
389
424
_deleted_nodes : set [int ]
390
425
391
426
_edges : dict [int , Edge ]
427
+ _merged_edges : dict [int , MergedEdge ]
392
428
_covers : dict [int , Cover ]
393
429
_splits : dict [int , Split ]
394
430
_ndbranches : dict [int , NDBranch ]
@@ -408,6 +444,7 @@ def __init__(self, cfg_dir: Path | None = None, optimize_memory: bool = True) ->
408
444
self ._created_nodes = set ()
409
445
self ._deleted_nodes = set ()
410
446
self ._edges = {}
447
+ self ._merged_edges = {}
411
448
self ._covers = {}
412
449
self ._splits = {}
413
450
self ._ndbranches = {}
@@ -421,6 +458,8 @@ def __contains__(self, item: object) -> bool:
421
458
return self .contains_node (item )
422
459
if type (item ) is KCFG .Edge :
423
460
return self .contains_edge (item )
461
+ if type (item ) is KCFG .MergedEdge :
462
+ return self .contains_merged_edge (item )
424
463
if type (item ) is KCFG .Cover :
425
464
return self .contains_cover (item )
426
465
if type (item ) is KCFG .Split :
@@ -502,6 +541,8 @@ def path_length(_path: Iterable[KCFG.Successor]) -> int:
502
541
return 1 + KCFG .path_length (_path [1 :])
503
542
elif type (_path [0 ]) is KCFG .Edge :
504
543
return _path [0 ].depth + KCFG .path_length (_path [1 :])
544
+ elif type (_path [0 ]) is KCFG .MergedEdge :
545
+ return min (edge .depth for edge in _path [0 ].edges ) + KCFG .path_length (_path [1 :]) # todo: check this
505
546
raise ValueError (f'Cannot handle Successor type: { type (_path [0 ])} ' )
506
547
507
548
def extend (
@@ -576,6 +617,7 @@ def to_dict_no_nodes(self) -> dict[str, Any]:
576
617
def to_dict (self ) -> dict [str , Any ]:
577
618
nodes = [node .to_dict () for node in self .nodes ]
578
619
edges = [edge .to_dict () for edge in self .edges ()]
620
+ merged_edges = [merged_edge .to_dict () for merged_edge in self .merged_edges ()]
579
621
covers = [cover .to_dict () for cover in self .covers ()]
580
622
splits = [split .to_dict () for split in self .splits ()]
581
623
ndbranches = [ndbranch .to_dict () for ndbranch in self .ndbranches ()]
@@ -586,6 +628,7 @@ def to_dict(self) -> dict[str, Any]:
586
628
'next' : self ._node_id ,
587
629
'nodes' : nodes ,
588
630
'edges' : edges ,
631
+ 'merged_edges' : merged_edges ,
589
632
'covers' : covers ,
590
633
'splits' : splits ,
591
634
'ndbranches' : ndbranches ,
@@ -608,6 +651,10 @@ def from_dict(dct: Mapping[str, Any], optimize_memory: bool = True) -> KCFG:
608
651
edge = KCFG .Edge .from_dict (edge_dict , cfg ._nodes )
609
652
cfg .add_successor (edge )
610
653
654
+ for edge_dict in dct .get ('merged_edges' ) or []:
655
+ merged_edge = KCFG .MergedEdge .from_dict (edge_dict , cfg ._nodes )
656
+ cfg .add_successor (merged_edge )
657
+
611
658
for cover_dict in dct .get ('covers' ) or []:
612
659
cover = KCFG .Cover .from_dict (cover_dict , cfg ._nodes )
613
660
cfg .add_successor (cover )
@@ -636,9 +683,11 @@ def to_json(self) -> str:
636
683
def from_json (s : str , optimize_memory : bool = True ) -> KCFG :
637
684
return KCFG .from_dict (json .loads (s ), optimize_memory = optimize_memory )
638
685
639
- def to_rules (self , priority : int = 20 , id : str | None = None ) -> list [KRuleLike ]:
640
- id = '' if id is None else f'{ id } -'
641
- return [e .to_rule (f'{ id } BASIC-BLOCK' , priority = priority ) for e in self .edges ()]
686
+ def to_rules (self , _id : str | None = None , priority : int = 20 ) -> list [KRuleLike ]:
687
+ _id = 'BASIC-BLOCK' if _id is None else _id
688
+ return [e .to_rule (_id , priority = priority ) for e in self .edges ()] + [
689
+ m .to_rule (_id , priority = priority ) for m in self .merged_edges ()
690
+ ]
642
691
643
692
def to_module (
644
693
self ,
@@ -707,6 +756,9 @@ def remove_node(self, node_id: NodeIdLike) -> None:
707
756
self ._deleted_nodes .add (node .id )
708
757
709
758
self ._edges = {k : s for k , s in self ._edges .items () if k != node_id and node_id not in s .target_ids }
759
+ self ._merged_edges = {
760
+ k : s for k , s in self ._merged_edges .items () if k != node_id and node_id not in s .target_ids
761
+ }
710
762
self ._covers = {k : s for k , s in self ._covers .items () if k != node_id and node_id not in s .target_ids }
711
763
712
764
self ._splits = {k : s for k , s in self ._splits .items () if k != node_id and node_id not in s .target_ids }
@@ -721,6 +773,8 @@ def _update_refs(self, node_id: int) -> None:
721
773
new_succ = succ .replace_source (node )
722
774
if type (new_succ ) is KCFG .Edge :
723
775
self ._edges [new_succ .source .id ] = new_succ
776
+ if type (new_succ ) is KCFG .MergedEdge :
777
+ self ._merged_edges [new_succ .source .id ] = new_succ
724
778
if type (new_succ ) is KCFG .Cover :
725
779
self ._covers [new_succ .source .id ] = new_succ
726
780
if type (new_succ ) is KCFG .Split :
@@ -732,6 +786,8 @@ def _update_refs(self, node_id: int) -> None:
732
786
new_pred = pred .replace_target (node )
733
787
if type (new_pred ) is KCFG .Edge :
734
788
self ._edges [new_pred .source .id ] = new_pred
789
+ if type (new_pred ) is KCFG .MergedEdge :
790
+ self ._merged_edges [new_pred .source .id ] = new_pred
735
791
if type (new_pred ) is KCFG .Cover :
736
792
self ._covers [new_pred .source .id ] = new_pred
737
793
if type (new_pred ) is KCFG .Split :
@@ -768,17 +824,19 @@ def replace_node(self, node: KCFG.Node) -> None:
768
824
769
825
def successors (self , source_id : NodeIdLike ) -> list [Successor ]:
770
826
out_edges : Iterable [KCFG .Successor ] = self .edges (source_id = source_id )
827
+ out_merged_edges : Iterable [KCFG .Successor ] = self .merged_edges (source_id = source_id )
771
828
out_covers : Iterable [KCFG .Successor ] = self .covers (source_id = source_id )
772
829
out_splits : Iterable [KCFG .Successor ] = self .splits (source_id = source_id )
773
830
out_ndbranches : Iterable [KCFG .Successor ] = self .ndbranches (source_id = source_id )
774
- return list (out_edges ) + list (out_covers ) + list (out_splits ) + list (out_ndbranches )
831
+ return list (out_edges ) + list (out_merged_edges ) + list ( out_covers ) + list (out_splits ) + list (out_ndbranches )
775
832
776
833
def predecessors (self , target_id : NodeIdLike ) -> list [Successor ]:
777
834
in_edges : Iterable [KCFG .Successor ] = self .edges (target_id = target_id )
835
+ in_merged_edges : Iterable [KCFG .Successor ] = self .merged_edges (target_id = target_id )
778
836
in_covers : Iterable [KCFG .Successor ] = self .covers (target_id = target_id )
779
837
in_splits : Iterable [KCFG .Successor ] = self .splits (target_id = target_id )
780
838
in_ndbranches : Iterable [KCFG .Successor ] = self .ndbranches (target_id = target_id )
781
- return list (in_edges ) + list (in_covers ) + list (in_splits ) + list (in_ndbranches )
839
+ return list (in_edges ) + list (in_merged_edges ) + list ( in_covers ) + list (in_splits ) + list (in_ndbranches )
782
840
783
841
def _check_no_successors (self , source_id : NodeIdLike ) -> None :
784
842
if len (self .successors (source_id )) > 0 :
@@ -797,6 +855,8 @@ def add_successor(self, succ: KCFG.Successor) -> None:
797
855
self ._check_no_zero_loops (succ .source .id , succ .target_ids )
798
856
if type (succ ) is KCFG .Edge :
799
857
self ._edges [succ .source .id ] = succ
858
+ elif type (succ ) is KCFG .MergedEdge :
859
+ self ._merged_edges [succ .source .id ] = succ
800
860
elif type (succ ) is KCFG .Cover :
801
861
self ._covers [succ .source .id ] = succ
802
862
else :
@@ -846,6 +906,46 @@ def remove_edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> None:
846
906
raise ValueError (f'Edge does not exist: { source_id } -> { target_id } ' )
847
907
self ._edges .pop (source_id )
848
908
909
+ def merged_edge (self , source_id : NodeIdLike , target_id : NodeIdLike ) -> MergedEdge | None :
910
+ source_id = self ._resolve (source_id )
911
+ target_id = self ._resolve (target_id )
912
+ merged_edge = self ._merged_edges .get (source_id , None )
913
+ return merged_edge if merged_edge is not None and merged_edge .target .id == target_id else None
914
+
915
+ def merged_edges (
916
+ self , * , source_id : NodeIdLike | None = None , target_id : NodeIdLike | None = None
917
+ ) -> list [MergedEdge ]:
918
+ source_id = self ._resolve (source_id ) if source_id is not None else None
919
+ target_id = self ._resolve (target_id ) if target_id is not None else None
920
+ return [
921
+ merged_edge
922
+ for merged_edge in self ._merged_edges .values ()
923
+ if (source_id is None or source_id == merged_edge .source .id )
924
+ and (target_id is None or target_id == merged_edge .target .id )
925
+ ]
926
+
927
+ def contains_merged_edge (self , edge : MergedEdge ) -> bool :
928
+ if other := self .merged_edge (source_id = edge .source .id , target_id = edge .target .id ):
929
+ return edge == other
930
+ return False
931
+
932
+ def create_merged_edge (self , source_id : NodeIdLike , target_id : NodeIdLike , edges : Iterable [Edge ]) -> MergedEdge :
933
+ if len (list (edges )) == 0 :
934
+ raise ValueError (f'Cannot build KCFG MergedEdge with no edges: { edges } ' )
935
+ source = self .node (source_id )
936
+ target = self .node (target_id )
937
+ merged_edge = KCFG .MergedEdge (source , target , tuple (edges ))
938
+ self .add_successor (merged_edge )
939
+ return merged_edge
940
+
941
+ def remove_merged_edge (self , source_id : NodeIdLike , target_id : NodeIdLike ) -> None :
942
+ source_id = self ._resolve (source_id )
943
+ target_id = self ._resolve (target_id )
944
+ merged_edge = self .merged_edge (source_id , target_id )
945
+ if not merged_edge :
946
+ raise ValueError (f'MergedEdge does not exist: { source_id } -> { target_id } ' )
947
+ self ._merged_edges .pop (source_id )
948
+
849
949
def cover (self , source_id : NodeIdLike , target_id : NodeIdLike ) -> Cover | None :
850
950
source_id = self ._resolve (source_id )
851
951
target_id = self ._resolve (target_id )
@@ -887,8 +987,10 @@ def remove_cover(self, source_id: NodeIdLike, target_id: NodeIdLike) -> None:
887
987
self ._covers .pop (source_id )
888
988
889
989
def edge_likes (self , * , source_id : NodeIdLike | None = None , target_id : NodeIdLike | None = None ) -> list [EdgeLike ]:
890
- return cast ('List[KCFG.EdgeLike]' , self .edges (source_id = source_id , target_id = target_id )) + cast (
891
- 'List[KCFG.EdgeLike]' , self .covers (source_id = source_id , target_id = target_id )
990
+ return (
991
+ cast ('List[KCFG.EdgeLike]' , self .edges (source_id = source_id , target_id = target_id ))
992
+ + cast ('List[KCFG.EdgeLike]' , self .covers (source_id = source_id , target_id = target_id ))
993
+ + cast ('List[KCFG.EdgeLike]' , self .merged_edges (source_id = source_id , target_id = target_id ))
892
994
)
893
995
894
996
def add_vacuous (self , node_id : NodeIdLike ) -> None :
0 commit comments