1from __future__ import annotations
2
3import logging
4from typing import TYPE_CHECKING
5
6from graphviz import Digraph
7
8from ..kast.inner import KApply, KRewrite, top_down
9from ..kast.manip import (
10 flatten_label,
11 inline_cell_maps,
12 minimize_rule_like,
13 minimize_term,
14 ml_pred_to_bool,
15 push_down_rewrites,
16 remove_generated_cells,
17 sort_ac_collections,
18)
19from ..kast.outer import KRule
20from ..prelude.k import DOTS
21from ..prelude.ml import mlAnd
22from ..utils import add_indent, ensure_dir_path
23from .kcfg import KCFG
24
25if TYPE_CHECKING:
26 from collections.abc import Iterable
27 from pathlib import Path
28 from typing import Final
29
30 from ..cterm import CSubst
31 from ..kast import KInner
32 from ..kast.outer import KFlatModule, KSentence
33 from ..ktool.kprint import KPrint
34 from .kcfg import NodeIdLike
35
36_LOGGER: Final = logging.getLogger(__name__)
37
38
[docs]
39class NodePrinter:
40 kprint: KPrint
41 full_printer: bool
42 minimize: bool
43
44 def __init__(self, kprint: KPrint, full_printer: bool = False, minimize: bool = False):
45 self.kprint = kprint
46 self.full_printer = full_printer
47 self.minimize = minimize
48
[docs]
49 def print_node(self, kcfg: KCFG, node: KCFG.Node) -> list[str]:
50 attrs = self.node_attrs(kcfg, node)
51 attr_str = ' (' + ', '.join(attrs) + ')' if attrs else ''
52 node_strs = [f'{node.id}{attr_str}']
53 if self.full_printer:
54 kast = node.cterm.kast
55 if self.minimize:
56 kast = minimize_term(kast)
57 node_strs.extend(' ' + line for line in self.kprint.pretty_print(kast).split('\n'))
58 return node_strs
59
[docs]
60 def node_attrs(self, kcfg: KCFG, node: KCFG.Node) -> list[str]:
61 attrs = []
62 if kcfg.is_root(node.id):
63 attrs.append('root')
64 if kcfg.is_stuck(node.id):
65 attrs.append('stuck')
66 if kcfg.is_vacuous(node.id):
67 attrs.append('vacuous')
68 if kcfg.is_leaf(node.id):
69 attrs.append('leaf')
70 if kcfg.is_split(node.id):
71 attrs.append('split')
72 attrs.extend(['@' + alias for alias in sorted(kcfg.aliases(node.id))])
73 return attrs
74
75
[docs]
76class KCFGShow:
77 kprint: KPrint
78 node_printer: NodePrinter
79
80 def __init__(self, kprint: KPrint, node_printer: NodePrinter | None = None):
81 self.kprint = kprint
82 self.node_printer = node_printer if node_printer is not None else NodePrinter(kprint)
83
[docs]
84 def node_short_info(self, kcfg: KCFG, node: KCFG.Node) -> list[str]:
85 return self.node_printer.print_node(kcfg, node)
86
[docs]
87 @staticmethod
88 def hide_cells(term: KInner, omit_cells: Iterable[str]) -> KInner:
89 def _hide_cells(_k: KInner) -> KInner:
90 if type(_k) == KApply and _k.label.name in omit_cells:
91 return DOTS
92 return _k
93
94 if omit_cells:
95 return top_down(_hide_cells, term)
96 return term
97
[docs]
98 @staticmethod
99 def simplify_config(config: KInner, omit_cells: Iterable[str]) -> KInner:
100 config = inline_cell_maps(config)
101 config = sort_ac_collections(config)
102 config = KCFGShow.hide_cells(config, omit_cells)
103 return config
104
[docs]
105 @staticmethod
106 def make_unique_segments(segments: Iterable[tuple[str, Iterable[str]]]) -> Iterable[tuple[str, Iterable[str]]]:
107 _segments = []
108 used_ids = []
109 for id, seg_lines in segments:
110 suffix = ''
111 counter = 0
112 while f'{id}{suffix}' in used_ids:
113 suffix = f'_{counter}'
114 counter += 1
115 new_id = f'{id}{suffix}'
116 used_ids.append(new_id)
117 _segments.append((f'{new_id}', [l.rstrip() for l in seg_lines]))
118 return _segments
119
[docs]
120 def pretty_segments(self, kcfg: KCFG, minimize: bool = True) -> Iterable[tuple[str, Iterable[str]]]:
121 """Return a pretty version of the KCFG in segments.
122
123 Each segment is a tuple of an identifier and a list of lines to be printed for that segment (Tuple[str, Iterable[str]).
124 The identifier tells you whether that segment is for a given node, edge, or just pretty spacing ('unknown').
125 This is useful for applications which want to pretty print in chunks, so that they can know which printed region corresponds to each node/edge.
126 """
127 processed_nodes: list[KCFG.Node] = []
128 ret_lines: list[tuple[str, list[str]]] = []
129
130 def _multi_line_print(
131 label: str, lines: list[str], default: str = 'None', indent: int = 4, max_width: int | None = None
132 ) -> list[str]:
133 ret_lines = []
134 if len(lines) == 0:
135 ret_lines.append(f'{label}: {default}')
136 else:
137 ret_lines.append(f'{label}:')
138 ret_lines.extend([f'{indent * " "}{line}' for line in lines])
139 if max_width is not None:
140 ret_lines = [
141 ret_line if len(ret_line) <= max_width else ret_line[0:max_width] + '...' for ret_line in ret_lines
142 ]
143 return ret_lines
144
145 def _print_csubst(
146 csubst: CSubst, subst_first: bool = False, indent: int = 4, minimize: bool = False
147 ) -> list[str]:
148 max_width = 78 if minimize else None
149 _constraint_strs = [
150 self.kprint.pretty_print(ml_pred_to_bool(constraint, unsafe=True)) for constraint in csubst.constraints
151 ]
152 constraint_strs = _multi_line_print('constraint', _constraint_strs, 'true')
153 if len(csubst.subst.minimize()) > 0 and minimize:
154 subst_strs = ['subst: ...']
155 else:
156 _subst_strs = [
157 line
158 for k, v in csubst.subst.minimize().items()
159 for line in f'{k} <- {self.kprint.pretty_print(v)}'.split('\n')
160 ]
161 subst_strs = _multi_line_print('subst', _subst_strs, '.Subst', max_width=max_width)
162 if subst_first:
163 return subst_strs + constraint_strs
164 return constraint_strs + subst_strs
165
166 def _print_node(node: KCFG.Node) -> list[str]:
167 return self.node_short_info(kcfg, node)
168
169 def _print_edge(edge: KCFG.Edge) -> list[str]:
170 if edge.depth == 1:
171 return ['(' + str(edge.depth) + ' step)']
172 else:
173 return ['(' + str(edge.depth) + ' steps)']
174
175 def _print_merged_edge(merged_edge: KCFG.MergedEdge) -> list[str]:
176 res = '('
177 for edge in merged_edge.edges:
178 res += f'{edge.depth}|'
179 res = res[:-1] + ' steps)'
180 return [res] if len(res) < 78 else ['(merged edge)']
181
182 def _print_cover(cover: KCFG.Cover) -> Iterable[str]:
183 return _print_csubst(cover.csubst, subst_first=False, indent=4, minimize=minimize)
184
185 def _print_split_edge(split: KCFG.Split, target_id: int) -> list[str]:
186 return _print_csubst(split.splits[target_id], subst_first=True, indent=4, minimize=minimize)
187
188 def _print_subgraph(indent: str, curr_node: KCFG.Node, prior_on_trace: list[KCFG.Node]) -> None:
189 processed = curr_node in processed_nodes
190 processed_nodes.append(curr_node)
191 successors = list(kcfg.successors(curr_node.id))
192
193 curr_node_strs = _print_node(curr_node)
194
195 ret_node_lines = []
196 suffix = []
197 elbow = '├─'
198 node_indent = '│ '
199 if kcfg.is_root(curr_node.id):
200 elbow = '┌─'
201 elif processed or not successors:
202 elbow = '└─'
203 node_indent = ' '
204 if curr_node in prior_on_trace:
205 suffix = ['(looped back)', '']
206 elif processed and not kcfg.is_leaf(curr_node.id):
207 suffix = ['(continues as previously)', '']
208 else:
209 suffix = ['']
210 ret_node_lines.append(indent + elbow + ' ' + curr_node_strs[0])
211 ret_node_lines.extend(add_indent(indent + node_indent, curr_node_strs[1:]))
212 ret_node_lines.extend(add_indent(indent + ' ', suffix))
213 ret_lines.append((f'node_{curr_node.id}', ret_node_lines))
214
215 if processed or not successors:
216 return
217 successor = successors[0]
218
219 if isinstance(successor, KCFG.MultiEdge):
220 ret_lines.append(('unknown', [f'{indent}┃']))
221 multiedge_label = '1 step' if type(successor) is KCFG.NDBranch else 'branch'
222 multiedge_id = 'ndbranch' if type(successor) is KCFG.NDBranch else 'split'
223 ret_lines.append(('unknown', [f'{indent}┃ ({multiedge_label})']))
224
225 for target in successor.targets[:-1]:
226 if type(successor) is KCFG.Split:
227 ret_edge_lines = _print_split_edge(successor, target.id)
228 ret_edge_lines = [indent + '┣━━┓ ' + ret_edge_lines[0]] + add_indent(
229 indent + '┃ ┃ ', ret_edge_lines[1:]
230 )
231 elif type(successor) is KCFG.NDBranch:
232 ret_edge_lines = [indent + '┣━━┓ ']
233 else:
234 raise AssertionError()
235 ret_edge_lines.append(indent + '┃ │')
236 ret_lines.append((f'{multiedge_id}_{curr_node.id}_{target.id}', ret_edge_lines))
237 _print_subgraph(indent + '┃ ', target, prior_on_trace + [curr_node])
238 target = successor.targets[-1]
239 if type(successor) is KCFG.Split:
240 ret_edge_lines = _print_split_edge(successor, target.id)
241 ret_edge_lines = [indent + '┗━━┓ ' + ret_edge_lines[0]] + add_indent(
242 indent + ' ┃ ', ret_edge_lines[1:]
243 )
244 elif type(successor) is KCFG.NDBranch:
245 ret_edge_lines = [indent + '┗━━┓ ']
246 else:
247 raise AssertionError()
248 ret_edge_lines.append(indent + ' │')
249 ret_lines.append((f'{multiedge_id}_{curr_node.id}_{target.id}', ret_edge_lines))
250 _print_subgraph(indent + ' ', target, prior_on_trace + [curr_node])
251
252 elif isinstance(successor, KCFG.EdgeLike):
253 ret_lines.append(('unknown', [f'{indent}│']))
254
255 if type(successor) is KCFG.Edge:
256 ret_edge_lines = []
257 ret_edge_lines.extend(add_indent(indent + '│ ', _print_edge(successor)))
258 ret_lines.append((f'edge_{successor.source.id}_{successor.target.id}', ret_edge_lines))
259
260 elif type(successor) is KCFG.MergedEdge:
261 ret_edge_lines = []
262 ret_edge_lines.extend(add_indent(indent + '│ ', _print_merged_edge(successor)))
263 ret_lines.append((f'merged_edge_{successor.source.id}_{successor.target.id}', ret_edge_lines))
264
265 elif type(successor) is KCFG.Cover:
266 ret_edge_lines = []
267 ret_edge_lines.extend(add_indent(indent + '┊ ', _print_cover(successor)))
268 ret_lines.append((f'cover_{successor.source.id}_{successor.target.id}', ret_edge_lines))
269
270 _print_subgraph(indent, successor.target, prior_on_trace + [curr_node])
271
272 def _sorted_init_nodes() -> tuple[list[KCFG.Node], list[KCFG.Node]]:
273 sorted_init_nodes = sorted(node for node in kcfg.nodes if node not in processed_nodes)
274 init_nodes = []
275 init_leaf_nodes = []
276 remaining_nodes = []
277 for node in sorted_init_nodes:
278 if kcfg.is_root(node.id):
279 if kcfg.is_leaf(node.id):
280 init_leaf_nodes.append(node)
281 else:
282 init_nodes.append(node)
283 else:
284 remaining_nodes.append(node)
285 return (init_nodes + init_leaf_nodes, remaining_nodes)
286
287 init, _ = _sorted_init_nodes()
288 while init:
289 ret_lines.append(('unknown', ['']))
290 _print_subgraph('', init[0], [])
291 init, _ = _sorted_init_nodes()
292 _, remaining = _sorted_init_nodes()
293 if remaining:
294 ret_lines.append(('unknown', ['', 'Remaining Nodes:']))
295 for node in remaining:
296 ret_node_lines = [''] + _print_node(node)
297 ret_lines.append((f'node_{node.id}', ret_node_lines))
298
299 return KCFGShow.make_unique_segments(ret_lines)
300
[docs]
301 def pretty(
302 self,
303 kcfg: KCFG,
304 minimize: bool = True,
305 ) -> Iterable[str]:
306 return (line for _, seg_lines in self.pretty_segments(kcfg, minimize=minimize) for line in seg_lines)
307
[docs]
308 def to_module(
309 self,
310 cfg: KCFG,
311 module_name: str | None = None,
312 omit_cells: Iterable[str] = (),
313 parseable_output: bool = True,
314 ) -> KFlatModule:
315 def _process_sentence(sent: KSentence) -> KSentence:
316 if type(sent) is KRule:
317 sent = sent.let(body=KCFGShow.hide_cells(sent.body, omit_cells))
318 if parseable_output:
319 sent = sent.let(body=remove_generated_cells(sent.body))
320 sent = minimize_rule_like(sent)
321 return sent
322
323 module = cfg.to_module(module_name)
324 return module.let(sentences=[_process_sentence(sent) for sent in module.sentences])
325
[docs]
326 def show(
327 self,
328 cfg: KCFG,
329 nodes: Iterable[NodeIdLike] = (),
330 node_deltas: Iterable[tuple[NodeIdLike, NodeIdLike]] = (),
331 to_module: bool = False,
332 minimize: bool = True,
333 sort_collections: bool = False,
334 omit_cells: Iterable[str] = (),
335 module_name: str | None = None,
336 ) -> list[str]:
337 res_lines: list[str] = []
338 res_lines += self.pretty(cfg, minimize=minimize)
339
340 nodes_printed = False
341
342 for node_id in nodes:
343 nodes_printed = True
344 kast = cfg.node(node_id).cterm.kast
345 kast = KCFGShow.hide_cells(kast, omit_cells)
346 if minimize:
347 kast = minimize_term(kast)
348 res_lines.append('')
349 res_lines.append('')
350 res_lines.append(f'Node {node_id}:')
351 res_lines.append('')
352 res_lines.append(self.kprint.pretty_print(kast, sort_collections=sort_collections))
353 res_lines.append('')
354
355 for node_id_1, node_id_2 in node_deltas:
356 nodes_printed = True
357 config_1 = KCFGShow.simplify_config(cfg.node(node_id_1).cterm.config, omit_cells)
358 config_2 = KCFGShow.simplify_config(cfg.node(node_id_2).cterm.config, omit_cells)
359 config_delta = push_down_rewrites(KRewrite(config_1, config_2))
360 if minimize:
361 config_delta = minimize_term(config_delta)
362 res_lines.append('')
363 res_lines.append('')
364 res_lines.append(f'State Delta {node_id_1} => {node_id_2}:')
365 res_lines.append('')
366 res_lines.append(self.kprint.pretty_print(config_delta, sort_collections=sort_collections))
367 res_lines.append('')
368
369 if not (nodes_printed):
370 res_lines.append('')
371 res_lines.append('')
372 res_lines.append('')
373
374 if to_module:
375 module = self.to_module(cfg, module_name, omit_cells=omit_cells)
376 res_lines.append(self.kprint.pretty_print(module, sort_collections=sort_collections))
377
378 return res_lines
379
[docs]
380 def dot(self, kcfg: KCFG) -> Digraph:
381 def _short_label(label: str) -> str:
382 return '\n'.join(
383 [
384 label_line if len(label_line) < 100 else (label_line[0:100] + ' ...')
385 for label_line in label.split('\n')
386 ]
387 )
388
389 graph = Digraph()
390
391 for node in kcfg.nodes:
392 label = '\n'.join(self.node_short_info(kcfg, node))
393 class_attrs = ' '.join(self.node_printer.node_attrs(kcfg, node))
394 attrs = {'class': class_attrs} if class_attrs else {}
395 graph.node(name=node.id, label=label, **attrs)
396
397 for edge in kcfg.edges():
398 depth = edge.depth
399 label = f'{depth} steps'
400 graph.edge(tail_name=edge.source.id, head_name=edge.target.id, label=f' {label} ')
401
402 for cover in kcfg.covers():
403 label = ', '.join(
404 f'{k} |-> {self.kprint.pretty_print(v)}' for k, v in cover.csubst.subst.minimize().items()
405 )
406 label = _short_label(label)
407 attrs = {'class': 'abstraction', 'style': 'dashed'}
408 graph.edge(tail_name=cover.source.id, head_name=cover.target.id, label=f' {label} ', **attrs)
409
410 for split in kcfg.splits():
411 for target_id, csubst in split.splits.items():
412 label = '\n#And'.join(
413 f'{self.kprint.pretty_print(v)}' for v in split.source.cterm.constraints + csubst.constraints
414 )
415 graph.edge(tail_name=split.source.id, head_name=target_id, label=f' {label} ')
416
417 for ndbranch in kcfg.ndbranches():
418 for target in ndbranch.target_ids:
419 label = '1 step'
420 graph.edge(tail_name=ndbranch.source.id, head_name=target, label=f' {label} ')
421
422 return graph
423
[docs]
424 def dump(self, cfgid: str, cfg: KCFG, dump_dir: Path, dot: bool = False) -> None:
425 ensure_dir_path(dump_dir)
426
427 cfg_file = dump_dir / f'{cfgid}.json'
428 cfg_file.write_text(cfg.to_json())
429 _LOGGER.info(f'Wrote CFG file {cfgid}: {cfg_file}')
430
431 if dot:
432 cfg_dot = self.dot(cfg)
433 dot_file = dump_dir / f'{cfgid}.dot'
434 dot_file.write_text(cfg_dot.source)
435 _LOGGER.info(f'Wrote DOT file {cfgid}: {dot_file}')
436
437 nodes_dir = dump_dir / 'nodes'
438 ensure_dir_path(nodes_dir)
439 for node in cfg.nodes:
440 node_file = nodes_dir / f'config_{node.id}.txt'
441 node_minimized_file = nodes_dir / f'config_minimized_{node.id}.txt'
442 node_constraint_file = nodes_dir / f'constraint_{node.id}.txt'
443
444 config = node.cterm.config
445 if not node_file.exists():
446 node_file.write_text(self.kprint.pretty_print(config))
447 _LOGGER.info(f'Wrote node file {cfgid}: {node_file}')
448 config = minimize_term(config)
449 if not node_minimized_file.exists():
450 node_minimized_file.write_text(self.kprint.pretty_print(config))
451 _LOGGER.info(f'Wrote node file {cfgid}: {node_minimized_file}')
452 if not node_constraint_file.exists():
453 constraint = mlAnd(node.cterm.constraints)
454 node_constraint_file.write_text(self.kprint.pretty_print(constraint))
455 _LOGGER.info(f'Wrote node file {cfgid}: {node_constraint_file}')
456
457 edges_dir = dump_dir / 'edges'
458 ensure_dir_path(edges_dir)
459 for edge in cfg.edges():
460 edge_file = edges_dir / f'config_{edge.source.id}_{edge.target.id}.txt'
461 edge_minimized_file = edges_dir / f'config_minimized_{edge.source.id}_{edge.target.id}.txt'
462
463 config = push_down_rewrites(KRewrite(edge.source.cterm.config, edge.target.cterm.config))
464 if not edge_file.exists():
465 edge_file.write_text(self.kprint.pretty_print(config))
466 _LOGGER.info(f'Wrote edge file {cfgid}: {edge_file}')
467 config = minimize_term(config)
468 if not edge_minimized_file.exists():
469 edge_minimized_file.write_text(self.kprint.pretty_print(config))
470 _LOGGER.info(f'Wrote edge file {cfgid}: {edge_minimized_file}')
471
472 covers_dir = dump_dir / 'covers'
473 ensure_dir_path(covers_dir)
474 for cover in cfg.covers():
475 cover_file = covers_dir / f'config_{cover.source.id}_{cover.target.id}.txt'
476 cover_constraint_file = covers_dir / f'constraint_{cover.source.id}_{cover.target.id}.txt'
477
478 subst_equalities = flatten_label(
479 '#And', cover.csubst.pred(sort_with=self.kprint.definition, constraints=False)
480 )
481
482 if not cover_file.exists():
483 cover_file.write_text('\n'.join(self.kprint.pretty_print(se) for se in subst_equalities))
484 _LOGGER.info(f'Wrote cover file {cfgid}: {cover_file}')
485 if not cover_constraint_file.exists():
486 cover_constraint_file.write_text(self.kprint.pretty_print(cover.csubst.constraint))
487 _LOGGER.info(f'Wrote cover file {cfgid}: {cover_constraint_file}')