Source code for pyk.kcfg.kcfg

   1from __future__ import annotations
   2
   3import json
   4import logging
   5from abc import ABC, abstractmethod
   6from collections.abc import Container
   7from dataclasses import dataclass, field
   8from threading import RLock
   9from typing import TYPE_CHECKING, Final, List, Union, cast, final
  10
  11from ..cterm import CSubst, CTerm, cterm_build_claim, cterm_build_rule
  12from ..kast import EMPTY_ATT
  13from ..kast.inner import KApply
  14from ..kast.manip import (
  15    bool_to_ml_pred,
  16    extract_lhs,
  17    extract_rhs,
  18    inline_cell_maps,
  19    minimize_rule_like,
  20    rename_generated_vars,
  21    sort_ac_collections,
  22)
  23from ..kast.outer import KFlatModule
  24from ..prelude.kbool import andBool
  25from ..utils import ensure_dir_path, not_none
  26
  27if TYPE_CHECKING:
  28    from collections.abc import Iterable, Mapping, MutableMapping
  29    from pathlib import Path
  30    from types import TracebackType
  31    from typing import Any
  32
  33    from pyk.kore.rpc import LogEntry
  34
  35    from ..kast import KAtt
  36    from ..kast.inner import KInner
  37    from ..kast.outer import KClaim, KDefinition, KImport, KRuleLike
  38
  39
  40NodeIdLike = int | str
  41
  42_LOGGER: Final = logging.getLogger(__name__)
  43
  44
@dataclass(frozen=True)
class NodeAttr:
    """Immutable (frozen dataclass) attribute tag identified by its string ``value``."""

    # Textual name of the attribute; this is what gets serialized for a node's 'attrs'.
    value: str
48 49
class KCFGNodeAttr(NodeAttr):
    """Holder for the predefined node attributes ``VACUOUS`` and ``STUCK``."""

    # Attribute whose serialized value is 'vacuous'.
    VACUOUS = NodeAttr('vacuous')
    # Attribute whose serialized value is 'stuck'.
    STUCK = NodeAttr('stuck')
53 54
class KCFGStore:
    """File-based storage for a KCFG.

    Layout under ``store_path``:

    - ``kcfg.json``: everything except node contents (node ids, successors, aliases,
      plus the ``vacuous`` and ``stuck`` node-id lists);
    - ``nodes/<id>.json``: one file per node, holding the node's own dictionary.
    """

    store_path: Path

    def __init__(self, store_path: Path) -> None:
        """Remember ``store_path`` and make sure it and its ``nodes`` directory exist."""
        self.store_path = store_path
        ensure_dir_path(store_path)
        ensure_dir_path(self.kcfg_node_dir)

    @property
    def kcfg_json_path(self) -> Path:
        """Path of the ``kcfg.json`` index file."""
        return self.store_path / 'kcfg.json'

    @property
    def kcfg_node_dir(self) -> Path:
        """Directory holding the per-node JSON files."""
        return self.store_path / 'nodes'

    def kcfg_node_path(self, node_id: int) -> Path:
        """Return the path of the JSON file for node ``node_id``."""
        return self.kcfg_node_dir / f'{node_id}.json'

    def write_cfg_data(
        self, kcfg: KCFG, dct: dict[str, Any], deleted_nodes: Iterable[int] = (), created_nodes: Iterable[int] = ()
    ) -> None:
        """Persist ``dct`` as the index file and synchronize the per-node files.

        ``dct`` is mutated: the keys ``'vacuous'`` and ``'stuck'`` are set to the ids of
        the nodes of ``kcfg`` carrying the respective attribute. Files of
        ``deleted_nodes`` are removed (missing files are ignored), files of
        ``created_nodes`` are (re)written, and finally ``kcfg.json`` is written.
        """
        vacuous_nodes = []
        stuck_nodes = []
        # Single pass over the node store: each node is fetched once instead of twice.
        for node_id in kcfg._nodes:
            attrs = kcfg._nodes[node_id].attrs
            if KCFGNodeAttr.VACUOUS in attrs:
                vacuous_nodes.append(node_id)
            if KCFGNodeAttr.STUCK in attrs:
                stuck_nodes.append(node_id)
        dct['vacuous'] = vacuous_nodes
        dct['stuck'] = stuck_nodes

        for node_id in deleted_nodes:
            self.kcfg_node_path(node_id).unlink(missing_ok=True)
        for node_id in created_nodes:
            self.kcfg_node_path(node_id).write_text(json.dumps(kcfg._nodes[node_id].to_dict()))
        self.kcfg_json_path.write_text(json.dumps(dct))

    def read_cfg_data(self) -> dict[str, Any]:
        """Load the index file and inline the per-node files.

        Returns the index dictionary with ``'nodes'`` replaced by a list of
        ``{'id', 'cterm', 'attrs'}`` dictionaries, where ``attrs`` is rebuilt from the
        ``'vacuous'`` and ``'stuck'`` id lists. Those two keys are removed from the
        result; an index file that lacks them is treated as having empty lists.
        """
        dct = json.loads(self.kcfg_json_path.read_text())

        # Sets give O(1) membership tests; pop() both reads and removes the helper keys
        # and does not fail when a key is absent.
        vacuous_ids = set(dct.pop('vacuous', None) or ())
        stuck_ids = set(dct.pop('stuck', None) or ())

        nodes = []
        for node_id in dct.get('nodes') or []:
            node = self.read_node_data(node_id)
            attrs = []
            if node['id'] in vacuous_ids:
                attrs.append(KCFGNodeAttr.VACUOUS.value)
            if node['id'] in stuck_ids:
                attrs.append(KCFGNodeAttr.STUCK.value)
            nodes.append({'id': node['id'], 'cterm': node['cterm'], 'attrs': attrs})

        dct['nodes'] = nodes
        return dct

    def read_node_data(self, node_id: int) -> dict[str, Any]:
        """Return the parsed JSON content of the file for node ``node_id``."""
        return json.loads(self.kcfg_node_path(node_id).read_text())
112 113
[docs] 114class KCFG(Container[Union['KCFG.Node', 'KCFG.Successor']]):
@final
@dataclass(frozen=True, order=True)
class Node:
    """Immutable KCFG node: an integer id, a ``CTerm`` and a frozen set of attributes."""

    id: int
    cterm: CTerm
    attrs: frozenset[NodeAttr]

    def __init__(self, id: int, cterm: CTerm, attrs: Iterable[NodeAttr] = ()) -> None:
        # The dataclass is frozen, so fields are written through object.__setattr__.
        object.__setattr__(self, 'id', id)
        object.__setattr__(self, 'cterm', cterm)
        object.__setattr__(self, 'attrs', frozenset(attrs))

    def to_dict(self) -> dict[str, Any]:
        """Serialize the node to a dictionary with keys ``id``, ``cterm`` and ``attrs``."""
        node_id = self.id
        cterm_dict = self.cterm.to_dict()
        attr_values = [attr.value for attr in self.attrs]
        return {'id': node_id, 'cterm': cterm_dict, 'attrs': attr_values}

    @staticmethod
    def from_dict(dct: dict[str, Any]) -> KCFG.Node:
        """Rebuild a node from the dictionary format produced by ``to_dict``."""
        node_id = dct['id']
        cterm = CTerm.from_dict(dct['cterm'])
        attrs = [NodeAttr(value) for value in dct['attrs']]
        return KCFG.Node(node_id, cterm, attrs)

    def add_attr(self, attr: NodeAttr) -> KCFG.Node:
        """Return a copy of this node that additionally carries ``attr``."""
        return KCFG.Node(self.id, self.cterm, [*self.attrs, attr])

    def remove_attr(self, attr: NodeAttr) -> KCFG.Node:
        """Return a copy without ``attr``; raise ``ValueError`` if the node lacks it."""
        if attr in self.attrs:
            return self.discard_attr(attr)
        raise ValueError(f'Node {self.id} does not have attribute {attr.value}')

    def discard_attr(self, attr: NodeAttr) -> KCFG.Node:
        """Return a copy without ``attr``; a missing attribute is not an error."""
        remaining = self.attrs.difference((attr,))
        return KCFG.Node(self.id, self.cterm, remaining)

    def let(self, cterm: CTerm | None = None, attrs: Iterable[KCFGNodeAttr] | None = None) -> KCFG.Node:
        """Return a copy with ``cterm`` and/or ``attrs`` replaced; ``None`` keeps the current value."""
        if cterm is None:
            cterm = self.cterm
        if attrs is None:
            attrs = self.attrs
        return KCFG.Node(self.id, cterm, attrs)

    @property
    def free_vars(self) -> frozenset[str]:
        """Free variables of the node's ``cterm``, as a frozenset."""
        return frozenset(self.cterm.free_vars)
