Source code for pyk.kcfg.kcfg

   1from __future__ import annotations
   2
   3import json
   4import logging
   5from abc import ABC, abstractmethod
   6from collections.abc import Container
   7from dataclasses import dataclass, field
   8from threading import RLock
   9from typing import TYPE_CHECKING, Final, List, Union, cast, final
  10
  11from ..cterm import CSubst, CTerm, cterm_build_claim, cterm_build_rule
  12from ..kast import EMPTY_ATT
  13from ..kast.inner import KApply
  14from ..kast.manip import (
  15    bool_to_ml_pred,
  16    extract_lhs,
  17    extract_rhs,
  18    inline_cell_maps,
  19    minimize_rule_like,
  20    rename_generated_vars,
  21    sort_ac_collections,
  22)
  23from ..kast.outer import KFlatModule
  24from ..kast.prelude.kbool import andBool
  25from ..utils import ensure_dir_path, not_none
  26
  27if TYPE_CHECKING:
  28    from collections.abc import Iterable, Mapping, MutableMapping
  29    from pathlib import Path
  30    from types import TracebackType
  31    from typing import Any
  32
  33    from ..kast import KAtt
  34    from ..kast.inner import KInner
  35    from ..kast.outer import KClaim, KDefinition, KImport, KRuleLike
  36    from ..kore.rpc import LogEntry
  37
  38
  39NodeIdLike = int | str
  40
  41_LOGGER: Final = logging.getLogger(__name__)
  42
  43
