Source code for pyk.kcfg.kcfg

   1from __future__ import annotations
   2
   3import json
   4import logging
   5from abc import ABC, abstractmethod
   6from collections.abc import Container
   7from dataclasses import dataclass, field
   8from threading import RLock
   9from typing import TYPE_CHECKING, Final, List, Union, cast, final
  10
  11from ..cterm import CSubst, CTerm, cterm_build_claim, cterm_build_rule
  12from ..kast import EMPTY_ATT
  13from ..kast.inner import KApply
  14from ..kast.manip import (
  15    bool_to_ml_pred,
  16    extract_lhs,
  17    extract_rhs,
  18    inline_cell_maps,
  19    minimize_rule_like,
  20    rename_generated_vars,
  21    sort_ac_collections,
  22)
  23from ..kast.outer import KFlatModule
  24from ..prelude.kbool import andBool
  25from ..utils import ensure_dir_path, not_none
  26
  27if TYPE_CHECKING:
  28    from collections.abc import Iterable, Mapping, MutableMapping
  29    from pathlib import Path
  30    from types import TracebackType
  31    from typing import Any
  32
  33    from pyk.kore.rpc import LogEntry
  34
  35    from ..kast import KAtt
  36    from ..kast.inner import KInner
  37    from ..kast.outer import KClaim, KDefinition, KImport, KRuleLike
  38
  39
  40NodeIdLike = int | str
  41
  42_LOGGER: Final = logging.getLogger(__name__)
  43
  44