153
class Successor(ABC):
    """Abstract base for everything that connects a source node to one or more targets."""

    source: KCFG.Node

    def __lt__(self, other: Any) -> bool:
        # Successors are ordered by their source node only.
        if isinstance(other, KCFG.Successor):
            return self.source < other.source
        return NotImplemented

    @property
    def source_vars(self) -> frozenset[str]:
        """Free variables of the source node."""
        return frozenset(self.source.free_vars)

    @property
    @abstractmethod
    def targets(self) -> tuple[KCFG.Node, ...]: ...

    @property
    def target_ids(self) -> list[int]:
        """Ids of all target nodes, in ascending order."""
        ids = [target.id for target in self.targets]
        ids.sort()
        return ids

    @property
    def target_vars(self) -> frozenset[str]:
        """Union of the free variables of all target nodes."""
        collected: set[str] = set()
        for target in self.targets:
            collected = collected.union(target.free_vars)
        return frozenset(collected)

    @abstractmethod
    def replace_source(self, node: KCFG.Node) -> KCFG.Successor: ...

    @abstractmethod
    def replace_target(self, node: KCFG.Node) -> KCFG.Successor: ...

    @abstractmethod
    def to_dict(self) -> dict[str, Any]: ...

    @staticmethod
    @abstractmethod
    def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Successor: ...
190
class EdgeLike(Successor):
    """Successor with exactly one target node."""

    source: KCFG.Node
    target: KCFG.Node

    @property
    def targets(self) -> tuple[KCFG.Node, ...]:
        """The single target, wrapped in a one-element tuple."""
        only_target = self.target
        return (only_target,)
198
@final
@dataclass(frozen=True)
class Edge(EdgeLike):
    """Edge from ``source`` to ``target`` annotated with a ``depth`` and the ``rules`` labels."""

    source: KCFG.Node
    target: KCFG.Node
    depth: int
    rules: tuple[str, ...]

    def to_dict(self) -> dict[str, Any]:
        """Serialize the edge, referring to its nodes by id."""
        result: dict[str, Any] = {}
        result['source'] = self.source.id
        result['target'] = self.target.id
        result['depth'] = self.depth
        result['rules'] = list(self.rules)
        return result

    @staticmethod
    def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Edge:
        """Rebuild an edge from ``dct``, looking up both endpoints in ``nodes``."""
        source = nodes[dct['source']]
        target = nodes[dct['target']]
        return KCFG.Edge(source, target, dct['depth'], tuple(dct['rules']))

    def to_rule(
        self,
        label: str,
        claim: bool = False,
        priority: int | None = None,
        defunc_with: KDefinition | None = None,
        minimize: bool = False,
    ) -> KRuleLike:
        """Build a rule (or a claim when ``claim`` is true) going from source to target.

        The sentence id is ``'{label}-{source id}-TO-{target id}'``. On both sides,
        constraints that are ``#Ceil`` applications are dropped and the configuration
        is passed through ``inline_cell_maps`` and ``sort_ac_collections``. With
        ``minimize`` the result is additionally passed through ``minimize_rule_like``.
        """

        def _prepare(cterm: CTerm) -> CTerm:
            # Keep every constraint that is not a `#Ceil` application.
            kept = [c for c in cterm.constraints if not (type(c) is KApply and c.label.name == '#Ceil')]
            return CTerm(sort_ac_collections(inline_cell_maps(cterm.config)), kept)

        sentence_id = f'{label}-{self.source.id}-TO-{self.target.id}'
        lhs = _prepare(self.source.cterm)
        rhs = _prepare(self.target.cterm)

        rule: KRuleLike
        if claim:
            rule, _ = cterm_build_claim(sentence_id, lhs, rhs)
        else:
            rule, _ = cterm_build_rule(sentence_id, lhs, rhs, priority=priority, defunc_with=defunc_with)
        return minimize_rule_like(rule) if minimize else rule

    def replace_source(self, node: KCFG.Node) -> KCFG.Edge:
        """Return the same edge with ``node`` as source; ``node`` must have the source's id."""
        assert node.id == self.source.id
        return KCFG.Edge(node, self.target, self.depth, self.rules)

    def replace_target(self, node: KCFG.Node) -> KCFG.Edge:
        """Return the same edge with ``node`` as target; ``node`` must have the target's id."""
        assert node.id == self.target.id
        return KCFG.Edge(self.source, node, self.depth, self.rules)
256
@final
@dataclass(frozen=True)
class MergedEdge(EdgeLike):
    """Merged edge is a collection of edges that have been merged into a single edge."""

    source: KCFG.Node
    target: KCFG.Node
    edges: tuple[KCFG.Edge, ...]

    def to_dict(self) -> dict[str, Any]:
        """Serialize the merged edge; the contained edges are serialized recursively."""
        serialized_edges = [inner.to_dict() for inner in self.edges]
        return {'source': self.source.id, 'target': self.target.id, 'edges': serialized_edges}

    @staticmethod
    def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Successor:
        """Rebuild a merged edge from ``dct``, resolving node ids through ``nodes``."""
        source = nodes[dct['source']]
        target = nodes[dct['target']]
        inner_edges = tuple(KCFG.Edge.from_dict(edge_dict, nodes) for edge_dict in dct['edges'])
        return KCFG.MergedEdge(source, target, inner_edges)

    def replace_source(self, node: KCFG.Node) -> KCFG.Successor:
        """Return the same merged edge with ``node`` (same id) as source."""
        assert node.id == self.source.id
        return KCFG.MergedEdge(node, self.target, self.edges)

    def replace_target(self, node: KCFG.Node) -> KCFG.Successor:
        """Return the same merged edge with ``node`` (same id) as target."""
        assert node.id == self.target.id
        return KCFG.MergedEdge(self.source, node, self.edges)

    def to_rule(self, label: str, claim: bool = False, priority: int | None = None) -> KRuleLike:
        """Build the rule of a plain depth-1, rule-less ``Edge`` between the same endpoints."""
        stand_in = KCFG.Edge(self.source, self.target, 1, ())
        return stand_in.to_rule(label, claim, priority)
291
@final
@dataclass(frozen=True)
class Cover(EdgeLike):
    """Single-target successor that carries a ``CSubst`` between source and target."""

    source: KCFG.Node
    target: KCFG.Node
    csubst: CSubst

    def to_dict(self) -> dict[str, Any]:
        """Serialize the cover, referring to its nodes by id."""
        result: dict[str, Any] = {'source': self.source.id, 'target': self.target.id}
        result['csubst'] = self.csubst.to_dict()
        return result

    @staticmethod
    def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Cover:
        """Rebuild a cover from ``dct``, resolving node ids through ``nodes``."""
        source = nodes[dct['source']]
        target = nodes[dct['target']]
        return KCFG.Cover(source, target, CSubst.from_dict(dct['csubst']))

    def replace_source(self, node: KCFG.Node) -> KCFG.Cover:
        """Return the same cover with ``node`` (same id) as source."""
        assert node.id == self.source.id
        return KCFG.Cover(node, self.target, self.csubst)

    def replace_target(self, node: KCFG.Node) -> KCFG.Cover:
        """Return the same cover with ``node`` (same id) as target."""
        assert node.id == self.target.id
        return KCFG.Cover(self.source, node, self.csubst)
317
@dataclass(frozen=True)
class MultiEdge(Successor):
    """Base class for successors that may have several targets."""

    source: KCFG.Node

    def __lt__(self, other: Any) -> bool:
        # Only instances of the very same class are comparable; ties on the source
        # node are broken by the sorted list of target ids.
        if type(other) is not type(self):
            return NotImplemented
        own_key = (self.source, self.target_ids)
        other_key = (other.source, other.target_ids)
        return own_key < other_key

    @abstractmethod
    def with_single_target(self, target: KCFG.Node) -> KCFG.MultiEdge: ...
