Source code for pyk.kcfg.show

  1from __future__ import annotations
  2
  3import logging
  4from typing import TYPE_CHECKING
  5
  6from graphviz import Digraph
  7
  8from ..cterm.show import CTermShow
  9from ..kast.inner import KApply, KRewrite, top_down
 10from ..kast.manip import (
 11    flatten_label,
 12    inline_cell_maps,
 13    minimize_rule_like,
 14    minimize_term,
 15    ml_pred_to_bool,
 16    push_down_rewrites,
 17    remove_generated_cells,
 18    sort_ac_collections,
 19)
 20from ..kast.outer import KRule
 21from ..kast.prelude.k import DOTS
 22from ..kast.prelude.ml import mlAnd
 23from ..kast.pretty import PrettyPrinter
 24from ..utils import add_indent, ensure_dir_path
 25from .kcfg import KCFG
 26
 27if TYPE_CHECKING:
 28    from collections.abc import Iterable
 29    from pathlib import Path
 30    from typing import Final
 31
 32    from ..cterm import CSubst
 33    from ..kast import KInner
 34    from ..kast.outer import KDefinition, KFlatModule, KImport, KSentence
 35    from .kcfg import NodeIdLike
 36
 37_LOGGER: Final = logging.getLogger(__name__)
 38
 39
class NodePrinter:
    """Render a single KCFG node as a short list of text lines.

    The first line is the node id followed by its attributes; when
    ``full_printer`` is set, the lines produced by ``cterm_show`` for the
    node's cterm are appended below it.
    """

    cterm_show: CTermShow
    full_printer: bool
    minimize: bool  # annotation only: ``__init__`` below does not assign this attribute

    def __init__(self, cterm_show: CTermShow, full_printer: bool = False):
        """Store the cterm renderer and whether node bodies should be printed.

        Args:
            cterm_show: Object whose ``show(cterm)`` yields the lines for a node body.
            full_printer: If true, ``print_node`` includes the rendered cterm.
        """
        self.cterm_show = cterm_show
        self.full_printer = full_printer

    def print_node(self, kcfg: KCFG, node: KCFG.Node) -> list[str]:
        """Return the display lines for ``node``.

        Args:
            kcfg: Graph that ``node`` belongs to; queried for the node's attributes.
            node: Node to render.

        Returns:
            A header line ``"<id>"`` or ``"<id> (<attr>, ...)"``, followed (only
            when ``full_printer`` is set) by one indented line per line of
            ``cterm_show.show(node.cterm)``.
        """
        attrs = self.node_attrs(kcfg, node)
        attr_str = f' ({", ".join(attrs)})' if attrs else ''
        node_strs = [f'{node.id}{attr_str}']
        if self.full_printer:
            node_strs.extend(' ' + line for line in self.cterm_show.show(node.cterm))
        return node_strs

    def node_attrs(self, kcfg: KCFG, node: KCFG.Node) -> list[str]:
        """Return the attribute labels of ``node`` in a fixed order.

        The order is: ``root``, ``stuck``, ``vacuous``, ``leaf``, ``split``
        (each present only if the matching ``kcfg.is_*`` predicate holds),
        followed by the node's aliases, sorted and prefixed with ``@``.
        """
        # Order matters: it is the order in which labels appear in the output.
        checks = (
            ('root', kcfg.is_root),
            ('stuck', kcfg.is_stuck),
            ('vacuous', kcfg.is_vacuous),
            ('leaf', kcfg.is_leaf),
            ('split', kcfg.is_split),
        )
        attrs = [label for label, holds in checks if holds(node.id)]
        attrs.extend('@' + alias for alias in sorted(kcfg.aliases(node.id)))
        return attrs