[docs] 44@dataclass(frozen=True) 45class NodeAttr: 46 value: str
47 48
[docs] 49class KCFGNodeAttr(NodeAttr): 50 VACUOUS = NodeAttr('vacuous') 51 STUCK = NodeAttr('stuck')
52 53
[docs] 54class KCFGStore: 55 store_path: Path 56 57 def __init__(self, store_path: Path) -> None: 58 self.store_path = store_path 59 ensure_dir_path(store_path) 60 ensure_dir_path(self.kcfg_node_dir) 61 62 @property 63 def kcfg_json_path(self) -> Path: 64 return self.store_path / 'kcfg.json' 65 66 @property 67 def kcfg_node_dir(self) -> Path: 68 return self.store_path / 'nodes' 69
[docs] 70 def kcfg_node_path(self, node_id: int) -> Path: 71 return self.kcfg_node_dir / f'{node_id}.json'
72
[docs] 73 def write_cfg_data( 74 self, kcfg: KCFG, dct: dict[str, Any], deleted_nodes: Iterable[int] = (), created_nodes: Iterable[int] = () 75 ) -> None: 76 vacuous_nodes = [ 77 node_id for node_id in kcfg._nodes.keys() if KCFGNodeAttr.VACUOUS in kcfg._nodes[node_id].attrs 78 ] 79 stuck_nodes = [node_id for node_id in kcfg._nodes.keys() if KCFGNodeAttr.STUCK in kcfg._nodes[node_id].attrs] 80 dct['vacuous'] = vacuous_nodes 81 dct['stuck'] = stuck_nodes 82 for node_id in deleted_nodes: 83 self.kcfg_node_path(node_id).unlink(missing_ok=True) 84 for node_id in created_nodes: 85 self.kcfg_node_path(node_id).write_text(json.dumps(kcfg._nodes[node_id].to_dict())) 86 self.kcfg_json_path.write_text(json.dumps(dct))
87
[docs] 88 def read_cfg_data(self) -> dict[str, Any]: 89 dct = json.loads(self.kcfg_json_path.read_text()) 90 nodes = [self.read_node_data(node_id) for node_id in dct.get('nodes') or []] 91 dct['nodes'] = nodes 92 93 new_nodes = [] 94 for node in dct['nodes']: 95 attrs = [] 96 if node['id'] in dct['vacuous']: 97 attrs.append(KCFGNodeAttr.VACUOUS.value) 98 if node['id'] in dct['stuck']: 99 attrs.append(KCFGNodeAttr.STUCK.value) 100 new_nodes.append({'id': node['id'], 'cterm': node['cterm'], 'attrs': attrs}) 101 102 dct['nodes'] = new_nodes 103 104 del dct['vacuous'] 105 del dct['stuck'] 106 107 return dct
108
[docs] 109 def read_node_data(self, node_id: int) -> dict[str, Any]: 110 return json.loads(self.kcfg_node_path(node_id).read_text())
111 112
[docs] 113class KCFG(Container[Union['KCFG.Node', 'KCFG.Successor']]):
[docs] 114 @final 115 @dataclass(frozen=True, order=True) 116 class Node: 117 id: int 118 cterm: CTerm 119 attrs: frozenset[NodeAttr] 120 121 def __init__(self, id: int, cterm: CTerm, attrs: Iterable[NodeAttr] = ()) -> None: 122 object.__setattr__(self, 'id', id) 123 object.__setattr__(self, 'cterm', cterm) 124 object.__setattr__(self, 'attrs', frozenset(attrs)) 125
[docs] 126 def to_dict(self) -> dict[str, Any]: 127 return {'id': self.id, 'cterm': self.cterm.to_dict(), 'attrs': [attr.value for attr in self.attrs]}
128
[docs] 129 @staticmethod 130 def from_dict(dct: dict[str, Any]) -> KCFG.Node: 131 return KCFG.Node(dct['id'], CTerm.from_dict(dct['cterm']), [NodeAttr(attr) for attr in dct['attrs']])
132
[docs] 133 def add_attr(self, attr: NodeAttr) -> KCFG.Node: 134 return KCFG.Node(self.id, self.cterm, list(self.attrs) + [attr])
135
[docs] 136 def remove_attr(self, attr: NodeAttr) -> KCFG.Node: 137 if attr not in self.attrs: 138 raise ValueError(f'Node {self.id} does not have attribute {attr.value}') 139 return KCFG.Node(self.id, self.cterm, self.attrs.difference([attr]))
140
[docs] 141 def discard_attr(self, attr: NodeAttr) -> KCFG.Node: 142 return KCFG.Node(self.id, self.cterm, self.attrs.difference([attr]))
143
[docs] 144 def let(self, cterm: CTerm | None = None, attrs: Iterable[KCFGNodeAttr] | None = None) -> KCFG.Node: 145 new_cterm = cterm if cterm is not None else self.cterm 146 new_attrs = attrs if attrs is not None else self.attrs 147 return KCFG.Node(self.id, new_cterm, new_attrs)
148 149 @property 150 def free_vars(self) -> frozenset[str]: 151 return frozenset(self.cterm.free_vars)
152
[docs] 153 class Successor(ABC): 154 source: KCFG.Node 155 156 def __lt__(self, other: Any) -> bool: 157 if not isinstance(other, KCFG.Successor): 158 return NotImplemented 159 return self.source < other.source 160 161 @property 162 def source_vars(self) -> frozenset[str]: 163 return frozenset(self.source.free_vars) 164 165 @property 166 @abstractmethod 167 def targets(self) -> tuple[KCFG.Node, ...]: ... 168 169 @property 170 def target_ids(self) -> list[int]: 171 return sorted(target.id for target in self.targets) 172 173 @property 174 def target_vars(self) -> frozenset[str]: 175 return frozenset(set.union(set(), *(target.free_vars for target in self.targets))) 176
[docs] 177 @abstractmethod 178 def replace_source(self, node: KCFG.Node) -> KCFG.Successor: ...
179
[docs] 180 @abstractmethod 181 def replace_target(self, node: KCFG.Node) -> KCFG.Successor: ...
182
[docs] 183 @abstractmethod 184 def to_dict(self) -> dict[str, Any]: ...
185
[docs] 186 @staticmethod 187 @abstractmethod 188 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Successor: ...
189
[docs] 190 class EdgeLike(Successor): 191 source: KCFG.Node 192 target: KCFG.Node 193 194 @property 195 def targets(self) -> tuple[KCFG.Node, ...]: 196 return (self.target,)
197
[docs] 198 @final 199 @dataclass(frozen=True) 200 class Edge(EdgeLike): 201 source: KCFG.Node 202 target: KCFG.Node 203 depth: int 204 rules: tuple[str, ...] 205
[docs] 206 def to_dict(self) -> dict[str, Any]: 207 return { 208 'source': self.source.id, 209 'target': self.target.id, 210 'depth': self.depth, 211 'rules': list(self.rules), 212 }
213
[docs] 214 @staticmethod 215 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Edge: 216 return KCFG.Edge(nodes[dct['source']], nodes[dct['target']], dct['depth'], tuple(dct['rules']))
217
[docs] 218 def to_rule( 219 self, 220 label: str, 221 claim: bool = False, 222 priority: int | None = None, 223 defunc_with: KDefinition | None = None, 224 minimize: bool = False, 225 ) -> KRuleLike: 226 def is_ceil_condition(kast: KInner) -> bool: 227 return type(kast) is KApply and kast.label.name == '#Ceil' 228 229 def _simplify_config(config: KInner) -> KInner: 230 return sort_ac_collections(inline_cell_maps(config)) 231 232 sentence_id = f'{label}-{self.source.id}-TO-{self.target.id}' 233 init_constraints = [c for c in self.source.cterm.constraints if not is_ceil_condition(c)] 234 init_cterm = CTerm(_simplify_config(self.source.cterm.config), init_constraints) 235 target_constraints = [c for c in self.target.cterm.constraints if not is_ceil_condition(c)] 236 target_cterm = CTerm(_simplify_config(self.target.cterm.config), target_constraints) 237 rule: KRuleLike 238 if claim: 239 rule, _ = cterm_build_claim(sentence_id, init_cterm, target_cterm) 240 else: 241 rule, _ = cterm_build_rule( 242 sentence_id, init_cterm, target_cterm, priority=priority, defunc_with=defunc_with 243 ) 244 if minimize: 245 rule = minimize_rule_like(rule) 246 return rule
247
[docs] 248 def replace_source(self, node: KCFG.Node) -> KCFG.Edge: 249 assert node.id == self.source.id 250 return KCFG.Edge(node, self.target, self.depth, self.rules)
251
[docs] 252 def replace_target(self, node: KCFG.Node) -> KCFG.Edge: 253 assert node.id == self.target.id 254 return KCFG.Edge(self.source, node, self.depth, self.rules)
255
[docs] 256 @final 257 @dataclass(frozen=True) 258 class MergedEdge(EdgeLike): 259 """Merged edge is a collection of edges that have been merged into a single edge.""" 260 261 source: KCFG.Node 262 target: KCFG.Node 263 edges: tuple[KCFG.Edge, ...] 264
[docs] 265 def to_dict(self) -> dict[str, Any]: 266 return { 267 'source': self.source.id, 268 'target': self.target.id, 269 'edges': [edge.to_dict() for edge in self.edges], 270 }
271
[docs] 272 @staticmethod 273 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Successor: 274 return KCFG.MergedEdge( 275 nodes[dct['source']], 276 nodes[dct['target']], 277 tuple(KCFG.Edge.from_dict(edge, nodes) for edge in dct['edges']), 278 )
279
[docs] 280 def replace_source(self, node: KCFG.Node) -> KCFG.Successor: 281 assert node.id == self.source.id 282 return KCFG.MergedEdge(node, self.target, self.edges)
283
[docs] 284 def replace_target(self, node: KCFG.Node) -> KCFG.Successor: 285 assert node.id == self.target.id 286 return KCFG.MergedEdge(self.source, node, self.edges)
287
[docs] 288 def to_rule(self, label: str, claim: bool = False, priority: int | None = None) -> KRuleLike: 289 return KCFG.Edge(self.source, self.target, 1, ()).to_rule(label, claim, priority)
290
[docs] 291 @final 292 @dataclass(frozen=True) 293 class Cover(EdgeLike): 294 source: KCFG.Node 295 target: KCFG.Node 296 csubst: CSubst 297
[docs] 298 def to_dict(self) -> dict[str, Any]: 299 return { 300 'source': self.source.id, 301 'target': self.target.id, 302 'csubst': self.csubst.to_dict(), 303 }
304
[docs] 305 @staticmethod 306 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Cover: 307 return KCFG.Cover(nodes[dct['source']], nodes[dct['target']], CSubst.from_dict(dct['csubst']))
308
[docs] 309 def replace_source(self, node: KCFG.Node) -> KCFG.Cover: 310 assert node.id == self.source.id 311 return KCFG.Cover(node, self.target, self.csubst)
312
[docs] 313 def replace_target(self, node: KCFG.Node) -> KCFG.Cover: 314 assert node.id == self.target.id 315 return KCFG.Cover(self.source, node, self.csubst)
316
[docs] 317 @dataclass(frozen=True) 318 class MultiEdge(Successor): 319 source: KCFG.Node 320 321 def __lt__(self, other: Any) -> bool: 322 if not type(other) is type(self): 323 return NotImplemented 324 return (self.source, self.target_ids) < (other.source, other.target_ids) 325
[docs] 326 @abstractmethod 327 def with_single_target(self, target: KCFG.Node) -> KCFG.MultiEdge: ...
328
[docs] 329 @final 330 @dataclass(frozen=True) 331 class Split(MultiEdge): 332 source: KCFG.Node 333 _targets: tuple[tuple[KCFG.Node, CSubst], ...] 334 335 def __init__(self, source: KCFG.Node, _targets: Iterable[tuple[KCFG.Node, CSubst]]) -> None: 336 object.__setattr__(self, 'source', source) 337 object.__setattr__(self, '_targets', tuple(_targets)) 338 339 @property 340 def targets(self) -> tuple[KCFG.Node, ...]: 341 return tuple(target for target, _ in self._targets) 342 343 @property 344 def splits(self) -> dict[int, CSubst]: 345 return {target.id: csubst for target, csubst in self._targets} 346
[docs] 347 def to_dict(self) -> dict[str, Any]: 348 return { 349 'source': self.source.id, 350 'targets': [ 351 { 352 'target': target.id, 353 'csubst': csubst.to_dict(), 354 } 355 for target, csubst in self._targets 356 ], 357 }
358
[docs] 359 @staticmethod 360 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Split: 361 _targets = [(nodes[target['target']], CSubst.from_dict(target['csubst'])) for target in dct['targets']] 362 return KCFG.Split(nodes[dct['source']], tuple(_targets))
363
[docs] 364 def with_single_target(self, target: KCFG.Node) -> KCFG.Split: 365 return KCFG.Split(self.source, ((target, self.splits[target.id]),))
366 367 @property 368 def covers(self) -> tuple[KCFG.Cover, ...]: 369 return tuple(KCFG.Cover(target, self.source, csubst) for target, csubst in self._targets) 370
[docs] 371 def replace_source(self, node: KCFG.Node) -> KCFG.Split: 372 assert node.id == self.source.id 373 return KCFG.Split(node, self._targets)
374
[docs] 375 def replace_target(self, node: KCFG.Node) -> KCFG.Split: 376 assert node.id in self.target_ids 377 new_targets = [ 378 (node, csubst) if node.id == target_node.id else (target_node, csubst) 379 for target_node, csubst in self._targets 380 ] 381 return KCFG.Split(self.source, tuple(new_targets))
382
[docs] 383 @final 384 @dataclass(frozen=True) 385 class NDBranch(MultiEdge): 386 source: KCFG.Node 387 _targets: tuple[KCFG.Node, ...] 388 rules: tuple[str, ...] 389 390 def __init__(self, source: KCFG.Node, _targets: Iterable[KCFG.Node], rules: tuple[str, ...]) -> None: 391 object.__setattr__(self, 'source', source) 392 object.__setattr__(self, '_targets', tuple(_targets)) 393 object.__setattr__(self, 'rules', rules) 394 395 @property 396 def targets(self) -> tuple[KCFG.Node, ...]: 397 return self._targets 398
[docs] 399 def to_dict(self) -> dict[str, Any]: 400 return { 401 'source': self.source.id, 402 'targets': [target.id for target in self.targets], 403 'rules': list(self.rules), 404 }
405
[docs] 406 @staticmethod 407 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.NDBranch: 408 return KCFG.NDBranch( 409 nodes[dct['source']], tuple([nodes[target] for target in dct['targets']]), tuple(dct['rules']) 410 )
411
[docs] 412 def with_single_target(self, target: KCFG.Node) -> KCFG.NDBranch: 413 return KCFG.NDBranch(self.source, (target,), ())
414 415 @property 416 def edges(self) -> tuple[KCFG.Edge, ...]: 417 return tuple(KCFG.Edge(self.source, target, 1, ()) for target in self.targets) 418
[docs] 419 def replace_source(self, node: KCFG.Node) -> KCFG.NDBranch: 420 assert node.id == self.source.id 421 return KCFG.NDBranch(node, self._targets, self.rules)
422
[docs] 423 def replace_target(self, node: KCFG.Node) -> KCFG.NDBranch: 424 assert node.id in self.target_ids 425 new_targets = [node if node.id == target_node.id else target_node for target_node in self._targets] 426 return KCFG.NDBranch(self.source, tuple(new_targets), self.rules)
427 428 _node_id: int 429 _nodes: MutableMapping[int, KCFG.Node] 430 431 _created_nodes: set[int] 432 _deleted_nodes: set[int] 433 434 _edges: dict[int, Edge] 435 _merged_edges: dict[int, MergedEdge] 436 _covers: dict[int, Cover] 437 _splits: dict[int, Split] 438 _ndbranches: dict[int, NDBranch] 439 _aliases: dict[str, int] 440 _lock: RLock 441 442 _kcfg_store: KCFGStore | None 443 444 def __init__(self, cfg_dir: Path | None = None, optimize_memory: bool = True) -> None: 445 self._node_id = 1 446 if optimize_memory: 447 from .store import OptimizedNodeStore 448 449 self._nodes = OptimizedNodeStore() 450 else: 451 self._nodes = {} 452 self._created_nodes = set() 453 self._deleted_nodes = set() 454 self._edges = {} 455 self._merged_edges = {} 456 self._covers = {} 457 self._splits = {} 458 self._ndbranches = {} 459 self._aliases = {} 460 self._lock = RLock() 461 if cfg_dir is not None: 462 self._kcfg_store = KCFGStore(cfg_dir) 463 464 def __contains__(self, item: object) -> bool: 465 if type(item) is KCFG.Node: 466 return self.contains_node(item) 467 if type(item) is KCFG.Edge: 468 return self.contains_edge(item) 469 if type(item) is KCFG.MergedEdge: 470 return self.contains_merged_edge(item) 471 if type(item) is KCFG.Cover: 472 return self.contains_cover(item) 473 if type(item) is KCFG.Split: 474 return self.contains_split(item) 475 if type(item) is KCFG.NDBranch: 476 return self.contains_ndbranch(item) 477 return False 478 479 def __enter__(self) -> KCFG: 480 self._lock.acquire() 481 return self 482 483 def __exit__( 484 self, 485 exc_type: type[BaseException] | None, 486 exc_value: BaseException | None, 487 traceback: TracebackType | None, 488 ) -> bool: 489 self._lock.release() 490 if exc_type is None: 491 return True 492 return False 493 494 @property 495 def nodes(self) -> list[KCFG.Node]: 496 return list(self._nodes.values()) 497 498 @property 499 def root(self) -> list[KCFG.Node]: 500 return [node for node in self.nodes if self.is_root(node.id)] 501 502 @property 503 def vacuous(self) -> list[KCFG.Node]: 504 return [node for node in self.nodes if self.is_vacuous(node.id)] 505 506 @property 507 def stuck(self) -> list[KCFG.Node]: 508 return [node for node in self.nodes if self.is_stuck(node.id)] 509 510 @property 511 def leaves(self) -> list[KCFG.Node]: 512 return [node for node in self.nodes if self.is_leaf(node.id)] 513 514 @property 515 def covered(self) -> list[KCFG.Node]: 516 return [node for node in self.nodes if self.is_covered(node.id)] 517 518 @property 519 def uncovered(self) -> list[KCFG.Node]: 520 return [node for node in self.nodes if not self.is_covered(node.id)] 521
[docs] 522 @staticmethod 523 def from_claim( 524 defn: KDefinition, claim: KClaim, cfg_dir: Path | None = None, optimize_memory: bool = True 525 ) -> tuple[KCFG, NodeIdLike, NodeIdLike]: 526 cfg = KCFG(cfg_dir=cfg_dir, optimize_memory=optimize_memory) 527 claim_body = claim.body 528 claim_body = defn.instantiate_cell_vars(claim_body) 529 claim_body = rename_generated_vars(claim_body) 530 531 claim_lhs = CTerm.from_kast(extract_lhs(claim_body)).add_constraint(bool_to_ml_pred(claim.requires)) 532 init_node = cfg.create_node(claim_lhs) 533 534 claim_rhs = CTerm.from_kast(extract_rhs(claim_body)).add_constraint( 535 bool_to_ml_pred(andBool([claim.requires, claim.ensures])) 536 ) 537 target_node = cfg.create_node(claim_rhs) 538 539 return cfg, init_node.id, target_node.id
540
[docs] 541 @staticmethod 542 def path_length(_path: Iterable[KCFG.Successor]) -> int: 543 _path = tuple(_path) 544 if len(_path) == 0: 545 return 0 546 if type(_path[0]) is KCFG.Split or type(_path[0]) is KCFG.Cover: 547 return KCFG.path_length(_path[1:]) 548 elif type(_path[0]) is KCFG.NDBranch: 549 return 1 + KCFG.path_length(_path[1:]) 550 elif type(_path[0]) is KCFG.Edge: 551 return _path[0].depth + KCFG.path_length(_path[1:]) 552 elif type(_path[0]) is KCFG.MergedEdge: 553 return min(edge.depth for edge in _path[0].edges) + KCFG.path_length(_path[1:]) # todo: check this 554 raise ValueError(f'Cannot handle Successor type: {type(_path[0])}')
555
[docs] 556 def extend( 557 self, 558 extend_result: KCFGExtendResult, 559 node: KCFG.Node, 560 logs: dict[int, tuple[LogEntry, ...]], 561 *, 562 optimize_kcfg: bool, 563 ) -> None: 564 565 def log(message: str, *, warning: bool = False) -> None: 566 result_info = extend_result.info if type(extend_result) is Step or type(extend_result) is Branch else '' 567 result_info_message = f': {result_info}' if result_info else '' 568 _LOGGER.log( 569 logging.WARNING if warning else logging.INFO, 570 f'Extending KCFG with: {message}{result_info_message}', 571 ) 572 573 match extend_result: 574 case Vacuous(): 575 self.add_vacuous(node.id) 576 log(f'vacuous node: {node.id}', warning=True) 577 578 case Stuck(): 579 self.add_stuck(node.id) 580 log(f'stuck node: {node.id}') 581 582 case Abstract(cterm): 583 new_node = self.create_node(cterm) 584 self.create_cover(node.id, new_node.id) 585 log(f'abstraction node: {node.id}') 586 587 case Step(cterm, depth, next_node_logs, rule_labels, _): 588 node_id = node.id 589 next_node = self.create_node(cterm) 590 # Optimization for steps consists of on-the-fly merging of consecutive edges and can 591 # be performed only if the current node has a single predecessor connected by an Edge 592 if ( 593 optimize_kcfg 594 and (len(predecessors := self.predecessors(target_id=node.id)) == 1) 595 and isinstance(in_edge := predecessors[0], KCFG.Edge) 596 ): 597 # The existing edge is removed and the step parameters are updated accordingly 598 self.remove_edge(in_edge.source.id, node.id) 599 node_id = in_edge.source.id 600 depth += in_edge.depth 601 rule_labels = list(in_edge.rules) + rule_labels 602 next_node_logs = logs[node.id] + next_node_logs if node.id in logs else next_node_logs 603 self.remove_node(node.id) 604 self.create_edge(node_id, next_node.id, depth, rule_labels) 605 logs[next_node.id] = next_node_logs 606 log(f'basic block at depth {depth}: {node_id} --> {next_node.id}') 607 608 case Branch(branches, _): 609 branch_node_ids = self.split_on_constraints(node.id, branches) 610 log(f'{len(branches)} branches: {node.id} --> {branch_node_ids}') 611 612 case NDBranch(cterms, next_node_logs, rule_labels): 613 next_ids = [self.create_node(cterm).id for cterm in cterms] 614 for i in next_ids: 615 logs[i] = next_node_logs 616 self.create_ndbranch(node.id, next_ids, rules=rule_labels) 617 log(f'{len(cterms)} non-deterministic branches: {node.id} --> {next_ids}') 618 619 case _: 620 raise AssertionError()
621
[docs] 622 def to_dict_no_nodes(self) -> dict[str, Any]: 623 nodes = list(self._nodes.keys()) 624 edges = [edge.to_dict() for edge in self.edges()] 625 merged_edges = [merged_edge.to_dict() for merged_edge in self.merged_edges()] 626 covers = [cover.to_dict() for cover in self.covers()] 627 splits = [split.to_dict() for split in self.splits()] 628 ndbranches = [ndbranch.to_dict() for ndbranch in self.ndbranches()] 629 630 aliases = dict(sorted(self._aliases.items())) 631 632 res = { 633 'next': self._node_id, 634 'nodes': nodes, 635 'edges': edges, 636 'merged_edges': merged_edges, 637 'covers': covers, 638 'splits': splits, 639 'ndbranches': ndbranches, 640 'aliases': aliases, 641 } 642 return {k: v for k, v in res.items() if v}
643
[docs] 644 def to_dict(self) -> dict[str, Any]: 645 nodes = [node.to_dict() for node in self.nodes] 646 edges = [edge.to_dict() for edge in self.edges()] 647 merged_edges = [merged_edge.to_dict() for merged_edge in self.merged_edges()] 648 covers = [cover.to_dict() for cover in self.covers()] 649 splits = [split.to_dict() for split in self.splits()] 650 ndbranches = [ndbranch.to_dict() for ndbranch in self.ndbranches()] 651 652 aliases = dict(sorted(self._aliases.items())) 653 654 res = { 655 'next': self._node_id, 656 'nodes': nodes, 657 'edges': edges, 658 'merged_edges': merged_edges, 659 'covers': covers, 660 'splits': splits, 661 'ndbranches': ndbranches, 662 'aliases': aliases, 663 } 664 return {k: v for k, v in res.items() if v}
665
[docs] 666 @staticmethod 667 def from_dict(dct: Mapping[str, Any], optimize_memory: bool = True) -> KCFG: 668 cfg = KCFG(optimize_memory=optimize_memory) 669 670 for node_dict in dct.get('nodes') or []: 671 node = KCFG.Node.from_dict(node_dict) 672 cfg.add_node(node) 673 674 max_id = max([node.id for node in cfg.nodes], default=0) 675 cfg._node_id = dct.get('next', max_id + 1) 676 677 for edge_dict in dct.get('edges') or []: 678 edge = KCFG.Edge.from_dict(edge_dict, cfg._nodes) 679 cfg.add_successor(edge) 680 681 for edge_dict in dct.get('merged_edges') or []: 682 merged_edge = KCFG.MergedEdge.from_dict(edge_dict, cfg._nodes) 683 cfg.add_successor(merged_edge) 684 685 for cover_dict in dct.get('covers') or []: 686 cover = KCFG.Cover.from_dict(cover_dict, cfg._nodes) 687 cfg.add_successor(cover) 688 689 for split_dict in dct.get('splits') or []: 690 split = KCFG.Split.from_dict(split_dict, cfg._nodes) 691 cfg.add_successor(split) 692 693 for ndbranch_dict in dct.get('ndbranches') or []: 694 ndbranch = KCFG.NDBranch.from_dict(ndbranch_dict, cfg._nodes) 695 cfg.add_successor(ndbranch) 696 697 for alias, node_id in dct.get('aliases', {}).items(): 698 cfg.add_alias(alias=alias, node_id=node_id) 699 700 return cfg
701
[docs] 702 def aliases(self, node_id: NodeIdLike) -> list[str]: 703 node_id = self._resolve(node_id) 704 return [alias for alias, value in self._aliases.items() if node_id == value]
705
[docs] 706 def to_json(self) -> str: 707 return json.dumps(self.to_dict(), sort_keys=True)
708
[docs] 709 @staticmethod 710 def from_json(s: str, optimize_memory: bool = True) -> KCFG: 711 return KCFG.from_dict(json.loads(s), optimize_memory=optimize_memory)
712
[docs] 713 def to_rules( 714 self, 715 _id: str | None = None, 716 priority: int = 20, 717 defunc_with: KDefinition | None = None, 718 ) -> list[KRuleLike]: 719 _id = 'BASIC-BLOCK' if _id is None else _id 720 return [e.to_rule(_id, priority=priority, defunc_with=defunc_with) for e in self.edges()] + [ 721 m.to_rule(_id, priority=priority) for m in self.merged_edges() 722 ]
723
[docs] 724 def to_module( 725 self, 726 module_name: str | None = None, 727 imports: Iterable[KImport] = (), 728 priority: int = 20, 729 att: KAtt = EMPTY_ATT, 730 defunc_with: KDefinition | None = None, 731 ) -> KFlatModule: 732 module_name = 'KCFG' if module_name is None else module_name 733 return KFlatModule( 734 module_name, self.to_rules(priority=priority, defunc_with=defunc_with), imports=imports, att=att 735 )
736 737 def _resolve_or_none(self, id_like: NodeIdLike) -> int | None: 738 if type(id_like) is int: 739 if id_like in self._nodes: 740 return id_like 741 742 return None 743 744 if type(id_like) is not str: 745 raise TypeError(f'Expected int or str for id_like, got: {id_like}') 746 747 if id_like.startswith('@'): 748 if id_like[1:] in self._aliases: 749 return self._aliases[id_like[1:]] 750 raise ValueError(f'Unknown alias: {id_like}') 751 752 return None 753 754 def _resolve(self, id_like: NodeIdLike) -> int: 755 match = self._resolve_or_none(id_like) 756 if not match: 757 raise ValueError(f'Unknown node: {id_like}') 758 return match 759
[docs] 760 def node(self, node_id: NodeIdLike) -> KCFG.Node: 761 node_id = self._resolve(node_id) 762 return self._nodes[node_id]
763
[docs] 764 def get_node(self, node_id: NodeIdLike) -> KCFG.Node | None: 765 resolved_id = self._resolve_or_none(node_id) 766 if resolved_id is None: 767 return None 768 return self._nodes[resolved_id]
769
[docs] 770 def contains_node(self, node: KCFG.Node) -> bool: 771 return bool(self.get_node(node.id))
772
[docs] 773 def add_node(self, node: KCFG.Node) -> None: 774 if node.id in self._nodes: 775 raise ValueError(f'Node with id already exists: {node.id}') 776 self._nodes[node.id] = node 777 self._created_nodes.add(node.id)
778
[docs] 779 def create_node(self, cterm: CTerm) -> KCFG.Node: 780 node = KCFG.Node(self._node_id, cterm) 781 self._node_id += 1 782 self._nodes[node.id] = node 783 self._created_nodes.add(node.id) 784 return node
785
[docs] 786 def remove_node(self, node_id: NodeIdLike) -> None: 787 self.remove_edges_around(node_id) 788 789 node_id = self._resolve(node_id) 790 self._nodes.pop(node_id) 791 self._deleted_nodes.add(node_id) 792 self._created_nodes.discard(node_id)
793
[docs] 794 def remove_edges_around(self, node_id: NodeIdLike) -> None: 795 node_id = self._resolve(node_id) 796 797 self._edges = {k: s for k, s in self._edges.items() if k != node_id and node_id not in s.target_ids} 798 self._merged_edges = { 799 k: s for k, s in self._merged_edges.items() if k != node_id and node_id not in s.target_ids 800 } 801 self._covers = {k: s for k, s in self._covers.items() if k != node_id and node_id not in s.target_ids} 802 803 self._splits = {k: s for k, s in self._splits.items() if k != node_id and node_id not in s.target_ids} 804 self._ndbranches = {k: b for k, b in self._ndbranches.items() if k != node_id and node_id not in b.target_ids} 805 806 for alias in [alias for alias, id in self._aliases.items() if id == node_id]: 807 self.remove_alias(alias)
808 809 def _update_refs(self, node_id: int) -> None: 810 node = self.node(node_id) 811 for succ in self.successors(node_id): 812 new_succ = succ.replace_source(node) 813 if type(new_succ) is KCFG.Edge: 814 self._edges[new_succ.source.id] = new_succ 815 if type(new_succ) is KCFG.MergedEdge: 816 self._merged_edges[new_succ.source.id] = new_succ 817 if type(new_succ) is KCFG.Cover: 818 self._covers[new_succ.source.id] = new_succ 819 if type(new_succ) is KCFG.Split: 820 self._splits[new_succ.source.id] = new_succ 821 if type(new_succ) is KCFG.NDBranch: 822 self._ndbranches[new_succ.source.id] = new_succ 823 824 for pred in self.predecessors(node_id): 825 new_pred = pred.replace_target(node) 826 if type(new_pred) is KCFG.Edge: 827 self._edges[new_pred.source.id] = new_pred 828 if type(new_pred) is KCFG.MergedEdge: 829 self._merged_edges[new_pred.source.id] = new_pred 830 if type(new_pred) is KCFG.Cover: 831 self._covers[new_pred.source.id] = new_pred 832 if type(new_pred) is KCFG.Split: 833 self._splits[new_pred.source.id] = new_pred 834 if type(new_pred) is KCFG.NDBranch: 835 self._ndbranches[new_pred.source.id] = new_pred 836
[docs] 837 def remove_attr(self, node_id: NodeIdLike, attr: NodeAttr) -> None: 838 node = self.node(node_id) 839 new_node = node.remove_attr(attr) 840 self.replace_node(new_node)
841
[docs] 842 def discard_attr(self, node_id: NodeIdLike, attr: NodeAttr) -> None: 843 node = self.node(node_id) 844 new_node = node.discard_attr(attr) 845 self.replace_node(new_node)
846
[docs] 847 def add_attr(self, node_id: NodeIdLike, attr: NodeAttr) -> None: 848 node = self.node(node_id) 849 new_node = node.add_attr(attr) 850 self.replace_node(new_node)
851
[docs] 852 def let_node( 853 self, node_id: NodeIdLike, cterm: CTerm | None = None, attrs: Iterable[KCFGNodeAttr] | None = None 854 ) -> None: 855 node = self.node(node_id) 856 new_node = node.let(cterm=cterm, attrs=attrs) 857 self.replace_node(new_node)
858
[docs] 859 def replace_node(self, node: KCFG.Node) -> None: 860 self._nodes[node.id] = node 861 self._created_nodes.add(node.id) 862 self._update_refs(node.id)
863
[docs] 864 def successors(self, source_id: NodeIdLike) -> list[Successor]: 865 out_edges: Iterable[KCFG.Successor] = self.edges(source_id=source_id) 866 out_merged_edges: Iterable[KCFG.Successor] = self.merged_edges(source_id=source_id) 867 out_covers: Iterable[KCFG.Successor] = self.covers(source_id=source_id) 868 out_splits: Iterable[KCFG.Successor] = self.splits(source_id=source_id) 869 out_ndbranches: Iterable[KCFG.Successor] = self.ndbranches(source_id=source_id) 870 return list(out_edges) + list(out_merged_edges) + list(out_covers) + list(out_splits) + list(out_ndbranches)
871
[docs] 872 def predecessors(self, target_id: NodeIdLike) -> list[Successor]: 873 in_edges: Iterable[KCFG.Successor] = self.edges(target_id=target_id) 874 in_merged_edges: Iterable[KCFG.Successor] = self.merged_edges(target_id=target_id) 875 in_covers: Iterable[KCFG.Successor] = self.covers(target_id=target_id) 876 in_splits: Iterable[KCFG.Successor] = self.splits(target_id=target_id) 877 in_ndbranches: Iterable[KCFG.Successor] = self.ndbranches(target_id=target_id) 878 return list(in_edges) + list(in_merged_edges) + list(in_covers) + list(in_splits) + list(in_ndbranches)
879 880 def _check_no_successors(self, source_id: NodeIdLike) -> None: 881 if len(self.successors(source_id)) > 0: 882 raise ValueError(f'Node already has successors: {source_id} -> {self.successors(source_id)}') 883 884 def _check_no_zero_loops(self, source_id: NodeIdLike, target_ids: Iterable[NodeIdLike]) -> None: 885 for target_id in target_ids: 886 path = self.shortest_path_between(target_id, source_id) 887 if path is not None and KCFG.path_length(path) == 0: 888 raise ValueError( 889 f'Adding successor would create zero-length loop with backedge: {source_id} -> {target_id}' 890 ) 891
[docs] 892 def add_successor(self, succ: KCFG.Successor) -> None: 893 self._check_no_successors(succ.source.id) 894 self._check_no_zero_loops(succ.source.id, succ.target_ids) 895 if type(succ) is KCFG.Edge: 896 self._edges[succ.source.id] = succ 897 elif type(succ) is KCFG.MergedEdge: 898 self._merged_edges[succ.source.id] = succ 899 elif type(succ) is KCFG.Cover: 900 self._covers[succ.source.id] = succ 901 else: 902 if len(succ.target_ids) <= 1: 903 raise ValueError( 904 f'Cannot create {type(succ)} node with less than 2 targets: {succ.source.id} -> {succ.target_ids}' 905 ) 906 if type(succ) is KCFG.Split: 907 self._splits[succ.source.id] = succ 908 elif type(succ) is KCFG.NDBranch: 909 self._ndbranches[succ.source.id] = succ
910
[docs] 911 def edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> Edge | None: 912 source_id = self._resolve(source_id) 913 target_id = self._resolve(target_id) 914 edge = self._edges.get(source_id, None) 915 return edge if edge is not None and edge.target.id == target_id else None
916
[docs] 917 def edges(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[Edge]: 918 source_id = self._resolve(source_id) if source_id is not None else None 919 target_id = self._resolve(target_id) if target_id is not None else None 920 return [ 921 edge 922 for edge in self._edges.values() 923 if (source_id is None or source_id == edge.source.id) and (target_id is None or target_id == edge.target.id) 924 ]
925
[docs] 926 def contains_edge(self, edge: Edge) -> bool: 927 if other := self.edge(source_id=edge.source.id, target_id=edge.target.id): 928 return edge == other 929 return False
930
[docs] 931 def create_edge(self, source_id: NodeIdLike, target_id: NodeIdLike, depth: int, rules: Iterable[str] = ()) -> Edge: 932 if depth <= 0: 933 raise ValueError(f'Cannot build KCFG Edge with non-positive depth: {depth}') 934 source = self.node(source_id) 935 target = self.node(target_id) 936 edge = KCFG.Edge(source, target, depth, tuple(rules)) 937 self.add_successor(edge) 938 return edge
939
[docs] 940 def remove_edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> None: 941 source_id = self._resolve(source_id) 942 target_id = self._resolve(target_id) 943 edge = self.edge(source_id, target_id) 944 if not edge: 945 raise ValueError(f'Edge does not exist: {source_id} -> {target_id}') 946 self._edges.pop(source_id)
947
[docs] 948 def merged_edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> MergedEdge | None: 949 source_id = self._resolve(source_id) 950 target_id = self._resolve(target_id) 951 merged_edge = self._merged_edges.get(source_id, None) 952 return merged_edge if merged_edge is not None and merged_edge.target.id == target_id else None
953
[docs] 954 def merged_edges( 955 self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None 956 ) -> list[MergedEdge]: 957 source_id = self._resolve(source_id) if source_id is not None else None 958 target_id = self._resolve(target_id) if target_id is not None else None 959 return [ 960 merged_edge 961 for merged_edge in self._merged_edges.values() 962 if (source_id is None or source_id == merged_edge.source.id) 963 and (target_id is None or target_id == merged_edge.target.id) 964 ]
965
[docs] 966 def contains_merged_edge(self, edge: MergedEdge) -> bool: 967 if other := self.merged_edge(source_id=edge.source.id, target_id=edge.target.id): 968 return edge == other 969 return False
970
[docs] 971 def create_merged_edge( 972 self, source_id: NodeIdLike, target_id: NodeIdLike, edges: Iterable[Edge | MergedEdge] 973 ) -> MergedEdge: 974 if len(list(edges)) == 0: 975 raise ValueError(f'Cannot build KCFG MergedEdge with no edges: {edges}') 976 source = self.node(source_id) 977 target = self.node(target_id) 978 flatten_edges: list[KCFG.Edge] = [] 979 for edge in edges: 980 if isinstance(edge, KCFG.MergedEdge): 981 flatten_edges.extend(edge.edges) 982 else: 983 flatten_edges.append(edge) 984 merged_edge = KCFG.MergedEdge(source, target, tuple(flatten_edges)) 985 self.add_successor(merged_edge) 986 return merged_edge
987
[docs] 988 def remove_merged_edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> None: 989 source_id = self._resolve(source_id) 990 target_id = self._resolve(target_id) 991 merged_edge = self.merged_edge(source_id, target_id) 992 if not merged_edge: 993 raise ValueError(f'MergedEdge does not exist: {source_id} -> {target_id}') 994 self._merged_edges.pop(source_id)
995
[docs] 996 def general_edges( 997 self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None 998 ) -> list[Edge | MergedEdge]: 999 return self.edges(source_id=source_id, target_id=target_id) + self.merged_edges( 1000 source_id=source_id, target_id=target_id 1001 )
1002
[docs] 1003 def cover(self, source_id: NodeIdLike, target_id: NodeIdLike) -> Cover | None: 1004 source_id = self._resolve(source_id) 1005 target_id = self._resolve(target_id) 1006 cover = self._covers.get(source_id, None) 1007 return cover if cover is not None and cover.target.id == target_id else None
1008
[docs] 1009 def covers(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[Cover]: 1010 source_id = self._resolve(source_id) if source_id is not None else None 1011 target_id = self._resolve(target_id) if target_id is not None else None 1012 return [ 1013 cover 1014 for cover in self._covers.values() 1015 if (source_id is None or source_id == cover.source.id) 1016 and (target_id is None or target_id == cover.target.id) 1017 ]
1018
[docs] 1019 def contains_cover(self, cover: Cover) -> bool: 1020 if other := self.cover(source_id=cover.source.id, target_id=cover.target.id): 1021 return cover == other 1022 return False
1023
[docs] 1024 def create_cover(self, source_id: NodeIdLike, target_id: NodeIdLike, csubst: CSubst | None = None) -> Cover: 1025 source = self.node(source_id) 1026 target = self.node(target_id) 1027 if csubst is None: 1028 csubst = target.cterm.match_with_constraint(source.cterm) 1029 if csubst is None: 1030 raise ValueError(f'No matching between: {source.id} and {target.id}') 1031 cover = KCFG.Cover(source, target, csubst=csubst) 1032 self.add_successor(cover) 1033 return cover
1034
[docs] 1035 def remove_cover(self, source_id: NodeIdLike, target_id: NodeIdLike) -> None: 1036 source_id = self._resolve(source_id) 1037 target_id = self._resolve(target_id) 1038 cover = self.cover(source_id, target_id) 1039 if not cover: 1040 raise ValueError(f'Cover does not exist: {source_id} -> {target_id}') 1041 self._covers.pop(source_id)
1042
[docs] 1043 def edge_likes(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[EdgeLike]: 1044 return ( 1045 cast('List[KCFG.EdgeLike]', self.edges(source_id=source_id, target_id=target_id)) 1046 + cast('List[KCFG.EdgeLike]', self.covers(source_id=source_id, target_id=target_id)) 1047 + cast('List[KCFG.EdgeLike]', self.merged_edges(source_id=source_id, target_id=target_id)) 1048 )
1049
[docs] 1050 def add_vacuous(self, node_id: NodeIdLike) -> None: 1051 self.add_attr(node_id, KCFGNodeAttr.VACUOUS)
1052
[docs] 1053 def remove_vacuous(self, node_id: NodeIdLike) -> None: 1054 self.remove_attr(node_id, KCFGNodeAttr.VACUOUS)
1055
[docs] 1056 def discard_vacuous(self, node_id: NodeIdLike) -> None: 1057 self.discard_attr(node_id, KCFGNodeAttr.VACUOUS)
1058
[docs] 1059 def add_stuck(self, node_id: NodeIdLike) -> None: 1060 self.add_attr(node_id, KCFGNodeAttr.STUCK)
1061
[docs] 1062 def remove_stuck(self, node_id: NodeIdLike) -> None: 1063 self.remove_attr(node_id, KCFGNodeAttr.STUCK)
1064
[docs] 1065 def discard_stuck(self, node_id: NodeIdLike) -> None: 1066 self.discard_attr(node_id, KCFGNodeAttr.STUCK)
1067
[docs] 1068 def splits(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[Split]: 1069 source_id = self._resolve(source_id) if source_id is not None else None 1070 target_id = self._resolve(target_id) if target_id is not None else None 1071 return [ 1072 s 1073 for s in self._splits.values() 1074 if (source_id is None or source_id == s.source.id) and (target_id is None or target_id in s.target_ids) 1075 ]
1076
[docs] 1077 def contains_split(self, split: Split) -> bool: 1078 return split in self._splits.values()
1079
[docs] 1080 def create_split(self, source_id: NodeIdLike, splits: Iterable[tuple[NodeIdLike, CSubst]]) -> KCFG.Split: 1081 source_id = self._resolve(source_id) 1082 split = KCFG.Split(self.node(source_id), tuple((self.node(nid), csubst) for nid, csubst in list(splits))) 1083 self.add_successor(split) 1084 return split
1085
[docs] 1086 def create_split_by_nodes(self, source_id: NodeIdLike, target_ids: Iterable[NodeIdLike]) -> KCFG.Split | None: 1087 """Create a split without crafting a CSubst.""" 1088 source = self.node(source_id) 1089 targets = [self.node(nid) for nid in target_ids] 1090 try: 1091 csubsts = [not_none(source.cterm.match_with_constraint(target.cterm)) for target in targets] 1092 except ValueError: 1093 return None 1094 return self.create_split(source.id, zip(target_ids, csubsts, strict=True))
1095
[docs] 1096 def ndbranches(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[NDBranch]: 1097 source_id = self._resolve(source_id) if source_id is not None else None 1098 target_id = self._resolve(target_id) if target_id is not None else None 1099 return [ 1100 b 1101 for b in self._ndbranches.values() 1102 if (source_id is None or source_id == b.source.id) and (target_id is None or target_id in b.target_ids) 1103 ]
1104
[docs] 1105 def contains_ndbranch(self, ndbranch: NDBranch) -> bool: 1106 return ndbranch in self._ndbranches
1107
[docs] 1108 def create_ndbranch( 1109 self, source_id: NodeIdLike, ndbranches: Iterable[NodeIdLike], rules: Iterable[str] = () 1110 ) -> KCFG.NDBranch: 1111 source_id = self._resolve(source_id) 1112 ndbranch = KCFG.NDBranch(self.node(source_id), tuple(self.node(nid) for nid in list(ndbranches)), tuple(rules)) 1113 self.add_successor(ndbranch) 1114 return ndbranch
1115
[docs] 1116 def split_on_constraints(self, source_id: NodeIdLike, constraints: Iterable[KInner]) -> list[int]: 1117 source = self.node(source_id) 1118 branch_node_ids = [self.create_node(source.cterm.add_constraint(c)).id for c in constraints] 1119 csubsts = [not_none(source.cterm.match_with_constraint(self.node(id).cterm)) for id in branch_node_ids] 1120 self.create_split(source.id, zip(branch_node_ids, csubsts, strict=True)) 1121 return branch_node_ids
1122
[docs] 1123 def add_alias(self, alias: str, node_id: NodeIdLike) -> None: 1124 if '@' in alias: 1125 raise ValueError('Alias may not contain "@"') 1126 if alias in self._aliases: 1127 raise ValueError(f'Duplicate alias: {alias}') 1128 node_id = self._resolve(node_id) 1129 self._aliases[alias] = node_id
1130
[docs] 1131 def remove_alias(self, alias: str) -> None: 1132 if alias not in self._aliases: 1133 raise ValueError(f'Alias does not exist: {alias}') 1134 self._aliases.pop(alias)
1135
[docs] 1136 def is_root(self, node_id: NodeIdLike) -> bool: 1137 node_id = self._resolve(node_id) 1138 return len(self.predecessors(node_id)) == 0
1139
[docs] 1140 def is_vacuous(self, node_id: NodeIdLike) -> bool: 1141 return KCFGNodeAttr.VACUOUS in self.node(node_id).attrs
1142
[docs] 1143 def is_stuck(self, node_id: NodeIdLike) -> bool: 1144 return KCFGNodeAttr.STUCK in self.node(node_id).attrs
1145
[docs] 1146 def is_split(self, node_id: NodeIdLike) -> bool: 1147 node_id = self._resolve(node_id) 1148 return node_id in self._splits
1149
[docs] 1150 def is_ndbranch(self, node_id: NodeIdLike) -> bool: 1151 node_id = self._resolve(node_id) 1152 return node_id in self._ndbranches
1153
[docs] 1154 def is_leaf(self, node_id: NodeIdLike) -> bool: 1155 return len(self.successors(node_id)) == 0
1156
[docs] 1157 def is_covered(self, node_id: NodeIdLike) -> bool: 1158 node_id = self._resolve(node_id) 1159 return node_id in self._covers
1160
[docs] 1161 def prune(self, node_id: NodeIdLike, keep_nodes: Iterable[NodeIdLike] = ()) -> list[int]: 1162 nodes = self.reachable_nodes(node_id) 1163 keep_nodes = [self._resolve(nid) for nid in keep_nodes] 1164 pruned_nodes: list[int] = [] 1165 for node in nodes: 1166 if node.id not in keep_nodes: 1167 self.remove_node(node.id) 1168 pruned_nodes.append(node.id) 1169 return pruned_nodes
1170
[docs] 1171 def shortest_path_between( 1172 self, source_node_id: NodeIdLike, target_node_id: NodeIdLike 1173 ) -> tuple[Successor, ...] | None: 1174 paths = self.paths_between(source_node_id, target_node_id) 1175 if len(paths) == 0: 1176 return None 1177 return sorted(paths, key=(lambda path: KCFG.path_length(path)))[0]
1178
[docs] 1179 def shortest_distance_between(self, node_1_id: NodeIdLike, node_2_id: NodeIdLike) -> int | None: 1180 path_1 = self.shortest_path_between(node_1_id, node_2_id) 1181 path_2 = self.shortest_path_between(node_2_id, node_1_id) 1182 distance: int | None = None 1183 if path_1 is not None: 1184 distance = KCFG.path_length(path_1) 1185 if path_2 is not None: 1186 distance_2 = KCFG.path_length(path_2) 1187 if distance is None or distance_2 < distance: 1188 distance = distance_2 1189 return distance
1190
[docs] 1191 def zero_depth_between(self, node_1_id: NodeIdLike, node_2_id: NodeIdLike) -> bool: 1192 _node_1_id = self._resolve(node_1_id) 1193 _node_2_id = self._resolve(node_2_id) 1194 if _node_1_id == _node_2_id: 1195 return True 1196 # Short-circuit and don't run pathing algorithm if there is no 0 length path on the first step. 1197 path_lengths = [ 1198 self.path_length([successor]) for successor in self.successors(_node_1_id) + self.successors(_node_2_id) 1199 ] 1200 if 0 not in path_lengths: 1201 return False 1202 1203 shortest_distance = self.shortest_distance_between(_node_1_id, _node_2_id) 1204 1205 return shortest_distance is not None and shortest_distance == 0
1206
[docs] 1207 def paths_between(self, source_id: NodeIdLike, target_id: NodeIdLike) -> list[tuple[Successor, ...]]: 1208 source_id = self._resolve(source_id) 1209 target_id = self._resolve(target_id) 1210 1211 if source_id == target_id: 1212 return [()] 1213 1214 source_successors = list(self.successors(source_id)) 1215 assert len(source_successors) <= 1 1216 if len(source_successors) == 0: 1217 return [] 1218 1219 paths: list[tuple[KCFG.Successor, ...]] = [] 1220 worklist: list[list[KCFG.Successor]] = [[source_successors[0]]] 1221 1222 def _in_path(_nid: int, _path: list[KCFG.Successor]) -> bool: 1223 for succ in _path: 1224 if _nid == succ.source.id: 1225 return True 1226 if len(_path) > 0: 1227 if isinstance(_path[-1], KCFG.EdgeLike) and _path[-1].target.id == _nid: 1228 return True 1229 elif isinstance(_path[-1], KCFG.MultiEdge) and _nid in _path[-1].target_ids: 1230 return True 1231 return False 1232 1233 while worklist: 1234 curr_path = worklist.pop() 1235 curr_successor = curr_path[-1] 1236 successors: list[KCFG.Successor] = [] 1237 1238 if isinstance(curr_successor, KCFG.EdgeLike): 1239 if curr_successor.target.id == target_id: 1240 paths.append(tuple(curr_path)) 1241 continue 1242 else: 1243 successors = list(self.successors(curr_successor.target.id)) 1244 1245 elif isinstance(curr_successor, KCFG.MultiEdge): 1246 if len(list(curr_successor.targets)) == 1: 1247 target = list(curr_successor.targets)[0] 1248 if target.id == target_id: 1249 paths.append(tuple(curr_path)) 1250 continue 1251 else: 1252 successors = list(self.successors(target.id)) 1253 if len(list(curr_successor.targets)) > 1: 1254 curr_path = curr_path[0:-1] 1255 successors = [curr_successor.with_single_target(target) for target in curr_successor.targets] 1256 1257 for successor in successors: 1258 if isinstance(successor, KCFG.EdgeLike) and not _in_path(successor.target.id, curr_path): 1259 worklist.append(curr_path + [successor]) 1260 elif isinstance(successor, KCFG.MultiEdge): 1261 if len(list(successor.targets)) == 1: 1262 target = list(successor.targets)[0] 1263 if not _in_path(target.id, curr_path): 1264 worklist.append(curr_path + [successor]) 1265 elif len(list(successor.targets)) > 1: 1266 worklist.append(curr_path + [successor]) 1267 1268 return paths
1269
[docs] 1270 def reachable_nodes(self, source_id: NodeIdLike, *, reverse: bool = False) -> set[KCFG.Node]: 1271 visited: set[KCFG.Node] = set() 1272 worklist: list[KCFG.Node] = [self.node(source_id)] 1273 1274 while worklist: 1275 node = worklist.pop() 1276 1277 if node in visited: 1278 continue 1279 1280 visited.add(node) 1281 1282 if not reverse: 1283 worklist.extend(target for succ in self.successors(source_id=node.id) for target in succ.targets) 1284 else: 1285 worklist.extend(succ.source for succ in self.predecessors(target_id=node.id)) 1286 1287 return visited
1288
[docs] 1289 def write_cfg_data(self) -> None: 1290 assert self._kcfg_store is not None 1291 self._kcfg_store.write_cfg_data( 1292 self, self.to_dict_no_nodes(), deleted_nodes=self._deleted_nodes, created_nodes=self._created_nodes 1293 ) 1294 self._deleted_nodes.clear() 1295 self._created_nodes.clear()
1296
[docs] 1297 @staticmethod 1298 def read_cfg_data(cfg_dir: Path) -> KCFG: 1299 store = KCFGStore(cfg_dir) 1300 cfg = KCFG.from_dict(store.read_cfg_data()) 1301 cfg._kcfg_store = store 1302 return cfg
1303
[docs] 1304 @staticmethod 1305 def read_node_data(cfg_dir: Path, node_id: int) -> KCFG.Node: 1306 store = KCFGStore(cfg_dir) 1307 return KCFG.Node.from_dict(store.read_node_data(node_id))
1308 1309
[docs] 1310class KCFGExtendResult(ABC): ...
1311 1312
[docs] 1313@final 1314@dataclass(frozen=True) 1315class Vacuous(KCFGExtendResult): ...
1316 1317
[docs] 1318@final 1319@dataclass(frozen=True) 1320class Stuck(KCFGExtendResult): ...
1321 1322
[docs] 1323@final 1324@dataclass(frozen=True) 1325class Abstract(KCFGExtendResult): 1326 cterm: CTerm
1327 1328
[docs] 1329@final 1330@dataclass(frozen=True) 1331class Step(KCFGExtendResult): 1332 cterm: CTerm 1333 depth: int 1334 logs: tuple[LogEntry, ...] 1335 rule_labels: list[str] 1336 cut: bool = field(default=False) 1337 info: str = field(default='')
1338 1339
[docs] 1340@final 1341@dataclass(frozen=True) 1342class Branch(KCFGExtendResult): 1343 constraints: tuple[KInner, ...] 1344 heuristic: bool 1345 info: str = field(default='') 1346 1347 def __init__(self, constraints: Iterable[KInner], *, heuristic: bool = False, info: str = ''): 1348 object.__setattr__(self, 'constraints', tuple(constraints)) 1349 object.__setattr__(self, 'heuristic', heuristic) 1350 object.__setattr__(self, 'info', info)
1351 1352
[docs] 1353@final 1354@dataclass(frozen=True) 1355class NDBranch(KCFGExtendResult): 1356 cterms: tuple[CTerm, ...] 1357 logs: tuple[LogEntry, ...] 1358 rule_labels: tuple[str, ...] 1359 1360 def __init__(self, cterms: Iterable[CTerm], logs: Iterable[LogEntry,], rule_labels: Iterable[str]): 1361 object.__setattr__(self, 'cterms', tuple(cterms)) 1362 object.__setattr__(self, 'logs', tuple(logs)) 1363 object.__setattr__(self, 'rule_labels', tuple(rule_labels))