329
@final
@dataclass(frozen=True)
class Split(MultiEdge):
    """Multi-target successor in which every target is paired with a ``CSubst``."""

    source: KCFG.Node
    _targets: tuple[tuple[KCFG.Node, CSubst], ...]

    def __init__(self, source: KCFG.Node, _targets: Iterable[tuple[KCFG.Node, CSubst]]) -> None:
        # Frozen dataclass: fields are written through object.__setattr__.
        object.__setattr__(self, 'source', source)
        object.__setattr__(self, '_targets', tuple(_targets))

    @property
    def targets(self) -> tuple[KCFG.Node, ...]:
        """Target nodes, in the order they were given."""
        return tuple(pair[0] for pair in self._targets)

    @property
    def splits(self) -> dict[int, CSubst]:
        """Mapping from target node id to the ``CSubst`` paired with that target."""
        by_id: dict[int, CSubst] = {}
        for target, csubst in self._targets:
            by_id[target.id] = csubst
        return by_id

    def to_dict(self) -> dict[str, Any]:
        """Serialize the split; each target becomes a ``{'target', 'csubst'}`` dictionary."""
        target_dicts = [{'target': target.id, 'csubst': csubst.to_dict()} for target, csubst in self._targets]
        return {'source': self.source.id, 'targets': target_dicts}

    @staticmethod
    def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Split:
        """Rebuild a split from ``dct``, resolving node ids through ``nodes``."""
        pairs = []
        for entry in dct['targets']:
            pairs.append((nodes[entry['target']], CSubst.from_dict(entry['csubst'])))
        return KCFG.Split(nodes[dct['source']], tuple(pairs))

    def with_single_target(self, target: KCFG.Node) -> KCFG.Split:
        """Return a split from the same source to ``target`` only, keeping its ``CSubst``."""
        csubst = self.splits[target.id]
        return KCFG.Split(self.source, ((target, csubst),))

    @property
    def covers(self) -> tuple[KCFG.Cover, ...]:
        """One ``Cover`` per target, each going from the target to this split's source."""
        return tuple(KCFG.Cover(target, self.source, csubst) for target, csubst in self._targets)

    def replace_source(self, node: KCFG.Node) -> KCFG.Split:
        """Return the same split with ``node`` (same id) as source."""
        assert node.id == self.source.id
        return KCFG.Split(node, self._targets)

    def replace_target(self, node: KCFG.Node) -> KCFG.Split:
        """Return the same split with every target of id ``node.id`` replaced by ``node``."""
        assert node.id in self.target_ids
        updated = tuple(
            (node if node.id == current.id else current, csubst) for current, csubst in self._targets
        )
        return KCFG.Split(self.source, updated)
383
@final
@dataclass(frozen=True)
class NDBranch(MultiEdge):
    """Multi-target successor carrying the ``rules`` labels, without per-target substitutions."""

    source: KCFG.Node
    _targets: tuple[KCFG.Node, ...]
    rules: tuple[str, ...]

    def __init__(self, source: KCFG.Node, _targets: Iterable[KCFG.Node], rules: tuple[str, ...]) -> None:
        # Frozen dataclass: fields are written through object.__setattr__.
        object.__setattr__(self, 'source', source)
        object.__setattr__(self, '_targets', tuple(_targets))
        object.__setattr__(self, 'rules', rules)

    @property
    def targets(self) -> tuple[KCFG.Node, ...]:
        """Target nodes, in the order they were given."""
        return self._targets

    def to_dict(self) -> dict[str, Any]:
        """Serialize the branch, referring to its nodes by id."""
        target_ids = [target.id for target in self.targets]
        return {'source': self.source.id, 'targets': target_ids, 'rules': list(self.rules)}

    @staticmethod
    def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.NDBranch:
        """Rebuild a branch from ``dct``, resolving node ids through ``nodes``."""
        source = nodes[dct['source']]
        targets = tuple(nodes[target_id] for target_id in dct['targets'])
        return KCFG.NDBranch(source, targets, tuple(dct['rules']))

    def with_single_target(self, target: KCFG.Node) -> KCFG.NDBranch:
        """Return a branch from the same source to ``target`` only, with an empty rule tuple."""
        return KCFG.NDBranch(self.source, (target,), ())

    @property
    def edges(self) -> tuple[KCFG.Edge, ...]:
        """One depth-1, rule-less ``Edge`` from the source to each target."""
        source = self.source
        return tuple(KCFG.Edge(source, target, 1, ()) for target in self.targets)

    def replace_source(self, node: KCFG.Node) -> KCFG.NDBranch:
        """Return the same branch with ``node`` (same id) as source."""
        assert node.id == self.source.id
        return KCFG.NDBranch(node, self._targets, self.rules)

    def replace_target(self, node: KCFG.Node) -> KCFG.NDBranch:
        """Return the same branch with every target of id ``node.id`` replaced by ``node``."""
        assert node.id in self.target_ids
        updated = tuple(node if node.id == current.id else current for current in self._targets)
        return KCFG.NDBranch(self.source, updated, self.rules)
# Id that the next call to create_node will assign.
_node_id: int
# Node storage, keyed by node id.
_nodes: MutableMapping[int, KCFG.Node]

# Ids of nodes added/replaced resp. removed; maintained by the node-mutating methods.
_created_nodes: set[int]
_deleted_nodes: set[int]

# Successor tables, each keyed by the id of the successor's source node.
_edges: dict[int, Edge]
_merged_edges: dict[int, MergedEdge]
_covers: dict[int, Cover]
_splits: dict[int, Split]
_ndbranches: dict[int, NDBranch]
# Alias name -> node id.
_aliases: dict[str, int]
# Re-entrant lock acquired/released by the context-manager protocol below.
_lock: RLock

# Backing store, or None when the KCFG was created without a directory.
_kcfg_store: KCFGStore | None

def __init__(self, cfg_dir: Path | None = None, optimize_memory: bool = True) -> None:
    """Create an empty KCFG.

    :param cfg_dir: If given, a ``KCFGStore`` rooted at this directory is attached.
    :param optimize_memory: If true, nodes are kept in an ``OptimizedNodeStore``
        (imported lazily from ``.store``); otherwise in a plain ``dict``.
    """
    self._node_id = 1
    if optimize_memory:
        from .store import OptimizedNodeStore

        self._nodes = OptimizedNodeStore()
    else:
        self._nodes = {}
    self._created_nodes = set()
    self._deleted_nodes = set()
    self._edges = {}
    self._merged_edges = {}
    self._covers = {}
    self._splits = {}
    self._ndbranches = {}
    self._aliases = {}
    self._lock = RLock()
    # Always bind the attribute: leaving it unset when no directory is given would turn
    # every later `self._kcfg_store is None` check into an AttributeError.
    self._kcfg_store = KCFGStore(cfg_dir) if cfg_dir is not None else None

def __contains__(self, item: object) -> bool:
    """Membership test for nodes and successors; any other object is never contained."""
    # Exact type matches are used on purpose (no subclass matching), as each kind has
    # its own containment check.
    if type(item) is KCFG.Node:
        return self.contains_node(item)
    if type(item) is KCFG.Edge:
        return self.contains_edge(item)
    if type(item) is KCFG.MergedEdge:
        return self.contains_merged_edge(item)
    if type(item) is KCFG.Cover:
        return self.contains_cover(item)
    if type(item) is KCFG.Split:
        return self.contains_split(item)
    if type(item) is KCFG.NDBranch:
        return self.contains_ndbranch(item)
    return False

def __enter__(self) -> KCFG:
    """Acquire the internal lock and return the KCFG itself."""
    self._lock.acquire()
    return self

def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> bool:
    """Release the internal lock; exceptions raised inside the ``with`` body propagate."""
    self._lock.release()
    # True only when there was no exception, so nothing is ever suppressed.
    return exc_type is None

@property
def nodes(self) -> list[KCFG.Node]:
    """All nodes, as a fresh list."""
    return list(self._nodes.values())

@property
def root(self) -> list[KCFG.Node]:
    """Nodes for which ``is_root`` holds."""
    return [node for node in self.nodes if self.is_root(node.id)]

@property
def vacuous(self) -> list[KCFG.Node]:
    """Nodes for which ``is_vacuous`` holds."""
    return list(filter(lambda node: self.is_vacuous(node.id), self.nodes))

@property
def stuck(self) -> list[KCFG.Node]:
    """Nodes for which ``is_stuck`` holds."""
    return list(filter(lambda node: self.is_stuck(node.id), self.nodes))

@property
def leaves(self) -> list[KCFG.Node]:
    """Nodes for which ``is_leaf`` holds."""
    return list(filter(lambda node: self.is_leaf(node.id), self.nodes))

@property
def covered(self) -> list[KCFG.Node]:
    """Nodes for which ``is_covered`` holds."""
    return list(filter(lambda node: self.is_covered(node.id), self.nodes))

@property
def uncovered(self) -> list[KCFG.Node]:
    """Nodes for which ``is_covered`` does not hold."""
    return list(filter(lambda node: not self.is_covered(node.id), self.nodes))