71 72
class KCFGShow:
    """Textual, module, DOT and on-disk renderings of a ``KCFG``.

    All term printing goes through ``pretty_printer``; per-node header lines
    are delegated to ``node_printer``.
    """

    pretty_printer: PrettyPrinter
    node_printer: NodePrinter

    def __init__(self, defn: KDefinition, node_printer: NodePrinter | None = None):
        """Build a printer for ``defn``.

        Args:
            defn: Definition handed to ``PrettyPrinter``.
            node_printer: Node renderer to use. When omitted, a ``NodePrinter``
                wrapping ``CTermShow(self.pretty_printer.print)`` is created.
        """
        self.pretty_printer = PrettyPrinter(defn)
        self.node_printer = (
            node_printer if node_printer is not None else NodePrinter(CTermShow(self.pretty_printer.print))
        )

    def node_short_info(self, kcfg: KCFG, node: KCFG.Node) -> list[str]:
        """Return the lines ``node_printer`` produces for ``node``."""
        return self.node_printer.print_node(kcfg, node)

    @staticmethod
    def hide_cells(term: KInner, omit_cells: Iterable[str]) -> KInner:
        """Replace every ``KApply`` whose label name is in ``omit_cells`` by ``DOTS``.

        Args:
            term: Term to traverse (top-down).
            omit_cells: Label names to hide. It is materialised once, so one-shot
                iterators are handled and membership tests are constant time.

        Returns:
            ``term`` itself when ``omit_cells`` is empty, otherwise the rewritten term.
        """
        hidden = frozenset(omit_cells)
        if not hidden:
            return term

        def _hide_cells(_k: KInner) -> KInner:
            if type(_k) is KApply and _k.label.name in hidden:
                return DOTS
            return _k

        return top_down(_hide_cells, term)

    @staticmethod
    def simplify_config(config: KInner, omit_cells: Iterable[str]) -> KInner:
        """Apply ``inline_cell_maps``, ``sort_ac_collections`` and ``hide_cells`` to ``config``, in that order."""
        config = inline_cell_maps(config)
        config = sort_ac_collections(config)
        return KCFGShow.hide_cells(config, omit_cells)

    @staticmethod
    def make_unique_segments(segments: Iterable[tuple[str, Iterable[str]]]) -> Iterable[tuple[str, Iterable[str]]]:
        """Make segment identifiers unique and strip trailing whitespace from their lines.

        The first occurrence of an identifier is kept as is; later occurrences
        get the first free suffix ``_0``, ``_1``, ... appended.

        Args:
            segments: ``(identifier, lines)`` pairs, in output order.

        Returns:
            A list of ``(unique_identifier, lines)`` pairs in the same order,
            each line right-stripped.
        """
        taken: set[str] = set()
        # For each base id, the smallest suffix counter not yet known to be taken.
        # Ids are never released, so every smaller counter stays taken.
        next_counter: dict[str, int] = {}
        unique_segments = []
        for seg_id, seg_lines in segments:
            new_id = f'{seg_id}'
            if new_id in taken:
                counter = next_counter.get(seg_id, 0)
                new_id = f'{seg_id}_{counter}'
                while new_id in taken:
                    counter += 1
                    new_id = f'{seg_id}_{counter}'
                next_counter[seg_id] = counter + 1
            taken.add(new_id)
            unique_segments.append((new_id, [line.rstrip() for line in seg_lines]))
        return unique_segments

    def pretty_segments(self, kcfg: KCFG, minimize: bool = True) -> Iterable[tuple[str, Iterable[str]]]:
        """Return a pretty version of the KCFG in segments.

        Each segment is a tuple of an identifier and a list of lines to be printed for that segment
        (``tuple[str, Iterable[str]]``). The identifier tells you whether that segment is for a given
        node, edge, or just pretty spacing (``'unknown'``). This is useful for applications which want
        to pretty print in chunks, so that they can know which printed region corresponds to each
        node/edge.

        Args:
            kcfg: Graph to render.
            minimize: When true, non-empty substitutions on covers and splits are
                abbreviated to ``subst: ...``.

        Returns:
            The segments, with identifiers made unique by ``make_unique_segments``.
        """
        processed_nodes: list[KCFG.Node] = []
        ret_lines: list[tuple[str, list[str]]] = []

        def _multi_line_print(label: str, lines: list[str], default: str = 'None', indent: int = 4) -> list[str]:
            # "<label>: <default>" for no lines, else a "<label>:" header plus indented lines.
            if not lines:
                return [f'{label}: {default}']
            return [f'{label}:'] + [f'{indent * " "}{line}' for line in lines]

        def _print_csubst(
            csubst: CSubst, subst_first: bool = False, indent: int = 4, minimize: bool = False
        ) -> list[str]:
            _constraint_strs = [
                self.pretty_printer.print(ml_pred_to_bool(constraint, abstract_unsafe=True))
                for constraint in csubst.constraints
            ]
            constraint_strs = _multi_line_print('constraint', _constraint_strs, 'true', indent=indent)
            minimized_subst = csubst.subst.minimize()
            if len(minimized_subst) > 0 and minimize:
                subst_strs = ['subst: ...']
            else:
                _subst_strs = [
                    line
                    for k, v in minimized_subst.items()
                    for line in f'{k} <- {self.pretty_printer.print(v)}'.split('\n')
                ]
                subst_strs = _multi_line_print('subst', _subst_strs, '.Subst', indent=indent)
            if subst_first:
                return subst_strs + constraint_strs
            return constraint_strs + subst_strs

        def _print_node(node: KCFG.Node) -> list[str]:
            return self.node_short_info(kcfg, node)

        def _print_edge(edge: KCFG.Edge) -> list[str]:
            unit = 'step' if edge.depth == 1 else 'steps'
            return [f'({edge.depth} {unit})']

        def _print_merged_edge(merged_edge: KCFG.MergedEdge) -> list[str]:
            res = '(' + '|'.join(f'{edge.depth}' for edge in merged_edge.edges) + ' steps)'
            # Fall back to a generic label when the depth list gets too wide.
            return [res] if len(res) < 78 else ['(merged edge)']

        def _print_cover(cover: KCFG.Cover) -> Iterable[str]:
            return _print_csubst(cover.csubst, subst_first=False, indent=4, minimize=minimize)

        def _print_split_edge(split: KCFG.Split, target_id: int) -> list[str]:
            return _print_csubst(split.splits[target_id], subst_first=True, indent=4, minimize=minimize)

        def _print_subgraph(indent: str, curr_node: KCFG.Node, prior_on_trace: list[KCFG.Node]) -> None:
            # Depth-first rendering from ``curr_node``; a node seen before is printed
            # again as a terminal stub instead of being expanded a second time.
            processed = curr_node in processed_nodes
            processed_nodes.append(curr_node)
            successors = list(kcfg.successors(curr_node.id))

            curr_node_strs = _print_node(curr_node)

            # NOTE(review): the widths of the indent literals in this function are kept
            # exactly as found. '┣━━┓ ' is wider than its continuation '┃ ┃ ', so runs of
            # spaces may have been lost before this file reached review - confirm
            # against rendered output.
            ret_node_lines = []
            suffix = []
            elbow = '├─'
            node_indent = '│ '
            if kcfg.is_root(curr_node.id):
                elbow = '┌─'
            elif processed or not successors:
                elbow = '└─'
                node_indent = ' '
                if curr_node in prior_on_trace:
                    suffix = ['(looped back)', '']
                elif processed and not kcfg.is_leaf(curr_node.id):
                    suffix = ['(continues as previously)', '']
                else:
                    suffix = ['']
            ret_node_lines.append(indent + elbow + ' ' + curr_node_strs[0])
            ret_node_lines.extend(add_indent(indent + node_indent, curr_node_strs[1:]))
            ret_node_lines.extend(add_indent(indent + ' ', suffix))
            ret_lines.append((f'node_{curr_node.id}', ret_node_lines))

            if processed or not successors:
                return
            # Only the first successor is rendered.
            successor = successors[0]

            if isinstance(successor, KCFG.MultiEdge):
                is_ndbranch = type(successor) is KCFG.NDBranch
                multiedge_label = '1 step' if is_ndbranch else 'branch'
                multiedge_id = 'ndbranch' if is_ndbranch else 'split'
                ret_lines.append(('unknown', [f'{indent}┃']))
                ret_lines.append(('unknown', [f'{indent}┃ ({multiedge_label})']))

                targets = successor.targets
                last_index = len(targets) - 1
                for index, target in enumerate(targets):
                    # The last target closes the branch; earlier ones keep the rail open.
                    if index < last_index:
                        corner, rail, gutter = '┣━━┓ ', '┃ ┃ ', '┃ '
                    else:
                        corner, rail, gutter = '┗━━┓ ', ' ┃ ', ' '

                    if type(successor) is KCFG.Split:
                        ret_edge_lines = _print_split_edge(successor, target.id)
                        ret_edge_lines = [indent + corner + ret_edge_lines[0]] + add_indent(
                            indent + rail, ret_edge_lines[1:]
                        )
                    elif is_ndbranch:
                        ret_edge_lines = [indent + corner]
                    else:
                        raise AssertionError()
                    ret_edge_lines.append(indent + gutter + '│')
                    ret_lines.append((f'{multiedge_id}_{curr_node.id}_{target.id}', ret_edge_lines))
                    _print_subgraph(indent + gutter, target, prior_on_trace + [curr_node])

            elif isinstance(successor, KCFG.EdgeLike):
                ret_lines.append(('unknown', [f'{indent}│']))

                if type(successor) is KCFG.Edge:
                    ret_edge_lines = list(add_indent(indent + '│ ', _print_edge(successor)))
                    ret_lines.append((f'edge_{successor.source.id}_{successor.target.id}', ret_edge_lines))

                elif type(successor) is KCFG.MergedEdge:
                    ret_edge_lines = list(add_indent(indent + '│ ', _print_merged_edge(successor)))
                    ret_lines.append((f'merged_edge_{successor.source.id}_{successor.target.id}', ret_edge_lines))

                elif type(successor) is KCFG.Cover:
                    ret_edge_lines = list(add_indent(indent + '┊ ', _print_cover(successor)))
                    ret_lines.append((f'cover_{successor.source.id}_{successor.target.id}', ret_edge_lines))

                _print_subgraph(indent, successor.target, prior_on_trace + [curr_node])

        def _sorted_init_nodes() -> tuple[list[KCFG.Node], list[KCFG.Node]]:
            # Unprocessed nodes, split into (roots with non-leaf roots first, everything else).
            sorted_init_nodes = sorted(node for node in kcfg.nodes if node not in processed_nodes)
            init_nodes = []
            init_leaf_nodes = []
            remaining_nodes = []
            for node in sorted_init_nodes:
                if kcfg.is_root(node.id):
                    if kcfg.is_leaf(node.id):
                        init_leaf_nodes.append(node)
                    else:
                        init_nodes.append(node)
                else:
                    remaining_nodes.append(node)
            return (init_nodes + init_leaf_nodes, remaining_nodes)

        # Render one tree per still-unprocessed root; recompute after each tree
        # because rendering marks nodes as processed.
        init, _ = _sorted_init_nodes()
        while init:
            ret_lines.append(('unknown', ['']))
            _print_subgraph('', init[0], [])
            init, _ = _sorted_init_nodes()
        _, remaining = _sorted_init_nodes()
        if remaining:
            ret_lines.append(('unknown', ['', 'Remaining Nodes:']))
            for node in remaining:
                ret_node_lines = [''] + _print_node(node)
                ret_lines.append((f'node_{node.id}', ret_node_lines))

        return KCFGShow.make_unique_segments(ret_lines)

    def pretty(
        self,
        kcfg: KCFG,
        minimize: bool = True,
    ) -> Iterable[str]:
        """Yield the lines of ``pretty_segments(kcfg, minimize=minimize)`` in order, without the identifiers."""
        return (line for _, seg_lines in self.pretty_segments(kcfg, minimize=minimize) for line in seg_lines)

    def to_module(
        self,
        cfg: KCFG,
        module_name: str | None = None,
        omit_cells: Iterable[str] = (),
        parseable_output: bool = True,
        defunc_with: KDefinition | None = None,
        imports: Iterable[KImport] = (),
    ) -> KFlatModule:
        """Convert ``cfg`` to a module and post-process its rules for display.

        Non-rule sentences are passed through unchanged. For each ``KRule``,
        cells named in ``omit_cells`` are hidden, generated cells are removed
        when ``parseable_output`` is set, and the rule is minimised.

        Args:
            cfg: Graph to convert via ``cfg.to_module``.
            module_name: Forwarded to ``cfg.to_module``.
            omit_cells: Label names hidden in every rule body.
            parseable_output: Whether to apply ``remove_generated_cells`` to rule bodies.
            defunc_with: Forwarded to ``cfg.to_module``.
            imports: Forwarded to ``cfg.to_module``.
        """
        # Materialise once: the collection is reused for every sentence.
        hidden = tuple(omit_cells)

        def _process_sentence(sent: KSentence) -> KSentence:
            if type(sent) is KRule:
                sent = sent.let(body=KCFGShow.hide_cells(sent.body, hidden))
                if parseable_output:
                    sent = sent.let(body=remove_generated_cells(sent.body))
                sent = minimize_rule_like(sent)
            return sent

        module = cfg.to_module(module_name, defunc_with=defunc_with, imports=imports)
        return module.let(sentences=[_process_sentence(sent) for sent in module.sentences])

    def show(
        self,
        cfg: KCFG,
        nodes: Iterable[NodeIdLike] = (),
        node_deltas: Iterable[tuple[NodeIdLike, NodeIdLike]] = (),
        to_module: bool = False,
        minimize: bool = True,
        omit_cells: Iterable[str] = (),
        module_name: str | None = None,
    ) -> list[str]:
        """Return the full textual report for ``cfg`` as a list of lines.

        The report is the pretty-printed graph, then one section per entry of
        ``nodes``, then one "State Delta" section per pair in ``node_deltas``,
        and finally (when ``to_module`` is set) the printed module.

        Args:
            cfg: Graph to report on.
            nodes: Ids of nodes whose full term is printed.
            node_deltas: Pairs of node ids whose configurations are printed as a rewrite.
            to_module: Whether to append the module rendering.
            minimize: Forwarded to ``pretty`` and used to minimise printed terms.
            omit_cells: Label names hidden in node and delta sections and in the module.
            module_name: Forwarded to ``to_module``.
        """
        # Materialise once: the collection is reused for every node, delta and the module.
        hidden = tuple(omit_cells)

        res_lines: list[str] = list(self.pretty(cfg, minimize=minimize))

        nodes_printed = False

        for node_id in nodes:
            nodes_printed = True
            kast = cfg.node(node_id).cterm.kast
            kast = KCFGShow.hide_cells(kast, hidden)
            if minimize:
                kast = minimize_term(kast)
            res_lines.extend(['', '', f'Node {node_id}:', '', self.pretty_printer.print(kast), ''])

        for node_id_1, node_id_2 in node_deltas:
            nodes_printed = True
            config_1 = KCFGShow.simplify_config(cfg.node(node_id_1).cterm.config, hidden)
            config_2 = KCFGShow.simplify_config(cfg.node(node_id_2).cterm.config, hidden)
            config_delta = push_down_rewrites(KRewrite(config_1, config_2))
            if minimize:
                config_delta = minimize_term(config_delta)
            res_lines.extend(
                [
                    '',
                    '',
                    f'State Delta {node_id_1} => {node_id_2}:',
                    '',
                    self.pretty_printer.print(config_delta),
                    '',
                ]
            )

        # Node and delta sections already end with a blank line; add one if there were none.
        if not nodes_printed:
            res_lines.append('')
        res_lines.append('')
        res_lines.append('')

        if to_module:
            module = self.to_module(cfg, module_name, omit_cells=hidden)
            res_lines.append(self.pretty_printer.print(module))

        return res_lines

    def dot(self, kcfg: KCFG) -> Digraph:
        """Build a graphviz ``Digraph`` with the nodes, edges, covers, splits and ndbranches of ``kcfg``.

        Node and endpoint names are passed through ``str()``: graphviz documents
        ``name``, ``tail_name`` and ``head_name`` as strings, while node ids in
        this file appear to be integers.
        """

        def _short_label(label: str) -> str:
            # Truncate every line of the label to 100 characters.
            return '\n'.join(
                [
                    label_line if len(label_line) < 100 else (label_line[0:100] + ' ...')
                    for label_line in label.split('\n')
                ]
            )

        graph = Digraph()

        for node in kcfg.nodes:
            label = '\n'.join(self.node_short_info(kcfg, node))
            class_attrs = ' '.join(self.node_printer.node_attrs(kcfg, node))
            node_attrs = {'class': class_attrs} if class_attrs else {}
            graph.node(name=str(node.id), label=label, **node_attrs)

        for edge in kcfg.edges():
            depth = edge.depth
            label = f'{depth} steps'
            graph.edge(tail_name=str(edge.source.id), head_name=str(edge.target.id), label=f' {label} ')

        for cover in kcfg.covers():
            label = ', '.join(
                f'{k} |-> {self.pretty_printer.print(v)}' for k, v in cover.csubst.subst.minimize().items()
            )
            label = _short_label(label)
            cover_attrs = {'class': 'abstraction', 'style': 'dashed'}
            graph.edge(
                tail_name=str(cover.source.id), head_name=str(cover.target.id), label=f' {label} ', **cover_attrs
            )

        for split in kcfg.splits():
            for target_id, csubst in split.splits.items():
                label = '\n#And'.join(
                    f'{self.pretty_printer.print(v)}' for v in split.source.cterm.constraints + csubst.constraints
                )
                graph.edge(tail_name=str(split.source.id), head_name=str(target_id), label=f' {label} ')

        for ndbranch in kcfg.ndbranches():
            for target in ndbranch.target_ids:
                label = '1 step'
                graph.edge(tail_name=str(ndbranch.source.id), head_name=str(target), label=f' {label} ')

        return graph

    def dump(self, cfgid: str, cfg: KCFG, dump_dir: Path, dot: bool = False) -> None:
        """Write ``cfg`` and its pretty-printed parts under ``dump_dir``.

        Always (re)writes ``<cfgid>.json`` and, when ``dot`` is set,
        ``<cfgid>.dot``. Per-node, per-edge and per-cover text files under
        ``nodes/``, ``edges/`` and ``covers/`` are only written if they do not
        exist yet.

        Args:
            cfgid: Base name of the JSON/DOT files; also used in log messages.
            cfg: Graph to dump.
            dump_dir: Target directory, created if needed.
            dot: Whether to also write the DOT source.
        """

        def _write_once(path: Path, terms: Iterable[KInner], kind: str) -> None:
            # Write the printed terms (one per line group) unless the file is already there.
            if path.exists():
                return
            path.write_text('\n'.join(self.pretty_printer.print(term) for term in terms))
            _LOGGER.info('Wrote %s file %s: %s', kind, cfgid, path)

        ensure_dir_path(dump_dir)

        cfg_file = dump_dir / f'{cfgid}.json'
        cfg_file.write_text(cfg.to_json())
        _LOGGER.info('Wrote CFG file %s: %s', cfgid, cfg_file)

        if dot:
            cfg_dot = self.dot(cfg)
            dot_file = dump_dir / f'{cfgid}.dot'
            dot_file.write_text(cfg_dot.source)
            _LOGGER.info('Wrote DOT file %s: %s', cfgid, dot_file)

        nodes_dir = dump_dir / 'nodes'
        ensure_dir_path(nodes_dir)
        for node in cfg.nodes:
            config = node.cterm.config
            _write_once(nodes_dir / f'config_{node.id}.txt', [config], 'node')
            _write_once(nodes_dir / f'config_minimized_{node.id}.txt', [minimize_term(config)], 'node')
            _write_once(nodes_dir / f'constraint_{node.id}.txt', [mlAnd(node.cterm.constraints)], 'node')

        edges_dir = dump_dir / 'edges'
        ensure_dir_path(edges_dir)
        for edge in cfg.edges():
            edge_name = f'{edge.source.id}_{edge.target.id}'
            config = push_down_rewrites(KRewrite(edge.source.cterm.config, edge.target.cterm.config))
            _write_once(edges_dir / f'config_{edge_name}.txt', [config], 'edge')
            _write_once(edges_dir / f'config_minimized_{edge_name}.txt', [minimize_term(config)], 'edge')

        covers_dir = dump_dir / 'covers'
        ensure_dir_path(covers_dir)
        for cover in cfg.covers():
            cover_name = f'{cover.source.id}_{cover.target.id}'
            subst_equalities = flatten_label(
                '#And', cover.csubst.pred(sort_with=self.pretty_printer.definition, constraints=False)
            )
            _write_once(covers_dir / f'config_{cover_name}.txt', subst_equalities, 'cover')
            _write_once(covers_dir / f'constraint_{cover_name}.txt', [cover.csubst.constraint], 'cover')