1from __future__ import annotations
2
3import logging
4from typing import TYPE_CHECKING
5
6from graphviz import Digraph
7
8from ..cterm.show import CTermShow
9from ..kast.inner import KApply, KRewrite, top_down
10from ..kast.manip import (
11 flatten_label,
12 inline_cell_maps,
13 minimize_rule_like,
14 minimize_term,
15 ml_pred_to_bool,
16 push_down_rewrites,
17 remove_generated_cells,
18 sort_ac_collections,
19)
20from ..kast.outer import KRule
21from ..kast.prelude.k import DOTS
22from ..kast.prelude.ml import mlAnd
23from ..kast.pretty import PrettyPrinter
24from ..utils import add_indent, ensure_dir_path
25from .kcfg import KCFG
26
27if TYPE_CHECKING:
28 from collections.abc import Iterable
29 from pathlib import Path
30 from typing import Final
31
32 from ..cterm import CSubst
33 from ..kast import KInner
34 from ..kast.outer import KDefinition, KFlatModule, KImport, KSentence
35 from .kcfg import NodeIdLike
36
37_LOGGER: Final = logging.getLogger(__name__)
38
39
[docs]
class NodePrinter:
    """Render a single KCFG node as a header line, optionally followed by its pretty-printed term."""

    cterm_show: CTermShow
    full_printer: bool
    # Declared for the benefit of users of this class; nothing in this class assigns it.
    minimize: bool

    def __init__(self, cterm_show: CTermShow, full_printer: bool = False):
        """Store the term printer and whether ``print_node`` should include the full term."""
        self.cterm_show = cterm_show
        self.full_printer = full_printer

    def print_node(self, kcfg: KCFG, node: KCFG.Node) -> list[str]:
        """Return the lines describing ``node``.

        The first line is the node id, followed by its attributes in parentheses when it has any.
        When ``full_printer`` is set, the lines produced by ``cterm_show.show`` for the node's
        term are appended, each prefixed with a space.
        """
        header = f'{node.id}'
        attrs = self.node_attrs(kcfg, node)
        if attrs:
            header += ' (' + ', '.join(attrs) + ')'
        lines = [header]
        if self.full_printer:
            lines += [' ' + text for text in self.cterm_show.show(node.cterm)]
        return lines

    def node_attrs(self, kcfg: KCFG, node: KCFG.Node) -> list[str]:
        """Return the attribute tags of ``node``: status flags first, then its sorted ``@alias`` names."""
        # Checked in this fixed order so the resulting tag list is stable.
        checks = (
            (kcfg.is_root, 'root'),
            (kcfg.is_stuck, 'stuck'),
            (kcfg.is_vacuous, 'vacuous'),
            (kcfg.is_leaf, 'leaf'),
            (kcfg.is_split, 'split'),
        )
        tags = [tag for holds, tag in checks if holds(node.id)]
        tags += ['@' + alias for alias in sorted(kcfg.aliases(node.id))]
        return tags
