Source code for pyk.kcfg.show

  1from __future__ import annotations
  2
  3import logging
  4from typing import TYPE_CHECKING
  5
  6from graphviz import Digraph
  7
  8from ..kast.inner import KApply, KRewrite, top_down
  9from ..kast.manip import (
 10    flatten_label,
 11    inline_cell_maps,
 12    minimize_rule_like,
 13    minimize_term,
 14    ml_pred_to_bool,
 15    push_down_rewrites,
 16    remove_generated_cells,
 17    sort_ac_collections,
 18)
 19from ..kast.outer import KRule
 20from ..prelude.k import DOTS
 21from ..prelude.ml import mlAnd
 22from ..utils import add_indent, ensure_dir_path
 23from .kcfg import KCFG
 24
 25if TYPE_CHECKING:
 26    from collections.abc import Iterable
 27    from pathlib import Path
 28    from typing import Final
 29
 30    from ..cterm import CSubst
 31    from ..kast import KInner
 32    from ..kast.outer import KFlatModule, KSentence
 33    from ..ktool.kprint import KPrint
 34    from .kcfg import NodeIdLike
 35
 36_LOGGER: Final = logging.getLogger(__name__)
 37
 38
[docs] 39class NodePrinter: 40 kprint: KPrint 41 full_printer: bool 42 minimize: bool 43 44 def __init__(self, kprint: KPrint, full_printer: bool = False, minimize: bool = False): 45 self.kprint = kprint 46 self.full_printer = full_printer 47 self.minimize = minimize 48
[docs] 49 def print_node(self, kcfg: KCFG, node: KCFG.Node) -> list[str]: 50 attrs = self.node_attrs(kcfg, node) 51 attr_str = ' (' + ', '.join(attrs) + ')' if attrs else '' 52 node_strs = [f'{node.id}{attr_str}'] 53 if self.full_printer: 54 kast = node.cterm.kast 55 if self.minimize: 56 kast = minimize_term(kast) 57 node_strs.extend(' ' + line for line in self.kprint.pretty_print(kast).split('\n')) 58 return node_strs
59
[docs] 60 def node_attrs(self, kcfg: KCFG, node: KCFG.Node) -> list[str]: 61 attrs = [] 62 if kcfg.is_root(node.id): 63 attrs.append('root') 64 if kcfg.is_stuck(node.id): 65 attrs.append('stuck') 66 if kcfg.is_vacuous(node.id): 67 attrs.append('vacuous') 68 if kcfg.is_leaf(node.id): 69 attrs.append('leaf') 70 if kcfg.is_split(node.id): 71 attrs.append('split') 72 attrs.extend(['@' + alias for alias in sorted(kcfg.aliases(node.id))]) 73 return attrs
74 75
[docs] 76class KCFGShow: 77 kprint: KPrint 78 node_printer: NodePrinter 79 80 def __init__(self, kprint: KPrint, node_printer: NodePrinter | None = None): 81 self.kprint = kprint 82 self.node_printer = node_printer if node_printer is not None else NodePrinter(kprint) 83
[docs] 84 def node_short_info(self, kcfg: KCFG, node: KCFG.Node) -> list[str]: 85 return self.node_printer.print_node(kcfg, node)
86
[docs] 87 @staticmethod 88 def hide_cells(term: KInner, omit_cells: Iterable[str]) -> KInner: 89 def _hide_cells(_k: KInner) -> KInner: 90 if type(_k) == KApply and _k.label.name in omit_cells: 91 return DOTS 92 return _k 93 94 if omit_cells: 95 return top_down(_hide_cells, term) 96 return term
97
[docs] 98 @staticmethod 99 def simplify_config(config: KInner, omit_cells: Iterable[str]) -> KInner: 100 config = inline_cell_maps(config) 101 config = sort_ac_collections(config) 102 config = KCFGShow.hide_cells(config, omit_cells) 103 return config
104
[docs] 105 @staticmethod 106 def make_unique_segments(segments: Iterable[tuple[str, Iterable[str]]]) -> Iterable[tuple[str, Iterable[str]]]: 107 _segments = [] 108 used_ids = [] 109 for id, seg_lines in segments: 110 suffix = '' 111 counter = 0 112 while f'{id}{suffix}' in used_ids: 113 suffix = f'_{counter}' 114 counter += 1 115 new_id = f'{id}{suffix}' 116 used_ids.append(new_id) 117 _segments.append((f'{new_id}', [l.rstrip() for l in seg_lines])) 118 return _segments
119
[docs] 120 def pretty_segments(self, kcfg: KCFG, minimize: bool = True) -> Iterable[tuple[str, Iterable[str]]]: 121 """Return a pretty version of the KCFG in segments. 122 123 Each segment is a tuple of an identifier and a list of lines to be printed for that segment (Tuple[str, Iterable[str]). 124 The identifier tells you whether that segment is for a given node, edge, or just pretty spacing ('unknown'). 125 This is useful for applications which want to pretty print in chunks, so that they can know which printed region corresponds to each node/edge. 126 """ 127 processed_nodes: list[KCFG.Node] = [] 128 ret_lines: list[tuple[str, list[str]]] = [] 129 130 def _multi_line_print( 131 label: str, lines: list[str], default: str = 'None', indent: int = 4, max_width: int | None = None 132 ) -> list[str]: 133 ret_lines = [] 134 if len(lines) == 0: 135 ret_lines.append(f'{label}: {default}') 136 else: 137 ret_lines.append(f'{label}:') 138 ret_lines.extend([f'{indent * " "}{line}' for line in lines]) 139 if max_width is not None: 140 ret_lines = [ 141 ret_line if len(ret_line) <= max_width else ret_line[0:max_width] + '...' for ret_line in ret_lines 142 ] 143 return ret_lines 144 145 def _print_csubst( 146 csubst: CSubst, subst_first: bool = False, indent: int = 4, minimize: bool = False 147 ) -> list[str]: 148 max_width = 78 if minimize else None 149 _constraint_strs = [ 150 self.kprint.pretty_print(ml_pred_to_bool(constraint, unsafe=True)) for constraint in csubst.constraints 151 ] 152 constraint_strs = _multi_line_print('constraint', _constraint_strs, 'true') 153 if len(csubst.subst.minimize()) > 0 and minimize: 154 subst_strs = ['subst: ...'] 155 else: 156 _subst_strs = [ 157 line 158 for k, v in csubst.subst.minimize().items() 159 for line in f'{k} <- {self.kprint.pretty_print(v)}'.split('\n') 160 ] 161 subst_strs = _multi_line_print('subst', _subst_strs, '.Subst', max_width=max_width) 162 if subst_first: 163 return subst_strs + constraint_strs 164 return constraint_strs + subst_strs 165 166 def _print_node(node: KCFG.Node) -> list[str]: 167 return self.node_short_info(kcfg, node) 168 169 def _print_edge(edge: KCFG.Edge) -> list[str]: 170 if edge.depth == 1: 171 return ['(' + str(edge.depth) + ' step)'] 172 else: 173 return ['(' + str(edge.depth) + ' steps)'] 174 175 def _print_merged_edge(merged_edge: KCFG.MergedEdge) -> list[str]: 176 res = '(' 177 for edge in merged_edge.edges: 178 res += f'{edge.depth}|' 179 res = res[:-1] + ' steps)' 180 return [res] if len(res) < 78 else ['(merged edge)'] 181 182 def _print_cover(cover: KCFG.Cover) -> Iterable[str]: 183 return _print_csubst(cover.csubst, subst_first=False, indent=4, minimize=minimize) 184 185 def _print_split_edge(split: KCFG.Split, target_id: int) -> list[str]: 186 return _print_csubst(split.splits[target_id], subst_first=True, indent=4, minimize=minimize) 187 188 def _print_subgraph(indent: str, curr_node: KCFG.Node, prior_on_trace: list[KCFG.Node]) -> None: 189 processed = curr_node in processed_nodes 190 processed_nodes.append(curr_node) 191 successors = list(kcfg.successors(curr_node.id)) 192 193 curr_node_strs = _print_node(curr_node) 194 195 ret_node_lines = [] 196 suffix = [] 197 elbow = '├─' 198 node_indent = '│ ' 199 if kcfg.is_root(curr_node.id): 200 elbow = '┌─' 201 elif processed or not successors: 202 elbow = '└─' 203 node_indent = ' ' 204 if curr_node in prior_on_trace: 205 suffix = ['(looped back)', ''] 206 elif processed and not kcfg.is_leaf(curr_node.id): 207 suffix = ['(continues as previously)', ''] 208 else: 209 suffix = [''] 210 ret_node_lines.append(indent + elbow + ' ' + curr_node_strs[0]) 211 ret_node_lines.extend(add_indent(indent + node_indent, curr_node_strs[1:])) 212 ret_node_lines.extend(add_indent(indent + ' ', suffix)) 213 ret_lines.append((f'node_{curr_node.id}', ret_node_lines)) 214 215 if processed or not successors: 216 return 217 successor = successors[0] 218 219 if isinstance(successor, KCFG.MultiEdge): 220 ret_lines.append(('unknown', [f'{indent}┃'])) 221 multiedge_label = '1 step' if type(successor) is KCFG.NDBranch else 'branch' 222 multiedge_id = 'ndbranch' if type(successor) is KCFG.NDBranch else 'split' 223 ret_lines.append(('unknown', [f'{indent}┃ ({multiedge_label})'])) 224 225 for target in successor.targets[:-1]: 226 if type(successor) is KCFG.Split: 227 ret_edge_lines = _print_split_edge(successor, target.id) 228 ret_edge_lines = [indent + '┣━━┓ ' + ret_edge_lines[0]] + add_indent( 229 indent + '┃ ┃ ', ret_edge_lines[1:] 230 ) 231 elif type(successor) is KCFG.NDBranch: 232 ret_edge_lines = [indent + '┣━━┓ '] 233 else: 234 raise AssertionError() 235 ret_edge_lines.append(indent + '┃ │') 236 ret_lines.append((f'{multiedge_id}_{curr_node.id}_{target.id}', ret_edge_lines)) 237 _print_subgraph(indent + '┃ ', target, prior_on_trace + [curr_node]) 238 target = successor.targets[-1] 239 if type(successor) is KCFG.Split: 240 ret_edge_lines = _print_split_edge(successor, target.id) 241 ret_edge_lines = [indent + '┗━━┓ ' + ret_edge_lines[0]] + add_indent( 242 indent + ' ┃ ', ret_edge_lines[1:] 243 ) 244 elif type(successor) is KCFG.NDBranch: 245 ret_edge_lines = [indent + '┗━━┓ '] 246 else: 247 raise AssertionError() 248 ret_edge_lines.append(indent + ' │') 249 ret_lines.append((f'{multiedge_id}_{curr_node.id}_{target.id}', ret_edge_lines)) 250 _print_subgraph(indent + ' ', target, prior_on_trace + [curr_node]) 251 252 elif isinstance(successor, KCFG.EdgeLike): 253 ret_lines.append(('unknown', [f'{indent}│'])) 254 255 if type(successor) is KCFG.Edge: 256 ret_edge_lines = [] 257 ret_edge_lines.extend(add_indent(indent + '│ ', _print_edge(successor))) 258 ret_lines.append((f'edge_{successor.source.id}_{successor.target.id}', ret_edge_lines)) 259 260 elif type(successor) is KCFG.MergedEdge: 261 ret_edge_lines = [] 262 ret_edge_lines.extend(add_indent(indent + '│ ', _print_merged_edge(successor))) 263 ret_lines.append((f'merged_edge_{successor.source.id}_{successor.target.id}', ret_edge_lines)) 264 265 elif type(successor) is KCFG.Cover: 266 ret_edge_lines = [] 267 ret_edge_lines.extend(add_indent(indent + '┊ ', _print_cover(successor))) 268 ret_lines.append((f'cover_{successor.source.id}_{successor.target.id}', ret_edge_lines)) 269 270 _print_subgraph(indent, successor.target, prior_on_trace + [curr_node]) 271 272 def _sorted_init_nodes() -> tuple[list[KCFG.Node], list[KCFG.Node]]: 273 sorted_init_nodes = sorted(node for node in kcfg.nodes if node not in processed_nodes) 274 init_nodes = [] 275 init_leaf_nodes = [] 276 remaining_nodes = [] 277 for node in sorted_init_nodes: 278 if kcfg.is_root(node.id): 279 if kcfg.is_leaf(node.id): 280 init_leaf_nodes.append(node) 281 else: 282 init_nodes.append(node) 283 else: 284 remaining_nodes.append(node) 285 return (init_nodes + init_leaf_nodes, remaining_nodes) 286 287 init, _ = _sorted_init_nodes() 288 while init: 289 ret_lines.append(('unknown', [''])) 290 _print_subgraph('', init[0], []) 291 init, _ = _sorted_init_nodes() 292 _, remaining = _sorted_init_nodes() 293 if remaining: 294 ret_lines.append(('unknown', ['', 'Remaining Nodes:'])) 295 for node in remaining: 296 ret_node_lines = [''] + _print_node(node) 297 ret_lines.append((f'node_{node.id}', ret_node_lines)) 298 299 return KCFGShow.make_unique_segments(ret_lines)
300
[docs] 301 def pretty( 302 self, 303 kcfg: KCFG, 304 minimize: bool = True, 305 ) -> Iterable[str]: 306 return (line for _, seg_lines in self.pretty_segments(kcfg, minimize=minimize) for line in seg_lines)
307
[docs] 308 def to_module( 309 self, 310 cfg: KCFG, 311 module_name: str | None = None, 312 omit_cells: Iterable[str] = (), 313 parseable_output: bool = True, 314 ) -> KFlatModule: 315 def _process_sentence(sent: KSentence) -> KSentence: 316 if type(sent) is KRule: 317 sent = sent.let(body=KCFGShow.hide_cells(sent.body, omit_cells)) 318 if parseable_output: 319 sent = sent.let(body=remove_generated_cells(sent.body)) 320 sent = minimize_rule_like(sent) 321 return sent 322 323 module = cfg.to_module(module_name) 324 return module.let(sentences=[_process_sentence(sent) for sent in module.sentences])
325
[docs] 326 def show( 327 self, 328 cfg: KCFG, 329 nodes: Iterable[NodeIdLike] = (), 330 node_deltas: Iterable[tuple[NodeIdLike, NodeIdLike]] = (), 331 to_module: bool = False, 332 minimize: bool = True, 333 sort_collections: bool = False, 334 omit_cells: Iterable[str] = (), 335 module_name: str | None = None, 336 ) -> list[str]: 337 res_lines: list[str] = [] 338 res_lines += self.pretty(cfg, minimize=minimize) 339 340 nodes_printed = False 341 342 for node_id in nodes: 343 nodes_printed = True 344 kast = cfg.node(node_id).cterm.kast 345 kast = KCFGShow.hide_cells(kast, omit_cells) 346 if minimize: 347 kast = minimize_term(kast) 348 res_lines.append('') 349 res_lines.append('') 350 res_lines.append(f'Node {node_id}:') 351 res_lines.append('') 352 res_lines.append(self.kprint.pretty_print(kast, sort_collections=sort_collections)) 353 res_lines.append('') 354 355 for node_id_1, node_id_2 in node_deltas: 356 nodes_printed = True 357 config_1 = KCFGShow.simplify_config(cfg.node(node_id_1).cterm.config, omit_cells) 358 config_2 = KCFGShow.simplify_config(cfg.node(node_id_2).cterm.config, omit_cells) 359 config_delta = push_down_rewrites(KRewrite(config_1, config_2)) 360 if minimize: 361 config_delta = minimize_term(config_delta) 362 res_lines.append('') 363 res_lines.append('') 364 res_lines.append(f'State Delta {node_id_1} => {node_id_2}:') 365 res_lines.append('') 366 res_lines.append(self.kprint.pretty_print(config_delta, sort_collections=sort_collections)) 367 res_lines.append('') 368 369 if not (nodes_printed): 370 res_lines.append('') 371 res_lines.append('') 372 res_lines.append('') 373 374 if to_module: 375 module = self.to_module(cfg, module_name, omit_cells=omit_cells) 376 res_lines.append(self.kprint.pretty_print(module, sort_collections=sort_collections)) 377 378 return res_lines
379
[docs] 380 def dot(self, kcfg: KCFG) -> Digraph: 381 def _short_label(label: str) -> str: 382 return '\n'.join( 383 [ 384 label_line if len(label_line) < 100 else (label_line[0:100] + ' ...') 385 for label_line in label.split('\n') 386 ] 387 ) 388 389 graph = Digraph() 390 391 for node in kcfg.nodes: 392 label = '\n'.join(self.node_short_info(kcfg, node)) 393 class_attrs = ' '.join(self.node_printer.node_attrs(kcfg, node)) 394 attrs = {'class': class_attrs} if class_attrs else {} 395 graph.node(name=node.id, label=label, **attrs) 396 397 for edge in kcfg.edges(): 398 depth = edge.depth 399 label = f'{depth} steps' 400 graph.edge(tail_name=edge.source.id, head_name=edge.target.id, label=f' {label} ') 401 402 for cover in kcfg.covers(): 403 label = ', '.join( 404 f'{k} |-> {self.kprint.pretty_print(v)}' for k, v in cover.csubst.subst.minimize().items() 405 ) 406 label = _short_label(label) 407 attrs = {'class': 'abstraction', 'style': 'dashed'} 408 graph.edge(tail_name=cover.source.id, head_name=cover.target.id, label=f' {label} ', **attrs) 409 410 for split in kcfg.splits(): 411 for target_id, csubst in split.splits.items(): 412 label = '\n#And'.join( 413 f'{self.kprint.pretty_print(v)}' for v in split.source.cterm.constraints + csubst.constraints 414 ) 415 graph.edge(tail_name=split.source.id, head_name=target_id, label=f' {label} ') 416 417 for ndbranch in kcfg.ndbranches(): 418 for target in ndbranch.target_ids: 419 label = '1 step' 420 graph.edge(tail_name=ndbranch.source.id, head_name=target, label=f' {label} ') 421 422 return graph
423
[docs] 424 def dump(self, cfgid: str, cfg: KCFG, dump_dir: Path, dot: bool = False) -> None: 425 ensure_dir_path(dump_dir) 426 427 cfg_file = dump_dir / f'{cfgid}.json' 428 cfg_file.write_text(cfg.to_json()) 429 _LOGGER.info(f'Wrote CFG file {cfgid}: {cfg_file}') 430 431 if dot: 432 cfg_dot = self.dot(cfg) 433 dot_file = dump_dir / f'{cfgid}.dot' 434 dot_file.write_text(cfg_dot.source) 435 _LOGGER.info(f'Wrote DOT file {cfgid}: {dot_file}') 436 437 nodes_dir = dump_dir / 'nodes' 438 ensure_dir_path(nodes_dir) 439 for node in cfg.nodes: 440 node_file = nodes_dir / f'config_{node.id}.txt' 441 node_minimized_file = nodes_dir / f'config_minimized_{node.id}.txt' 442 node_constraint_file = nodes_dir / f'constraint_{node.id}.txt' 443 444 config = node.cterm.config 445 if not node_file.exists(): 446 node_file.write_text(self.kprint.pretty_print(config)) 447 _LOGGER.info(f'Wrote node file {cfgid}: {node_file}') 448 config = minimize_term(config) 449 if not node_minimized_file.exists(): 450 node_minimized_file.write_text(self.kprint.pretty_print(config)) 451 _LOGGER.info(f'Wrote node file {cfgid}: {node_minimized_file}') 452 if not node_constraint_file.exists(): 453 constraint = mlAnd(node.cterm.constraints) 454 node_constraint_file.write_text(self.kprint.pretty_print(constraint)) 455 _LOGGER.info(f'Wrote node file {cfgid}: {node_constraint_file}') 456 457 edges_dir = dump_dir / 'edges' 458 ensure_dir_path(edges_dir) 459 for edge in cfg.edges(): 460 edge_file = edges_dir / f'config_{edge.source.id}_{edge.target.id}.txt' 461 edge_minimized_file = edges_dir / f'config_minimized_{edge.source.id}_{edge.target.id}.txt' 462 463 config = push_down_rewrites(KRewrite(edge.source.cterm.config, edge.target.cterm.config)) 464 if not edge_file.exists(): 465 edge_file.write_text(self.kprint.pretty_print(config)) 466 _LOGGER.info(f'Wrote edge file {cfgid}: {edge_file}') 467 config = minimize_term(config) 468 if not edge_minimized_file.exists(): 469 edge_minimized_file.write_text(self.kprint.pretty_print(config)) 470 _LOGGER.info(f'Wrote edge file {cfgid}: {edge_minimized_file}') 471 472 covers_dir = dump_dir / 'covers' 473 ensure_dir_path(covers_dir) 474 for cover in cfg.covers(): 475 cover_file = covers_dir / f'config_{cover.source.id}_{cover.target.id}.txt' 476 cover_constraint_file = covers_dir / f'constraint_{cover.source.id}_{cover.target.id}.txt' 477 478 subst_equalities = flatten_label( 479 '#And', cover.csubst.pred(sort_with=self.kprint.definition, constraints=False) 480 ) 481 482 if not cover_file.exists(): 483 cover_file.write_text('\n'.join(self.kprint.pretty_print(se) for se in subst_equalities)) 484 _LOGGER.info(f'Wrote cover file {cfgid}: {cover_file}') 485 if not cover_constraint_file.exists(): 486 cover_constraint_file.write_text(self.kprint.pretty_print(cover.csubst.constraint)) 487 _LOGGER.info(f'Wrote cover file {cfgid}: {cover_constraint_file}')