Source code for pyk.kcfg.minimize

  1from __future__ import annotations
  2
  3from functools import reduce
  4from typing import TYPE_CHECKING
  5
  6from ..cterm.cterm import cterms_anti_unify
  7from ..utils import partition, single
  8from .semantics import DefaultSemantics
  9
 10if TYPE_CHECKING:
 11    from collections.abc import Callable
 12
 13    from ..kast.outer import KDefinition
 14    from .kcfg import KCFG, NodeIdLike
 15    from .semantics import KCFGSemantics
 16
 17
[docs] 18class KCFGMinimizer: 19 kcfg: KCFG 20 semantics: KCFGSemantics 21 kdef: KDefinition | None 22 23 def __init__(self, kcfg: KCFG, heuristics: KCFGSemantics | None = None, kdef: KDefinition | None = None) -> None: 24 if heuristics is None: 25 heuristics = DefaultSemantics() 26 self.kcfg = kcfg 27 self.semantics = heuristics 28 self.kdef = kdef 29
[docs] 30 def lift_edge(self, b_id: NodeIdLike) -> None: 31 """Lift an edge up another edge directly preceding it. 32 33 `A --M steps--> B --N steps--> C` becomes `A --(M + N) steps--> C`. Node `B` is removed. 34 35 Args: 36 b_id: the identifier of the central node `B` of a sequence of edges `A --> B --> C`. 37 38 Raises: 39 AssertionError: If the edges in question are not in place. 40 """ 41 # Obtain edges `A -> B`, `B -> C` 42 a_to_b = single(self.kcfg.edges(target_id=b_id)) 43 b_to_c = single(self.kcfg.edges(source_id=b_id)) 44 # Remove the node `B`, effectively removing the entire initial structure 45 self.kcfg.remove_node(b_id) 46 # Create edge `A -> C` 47 self.kcfg.create_edge( 48 a_to_b.source.id, b_to_c.target.id, a_to_b.depth + b_to_c.depth, a_to_b.rules + b_to_c.rules 49 )
50
[docs] 51 def lift_edges(self) -> bool: 52 """Perform all possible edge lifts across the KCFG. 53 54 The KCFG is transformed to an equivalent in which no further edge lifts are possible. 55 56 Given the KCFG design, it is not possible for one edge lift to either disallow another or 57 allow another that was not previously possible. Therefore, this function is guaranteed to 58 lift all possible edges without having to loop. 59 60 Returns: 61 An indicator of whether or not at least one edge lift was performed. 62 """ 63 edges_to_lift = sorted( 64 [ 65 node.id 66 for node in self.kcfg.nodes 67 if len(self.kcfg.edges(source_id=node.id)) > 0 and len(self.kcfg.edges(target_id=node.id)) > 0 68 ] 69 ) 70 for node_id in edges_to_lift: 71 self.lift_edge(node_id) 72 return len(edges_to_lift) > 0
73
[docs] 74 def lift_split_edge(self, b_id: NodeIdLike) -> None: 75 """Lift a split up an edge directly preceding it. 76 77 `A --M steps--> B --[cond_1, ..., cond_N]--> [C_1, ..., C_N]` becomes 78 `A --[cond_1, ..., cond_N]--> [A #And cond_1 --M steps--> C_1, ..., A #And cond_N --M steps--> C_N]`. 79 Node `B` is removed. 80 81 Args: 82 b_id: The identifier of the central node `B` of the structure `A --> B --> [C_1, ..., C_N]`. 83 84 Raises: 85 AssertionError: If the structure in question is not in place. 86 AssertionError: If any of the `cond_i` contain variables not present in `A`. 87 """ 88 # Obtain edge `A -> B`, split `[cond_I, C_I | I = 1..N ]` 89 a_to_b = single(self.kcfg.edges(target_id=b_id)) 90 a = a_to_b.source 91 split_from_b = single(self.kcfg.splits(source_id=b_id)) 92 ci, csubsts = list(split_from_b.splits.keys()), list(split_from_b.splits.values()) 93 # Ensure split can be lifted soundly (i.e., that it does not introduce fresh variables) 94 assert ( 95 len(split_from_b.source_vars.difference(a.free_vars)) == 0 96 and len(split_from_b.target_vars.difference(split_from_b.source_vars)) == 0 # <-- Can we delete this check? 97 ) 98 # Create CTerms and CSubsts corresponding to the new targets of the split 99 new_cterms = [csubst(a.cterm) for csubst in csubsts] 100 # Remove the node `B`, effectively removing the entire initial structure 101 self.kcfg.remove_node(b_id) 102 # Create the nodes `[ A #And cond_I | I = 1..N ]`. 103 ai: list[NodeIdLike] = [self.kcfg.create_node(cterm).id for cterm in new_cterms] 104 # Create the edges `[A #And cond_1 --M steps--> C_I | I = 1..N ]` 105 for i in range(len(ai)): 106 self.kcfg.create_edge(ai[i], ci[i], a_to_b.depth, a_to_b.rules) 107 # Create the split `A --[cond_1, ..., cond_N]--> [A #And cond_1, ..., A #And cond_N] 108 self.kcfg.create_split_by_nodes(a.id, ai)
109
[docs] 110 def lift_split_split(self, b_id: NodeIdLike) -> None: 111 """Lift a split up a split directly preceding it, joining them into a single split. 112 113 `A --[..., cond_B, ...]--> [..., B, ...]` with `B --[cond_1, ..., cond_N]--> [C_1, ..., C_N]` becomes 114 `A --[..., cond_B #And cond_1, ..., cond_B #And cond_N, ...]--> [..., C_1, ..., C_N, ...]`. 115 Node `B` is removed. 116 117 Args: 118 b_id: the identifier of the node `B` of the structure 119 `A --[..., cond_B, ...]--> [..., B, ...]` with `B --[cond_1, ..., cond_N]--> [C_1, ..., C_N]`. 120 121 Raises: 122 AssertionError: If the structure in question is not in place. 123 """ 124 # Obtain splits `A --[..., cond_B, ...]--> [..., B, ...]` and 125 # `B --[cond_1, ..., cond_N]--> [C_1, ..., C_N]-> [C_1, ..., C_N]` 126 split_from_a, split_from_b = single(self.kcfg.splits(target_id=b_id)), single(self.kcfg.splits(source_id=b_id)) 127 splits_from_a, splits_from_b = split_from_a.splits, split_from_b.splits 128 a = split_from_a.source 129 list(splits_from_b.keys()) 130 # Ensure split can be lifted soundly (i.e., that it does not introduce fresh variables) 131 assert ( # <-- Does it will be a problem when using merging nodes, because it would introduce new variables? 132 len(split_from_b.source_vars.difference(a.free_vars)) == 0 133 and len(split_from_b.target_vars.difference(split_from_b.source_vars)) == 0 134 ) 135 # Remove the node `B`, thereby removing the two splits as well 136 splits_from_a.pop(self.kcfg.node(b_id).id) 137 self.kcfg.remove_node(b_id) 138 # Create the new split `A --[..., cond_B #And cond_1, ..., cond_B #And cond_N, ...]--> [..., C_1, ..., C_N, ...]` 139 self.kcfg.create_split_by_nodes(a.id, list(splits_from_a.keys()) + list(splits_from_b.keys()))
140
[docs] 141 def lift_splits(self) -> bool: 142 """Perform all possible split liftings. 143 144 The KCFG is transformed to an equivalent in which no further split lifts are possible. 145 146 Returns: 147 An indicator of whether or not at least one split lift was performed. 148 """ 149 150 def _lift_split(finder: Callable, lifter: Callable) -> bool: 151 result = False 152 while True: 153 splits_to_lift = sorted( 154 [ 155 node.id 156 for node in self.kcfg.nodes 157 if (splits := self.kcfg.splits(source_id=node.id)) != [] 158 and (sources := finder(target_id=node.id)) != [] 159 and (source := single(sources).source) 160 and (split := single(splits)) 161 and len(split.source_vars.difference(source.free_vars)) == 0 162 and len(split.target_vars.difference(split.source_vars)) == 0 163 ] 164 ) 165 for id in splits_to_lift: 166 lifter(id) 167 result = True 168 if len(splits_to_lift) == 0: 169 break 170 return result 171 172 def _fold_lift(result: bool, finder_lifter: tuple[Callable, Callable]) -> bool: 173 return _lift_split(finder_lifter[0], finder_lifter[1]) or result 174 175 return reduce( 176 _fold_lift, [(self.kcfg.edges, self.lift_split_edge), (self.kcfg.splits, self.lift_split_split)], False 177 )
178
[docs] 179 def merge_nodes(self) -> bool: 180 """Merge targets of Split for cutting down the number of branches, using heuristics KCFGSemantics.is_mergeable. 181 182 Side Effect: The KCFG is rewritten by the following rewrite pattern, 183 - Match: A -|Split|-> A_i -|Edge|-> B_i 184 - Rewrite: 185 - if `B_x, B_y, ..., B_z are not mergeable` then unchanged 186 - if `B_x, B_y, ..., B_z are mergeable`, then 187 - A -|Split|-> A_x or A_y or ... or A_z 188 - A_x or A_y or ... or A_z -|Edge|-> B_x or B_y or ... or B_z 189 - B_x or B_y or ... or B_z -|Split|-> B_x, B_y, ..., B_z 190 191 Specifically, when `B_merge = B_x or B_y or ... or B_z` 192 - `or`: fresh variables in places where the configurations differ 193 - `Edge` in A_merged -|Edge|-> B_merge: list of merged edges is from A_i -|Edge|-> B_i 194 - `Split` in B_merge -|Split|-> B_x, B_y, ..., B_z: subst for it is from A -|Split|-> A_1, A_2, ..., A_n 195 :param semantics: provides the is_mergeable heuristic 196 :return: whether any merge was performed 197 """ 198 199 def _is_mergeable(x: KCFG.Edge | KCFG.MergedEdge, y: KCFG.Edge | KCFG.MergedEdge) -> bool: 200 return self.semantics.is_mergeable(x.target.cterm, y.target.cterm) 201 202 # ---- Match ---- 203 204 # A -|Split|> Ai -|Edge/MergedEdge|> Mergeable Bi 205 sub_graphs: list[tuple[KCFG.Split, list[list[KCFG.Edge | KCFG.MergedEdge]]]] = [] 206 207 for split in self.kcfg.splits(): 208 _edges = [ 209 single(self.kcfg.general_edges(source_id=ai)) 210 for ai in split.target_ids 211 if self.kcfg.general_edges(source_id=ai) 212 ] 213 _partitions = partition(_edges, _is_mergeable) 214 if len(_partitions) < len(_edges): 215 sub_graphs.append((split, _partitions)) 216 217 if not sub_graphs: 218 return False 219 220 # ---- Rewrite ---- 221 222 for split, edge_partitions in sub_graphs: 223 224 # Remove the original sub-graphs 225 for p in edge_partitions: 226 if len(p) == 1: 227 continue 228 for e in p: 229 # TODO: remove the split and edges, then safely remove the nodes. 230 self.kcfg.remove_edges_around(e.source.id) 231 232 # Create A -|MergedEdge|-> Merged_Bi -|Split|-> Bi, if one edge partition covers all the splits 233 if len(edge_partitions) == 1: 234 merged_bi_cterm, merged_bi_subst = cterms_anti_unify( 235 [edge.target.cterm for edge in edge_partitions[0]], keep_values=True, kdef=self.kdef 236 ) 237 merged_bi = self.kcfg.create_node(merged_bi_cterm) 238 self.kcfg.create_merged_edge(split.source.id, merged_bi.id, edge_partitions[0]) 239 self.kcfg.create_split( 240 merged_bi.id, zip([e.target.id for e in edge_partitions[0]], merged_bi_subst, strict=True) 241 ) 242 continue 243 244 # Create A -|Split|-> Others & Merged_Ai -|MergedEdge|-> Merged_Bi -|Split|-> Bi 245 _split_nodes: list[NodeIdLike] = [] 246 for edge_partition in edge_partitions: 247 if len(edge_partition) == 1: 248 _split_nodes.append(edge_partition[0].source.id) 249 continue 250 merged_ai_cterm, _ = cterms_anti_unify( 251 [ai2bi.source.cterm for ai2bi in edge_partition], keep_values=True, kdef=self.kdef 252 ) 253 merged_bi_cterm, merged_bi_subst = cterms_anti_unify( 254 [ai2bi.target.cterm for ai2bi in edge_partition], keep_values=True, kdef=self.kdef 255 ) 256 merged_ai = self.kcfg.create_node(merged_ai_cterm) 257 _split_nodes.append(merged_ai.id) 258 merged_bi = self.kcfg.create_node(merged_bi_cterm) 259 self.kcfg.create_merged_edge(merged_ai.id, merged_bi.id, edge_partition) 260 self.kcfg.create_split( 261 merged_bi.id, zip([ai2bi.target.id for ai2bi in edge_partition], merged_bi_subst, strict=True) 262 ) 263 self.kcfg.create_split_by_nodes(split.source.id, _split_nodes) 264 265 return True
266
[docs] 267 def minimize(self, merge: bool = False) -> None: 268 """Minimize KCFG by repeatedly performing the lifting transformations. 269 270 The KCFG is transformed to an equivalent in which no further lifting transformations are possible. 271 The loop is designed so that each transformation is performed once in each iteration. 272 """ 273 repeat = True 274 while repeat: 275 repeat = self.lift_edges() 276 repeat = self.lift_splits() or repeat 277 278 repeat = True 279 while repeat and merge: 280 repeat = self.merge_nodes()