Source code for pyk.kcfg.show

  1from __future__ import annotations
  2
  3import logging
  4from typing import TYPE_CHECKING
  5
  6from graphviz import Digraph
  7
  8from ..cterm.show import CTermShow
  9from ..kast.inner import KApply, KRewrite, top_down
 10from ..kast.manip import (
 11    flatten_label,
 12    inline_cell_maps,
 13    minimize_rule_like,
 14    minimize_term,
 15    ml_pred_to_bool,
 16    push_down_rewrites,
 17    remove_generated_cells,
 18    sort_ac_collections,
 19)
 20from ..kast.outer import KRule
 21from ..kast.prelude.k import DOTS
 22from ..kast.prelude.ml import mlAnd
 23from ..kast.pretty import PrettyPrinter
 24from ..utils import add_indent, ensure_dir_path
 25from .kcfg import KCFG
 26
 27if TYPE_CHECKING:
 28    from collections.abc import Iterable
 29    from pathlib import Path
 30    from typing import Final
 31
 32    from ..cterm import CSubst
 33    from ..kast import KInner
 34    from ..kast.outer import KDefinition, KFlatModule, KImport, KSentence
 35    from .kcfg import NodeIdLike
 36
 37_LOGGER: Final = logging.getLogger(__name__)
 38
 39
class NodePrinter:
    """Produce the short textual description of a single KCFG node.

    The description is a header line (node id plus attribute list) optionally
    followed by the rendered constrained term of the node.
    """

    cterm_show: CTermShow
    full_printer: bool
    minimize: bool

    def __init__(self, cterm_show: CTermShow, full_printer: bool = False):
        """Store the term renderer and whether node bodies should be printed."""
        self.full_printer = full_printer
        self.cterm_show = cterm_show

    def print_node(self, kcfg: KCFG, node: KCFG.Node) -> list[str]:
        """Return the lines describing ``node``.

        The first line is the node id, followed by ``(attr, attr, ...)`` when
        the node has attributes. When ``full_printer`` is set, every line
        produced by ``cterm_show.show(node.cterm)`` is appended, prefixed with
        a space.
        """
        attrs = self.node_attrs(kcfg, node)
        if attrs:
            attr_str = ' (' + ', '.join(attrs) + ')'
        else:
            attr_str = ''
        lines = [f'{node.id}{attr_str}']
        if self.full_printer:
            # NOTE(review): the prefix literal is reproduced as seen; whitespace
            # may have been collapsed during extraction - confirm upstream.
            for line in self.cterm_show.show(node.cterm):
                lines.append(' ' + line)
        return lines

    def node_attrs(self, kcfg: KCFG, node: KCFG.Node) -> list[str]:
        """Return the attribute labels of ``node`` in a fixed order.

        Labels are emitted in the order root, stuck, vacuous, leaf, split,
        followed by the node's aliases, sorted and prefixed with ``@``.
        """
        labelled_checks = (
            (kcfg.is_root, 'root'),
            (kcfg.is_stuck, 'stuck'),
            (kcfg.is_vacuous, 'vacuous'),
            (kcfg.is_leaf, 'leaf'),
            (kcfg.is_split, 'split'),
        )
        result = [label for check, label in labelled_checks if check(node.id)]
        for alias in sorted(kcfg.aliases(node.id)):
            result.append('@' + alias)
        return result