[docs] 45@dataclass(frozen=True) 46class NodeAttr: 47 value: str
48 49
[docs] 50class KCFGNodeAttr(NodeAttr): 51 VACUOUS = NodeAttr('vacuous') 52 STUCK = NodeAttr('stuck')
53 54
[docs] 55class KCFGStore: 56 store_path: Path 57 58 def __init__(self, store_path: Path) -> None: 59 self.store_path = store_path 60 ensure_dir_path(store_path) 61 ensure_dir_path(self.kcfg_node_dir) 62 63 @property 64 def kcfg_json_path(self) -> Path: 65 return self.store_path / 'kcfg.json' 66 67 @property 68 def kcfg_node_dir(self) -> Path: 69 return self.store_path / 'nodes' 70
[docs] 71 def kcfg_node_path(self, node_id: int) -> Path: 72 return self.kcfg_node_dir / f'{node_id}.json'
73
[docs] 74 def write_cfg_data( 75 self, kcfg: KCFG, dct: dict[str, Any], deleted_nodes: Iterable[int] = (), created_nodes: Iterable[int] = () 76 ) -> None: 77 vacuous_nodes = [ 78 node_id for node_id in kcfg._nodes.keys() if KCFGNodeAttr.VACUOUS in kcfg._nodes[node_id].attrs 79 ] 80 stuck_nodes = [node_id for node_id in kcfg._nodes.keys() if KCFGNodeAttr.STUCK in kcfg._nodes[node_id].attrs] 81 dct['vacuous'] = vacuous_nodes 82 dct['stuck'] = stuck_nodes 83 for node_id in deleted_nodes: 84 self.kcfg_node_path(node_id).unlink(missing_ok=True) 85 for node_id in created_nodes: 86 self.kcfg_node_path(node_id).write_text(json.dumps(kcfg._nodes[node_id].to_dict())) 87 self.kcfg_json_path.write_text(json.dumps(dct))
88
[docs] 89 def read_cfg_data(self) -> dict[str, Any]: 90 dct = json.loads(self.kcfg_json_path.read_text()) 91 nodes = [self.read_node_data(node_id) for node_id in dct.get('nodes') or []] 92 dct['nodes'] = nodes 93 94 new_nodes = [] 95 for node in dct['nodes']: 96 attrs = [] 97 if node['id'] in dct['vacuous']: 98 attrs.append(KCFGNodeAttr.VACUOUS.value) 99 if node['id'] in dct['stuck']: 100 attrs.append(KCFGNodeAttr.STUCK.value) 101 new_nodes.append({'id': node['id'], 'cterm': node['cterm'], 'attrs': attrs}) 102 103 dct['nodes'] = new_nodes 104 105 del dct['vacuous'] 106 del dct['stuck'] 107 108 return dct
109
[docs] 110 def read_node_data(self, node_id: int) -> dict[str, Any]: 111 return json.loads(self.kcfg_node_path(node_id).read_text())
112 113
[docs] 114class KCFG(Container[Union['KCFG.Node', 'KCFG.Successor']]):
[docs] 115 @final 116 @dataclass(frozen=True, order=True) 117 class Node: 118 id: int 119 cterm: CTerm 120 attrs: frozenset[NodeAttr] 121 122 def __init__(self, id: int, cterm: CTerm, attrs: Iterable[NodeAttr] = ()) -> None: 123 object.__setattr__(self, 'id', id) 124 object.__setattr__(self, 'cterm', cterm) 125 object.__setattr__(self, 'attrs', frozenset(attrs)) 126
[docs] 127 def to_dict(self) -> dict[str, Any]: 128 return {'id': self.id, 'cterm': self.cterm.to_dict(), 'attrs': [attr.value for attr in self.attrs]}
129
[docs] 130 @staticmethod 131 def from_dict(dct: dict[str, Any]) -> KCFG.Node: 132 return KCFG.Node(dct['id'], CTerm.from_dict(dct['cterm']), [NodeAttr(attr) for attr in dct['attrs']])
133
[docs] 134 def add_attr(self, attr: NodeAttr) -> KCFG.Node: 135 return KCFG.Node(self.id, self.cterm, list(self.attrs) + [attr])
136
[docs] 137 def remove_attr(self, attr: NodeAttr) -> KCFG.Node: 138 if attr not in self.attrs: 139 raise ValueError(f'Node {self.id} does not have attribute {attr.value}') 140 return KCFG.Node(self.id, self.cterm, self.attrs.difference([attr]))
141
[docs] 142 def discard_attr(self, attr: NodeAttr) -> KCFG.Node: 143 return KCFG.Node(self.id, self.cterm, self.attrs.difference([attr]))
144
[docs] 145 def let(self, cterm: CTerm | None = None, attrs: Iterable[KCFGNodeAttr] | None = None) -> KCFG.Node: 146 new_cterm = cterm if cterm is not None else self.cterm 147 new_attrs = attrs if attrs is not None else self.attrs 148 return KCFG.Node(self.id, new_cterm, new_attrs)
149 150 @property 151 def free_vars(self) -> frozenset[str]: 152 return frozenset(self.cterm.free_vars)
153
[docs] 154 class Successor(ABC): 155 source: KCFG.Node 156 157 def __lt__(self, other: Any) -> bool: 158 if not isinstance(other, KCFG.Successor): 159 return NotImplemented 160 return self.source < other.source 161 162 @property 163 def source_vars(self) -> frozenset[str]: 164 return frozenset(self.source.free_vars) 165 166 @property 167 @abstractmethod 168 def targets(self) -> tuple[KCFG.Node, ...]: ... 169 170 @property 171 def target_ids(self) -> list[int]: 172 return sorted(target.id for target in self.targets) 173 174 @property 175 def target_vars(self) -> frozenset[str]: 176 return frozenset(set.union(set(), *(target.free_vars for target in self.targets))) 177
[docs] 178 @abstractmethod 179 def replace_source(self, node: KCFG.Node) -> KCFG.Successor: ...
180
[docs] 181 @abstractmethod 182 def replace_target(self, node: KCFG.Node) -> KCFG.Successor: ...
183
[docs] 184 @abstractmethod 185 def to_dict(self) -> dict[str, Any]: ...
186
[docs] 187 @staticmethod 188 @abstractmethod 189 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Successor: ...
190
[docs] 191 class EdgeLike(Successor): 192 source: KCFG.Node 193 target: KCFG.Node 194 195 @property 196 def targets(self) -> tuple[KCFG.Node, ...]: 197 return (self.target,)
198
[docs] 199 @final 200 @dataclass(frozen=True) 201 class Edge(EdgeLike): 202 source: KCFG.Node 203 target: KCFG.Node 204 depth: int 205 rules: tuple[str, ...] 206
[docs] 207 def to_dict(self) -> dict[str, Any]: 208 return { 209 'source': self.source.id, 210 'target': self.target.id, 211 'depth': self.depth, 212 'rules': list(self.rules), 213 }
214
[docs] 215 @staticmethod 216 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Edge: 217 return KCFG.Edge(nodes[dct['source']], nodes[dct['target']], dct['depth'], tuple(dct['rules']))
218
[docs] 219 def to_rule( 220 self, 221 label: str, 222 claim: bool = False, 223 priority: int | None = None, 224 defunc_with: KDefinition | None = None, 225 minimize: bool = False, 226 ) -> KRuleLike: 227 def is_ceil_condition(kast: KInner) -> bool: 228 return type(kast) is KApply and kast.label.name == '#Ceil' 229 230 def _simplify_config(config: KInner) -> KInner: 231 return sort_ac_collections(inline_cell_maps(config)) 232 233 sentence_id = f'{label}-{self.source.id}-TO-{self.target.id}' 234 init_constraints = [c for c in self.source.cterm.constraints if not is_ceil_condition(c)] 235 init_cterm = CTerm(_simplify_config(self.source.cterm.config), init_constraints) 236 target_constraints = [c for c in self.target.cterm.constraints if not is_ceil_condition(c)] 237 target_cterm = CTerm(_simplify_config(self.target.cterm.config), target_constraints) 238 rule: KRuleLike 239 if claim: 240 rule, _ = cterm_build_claim(sentence_id, init_cterm, target_cterm) 241 else: 242 rule, _ = cterm_build_rule( 243 sentence_id, init_cterm, target_cterm, priority=priority, defunc_with=defunc_with 244 ) 245 if minimize: 246 rule = minimize_rule_like(rule) 247 return rule
248
[docs] 249 def replace_source(self, node: KCFG.Node) -> KCFG.Edge: 250 assert node.id == self.source.id 251 return KCFG.Edge(node, self.target, self.depth, self.rules)
252
[docs] 253 def replace_target(self, node: KCFG.Node) -> KCFG.Edge: 254 assert node.id == self.target.id 255 return KCFG.Edge(self.source, node, self.depth, self.rules)
256
[docs] 257 @final 258 @dataclass(frozen=True) 259 class MergedEdge(EdgeLike): 260 """Merged edge is a collection of edges that have been merged into a single edge.""" 261 262 source: KCFG.Node 263 target: KCFG.Node 264 edges: tuple[KCFG.Edge, ...] 265
[docs] 266 def to_dict(self) -> dict[str, Any]: 267 return { 268 'source': self.source.id, 269 'target': self.target.id, 270 'edges': [edge.to_dict() for edge in self.edges], 271 }
272
[docs] 273 @staticmethod 274 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Successor: 275 return KCFG.MergedEdge( 276 nodes[dct['source']], 277 nodes[dct['target']], 278 tuple(KCFG.Edge.from_dict(edge, nodes) for edge in dct['edges']), 279 )
280
[docs] 281 def replace_source(self, node: KCFG.Node) -> KCFG.Successor: 282 assert node.id == self.source.id 283 return KCFG.MergedEdge(node, self.target, self.edges)
284
[docs] 285 def replace_target(self, node: KCFG.Node) -> KCFG.Successor: 286 assert node.id == self.target.id 287 return KCFG.MergedEdge(self.source, node, self.edges)
288
[docs] 289 def to_rule(self, label: str, claim: bool = False, priority: int | None = None) -> KRuleLike: 290 return KCFG.Edge(self.source, self.target, 1, ()).to_rule(label, claim, priority)
291
[docs] 292 @final 293 @dataclass(frozen=True) 294 class Cover(EdgeLike): 295 source: KCFG.Node 296 target: KCFG.Node 297 csubst: CSubst 298
[docs] 299 def to_dict(self) -> dict[str, Any]: 300 return { 301 'source': self.source.id, 302 'target': self.target.id, 303 'csubst': self.csubst.to_dict(), 304 }
305
[docs] 306 @staticmethod 307 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Cover: 308 return KCFG.Cover(nodes[dct['source']], nodes[dct['target']], CSubst.from_dict(dct['csubst']))
309
[docs] 310 def replace_source(self, node: KCFG.Node) -> KCFG.Cover: 311 assert node.id == self.source.id 312 return KCFG.Cover(node, self.target, self.csubst)
313
[docs] 314 def replace_target(self, node: KCFG.Node) -> KCFG.Cover: 315 assert node.id == self.target.id 316 return KCFG.Cover(self.source, node, self.csubst)
317
[docs] 318 @dataclass(frozen=True) 319 class MultiEdge(Successor): 320 source: KCFG.Node 321 322 def __lt__(self, other: Any) -> bool: 323 if not type(other) is type(self): 324 return NotImplemented 325 return (self.source, self.target_ids) < (other.source, other.target_ids) 326
[docs] 327 @abstractmethod 328 def with_single_target(self, target: KCFG.Node) -> KCFG.MultiEdge: ...
329
[docs] 330 @final 331 @dataclass(frozen=True) 332 class Split(MultiEdge): 333 source: KCFG.Node 334 _targets: tuple[tuple[KCFG.Node, CSubst], ...] 335 336 def __init__(self, source: KCFG.Node, _targets: Iterable[tuple[KCFG.Node, CSubst]]) -> None: 337 object.__setattr__(self, 'source', source) 338 object.__setattr__(self, '_targets', tuple(_targets)) 339 340 @property 341 def targets(self) -> tuple[KCFG.Node, ...]: 342 return tuple(target for target, _ in self._targets) 343 344 @property 345 def splits(self) -> dict[int, CSubst]: 346 return {target.id: csubst for target, csubst in self._targets} 347
[docs] 348 def to_dict(self) -> dict[str, Any]: 349 return { 350 'source': self.source.id, 351 'targets': [ 352 { 353 'target': target.id, 354 'csubst': csubst.to_dict(), 355 } 356 for target, csubst in self._targets 357 ], 358 }
359
[docs] 360 @staticmethod 361 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Split: 362 _targets = [(nodes[target['target']], CSubst.from_dict(target['csubst'])) for target in dct['targets']] 363 return KCFG.Split(nodes[dct['source']], tuple(_targets))
364
[docs] 365 def with_single_target(self, target: KCFG.Node) -> KCFG.Split: 366 return KCFG.Split(self.source, ((target, self.splits[target.id]),))
367 368 @property 369 def covers(self) -> tuple[KCFG.Cover, ...]: 370 return tuple(KCFG.Cover(target, self.source, csubst) for target, csubst in self._targets) 371
[docs] 372 def replace_source(self, node: KCFG.Node) -> KCFG.Split: 373 assert node.id == self.source.id 374 return KCFG.Split(node, self._targets)
375
[docs] 376 def replace_target(self, node: KCFG.Node) -> KCFG.Split: 377 assert node.id in self.target_ids 378 new_targets = [ 379 (node, csubst) if node.id == target_node.id else (target_node, csubst) 380 for target_node, csubst in self._targets 381 ] 382 return KCFG.Split(self.source, tuple(new_targets))
383
[docs] 384 @final 385 @dataclass(frozen=True) 386 class NDBranch(MultiEdge): 387 source: KCFG.Node 388 _targets: tuple[KCFG.Node, ...] 389 rules: tuple[str, ...] 390 391 def __init__(self, source: KCFG.Node, _targets: Iterable[KCFG.Node], rules: tuple[str, ...]) -> None: 392 object.__setattr__(self, 'source', source) 393 object.__setattr__(self, '_targets', tuple(_targets)) 394 object.__setattr__(self, 'rules', rules) 395 396 @property 397 def targets(self) -> tuple[KCFG.Node, ...]: 398 return self._targets 399
[docs] 400 def to_dict(self) -> dict[str, Any]: 401 return { 402 'source': self.source.id, 403 'targets': [target.id for target in self.targets], 404 'rules': list(self.rules), 405 }
406
[docs] 407 @staticmethod 408 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.NDBranch: 409 return KCFG.NDBranch( 410 nodes[dct['source']], tuple([nodes[target] for target in dct['targets']]), tuple(dct['rules']) 411 )
412
[docs] 413 def with_single_target(self, target: KCFG.Node) -> KCFG.NDBranch: 414 return KCFG.NDBranch(self.source, (target,), ())
415 416 @property 417 def edges(self) -> tuple[KCFG.Edge, ...]: 418 return tuple(KCFG.Edge(self.source, target, 1, ()) for target in self.targets) 419
[docs] 420 def replace_source(self, node: KCFG.Node) -> KCFG.NDBranch: 421 assert node.id == self.source.id 422 return KCFG.NDBranch(node, self._targets, self.rules)
423
[docs] 424 def replace_target(self, node: KCFG.Node) -> KCFG.NDBranch: 425 assert node.id in self.target_ids 426 new_targets = [node if node.id == target_node.id else target_node for target_node in self._targets] 427 return KCFG.NDBranch(self.source, tuple(new_targets), self.rules)
428 429 _node_id: int 430 _nodes: MutableMapping[int, KCFG.Node] 431 432 _created_nodes: set[int] 433 _deleted_nodes: set[int] 434 435 _edges: dict[int, Edge] 436 _merged_edges: dict[int, MergedEdge] 437 _covers: dict[int, Cover] 438 _splits: dict[int, Split] 439 _ndbranches: dict[int, NDBranch] 440 _aliases: dict[str, int] 441 _lock: RLock 442 443 _kcfg_store: KCFGStore | None 444 445 def __init__(self, cfg_dir: Path | None = None, optimize_memory: bool = True) -> None: 446 self._node_id = 1 447 if optimize_memory: 448 from .store import OptimizedNodeStore 449 450 self._nodes = OptimizedNodeStore() 451 else: 452 self._nodes = {} 453 self._created_nodes = set() 454 self._deleted_nodes = set() 455 self._edges = {} 456 self._merged_edges = {} 457 self._covers = {} 458 self._splits = {} 459 self._ndbranches = {} 460 self._aliases = {} 461 self._lock = RLock() 462 if cfg_dir is not None: 463 self._kcfg_store = KCFGStore(cfg_dir) 464 465 def __contains__(self, item: object) -> bool: 466 if type(item) is KCFG.Node: 467 return self.contains_node(item) 468 if type(item) is KCFG.Edge: 469 return self.contains_edge(item) 470 if type(item) is KCFG.MergedEdge: 471 return self.contains_merged_edge(item) 472 if type(item) is KCFG.Cover: 473 return self.contains_cover(item) 474 if type(item) is KCFG.Split: 475 return self.contains_split(item) 476 if type(item) is KCFG.NDBranch: 477 return self.contains_ndbranch(item) 478 return False 479 480 def __enter__(self) -> KCFG: 481 self._lock.acquire() 482 return self 483 484 def __exit__( 485 self, 486 exc_type: type[BaseException] | None, 487 exc_value: BaseException | None, 488 traceback: TracebackType | None, 489 ) -> bool: 490 self._lock.release() 491 if exc_type is None: 492 return True 493 return False 494 495 @property 496 def nodes(self) -> list[KCFG.Node]: 497 return list(self._nodes.values()) 498 499 @property 500 def root(self) -> list[KCFG.Node]: 501 return [node for node in self.nodes if self.is_root(node.id)] 502 503 @property 504 def vacuous(self) -> list[KCFG.Node]: 505 return [node for node in self.nodes if self.is_vacuous(node.id)] 506 507 @property 508 def stuck(self) -> list[KCFG.Node]: 509 return [node for node in self.nodes if self.is_stuck(node.id)] 510 511 @property 512 def leaves(self) -> list[KCFG.Node]: 513 return [node for node in self.nodes if self.is_leaf(node.id)] 514 515 @property 516 def covered(self) -> list[KCFG.Node]: 517 return [node for node in self.nodes if self.is_covered(node.id)] 518 519 @property 520 def uncovered(self) -> list[KCFG.Node]: 521 return [node for node in self.nodes if not self.is_covered(node.id)] 522
[docs] 523 @staticmethod 524 def from_claim( 525 defn: KDefinition, claim: KClaim, cfg_dir: Path | None = None, optimize_memory: bool = True 526 ) -> tuple[KCFG, NodeIdLike, NodeIdLike]: 527 cfg = KCFG(cfg_dir=cfg_dir, optimize_memory=optimize_memory) 528 claim_body = claim.body 529 claim_body = defn.instantiate_cell_vars(claim_body) 530 claim_body = rename_generated_vars(claim_body) 531 532 claim_lhs = CTerm.from_kast(extract_lhs(claim_body)).add_constraint(bool_to_ml_pred(claim.requires)) 533 init_node = cfg.create_node(claim_lhs) 534 535 claim_rhs = CTerm.from_kast(extract_rhs(claim_body)).add_constraint( 536 bool_to_ml_pred(andBool([claim.requires, claim.ensures])) 537 ) 538 target_node = cfg.create_node(claim_rhs) 539 540 return cfg, init_node.id, target_node.id
541
[docs] 542 @staticmethod 543 def path_length(_path: Iterable[KCFG.Successor]) -> int: 544 _path = tuple(_path) 545 if len(_path) == 0: 546 return 0 547 if type(_path[0]) is KCFG.Split or type(_path[0]) is KCFG.Cover: 548 return KCFG.path_length(_path[1:]) 549 elif type(_path[0]) is KCFG.NDBranch: 550 return 1 + KCFG.path_length(_path[1:]) 551 elif type(_path[0]) is KCFG.Edge: 552 return _path[0].depth + KCFG.path_length(_path[1:]) 553 elif type(_path[0]) is KCFG.MergedEdge: 554 return min(edge.depth for edge in _path[0].edges) + KCFG.path_length(_path[1:]) # todo: check this 555 raise ValueError(f'Cannot handle Successor type: {type(_path[0])}')
556
[docs] 557 def extend( 558 self, 559 extend_result: KCFGExtendResult, 560 node: KCFG.Node, 561 logs: dict[int, tuple[LogEntry, ...]], 562 *, 563 optimize_kcfg: bool, 564 ) -> None: 565 566 def log(message: str, *, warning: bool = False) -> None: 567 result_info = extend_result.info if type(extend_result) is Step or type(extend_result) is Branch else '' 568 result_info_message = f': {result_info}' if result_info else '' 569 _LOGGER.log( 570 logging.WARNING if warning else logging.INFO, 571 f'Extending current KCFG with the following: {message}{result_info_message}', 572 ) 573 574 match extend_result: 575 case Vacuous(): 576 self.add_vacuous(node.id) 577 log(f'vacuous node: {node.id}', warning=True) 578 579 case Stuck(): 580 self.add_stuck(node.id) 581 log(f'stuck node: {node.id}') 582 583 case Abstract(cterm): 584 new_node = self.create_node(cterm) 585 self.create_cover(node.id, new_node.id) 586 log(f'abstraction node: {node.id}') 587 588 case Step(cterm, depth, next_node_logs, rule_labels, _): 589 node_id = node.id 590 next_node = self.create_node(cterm) 591 # Optimization for steps consists of on-the-fly merging of consecutive edges and can 592 # be performed only if the current node has a single predecessor connected by an Edge 593 if ( 594 optimize_kcfg 595 and (len(predecessors := self.predecessors(target_id=node.id)) == 1) 596 and isinstance(in_edge := predecessors[0], KCFG.Edge) 597 ): 598 # The existing edge is removed and the step parameters are updated accordingly 599 self.remove_edge(in_edge.source.id, node.id) 600 node_id = in_edge.source.id 601 depth += in_edge.depth 602 rule_labels = list(in_edge.rules) + rule_labels 603 next_node_logs = logs[node.id] + next_node_logs if node.id in logs else next_node_logs 604 self.remove_node(node.id) 605 self.create_edge(node_id, next_node.id, depth, rule_labels) 606 logs[next_node.id] = next_node_logs 607 log(f'basic block at depth {depth}: {node_id} --> {next_node.id}') 608 609 case Branch(branches, _): 610 branch_node_ids = self.split_on_constraints(node.id, branches) 611 log(f'{len(branches)} branches: {node.id} --> {branch_node_ids}') 612 613 case NDBranch(cterms, next_node_logs, rule_labels): 614 next_ids = [self.create_node(cterm).id for cterm in cterms] 615 for i in next_ids: 616 logs[i] = next_node_logs 617 self.create_ndbranch(node.id, next_ids, rules=rule_labels) 618 log(f'{len(cterms)} non-deterministic branches: {node.id} --> {next_ids}') 619 620 case _: 621 raise AssertionError()
622
[docs] 623 def to_dict_no_nodes(self) -> dict[str, Any]: 624 nodes = list(self._nodes.keys()) 625 edges = [edge.to_dict() for edge in self.edges()] 626 merged_edges = [merged_edge.to_dict() for merged_edge in self.merged_edges()] 627 covers = [cover.to_dict() for cover in self.covers()] 628 splits = [split.to_dict() for split in self.splits()] 629 ndbranches = [ndbranch.to_dict() for ndbranch in self.ndbranches()] 630 631 aliases = dict(sorted(self._aliases.items())) 632 633 res = { 634 'next': self._node_id, 635 'nodes': nodes, 636 'edges': edges, 637 'merged_edges': merged_edges, 638 'covers': covers, 639 'splits': splits, 640 'ndbranches': ndbranches, 641 'aliases': aliases, 642 } 643 return {k: v for k, v in res.items() if v}
644
[docs] 645 def to_dict(self) -> dict[str, Any]: 646 nodes = [node.to_dict() for node in self.nodes] 647 edges = [edge.to_dict() for edge in self.edges()] 648 merged_edges = [merged_edge.to_dict() for merged_edge in self.merged_edges()] 649 covers = [cover.to_dict() for cover in self.covers()] 650 splits = [split.to_dict() for split in self.splits()] 651 ndbranches = [ndbranch.to_dict() for ndbranch in self.ndbranches()] 652 653 aliases = dict(sorted(self._aliases.items())) 654 655 res = { 656 'next': self._node_id, 657 'nodes': nodes, 658 'edges': edges, 659 'merged_edges': merged_edges, 660 'covers': covers, 661 'splits': splits, 662 'ndbranches': ndbranches, 663 'aliases': aliases, 664 } 665 return {k: v for k, v in res.items() if v}
666
[docs] 667 @staticmethod 668 def from_dict(dct: Mapping[str, Any], optimize_memory: bool = True) -> KCFG: 669 cfg = KCFG(optimize_memory=optimize_memory) 670 671 for node_dict in dct.get('nodes') or []: 672 node = KCFG.Node.from_dict(node_dict) 673 cfg.add_node(node) 674 675 max_id = max([node.id for node in cfg.nodes], default=0) 676 cfg._node_id = dct.get('next', max_id + 1) 677 678 for edge_dict in dct.get('edges') or []: 679 edge = KCFG.Edge.from_dict(edge_dict, cfg._nodes) 680 cfg.add_successor(edge) 681 682 for edge_dict in dct.get('merged_edges') or []: 683 merged_edge = KCFG.MergedEdge.from_dict(edge_dict, cfg._nodes) 684 cfg.add_successor(merged_edge) 685 686 for cover_dict in dct.get('covers') or []: 687 cover = KCFG.Cover.from_dict(cover_dict, cfg._nodes) 688 cfg.add_successor(cover) 689 690 for split_dict in dct.get('splits') or []: 691 split = KCFG.Split.from_dict(split_dict, cfg._nodes) 692 cfg.add_successor(split) 693 694 for ndbranch_dict in dct.get('ndbranches') or []: 695 ndbranch = KCFG.NDBranch.from_dict(ndbranch_dict, cfg._nodes) 696 cfg.add_successor(ndbranch) 697 698 for alias, node_id in dct.get('aliases', {}).items(): 699 cfg.add_alias(alias=alias, node_id=node_id) 700 701 return cfg
702
[docs] 703 def aliases(self, node_id: NodeIdLike) -> list[str]: 704 node_id = self._resolve(node_id) 705 return [alias for alias, value in self._aliases.items() if node_id == value]
706
[docs] 707 def to_json(self) -> str: 708 return json.dumps(self.to_dict(), sort_keys=True)
709
[docs] 710 @staticmethod 711 def from_json(s: str, optimize_memory: bool = True) -> KCFG: 712 return KCFG.from_dict(json.loads(s), optimize_memory=optimize_memory)
713
[docs] 714 def to_rules(self, _id: str | None = None, priority: int = 20) -> list[KRuleLike]: 715 _id = 'BASIC-BLOCK' if _id is None else _id 716 return [e.to_rule(_id, priority=priority) for e in self.edges()] + [ 717 m.to_rule(_id, priority=priority) for m in self.merged_edges() 718 ]
719
[docs] 720 def to_module( 721 self, 722 module_name: str | None = None, 723 imports: Iterable[KImport] = (), 724 priority: int = 20, 725 att: KAtt = EMPTY_ATT, 726 ) -> KFlatModule: 727 module_name = 'KCFG' if module_name is None else module_name 728 return KFlatModule(module_name, self.to_rules(priority=priority), imports=imports, att=att)
729 730 def _resolve_or_none(self, id_like: NodeIdLike) -> int | None: 731 if type(id_like) is int: 732 if id_like in self._nodes: 733 return id_like 734 735 return None 736 737 if type(id_like) is not str: 738 raise TypeError(f'Expected int or str for id_like, got: {id_like}') 739 740 if id_like.startswith('@'): 741 if id_like[1:] in self._aliases: 742 return self._aliases[id_like[1:]] 743 raise ValueError(f'Unknown alias: {id_like}') 744 745 return None 746 747 def _resolve(self, id_like: NodeIdLike) -> int: 748 match = self._resolve_or_none(id_like) 749 if not match: 750 raise ValueError(f'Unknown node: {id_like}') 751 return match 752
[docs] 753 def node(self, node_id: NodeIdLike) -> KCFG.Node: 754 node_id = self._resolve(node_id) 755 return self._nodes[node_id]
756
[docs] 757 def get_node(self, node_id: NodeIdLike) -> KCFG.Node | None: 758 resolved_id = self._resolve_or_none(node_id) 759 if resolved_id is None: 760 return None 761 return self._nodes[resolved_id]
762
[docs] 763 def contains_node(self, node: KCFG.Node) -> bool: 764 return bool(self.get_node(node.id))
765
[docs] 766 def add_node(self, node: KCFG.Node) -> None: 767 if node.id in self._nodes: 768 raise ValueError(f'Node with id already exists: {node.id}') 769 self._nodes[node.id] = node 770 self._created_nodes.add(node.id)
771
[docs] 772 def create_node(self, cterm: CTerm) -> KCFG.Node: 773 node = KCFG.Node(self._node_id, cterm) 774 self._node_id += 1 775 self._nodes[node.id] = node 776 self._created_nodes.add(node.id) 777 return node
778
[docs] 779 def remove_node(self, node_id: NodeIdLike) -> None: 780 self.remove_edges_around(node_id) 781 782 node_id = self._resolve(node_id) 783 self._nodes.pop(node_id) 784 self._deleted_nodes.add(node_id) 785 self._created_nodes.discard(node_id)
786
[docs] 787 def remove_edges_around(self, node_id: NodeIdLike) -> None: 788 node_id = self._resolve(node_id) 789 790 self._edges = {k: s for k, s in self._edges.items() if k != node_id and node_id not in s.target_ids} 791 self._merged_edges = { 792 k: s for k, s in self._merged_edges.items() if k != node_id and node_id not in s.target_ids 793 } 794 self._covers = {k: s for k, s in self._covers.items() if k != node_id and node_id not in s.target_ids} 795 796 self._splits = {k: s for k, s in self._splits.items() if k != node_id and node_id not in s.target_ids} 797 self._ndbranches = {k: b for k, b in self._ndbranches.items() if k != node_id and node_id not in b.target_ids} 798 799 for alias in [alias for alias, id in self._aliases.items() if id == node_id]: 800 self.remove_alias(alias)
801 802 def _update_refs(self, node_id: int) -> None: 803 node = self.node(node_id) 804 for succ in self.successors(node_id): 805 new_succ = succ.replace_source(node) 806 if type(new_succ) is KCFG.Edge: 807 self._edges[new_succ.source.id] = new_succ 808 if type(new_succ) is KCFG.MergedEdge: 809 self._merged_edges[new_succ.source.id] = new_succ 810 if type(new_succ) is KCFG.Cover: 811 self._covers[new_succ.source.id] = new_succ 812 if type(new_succ) is KCFG.Split: 813 self._splits[new_succ.source.id] = new_succ 814 if type(new_succ) is KCFG.NDBranch: 815 self._ndbranches[new_succ.source.id] = new_succ 816 817 for pred in self.predecessors(node_id): 818 new_pred = pred.replace_target(node) 819 if type(new_pred) is KCFG.Edge: 820 self._edges[new_pred.source.id] = new_pred 821 if type(new_pred) is KCFG.MergedEdge: 822 self._merged_edges[new_pred.source.id] = new_pred 823 if type(new_pred) is KCFG.Cover: 824 self._covers[new_pred.source.id] = new_pred 825 if type(new_pred) is KCFG.Split: 826 self._splits[new_pred.source.id] = new_pred 827 if type(new_pred) is KCFG.NDBranch: 828 self._ndbranches[new_pred.source.id] = new_pred 829
[docs] 830 def remove_attr(self, node_id: NodeIdLike, attr: NodeAttr) -> None: 831 node = self.node(node_id) 832 new_node = node.remove_attr(attr) 833 self.replace_node(new_node)
834
[docs] 835 def discard_attr(self, node_id: NodeIdLike, attr: NodeAttr) -> None: 836 node = self.node(node_id) 837 new_node = node.discard_attr(attr) 838 self.replace_node(new_node)
839
[docs] 840 def add_attr(self, node_id: NodeIdLike, attr: NodeAttr) -> None: 841 node = self.node(node_id) 842 new_node = node.add_attr(attr) 843 self.replace_node(new_node)
844
[docs] 845 def let_node( 846 self, node_id: NodeIdLike, cterm: CTerm | None = None, attrs: Iterable[KCFGNodeAttr] | None = None 847 ) -> None: 848 node = self.node(node_id) 849 new_node = node.let(cterm=cterm, attrs=attrs) 850 self.replace_node(new_node)
851
[docs] 852 def replace_node(self, node: KCFG.Node) -> None: 853 self._nodes[node.id] = node 854 self._created_nodes.add(node.id) 855 self._update_refs(node.id)
856
[docs] 857 def successors(self, source_id: NodeIdLike) -> list[Successor]: 858 out_edges: Iterable[KCFG.Successor] = self.edges(source_id=source_id) 859 out_merged_edges: Iterable[KCFG.Successor] = self.merged_edges(source_id=source_id) 860 out_covers: Iterable[KCFG.Successor] = self.covers(source_id=source_id) 861 out_splits: Iterable[KCFG.Successor] = self.splits(source_id=source_id) 862 out_ndbranches: Iterable[KCFG.Successor] = self.ndbranches(source_id=source_id) 863 return list(out_edges) + list(out_merged_edges) + list(out_covers) + list(out_splits) + list(out_ndbranches)
864
[docs] 865 def predecessors(self, target_id: NodeIdLike) -> list[Successor]: 866 in_edges: Iterable[KCFG.Successor] = self.edges(target_id=target_id) 867 in_merged_edges: Iterable[KCFG.Successor] = self.merged_edges(target_id=target_id) 868 in_covers: Iterable[KCFG.Successor] = self.covers(target_id=target_id) 869 in_splits: Iterable[KCFG.Successor] = self.splits(target_id=target_id) 870 in_ndbranches: Iterable[KCFG.Successor] = self.ndbranches(target_id=target_id) 871 return list(in_edges) + list(in_merged_edges) + list(in_covers) + list(in_splits) + list(in_ndbranches)
872 873 def _check_no_successors(self, source_id: NodeIdLike) -> None: 874 if len(self.successors(source_id)) > 0: 875 raise ValueError(f'Node already has successors: {source_id} -> {self.successors(source_id)}') 876 877 def _check_no_zero_loops(self, source_id: NodeIdLike, target_ids: Iterable[NodeIdLike]) -> None: 878 for target_id in target_ids: 879 path = self.shortest_path_between(target_id, source_id) 880 if path is not None and KCFG.path_length(path) == 0: 881 raise ValueError( 882 f'Adding successor would create zero-length loop with backedge: {source_id} -> {target_id}' 883 ) 884
[docs] 885 def add_successor(self, succ: KCFG.Successor) -> None: 886 self._check_no_successors(succ.source.id) 887 self._check_no_zero_loops(succ.source.id, succ.target_ids) 888 if type(succ) is KCFG.Edge: 889 self._edges[succ.source.id] = succ 890 elif type(succ) is KCFG.MergedEdge: 891 self._merged_edges[succ.source.id] = succ 892 elif type(succ) is KCFG.Cover: 893 self._covers[succ.source.id] = succ 894 else: 895 if len(succ.target_ids) <= 1: 896 raise ValueError( 897 f'Cannot create {type(succ)} node with less than 2 targets: {succ.source.id} -> {succ.target_ids}' 898 ) 899 if type(succ) is KCFG.Split: 900 self._splits[succ.source.id] = succ 901 elif type(succ) is KCFG.NDBranch: 902 self._ndbranches[succ.source.id] = succ
903
[docs] 904 def edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> Edge | None: 905 source_id = self._resolve(source_id) 906 target_id = self._resolve(target_id) 907 edge = self._edges.get(source_id, None) 908 return edge if edge is not None and edge.target.id == target_id else None
909
[docs] 910 def edges(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[Edge]: 911 source_id = self._resolve(source_id) if source_id is not None else None 912 target_id = self._resolve(target_id) if target_id is not None else None 913 return [ 914 edge 915 for edge in self._edges.values() 916 if (source_id is None or source_id == edge.source.id) and (target_id is None or target_id == edge.target.id) 917 ]
918
[docs] 919 def contains_edge(self, edge: Edge) -> bool: 920 if other := self.edge(source_id=edge.source.id, target_id=edge.target.id): 921 return edge == other 922 return False
923
[docs] 924 def create_edge(self, source_id: NodeIdLike, target_id: NodeIdLike, depth: int, rules: Iterable[str] = ()) -> Edge: 925 if depth <= 0: 926 raise ValueError(f'Cannot build KCFG Edge with non-positive depth: {depth}') 927 source = self.node(source_id) 928 target = self.node(target_id) 929 edge = KCFG.Edge(source, target, depth, tuple(rules)) 930 self.add_successor(edge) 931 return edge
932
[docs] 933 def remove_edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> None: 934 source_id = self._resolve(source_id) 935 target_id = self._resolve(target_id) 936 edge = self.edge(source_id, target_id) 937 if not edge: 938 raise ValueError(f'Edge does not exist: {source_id} -> {target_id}') 939 self._edges.pop(source_id)
940
[docs] 941 def merged_edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> MergedEdge | None: 942 source_id = self._resolve(source_id) 943 target_id = self._resolve(target_id) 944 merged_edge = self._merged_edges.get(source_id, None) 945 return merged_edge if merged_edge is not None and merged_edge.target.id == target_id else None
946
[docs] 947 def merged_edges( 948 self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None 949 ) -> list[MergedEdge]: 950 source_id = self._resolve(source_id) if source_id is not None else None 951 target_id = self._resolve(target_id) if target_id is not None else None 952 return [ 953 merged_edge 954 for merged_edge in self._merged_edges.values() 955 if (source_id is None or source_id == merged_edge.source.id) 956 and (target_id is None or target_id == merged_edge.target.id) 957 ]
958
[docs] 959 def contains_merged_edge(self, edge: MergedEdge) -> bool: 960 if other := self.merged_edge(source_id=edge.source.id, target_id=edge.target.id): 961 return edge == other 962 return False
963
[docs] 964 def create_merged_edge( 965 self, source_id: NodeIdLike, target_id: NodeIdLike, edges: Iterable[Edge | MergedEdge] 966 ) -> MergedEdge: 967 if len(list(edges)) == 0: 968 raise ValueError(f'Cannot build KCFG MergedEdge with no edges: {edges}') 969 source = self.node(source_id) 970 target = self.node(target_id) 971 flatten_edges: list[KCFG.Edge] = [] 972 for edge in edges: 973 if isinstance(edge, KCFG.MergedEdge): 974 flatten_edges.extend(edge.edges) 975 else: 976 flatten_edges.append(edge) 977 merged_edge = KCFG.MergedEdge(source, target, tuple(flatten_edges)) 978 self.add_successor(merged_edge) 979 return merged_edge
980
[docs] 981 def remove_merged_edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> None: 982 source_id = self._resolve(source_id) 983 target_id = self._resolve(target_id) 984 merged_edge = self.merged_edge(source_id, target_id) 985 if not merged_edge: 986 raise ValueError(f'MergedEdge does not exist: {source_id} -> {target_id}') 987 self._merged_edges.pop(source_id)
988
[docs] 989 def general_edges( 990 self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None 991 ) -> list[Edge | MergedEdge]: 992 return self.edges(source_id=source_id, target_id=target_id) + self.merged_edges( 993 source_id=source_id, target_id=target_id 994 )
995
[docs] 996 def cover(self, source_id: NodeIdLike, target_id: NodeIdLike) -> Cover | None: 997 source_id = self._resolve(source_id) 998 target_id = self._resolve(target_id) 999 cover = self._covers.get(source_id, None) 1000 return cover if cover is not None and cover.target.id == target_id else None
1001
[docs] 1002 def covers(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[Cover]: 1003 source_id = self._resolve(source_id) if source_id is not None else None 1004 target_id = self._resolve(target_id) if target_id is not None else None 1005 return [ 1006 cover 1007 for cover in self._covers.values() 1008 if (source_id is None or source_id == cover.source.id) 1009 and (target_id is None or target_id == cover.target.id) 1010 ]
1011
[docs] 1012 def contains_cover(self, cover: Cover) -> bool: 1013 if other := self.cover(source_id=cover.source.id, target_id=cover.target.id): 1014 return cover == other 1015 return False
1016
[docs] 1017 def create_cover(self, source_id: NodeIdLike, target_id: NodeIdLike, csubst: CSubst | None = None) -> Cover: 1018 source = self.node(source_id) 1019 target = self.node(target_id) 1020 if csubst is None: 1021 csubst = target.cterm.match_with_constraint(source.cterm) 1022 if csubst is None: 1023 raise ValueError(f'No matching between: {source.id} and {target.id}') 1024 cover = KCFG.Cover(source, target, csubst=csubst) 1025 self.add_successor(cover) 1026 return cover
1027
[docs] 1028 def remove_cover(self, source_id: NodeIdLike, target_id: NodeIdLike) -> None: 1029 source_id = self._resolve(source_id) 1030 target_id = self._resolve(target_id) 1031 cover = self.cover(source_id, target_id) 1032 if not cover: 1033 raise ValueError(f'Cover does not exist: {source_id} -> {target_id}') 1034 self._covers.pop(source_id)
1035
[docs] 1036 def edge_likes(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[EdgeLike]: 1037 return ( 1038 cast('List[KCFG.EdgeLike]', self.edges(source_id=source_id, target_id=target_id)) 1039 + cast('List[KCFG.EdgeLike]', self.covers(source_id=source_id, target_id=target_id)) 1040 + cast('List[KCFG.EdgeLike]', self.merged_edges(source_id=source_id, target_id=target_id)) 1041 )
1042
[docs] 1043 def add_vacuous(self, node_id: NodeIdLike) -> None: 1044 self.add_attr(node_id, KCFGNodeAttr.VACUOUS)
1045
[docs] 1046 def remove_vacuous(self, node_id: NodeIdLike) -> None: 1047 self.remove_attr(node_id, KCFGNodeAttr.VACUOUS)
1048
[docs] 1049 def discard_vacuous(self, node_id: NodeIdLike) -> None: 1050 self.discard_attr(node_id, KCFGNodeAttr.VACUOUS)
1051
[docs] 1052 def add_stuck(self, node_id: NodeIdLike) -> None: 1053 self.add_attr(node_id, KCFGNodeAttr.STUCK)
1054
[docs] 1055 def remove_stuck(self, node_id: NodeIdLike) -> None: 1056 self.remove_attr(node_id, KCFGNodeAttr.STUCK)
1057
[docs] 1058 def discard_stuck(self, node_id: NodeIdLike) -> None: 1059 self.discard_attr(node_id, KCFGNodeAttr.STUCK)
1060
[docs] 1061 def splits(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[Split]: 1062 source_id = self._resolve(source_id) if source_id is not None else None 1063 target_id = self._resolve(target_id) if target_id is not None else None 1064 return [ 1065 s 1066 for s in self._splits.values() 1067 if (source_id is None or source_id == s.source.id) and (target_id is None or target_id in s.target_ids) 1068 ]
1069
[docs] 1070 def contains_split(self, split: Split) -> bool: 1071 return split in self._splits.values()
1072
[docs] 1073 def create_split(self, source_id: NodeIdLike, splits: Iterable[tuple[NodeIdLike, CSubst]]) -> KCFG.Split: 1074 source_id = self._resolve(source_id) 1075 split = KCFG.Split(self.node(source_id), tuple((self.node(nid), csubst) for nid, csubst in list(splits))) 1076 self.add_successor(split) 1077 return split
1078
[docs] 1079 def create_split_by_nodes(self, source_id: NodeIdLike, target_ids: Iterable[NodeIdLike]) -> KCFG.Split | None: 1080 """Create a split without crafting a CSubst.""" 1081 source = self.node(source_id) 1082 targets = [self.node(nid) for nid in target_ids] 1083 try: 1084 csubsts = [not_none(source.cterm.match_with_constraint(target.cterm)) for target in targets] 1085 except ValueError: 1086 return None 1087 return self.create_split(source.id, zip(target_ids, csubsts, strict=True))
1088
[docs] 1089 def ndbranches(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[NDBranch]: 1090 source_id = self._resolve(source_id) if source_id is not None else None 1091 target_id = self._resolve(target_id) if target_id is not None else None 1092 return [ 1093 b 1094 for b in self._ndbranches.values() 1095 if (source_id is None or source_id == b.source.id) and (target_id is None or target_id in b.target_ids) 1096 ]
1097
[docs] 1098 def contains_ndbranch(self, ndbranch: NDBranch) -> bool: 1099 return ndbranch in self._ndbranches
1100
[docs] 1101 def create_ndbranch( 1102 self, source_id: NodeIdLike, ndbranches: Iterable[NodeIdLike], rules: Iterable[str] = () 1103 ) -> KCFG.NDBranch: 1104 source_id = self._resolve(source_id) 1105 ndbranch = KCFG.NDBranch(self.node(source_id), tuple(self.node(nid) for nid in list(ndbranches)), tuple(rules)) 1106 self.add_successor(ndbranch) 1107 return ndbranch
1108
[docs] 1109 def split_on_constraints(self, source_id: NodeIdLike, constraints: Iterable[KInner]) -> list[int]: 1110 source = self.node(source_id) 1111 branch_node_ids = [self.create_node(source.cterm.add_constraint(c)).id for c in constraints] 1112 csubsts = [not_none(source.cterm.match_with_constraint(self.node(id).cterm)) for id in branch_node_ids] 1113 self.create_split(source.id, zip(branch_node_ids, csubsts, strict=True)) 1114 return branch_node_ids
1115
[docs] 1116 def add_alias(self, alias: str, node_id: NodeIdLike) -> None: 1117 if '@' in alias: 1118 raise ValueError('Alias may not contain "@"') 1119 if alias in self._aliases: 1120 raise ValueError(f'Duplicate alias: {alias}') 1121 node_id = self._resolve(node_id) 1122 self._aliases[alias] = node_id
1123
[docs] 1124 def remove_alias(self, alias: str) -> None: 1125 if alias not in self._aliases: 1126 raise ValueError(f'Alias does not exist: {alias}') 1127 self._aliases.pop(alias)
1128
[docs] 1129 def is_root(self, node_id: NodeIdLike) -> bool: 1130 node_id = self._resolve(node_id) 1131 return len(self.predecessors(node_id)) == 0
1132
[docs] 1133 def is_vacuous(self, node_id: NodeIdLike) -> bool: 1134 return KCFGNodeAttr.VACUOUS in self.node(node_id).attrs
1135
[docs] 1136 def is_stuck(self, node_id: NodeIdLike) -> bool: 1137 return KCFGNodeAttr.STUCK in self.node(node_id).attrs
1138
[docs] 1139 def is_split(self, node_id: NodeIdLike) -> bool: 1140 node_id = self._resolve(node_id) 1141 return node_id in self._splits
1142
[docs] 1143 def is_ndbranch(self, node_id: NodeIdLike) -> bool: 1144 node_id = self._resolve(node_id) 1145 return node_id in self._ndbranches
1146
[docs] 1147 def is_leaf(self, node_id: NodeIdLike) -> bool: 1148 return len(self.successors(node_id)) == 0
1149
[docs] 1150 def is_covered(self, node_id: NodeIdLike) -> bool: 1151 node_id = self._resolve(node_id) 1152 return node_id in self._covers
1153
[docs] 1154 def prune(self, node_id: NodeIdLike, keep_nodes: Iterable[NodeIdLike] = ()) -> list[int]: 1155 nodes = self.reachable_nodes(node_id) 1156 keep_nodes = [self._resolve(nid) for nid in keep_nodes] 1157 pruned_nodes: list[int] = [] 1158 for node in nodes: 1159 if node.id not in keep_nodes: 1160 self.remove_node(node.id) 1161 pruned_nodes.append(node.id) 1162 return pruned_nodes
1163
[docs] 1164 def shortest_path_between( 1165 self, source_node_id: NodeIdLike, target_node_id: NodeIdLike 1166 ) -> tuple[Successor, ...] | None: 1167 paths = self.paths_between(source_node_id, target_node_id) 1168 if len(paths) == 0: 1169 return None 1170 return sorted(paths, key=(lambda path: KCFG.path_length(path)))[0]
1171
[docs] 1172 def shortest_distance_between(self, node_1_id: NodeIdLike, node_2_id: NodeIdLike) -> int | None: 1173 path_1 = self.shortest_path_between(node_1_id, node_2_id) 1174 path_2 = self.shortest_path_between(node_2_id, node_1_id) 1175 distance: int | None = None 1176 if path_1 is not None: 1177 distance = KCFG.path_length(path_1) 1178 if path_2 is not None: 1179 distance_2 = KCFG.path_length(path_2) 1180 if distance is None or distance_2 < distance: 1181 distance = distance_2 1182 return distance
1183
[docs] 1184 def zero_depth_between(self, node_1_id: NodeIdLike, node_2_id: NodeIdLike) -> bool: 1185 _node_1_id = self._resolve(node_1_id) 1186 _node_2_id = self._resolve(node_2_id) 1187 if _node_1_id == _node_2_id: 1188 return True 1189 # Short-circuit and don't run pathing algorithm if there is no 0 length path on the first step. 1190 path_lengths = [ 1191 self.path_length([successor]) for successor in self.successors(_node_1_id) + self.successors(_node_2_id) 1192 ] 1193 if 0 not in path_lengths: 1194 return False 1195 1196 shortest_distance = self.shortest_distance_between(_node_1_id, _node_2_id) 1197 1198 return shortest_distance is not None and shortest_distance == 0
1199
[docs] 1200 def paths_between(self, source_id: NodeIdLike, target_id: NodeIdLike) -> list[tuple[Successor, ...]]: 1201 source_id = self._resolve(source_id) 1202 target_id = self._resolve(target_id) 1203 1204 if source_id == target_id: 1205 return [()] 1206 1207 source_successors = list(self.successors(source_id)) 1208 assert len(source_successors) <= 1 1209 if len(source_successors) == 0: 1210 return [] 1211 1212 paths: list[tuple[KCFG.Successor, ...]] = [] 1213 worklist: list[list[KCFG.Successor]] = [[source_successors[0]]] 1214 1215 def _in_path(_nid: int, _path: list[KCFG.Successor]) -> bool: 1216 for succ in _path: 1217 if _nid == succ.source.id: 1218 return True 1219 if len(_path) > 0: 1220 if isinstance(_path[-1], KCFG.EdgeLike) and _path[-1].target.id == _nid: 1221 return True 1222 elif isinstance(_path[-1], KCFG.MultiEdge) and _nid in _path[-1].target_ids: 1223 return True 1224 return False 1225 1226 while worklist: 1227 curr_path = worklist.pop() 1228 curr_successor = curr_path[-1] 1229 successors: list[KCFG.Successor] = [] 1230 1231 if isinstance(curr_successor, KCFG.EdgeLike): 1232 if curr_successor.target.id == target_id: 1233 paths.append(tuple(curr_path)) 1234 continue 1235 else: 1236 successors = list(self.successors(curr_successor.target.id)) 1237 1238 elif isinstance(curr_successor, KCFG.MultiEdge): 1239 if len(list(curr_successor.targets)) == 1: 1240 target = list(curr_successor.targets)[0] 1241 if target.id == target_id: 1242 paths.append(tuple(curr_path)) 1243 continue 1244 else: 1245 successors = list(self.successors(target.id)) 1246 if len(list(curr_successor.targets)) > 1: 1247 curr_path = curr_path[0:-1] 1248 successors = [curr_successor.with_single_target(target) for target in curr_successor.targets] 1249 1250 for successor in successors: 1251 if isinstance(successor, KCFG.EdgeLike) and not _in_path(successor.target.id, curr_path): 1252 worklist.append(curr_path + [successor]) 1253 elif isinstance(successor, KCFG.MultiEdge): 1254 if len(list(successor.targets)) == 1: 1255 target = list(successor.targets)[0] 1256 if not _in_path(target.id, curr_path): 1257 worklist.append(curr_path + [successor]) 1258 elif len(list(successor.targets)) > 1: 1259 worklist.append(curr_path + [successor]) 1260 1261 return paths
1262
[docs] 1263 def reachable_nodes(self, source_id: NodeIdLike, *, reverse: bool = False) -> set[KCFG.Node]: 1264 visited: set[KCFG.Node] = set() 1265 worklist: list[KCFG.Node] = [self.node(source_id)] 1266 1267 while worklist: 1268 node = worklist.pop() 1269 1270 if node in visited: 1271 continue 1272 1273 visited.add(node) 1274 1275 if not reverse: 1276 worklist.extend(target for succ in self.successors(source_id=node.id) for target in succ.targets) 1277 else: 1278 worklist.extend(succ.source for succ in self.predecessors(target_id=node.id)) 1279 1280 return visited
1281
[docs] 1282 def write_cfg_data(self) -> None: 1283 assert self._kcfg_store is not None 1284 self._kcfg_store.write_cfg_data( 1285 self, self.to_dict_no_nodes(), deleted_nodes=self._deleted_nodes, created_nodes=self._created_nodes 1286 ) 1287 self._deleted_nodes.clear() 1288 self._created_nodes.clear()
1289
[docs] 1290 @staticmethod 1291 def read_cfg_data(cfg_dir: Path) -> KCFG: 1292 store = KCFGStore(cfg_dir) 1293 cfg = KCFG.from_dict(store.read_cfg_data()) 1294 cfg._kcfg_store = store 1295 return cfg
1296
[docs] 1297 @staticmethod 1298 def read_node_data(cfg_dir: Path, node_id: int) -> KCFG.Node: 1299 store = KCFGStore(cfg_dir) 1300 return KCFG.Node.from_dict(store.read_node_data(node_id))
1301 1302
[docs] 1303class KCFGExtendResult(ABC): ...
1304 1305
[docs] 1306@final 1307@dataclass(frozen=True) 1308class Vacuous(KCFGExtendResult): ...
1309 1310
[docs] 1311@final 1312@dataclass(frozen=True) 1313class Stuck(KCFGExtendResult): ...
1314 1315
[docs] 1316@final 1317@dataclass(frozen=True) 1318class Abstract(KCFGExtendResult): 1319 cterm: CTerm
1320 1321
[docs] 1322@final 1323@dataclass(frozen=True) 1324class Step(KCFGExtendResult): 1325 cterm: CTerm 1326 depth: int 1327 logs: tuple[LogEntry, ...] 1328 rule_labels: list[str] 1329 cut: bool = field(default=False) 1330 info: str = field(default='')
1331 1332
[docs] 1333@final 1334@dataclass(frozen=True) 1335class Branch(KCFGExtendResult): 1336 constraints: tuple[KInner, ...] 1337 heuristic: bool 1338 info: str = field(default='') 1339 1340 def __init__(self, constraints: Iterable[KInner], *, heuristic: bool = False, info: str = ''): 1341 object.__setattr__(self, 'constraints', tuple(constraints)) 1342 object.__setattr__(self, 'heuristic', heuristic) 1343 object.__setattr__(self, 'info', info)
1344 1345
[docs] 1346@final 1347@dataclass(frozen=True) 1348class NDBranch(KCFGExtendResult): 1349 cterms: tuple[CTerm, ...] 1350 logs: tuple[LogEntry, ...] 1351 rule_labels: tuple[str, ...] 1352 1353 def __init__(self, cterms: Iterable[CTerm], logs: Iterable[LogEntry,], rule_labels: Iterable[str]): 1354 object.__setattr__(self, 'cterms', tuple(cterms)) 1355 object.__setattr__(self, 'logs', tuple(logs)) 1356 object.__setattr__(self, 'rule_labels', tuple(rule_labels))