1from __future__ import annotations
2
3from functools import reduce
4from typing import TYPE_CHECKING
5
6from ..cterm.cterm import cterms_anti_unify
7from ..utils import partition, single
8from .semantics import DefaultSemantics
9
10if TYPE_CHECKING:
11 from collections.abc import Callable
12
13 from ..kast.outer import KDefinition
14 from .kcfg import KCFG, NodeIdLike
15 from .semantics import KCFGSemantics
16
17
[docs]
class KCFGMinimizer:
    """In-place minimization passes over a `KCFG`.

    The class bundles three families of graph rewrites: lifting edges over edges
    (`lift_edge`/`lift_edges`), lifting splits over edges or other splits
    (`lift_split_edge`/`lift_split_split`/`lift_splits`), and merging the targets
    of a split (`merge_nodes`). `minimize` drives them to a fixed point.
    """

    # Graph that every method of this class mutates in place.
    kcfg: KCFG
    # Provides the `is_mergeable` heuristic consulted by `merge_nodes`.
    semantics: KCFGSemantics
    # Forwarded to `cterms_anti_unify` by `merge_nodes`; may be `None`.
    kdef: KDefinition | None

    def __init__(self, kcfg: KCFG, heuristics: KCFGSemantics | None = None, kdef: KDefinition | None = None) -> None:
        """Create a minimizer operating on `kcfg`.

        Args:
            kcfg: The graph to minimize; it is modified in place by the other methods.
            heuristics: Semantics object stored as `self.semantics`. When `None`, a
                `DefaultSemantics()` instance is used.
            kdef: Optional definition stored as `self.kdef`.
        """
        if heuristics is None:
            heuristics = DefaultSemantics()
        self.kcfg = kcfg
        self.semantics = heuristics
        self.kdef = kdef

    @staticmethod
    def _lift_is_sound(split: KCFG.Split, source: KCFG.Node) -> bool:
        """Tell whether lifting `split` onto `source` introduces no fresh variables.

        The split may only mention variables that are free in `source`, and its targets
        may only mention variables that the split source already has.
        """
        return not split.source_vars.difference(source.free_vars) and not split.target_vars.difference(
            split.source_vars
        )

    def lift_edge(self, b_id: NodeIdLike) -> None:
        """Lift an edge up another edge directly preceding it.

        `A --M steps--> B --N steps--> C` becomes `A --(M + N) steps--> C`. Node `B` is removed.

        Args:
            b_id: The identifier of the central node `B` of a sequence of edges `A --> B --> C`.

        Raises:
            Exception: Whatever `single` raises when `B` does not have exactly one incoming and
                one outgoing edge (NOTE(review): previously documented as `AssertionError`;
                confirm against `single`).
        """
        # Both edges must be looked up before `B` is removed.
        a_to_b = single(self.kcfg.edges(target_id=b_id))
        b_to_c = single(self.kcfg.edges(source_id=b_id))
        # Removing `B` removes the entire initial structure around it.
        self.kcfg.remove_node(b_id)
        # The replacement edge `A -> C` carries the combined depth and rules.
        self.kcfg.create_edge(
            a_to_b.source.id, b_to_c.target.id, a_to_b.depth + b_to_c.depth, a_to_b.rules + b_to_c.rules
        )

    def lift_edges(self) -> bool:
        """Perform all possible edge lifts across the KCFG.

        The KCFG is transformed to an equivalent in which no further edge lifts are possible.

        Given the KCFG design, it is not possible for one edge lift to either disallow another or
        allow another that was not previously possible. Therefore, this function is guaranteed to
        lift all possible edges without having to loop.

        Returns:
            An indicator of whether or not at least one edge lift was performed.
        """
        # Candidates are collected up front (in ascending id order) because lifting mutates the graph.
        edges_to_lift = sorted(
            node.id
            for node in self.kcfg.nodes
            if self.kcfg.edges(source_id=node.id) and self.kcfg.edges(target_id=node.id)
        )
        for node_id in edges_to_lift:
            self.lift_edge(node_id)
        return bool(edges_to_lift)

    def lift_split_edge(self, b_id: NodeIdLike) -> None:
        """Lift a split up an edge directly preceding it.

        `A --M steps--> B --[cond_1, ..., cond_N]--> [C_1, ..., C_N]` becomes
        `A --[cond_1, ..., cond_N]--> [A #And cond_1 --M steps--> C_1, ..., A #And cond_N --M steps--> C_N]`.
        Node `B` is removed.

        Args:
            b_id: The identifier of the central node `B` of the structure `A --> B --> [C_1, ..., C_N]`.

        Raises:
            AssertionError: If lifting would introduce fresh variables, i.e. the split mentions
                variables not free in `A`, or its targets mention variables absent from its source.
            Exception: Whatever `single` raises when `B` does not have exactly one incoming edge
                and exactly one outgoing split.
        """
        # Obtain edge `A -> B` and split `[cond_I, C_I | I = 1..N ]`.
        a_to_b = single(self.kcfg.edges(target_id=b_id))
        a = a_to_b.source
        split_from_b = single(self.kcfg.splits(source_id=b_id))
        b_splits = split_from_b.splits
        ci, csubsts = list(b_splits.keys()), list(b_splits.values())
        # Raised explicitly (not via `assert`) so the soundness check survives `python -O`.
        # TODO(review): can the target-variables half of this check be dropped?
        if not self._lift_is_sound(split_from_b, a):
            raise AssertionError(f'Cannot lift split over edge at node {b_id}: lifting would introduce fresh variables')
        # CTerms of the new split targets: `A` constrained by each branch of the split.
        new_cterms = [csubst(a.cterm) for csubst in csubsts]
        # Remove the node `B`, effectively removing the entire initial structure.
        self.kcfg.remove_node(b_id)
        # Create the nodes `[ A #And cond_I | I = 1..N ]`.
        ai: list[NodeIdLike] = [self.kcfg.create_node(cterm).id for cterm in new_cterms]
        # Create the edges `[A #And cond_I --M steps--> C_I | I = 1..N ]`.
        for ai_id, ci_id in zip(ai, ci, strict=True):
            self.kcfg.create_edge(ai_id, ci_id, a_to_b.depth, a_to_b.rules)
        # Create the split `A --[cond_1, ..., cond_N]--> [A #And cond_1, ..., A #And cond_N]`.
        self.kcfg.create_split_by_nodes(a.id, ai)

    def lift_split_split(self, b_id: NodeIdLike) -> None:
        """Lift a split up a split directly preceding it, joining them into a single split.

        `A --[..., cond_B, ...]--> [..., B, ...]` with `B --[cond_1, ..., cond_N]--> [C_1, ..., C_N]` becomes
        `A --[..., cond_B #And cond_1, ..., cond_B #And cond_N, ...]--> [..., C_1, ..., C_N, ...]`.
        Node `B` is removed.

        Args:
            b_id: The identifier of the node `B` of the structure
                `A --[..., cond_B, ...]--> [..., B, ...]` with `B --[cond_1, ..., cond_N]--> [C_1, ..., C_N]`.

        Raises:
            AssertionError: If lifting would introduce fresh variables, i.e. the split from `B`
                mentions variables not free in `A`, or its targets mention variables absent from `B`.
            Exception: Whatever `single` raises when `B` is not the target of exactly one split
                and the source of exactly one split.
        """
        # Obtain splits `A --[..., cond_B, ...]--> [..., B, ...]` and
        # `B --[cond_1, ..., cond_N]--> [C_1, ..., C_N]`.
        split_from_a = single(self.kcfg.splits(target_id=b_id))
        split_from_b = single(self.kcfg.splits(source_id=b_id))
        a = split_from_a.source
        # Raised explicitly (not via `assert`) so the soundness check survives `python -O`.
        # TODO(review): could this check become a problem once nodes are merged, given that
        # merging introduces new variables?
        if not self._lift_is_sound(split_from_b, a):
            raise AssertionError(
                f'Cannot lift split over split at node {b_id}: lifting would introduce fresh variables'
            )
        # Resolve `B` to its node id while it still exists, then assemble the new targets:
        # the remaining targets of `A`'s split followed by the targets of `B`'s split.
        b_node_id = self.kcfg.node(b_id).id
        new_target_ids = [target_id for target_id in split_from_a.splits if target_id != b_node_id]
        new_target_ids.extend(split_from_b.splits)
        # Remove the node `B`, thereby removing the two splits as well.
        self.kcfg.remove_node(b_id)
        # Create the new split `A --[..., cond_B #And cond_1, ..., cond_B #And cond_N, ...]--> [..., C_1, ..., C_N, ...]`.
        self.kcfg.create_split_by_nodes(a.id, new_target_ids)

    def lift_splits(self) -> bool:
        """Perform all possible split liftings.

        The KCFG is transformed to an equivalent in which no further split lifts are possible.
        Splits are first lifted over preceding edges until none is liftable, then over
        preceding splits.

        Returns:
            An indicator of whether or not at least one split lift was performed.
        """

        def _lift_split(finder: Callable, lifter: Callable) -> bool:
            # `finder(target_id=...)` lists the predecessors of the kind being lifted over;
            # `lifter(node_id)` performs one lift. Repeat until a pass finds no candidate.
            result = False
            while True:
                splits_to_lift = []
                for node in self.kcfg.nodes:
                    splits = self.kcfg.splits(source_id=node.id)
                    if not splits:
                        continue
                    sources = finder(target_id=node.id)
                    if not sources:
                        continue
                    source = single(sources).source
                    split = single(splits)
                    if self._lift_is_sound(split, source):
                        splits_to_lift.append(node.id)
                if not splits_to_lift:
                    break
                # Lift in ascending id order.
                for node_id in sorted(splits_to_lift):
                    lifter(node_id)
                    result = True
            return result

        def _fold_lift(result: bool, finder_lifter: tuple[Callable, Callable]) -> bool:
            finder, lifter = finder_lifter
            # `_lift_split` is on the left of `or` so that it always runs.
            return _lift_split(finder, lifter) or result

        return reduce(
            _fold_lift, [(self.kcfg.edges, self.lift_split_edge), (self.kcfg.splits, self.lift_split_split)], False
        )

    def _merge_edge_targets(self, source_id: NodeIdLike, edge_partition: list[KCFG.Edge | KCFG.MergedEdge]) -> None:
        """Create `source -|MergedEdge|-> Merged_Bi -|Split|-> Bi` for one partition of mergeable edges."""
        merged_bi_cterm, merged_bi_subst = cterms_anti_unify(
            [edge.target.cterm for edge in edge_partition], keep_values=True, kdef=self.kdef
        )
        merged_bi = self.kcfg.create_node(merged_bi_cterm)
        self.kcfg.create_merged_edge(source_id, merged_bi.id, edge_partition)
        self.kcfg.create_split(
            merged_bi.id, zip([edge.target.id for edge in edge_partition], merged_bi_subst, strict=True)
        )

    def merge_nodes(self) -> bool:
        """Merge targets of Split for cutting down the number of branches.

        Mergeability of two edges is decided by `self.semantics.is_mergeable` applied to the
        `cterm`s of their targets.

        Side Effect: The KCFG is rewritten by the following rewrite pattern,
            - Match: A -|Split|-> A_i -|Edge|-> B_i
            - Rewrite:
                - if `B_x, B_y, ..., B_z are not mergeable` then unchanged
                - if `B_x, B_y, ..., B_z are mergeable`, then
                    - A -|Split|-> A_x or A_y or ... or A_z
                    - A_x or A_y or ... or A_z -|Edge|-> B_x or B_y or ... or B_z
                    - B_x or B_y or ... or B_z -|Split|-> B_x, B_y, ..., B_z

        Specifically, when `B_merge = B_x or B_y or ... or B_z`
            - `or`: fresh variables in places where the configurations differ
            - `Edge` in A_merged -|Edge|-> B_merge: list of merged edges is from A_i -|Edge|-> B_i
            - `Split` in B_merge -|Split|-> B_x, B_y, ..., B_z: subst for it is from A -|Split|-> A_1, A_2, ..., A_n

        Returns:
            Whether any merge was performed.
        """

        def _is_mergeable(x: KCFG.Edge | KCFG.MergedEdge, y: KCFG.Edge | KCFG.MergedEdge) -> bool:
            return self.semantics.is_mergeable(x.target.cterm, y.target.cterm)

        # ---- Match ----

        # A -|Split|> Ai -|Edge/MergedEdge|> Mergeable Bi
        sub_graphs: list[tuple[KCFG.Split, list[list[KCFG.Edge | KCFG.MergedEdge]]]] = []

        for split in self.kcfg.splits():
            # NOTE(review): split targets without an outgoing general edge are skipped here and
            # hence never appear in the rebuilt split below; confirm whether such targets can
            # occur and, if so, whether they must be kept.
            _edges = [
                single(out_edges)
                for ai in split.target_ids
                if (out_edges := self.kcfg.general_edges(source_id=ai))
            ]
            _partitions = partition(_edges, _is_mergeable)
            # Fewer partitions than edges means at least two edges ended up in the same partition.
            if len(_partitions) < len(_edges):
                sub_graphs.append((split, _partitions))

        if not sub_graphs:
            return False

        # ---- Rewrite ----

        for split, edge_partitions in sub_graphs:

            # Remove the original sub-graphs (singleton partitions are left untouched).
            for edge_partition in edge_partitions:
                if len(edge_partition) == 1:
                    continue
                for edge in edge_partition:
                    # TODO: remove the split and edges, then safely remove the nodes.
                    self.kcfg.remove_edges_around(edge.source.id)

            # Create A -|MergedEdge|-> Merged_Bi -|Split|-> Bi, if one edge partition covers all the splits
            if len(edge_partitions) == 1:
                self._merge_edge_targets(split.source.id, edge_partitions[0])
                continue

            # Create A -|Split|-> Others & Merged_Ai -|MergedEdge|-> Merged_Bi -|Split|-> Bi
            _split_nodes: list[NodeIdLike] = []
            for edge_partition in edge_partitions:
                if len(edge_partition) == 1:
                    _split_nodes.append(edge_partition[0].source.id)
                    continue
                merged_ai_cterm, _ = cterms_anti_unify(
                    [ai2bi.source.cterm for ai2bi in edge_partition], keep_values=True, kdef=self.kdef
                )
                merged_ai = self.kcfg.create_node(merged_ai_cterm)
                _split_nodes.append(merged_ai.id)
                self._merge_edge_targets(merged_ai.id, edge_partition)
            self.kcfg.create_split_by_nodes(split.source.id, _split_nodes)

        return True

    def minimize(self, merge: bool = False) -> None:
        """Minimize KCFG by repeatedly performing the lifting transformations.

        The KCFG is transformed to an equivalent in which no further lifting transformations are possible.
        The loop is designed so that each transformation is performed once in each iteration.

        Args:
            merge: If `True`, once lifting has reached its fixed point, `merge_nodes` is
                called repeatedly until it reports that nothing was merged.
        """
        repeat = True
        while repeat:
            repeat = self.lift_edges()
            # `lift_splits` is on the left of `or` so that it runs in every iteration.
            repeat = self.lift_splits() or repeat

        if merge:
            while self.merge_nodes():
                pass