1from __future__ import annotations
2
3import logging
4from typing import TYPE_CHECKING
5
6from graphviz import Digraph
7
8from ..cterm.show import CTermShow
9from ..kast.inner import KApply, KRewrite, top_down
10from ..kast.manip import (
11 flatten_label,
12 inline_cell_maps,
13 minimize_rule_like,
14 minimize_term,
15 ml_pred_to_bool,
16 push_down_rewrites,
17 remove_generated_cells,
18 sort_ac_collections,
19)
20from ..kast.outer import KRule
21from ..kast.prelude.k import DOTS
22from ..kast.prelude.ml import mlAnd
23from ..kast.pretty import PrettyPrinter
24from ..utils import add_indent, ensure_dir_path
25from .kcfg import KCFG
26
27if TYPE_CHECKING:
28 from collections.abc import Iterable
29 from pathlib import Path
30 from typing import Final
31
32 from ..cterm import CSubst
33 from ..kast import KInner
34 from ..kast.outer import KDefinition, KFlatModule, KImport, KSentence
35 from .kcfg import NodeIdLike
36
37_LOGGER: Final = logging.getLogger(__name__)
38
39
[docs]
40class NodePrinter:
41 cterm_show: CTermShow
42 full_printer: bool
43 minimize: bool
44
45 def __init__(self, cterm_show: CTermShow, full_printer: bool = False):
46 self.cterm_show = cterm_show
47 self.full_printer = full_printer
48
[docs]
49 def print_node(self, kcfg: KCFG, node: KCFG.Node) -> list[str]:
50 attrs = self.node_attrs(kcfg, node)
51 attr_str = ' (' + ', '.join(attrs) + ')' if attrs else ''
52 node_strs = [f'{node.id}{attr_str}']
53 if self.full_printer:
54 node_strs.extend(' ' + line for line in self.cterm_show.show(node.cterm))
55 return node_strs
56
[docs]
57 def node_attrs(self, kcfg: KCFG, node: KCFG.Node) -> list[str]:
58 attrs = []
59 if kcfg.is_root(node.id):
60 attrs.append('root')
61 if kcfg.is_stuck(node.id):
62 attrs.append('stuck')
63 if kcfg.is_vacuous(node.id):
64 attrs.append('vacuous')
65 if kcfg.is_leaf(node.id):
66 attrs.append('leaf')
67 if kcfg.is_split(node.id):
68 attrs.append('split')
69 attrs.extend(['@' + alias for alias in sorted(kcfg.aliases(node.id))])
70 return attrs
71
72
[docs]
73class KCFGShow:
74 pretty_printer: PrettyPrinter
75 node_printer: NodePrinter
76
77 def __init__(self, defn: KDefinition, node_printer: NodePrinter | None = None):
78 self.pretty_printer = PrettyPrinter(defn)
79 self.node_printer = node_printer if node_printer else NodePrinter(CTermShow(self.pretty_printer.print))
80
[docs]
81 def node_short_info(self, kcfg: KCFG, node: KCFG.Node) -> list[str]:
82 return self.node_printer.print_node(kcfg, node)
83
[docs]
84 @staticmethod
85 def hide_cells(term: KInner, omit_cells: Iterable[str]) -> KInner:
86 def _hide_cells(_k: KInner) -> KInner:
87 if type(_k) == KApply and _k.label.name in omit_cells:
88 return DOTS
89 return _k
90
91 if omit_cells:
92 return top_down(_hide_cells, term)
93 return term
94
[docs]
95 @staticmethod
96 def simplify_config(config: KInner, omit_cells: Iterable[str]) -> KInner:
97 config = inline_cell_maps(config)
98 config = sort_ac_collections(config)
99 config = KCFGShow.hide_cells(config, omit_cells)
100 return config
101
[docs]
102 @staticmethod
103 def make_unique_segments(segments: Iterable[tuple[str, Iterable[str]]]) -> Iterable[tuple[str, Iterable[str]]]:
104 _segments = []
105 used_ids = []
106 for id, seg_lines in segments:
107 suffix = ''
108 counter = 0
109 while f'{id}{suffix}' in used_ids:
110 suffix = f'_{counter}'
111 counter += 1
112 new_id = f'{id}{suffix}'
113 used_ids.append(new_id)
114 _segments.append((f'{new_id}', [l.rstrip() for l in seg_lines]))
115 return _segments
116
[docs]
117 def pretty_segments(self, kcfg: KCFG, minimize: bool = True) -> Iterable[tuple[str, Iterable[str]]]:
118 """Return a pretty version of the KCFG in segments.
119
120 Each segment is a tuple of an identifier and a list of lines to be printed for that segment (Tuple[str, Iterable[str]).
121 The identifier tells you whether that segment is for a given node, edge, or just pretty spacing ('unknown').
122 This is useful for applications which want to pretty print in chunks, so that they can know which printed region corresponds to each node/edge.
123 """
124 processed_nodes: list[KCFG.Node] = []
125 ret_lines: list[tuple[str, list[str]]] = []
126
127 def _multi_line_print(label: str, lines: list[str], default: str = 'None', indent: int = 4) -> list[str]:
128 ret_lines = []
129 if len(lines) == 0:
130 ret_lines.append(f'{label}: {default}')
131 else:
132 ret_lines.append(f'{label}:')
133 ret_lines.extend([f'{indent * " "}{line}' for line in lines])
134 return ret_lines
135
136 def _print_csubst(
137 csubst: CSubst, subst_first: bool = False, indent: int = 4, minimize: bool = False
138 ) -> list[str]:
139 _constraint_strs = [
140 self.pretty_printer.print(ml_pred_to_bool(constraint, abstract_unsafe=True))
141 for constraint in csubst.constraints
142 ]
143 constraint_strs = _multi_line_print('constraint', _constraint_strs, 'true')
144 if len(csubst.subst.minimize()) > 0 and minimize:
145 subst_strs = ['subst: ...']
146 else:
147 _subst_strs = [
148 line
149 for k, v in csubst.subst.minimize().items()
150 for line in f'{k} <- {self.pretty_printer.print(v)}'.split('\n')
151 ]
152 subst_strs = _multi_line_print('subst', _subst_strs, '.Subst')
153 if subst_first:
154 return subst_strs + constraint_strs
155 return constraint_strs + subst_strs
156
157 def _print_node(node: KCFG.Node) -> list[str]:
158 return self.node_short_info(kcfg, node)
159
160 def _print_edge(edge: KCFG.Edge) -> list[str]:
161 if edge.depth == 1:
162 return ['(' + str(edge.depth) + ' step)']
163 else:
164 return ['(' + str(edge.depth) + ' steps)']
165
166 def _print_merged_edge(merged_edge: KCFG.MergedEdge) -> list[str]:
167 res = '('
168 for edge in merged_edge.edges:
169 res += f'{edge.depth}|'
170 res = res[:-1] + ' steps)'
171 return [res] if len(res) < 78 else ['(merged edge)']
172
173 def _print_cover(cover: KCFG.Cover) -> Iterable[str]:
174 return _print_csubst(cover.csubst, subst_first=False, indent=4, minimize=minimize)
175
176 def _print_split_edge(split: KCFG.Split, target_id: int) -> list[str]:
177 return _print_csubst(split.splits[target_id], subst_first=True, indent=4, minimize=minimize)
178
179 def _print_subgraph(indent: str, curr_node: KCFG.Node, prior_on_trace: list[KCFG.Node]) -> None:
180 processed = curr_node in processed_nodes
181 processed_nodes.append(curr_node)
182 successors = list(kcfg.successors(curr_node.id))
183
184 curr_node_strs = _print_node(curr_node)
185
186 ret_node_lines = []
187 suffix = []
188 elbow = '├─'
189 node_indent = '│ '
190 if kcfg.is_root(curr_node.id):
191 elbow = '┌─'
192 elif processed or not successors:
193 elbow = '└─'
194 node_indent = ' '
195 if curr_node in prior_on_trace:
196 suffix = ['(looped back)', '']
197 elif processed and not kcfg.is_leaf(curr_node.id):
198 suffix = ['(continues as previously)', '']
199 else:
200 suffix = ['']
201 ret_node_lines.append(indent + elbow + ' ' + curr_node_strs[0])
202 ret_node_lines.extend(add_indent(indent + node_indent, curr_node_strs[1:]))
203 ret_node_lines.extend(add_indent(indent + ' ', suffix))
204 ret_lines.append((f'node_{curr_node.id}', ret_node_lines))
205
206 if processed or not successors:
207 return
208 successor = successors[0]
209
210 if isinstance(successor, KCFG.MultiEdge):
211 ret_lines.append(('unknown', [f'{indent}┃']))
212 multiedge_label = '1 step' if type(successor) is KCFG.NDBranch else 'branch'
213 multiedge_id = 'ndbranch' if type(successor) is KCFG.NDBranch else 'split'
214 ret_lines.append(('unknown', [f'{indent}┃ ({multiedge_label})']))
215
216 for target in successor.targets[:-1]:
217 if type(successor) is KCFG.Split:
218 ret_edge_lines = _print_split_edge(successor, target.id)
219 ret_edge_lines = [indent + '┣━━┓ ' + ret_edge_lines[0]] + add_indent(
220 indent + '┃ ┃ ', ret_edge_lines[1:]
221 )
222 elif type(successor) is KCFG.NDBranch:
223 ret_edge_lines = [indent + '┣━━┓ ']
224 else:
225 raise AssertionError()
226 ret_edge_lines.append(indent + '┃ │')
227 ret_lines.append((f'{multiedge_id}_{curr_node.id}_{target.id}', ret_edge_lines))
228 _print_subgraph(indent + '┃ ', target, prior_on_trace + [curr_node])
229 target = successor.targets[-1]
230 if type(successor) is KCFG.Split:
231 ret_edge_lines = _print_split_edge(successor, target.id)
232 ret_edge_lines = [indent + '┗━━┓ ' + ret_edge_lines[0]] + add_indent(
233 indent + ' ┃ ', ret_edge_lines[1:]
234 )
235 elif type(successor) is KCFG.NDBranch:
236 ret_edge_lines = [indent + '┗━━┓ ']
237 else:
238 raise AssertionError()
239 ret_edge_lines.append(indent + ' │')
240 ret_lines.append((f'{multiedge_id}_{curr_node.id}_{target.id}', ret_edge_lines))
241 _print_subgraph(indent + ' ', target, prior_on_trace + [curr_node])
242
243 elif isinstance(successor, KCFG.EdgeLike):
244 ret_lines.append(('unknown', [f'{indent}│']))
245
246 if type(successor) is KCFG.Edge:
247 ret_edge_lines = []
248 ret_edge_lines.extend(add_indent(indent + '│ ', _print_edge(successor)))
249 ret_lines.append((f'edge_{successor.source.id}_{successor.target.id}', ret_edge_lines))
250
251 elif type(successor) is KCFG.MergedEdge:
252 ret_edge_lines = []
253 ret_edge_lines.extend(add_indent(indent + '│ ', _print_merged_edge(successor)))
254 ret_lines.append((f'merged_edge_{successor.source.id}_{successor.target.id}', ret_edge_lines))
255
256 elif type(successor) is KCFG.Cover:
257 ret_edge_lines = []
258 ret_edge_lines.extend(add_indent(indent + '┊ ', _print_cover(successor)))
259 ret_lines.append((f'cover_{successor.source.id}_{successor.target.id}', ret_edge_lines))
260
261 _print_subgraph(indent, successor.target, prior_on_trace + [curr_node])
262
263 def _sorted_init_nodes() -> tuple[list[KCFG.Node], list[KCFG.Node]]:
264 sorted_init_nodes = sorted(node for node in kcfg.nodes if node not in processed_nodes)
265 init_nodes = []
266 init_leaf_nodes = []
267 remaining_nodes = []
268 for node in sorted_init_nodes:
269 if kcfg.is_root(node.id):
270 if kcfg.is_leaf(node.id):
271 init_leaf_nodes.append(node)
272 else:
273 init_nodes.append(node)
274 else:
275 remaining_nodes.append(node)
276 return (init_nodes + init_leaf_nodes, remaining_nodes)
277
278 init, _ = _sorted_init_nodes()
279 while init:
280 ret_lines.append(('unknown', ['']))
281 _print_subgraph('', init[0], [])
282 init, _ = _sorted_init_nodes()
283 _, remaining = _sorted_init_nodes()
284 if remaining:
285 ret_lines.append(('unknown', ['', 'Remaining Nodes:']))
286 for node in remaining:
287 ret_node_lines = [''] + _print_node(node)
288 ret_lines.append((f'node_{node.id}', ret_node_lines))
289
290 return KCFGShow.make_unique_segments(ret_lines)
291
[docs]
292 def pretty(
293 self,
294 kcfg: KCFG,
295 minimize: bool = True,
296 ) -> Iterable[str]:
297 return (line for _, seg_lines in self.pretty_segments(kcfg, minimize=minimize) for line in seg_lines)
298
[docs]
299 def to_module(
300 self,
301 cfg: KCFG,
302 module_name: str | None = None,
303 omit_cells: Iterable[str] = (),
304 parseable_output: bool = True,
305 defunc_with: KDefinition | None = None,
306 imports: Iterable[KImport] = (),
307 ) -> KFlatModule:
308 def _process_sentence(sent: KSentence) -> KSentence:
309 if type(sent) is KRule:
310 sent = sent.let(body=KCFGShow.hide_cells(sent.body, omit_cells))
311 if parseable_output:
312 sent = sent.let(body=remove_generated_cells(sent.body))
313 sent = minimize_rule_like(sent)
314 return sent
315
316 module = cfg.to_module(module_name, defunc_with=defunc_with, imports=imports)
317 return module.let(sentences=[_process_sentence(sent) for sent in module.sentences])
318
[docs]
319 def show(
320 self,
321 cfg: KCFG,
322 nodes: Iterable[NodeIdLike] = (),
323 node_deltas: Iterable[tuple[NodeIdLike, NodeIdLike]] = (),
324 to_module: bool = False,
325 minimize: bool = True,
326 omit_cells: Iterable[str] = (),
327 module_name: str | None = None,
328 ) -> list[str]:
329 res_lines: list[str] = []
330 res_lines += self.pretty(cfg, minimize=minimize)
331
332 nodes_printed = False
333
334 for node_id in nodes:
335 nodes_printed = True
336 kast = cfg.node(node_id).cterm.kast
337 kast = KCFGShow.hide_cells(kast, omit_cells)
338 if minimize:
339 kast = minimize_term(kast)
340 res_lines.append('')
341 res_lines.append('')
342 res_lines.append(f'Node {node_id}:')
343 res_lines.append('')
344 res_lines.append(self.pretty_printer.print(kast))
345 res_lines.append('')
346
347 for node_id_1, node_id_2 in node_deltas:
348 nodes_printed = True
349 config_1 = KCFGShow.simplify_config(cfg.node(node_id_1).cterm.config, omit_cells)
350 config_2 = KCFGShow.simplify_config(cfg.node(node_id_2).cterm.config, omit_cells)
351 config_delta = push_down_rewrites(KRewrite(config_1, config_2))
352 if minimize:
353 config_delta = minimize_term(config_delta)
354 res_lines.append('')
355 res_lines.append('')
356 res_lines.append(f'State Delta {node_id_1} => {node_id_2}:')
357 res_lines.append('')
358 res_lines.append(self.pretty_printer.print(config_delta))
359 res_lines.append('')
360
361 if not (nodes_printed):
362 res_lines.append('')
363 res_lines.append('')
364 res_lines.append('')
365
366 if to_module:
367 module = self.to_module(cfg, module_name, omit_cells=omit_cells)
368 res_lines.append(self.pretty_printer.print(module))
369
370 return res_lines
371
[docs]
372 def dot(self, kcfg: KCFG) -> Digraph:
373 def _short_label(label: str) -> str:
374 return '\n'.join(
375 [
376 label_line if len(label_line) < 100 else (label_line[0:100] + ' ...')
377 for label_line in label.split('\n')
378 ]
379 )
380
381 graph = Digraph()
382
383 for node in kcfg.nodes:
384 label = '\n'.join(self.node_short_info(kcfg, node))
385 class_attrs = ' '.join(self.node_printer.node_attrs(kcfg, node))
386 attrs = {'class': class_attrs} if class_attrs else {}
387 graph.node(name=node.id, label=label, **attrs)
388
389 for edge in kcfg.edges():
390 depth = edge.depth
391 label = f'{depth} steps'
392 graph.edge(tail_name=edge.source.id, head_name=edge.target.id, label=f' {label} ')
393
394 for cover in kcfg.covers():
395 label = ', '.join(
396 f'{k} |-> {self.pretty_printer.print(v)}' for k, v in cover.csubst.subst.minimize().items()
397 )
398 label = _short_label(label)
399 attrs = {'class': 'abstraction', 'style': 'dashed'}
400 graph.edge(tail_name=cover.source.id, head_name=cover.target.id, label=f' {label} ', **attrs)
401
402 for split in kcfg.splits():
403 for target_id, csubst in split.splits.items():
404 label = '\n#And'.join(
405 f'{self.pretty_printer.print(v)}' for v in split.source.cterm.constraints + csubst.constraints
406 )
407 graph.edge(tail_name=split.source.id, head_name=target_id, label=f' {label} ')
408
409 for ndbranch in kcfg.ndbranches():
410 for target in ndbranch.target_ids:
411 label = '1 step'
412 graph.edge(tail_name=ndbranch.source.id, head_name=target, label=f' {label} ')
413
414 return graph
415
[docs]
416 def dump(self, cfgid: str, cfg: KCFG, dump_dir: Path, dot: bool = False) -> None:
417 ensure_dir_path(dump_dir)
418
419 cfg_file = dump_dir / f'{cfgid}.json'
420 cfg_file.write_text(cfg.to_json())
421 _LOGGER.info(f'Wrote CFG file {cfgid}: {cfg_file}')
422
423 if dot:
424 cfg_dot = self.dot(cfg)
425 dot_file = dump_dir / f'{cfgid}.dot'
426 dot_file.write_text(cfg_dot.source)
427 _LOGGER.info(f'Wrote DOT file {cfgid}: {dot_file}')
428
429 nodes_dir = dump_dir / 'nodes'
430 ensure_dir_path(nodes_dir)
431 for node in cfg.nodes:
432 node_file = nodes_dir / f'config_{node.id}.txt'
433 node_minimized_file = nodes_dir / f'config_minimized_{node.id}.txt'
434 node_constraint_file = nodes_dir / f'constraint_{node.id}.txt'
435
436 config = node.cterm.config
437 if not node_file.exists():
438 node_file.write_text(self.pretty_printer.print(config))
439 _LOGGER.info(f'Wrote node file {cfgid}: {node_file}')
440 config = minimize_term(config)
441 if not node_minimized_file.exists():
442 node_minimized_file.write_text(self.pretty_printer.print(config))
443 _LOGGER.info(f'Wrote node file {cfgid}: {node_minimized_file}')
444 if not node_constraint_file.exists():
445 constraint = mlAnd(node.cterm.constraints)
446 node_constraint_file.write_text(self.pretty_printer.print(constraint))
447 _LOGGER.info(f'Wrote node file {cfgid}: {node_constraint_file}')
448
449 edges_dir = dump_dir / 'edges'
450 ensure_dir_path(edges_dir)
451 for edge in cfg.edges():
452 edge_file = edges_dir / f'config_{edge.source.id}_{edge.target.id}.txt'
453 edge_minimized_file = edges_dir / f'config_minimized_{edge.source.id}_{edge.target.id}.txt'
454
455 config = push_down_rewrites(KRewrite(edge.source.cterm.config, edge.target.cterm.config))
456 if not edge_file.exists():
457 edge_file.write_text(self.pretty_printer.print(config))
458 _LOGGER.info(f'Wrote edge file {cfgid}: {edge_file}')
459 config = minimize_term(config)
460 if not edge_minimized_file.exists():
461 edge_minimized_file.write_text(self.pretty_printer.print(config))
462 _LOGGER.info(f'Wrote edge file {cfgid}: {edge_minimized_file}')
463
464 covers_dir = dump_dir / 'covers'
465 ensure_dir_path(covers_dir)
466 for cover in cfg.covers():
467 cover_file = covers_dir / f'config_{cover.source.id}_{cover.target.id}.txt'
468 cover_constraint_file = covers_dir / f'constraint_{cover.source.id}_{cover.target.id}.txt'
469
470 subst_equalities = flatten_label(
471 '#And', cover.csubst.pred(sort_with=self.pretty_printer.definition, constraints=False)
472 )
473
474 if not cover_file.exists():
475 cover_file.write_text('\n'.join(self.pretty_printer.print(se) for se in subst_equalities))
476 _LOGGER.info(f'Wrote cover file {cfgid}: {cover_file}')
477 if not cover_constraint_file.exists():
478 cover_constraint_file.write_text(self.pretty_printer.print(cover.csubst.constraint))
479 _LOGGER.info(f'Wrote cover file {cfgid}: {cover_constraint_file}')