1from __future__ import annotations
2
3import logging
4from typing import TYPE_CHECKING
5
6from graphviz import Digraph
7
8from ..kast.inner import KApply, KRewrite, top_down
9from ..kast.manip import (
10 flatten_label,
11 inline_cell_maps,
12 minimize_rule_like,
13 minimize_term,
14 ml_pred_to_bool,
15 push_down_rewrites,
16 remove_generated_cells,
17 sort_ac_collections,
18)
19from ..kast.outer import KRule
20from ..prelude.k import DOTS
21from ..prelude.ml import mlAnd
22from ..utils import add_indent, ensure_dir_path
23from .kcfg import KCFG
24
25if TYPE_CHECKING:
26 from collections.abc import Iterable
27 from pathlib import Path
28 from typing import Final
29
30 from ..cterm import CSubst
31 from ..kast import KInner
32 from ..kast.outer import KDefinition, KFlatModule, KImport, KSentence
33 from ..ktool.kprint import KPrint
34 from .kcfg import NodeIdLike
35
36_LOGGER: Final = logging.getLogger(__name__)
37
38
[docs]
39class NodePrinter:
40 kprint: KPrint
41 full_printer: bool
42 minimize: bool
43
44 def __init__(self, kprint: KPrint, full_printer: bool = False, minimize: bool = False):
45 self.kprint = kprint
46 self.full_printer = full_printer
47 self.minimize = minimize
48
[docs]
49 def print_node(self, kcfg: KCFG, node: KCFG.Node) -> list[str]:
50 attrs = self.node_attrs(kcfg, node)
51 attr_str = ' (' + ', '.join(attrs) + ')' if attrs else ''
52 node_strs = [f'{node.id}{attr_str}']
53 if self.full_printer:
54 kast = node.cterm.kast
55 if self.minimize:
56 kast = minimize_term(kast)
57 node_strs.extend(' ' + line for line in self.kprint.pretty_print(kast).split('\n'))
58 return node_strs
59
[docs]
60 def node_attrs(self, kcfg: KCFG, node: KCFG.Node) -> list[str]:
61 attrs = []
62 if kcfg.is_root(node.id):
63 attrs.append('root')
64 if kcfg.is_stuck(node.id):
65 attrs.append('stuck')
66 if kcfg.is_vacuous(node.id):
67 attrs.append('vacuous')
68 if kcfg.is_leaf(node.id):
69 attrs.append('leaf')
70 if kcfg.is_split(node.id):
71 attrs.append('split')
72 attrs.extend(['@' + alias for alias in sorted(kcfg.aliases(node.id))])
73 return attrs
74
75
[docs]
76class KCFGShow:
77 kprint: KPrint
78 node_printer: NodePrinter
79
80 def __init__(self, kprint: KPrint, node_printer: NodePrinter | None = None):
81 self.kprint = kprint
82 self.node_printer = node_printer if node_printer is not None else NodePrinter(kprint)
83
[docs]
84 def node_short_info(self, kcfg: KCFG, node: KCFG.Node) -> list[str]:
85 return self.node_printer.print_node(kcfg, node)
86
[docs]
87 @staticmethod
88 def hide_cells(term: KInner, omit_cells: Iterable[str]) -> KInner:
89 def _hide_cells(_k: KInner) -> KInner:
90 if type(_k) == KApply and _k.label.name in omit_cells:
91 return DOTS
92 return _k
93
94 if omit_cells:
95 return top_down(_hide_cells, term)
96 return term
97
[docs]
98 @staticmethod
99 def simplify_config(config: KInner, omit_cells: Iterable[str]) -> KInner:
100 config = inline_cell_maps(config)
101 config = sort_ac_collections(config)
102 config = KCFGShow.hide_cells(config, omit_cells)
103 return config
104
[docs]
105 @staticmethod
106 def make_unique_segments(segments: Iterable[tuple[str, Iterable[str]]]) -> Iterable[tuple[str, Iterable[str]]]:
107 _segments = []
108 used_ids = []
109 for id, seg_lines in segments:
110 suffix = ''
111 counter = 0
112 while f'{id}{suffix}' in used_ids:
113 suffix = f'_{counter}'
114 counter += 1
115 new_id = f'{id}{suffix}'
116 used_ids.append(new_id)
117 _segments.append((f'{new_id}', [l.rstrip() for l in seg_lines]))
118 return _segments
119
[docs]
120 def pretty_segments(self, kcfg: KCFG, minimize: bool = True) -> Iterable[tuple[str, Iterable[str]]]:
121 """Return a pretty version of the KCFG in segments.
122
123 Each segment is a tuple of an identifier and a list of lines to be printed for that segment (Tuple[str, Iterable[str]).
124 The identifier tells you whether that segment is for a given node, edge, or just pretty spacing ('unknown').
125 This is useful for applications which want to pretty print in chunks, so that they can know which printed region corresponds to each node/edge.
126 """
127 processed_nodes: list[KCFG.Node] = []
128 ret_lines: list[tuple[str, list[str]]] = []
129
130 def _multi_line_print(
131 label: str, lines: list[str], default: str = 'None', indent: int = 4, max_width: int | None = None
132 ) -> list[str]:
133 ret_lines = []
134 if len(lines) == 0:
135 ret_lines.append(f'{label}: {default}')
136 else:
137 ret_lines.append(f'{label}:')
138 ret_lines.extend([f'{indent * " "}{line}' for line in lines])
139 if max_width is not None:
140 ret_lines = [
141 ret_line if len(ret_line) <= max_width else ret_line[0:max_width] + '...' for ret_line in ret_lines
142 ]
143 return ret_lines
144
145 def _print_csubst(
146 csubst: CSubst, subst_first: bool = False, indent: int = 4, minimize: bool = False
147 ) -> list[str]:
148 max_width = 78 if minimize else None
149 _constraint_strs = [
150 self.kprint.pretty_print(ml_pred_to_bool(constraint, unsafe=True)) for constraint in csubst.constraints
151 ]
152 constraint_strs = _multi_line_print('constraint', _constraint_strs, 'true')
153 if len(csubst.subst.minimize()) > 0 and minimize:
154 subst_strs = ['subst: ...']
155 else:
156 _subst_strs = [
157 line
158 for k, v in csubst.subst.minimize().items()
159 for line in f'{k} <- {self.kprint.pretty_print(v)}'.split('\n')
160 ]
161 subst_strs = _multi_line_print('subst', _subst_strs, '.Subst', max_width=max_width)
162 if subst_first:
163 return subst_strs + constraint_strs
164 return constraint_strs + subst_strs
165
166 def _print_node(node: KCFG.Node) -> list[str]:
167 return self.node_short_info(kcfg, node)
168
169 def _print_edge(edge: KCFG.Edge) -> list[str]:
170 if edge.depth == 1:
171 return ['(' + str(edge.depth) + ' step)']
172 else:
173 return ['(' + str(edge.depth) + ' steps)']
174
175 def _print_merged_edge(merged_edge: KCFG.MergedEdge) -> list[str]:
176 res = '('
177 for edge in merged_edge.edges:
178 res += f'{edge.depth}|'
179 res = res[:-1] + ' steps)'
180 return [res] if len(res) < 78 else ['(merged edge)']
181
182 def _print_cover(cover: KCFG.Cover) -> Iterable[str]:
183 return _print_csubst(cover.csubst, subst_first=False, indent=4, minimize=minimize)
184
185 def _print_split_edge(split: KCFG.Split, target_id: int) -> list[str]:
186 return _print_csubst(split.splits[target_id], subst_first=True, indent=4, minimize=minimize)
187
188 def _print_subgraph(indent: str, curr_node: KCFG.Node, prior_on_trace: list[KCFG.Node]) -> None:
189 processed = curr_node in processed_nodes
190 processed_nodes.append(curr_node)
191 successors = list(kcfg.successors(curr_node.id))
192
193 curr_node_strs = _print_node(curr_node)
194
195 ret_node_lines = []
196 suffix = []
197 elbow = '├─'
198 node_indent = '│ '
199 if kcfg.is_root(curr_node.id):
200 elbow = '┌─'
201 elif processed or not successors:
202 elbow = '└─'
203 node_indent = ' '
204 if curr_node in prior_on_trace:
205 suffix = ['(looped back)', '']
206 elif processed and not kcfg.is_leaf(curr_node.id):
207 suffix = ['(continues as previously)', '']
208 else:
209 suffix = ['']
210 ret_node_lines.append(indent + elbow + ' ' + curr_node_strs[0])
211 ret_node_lines.extend(add_indent(indent + node_indent, curr_node_strs[1:]))
212 ret_node_lines.extend(add_indent(indent + ' ', suffix))
213 ret_lines.append((f'node_{curr_node.id}', ret_node_lines))
214
215 if processed or not successors:
216 return
217 successor = successors[0]
218
219 if isinstance(successor, KCFG.MultiEdge):
220 ret_lines.append(('unknown', [f'{indent}┃']))
221 multiedge_label = '1 step' if type(successor) is KCFG.NDBranch else 'branch'
222 multiedge_id = 'ndbranch' if type(successor) is KCFG.NDBranch else 'split'
223 ret_lines.append(('unknown', [f'{indent}┃ ({multiedge_label})']))
224
225 for target in successor.targets[:-1]:
226 if type(successor) is KCFG.Split:
227 ret_edge_lines = _print_split_edge(successor, target.id)
228 ret_edge_lines = [indent + '┣━━┓ ' + ret_edge_lines[0]] + add_indent(
229 indent + '┃ ┃ ', ret_edge_lines[1:]
230 )
231 elif type(successor) is KCFG.NDBranch:
232 ret_edge_lines = [indent + '┣━━┓ ']
233 else:
234 raise AssertionError()
235 ret_edge_lines.append(indent + '┃ │')
236 ret_lines.append((f'{multiedge_id}_{curr_node.id}_{target.id}', ret_edge_lines))
237 _print_subgraph(indent + '┃ ', target, prior_on_trace + [curr_node])
238 target = successor.targets[-1]
239 if type(successor) is KCFG.Split:
240 ret_edge_lines = _print_split_edge(successor, target.id)
241 ret_edge_lines = [indent + '┗━━┓ ' + ret_edge_lines[0]] + add_indent(
242 indent + ' ┃ ', ret_edge_lines[1:]
243 )
244 elif type(successor) is KCFG.NDBranch:
245 ret_edge_lines = [indent + '┗━━┓ ']
246 else:
247 raise AssertionError()
248 ret_edge_lines.append(indent + ' │')
249 ret_lines.append((f'{multiedge_id}_{curr_node.id}_{target.id}', ret_edge_lines))
250 _print_subgraph(indent + ' ', target, prior_on_trace + [curr_node])
251
252 elif isinstance(successor, KCFG.EdgeLike):
253 ret_lines.append(('unknown', [f'{indent}│']))
254
255 if type(successor) is KCFG.Edge:
256 ret_edge_lines = []
257 ret_edge_lines.extend(add_indent(indent + '│ ', _print_edge(successor)))
258 ret_lines.append((f'edge_{successor.source.id}_{successor.target.id}', ret_edge_lines))
259
260 elif type(successor) is KCFG.MergedEdge:
261 ret_edge_lines = []
262 ret_edge_lines.extend(add_indent(indent + '│ ', _print_merged_edge(successor)))
263 ret_lines.append((f'merged_edge_{successor.source.id}_{successor.target.id}', ret_edge_lines))
264
265 elif type(successor) is KCFG.Cover:
266 ret_edge_lines = []
267 ret_edge_lines.extend(add_indent(indent + '┊ ', _print_cover(successor)))
268 ret_lines.append((f'cover_{successor.source.id}_{successor.target.id}', ret_edge_lines))
269
270 _print_subgraph(indent, successor.target, prior_on_trace + [curr_node])
271
272 def _sorted_init_nodes() -> tuple[list[KCFG.Node], list[KCFG.Node]]:
273 sorted_init_nodes = sorted(node for node in kcfg.nodes if node not in processed_nodes)
274 init_nodes = []
275 init_leaf_nodes = []
276 remaining_nodes = []
277 for node in sorted_init_nodes:
278 if kcfg.is_root(node.id):
279 if kcfg.is_leaf(node.id):
280 init_leaf_nodes.append(node)
281 else:
282 init_nodes.append(node)
283 else:
284 remaining_nodes.append(node)
285 return (init_nodes + init_leaf_nodes, remaining_nodes)
286
287 init, _ = _sorted_init_nodes()
288 while init:
289 ret_lines.append(('unknown', ['']))
290 _print_subgraph('', init[0], [])
291 init, _ = _sorted_init_nodes()
292 _, remaining = _sorted_init_nodes()
293 if remaining:
294 ret_lines.append(('unknown', ['', 'Remaining Nodes:']))
295 for node in remaining:
296 ret_node_lines = [''] + _print_node(node)
297 ret_lines.append((f'node_{node.id}', ret_node_lines))
298
299 return KCFGShow.make_unique_segments(ret_lines)
300
[docs]
301 def pretty(
302 self,
303 kcfg: KCFG,
304 minimize: bool = True,
305 ) -> Iterable[str]:
306 return (line for _, seg_lines in self.pretty_segments(kcfg, minimize=minimize) for line in seg_lines)
307
[docs]
308 def to_module(
309 self,
310 cfg: KCFG,
311 module_name: str | None = None,
312 omit_cells: Iterable[str] = (),
313 parseable_output: bool = True,
314 defunc_with: KDefinition | None = None,
315 imports: Iterable[KImport] = (),
316 ) -> KFlatModule:
317 def _process_sentence(sent: KSentence) -> KSentence:
318 if type(sent) is KRule:
319 sent = sent.let(body=KCFGShow.hide_cells(sent.body, omit_cells))
320 if parseable_output:
321 sent = sent.let(body=remove_generated_cells(sent.body))
322 sent = minimize_rule_like(sent)
323 return sent
324
325 module = cfg.to_module(module_name, defunc_with=defunc_with, imports=imports)
326 return module.let(sentences=[_process_sentence(sent) for sent in module.sentences])
327
[docs]
328 def show(
329 self,
330 cfg: KCFG,
331 nodes: Iterable[NodeIdLike] = (),
332 node_deltas: Iterable[tuple[NodeIdLike, NodeIdLike]] = (),
333 to_module: bool = False,
334 minimize: bool = True,
335 sort_collections: bool = False,
336 omit_cells: Iterable[str] = (),
337 module_name: str | None = None,
338 ) -> list[str]:
339 res_lines: list[str] = []
340 res_lines += self.pretty(cfg, minimize=minimize)
341
342 nodes_printed = False
343
344 for node_id in nodes:
345 nodes_printed = True
346 kast = cfg.node(node_id).cterm.kast
347 kast = KCFGShow.hide_cells(kast, omit_cells)
348 if minimize:
349 kast = minimize_term(kast)
350 res_lines.append('')
351 res_lines.append('')
352 res_lines.append(f'Node {node_id}:')
353 res_lines.append('')
354 res_lines.append(self.kprint.pretty_print(kast, sort_collections=sort_collections))
355 res_lines.append('')
356
357 for node_id_1, node_id_2 in node_deltas:
358 nodes_printed = True
359 config_1 = KCFGShow.simplify_config(cfg.node(node_id_1).cterm.config, omit_cells)
360 config_2 = KCFGShow.simplify_config(cfg.node(node_id_2).cterm.config, omit_cells)
361 config_delta = push_down_rewrites(KRewrite(config_1, config_2))
362 if minimize:
363 config_delta = minimize_term(config_delta)
364 res_lines.append('')
365 res_lines.append('')
366 res_lines.append(f'State Delta {node_id_1} => {node_id_2}:')
367 res_lines.append('')
368 res_lines.append(self.kprint.pretty_print(config_delta, sort_collections=sort_collections))
369 res_lines.append('')
370
371 if not (nodes_printed):
372 res_lines.append('')
373 res_lines.append('')
374 res_lines.append('')
375
376 if to_module:
377 module = self.to_module(cfg, module_name, omit_cells=omit_cells)
378 res_lines.append(self.kprint.pretty_print(module, sort_collections=sort_collections))
379
380 return res_lines
381
[docs]
382 def dot(self, kcfg: KCFG) -> Digraph:
383 def _short_label(label: str) -> str:
384 return '\n'.join(
385 [
386 label_line if len(label_line) < 100 else (label_line[0:100] + ' ...')
387 for label_line in label.split('\n')
388 ]
389 )
390
391 graph = Digraph()
392
393 for node in kcfg.nodes:
394 label = '\n'.join(self.node_short_info(kcfg, node))
395 class_attrs = ' '.join(self.node_printer.node_attrs(kcfg, node))
396 attrs = {'class': class_attrs} if class_attrs else {}
397 graph.node(name=node.id, label=label, **attrs)
398
399 for edge in kcfg.edges():
400 depth = edge.depth
401 label = f'{depth} steps'
402 graph.edge(tail_name=edge.source.id, head_name=edge.target.id, label=f' {label} ')
403
404 for cover in kcfg.covers():
405 label = ', '.join(
406 f'{k} |-> {self.kprint.pretty_print(v)}' for k, v in cover.csubst.subst.minimize().items()
407 )
408 label = _short_label(label)
409 attrs = {'class': 'abstraction', 'style': 'dashed'}
410 graph.edge(tail_name=cover.source.id, head_name=cover.target.id, label=f' {label} ', **attrs)
411
412 for split in kcfg.splits():
413 for target_id, csubst in split.splits.items():
414 label = '\n#And'.join(
415 f'{self.kprint.pretty_print(v)}' for v in split.source.cterm.constraints + csubst.constraints
416 )
417 graph.edge(tail_name=split.source.id, head_name=target_id, label=f' {label} ')
418
419 for ndbranch in kcfg.ndbranches():
420 for target in ndbranch.target_ids:
421 label = '1 step'
422 graph.edge(tail_name=ndbranch.source.id, head_name=target, label=f' {label} ')
423
424 return graph
425
[docs]
426 def dump(self, cfgid: str, cfg: KCFG, dump_dir: Path, dot: bool = False) -> None:
427 ensure_dir_path(dump_dir)
428
429 cfg_file = dump_dir / f'{cfgid}.json'
430 cfg_file.write_text(cfg.to_json())
431 _LOGGER.info(f'Wrote CFG file {cfgid}: {cfg_file}')
432
433 if dot:
434 cfg_dot = self.dot(cfg)
435 dot_file = dump_dir / f'{cfgid}.dot'
436 dot_file.write_text(cfg_dot.source)
437 _LOGGER.info(f'Wrote DOT file {cfgid}: {dot_file}')
438
439 nodes_dir = dump_dir / 'nodes'
440 ensure_dir_path(nodes_dir)
441 for node in cfg.nodes:
442 node_file = nodes_dir / f'config_{node.id}.txt'
443 node_minimized_file = nodes_dir / f'config_minimized_{node.id}.txt'
444 node_constraint_file = nodes_dir / f'constraint_{node.id}.txt'
445
446 config = node.cterm.config
447 if not node_file.exists():
448 node_file.write_text(self.kprint.pretty_print(config))
449 _LOGGER.info(f'Wrote node file {cfgid}: {node_file}')
450 config = minimize_term(config)
451 if not node_minimized_file.exists():
452 node_minimized_file.write_text(self.kprint.pretty_print(config))
453 _LOGGER.info(f'Wrote node file {cfgid}: {node_minimized_file}')
454 if not node_constraint_file.exists():
455 constraint = mlAnd(node.cterm.constraints)
456 node_constraint_file.write_text(self.kprint.pretty_print(constraint))
457 _LOGGER.info(f'Wrote node file {cfgid}: {node_constraint_file}')
458
459 edges_dir = dump_dir / 'edges'
460 ensure_dir_path(edges_dir)
461 for edge in cfg.edges():
462 edge_file = edges_dir / f'config_{edge.source.id}_{edge.target.id}.txt'
463 edge_minimized_file = edges_dir / f'config_minimized_{edge.source.id}_{edge.target.id}.txt'
464
465 config = push_down_rewrites(KRewrite(edge.source.cterm.config, edge.target.cterm.config))
466 if not edge_file.exists():
467 edge_file.write_text(self.kprint.pretty_print(config))
468 _LOGGER.info(f'Wrote edge file {cfgid}: {edge_file}')
469 config = minimize_term(config)
470 if not edge_minimized_file.exists():
471 edge_minimized_file.write_text(self.kprint.pretty_print(config))
472 _LOGGER.info(f'Wrote edge file {cfgid}: {edge_minimized_file}')
473
474 covers_dir = dump_dir / 'covers'
475 ensure_dir_path(covers_dir)
476 for cover in cfg.covers():
477 cover_file = covers_dir / f'config_{cover.source.id}_{cover.target.id}.txt'
478 cover_constraint_file = covers_dir / f'constraint_{cover.source.id}_{cover.target.id}.txt'
479
480 subst_equalities = flatten_label(
481 '#And', cover.csubst.pred(sort_with=self.kprint.definition, constraints=False)
482 )
483
484 if not cover_file.exists():
485 cover_file.write_text('\n'.join(self.kprint.pretty_print(se) for se in subst_equalities))
486 _LOGGER.info(f'Wrote cover file {cfgid}: {cover_file}')
487 if not cover_constraint_file.exists():
488 cover_constraint_file.write_text(self.kprint.pretty_print(cover.csubst.constraint))
489 _LOGGER.info(f'Wrote cover file {cfgid}: {cover_constraint_file}')