@staticmethod
def from_claim(
    defn: KDefinition, claim: KClaim, cfg_dir: Path | None = None, optimize_memory: bool = True
) -> tuple[KCFG, NodeIdLike, NodeIdLike]:
    """Build a two-node KCFG from ``claim`` and return it with the init and target node ids.

    The claim body has its cell variables instantiated by ``defn`` and its generated
    variables renamed. The first node is the body's left-hand side constrained by
    ``claim.requires``; the second is the right-hand side constrained by the
    conjunction of ``claim.requires`` and ``claim.ensures``. No successor is created.
    """
    cfg = KCFG(cfg_dir=cfg_dir, optimize_memory=optimize_memory)
    body = rename_generated_vars(defn.instantiate_cell_vars(claim.body))

    lhs_cterm = CTerm.from_kast(extract_lhs(body))
    lhs_cterm = lhs_cterm.add_constraint(bool_to_ml_pred(claim.requires))
    init_node = cfg.create_node(lhs_cterm)

    rhs_cterm = CTerm.from_kast(extract_rhs(body))
    rhs_cterm = rhs_cterm.add_constraint(bool_to_ml_pred(andBool([claim.requires, claim.ensures])))
    target_node = cfg.create_node(rhs_cterm)

    return cfg, init_node.id, target_node.id
541
[docs] 542 @staticmethod 543 def path_length(_path: Iterable[KCFG.Successor]) -> int: 544 _path = tuple(_path) 545 if len(_path) == 0: 546 return 0 547 if type(_path[0]) is KCFG.Split or type(_path[0]) is KCFG.Cover: 548 return KCFG.path_length(_path[1:]) 549 elif type(_path[0]) is KCFG.NDBranch: 550 return 1 + KCFG.path_length(_path[1:]) 551 elif type(_path[0]) is KCFG.Edge: 552 return _path[0].depth + KCFG.path_length(_path[1:]) 553 elif type(_path[0]) is KCFG.MergedEdge: 554 return min(edge.depth for edge in _path[0].edges) + KCFG.path_length(_path[1:]) # todo: check this 555 raise ValueError(f'Cannot handle Successor type: {type(_path[0])}')
556
[docs] 557 def extend( 558 self, 559 extend_result: KCFGExtendResult, 560 node: KCFG.Node, 561 logs: dict[int, tuple[LogEntry, ...]], 562 *, 563 optimize_kcfg: bool, 564 ) -> None: 565 566 def log(message: str, *, warning: bool = False) -> None: 567 result_info = extend_result.info if type(extend_result) is Step or type(extend_result) is Branch else '' 568 result_info_message = f': {result_info}' if result_info else '' 569 _LOGGER.log( 570 logging.WARNING if warning else logging.INFO, 571 f'Extending KCFG with: {message}{result_info_message}', 572 ) 573 574 match extend_result: 575 case Vacuous(): 576 self.add_vacuous(node.id) 577 log(f'vacuous node: {node.id}', warning=True) 578 579 case Stuck(): 580 self.add_stuck(node.id) 581 log(f'stuck node: {node.id}') 582 583 case Abstract(cterm): 584 new_node = self.create_node(cterm) 585 self.create_cover(node.id, new_node.id) 586 log(f'abstraction node: {node.id}') 587 588 case Step(cterm, depth, next_node_logs, rule_labels, _): 589 node_id = node.id 590 next_node = self.create_node(cterm) 591 # Optimization for steps consists of on-the-fly merging of consecutive edges and can 592 # be performed only if the current node has a single predecessor connected by an Edge 593 if ( 594 optimize_kcfg 595 and (len(predecessors := self.predecessors(target_id=node.id)) == 1) 596 and isinstance(in_edge := predecessors[0], KCFG.Edge) 597 ): 598 # The existing edge is removed and the step parameters are updated accordingly 599 self.remove_edge(in_edge.source.id, node.id) 600 node_id = in_edge.source.id 601 depth += in_edge.depth 602 rule_labels = list(in_edge.rules) + rule_labels 603 next_node_logs = logs[node.id] + next_node_logs if node.id in logs else next_node_logs 604 self.remove_node(node.id) 605 self.create_edge(node_id, next_node.id, depth, rule_labels) 606 logs[next_node.id] = next_node_logs 607 log(f'basic block at depth {depth}: {node_id} --> {next_node.id}') 608 609 case Branch(branches, _): 610 branch_node_ids = 
self.split_on_constraints(node.id, branches) 611 log(f'{len(branches)} branches: {node.id} --> {branch_node_ids}') 612 613 case NDBranch(cterms, next_node_logs, rule_labels): 614 next_ids = [self.create_node(cterm).id for cterm in cterms] 615 for i in next_ids: 616 logs[i] = next_node_logs 617 self.create_ndbranch(node.id, next_ids, rules=rule_labels) 618 log(f'{len(cterms)} non-deterministic branches: {node.id} --> {next_ids}') 619 620 case _: 621 raise AssertionError()
622
def _to_dict_with_nodes(self, nodes: list[Any]) -> dict[str, Any]:
    """Serialize successors and aliases around the given, already serialized ``nodes`` entry."""
    res = {
        'next': self._node_id,
        'nodes': nodes,
        'edges': [edge.to_dict() for edge in self.edges()],
        'merged_edges': [merged_edge.to_dict() for merged_edge in self.merged_edges()],
        'covers': [cover.to_dict() for cover in self.covers()],
        'splits': [split.to_dict() for split in self.splits()],
        'ndbranches': [ndbranch.to_dict() for ndbranch in self.ndbranches()],
        'aliases': dict(sorted(self._aliases.items())),
    }
    # Entries with a falsy value (empty lists/dicts) are omitted from the result.
    return {k: v for k, v in res.items() if v}

def to_dict_no_nodes(self) -> dict[str, Any]:
    """Serialize the KCFG with ``'nodes'`` holding only the node ids.

    Keys whose value is empty are omitted. Aliases are sorted by name.
    """
    return self._to_dict_with_nodes(list(self._nodes.keys()))

def to_dict(self) -> dict[str, Any]:
    """Serialize the KCFG with ``'nodes'`` holding the full node dictionaries.

    Keys whose value is empty are omitted. Aliases are sorted by name.
    """
    return self._to_dict_with_nodes([node.to_dict() for node in self.nodes])
666
@staticmethod
def from_dict(dct: Mapping[str, Any], optimize_memory: bool = True) -> KCFG:
    """Rebuild a KCFG from its dictionary form.

    Nodes are added first, then the next-node-id counter is set from ``'next'``
    (defaulting to one more than the largest node id), then successors are added in
    the order edges, merged edges, covers, splits, ndbranches, and finally aliases.
    Missing or empty entries are treated as empty.
    """
    cfg = KCFG(optimize_memory=optimize_memory)

    for node_dict in dct.get('nodes') or []:
        cfg.add_node(KCFG.Node.from_dict(node_dict))

    highest_id = max([node.id for node in cfg.nodes], default=0)
    cfg._node_id = dct.get('next', highest_id + 1)

    # Dictionary key -> successor class able to parse its entries; order matters.
    successor_kinds = (
        ('edges', KCFG.Edge),
        ('merged_edges', KCFG.MergedEdge),
        ('covers', KCFG.Cover),
        ('splits', KCFG.Split),
        ('ndbranches', KCFG.NDBranch),
    )
    for key, kind in successor_kinds:
        for succ_dict in dct.get(key) or []:
            cfg.add_successor(kind.from_dict(succ_dict, cfg._nodes))

    for alias, node_id in dct.get('aliases', {}).items():
        cfg.add_alias(alias=alias, node_id=node_id)

    return cfg
702
def aliases(self, node_id: NodeIdLike) -> list[str]:
    """Return every alias name that maps to the node identified by ``node_id``."""
    resolved = self._resolve(node_id)
    matching = []
    for name, target in self._aliases.items():
        if resolved == target:
            matching.append(name)
    return matching
706
def to_json(self) -> str:
    """Return the JSON text of ``to_dict()``, with keys sorted."""
    as_dict = self.to_dict()
    return json.dumps(as_dict, sort_keys=True)
709
@staticmethod
def from_json(s: str, optimize_memory: bool = True) -> KCFG:
    """Parse the JSON text ``s`` and build a KCFG from the resulting dictionary."""
    parsed = json.loads(s)
    return KCFG.from_dict(parsed, optimize_memory=optimize_memory)
713
def to_rules(
    self,
    _id: str | None = None,
    priority: int = 20,
    defunc_with: KDefinition | None = None,
) -> list[KRuleLike]:
    """Return one rule per edge followed by one rule per merged edge.

    ``_id`` is the label passed to each ``to_rule`` call and defaults to
    ``'BASIC-BLOCK'``. ``defunc_with`` is forwarded for plain edges only.
    """
    label = _id if _id is not None else 'BASIC-BLOCK'
    rules = [edge.to_rule(label, priority=priority, defunc_with=defunc_with) for edge in self.edges()]
    rules.extend(merged.to_rule(label, priority=priority) for merged in self.merged_edges())
    return rules
