Source code for pyk.kcfg.show

  1from __future__ import annotations
  2
  3import logging
  4from typing import TYPE_CHECKING
  5
  6from graphviz import Digraph
  7
  8from ..kast.inner import KApply, KRewrite, top_down
  9from ..kast.manip import (
 10    flatten_label,
 11    inline_cell_maps,
 12    minimize_rule_like,
 13    minimize_term,
 14    ml_pred_to_bool,
 15    push_down_rewrites,
 16    remove_generated_cells,
 17    sort_ac_collections,
 18)
 19from ..kast.outer import KRule
 20from ..prelude.k import DOTS
 21from ..prelude.ml import mlAnd
 22from ..utils import add_indent, ensure_dir_path
 23from .kcfg import KCFG
 24
 25if TYPE_CHECKING:
 26    from collections.abc import Iterable
 27    from pathlib import Path
 28    from typing import Final
 29
 30    from ..cterm import CSubst
 31    from ..kast import KInner
 32    from ..kast.outer import KDefinition, KFlatModule, KImport, KSentence
 33    from ..ktool.kprint import KPrint
 34    from .kcfg import NodeIdLike
 35
 36_LOGGER: Final = logging.getLogger(__name__)
 37
 38
[docs] 39class NodePrinter: 40 kprint: KPrint 41 full_printer: bool 42 minimize: bool 43 44 def __init__(self, kprint: KPrint, full_printer: bool = False, minimize: bool = False): 45 self.kprint = kprint 46 self.full_printer = full_printer 47 self.minimize = minimize 48
[docs] 49 def print_node(self, kcfg: KCFG, node: KCFG.Node) -> list[str]: 50 attrs = self.node_attrs(kcfg, node) 51 attr_str = ' (' + ', '.join(attrs) + ')' if attrs else '' 52 node_strs = [f'{node.id}{attr_str}'] 53 if self.full_printer: 54 kast = node.cterm.kast 55 if self.minimize: 56 kast = minimize_term(kast) 57 node_strs.extend(' ' + line for line in self.kprint.pretty_print(kast).split('\n')) 58 return node_strs
59
[docs] 60 def node_attrs(self, kcfg: KCFG, node: KCFG.Node) -> list[str]: 61 attrs = [] 62 if kcfg.is_root(node.id): 63 attrs.append('root') 64 if kcfg.is_stuck(node.id): 65 attrs.append('stuck') 66 if kcfg.is_vacuous(node.id): 67 attrs.append('vacuous') 68 if kcfg.is_leaf(node.id): 69 attrs.append('leaf') 70 if kcfg.is_split(node.id): 71 attrs.append('split') 72 attrs.extend(['@' + alias for alias in sorted(kcfg.aliases(node.id))]) 73 return attrs
74 75
[docs] 76class KCFGShow: 77 kprint: KPrint 78 node_printer: NodePrinter 79 80 def __init__(self, kprint: KPrint, node_printer: NodePrinter | None = None): 81 self.kprint = kprint 82 self.node_printer = node_printer if node_printer is not None else NodePrinter(kprint) 83
[docs] 84 def node_short_info(self, kcfg: KCFG, node: KCFG.Node) -> list[str]: 85 return self.node_printer.print_node(kcfg, node)
86
[docs] 87 @staticmethod 88 def hide_cells(term: KInner, omit_cells: Iterable[str]) -> KInner: 89 def _hide_cells(_k: KInner) -> KInner: 90 if type(_k) == KApply and _k.label.name in omit_cells: 91 return DOTS 92 return _k 93 94 if omit_cells: 95 return top_down(_hide_cells, term) 96 return term
97
[docs] 98 @staticmethod 99 def simplify_config(config: KInner, omit_cells: Iterable[str]) -> KInner: 100 config = inline_cell_maps(config) 101 config = sort_ac_collections(config) 102 config = KCFGShow.hide_cells(config, omit_cells) 103 return config
104
[docs] 105 @staticmethod 106 def make_unique_segments(segments: Iterable[tuple[str, Iterable[str]]]) -> Iterable[tuple[str, Iterable[str]]]: 107 _segments = [] 108 used_ids = [] 109 for id, seg_lines in segments: 110 suffix = '' 111 counter = 0 112 while f'{id}{suffix}' in used_ids: 113 suffix = f'_{counter}' 114 counter += 1 115 new_id = f'{id}{suffix}' 116 used_ids.append(new_id) 117 _segments.append((f'{new_id}', [l.rstrip() for l in seg_lines])) 118 return _segments
119
[docs] 120 def pretty_segments(self, kcfg: KCFG, minimize: bool = True) -> Iterable[tuple[str, Iterable[str]]]: 121 """Return a pretty version of the KCFG in segments. 122 123 Each segment is a tuple of an identifier and a list of lines to be printed for that segment (Tuple[str, Iterable[str]). 124 The identifier tells you whether that segment is for a given node, edge, or just pretty spacing ('unknown'). 125 This is useful for applications which want to pretty print in chunks, so that they can know which printed region corresponds to each node/edge. 126 """ 127 processed_nodes: list[KCFG.Node] = [] 128 ret_lines: list[tuple[str, list[str]]] = [] 129 130 def _multi_line_print( 131 label: str, lines: list[str], default: str = 'None', indent: int = 4, max_width: int | None = None 132 ) -> list[str]: 133 ret_lines = [] 134 if len(lines) == 0: 135 ret_lines.append(f'{label}: {default}') 136 else: 137 ret_lines.append(f'{label}:') 138 ret_lines.extend([f'{indent * " "}{line}' for line in lines]) 139 if max_width is not None: 140 ret_lines = [ 141 ret_line if len(ret_line) <= max_width else ret_line[0:max_width] + '...' for ret_line in ret_lines 142 ] 143 return ret_lines 144 145 def _print_csubst( 146 csubst: CSubst, subst_first: bool = False, indent: int = 4, minimize: bool = False 147 ) -> list[str]: 148 max_width = 78 if minimize else None 149 _constraint_strs = [ 150 self.kprint.pretty_print(ml_pred_to_bool(constraint, unsafe=True)) for constraint in csubst.constraints 151 ] 152 constraint_strs = _multi_line_print('constraint', _constraint_strs, 'true') 153 if len(csubst.subst.minimize()) > 0 and minimize: 154 subst_strs = ['subst: ...'] 155 else: 156 _subst_strs = [ 157 line 158 for k, v in csubst.subst.minimize().items() 159 for line in f'{k} <- {self.kprint.pretty_print(v)}'.split('\n') 160 ] 161 subst_strs = _multi_line_print('subst', _subst_strs, '.Subst', max_width=max_width) 162 if subst_first: 163 return subst_strs + constraint_strs 164 return constraint_strs + subst_strs 165 166 def _print_node(node: KCFG.Node) -> list[str]: 167 return self.node_short_info(kcfg, node) 168 169 def _print_edge(edge: KCFG.Edge) -> list[str]: 170 if edge.depth == 1: 171 return ['(' + str(edge.depth) + ' step)'] 172 else: 173 return ['(' + str(edge.depth) + ' steps)'] 174 175 def _print_merged_edge(merged_edge: KCFG.MergedEdge) -> list[str]: 176 res = '(' 177 for edge in merged_edge.edges: 178 res += f'{edge.depth}|' 179 res = res[:-1] + ' steps)' 180 return [res] if len(res) < 78 else ['(merged edge)'] 181 182 def _print_cover(cover: KCFG.Cover) -> Iterable[str]: 183 return _print_csubst(cover.csubst, subst_first=False, indent=4, minimize=minimize) 184 185 def _print_split_edge(split: KCFG.Split, target_id: int) -> list[str]: 186 return _print_csubst(split.splits[target_id], subst_first=True, indent=4, minimize=minimize) 187 188 def _print_subgraph(indent: str, curr_node: KCFG.Node, prior_on_trace: list[KCFG.Node]) -> None: 189 processed = curr_node in processed_nodes 190 processed_nodes.append(curr_node) 191 successors = list(kcfg.successors(curr_node.id)) 192 193 curr_node_strs = _print_node(curr_node) 194 195 ret_node_lines = [] 196 suffix = [] 197 elbow = '├─' 198 node_indent = '│ ' 199 if kcfg.is_root(curr_node.id): 200 elbow = '┌─' 201 elif processed or not successors: 202 elbow = '└─' 203 node_indent = ' ' 204 if curr_node in prior_on_trace: 205 suffix = ['(looped back)', ''] 206 elif processed and not kcfg.is_leaf(curr_node.id): 207 suffix = ['(continues as previously)', ''] 208 else: 209 suffix = [''] 210 ret_node_lines.append(indent + elbow + ' ' + curr_node_strs[0]) 211 ret_node_lines.extend(add_indent(indent + node_indent, curr_node_strs[1:])) 212 ret_node_lines.extend(add_indent(indent + ' ', suffix)) 213 ret_lines.append((f'node_{curr_node.id}', ret_node_lines)) 214 215 if processed or not successors: 216 return 217 successor = successors[0] 218 219 if isinstance(successor, KCFG.MultiEdge): 220 ret_lines.append(('unknown', [f'{indent}┃'])) 221 multiedge_label = '1 step' if type(successor) is KCFG.NDBranch else 'branch' 222 multiedge_id = 'ndbranch' if type(successor) is KCFG.NDBranch else 'split' 223 ret_lines.append(('unknown', [f'{indent}┃ ({multiedge_label})'])) 224 225 for target in successor.targets[:-1]: 226 if type(successor) is KCFG.Split: 227 ret_edge_lines = _print_split_edge(successor, target.id) 228 ret_edge_lines = [indent + '┣━━┓ ' + ret_edge_lines[0]] + add_indent( 229 indent + '┃ ┃ ', ret_edge_lines[1:] 230 ) 231 elif type(successor) is KCFG.NDBranch: 232 ret_edge_lines = [indent + '┣━━┓ '] 233 else: 234 raise AssertionError() 235 ret_edge_lines.append(indent + '┃ │') 236 ret_lines.append((f'{multiedge_id}_{curr_node.id}_{target.id}', ret_edge_lines)) 237 _print_subgraph(indent + '┃ ', target, prior_on_trace + [curr_node]) 238 target = successor.targets[-1] 239 if type(successor) is KCFG.Split: 240 ret_edge_lines = _print_split_edge(successor, target.id) 241 ret_edge_lines = [indent + '┗━━┓ ' + ret_edge_lines[0]] + add_indent( 242 indent + ' ┃ ', ret_edge_lines[1:] 243 ) 244 elif type(successor) is KCFG.NDBranch: 245 ret_edge_lines = [indent + '┗━━┓ '] 246 else: 247 raise AssertionError() 248 ret_edge_lines.append(indent + ' │') 249 ret_lines.append((f'{multiedge_id}_{curr_node.id}_{target.id}', ret_edge_lines)) 250 _print_subgraph(indent + ' ', target, prior_on_trace + [curr_node]) 251 252 elif isinstance(successor, KCFG.EdgeLike): 253 ret_lines.append(('unknown', [f'{indent}│'])) 254 255 if type(successor) is KCFG.Edge: 256 ret_edge_lines = [] 257 ret_edge_lines.extend(add_indent(indent + '│ ', _print_edge(successor))) 258 ret_lines.append((f'edge_{successor.source.id}_{successor.target.id}', ret_edge_lines)) 259 260 elif type(successor) is KCFG.MergedEdge: 261 ret_edge_lines = [] 262 ret_edge_lines.extend(add_indent(indent + '│ ', _print_merged_edge(successor))) 263 ret_lines.append((f'merged_edge_{successor.source.id}_{successor.target.id}', ret_edge_lines)) 264 265 elif type(successor) is KCFG.Cover: 266 ret_edge_lines = [] 267 ret_edge_lines.extend(add_indent(indent + '┊ ', _print_cover(successor))) 268 ret_lines.append((f'cover_{successor.source.id}_{successor.target.id}', ret_edge_lines)) 269 270 _print_subgraph(indent, successor.target, prior_on_trace + [curr_node]) 271 272 def _sorted_init_nodes() -> tuple[list[KCFG.Node], list[KCFG.Node]]: 273 sorted_init_nodes = sorted(node for node in kcfg.nodes if node not in processed_nodes) 274 init_nodes = [] 275 init_leaf_nodes = [] 276 remaining_nodes = [] 277 for node in sorted_init_nodes: 278 if kcfg.is_root(node.id): 279 if kcfg.is_leaf(node.id): 280 init_leaf_nodes.append(node) 281 else: 282 init_nodes.append(node) 283 else: 284 remaining_nodes.append(node) 285 return (init_nodes + init_leaf_nodes, remaining_nodes) 286 287 init, _ = _sorted_init_nodes() 288 while init: 289 ret_lines.append(('unknown', [''])) 290 _print_subgraph('', init[0], []) 291 init, _ = _sorted_init_nodes() 292 _, remaining = _sorted_init_nodes() 293 if remaining: 294 ret_lines.append(('unknown', ['', 'Remaining Nodes:'])) 295 for node in remaining: 296 ret_node_lines = [''] + _print_node(node) 297 ret_lines.append((f'node_{node.id}', ret_node_lines)) 298 299 return KCFGShow.make_unique_segments(ret_lines)
300
[docs] 301 def pretty( 302 self, 303 kcfg: KCFG, 304 minimize: bool = True, 305 ) -> Iterable[str]: 306 return (line for _, seg_lines in self.pretty_segments(kcfg, minimize=minimize) for line in seg_lines)
307
[docs] 308 def to_module( 309 self, 310 cfg: KCFG, 311 module_name: str | None = None, 312 omit_cells: Iterable[str] = (), 313 parseable_output: bool = True, 314 defunc_with: KDefinition | None = None, 315 imports: Iterable[KImport] = (), 316 ) -> KFlatModule: 317 def _process_sentence(sent: KSentence) -> KSentence: 318 if type(sent) is KRule: 319 sent = sent.let(body=KCFGShow.hide_cells(sent.body, omit_cells)) 320 if parseable_output: 321 sent = sent.let(body=remove_generated_cells(sent.body)) 322 sent = minimize_rule_like(sent) 323 return sent 324 325 module = cfg.to_module(module_name, defunc_with=defunc_with, imports=imports) 326 return module.let(sentences=[_process_sentence(sent) for sent in module.sentences])
327
[docs] 328 def show( 329 self, 330 cfg: KCFG, 331 nodes: Iterable[NodeIdLike] = (), 332 node_deltas: Iterable[tuple[NodeIdLike, NodeIdLike]] = (), 333 to_module: bool = False, 334 minimize: bool = True, 335 sort_collections: bool = False, 336 omit_cells: Iterable[str] = (), 337 module_name: str | None = None, 338 ) -> list[str]: 339 res_lines: list[str] = [] 340 res_lines += self.pretty(cfg, minimize=minimize) 341 342 nodes_printed = False 343 344 for node_id in nodes: 345 nodes_printed = True 346 kast = cfg.node(node_id).cterm.kast 347 kast = KCFGShow.hide_cells(kast, omit_cells) 348 if minimize: 349 kast = minimize_term(kast) 350 res_lines.append('') 351 res_lines.append('') 352 res_lines.append(f'Node {node_id}:') 353 res_lines.append('') 354 res_lines.append(self.kprint.pretty_print(kast, sort_collections=sort_collections)) 355 res_lines.append('') 356 357 for node_id_1, node_id_2 in node_deltas: 358 nodes_printed = True 359 config_1 = KCFGShow.simplify_config(cfg.node(node_id_1).cterm.config, omit_cells) 360 config_2 = KCFGShow.simplify_config(cfg.node(node_id_2).cterm.config, omit_cells) 361 config_delta = push_down_rewrites(KRewrite(config_1, config_2)) 362 if minimize: 363 config_delta = minimize_term(config_delta) 364 res_lines.append('') 365 res_lines.append('') 366 res_lines.append(f'State Delta {node_id_1} => {node_id_2}:') 367 res_lines.append('') 368 res_lines.append(self.kprint.pretty_print(config_delta, sort_collections=sort_collections)) 369 res_lines.append('') 370 371 if not (nodes_printed): 372 res_lines.append('') 373 res_lines.append('') 374 res_lines.append('') 375 376 if to_module: 377 module = self.to_module(cfg, module_name, omit_cells=omit_cells) 378 res_lines.append(self.kprint.pretty_print(module, sort_collections=sort_collections)) 379 380 return res_lines
381
[docs] 382 def dot(self, kcfg: KCFG) -> Digraph: 383 def _short_label(label: str) -> str: 384 return '\n'.join( 385 [ 386 label_line if len(label_line) < 100 else (label_line[0:100] + ' ...') 387 for label_line in label.split('\n') 388 ] 389 ) 390 391 graph = Digraph() 392 393 for node in kcfg.nodes: 394 label = '\n'.join(self.node_short_info(kcfg, node)) 395 class_attrs = ' '.join(self.node_printer.node_attrs(kcfg, node)) 396 attrs = {'class': class_attrs} if class_attrs else {} 397 graph.node(name=node.id, label=label, **attrs) 398 399 for edge in kcfg.edges(): 400 depth = edge.depth 401 label = f'{depth} steps' 402 graph.edge(tail_name=edge.source.id, head_name=edge.target.id, label=f' {label} ') 403 404 for cover in kcfg.covers(): 405 label = ', '.join( 406 f'{k} |-> {self.kprint.pretty_print(v)}' for k, v in cover.csubst.subst.minimize().items() 407 ) 408 label = _short_label(label) 409 attrs = {'class': 'abstraction', 'style': 'dashed'} 410 graph.edge(tail_name=cover.source.id, head_name=cover.target.id, label=f' {label} ', **attrs) 411 412 for split in kcfg.splits(): 413 for target_id, csubst in split.splits.items(): 414 label = '\n#And'.join( 415 f'{self.kprint.pretty_print(v)}' for v in split.source.cterm.constraints + csubst.constraints 416 ) 417 graph.edge(tail_name=split.source.id, head_name=target_id, label=f' {label} ') 418 419 for ndbranch in kcfg.ndbranches(): 420 for target in ndbranch.target_ids: 421 label = '1 step' 422 graph.edge(tail_name=ndbranch.source.id, head_name=target, label=f' {label} ') 423 424 return graph
425
[docs] 426 def dump(self, cfgid: str, cfg: KCFG, dump_dir: Path, dot: bool = False) -> None: 427 ensure_dir_path(dump_dir) 428 429 cfg_file = dump_dir / f'{cfgid}.json' 430 cfg_file.write_text(cfg.to_json()) 431 _LOGGER.info(f'Wrote CFG file {cfgid}: {cfg_file}') 432 433 if dot: 434 cfg_dot = self.dot(cfg) 435 dot_file = dump_dir / f'{cfgid}.dot' 436 dot_file.write_text(cfg_dot.source) 437 _LOGGER.info(f'Wrote DOT file {cfgid}: {dot_file}') 438 439 nodes_dir = dump_dir / 'nodes' 440 ensure_dir_path(nodes_dir) 441 for node in cfg.nodes: 442 node_file = nodes_dir / f'config_{node.id}.txt' 443 node_minimized_file = nodes_dir / f'config_minimized_{node.id}.txt' 444 node_constraint_file = nodes_dir / f'constraint_{node.id}.txt' 445 446 config = node.cterm.config 447 if not node_file.exists(): 448 node_file.write_text(self.kprint.pretty_print(config)) 449 _LOGGER.info(f'Wrote node file {cfgid}: {node_file}') 450 config = minimize_term(config) 451 if not node_minimized_file.exists(): 452 node_minimized_file.write_text(self.kprint.pretty_print(config)) 453 _LOGGER.info(f'Wrote node file {cfgid}: {node_minimized_file}') 454 if not node_constraint_file.exists(): 455 constraint = mlAnd(node.cterm.constraints) 456 node_constraint_file.write_text(self.kprint.pretty_print(constraint)) 457 _LOGGER.info(f'Wrote node file {cfgid}: {node_constraint_file}') 458 459 edges_dir = dump_dir / 'edges' 460 ensure_dir_path(edges_dir) 461 for edge in cfg.edges(): 462 edge_file = edges_dir / f'config_{edge.source.id}_{edge.target.id}.txt' 463 edge_minimized_file = edges_dir / f'config_minimized_{edge.source.id}_{edge.target.id}.txt' 464 465 config = push_down_rewrites(KRewrite(edge.source.cterm.config, edge.target.cterm.config)) 466 if not edge_file.exists(): 467 edge_file.write_text(self.kprint.pretty_print(config)) 468 _LOGGER.info(f'Wrote edge file {cfgid}: {edge_file}') 469 config = minimize_term(config) 470 if not edge_minimized_file.exists(): 471 edge_minimized_file.write_text(self.kprint.pretty_print(config)) 472 _LOGGER.info(f'Wrote edge file {cfgid}: {edge_minimized_file}') 473 474 covers_dir = dump_dir / 'covers' 475 ensure_dir_path(covers_dir) 476 for cover in cfg.covers(): 477 cover_file = covers_dir / f'config_{cover.source.id}_{cover.target.id}.txt' 478 cover_constraint_file = covers_dir / f'constraint_{cover.source.id}_{cover.target.id}.txt' 479 480 subst_equalities = flatten_label( 481 '#And', cover.csubst.pred(sort_with=self.kprint.definition, constraints=False) 482 ) 483 484 if not cover_file.exists(): 485 cover_file.write_text('\n'.join(self.kprint.pretty_print(se) for se in subst_equalities)) 486 _LOGGER.info(f'Wrote cover file {cfgid}: {cover_file}') 487 if not cover_constraint_file.exists(): 488 cover_constraint_file.write_text(self.kprint.pretty_print(cover.csubst.constraint)) 489 _LOGGER.info(f'Wrote cover file {cfgid}: {cover_constraint_file}')