71 72
class KCFGShow:
    """Render a KCFG as text segments, a K module, a Graphviz graph, or a directory dump.

    NOTE(review): string literals containing spaces (tree-drawing prefixes,
    label padding) are reproduced exactly as they appear in the reviewed text;
    runs of whitespace may have been collapsed during extraction - confirm
    against the upstream file before relying on column alignment.
    """

    pretty_printer: PrettyPrinter
    node_printer: NodePrinter

    def __init__(self, defn: KDefinition, node_printer: NodePrinter | None = None):
        """Build a pretty printer for ``defn``; default to a short ``NodePrinter`` if none is given."""
        self.pretty_printer = PrettyPrinter(defn)
        if node_printer is not None:
            self.node_printer = node_printer
        else:
            self.node_printer = NodePrinter(CTermShow(self.pretty_printer.print))

    def node_short_info(self, kcfg: KCFG, node: KCFG.Node) -> list[str]:
        """Return the lines the configured node printer produces for ``node``."""
        return self.node_printer.print_node(kcfg, node)

    @staticmethod
    def hide_cells(term: KInner, omit_cells: Iterable[str]) -> KInner:
        """Replace every ``KApply`` whose label name is in ``omit_cells`` with ``DOTS``.

        ``omit_cells`` is materialised once, so a one-shot iterator is neither
        exhausted by the first membership test nor mistaken for a non-empty
        collection. ``term`` is returned unchanged when there is nothing to omit.
        """
        omitted = frozenset(omit_cells)
        if not omitted:
            return term

        def _hide_cells(_k: KInner) -> KInner:
            # Exact type match (not isinstance), as in the original check.
            if type(_k) is KApply and _k.label.name in omitted:
                return DOTS
            return _k

        return top_down(_hide_cells, term)

    @staticmethod
    def simplify_config(config: KInner, omit_cells: Iterable[str]) -> KInner:
        """Inline cell maps, sort AC collections, then hide the cells named in ``omit_cells``."""
        config = inline_cell_maps(config)
        config = sort_ac_collections(config)
        return KCFGShow.hide_cells(config, omit_cells)

    @staticmethod
    def make_unique_segments(segments: Iterable[tuple[str, Iterable[str]]]) -> Iterable[tuple[str, Iterable[str]]]:
        """Return ``segments`` with unique identifiers and right-stripped lines.

        The first occurrence of an identifier is kept as is; later occurrences
        get the smallest suffix ``_0``, ``_1``, ... that is not already taken.
        """
        unique: list[tuple[str, list[str]]] = []
        used_ids: set[str] = set()
        # Next suffix worth trying per base id. Every smaller suffix was already
        # found taken, and ``used_ids`` only grows, so skipping them yields the
        # same ids as restarting the search from zero each time.
        next_suffix: dict[str, int] = {}
        for seg_id, seg_lines in segments:
            new_id = seg_id
            if new_id in used_ids:
                counter = next_suffix.get(seg_id, 0)
                new_id = f'{seg_id}_{counter}'
                while new_id in used_ids:
                    counter += 1
                    new_id = f'{seg_id}_{counter}'
                next_suffix[seg_id] = counter + 1
            used_ids.add(new_id)
            unique.append((new_id, [line.rstrip() for line in seg_lines]))
        return unique

    def pretty_segments(self, kcfg: KCFG, minimize: bool = True) -> Iterable[tuple[str, Iterable[str]]]:
        """Return a pretty version of the KCFG in segments.

        Each segment is a tuple of an identifier and a list of lines to be printed for that segment
        (``tuple[str, Iterable[str]]``). The identifier tells you whether that segment is for a given node
        (``node_<id>``), edge (``edge_``, ``merged_edge_``, ``cover_``, ``split_``, ``ndbranch_`` followed by
        ``<source>_<target>``), or just pretty spacing (``unknown``). Identifiers are made unique with
        ``make_unique_segments``. This is useful for applications which want to pretty print in chunks, so that
        they can know which printed region corresponds to each node/edge.

        When ``minimize`` is true, non-empty substitutions on covers and splits are abbreviated to ``subst: ...``.
        """
        processed_nodes: list[KCFG.Node] = []
        ret_lines: list[tuple[str, list[str]]] = []

        def _multi_line_print(label: str, lines: list[str], default: str = 'None', indent: int = 4) -> list[str]:
            # '<label>: <default>' when empty, otherwise a header plus indented lines.
            if not lines:
                return [f'{label}: {default}']
            return [f'{label}:'] + [f'{indent * " "}{line}' for line in lines]

        def _print_csubst(
            csubst: CSubst, subst_first: bool = False, indent: int = 4, minimize: bool = False
        ) -> list[str]:
            _constraint_strs = [
                self.pretty_printer.print(ml_pred_to_bool(constraint, unsafe=True)) for constraint in csubst.constraints
            ]
            constraint_strs = _multi_line_print('constraint', _constraint_strs, 'true', indent=indent)
            minimized_subst = csubst.subst.minimize()
            if minimize and len(minimized_subst) > 0:
                subst_strs = ['subst: ...']
            else:
                _subst_strs = [
                    line
                    for k, v in minimized_subst.items()
                    for line in f'{k} <- {self.pretty_printer.print(v)}'.split('\n')
                ]
                subst_strs = _multi_line_print('subst', _subst_strs, '.Subst', indent=indent)
            if subst_first:
                return subst_strs + constraint_strs
            return constraint_strs + subst_strs

        def _print_node(node: KCFG.Node) -> list[str]:
            return self.node_short_info(kcfg, node)

        def _print_edge(edge: KCFG.Edge) -> list[str]:
            unit = ' step)' if edge.depth == 1 else ' steps)'
            return ['(' + str(edge.depth) + unit]

        def _print_merged_edge(merged_edge: KCFG.MergedEdge) -> list[str]:
            depths = '|'.join(f'{edge.depth}' for edge in merged_edge.edges)
            res = f'({depths} steps)'
            # Fall back to a fixed label when the depth list would be too wide.
            return [res] if len(res) < 78 else ['(merged edge)']

        def _print_cover(cover: KCFG.Cover) -> Iterable[str]:
            return _print_csubst(cover.csubst, subst_first=False, indent=4, minimize=minimize)

        def _print_split_edge(split: KCFG.Split, target_id: int) -> list[str]:
            return _print_csubst(split.splits[target_id], subst_first=True, indent=4, minimize=minimize)

        def _print_subgraph(indent: str, curr_node: KCFG.Node, prior_on_trace: list[KCFG.Node]) -> None:
            processed = curr_node in processed_nodes
            processed_nodes.append(curr_node)
            successors = list(kcfg.successors(curr_node.id))

            curr_node_strs = _print_node(curr_node)

            ret_node_lines = []
            suffix: list[str] = []
            elbow = '├─'
            node_indent = '│ '
            if kcfg.is_root(curr_node.id):
                elbow = '┌─'
            elif processed or not successors:
                # The trace ends here: loop, already-printed continuation, or no successors.
                elbow = '└─'
                node_indent = ' '
                if curr_node in prior_on_trace:
                    suffix = ['(looped back)', '']
                elif processed and not kcfg.is_leaf(curr_node.id):
                    suffix = ['(continues as previously)', '']
                else:
                    suffix = ['']
            ret_node_lines.append(indent + elbow + ' ' + curr_node_strs[0])
            ret_node_lines.extend(add_indent(indent + node_indent, curr_node_strs[1:]))
            ret_node_lines.extend(add_indent(indent + ' ', suffix))
            ret_lines.append((f'node_{curr_node.id}', ret_node_lines))

            if processed or not successors:
                return
            # Only the first successor is followed.
            successor = successors[0]

            if isinstance(successor, KCFG.MultiEdge):
                ret_lines.append(('unknown', [f'{indent}┃']))
                multiedge_label = '1 step' if type(successor) is KCFG.NDBranch else 'branch'
                multiedge_id = 'ndbranch' if type(successor) is KCFG.NDBranch else 'split'
                ret_lines.append(('unknown', [f'{indent}┃ ({multiedge_label})']))

                targets = successor.targets
                last_index = len(targets) - 1
                for index, target in enumerate(targets):
                    # The last branch closes the vertical bar; earlier ones keep it open.
                    is_last = index == last_index
                    corner = '┗━━┓ ' if is_last else '┣━━┓ '
                    continuation = ' ┃ ' if is_last else '┃ ┃ '
                    connector = ' │' if is_last else '┃ │'
                    child_indent = ' ' if is_last else '┃ '

                    if type(successor) is KCFG.Split:
                        split_lines = _print_split_edge(successor, target.id)
                        ret_edge_lines = [indent + corner + split_lines[0]] + add_indent(
                            indent + continuation, split_lines[1:]
                        )
                    elif type(successor) is KCFG.NDBranch:
                        ret_edge_lines = [indent + corner]
                    else:
                        raise AssertionError()
                    ret_edge_lines.append(indent + connector)
                    ret_lines.append((f'{multiedge_id}_{curr_node.id}_{target.id}', ret_edge_lines))
                    _print_subgraph(indent + child_indent, target, prior_on_trace + [curr_node])

            elif isinstance(successor, KCFG.EdgeLike):
                ret_lines.append(('unknown', [f'{indent}│']))

                if type(successor) is KCFG.Edge:
                    ret_edge_lines = list(add_indent(indent + '│ ', _print_edge(successor)))
                    ret_lines.append((f'edge_{successor.source.id}_{successor.target.id}', ret_edge_lines))

                elif type(successor) is KCFG.MergedEdge:
                    ret_edge_lines = list(add_indent(indent + '│ ', _print_merged_edge(successor)))
                    ret_lines.append((f'merged_edge_{successor.source.id}_{successor.target.id}', ret_edge_lines))

                elif type(successor) is KCFG.Cover:
                    ret_edge_lines = list(add_indent(indent + '┊ ', _print_cover(successor)))
                    ret_lines.append((f'cover_{successor.source.id}_{successor.target.id}', ret_edge_lines))

                _print_subgraph(indent, successor.target, prior_on_trace + [curr_node])

        def _sorted_init_nodes() -> tuple[list[KCFG.Node], list[KCFG.Node]]:
            # Unprocessed nodes, split into (roots with non-leaf roots first, everything else).
            sorted_init_nodes = sorted(node for node in kcfg.nodes if node not in processed_nodes)
            init_nodes = []
            init_leaf_nodes = []
            remaining_nodes = []
            for node in sorted_init_nodes:
                if kcfg.is_root(node.id):
                    if kcfg.is_leaf(node.id):
                        init_leaf_nodes.append(node)
                    else:
                        init_nodes.append(node)
                else:
                    remaining_nodes.append(node)
            return (init_nodes + init_leaf_nodes, remaining_nodes)

        # Print one tree per unprocessed root; recompute after each tree because
        # printing marks nodes as processed.
        init, remaining = _sorted_init_nodes()
        while init:
            ret_lines.append(('unknown', ['']))
            _print_subgraph('', init[0], [])
            init, remaining = _sorted_init_nodes()
        if remaining:
            ret_lines.append(('unknown', ['', 'Remaining Nodes:']))
            for node in remaining:
                ret_node_lines = [''] + _print_node(node)
                ret_lines.append((f'node_{node.id}', ret_node_lines))

        return KCFGShow.make_unique_segments(ret_lines)

    def pretty(
        self,
        kcfg: KCFG,
        minimize: bool = True,
    ) -> Iterable[str]:
        """Yield the lines of ``pretty_segments`` without the segment identifiers."""
        return (line for _, seg_lines in self.pretty_segments(kcfg, minimize=minimize) for line in seg_lines)

    def to_module(
        self,
        cfg: KCFG,
        module_name: str | None = None,
        omit_cells: Iterable[str] = (),
        parseable_output: bool = True,
        defunc_with: KDefinition | None = None,
        imports: Iterable[KImport] = (),
    ) -> KFlatModule:
        """Return ``cfg.to_module(...)`` with each ``KRule`` post-processed.

        Cells named in ``omit_cells`` are hidden in every rule body. With
        ``parseable_output`` generated cells are removed as well.
        """
        # Materialise once: the collection is consulted for every sentence.
        omitted = tuple(omit_cells)

        def _process_sentence(sent: KSentence) -> KSentence:
            if type(sent) is KRule:
                sent = sent.let(body=KCFGShow.hide_cells(sent.body, omitted))
                if parseable_output:
                    sent = sent.let(body=remove_generated_cells(sent.body))
                    # NOTE(review): assumed to be guarded by ``parseable_output``;
                    # the nesting was not recoverable from the reviewed text - confirm.
                    sent = minimize_rule_like(sent)
            return sent

        module = cfg.to_module(module_name, defunc_with=defunc_with, imports=imports)
        return module.let(sentences=[_process_sentence(sent) for sent in module.sentences])

    def show(
        self,
        cfg: KCFG,
        nodes: Iterable[NodeIdLike] = (),
        node_deltas: Iterable[tuple[NodeIdLike, NodeIdLike]] = (),
        to_module: bool = False,
        minimize: bool = True,
        omit_cells: Iterable[str] = (),
        module_name: str | None = None,
    ) -> list[str]:
        """Return the pretty-printed KCFG plus optional detail sections.

        After the graph, one ``Node <id>:`` section is added per entry of
        ``nodes`` and one ``State Delta <a> => <b>:`` section per pair in
        ``node_deltas``. With ``to_module`` the K module built by
        ``to_module`` is appended. ``omit_cells`` are hidden in all sections,
        and ``minimize`` additionally applies ``minimize_term`` to them.
        """
        # Materialise once: the collection is reused for every node, delta and the module.
        omitted = tuple(omit_cells)

        res_lines: list[str] = list(self.pretty(cfg, minimize=minimize))

        nodes_printed = False

        for node_id in nodes:
            nodes_printed = True
            kast = cfg.node(node_id).cterm.kast
            kast = KCFGShow.hide_cells(kast, omitted)
            if minimize:
                kast = minimize_term(kast)
            res_lines.extend(['', '', f'Node {node_id}:', '', self.pretty_printer.print(kast), ''])

        for node_id_1, node_id_2 in node_deltas:
            nodes_printed = True
            config_1 = KCFGShow.simplify_config(cfg.node(node_id_1).cterm.config, omitted)
            config_2 = KCFGShow.simplify_config(cfg.node(node_id_2).cterm.config, omitted)
            config_delta = push_down_rewrites(KRewrite(config_1, config_2))
            if minimize:
                config_delta = minimize_term(config_delta)
            res_lines.extend(
                [
                    '',
                    '',
                    f'State Delta {node_id_1} => {node_id_2}:',
                    '',
                    self.pretty_printer.print(config_delta),
                    '',
                ]
            )

        # NOTE(review): assumed that only the first blank line is conditional, so
        # three blank lines follow either way (each section above already ends
        # with one); the nesting was not recoverable from the reviewed text - confirm.
        if not nodes_printed:
            res_lines.append('')
        res_lines.append('')
        res_lines.append('')

        if to_module:
            module = self.to_module(cfg, module_name, omit_cells=omitted)
            res_lines.append(self.pretty_printer.print(module))

        return res_lines

    def dot(self, kcfg: KCFG) -> Digraph:
        """Build a Graphviz ``Digraph`` with one node per KCFG node and one edge per edge, cover, split and ndbranch.

        Node ids are converted with ``str`` before being used as Graphviz names.
        """

        def _short_label(label: str) -> str:
            # Truncate every line of the label to 100 characters.
            return '\n'.join(
                [
                    label_line if len(label_line) < 100 else (label_line[0:100] + ' ...')
                    for label_line in label.split('\n')
                ]
            )

        graph = Digraph()

        for node in kcfg.nodes:
            label = '\n'.join(self.node_short_info(kcfg, node))
            class_attrs = ' '.join(self.node_printer.node_attrs(kcfg, node))
            node_attrs = {'class': class_attrs} if class_attrs else {}
            graph.node(name=str(node.id), label=label, **node_attrs)

        for edge in kcfg.edges():
            depth = edge.depth
            label = f'{depth} steps'
            graph.edge(tail_name=str(edge.source.id), head_name=str(edge.target.id), label=f' {label} ')

        for cover in kcfg.covers():
            label = ', '.join(
                f'{k} |-> {self.pretty_printer.print(v)}' for k, v in cover.csubst.subst.minimize().items()
            )
            label = _short_label(label)
            cover_attrs = {'class': 'abstraction', 'style': 'dashed'}
            graph.edge(
                tail_name=str(cover.source.id), head_name=str(cover.target.id), label=f' {label} ', **cover_attrs
            )

        for split in kcfg.splits():
            for target_id, csubst in split.splits.items():
                label = '\n#And'.join(
                    f'{self.pretty_printer.print(v)}' for v in split.source.cterm.constraints + csubst.constraints
                )
                graph.edge(tail_name=str(split.source.id), head_name=str(target_id), label=f' {label} ')

        for ndbranch in kcfg.ndbranches():
            for target in ndbranch.target_ids:
                label = '1 step'
                graph.edge(tail_name=str(ndbranch.source.id), head_name=str(target), label=f' {label} ')

        return graph

    def dump(self, cfgid: str, cfg: KCFG, dump_dir: Path, dot: bool = False) -> None:
        """Write ``cfg`` and its parts below ``dump_dir``.

        Always (re)writes ``<cfgid>.json`` and, with ``dot``, ``<cfgid>.dot``.
        Per-node, per-edge and per-cover text files under ``nodes/``,
        ``edges/`` and ``covers/`` are written only if they do not exist yet;
        the terms for them are computed only when a file has to be written.
        """
        ensure_dir_path(dump_dir)

        cfg_file = dump_dir / f'{cfgid}.json'
        cfg_file.write_text(cfg.to_json())
        _LOGGER.info(f'Wrote CFG file {cfgid}: {cfg_file}')

        if dot:
            cfg_dot = self.dot(cfg)
            dot_file = dump_dir / f'{cfgid}.dot'
            dot_file.write_text(cfg_dot.source)
            _LOGGER.info(f'Wrote DOT file {cfgid}: {dot_file}')

        nodes_dir = dump_dir / 'nodes'
        ensure_dir_path(nodes_dir)
        for node in cfg.nodes:
            node_file = nodes_dir / f'config_{node.id}.txt'
            node_minimized_file = nodes_dir / f'config_minimized_{node.id}.txt'
            node_constraint_file = nodes_dir / f'constraint_{node.id}.txt'

            config = node.cterm.config
            if not node_file.exists():
                node_file.write_text(self.pretty_printer.print(config))
                _LOGGER.info(f'Wrote node file {cfgid}: {node_file}')
            if not node_minimized_file.exists():
                node_minimized_file.write_text(self.pretty_printer.print(minimize_term(config)))
                _LOGGER.info(f'Wrote node file {cfgid}: {node_minimized_file}')
            if not node_constraint_file.exists():
                constraint = mlAnd(node.cterm.constraints)
                node_constraint_file.write_text(self.pretty_printer.print(constraint))
                _LOGGER.info(f'Wrote node file {cfgid}: {node_constraint_file}')

        edges_dir = dump_dir / 'edges'
        ensure_dir_path(edges_dir)
        for edge in cfg.edges():
            edge_file = edges_dir / f'config_{edge.source.id}_{edge.target.id}.txt'
            edge_minimized_file = edges_dir / f'config_minimized_{edge.source.id}_{edge.target.id}.txt'

            # Skip building the rewrite when both outputs are already on disk.
            if edge_file.exists() and edge_minimized_file.exists():
                continue

            config = push_down_rewrites(KRewrite(edge.source.cterm.config, edge.target.cterm.config))
            if not edge_file.exists():
                edge_file.write_text(self.pretty_printer.print(config))
                _LOGGER.info(f'Wrote edge file {cfgid}: {edge_file}')
            if not edge_minimized_file.exists():
                edge_minimized_file.write_text(self.pretty_printer.print(minimize_term(config)))
                _LOGGER.info(f'Wrote edge file {cfgid}: {edge_minimized_file}')

        covers_dir = dump_dir / 'covers'
        ensure_dir_path(covers_dir)
        for cover in cfg.covers():
            cover_file = covers_dir / f'config_{cover.source.id}_{cover.target.id}.txt'
            cover_constraint_file = covers_dir / f'constraint_{cover.source.id}_{cover.target.id}.txt'

            if not cover_file.exists():
                subst_equalities = flatten_label(
                    '#And', cover.csubst.pred(sort_with=self.pretty_printer.definition, constraints=False)
                )
                cover_file.write_text('\n'.join(self.pretty_printer.print(se) for se in subst_equalities))
                _LOGGER.info(f'Wrote cover file {cfgid}: {cover_file}')
            if not cover_constraint_file.exists():
                cover_constraint_file.write_text(self.pretty_printer.print(cover.csubst.constraint))
                _LOGGER.info(f'Wrote cover file {cfgid}: {cover_constraint_file}')