724
def to_module(
    self,
    module_name: str | None = None,
    imports: Iterable[KImport] = (),
    priority: int = 20,
    att: KAtt = EMPTY_ATT,
    defunc_with: KDefinition | None = None,
) -> KFlatModule:
    """Wrap the result of ``to_rules`` in a ``KFlatModule``.

    ``module_name`` defaults to ``'KCFG'``; ``imports`` and ``att`` are passed on to
    the module, ``priority`` and ``defunc_with`` to ``to_rules``.
    """
    name = module_name if module_name is not None else 'KCFG'
    rules = self.to_rules(priority=priority, defunc_with=defunc_with)
    return KFlatModule(name, rules, imports=imports, att=att)
def _resolve_or_none(self, id_like: NodeIdLike) -> int | None:
    """Resolve ``id_like`` to a node id, or return ``None`` when it is unknown.

    An ``int`` resolves to itself if a node with that id exists. A ``str`` starting
    with ``'@'`` is looked up as an alias (without the ``'@'``); any other string
    resolves to ``None``.

    :raises TypeError: If ``id_like`` is neither ``int`` nor ``str``.
    :raises ValueError: If ``id_like`` is an ``'@'``-prefixed string naming no alias.
    """
    if type(id_like) is int:
        return id_like if id_like in self._nodes else None

    if type(id_like) is not str:
        raise TypeError(f'Expected int or str for id_like, got: {id_like}')

    if id_like.startswith('@'):
        alias = id_like[1:]
        if alias in self._aliases:
            return self._aliases[alias]
        raise ValueError(f'Unknown alias: {id_like}')

    return None

def _resolve(self, id_like: NodeIdLike) -> int:
    """Resolve ``id_like`` to a node id.

    :raises ValueError: If ``id_like`` does not identify a node.
    :raises TypeError: If ``id_like`` is neither ``int`` nor ``str``.
    """
    match = self._resolve_or_none(id_like)
    # Compare against None explicitly: a truthiness test would reject node id 0.
    if match is None:
        raise ValueError(f'Unknown node: {id_like}')
    return match

def node(self, node_id: NodeIdLike) -> KCFG.Node:
    """Return the node identified by ``node_id``; resolution errors propagate from ``_resolve``."""
    return self._nodes[self._resolve(node_id)]
764
def get_node(self, node_id: NodeIdLike) -> KCFG.Node | None:
    """Return the node identified by ``node_id``, or ``None`` if it does not resolve."""
    resolved_id = self._resolve_or_none(node_id)
    return None if resolved_id is None else self._nodes[resolved_id]
770
def contains_node(self, node: KCFG.Node) -> bool:
    """Return the truthiness of the stored node found under ``node.id`` (``False`` if none)."""
    stored = self.get_node(node.id)
    return bool(stored)
773
def add_node(self, node: KCFG.Node) -> None:
    """Store ``node`` under its own id and record the id as created.

    :raises ValueError: If a node with the same id is already stored.
    """
    node_id = node.id
    if node_id in self._nodes:
        raise ValueError(f'Node with id already exists: {node.id}')
    self._nodes[node_id] = node
    self._created_nodes.add(node_id)
779
def create_node(self, cterm: CTerm) -> KCFG.Node:
    """Create, store and return a new node for ``cterm`` using the next free node id."""
    fresh_id = self._node_id
    self._node_id = fresh_id + 1
    created = KCFG.Node(fresh_id, cterm)
    self._nodes[created.id] = created
    self._created_nodes.add(created.id)
    return created
786
def remove_node(self, node_id: NodeIdLike) -> None:
    """Remove the node identified by ``node_id`` together with everything attached to it.

    ``remove_edges_around`` is called first, then the node is dropped from the store,
    recorded as deleted and no longer recorded as created.
    """
    self.remove_edges_around(node_id)

    resolved = self._resolve(node_id)
    del_target = self._nodes
    del_target.pop(resolved)
    self._created_nodes.discard(resolved)
    self._deleted_nodes.add(resolved)
794
def remove_edges_around(self, node_id: NodeIdLike) -> None:
    """Drop every successor that starts at or targets the node, and every alias of it."""
    resolved = self._resolve(node_id)

    def _prune(table: dict[int, Any]) -> dict[int, Any]:
        # Keep only successors that neither start at nor lead to the node.
        kept = {}
        for source_id, succ in table.items():
            if source_id != resolved and resolved not in succ.target_ids:
                kept[source_id] = succ
        return kept

    self._edges = _prune(self._edges)
    self._merged_edges = _prune(self._merged_edges)
    self._covers = _prune(self._covers)
    self._splits = _prune(self._splits)
    self._ndbranches = _prune(self._ndbranches)

    # Collect first, then remove, so the alias table is not mutated while iterating it.
    stale_aliases = [alias for alias, target in self._aliases.items() if target == resolved]
    for alias in stale_aliases:
        self.remove_alias(alias)
def _update_refs(self, node_id: int) -> None:
    """Make every successor touching node ``node_id`` refer to the currently stored node object."""
    current = self.node(node_id)

    def _store(succ: KCFG.Successor) -> None:
        # File the rebuilt successor in the table matching its exact type, keyed by source id.
        kind = type(succ)
        if kind is KCFG.Edge:
            self._edges[succ.source.id] = succ
        elif kind is KCFG.MergedEdge:
            self._merged_edges[succ.source.id] = succ
        elif kind is KCFG.Cover:
            self._covers[succ.source.id] = succ
        elif kind is KCFG.Split:
            self._splits[succ.source.id] = succ
        elif kind is KCFG.NDBranch:
            self._ndbranches[succ.source.id] = succ

    # Outgoing successors first ...
    for outgoing in self.successors(node_id):
        _store(outgoing.replace_source(current))

    # ... then incoming ones, queried only after the outgoing ones have been rewritten.
    for incoming in self.predecessors(node_id):
        _store(incoming.replace_target(current))

def remove_attr(self, node_id: NodeIdLike, attr: NodeAttr) -> None:
    """Replace the node by the result of its ``remove_attr(attr)``."""
    self.replace_node(self.node(node_id).remove_attr(attr))
842
def discard_attr(self, node_id: NodeIdLike, attr: NodeAttr) -> None:
    """Replace the node by the result of its ``discard_attr(attr)``."""
    self.replace_node(self.node(node_id).discard_attr(attr))
847
def add_attr(self, node_id: NodeIdLike, attr: NodeAttr) -> None:
    """Replace the node by the result of its ``add_attr(attr)``."""
    self.replace_node(self.node(node_id).add_attr(attr))
852
def let_node(
    self, node_id: NodeIdLike, cterm: CTerm | None = None, attrs: Iterable[KCFGNodeAttr] | None = None
) -> None:
    """Replace the node by the result of its ``let(cterm=cterm, attrs=attrs)``."""
    updated = self.node(node_id).let(cterm=cterm, attrs=attrs)
    self.replace_node(updated)
859
def replace_node(self, node: KCFG.Node) -> None:
    """Store ``node`` under its id, record the id as created and refresh successor references."""
    node_id = node.id
    self._nodes[node_id] = node
    self._created_nodes.add(node_id)
    self._update_refs(node_id)
864
def successors(self, source_id: NodeIdLike) -> list[Successor]:
    """Return all successors starting at ``source_id``.

    Result order: edges, merged edges, covers, splits, ndbranches.
    """
    return [
        *self.edges(source_id=source_id),
        *self.merged_edges(source_id=source_id),
        *self.covers(source_id=source_id),
        *self.splits(source_id=source_id),
        *self.ndbranches(source_id=source_id),
    ]
872
def predecessors(self, target_id: NodeIdLike) -> list[Successor]:
    """Return all successors that have ``target_id`` among their targets.

    Result order: edges, merged edges, covers, splits, ndbranches.
    """
    return [
        *self.edges(target_id=target_id),
        *self.merged_edges(target_id=target_id),
        *self.covers(target_id=target_id),
        *self.splits(target_id=target_id),
        *self.ndbranches(target_id=target_id),
    ]
def _check_no_successors(self, source_id: NodeIdLike) -> None:
    """Raise ``ValueError`` if the node ``source_id`` already has any successor."""
    if len(self.successors(source_id)) == 0:
        return
    raise ValueError(f'Node already has successors: {source_id} -> {self.successors(source_id)}')

def _check_no_zero_loops(self, source_id: NodeIdLike, target_ids: Iterable[NodeIdLike]) -> None:
    """Raise ``ValueError`` if some target already reaches ``source_id`` by a path of length 0."""
    for target_id in target_ids:
        back_path = self.shortest_path_between(target_id, source_id)
        if back_path is None:
            continue
        if KCFG.path_length(back_path) == 0:
            raise ValueError(
                f'Adding successor would create zero-length loop with backedge: {source_id} -> {target_id}'
            )