71
72
[docs]
73class KCFGShow:
74 pretty_printer: PrettyPrinter
75 node_printer: NodePrinter
76
77 def __init__(self, defn: KDefinition, node_printer: NodePrinter | None = None):
78 self.pretty_printer = PrettyPrinter(defn)
79 self.node_printer = node_printer if node_printer else NodePrinter(CTermShow(self.pretty_printer.print))
80
[docs]
81 def node_short_info(self, kcfg: KCFG, node: KCFG.Node) -> list[str]:
82 return self.node_printer.print_node(kcfg, node)
83
[docs]
84 @staticmethod
85 def hide_cells(term: KInner, omit_cells: Iterable[str]) -> KInner:
86 def _hide_cells(_k: KInner) -> KInner:
87 if type(_k) == KApply and _k.label.name in omit_cells:
88 return DOTS
89 return _k
90
91 if omit_cells:
92 return top_down(_hide_cells, term)
93 return term
94
[docs]
95 @staticmethod
96 def simplify_config(config: KInner, omit_cells: Iterable[str]) -> KInner:
97 config = inline_cell_maps(config)
98 config = sort_ac_collections(config)
99 config = KCFGShow.hide_cells(config, omit_cells)
100 return config
101
[docs]
102 @staticmethod
103 def make_unique_segments(segments: Iterable[tuple[str, Iterable[str]]]) -> Iterable[tuple[str, Iterable[str]]]:
104 _segments = []
105 used_ids = []
106 for id, seg_lines in segments:
107 suffix = ''
108 counter = 0
109 while f'{id}{suffix}' in used_ids:
110 suffix = f'_{counter}'
111 counter += 1
112 new_id = f'{id}{suffix}'
113 used_ids.append(new_id)
114 _segments.append((f'{new_id}', [l.rstrip() for l in seg_lines]))
115 return _segments
116
[docs]
    def pretty_segments(self, kcfg: KCFG, minimize: bool = True) -> Iterable[tuple[str, Iterable[str]]]:
        """Return a pretty version of the KCFG in segments.

        Each segment is a tuple of an identifier and a list of lines to be printed for that segment
        (``tuple[str, Iterable[str]]``).
        The identifier tells you whether that segment is for a given node (``node_<id>``), an edge-like
        successor (``edge_``, ``merged_edge_``, ``cover_``, ``split_`` or ``ndbranch_`` followed by
        ``<source>_<target>``), or just pretty spacing (``unknown``). Repeated identifiers are made
        distinct by ``make_unique_segments`` before being returned.
        This is useful for applications which want to pretty print in chunks, so that they can know
        which printed region corresponds to each node/edge.

        Args:
            kcfg: The graph to print.
            minimize: When set, a non-empty substitution on a cover or split is printed as ``subst: ...``.
        """
        # Nodes already emitted (appended on every visit); consulted to detect revisits.
        processed_nodes: list[KCFG.Node] = []
        # Accumulated (segment id, lines) pairs, in print order.
        ret_lines: list[tuple[str, list[str]]] = []

        def _multi_line_print(label: str, lines: list[str], default: str = 'None', indent: int = 4) -> list[str]:
            # `label: default` on one line when there is nothing to list, otherwise `label:`
            # followed by the lines indented by `indent` spaces.
            ret_lines = []
            if len(lines) == 0:
                ret_lines.append(f'{label}: {default}')
            else:
                ret_lines.append(f'{label}:')
                ret_lines.extend([f'{indent * " "}{line}' for line in lines])
            return ret_lines

        def _print_csubst(
            csubst: CSubst, subst_first: bool = False, indent: int = 4, minimize: bool = False
        ) -> list[str]:
            # Render the constraints and the (minimized) substitution of `csubst`; `subst_first`
            # selects which of the two groups comes first. `indent` is accepted but not used here.
            _constraint_strs = [
                self.pretty_printer.print(ml_pred_to_bool(constraint, unsafe=True)) for constraint in csubst.constraints
            ]
            constraint_strs = _multi_line_print('constraint', _constraint_strs, 'true')
            if len(csubst.subst.minimize()) > 0 and minimize:
                # Non-empty substitution is elided when minimizing.
                subst_strs = ['subst: ...']
            else:
                _subst_strs = [
                    line
                    for k, v in csubst.subst.minimize().items()
                    for line in f'{k} <- {self.pretty_printer.print(v)}'.split('\n')
                ]
                subst_strs = _multi_line_print('subst', _subst_strs, '.Subst')
            if subst_first:
                return subst_strs + constraint_strs
            return constraint_strs + subst_strs

        def _print_node(node: KCFG.Node) -> list[str]:
            return self.node_short_info(kcfg, node)

        def _print_edge(edge: KCFG.Edge) -> list[str]:
            # Singular wording only for a depth of exactly 1.
            if edge.depth == 1:
                return ['(' + str(edge.depth) + ' step)']
            else:
                return ['(' + str(edge.depth) + ' steps)']

        def _print_merged_edge(merged_edge: KCFG.MergedEdge) -> list[str]:
            # Depths of the merged edges joined with `|`; the slice drops the trailing `|`.
            res = '('
            for edge in merged_edge.edges:
                res += f'{edge.depth}|'
            res = res[:-1] + ' steps)'
            # Fall back to a fixed label when the depth list would be too long.
            return [res] if len(res) < 78 else ['(merged edge)']

        def _print_cover(cover: KCFG.Cover) -> Iterable[str]:
            return _print_csubst(cover.csubst, subst_first=False, indent=4, minimize=minimize)

        def _print_split_edge(split: KCFG.Split, target_id: int) -> list[str]:
            return _print_csubst(split.splits[target_id], subst_first=True, indent=4, minimize=minimize)

        def _print_subgraph(indent: str, curr_node: KCFG.Node, prior_on_trace: list[KCFG.Node]) -> None:
            # Emit `curr_node`, then recurse into its first successor (all targets for a multi-edge).
            # `prior_on_trace` holds the nodes on the path from the subgraph root to `curr_node`.
            processed = curr_node in processed_nodes
            processed_nodes.append(curr_node)
            successors = list(kcfg.successors(curr_node.id))

            curr_node_strs = _print_node(curr_node)

            ret_node_lines = []
            suffix = []
            elbow = '├─'
            node_indent = '│ '
            if kcfg.is_root(curr_node.id):
                elbow = '┌─'
            elif processed or not successors:
                # Printing stops at this node: either it was printed before or it has no successors.
                elbow = '└─'
                node_indent = ' '
                if curr_node in prior_on_trace:
                    suffix = ['(looped back)', '']
                elif processed and not kcfg.is_leaf(curr_node.id):
                    suffix = ['(continues as previously)', '']
                else:
                    suffix = ['']
            ret_node_lines.append(indent + elbow + ' ' + curr_node_strs[0])
            ret_node_lines.extend(add_indent(indent + node_indent, curr_node_strs[1:]))
            ret_node_lines.extend(add_indent(indent + ' ', suffix))
            ret_lines.append((f'node_{curr_node.id}', ret_node_lines))

            if processed or not successors:
                return
            # Only the first successor is followed.
            successor = successors[0]

            if isinstance(successor, KCFG.MultiEdge):
                ret_lines.append(('unknown', [f'{indent}┃']))
                multiedge_label = '1 step' if type(successor) is KCFG.NDBranch else 'branch'
                multiedge_id = 'ndbranch' if type(successor) is KCFG.NDBranch else 'split'
                ret_lines.append(('unknown', [f'{indent}┃ ({multiedge_label})']))

                # All targets but the last keep the vertical bar running for the targets still to come.
                for target in successor.targets[:-1]:
                    if type(successor) is KCFG.Split:
                        ret_edge_lines = _print_split_edge(successor, target.id)
                        ret_edge_lines = [indent + '┣━━┓ ' + ret_edge_lines[0]] + add_indent(
                            indent + '┃ ┃ ', ret_edge_lines[1:]
                        )
                    elif type(successor) is KCFG.NDBranch:
                        ret_edge_lines = [indent + '┣━━┓ ']
                    else:
                        raise AssertionError()
                    ret_edge_lines.append(indent + '┃ │')
                    ret_lines.append((f'{multiedge_id}_{curr_node.id}_{target.id}', ret_edge_lines))
                    _print_subgraph(indent + '┃ ', target, prior_on_trace + [curr_node])
                # The last target closes the branch, so it uses the corner glyph and a blank continuation.
                target = successor.targets[-1]
                if type(successor) is KCFG.Split:
                    ret_edge_lines = _print_split_edge(successor, target.id)
                    ret_edge_lines = [indent + '┗━━┓ ' + ret_edge_lines[0]] + add_indent(
                        indent + ' ┃ ', ret_edge_lines[1:]
                    )
                elif type(successor) is KCFG.NDBranch:
                    ret_edge_lines = [indent + '┗━━┓ ']
                else:
                    raise AssertionError()
                ret_edge_lines.append(indent + ' │')
                ret_lines.append((f'{multiedge_id}_{curr_node.id}_{target.id}', ret_edge_lines))
                _print_subgraph(indent + ' ', target, prior_on_trace + [curr_node])

            elif isinstance(successor, KCFG.EdgeLike):
                ret_lines.append(('unknown', [f'{indent}│']))

                if type(successor) is KCFG.Edge:
                    ret_edge_lines = []
                    ret_edge_lines.extend(add_indent(indent + '│ ', _print_edge(successor)))
                    ret_lines.append((f'edge_{successor.source.id}_{successor.target.id}', ret_edge_lines))

                elif type(successor) is KCFG.MergedEdge:
                    ret_edge_lines = []
                    ret_edge_lines.extend(add_indent(indent + '│ ', _print_merged_edge(successor)))
                    ret_lines.append((f'merged_edge_{successor.source.id}_{successor.target.id}', ret_edge_lines))

                elif type(successor) is KCFG.Cover:
                    ret_edge_lines = []
                    ret_edge_lines.extend(add_indent(indent + '┊ ', _print_cover(successor)))
                    ret_lines.append((f'cover_{successor.source.id}_{successor.target.id}', ret_edge_lines))

                # Same indentation: a single successor continues the current column.
                _print_subgraph(indent, successor.target, prior_on_trace + [curr_node])

        def _sorted_init_nodes() -> tuple[list[KCFG.Node], list[KCFG.Node]]:
            # Partition the not-yet-printed nodes (sorted) into roots (non-leaf roots first,
            # then leaf roots) and everything else.
            sorted_init_nodes = sorted(node for node in kcfg.nodes if node not in processed_nodes)
            init_nodes = []
            init_leaf_nodes = []
            remaining_nodes = []
            for node in sorted_init_nodes:
                if kcfg.is_root(node.id):
                    if kcfg.is_leaf(node.id):
                        init_leaf_nodes.append(node)
                    else:
                        init_nodes.append(node)
                else:
                    remaining_nodes.append(node)
            return (init_nodes + init_leaf_nodes, remaining_nodes)

        # Print one subgraph per unprinted root; the candidates are recomputed after each
        # subgraph because printing marks nodes as processed.
        init, _ = _sorted_init_nodes()
        while init:
            ret_lines.append(('unknown', ['']))
            _print_subgraph('', init[0], [])
            init, _ = _sorted_init_nodes()
        # Whatever was not reached from a root is listed flat at the end.
        _, remaining = _sorted_init_nodes()
        if remaining:
            ret_lines.append(('unknown', ['', 'Remaining Nodes:']))
            for node in remaining:
                ret_node_lines = [''] + _print_node(node)
                ret_lines.append((f'node_{node.id}', ret_node_lines))

        return KCFGShow.make_unique_segments(ret_lines)