def add_successor(self, succ: KCFG.Successor) -> None:
    """Register ``succ`` in the table matching its type, keyed by its source id.

    :raises ValueError: If the source already has a successor, if adding ``succ``
        would create a zero-length loop, or if ``succ`` is not an ``Edge``,
        ``MergedEdge`` or ``Cover`` and has fewer than two targets.
    """
    source_id = succ.source.id
    self._check_no_successors(source_id)
    self._check_no_zero_loops(source_id, succ.target_ids)

    kind = type(succ)
    if kind is KCFG.Edge:
        self._edges[succ.source.id] = succ
        return
    if kind is KCFG.MergedEdge:
        self._merged_edges[succ.source.id] = succ
        return
    if kind is KCFG.Cover:
        self._covers[succ.source.id] = succ
        return

    # Everything else is treated as a multi-target successor.
    if len(succ.target_ids) <= 1:
        raise ValueError(
            f'Cannot create {type(succ)} node with less than 2 targets: {succ.source.id} -> {succ.target_ids}'
        )
    if kind is KCFG.Split:
        self._splits[succ.source.id] = succ
    elif kind is KCFG.NDBranch:
        self._ndbranches[succ.source.id] = succ
911
def edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> Edge | None:
    """Return the edge stored for ``source_id`` if it targets ``target_id``, else ``None``."""
    resolved_source = self._resolve(source_id)
    resolved_target = self._resolve(target_id)
    candidate = self._edges.get(resolved_source, None)
    if candidate is None:
        return None
    if candidate.target.id == resolved_target:
        return candidate
    return None
917
def edges(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[Edge]:
    """Return the stored edges, optionally filtered by source and/or target node id."""
    wanted_source = None if source_id is None else self._resolve(source_id)
    wanted_target = None if target_id is None else self._resolve(target_id)
    matching = []
    for candidate in self._edges.values():
        if wanted_source is not None and wanted_source != candidate.source.id:
            continue
        if wanted_target is not None and wanted_target != candidate.target.id:
            continue
        matching.append(candidate)
    return matching
926
def contains_edge(self, edge: Edge) -> bool:
    """Return whether the edge stored between ``edge``'s endpoints equals ``edge``."""
    stored = self.edge(source_id=edge.source.id, target_id=edge.target.id)
    if not stored:
        return False
    return edge == stored
931
def create_edge(self, source_id: NodeIdLike, target_id: NodeIdLike, depth: int, rules: Iterable[str] = ()) -> Edge:
    """Build a ``KCFG.Edge`` between two existing nodes, register it, and return it.

    Raises ``ValueError`` when ``depth`` is not positive.
    """
    if depth <= 0:
        raise ValueError(f'Cannot build KCFG Edge with non-positive depth: {depth}')
    new_edge = KCFG.Edge(self.node(source_id), self.node(target_id), depth, tuple(rules))
    self.add_successor(new_edge)
    return new_edge
940
def remove_edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> None:
    """Delete the edge between the two nodes; raise ``ValueError`` when there is none."""
    resolved_source = self._resolve(source_id)
    resolved_target = self._resolve(target_id)
    if not self.edge(resolved_source, resolved_target):
        raise ValueError(f'Edge does not exist: {resolved_source} -> {resolved_target}')
    self._edges.pop(resolved_source)
948
def merged_edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> MergedEdge | None:
    """Return the merged edge stored for ``source_id`` if it targets ``target_id``, else ``None``."""
    resolved_source = self._resolve(source_id)
    resolved_target = self._resolve(target_id)
    candidate = self._merged_edges.get(resolved_source, None)
    if candidate is None:
        return None
    if candidate.target.id == resolved_target:
        return candidate
    return None
954
def merged_edges(
    self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None
) -> list[MergedEdge]:
    """Return the stored merged edges, optionally filtered by source and/or target node id."""
    wanted_source = None if source_id is None else self._resolve(source_id)
    wanted_target = None if target_id is None else self._resolve(target_id)
    matching = []
    for candidate in self._merged_edges.values():
        if wanted_source is not None and wanted_source != candidate.source.id:
            continue
        if wanted_target is not None and wanted_target != candidate.target.id:
            continue
        matching.append(candidate)
    return matching
966
def contains_merged_edge(self, edge: MergedEdge) -> bool:
    """Return whether the merged edge stored between ``edge``'s endpoints equals ``edge``."""
    stored = self.merged_edge(source_id=edge.source.id, target_id=edge.target.id)
    if not stored:
        return False
    return edge == stored
971
def create_merged_edge(
    self, source_id: NodeIdLike, target_id: NodeIdLike, edges: Iterable[Edge | MergedEdge]
) -> MergedEdge:
    """Build a ``KCFG.MergedEdge`` from ``edges``, register it, and return it.

    Any ``MergedEdge`` among ``edges`` contributes its own ``edges``, so the result holds
    a flat tuple of plain edges.

    Raises ``ValueError`` when ``edges`` is empty.
    """
    # Materialize once: ``edges`` may be a one-shot iterator, and it is needed both for
    # the emptiness check and for the flattening loop below.
    edge_list = list(edges)
    if not edge_list:
        raise ValueError(f'Cannot build KCFG MergedEdge with no edges: {edges}')
    source = self.node(source_id)
    target = self.node(target_id)
    flatten_edges: list[KCFG.Edge] = []
    for edge in edge_list:
        if isinstance(edge, KCFG.MergedEdge):
            flatten_edges.extend(edge.edges)
        else:
            flatten_edges.append(edge)
    merged_edge = KCFG.MergedEdge(source, target, tuple(flatten_edges))
    self.add_successor(merged_edge)
    return merged_edge
988
def remove_merged_edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> None:
    """Delete the merged edge between the two nodes; raise ``ValueError`` when there is none."""
    resolved_source = self._resolve(source_id)
    resolved_target = self._resolve(target_id)
    if not self.merged_edge(resolved_source, resolved_target):
        raise ValueError(f'MergedEdge does not exist: {resolved_source} -> {resolved_target}')
    self._merged_edges.pop(resolved_source)
996
def general_edges(
    self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None
) -> list[Edge | MergedEdge]:
    """Return the matching plain edges followed by the matching merged edges."""
    plain = self.edges(source_id=source_id, target_id=target_id)
    merged = self.merged_edges(source_id=source_id, target_id=target_id)
    return plain + merged
1003
def cover(self, source_id: NodeIdLike, target_id: NodeIdLike) -> Cover | None:
    """Return the cover stored for ``source_id`` if it targets ``target_id``, else ``None``."""
    resolved_source = self._resolve(source_id)
    resolved_target = self._resolve(target_id)
    candidate = self._covers.get(resolved_source, None)
    if candidate is None:
        return None
    if candidate.target.id == resolved_target:
        return candidate
    return None
1009
def covers(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[Cover]:
    """Return the stored covers, optionally filtered by source and/or target node id."""
    wanted_source = None if source_id is None else self._resolve(source_id)
    wanted_target = None if target_id is None else self._resolve(target_id)
    matching = []
    for candidate in self._covers.values():
        if wanted_source is not None and wanted_source != candidate.source.id:
            continue
        if wanted_target is not None and wanted_target != candidate.target.id:
            continue
        matching.append(candidate)
    return matching
1019
def contains_cover(self, cover: Cover) -> bool:
    """Return whether the cover stored between ``cover``'s endpoints equals ``cover``."""
    stored = self.cover(source_id=cover.source.id, target_id=cover.target.id)
    if not stored:
        return False
    return cover == stored
1024
def create_cover(self, source_id: NodeIdLike, target_id: NodeIdLike, csubst: CSubst | None = None) -> Cover:
    """Build a ``KCFG.Cover`` from source to target, register it, and return it.

    When ``csubst`` is omitted it is computed with
    ``target.cterm.match_with_constraint(source.cterm)``.

    Raises ``ValueError`` when no ``csubst`` is given and that match yields ``None``.
    """
    source = self.node(source_id)
    target = self.node(target_id)
    matched = csubst if csubst is not None else target.cterm.match_with_constraint(source.cterm)
    if matched is None:
        raise ValueError(f'No matching between: {source.id} and {target.id}')
    new_cover = KCFG.Cover(source, target, csubst=matched)
    self.add_successor(new_cover)
    return new_cover
1035
def remove_cover(self, source_id: NodeIdLike, target_id: NodeIdLike) -> None:
    """Delete the cover between the two nodes; raise ``ValueError`` when there is none."""
    resolved_source = self._resolve(source_id)
    resolved_target = self._resolve(target_id)
    if not self.cover(resolved_source, resolved_target):
        raise ValueError(f'Cover does not exist: {resolved_source} -> {resolved_target}')
    self._covers.pop(resolved_source)
1043
def edge_likes(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[EdgeLike]:
    """Return the matching edges, then covers, then merged edges, as one list."""
    plain = cast('List[KCFG.EdgeLike]', self.edges(source_id=source_id, target_id=target_id))
    covering = cast('List[KCFG.EdgeLike]', self.covers(source_id=source_id, target_id=target_id))
    merged = cast('List[KCFG.EdgeLike]', self.merged_edges(source_id=source_id, target_id=target_id))
    return plain + covering + merged
1050
def add_vacuous(self, node_id: NodeIdLike) -> None:
    """Pass ``KCFGNodeAttr.VACUOUS`` for ``node_id`` to ``add_attr``."""
    vacuous_attr = KCFGNodeAttr.VACUOUS
    self.add_attr(node_id, vacuous_attr)
1053
def remove_vacuous(self, node_id: NodeIdLike) -> None:
    """Pass ``KCFGNodeAttr.VACUOUS`` for ``node_id`` to ``remove_attr``."""
    vacuous_attr = KCFGNodeAttr.VACUOUS
    self.remove_attr(node_id, vacuous_attr)
1056
def discard_vacuous(self, node_id: NodeIdLike) -> None:
    """Pass ``KCFGNodeAttr.VACUOUS`` for ``node_id`` to ``discard_attr``."""
    vacuous_attr = KCFGNodeAttr.VACUOUS
    self.discard_attr(node_id, vacuous_attr)
1059
def add_stuck(self, node_id: NodeIdLike) -> None:
    """Pass ``KCFGNodeAttr.STUCK`` for ``node_id`` to ``add_attr``."""
    stuck_attr = KCFGNodeAttr.STUCK
    self.add_attr(node_id, stuck_attr)
1062
def remove_stuck(self, node_id: NodeIdLike) -> None:
    """Pass ``KCFGNodeAttr.STUCK`` for ``node_id`` to ``remove_attr``."""
    stuck_attr = KCFGNodeAttr.STUCK
    self.remove_attr(node_id, stuck_attr)
1065
def discard_stuck(self, node_id: NodeIdLike) -> None:
    """Pass ``KCFGNodeAttr.STUCK`` for ``node_id`` to ``discard_attr``."""
    stuck_attr = KCFGNodeAttr.STUCK
    self.discard_attr(node_id, stuck_attr)
1068
def splits(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[Split]:
    """Return the stored splits, optionally filtered by source id and/or by a contained target id."""
    wanted_source = None if source_id is None else self._resolve(source_id)
    wanted_target = None if target_id is None else self._resolve(target_id)
    matching = []
    for candidate in self._splits.values():
        if wanted_source is not None and wanted_source != candidate.source.id:
            continue
        # A split matches a target filter when the id is among its targets.
        if wanted_target is not None and wanted_target not in candidate.target_ids:
            continue
        matching.append(candidate)
    return matching
1077
def contains_split(self, split: Split) -> bool:
    """Return whether ``split`` is one of the stored splits."""
    stored_splits = self._splits.values()
    return split in stored_splits
1080
def create_split(self, source_id: NodeIdLike, splits: Iterable[tuple[NodeIdLike, CSubst]]) -> KCFG.Split:
    """Build a ``KCFG.Split`` from ``source_id`` to the given ``(node id, csubst)`` pairs and register it."""
    source = self.node(self._resolve(source_id))
    pairs = list(splits)
    branches = tuple((self.node(nid), csubst) for nid, csubst in pairs)
    new_split = KCFG.Split(source, branches)
    self.add_successor(new_split)
    return new_split
1086
def create_split_by_nodes(self, source_id: NodeIdLike, target_ids: Iterable[NodeIdLike]) -> KCFG.Split | None:
    """Create a split without crafting a CSubst.

    Each target's substitution is computed with ``source.cterm.match_with_constraint``.
    Returns ``None`` when computing the substitutions raises ``ValueError``.
    """
    # Materialize once: ``target_ids`` is walked twice (node lookup and the final zip),
    # and a one-shot iterator would be empty on the second pass.
    target_id_list = list(target_ids)
    source = self.node(source_id)
    targets = [self.node(nid) for nid in target_id_list]
    try:
        # NOTE(review): presumably ``not_none`` raises ValueError on a ``None`` match - confirm in ..utils.
        csubsts = [not_none(source.cterm.match_with_constraint(target.cterm)) for target in targets]
    except ValueError:
        return None
    return self.create_split(source.id, zip(target_id_list, csubsts, strict=True))
1096
def ndbranches(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[NDBranch]:
    """Return the stored ndbranches, optionally filtered by source id and/or by a contained target id."""
    wanted_source = None if source_id is None else self._resolve(source_id)
    wanted_target = None if target_id is None else self._resolve(target_id)
    matching = []
    for candidate in self._ndbranches.values():
        if wanted_source is not None and wanted_source != candidate.source.id:
            continue
        # An ndbranch matches a target filter when the id is among its targets.
        if wanted_target is not None and wanted_target not in candidate.target_ids:
            continue
        matching.append(candidate)
    return matching
1105
def contains_ndbranch(self, ndbranch: NDBranch) -> bool:
    """Return whether ``ndbranch`` is one of the stored ndbranches."""
    # ``_ndbranches`` maps source node ids to branches, so membership must be tested
    # against the values; testing the mapping itself would compare against the ids.
    return ndbranch in self._ndbranches.values()
1108
def create_ndbranch(
    self, source_id: NodeIdLike, ndbranches: Iterable[NodeIdLike], rules: Iterable[str] = ()
) -> KCFG.NDBranch:
    """Build a ``KCFG.NDBranch`` from ``source_id`` to the given target nodes and register it."""
    source = self.node(self._resolve(source_id))
    target_ids = list(ndbranches)
    targets = tuple(self.node(nid) for nid in target_ids)
    new_branch = KCFG.NDBranch(source, targets, tuple(rules))
    self.add_successor(new_branch)
    return new_branch
1116
def split_on_constraints(self, source_id: NodeIdLike, constraints: Iterable[KInner]) -> list[int]:
    """Create one node per constraint, split ``source_id`` into them, and return the new node ids.

    Each new node holds the source's cterm with one constraint added.
    """
    source = self.node(source_id)
    base_cterm = source.cterm
    branch_node_ids = [self.create_node(base_cterm.add_constraint(constraint)).id for constraint in constraints]
    csubsts = [
        not_none(base_cterm.match_with_constraint(self.node(branch_id).cterm)) for branch_id in branch_node_ids
    ]
    self.create_split(source.id, zip(branch_node_ids, csubsts, strict=True))
    return branch_node_ids
1123
def add_alias(self, alias: str, node_id: NodeIdLike) -> None:
    """Register ``alias`` for the resolved ``node_id``.

    Raises ``ValueError`` when ``alias`` contains ``@`` or is already registered.
    """
    if '@' in alias:
        raise ValueError('Alias may not contain "@"')
    if alias in self._aliases:
        raise ValueError(f'Duplicate alias: {alias}')
    self._aliases[alias] = self._resolve(node_id)
1131
def remove_alias(self, alias: str) -> None:
    """Unregister ``alias``; raise ``ValueError`` when it is not registered."""
    known = alias in self._aliases
    if not known:
        raise ValueError(f'Alias does not exist: {alias}')
    self._aliases.pop(alias)
1136
def is_root(self, node_id: NodeIdLike) -> bool:
    """Return whether the node has no predecessors."""
    incoming = self.predecessors(self._resolve(node_id))
    return not incoming
1140
def is_vacuous(self, node_id: NodeIdLike) -> bool:
    """Return whether ``KCFGNodeAttr.VACUOUS`` is among the node's attrs."""
    node_attrs = self.node(node_id).attrs
    return KCFGNodeAttr.VACUOUS in node_attrs
1143
def is_stuck(self, node_id: NodeIdLike) -> bool:
    """Return whether ``KCFGNodeAttr.STUCK`` is among the node's attrs."""
    node_attrs = self.node(node_id).attrs
    return KCFGNodeAttr.STUCK in node_attrs
1146
def is_split(self, node_id: NodeIdLike) -> bool:
    """Return whether the resolved node id is the source of a stored split."""
    resolved = self._resolve(node_id)
    return resolved in self._splits
1150
def is_ndbranch(self, node_id: NodeIdLike) -> bool:
    """Return whether the resolved node id is the source of a stored ndbranch."""
    resolved = self._resolve(node_id)
    return resolved in self._ndbranches
1154
def is_leaf(self, node_id: NodeIdLike) -> bool:
    """Return whether the node has no successors."""
    outgoing = self.successors(node_id)
    return not outgoing
1157
def is_covered(self, node_id: NodeIdLike) -> bool:
    """Return whether the resolved node id is the source of a stored cover."""
    resolved = self._resolve(node_id)
    return resolved in self._covers
1161
def prune(self, node_id: NodeIdLike, keep_nodes: Iterable[NodeIdLike] = ()) -> list[int]:
    """Remove every node reachable from ``node_id`` except those in ``keep_nodes``.

    Returns the ids of the removed nodes.
    """
    reachable = self.reachable_nodes(node_id)
    kept_ids = [self._resolve(nid) for nid in keep_nodes]
    removed: list[int] = []
    for node in reachable:
        if node.id in kept_ids:
            continue
        self.remove_node(node.id)
        removed.append(node.id)
    return removed
1171
def shortest_path_between(
    self, source_node_id: NodeIdLike, target_node_id: NodeIdLike
) -> tuple[Successor, ...] | None:
    """Return the first path of minimal ``KCFG.path_length`` between the nodes, or ``None`` if there is none."""
    candidates = self.paths_between(source_node_id, target_node_id)
    if len(candidates) == 0:
        return None
    # ``min`` keeps the earliest of equally short paths, as a stable sort would.
    return min(candidates, key=(lambda path: KCFG.path_length(path)))
1179
def shortest_distance_between(self, node_1_id: NodeIdLike, node_2_id: NodeIdLike) -> int | None:
    """Return the smaller shortest-path length over both directions, or ``None`` if neither path exists."""
    forward = self.shortest_path_between(node_1_id, node_2_id)
    backward = self.shortest_path_between(node_2_id, node_1_id)
    lengths = [KCFG.path_length(path) for path in (forward, backward) if path is not None]
    if not lengths:
        return None
    return min(lengths)
1191
def zero_depth_between(self, node_1_id: NodeIdLike, node_2_id: NodeIdLike) -> bool:
    """Return whether the two nodes are the same or the shortest distance between them is 0."""
    first_id = self._resolve(node_1_id)
    second_id = self._resolve(node_2_id)
    if first_id == second_id:
        return True

    # Cheap pre-check: skip the path search unless some first step out of either node has length 0.
    first_steps = self.successors(first_id) + self.successors(second_id)
    step_lengths = [self.path_length([step]) for step in first_steps]
    if not any(length == 0 for length in step_lengths):
        return False

    distance = self.shortest_distance_between(first_id, second_id)
    if distance is None:
        return False
    return distance == 0
1207
def paths_between(self, source_id: NodeIdLike, target_id: NodeIdLike) -> list[tuple[Successor, ...]]:
    """Return the paths from ``source_id`` to ``target_id`` as tuples of successors.

    A node reaches itself through the single empty path. Multi-target successors are
    expanded into one single-target successor per target while searching.
    """
    start_id = self._resolve(source_id)
    goal_id = self._resolve(target_id)

    if start_id == goal_id:
        return [()]

    first_steps = list(self.successors(start_id))
    assert len(first_steps) <= 1
    if len(first_steps) == 0:
        return []

    def _in_path(_nid: int, _path: list[KCFG.Successor]) -> bool:
        # A node is on the path when it is the source of any step or a target of the last step.
        if any(_nid == step.source.id for step in _path):
            return True
        if len(_path) == 0:
            return False
        last = _path[-1]
        if isinstance(last, KCFG.EdgeLike) and last.target.id == _nid:
            return True
        return isinstance(last, KCFG.MultiEdge) and _nid in last.target_ids

    found: list[tuple[KCFG.Successor, ...]] = []
    pending: list[list[KCFG.Successor]] = [[first_steps[0]]]

    while pending:
        path = pending.pop()
        last_step = path[-1]
        next_steps: list[KCFG.Successor] = []

        if isinstance(last_step, KCFG.EdgeLike):
            if last_step.target.id == goal_id:
                found.append(tuple(path))
                continue
            next_steps = list(self.successors(last_step.target.id))

        elif isinstance(last_step, KCFG.MultiEdge):
            step_targets = list(last_step.targets)
            if len(step_targets) == 1:
                only_target = step_targets[0]
                if only_target.id == goal_id:
                    found.append(tuple(path))
                    continue
                next_steps = list(self.successors(only_target.id))
            if len(step_targets) > 1:
                # Replace the multi-target step by one single-target variant per target.
                path = path[0:-1]
                next_steps = [last_step.with_single_target(target) for target in step_targets]

        for step in next_steps:
            if isinstance(step, KCFG.EdgeLike) and not _in_path(step.target.id, path):
                pending.append(path + [step])
            elif isinstance(step, KCFG.MultiEdge):
                fan_out = list(step.targets)
                if len(fan_out) == 1:
                    if not _in_path(fan_out[0].id, path):
                        pending.append(path + [step])
                elif len(fan_out) > 1:
                    pending.append(path + [step])

    return found
1270
def reachable_nodes(self, source_id: NodeIdLike, *, reverse: bool = False) -> set[KCFG.Node]:
    """Return the set of nodes reachable from ``source_id``, including the node itself.

    With ``reverse=True`` the walk follows predecessors instead of successors.
    """
    seen: set[KCFG.Node] = set()
    pending: list[KCFG.Node] = [self.node(source_id)]

    while pending:
        current = pending.pop()
        if current in seen:
            continue
        seen.add(current)

        if reverse:
            pending.extend(succ.source for succ in self.predecessors(target_id=current.id))
        else:
            for succ in self.successors(source_id=current.id):
                pending.extend(succ.targets)

    return seen
1289
def write_cfg_data(self) -> None:
    """Write this KCFG through its store, then clear the created/deleted node records."""
    store = self._kcfg_store
    assert store is not None
    store.write_cfg_data(
        self,
        self.to_dict_no_nodes(),
        deleted_nodes=self._deleted_nodes,
        created_nodes=self._created_nodes,
    )
    for tracked in (self._deleted_nodes, self._created_nodes):
        tracked.clear()
1297
@staticmethod
def read_cfg_data(cfg_dir: Path) -> KCFG:
    """Load a KCFG from the store at ``cfg_dir`` and attach that store to it."""
    store = KCFGStore(cfg_dir)
    loaded = KCFG.from_dict(store.read_cfg_data())
    loaded._kcfg_store = store
    return loaded
1304
@staticmethod
def read_node_data(cfg_dir: Path, node_id: int) -> KCFG.Node:
    """Load the node ``node_id`` from the store at ``cfg_dir``."""
    node_dict = KCFGStore(cfg_dir).read_node_data(node_id)
    return KCFG.Node.from_dict(node_dict)
1309 1310
class KCFGExtendResult(ABC):
    """Common base class of the extend-result variants defined in this module."""
1312 1313
@final
@dataclass(frozen=True)
class Vacuous(KCFGExtendResult):
    """``KCFGExtendResult`` variant that carries no fields."""
1317 1318
@final
@dataclass(frozen=True)
class Stuck(KCFGExtendResult):
    """``KCFGExtendResult`` variant that carries no fields."""
1322 1323
@final
@dataclass(frozen=True)
class Abstract(KCFGExtendResult):
    """``KCFGExtendResult`` variant that carries a single ``CTerm``."""

    cterm: CTerm
1328 1329
@final
@dataclass(frozen=True)
class Step(KCFGExtendResult):
    """``KCFGExtendResult`` variant with a ``CTerm``, a depth, logs and rule labels.

    ``cut`` defaults to ``False`` and ``info`` to the empty string.
    """

    cterm: CTerm
    depth: int
    logs: tuple[LogEntry, ...]
    rule_labels: list[str]
    cut: bool = False
    info: str = ''
1339 1340
@final
@dataclass(frozen=True)
class Branch(KCFGExtendResult):
    """``KCFGExtendResult`` variant holding a tuple of constraints, a ``heuristic`` flag and an ``info`` string."""

    constraints: tuple[KInner, ...]
    heuristic: bool
    info: str = ''

    def __init__(self, constraints: Iterable[KInner], *, heuristic: bool = False, info: str = ''):
        """Store ``constraints`` as a tuple, together with ``heuristic`` and ``info``."""
        # The dataclass is frozen, so fields are set through ``object.__setattr__``.
        values = (('constraints', tuple(constraints)), ('heuristic', heuristic), ('info', info))
        for name, value in values:
            object.__setattr__(self, name, value)
1352 1353
@final
@dataclass(frozen=True)
class NDBranch(KCFGExtendResult):
    """``KCFGExtendResult`` variant holding tuples of cterms, logs and rule labels."""

    cterms: tuple[CTerm, ...]
    logs: tuple[LogEntry, ...]
    rule_labels: tuple[str, ...]

    def __init__(self, cterms: Iterable[CTerm], logs: Iterable[LogEntry,], rule_labels: Iterable[str]):
        """Store each argument as a tuple."""
        # The dataclass is frozen, so fields are set through ``object.__setattr__``.
        values = (('cterms', tuple(cterms)), ('logs', tuple(logs)), ('rule_labels', tuple(rule_labels)))
        for name, value in values:
            object.__setattr__(self, name, value)