290
[docs]
291 def pretty(
292 self,
293 kcfg: KCFG,
294 minimize: bool = True,
295 ) -> Iterable[str]:
296 return (line for _, seg_lines in self.pretty_segments(kcfg, minimize=minimize) for line in seg_lines)
297
[docs]
298 def to_module(
299 self,
300 cfg: KCFG,
301 module_name: str | None = None,
302 omit_cells: Iterable[str] = (),
303 parseable_output: bool = True,
304 defunc_with: KDefinition | None = None,
305 imports: Iterable[KImport] = (),
306 ) -> KFlatModule:
307 def _process_sentence(sent: KSentence) -> KSentence:
308 if type(sent) is KRule:
309 sent = sent.let(body=KCFGShow.hide_cells(sent.body, omit_cells))
310 if parseable_output:
311 sent = sent.let(body=remove_generated_cells(sent.body))
312 sent = minimize_rule_like(sent)
313 return sent
314
315 module = cfg.to_module(module_name, defunc_with=defunc_with, imports=imports)
316 return module.let(sentences=[_process_sentence(sent) for sent in module.sentences])
317
[docs]
318 def show(
319 self,
320 cfg: KCFG,
321 nodes: Iterable[NodeIdLike] = (),
322 node_deltas: Iterable[tuple[NodeIdLike, NodeIdLike]] = (),
323 to_module: bool = False,
324 minimize: bool = True,
325 omit_cells: Iterable[str] = (),
326 module_name: str | None = None,
327 ) -> list[str]:
328 res_lines: list[str] = []
329 res_lines += self.pretty(cfg, minimize=minimize)
330
331 nodes_printed = False
332
333 for node_id in nodes:
334 nodes_printed = True
335 kast = cfg.node(node_id).cterm.kast
336 kast = KCFGShow.hide_cells(kast, omit_cells)
337 if minimize:
338 kast = minimize_term(kast)
339 res_lines.append('')
340 res_lines.append('')
341 res_lines.append(f'Node {node_id}:')
342 res_lines.append('')
343 res_lines.append(self.pretty_printer.print(kast))
344 res_lines.append('')
345
346 for node_id_1, node_id_2 in node_deltas:
347 nodes_printed = True
348 config_1 = KCFGShow.simplify_config(cfg.node(node_id_1).cterm.config, omit_cells)
349 config_2 = KCFGShow.simplify_config(cfg.node(node_id_2).cterm.config, omit_cells)
350 config_delta = push_down_rewrites(KRewrite(config_1, config_2))
351 if minimize:
352 config_delta = minimize_term(config_delta)
353 res_lines.append('')
354 res_lines.append('')
355 res_lines.append(f'State Delta {node_id_1} => {node_id_2}:')
356 res_lines.append('')
357 res_lines.append(self.pretty_printer.print(config_delta))
358 res_lines.append('')
359
360 if not (nodes_printed):
361 res_lines.append('')
362 res_lines.append('')
363 res_lines.append('')
364
365 if to_module:
366 module = self.to_module(cfg, module_name, omit_cells=omit_cells)
367 res_lines.append(self.pretty_printer.print(module))
368
369 return res_lines
370
[docs]
371 def dot(self, kcfg: KCFG) -> Digraph:
372 def _short_label(label: str) -> str:
373 return '\n'.join(
374 [
375 label_line if len(label_line) < 100 else (label_line[0:100] + ' ...')
376 for label_line in label.split('\n')
377 ]
378 )
379
380 graph = Digraph()
381
382 for node in kcfg.nodes:
383 label = '\n'.join(self.node_short_info(kcfg, node))
384 class_attrs = ' '.join(self.node_printer.node_attrs(kcfg, node))
385 attrs = {'class': class_attrs} if class_attrs else {}
386 graph.node(name=node.id, label=label, **attrs)
387
388 for edge in kcfg.edges():
389 depth = edge.depth
390 label = f'{depth} steps'
391 graph.edge(tail_name=edge.source.id, head_name=edge.target.id, label=f' {label} ')
392
393 for cover in kcfg.covers():
394 label = ', '.join(
395 f'{k} |-> {self.pretty_printer.print(v)}' for k, v in cover.csubst.subst.minimize().items()
396 )
397 label = _short_label(label)
398 attrs = {'class': 'abstraction', 'style': 'dashed'}
399 graph.edge(tail_name=cover.source.id, head_name=cover.target.id, label=f' {label} ', **attrs)
400
401 for split in kcfg.splits():
402 for target_id, csubst in split.splits.items():
403 label = '\n#And'.join(
404 f'{self.pretty_printer.print(v)}' for v in split.source.cterm.constraints + csubst.constraints
405 )
406 graph.edge(tail_name=split.source.id, head_name=target_id, label=f' {label} ')
407
408 for ndbranch in kcfg.ndbranches():
409 for target in ndbranch.target_ids:
410 label = '1 step'
411 graph.edge(tail_name=ndbranch.source.id, head_name=target, label=f' {label} ')
412
413 return graph
414
[docs]
415 def dump(self, cfgid: str, cfg: KCFG, dump_dir: Path, dot: bool = False) -> None:
416 ensure_dir_path(dump_dir)
417
418 cfg_file = dump_dir / f'{cfgid}.json'
419 cfg_file.write_text(cfg.to_json())
420 _LOGGER.info(f'Wrote CFG file {cfgid}: {cfg_file}')
421
422 if dot:
423 cfg_dot = self.dot(cfg)
424 dot_file = dump_dir / f'{cfgid}.dot'
425 dot_file.write_text(cfg_dot.source)
426 _LOGGER.info(f'Wrote DOT file {cfgid}: {dot_file}')
427
428 nodes_dir = dump_dir / 'nodes'
429 ensure_dir_path(nodes_dir)
430 for node in cfg.nodes:
431 node_file = nodes_dir / f'config_{node.id}.txt'
432 node_minimized_file = nodes_dir / f'config_minimized_{node.id}.txt'
433 node_constraint_file = nodes_dir / f'constraint_{node.id}.txt'
434
435 config = node.cterm.config
436 if not node_file.exists():
437 node_file.write_text(self.pretty_printer.print(config))
438 _LOGGER.info(f'Wrote node file {cfgid}: {node_file}')
439 config = minimize_term(config)
440 if not node_minimized_file.exists():
441 node_minimized_file.write_text(self.pretty_printer.print(config))
442 _LOGGER.info(f'Wrote node file {cfgid}: {node_minimized_file}')
443 if not node_constraint_file.exists():
444 constraint = mlAnd(node.cterm.constraints)
445 node_constraint_file.write_text(self.pretty_printer.print(constraint))
446 _LOGGER.info(f'Wrote node file {cfgid}: {node_constraint_file}')
447
448 edges_dir = dump_dir / 'edges'
449 ensure_dir_path(edges_dir)
450 for edge in cfg.edges():
451 edge_file = edges_dir / f'config_{edge.source.id}_{edge.target.id}.txt'
452 edge_minimized_file = edges_dir / f'config_minimized_{edge.source.id}_{edge.target.id}.txt'
453
454 config = push_down_rewrites(KRewrite(edge.source.cterm.config, edge.target.cterm.config))
455 if not edge_file.exists():
456 edge_file.write_text(self.pretty_printer.print(config))
457 _LOGGER.info(f'Wrote edge file {cfgid}: {edge_file}')
458 config = minimize_term(config)
459 if not edge_minimized_file.exists():
460 edge_minimized_file.write_text(self.pretty_printer.print(config))
461 _LOGGER.info(f'Wrote edge file {cfgid}: {edge_minimized_file}')
462
463 covers_dir = dump_dir / 'covers'
464 ensure_dir_path(covers_dir)
465 for cover in cfg.covers():
466 cover_file = covers_dir / f'config_{cover.source.id}_{cover.target.id}.txt'
467 cover_constraint_file = covers_dir / f'constraint_{cover.source.id}_{cover.target.id}.txt'
468
469 subst_equalities = flatten_label(
470 '#And', cover.csubst.pred(sort_with=self.pretty_printer.definition, constraints=False)
471 )
472
473 if not cover_file.exists():
474 cover_file.write_text('\n'.join(self.pretty_printer.print(se) for se in subst_equalities))
475 _LOGGER.info(f'Wrote cover file {cfgid}: {cover_file}')
476 if not cover_constraint_file.exists():
477 cover_constraint_file.write_text(self.pretty_printer.print(cover.csubst.constraint))
478 _LOGGER.info(f'Wrote cover file {cfgid}: {cover_constraint_file}')