Source code for pyk.kcfg.kcfg

   1from __future__ import annotations
   2
   3import json
   4import logging
   5from abc import ABC, abstractmethod
   6from collections.abc import Container
   7from dataclasses import dataclass, field
   8from functools import reduce
   9from threading import RLock
  10from typing import TYPE_CHECKING, Final, List, Union, cast, final
  11
  12from pyk.cterm.cterm import cterm_match
  13
  14from ..cterm import CSubst, CTerm, cterm_build_claim, cterm_build_rule
  15from ..kast import EMPTY_ATT
  16from ..kast.inner import KApply
  17from ..kast.manip import (
  18    bool_to_ml_pred,
  19    extract_lhs,
  20    extract_rhs,
  21    flatten_label,
  22    inline_cell_maps,
  23    minimize_rule_like,
  24    rename_generated_vars,
  25    sort_ac_collections,
  26)
  27from ..kast.outer import KFlatModule
  28from ..prelude.kbool import andBool
  29from ..utils import ensure_dir_path, not_none
  30
  31if TYPE_CHECKING:
  32    from collections.abc import Iterable, Mapping, MutableMapping
  33    from pathlib import Path
  34    from types import TracebackType
  35    from typing import Any
  36
  37    from pyk.kore.rpc import LogEntry
  38
  39    from ..kast import KAtt
  40    from ..kast.inner import KInner
  41    from ..kast.outer import KClaim, KDefinition, KImport, KRuleLike
  42
  43
  44NodeIdLike = int | str
  45
  46_LOGGER: Final = logging.getLogger(__name__)
  47
  48
@dataclass(frozen=True)
class NodeAttr:
    """Immutable, hashable marker identified by a string value."""

    # Textual name of the attribute.
    value: str
52 53
class KCFGNodeAttr(NodeAttr):
    """Holder for the two predefined node attributes, ``vacuous`` and ``stuck``."""

    # Both constants are plain ``NodeAttr`` instances, not ``KCFGNodeAttr`` ones.
    VACUOUS = NodeAttr('vacuous')
    STUCK = NodeAttr('stuck')
57 58
class KCFGStore:
    """Directory-backed storage for a KCFG.

    The directory holds a ``kcfg.json`` index and a ``nodes`` subdirectory
    containing one ``<node_id>.json`` file per node.
    """

    store_path: Path

    def __init__(self, store_path: Path) -> None:
        """Record ``store_path`` and pass it and the node directory to ``ensure_dir_path``."""
        self.store_path = store_path
        ensure_dir_path(store_path)
        ensure_dir_path(self.kcfg_node_dir)

    @property
    def kcfg_json_path(self) -> Path:
        """Path of the ``kcfg.json`` index file."""
        return self.store_path / 'kcfg.json'

    @property
    def kcfg_node_dir(self) -> Path:
        """Path of the directory that holds the per-node JSON files."""
        return self.store_path / 'nodes'

    def kcfg_node_path(self, node_id: int) -> Path:
        """Return the path of the JSON file for node ``node_id``."""
        return self.kcfg_node_dir / f'{node_id}.json'

    def write_cfg_data(
        self, kcfg: KCFG, dct: dict[str, Any], deleted_nodes: Iterable[int] = (), created_nodes: Iterable[int] = ()
    ) -> None:
        """Write the index ``dct`` and synchronise the per-node files.

        ``dct`` is mutated: the ids of vacuous and stuck nodes are stored under
        ``'vacuous'`` and ``'stuck'``. Files of ``deleted_nodes`` are removed
        (missing files are ignored) and files of ``created_nodes`` are
        (re)written from ``kcfg``.
        """
        nodes = kcfg._nodes
        dct['vacuous'] = [node_id for node_id in nodes.keys() if KCFGNodeAttr.VACUOUS in nodes[node_id].attrs]
        dct['stuck'] = [node_id for node_id in nodes.keys() if KCFGNodeAttr.STUCK in nodes[node_id].attrs]
        for node_id in deleted_nodes:
            self.kcfg_node_path(node_id).unlink(missing_ok=True)
        for node_id in created_nodes:
            self.kcfg_node_path(node_id).write_text(json.dumps(nodes[node_id].to_dict()))
        # The index is written last, after the node files.
        self.kcfg_json_path.write_text(json.dumps(dct))

    def read_cfg_data(self) -> dict[str, Any]:
        """Load the index and inline every node it lists.

        The ``'vacuous'`` and ``'stuck'`` id lists of the index are removed
        and folded into each node's ``'attrs'`` list. An index lacking either
        key is treated as having no such nodes.
        """
        dct = json.loads(self.kcfg_json_path.read_text())

        # Sets give constant-time membership instead of scanning a list per node.
        vacuous_ids = set(dct.pop('vacuous', None) or ())
        stuck_ids = set(dct.pop('stuck', None) or ())

        nodes = []
        for node_id in dct.get('nodes') or []:
            node = self.read_node_data(node_id)
            attrs = []
            if node['id'] in vacuous_ids:
                attrs.append(KCFGNodeAttr.VACUOUS.value)
            if node['id'] in stuck_ids:
                attrs.append(KCFGNodeAttr.STUCK.value)
            nodes.append({'id': node['id'], 'cterm': node['cterm'], 'attrs': attrs})

        dct['nodes'] = nodes
        return dct

    def read_node_data(self, node_id: int) -> dict[str, Any]:
        """Parse and return the JSON file of node ``node_id``."""
        return json.loads(self.kcfg_node_path(node_id).read_text())
116 117
[docs] 118class KCFG(Container[Union['KCFG.Node', 'KCFG.Successor']]):
@final
@dataclass(frozen=True, order=True)
class Node:
    """A KCFG node: an integer id, a ``CTerm`` and a frozen set of attributes."""

    id: int
    cterm: CTerm
    attrs: frozenset[NodeAttr]

    def __init__(self, id: int, cterm: CTerm, attrs: Iterable[NodeAttr] = ()) -> None:
        """Store the fields, collapsing ``attrs`` into a ``frozenset``."""
        # The dataclass is frozen, so fields are set through ``object.__setattr__``.
        for field_name, field_value in (('id', id), ('cterm', cterm), ('attrs', frozenset(attrs))):
            object.__setattr__(self, field_name, field_value)

    def to_dict(self) -> dict[str, Any]:
        """Serialize the node; attributes are stored by their ``value``."""
        cterm_dict = self.cterm.to_dict()
        attr_values = [attr.value for attr in self.attrs]
        return {'id': self.id, 'cterm': cterm_dict, 'attrs': attr_values}

    @staticmethod
    def from_dict(dct: dict[str, Any]) -> KCFG.Node:
        """Rebuild a node from the dictionary form produced by ``to_dict``."""
        node_id = dct['id']
        cterm = CTerm.from_dict(dct['cterm'])
        attrs = [NodeAttr(value) for value in dct['attrs']]
        return KCFG.Node(node_id, cterm, attrs)

    def add_attr(self, attr: NodeAttr) -> KCFG.Node:
        """Return a copy of this node that also carries ``attr``."""
        return KCFG.Node(self.id, self.cterm, self.attrs | {attr})

    def remove_attr(self, attr: NodeAttr) -> KCFG.Node:
        """Return a copy without ``attr``; raise ``ValueError`` if it is absent."""
        if attr in self.attrs:
            return KCFG.Node(self.id, self.cterm, self.attrs - {attr})
        raise ValueError(f'Node {self.id} does not have attribute {attr.value}')

    def discard_attr(self, attr: NodeAttr) -> KCFG.Node:
        """Return a copy without ``attr``; absence of ``attr`` is not an error."""
        return KCFG.Node(self.id, self.cterm, self.attrs - {attr})

    def let(self, cterm: CTerm | None = None, attrs: Iterable[KCFGNodeAttr] | None = None) -> KCFG.Node:
        """Return a copy with the same id, replacing only the fields that are given."""
        if cterm is None:
            cterm = self.cterm
        if attrs is None:
            attrs = self.attrs
        return KCFG.Node(self.id, cterm, attrs)

    @property
    def free_vars(self) -> frozenset[str]:
        """The free variables of the node's ``cterm`` as a ``frozenset``."""
        return frozenset(self.cterm.free_vars)
157
class Successor(ABC):
    """Abstract base of everything that leads from one source node to one or more targets."""

    source: KCFG.Node

    def __lt__(self, other: Any) -> bool:
        """Order successors by their source node only."""
        if isinstance(other, KCFG.Successor):
            return self.source < other.source
        return NotImplemented

    @property
    def source_vars(self) -> frozenset[str]:
        """Free variables of the source node."""
        return frozenset(self.source.free_vars)

    @property
    @abstractmethod
    def targets(self) -> tuple[KCFG.Node, ...]: ...

    @property
    def target_ids(self) -> list[int]:
        """Ids of all targets, in ascending order."""
        ids = [target.id for target in self.targets]
        ids.sort()
        return ids

    @property
    def target_vars(self) -> frozenset[str]:
        """Union of the free variables of all targets."""
        collected: set[str] = set()
        for target in self.targets:
            collected.update(target.free_vars)
        return frozenset(collected)

    @abstractmethod
    def replace_source(self, node: KCFG.Node) -> KCFG.Successor: ...

    @abstractmethod
    def replace_target(self, node: KCFG.Node) -> KCFG.Successor: ...

    @abstractmethod
    def to_dict(self) -> dict[str, Any]: ...

    @staticmethod
    @abstractmethod
    def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Successor: ...
194
class EdgeLike(Successor):
    """A successor that has exactly one target node."""

    source: KCFG.Node
    target: KCFG.Node

    @property
    def targets(self) -> tuple[KCFG.Node, ...]:
        """The single target, wrapped in a one-element tuple."""
        only_target = self.target
        return (only_target,)
202
@final
@dataclass(frozen=True)
class Edge(EdgeLike):
    """Single-target successor that records a depth and the labels of the rules applied."""

    source: KCFG.Node
    target: KCFG.Node
    depth: int
    rules: tuple[str, ...]

    def to_dict(self) -> dict[str, Any]:
        """Serialize the edge; nodes are referenced by id."""
        serialized: dict[str, Any] = {}
        serialized['source'] = self.source.id
        serialized['target'] = self.target.id
        serialized['depth'] = self.depth
        serialized['rules'] = list(self.rules)
        return serialized

    @staticmethod
    def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Edge:
        """Rebuild an edge, looking its endpoints up in ``nodes`` by id."""
        source = nodes[dct['source']]
        target = nodes[dct['target']]
        return KCFG.Edge(source, target, dct['depth'], tuple(dct['rules']))

    def to_rule(
        self,
        label: str,
        claim: bool = False,
        priority: int | None = None,
        defunc_with: KDefinition | None = None,
        minimize: bool = False,
    ) -> KRuleLike:
        """Turn the edge into a rule, or into a claim when ``claim`` is true.

        The sentence id is ``{label}-{source id}-TO-{target id}``. ``#Ceil``
        constraints are dropped from both endpoints. ``priority`` and
        ``defunc_with`` are forwarded only when building a rule. With
        ``minimize`` the result is passed through ``minimize_rule_like``.
        """

        def prepared(cterm: CTerm) -> CTerm:
            # Filter out #Ceil applications, then normalise the configuration.
            kept = [c for c in cterm.constraints if not (type(c) is KApply and c.label.name == '#Ceil')]
            return CTerm(sort_ac_collections(inline_cell_maps(cterm.config)), kept)

        sentence_id = f'{label}-{self.source.id}-TO-{self.target.id}'
        init_cterm = prepared(self.source.cterm)
        target_cterm = prepared(self.target.cterm)

        rule: KRuleLike
        if not claim:
            rule, _ = cterm_build_rule(
                sentence_id, init_cterm, target_cterm, priority=priority, defunc_with=defunc_with
            )
        else:
            rule, _ = cterm_build_claim(sentence_id, init_cterm, target_cterm)

        return minimize_rule_like(rule) if minimize else rule

    def replace_source(self, node: KCFG.Node) -> KCFG.Edge:
        """Return the same edge with ``node`` as source; ``node`` must have the source's id."""
        assert node.id == self.source.id
        return KCFG.Edge(node, self.target, self.depth, self.rules)

    def replace_target(self, node: KCFG.Node) -> KCFG.Edge:
        """Return the same edge with ``node`` as target; ``node`` must have the target's id."""
        assert node.id == self.target.id
        return KCFG.Edge(self.source, node, self.depth, self.rules)
260
@final
@dataclass(frozen=True)
class MergedEdge(EdgeLike):
    """Merged edge is a collection of edges that have been merged into a single edge."""

    source: KCFG.Node
    target: KCFG.Node
    edges: tuple[KCFG.Edge, ...]

    def to_dict(self) -> dict[str, Any]:
        """Serialize the merged edge together with all edges it contains."""
        serialized: dict[str, Any] = {}
        serialized['source'] = self.source.id
        serialized['target'] = self.target.id
        serialized['edges'] = [edge.to_dict() for edge in self.edges]
        return serialized

    @staticmethod
    def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Successor:
        """Rebuild a merged edge, resolving every node id through ``nodes``."""
        source = nodes[dct['source']]
        target = nodes[dct['target']]
        inner_edges = tuple(KCFG.Edge.from_dict(edge, nodes) for edge in dct['edges'])
        return KCFG.MergedEdge(source, target, inner_edges)

    def replace_source(self, node: KCFG.Node) -> KCFG.Successor:
        """Return a copy with ``node`` as source; the contained edges are kept as they are."""
        assert node.id == self.source.id
        return KCFG.MergedEdge(node, self.target, self.edges)

    def replace_target(self, node: KCFG.Node) -> KCFG.Successor:
        """Return a copy with ``node`` as target; the contained edges are kept as they are."""
        assert node.id == self.target.id
        return KCFG.MergedEdge(self.source, node, self.edges)

    def to_rule(self, label: str, claim: bool = False, priority: int | None = None) -> KRuleLike:
        """Build the rule of a plain depth-1 edge between this merged edge's endpoints."""
        plain_edge = KCFG.Edge(self.source, self.target, 1, ())
        return plain_edge.to_rule(label, claim, priority)
295
@final
@dataclass(frozen=True)
class Cover(EdgeLike):
    """Single-target successor that carries a ``CSubst`` between source and target."""

    source: KCFG.Node
    target: KCFG.Node
    csubst: CSubst

    def to_dict(self) -> dict[str, Any]:
        """Serialize the cover; nodes are referenced by id."""
        serialized: dict[str, Any] = {}
        serialized['source'] = self.source.id
        serialized['target'] = self.target.id
        serialized['csubst'] = self.csubst.to_dict()
        return serialized

    @staticmethod
    def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Cover:
        """Rebuild a cover, looking its endpoints up in ``nodes`` by id."""
        source = nodes[dct['source']]
        target = nodes[dct['target']]
        return KCFG.Cover(source, target, CSubst.from_dict(dct['csubst']))

    def replace_source(self, node: KCFG.Node) -> KCFG.Cover:
        """Return a copy with ``node`` as source; ``node`` must have the source's id."""
        assert node.id == self.source.id
        return KCFG.Cover(node, self.target, self.csubst)

    def replace_target(self, node: KCFG.Node) -> KCFG.Cover:
        """Return a copy with ``node`` as target; ``node`` must have the target's id."""
        assert node.id == self.target.id
        return KCFG.Cover(self.source, node, self.csubst)
321
@dataclass(frozen=True)
class MultiEdge(Successor):
    """Base of the successors that can have several targets."""

    source: KCFG.Node

    def __lt__(self, other: Any) -> bool:
        """Order by source and then by target ids; only the exact same type is comparable."""
        if type(other) is not type(self):
            return NotImplemented
        own_key = (self.source, self.target_ids)
        other_key = (other.source, other.target_ids)
        return own_key < other_key

    @abstractmethod
    def with_single_target(self, target: KCFG.Node) -> KCFG.MultiEdge: ...
333
@final
@dataclass(frozen=True)
class Split(MultiEdge):
    """Multi-target successor in which every target is paired with a ``CSubst``."""

    source: KCFG.Node
    _targets: tuple[tuple[KCFG.Node, CSubst], ...]

    def __init__(self, source: KCFG.Node, _targets: Iterable[tuple[KCFG.Node, CSubst]]) -> None:
        """Store ``source`` and freeze ``_targets`` into a tuple."""
        # Frozen dataclass: fields are set through ``object.__setattr__``.
        for field_name, field_value in (('source', source), ('_targets', tuple(_targets))):
            object.__setattr__(self, field_name, field_value)

    @property
    def targets(self) -> tuple[KCFG.Node, ...]:
        """The target nodes, without their substitutions."""
        return tuple(pair[0] for pair in self._targets)

    @property
    def splits(self) -> dict[int, CSubst]:
        """Mapping from target id to the substitution paired with that target."""
        by_id: dict[int, CSubst] = {}
        for target, csubst in self._targets:
            by_id[target.id] = csubst
        return by_id

    def to_dict(self) -> dict[str, Any]:
        """Serialize the split; nodes are referenced by id."""
        serialized_targets = []
        for target, csubst in self._targets:
            serialized_targets.append({'target': target.id, 'csubst': csubst.to_dict()})
        return {'source': self.source.id, 'targets': serialized_targets}

    @staticmethod
    def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Split:
        """Rebuild a split, resolving every node id through ``nodes``."""
        pairs = []
        for entry in dct['targets']:
            pairs.append((nodes[entry['target']], CSubst.from_dict(entry['csubst'])))
        return KCFG.Split(nodes[dct['source']], tuple(pairs))

    def with_single_target(self, target: KCFG.Node) -> KCFG.Split:
        """Return a split to ``target`` only, reusing the substitution stored for its id."""
        csubst = self.splits[target.id]
        return KCFG.Split(self.source, ((target, csubst),))

    @property
    def covers(self) -> tuple[KCFG.Cover, ...]:
        """One ``Cover`` per target, going from that target to this split's source."""
        built = [KCFG.Cover(target, self.source, csubst) for target, csubst in self._targets]
        return tuple(built)

    def replace_source(self, node: KCFG.Node) -> KCFG.Split:
        """Return a copy with ``node`` as source; ``node`` must have the source's id."""
        assert node.id == self.source.id
        return KCFG.Split(node, self._targets)

    def replace_target(self, node: KCFG.Node) -> KCFG.Split:
        """Return a copy in which every target with ``node``'s id is replaced by ``node``."""
        assert node.id in self.target_ids
        updated = []
        for target_node, csubst in self._targets:
            if target_node.id == node.id:
                updated.append((node, csubst))
            else:
                updated.append((target_node, csubst))
        return KCFG.Split(self.source, tuple(updated))
387
@final
@dataclass(frozen=True)
class NDBranch(MultiEdge):
    """Multi-target successor that records the labels of the rules involved."""

    source: KCFG.Node
    _targets: tuple[KCFG.Node, ...]
    rules: tuple[str, ...]

    def __init__(self, source: KCFG.Node, _targets: Iterable[KCFG.Node], rules: tuple[str, ...]) -> None:
        """Store the fields, freezing ``_targets`` into a tuple."""
        # Frozen dataclass: fields are set through ``object.__setattr__``.
        for field_name, field_value in (('source', source), ('_targets', tuple(_targets)), ('rules', rules)):
            object.__setattr__(self, field_name, field_value)

    @property
    def targets(self) -> tuple[KCFG.Node, ...]:
        """The target nodes."""
        return self._targets

    def to_dict(self) -> dict[str, Any]:
        """Serialize the branch; nodes are referenced by id."""
        serialized: dict[str, Any] = {}
        serialized['source'] = self.source.id
        serialized['targets'] = [target.id for target in self.targets]
        serialized['rules'] = list(self.rules)
        return serialized

    @staticmethod
    def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.NDBranch:
        """Rebuild a branch, resolving every node id through ``nodes``."""
        source = nodes[dct['source']]
        targets = tuple(nodes[target_id] for target_id in dct['targets'])
        return KCFG.NDBranch(source, targets, tuple(dct['rules']))

    def with_single_target(self, target: KCFG.Node) -> KCFG.NDBranch:
        """Return a branch to ``target`` only; the result has an empty ``rules`` tuple."""
        return KCFG.NDBranch(self.source, (target,), ())

    @property
    def edges(self) -> tuple[KCFG.Edge, ...]:
        """One depth-1 ``Edge`` without rule labels from the source to each target."""
        built = [KCFG.Edge(self.source, target, 1, ()) for target in self.targets]
        return tuple(built)

    def replace_source(self, node: KCFG.Node) -> KCFG.NDBranch:
        """Return a copy with ``node`` as source; ``node`` must have the source's id."""
        assert node.id == self.source.id
        return KCFG.NDBranch(node, self._targets, self.rules)

    def replace_target(self, node: KCFG.Node) -> KCFG.NDBranch:
        """Return a copy in which every target with ``node``'s id is replaced by ``node``."""
        assert node.id in self.target_ids
        updated = []
        for target_node in self._targets:
            updated.append(node if target_node.id == node.id else target_node)
        return KCFG.NDBranch(self.source, tuple(updated), self.rules)
# Id handed out to the next node made by ``create_node``.
_node_id: int
# All nodes, keyed by id.
_nodes: MutableMapping[int, KCFG.Node]

# Ids of nodes added or replaced / removed; passed around as sets of ids.
_created_nodes: set[int]
_deleted_nodes: set[int]

# One table per successor kind, each keyed by the id of the source node.
_edges: dict[int, Edge]
_merged_edges: dict[int, MergedEdge]
_covers: dict[int, Cover]
_splits: dict[int, Split]
_ndbranches: dict[int, NDBranch]
# Alias name -> node id.
_aliases: dict[str, int]
_lock: RLock

# ``None`` when the KCFG was built without a ``cfg_dir``.
_kcfg_store: KCFGStore | None

def __init__(self, cfg_dir: Path | None = None, optimize_memory: bool = True) -> None:
    """Create an empty KCFG.

    Args:
        cfg_dir: If given, a ``KCFGStore`` rooted at this directory is attached.
        optimize_memory: If true, nodes are kept in an ``OptimizedNodeStore``
            instead of a plain ``dict``.
    """
    self._node_id = 1
    if optimize_memory:
        from .store import OptimizedNodeStore

        self._nodes = OptimizedNodeStore()
    else:
        self._nodes = {}
    self._created_nodes = set()
    self._deleted_nodes = set()
    self._edges = {}
    self._merged_edges = {}
    self._covers = {}
    self._splits = {}
    self._ndbranches = {}
    self._aliases = {}
    self._lock = RLock()
    # Always assign the attribute so that reading it on a store-less KCFG
    # yields ``None`` instead of raising ``AttributeError``.
    self._kcfg_store = KCFGStore(cfg_dir) if cfg_dir is not None else None

def __contains__(self, item: object) -> bool:
    """Dispatch ``item in kcfg`` on the exact type of ``item``; unknown types are not contained."""
    checks = {
        KCFG.Node: self.contains_node,
        KCFG.Edge: self.contains_edge,
        KCFG.MergedEdge: self.contains_merged_edge,
        KCFG.Cover: self.contains_cover,
        KCFG.Split: self.contains_split,
        KCFG.NDBranch: self.contains_ndbranch,
    }
    check = checks.get(type(item))
    if check is None:
        return False
    return check(item)

def __enter__(self) -> KCFG:
    """Acquire the KCFG's lock and return the KCFG itself."""
    self._lock.acquire()
    return self

def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> bool:
    """Release the lock; return ``True`` only when no exception was raised in the block."""
    self._lock.release()
    # A ``False`` result lets an exception from the ``with`` body propagate.
    return exc_type is None

@property
def nodes(self) -> list[KCFG.Node]:
    """All nodes, as a fresh list."""
    return list(self._nodes.values())

@property
def root(self) -> list[KCFG.Node]:
    """The nodes for which ``is_root`` holds."""
    return [node for node in self.nodes if self.is_root(node.id)]
@property
def vacuous(self) -> list[KCFG.Node]:
    """The nodes for which ``is_vacuous`` holds."""
    selected = []
    for node in self.nodes:
        if self.is_vacuous(node.id):
            selected.append(node)
    return selected

@property
def stuck(self) -> list[KCFG.Node]:
    """The nodes for which ``is_stuck`` holds."""
    selected = []
    for node in self.nodes:
        if self.is_stuck(node.id):
            selected.append(node)
    return selected

@property
def leaves(self) -> list[KCFG.Node]:
    """The nodes for which ``is_leaf`` holds."""
    selected = []
    for node in self.nodes:
        if self.is_leaf(node.id):
            selected.append(node)
    return selected

@property
def covered(self) -> list[KCFG.Node]:
    """The nodes for which ``is_covered`` holds."""
    selected = []
    for node in self.nodes:
        if self.is_covered(node.id):
            selected.append(node)
    return selected

@property
def uncovered(self) -> list[KCFG.Node]:
    """The nodes for which ``is_covered`` does not hold."""
    selected = []
    for node in self.nodes:
        if not self.is_covered(node.id):
            selected.append(node)
    return selected
@staticmethod
def from_claim(
    defn: KDefinition, claim: KClaim, cfg_dir: Path | None = None, optimize_memory: bool = True
) -> tuple[KCFG, NodeIdLike, NodeIdLike]:
    """Build a two-node KCFG from ``claim``.

    The first node is the claim's left-hand side constrained by ``requires``;
    the second is the right-hand side constrained by ``requires`` and
    ``ensures`` together. No successor is created between them.

    Returns:
        The KCFG, the id of the left-hand-side node and the id of the
        right-hand-side node.
    """
    cfg = KCFG(cfg_dir=cfg_dir, optimize_memory=optimize_memory)
    body = rename_generated_vars(defn.instantiate_cell_vars(claim.body))

    lhs_cterm = CTerm.from_kast(extract_lhs(body))
    init_node = cfg.create_node(lhs_cterm.add_constraint(bool_to_ml_pred(claim.requires)))

    rhs_cterm = CTerm.from_kast(extract_rhs(body))
    rhs_constraint = bool_to_ml_pred(andBool([claim.requires, claim.ensures]))
    target_node = cfg.create_node(rhs_cterm.add_constraint(rhs_constraint))

    return cfg, init_node.id, target_node.id
545
[docs] 546 @staticmethod 547 def path_length(_path: Iterable[KCFG.Successor]) -> int: 548 _path = tuple(_path) 549 if len(_path) == 0: 550 return 0 551 if type(_path[0]) is KCFG.Split or type(_path[0]) is KCFG.Cover: 552 return KCFG.path_length(_path[1:]) 553 elif type(_path[0]) is KCFG.NDBranch: 554 return 1 + KCFG.path_length(_path[1:]) 555 elif type(_path[0]) is KCFG.Edge: 556 return _path[0].depth + KCFG.path_length(_path[1:]) 557 elif type(_path[0]) is KCFG.MergedEdge: 558 return min(edge.depth for edge in _path[0].edges) + KCFG.path_length(_path[1:]) # todo: check this 559 raise ValueError(f'Cannot handle Successor type: {type(_path[0])}')
560
[docs] 561 def extend( 562 self, 563 extend_result: KCFGExtendResult, 564 node: KCFG.Node, 565 logs: dict[int, tuple[LogEntry, ...]], 566 ) -> None: 567 568 def log(message: str, *, warning: bool = False) -> None: 569 result_info = extend_result.info if type(extend_result) is Step or type(extend_result) is Branch else '' 570 result_info_message = f': {result_info}' if result_info else '' 571 _LOGGER.log( 572 logging.WARNING if warning else logging.INFO, 573 f'Extending current KCFG with the following: {message}{result_info_message}', 574 ) 575 576 match extend_result: 577 case Vacuous(): 578 self.add_vacuous(node.id) 579 log(f'vacuous node: {node.id}', warning=True) 580 581 case Stuck(): 582 self.add_stuck(node.id) 583 log(f'stuck node: {node.id}') 584 585 case Abstract(cterm): 586 new_node = self.create_node(cterm) 587 self.create_cover(node.id, new_node.id) 588 log(f'abstraction node: {node.id}') 589 590 case Step(cterm, depth, next_node_logs, rule_labels, _): 591 next_node = self.create_node(cterm) 592 logs[next_node.id] = next_node_logs 593 self.create_edge(node.id, next_node.id, depth, rules=rule_labels) 594 log(f'basic block at depth {depth}: {node.id} --> {next_node.id}') 595 596 case Branch(branches, _): 597 branch_node_ids = self.split_on_constraints(node.id, branches) 598 log(f'{len(branches)} branches: {node.id} --> {branch_node_ids}') 599 600 case NDBranch(cterms, next_node_logs, rule_labels): 601 next_ids = [self.create_node(cterm).id for cterm in cterms] 602 for i in next_ids: 603 logs[i] = next_node_logs 604 self.create_ndbranch(node.id, next_ids, rules=rule_labels) 605 log(f'{len(cterms)} non-deterministic branches: {node.id} --> {next_ids}') 606 607 case _: 608 raise AssertionError()
609
def to_dict_no_nodes(self) -> dict[str, Any]:
    """Serialize the KCFG with nodes represented by their ids only.

    The result has the same keys as ``to_dict`` (including ``'merged_edges'``,
    so merged edges survive a round trip through this form), except that
    ``'nodes'`` is a list of node ids. Keys whose value is empty are omitted.
    """
    res = {
        'next': self._node_id,
        'nodes': list(self._nodes.keys()),
        'edges': [edge.to_dict() for edge in self.edges()],
        'merged_edges': [merged_edge.to_dict() for merged_edge in self.merged_edges()],
        'covers': [cover.to_dict() for cover in self.covers()],
        'splits': [split.to_dict() for split in self.splits()],
        'ndbranches': [ndbranch.to_dict() for ndbranch in self.ndbranches()],
        # Sorted so the alias order does not depend on insertion order.
        'aliases': dict(sorted(self._aliases.items())),
    }
    return {k: v for k, v in res.items() if v}
629
def to_dict(self) -> dict[str, Any]:
    """Serialize the whole KCFG, nodes included.

    Keys whose value is empty (or otherwise falsy) are left out of the result.
    """
    payload = {
        'next': self._node_id,
        'nodes': [node.to_dict() for node in self.nodes],
        'edges': [edge.to_dict() for edge in self.edges()],
        'merged_edges': [merged_edge.to_dict() for merged_edge in self.merged_edges()],
        'covers': [cover.to_dict() for cover in self.covers()],
        'splits': [split.to_dict() for split in self.splits()],
        'ndbranches': [ndbranch.to_dict() for ndbranch in self.ndbranches()],
        'aliases': dict(sorted(self._aliases.items())),
    }
    return {key: value for key, value in payload.items() if value}
651
@staticmethod
def from_dict(dct: Mapping[str, Any], optimize_memory: bool = True) -> KCFG:
    """Rebuild a KCFG from the dictionary form produced by ``to_dict``.

    Missing or empty sections are treated as empty. When ``'next'`` is
    absent, the next node id is one more than the largest loaded id.
    """
    cfg = KCFG(optimize_memory=optimize_memory)

    for node_dict in dct.get('nodes') or []:
        cfg.add_node(KCFG.Node.from_dict(node_dict))

    highest_id = max([node.id for node in cfg.nodes], default=0)
    cfg._node_id = dct.get('next', highest_id + 1)

    # Successor sections, loaded in this order after all nodes exist.
    sections = (
        ('edges', KCFG.Edge),
        ('merged_edges', KCFG.MergedEdge),
        ('covers', KCFG.Cover),
        ('splits', KCFG.Split),
        ('ndbranches', KCFG.NDBranch),
    )
    for key, successor_cls in sections:
        for succ_dict in dct.get(key) or []:
            cfg.add_successor(successor_cls.from_dict(succ_dict, cfg._nodes))

    for alias, node_id in dct.get('aliases', {}).items():
        cfg.add_alias(alias=alias, node_id=node_id)

    return cfg
687
def aliases(self, node_id: NodeIdLike) -> list[str]:
    """Return every alias that points at the node identified by ``node_id``."""
    resolved = self._resolve(node_id)
    names = []
    for alias, value in self._aliases.items():
        if resolved == value:
            names.append(alias)
    return names
691
def to_json(self) -> str:
    """Return ``to_dict()`` encoded as JSON with sorted keys."""
    as_dict = self.to_dict()
    return json.dumps(as_dict, sort_keys=True)
694
@staticmethod
def from_json(s: str, optimize_memory: bool = True) -> KCFG:
    """Parse the JSON text ``s`` and build a KCFG from it via ``from_dict``."""
    parsed = json.loads(s)
    return KCFG.from_dict(parsed, optimize_memory=optimize_memory)
698
def to_rules(self, _id: str | None = None, priority: int = 20) -> list[KRuleLike]:
    """Return one rule per edge, followed by one rule per merged edge.

    ``_id`` is the label passed to each ``to_rule`` call and defaults to
    ``'BASIC-BLOCK'``.
    """
    if _id is None:
        _id = 'BASIC-BLOCK'
    rules = [edge.to_rule(_id, priority=priority) for edge in self.edges()]
    rules.extend(merged.to_rule(_id, priority=priority) for merged in self.merged_edges())
    return rules
704
def to_module(
    self,
    module_name: str | None = None,
    imports: Iterable[KImport] = (),
    priority: int = 20,
    att: KAtt = EMPTY_ATT,
) -> KFlatModule:
    """Wrap the rules of ``to_rules`` in a ``KFlatModule`` named ``module_name`` (default ``'KCFG'``)."""
    if module_name is None:
        module_name = 'KCFG'
    sentences = self.to_rules(priority=priority)
    return KFlatModule(module_name, sentences, imports=imports, att=att)
def _resolve_or_none(self, id_like: NodeIdLike) -> int | None:
    """Translate ``id_like`` into a node id, or ``None`` when it names no node.

    An ``int`` resolves to itself if a node with that id exists. A string
    starting with ``'@'`` is looked up as an alias. Any other string
    resolves to ``None``.

    Raises:
        TypeError: If ``id_like`` is neither exactly ``int`` nor exactly ``str``.
        ValueError: If ``id_like`` is an ``'@'``-prefixed alias that is not defined.
    """
    if type(id_like) is int:
        return id_like if id_like in self._nodes else None

    if type(id_like) is not str:
        raise TypeError(f'Expected int or str for id_like, got: {id_like}')

    if id_like.startswith('@'):
        alias = id_like[1:]
        if alias in self._aliases:
            return self._aliases[alias]
        raise ValueError(f'Unknown alias: {id_like}')

    return None

def _resolve(self, id_like: NodeIdLike) -> int:
    """Translate ``id_like`` into a node id.

    Raises:
        ValueError: If ``id_like`` does not name an existing node.
        TypeError: If ``id_like`` is neither exactly ``int`` nor exactly ``str``.
    """
    match = self._resolve_or_none(id_like)
    # Compare against ``None`` explicitly: node id 0 is falsy but valid.
    if match is None:
        raise ValueError(f'Unknown node: {id_like}')
    return match
def node(self, node_id: NodeIdLike) -> KCFG.Node:
    """Return the node identified by ``node_id``; ``_resolve`` raises if there is none."""
    resolved = self._resolve(node_id)
    return self._nodes[resolved]
741
def get_node(self, node_id: NodeIdLike) -> KCFG.Node | None:
    """Return the node identified by ``node_id``, or ``None`` if it does not resolve."""
    resolved_id = self._resolve_or_none(node_id)
    return None if resolved_id is None else self._nodes[resolved_id]
747
def contains_node(self, node: KCFG.Node) -> bool:
    """Report whether a node with ``node``'s id exists; only the id is checked."""
    found = self.get_node(node.id)
    return found is not None
750
def add_node(self, node: KCFG.Node) -> None:
    """Insert ``node`` under its own id and record the id as created.

    Raises:
        ValueError: If a node with the same id is already present.
    """
    node_id = node.id
    if node_id in self._nodes:
        raise ValueError(f'Node with id already exists: {node.id}')
    self._nodes[node_id] = node
    self._created_nodes.add(node_id)
756
def create_node(self, cterm: CTerm) -> KCFG.Node:
    """Create a node for ``cterm`` under the next free id, store it and return it."""
    new_id = self._node_id
    node = KCFG.Node(new_id, cterm)
    self._node_id = new_id + 1
    self._nodes[node.id] = node
    self._created_nodes.add(node.id)
    return node
763
def remove_node(self, node_id: NodeIdLike) -> None:
    """Delete a node together with every successor and alias that refers to it.

    A successor is dropped when the node is its source or one of its targets.
    """
    node_id = self._resolve(node_id)

    removed = self._nodes.pop(node_id)
    self._created_nodes.discard(node_id)
    self._deleted_nodes.add(removed.id)

    def untouched(table):  # type: ignore[no-untyped-def]
        # Keep only the successors that neither start at nor lead to the removed node.
        return {
            source: succ for source, succ in table.items() if source != node_id and node_id not in succ.target_ids
        }

    self._edges = untouched(self._edges)
    self._merged_edges = untouched(self._merged_edges)
    self._covers = untouched(self._covers)
    self._splits = untouched(self._splits)
    self._ndbranches = untouched(self._ndbranches)

    # Collect the names first: ``remove_alias`` is called while the alias table is in use.
    stale_aliases = [alias for alias, target in self._aliases.items() if target == node_id]
    for alias in stale_aliases:
        self.remove_alias(alias)
def _update_refs(self, node_id: int) -> None:
    """Re-point every successor touching ``node_id`` at the node currently stored under that id.

    Successors are frozen and hold node objects, so each one whose source or
    target has this id is rebuilt and stored again under its source id.
    """
    node = self.node(node_id)

    # Successor table for each exact successor type.
    tables = {
        KCFG.Edge: self._edges,
        KCFG.MergedEdge: self._merged_edges,
        KCFG.Cover: self._covers,
        KCFG.Split: self._splits,
        KCFG.NDBranch: self._ndbranches,
    }

    for succ in self.successors(node_id):
        rebuilt = succ.replace_source(node)
        table = tables.get(type(rebuilt))
        if table is not None:
            table[rebuilt.source.id] = rebuilt

    # Predecessors are looked up only after the outgoing successors were refreshed.
    for pred in self.predecessors(node_id):
        rebuilt = pred.replace_target(node)
        table = tables.get(type(rebuilt))
        if table is not None:
            table[rebuilt.source.id] = rebuilt
def remove_attr(self, node_id: NodeIdLike, attr: NodeAttr) -> None:
    """Replace the node with a copy lacking ``attr``; ``Node.remove_attr`` raises if it is absent."""
    updated = self.node(node_id).remove_attr(attr)
    self.replace_node(updated)
815
def discard_attr(self, node_id: NodeIdLike, attr: NodeAttr) -> None:
    """Replace the node with a copy lacking ``attr``; a missing ``attr`` is not an error."""
    updated = self.node(node_id).discard_attr(attr)
    self.replace_node(updated)
820
def add_attr(self, node_id: NodeIdLike, attr: NodeAttr) -> None:
    """Replace the node with a copy that also carries ``attr``."""
    updated = self.node(node_id).add_attr(attr)
    self.replace_node(updated)
825
def let_node(
    self, node_id: NodeIdLike, cterm: CTerm | None = None, attrs: Iterable[KCFGNodeAttr] | None = None
) -> None:
    """Replace the node with a copy whose ``cterm`` and/or ``attrs`` are changed; ``None`` keeps a field."""
    updated = self.node(node_id).let(cterm=cterm, attrs=attrs)
    self.replace_node(updated)
832
def replace_node(self, node: KCFG.Node) -> None:
    """Store ``node`` under its id, record the id as created and refresh the successors that refer to it."""
    node_id = node.id
    self._nodes[node_id] = node
    self._created_nodes.add(node_id)
    self._update_refs(node_id)
837
def successors(self, source_id: NodeIdLike) -> list[Successor]:
    """Return all successors leaving ``source_id``.

    The result lists edges first, then merged edges, covers, splits and
    non-deterministic branches.
    """
    found: list[KCFG.Successor] = []
    found.extend(self.edges(source_id=source_id))
    found.extend(self.merged_edges(source_id=source_id))
    found.extend(self.covers(source_id=source_id))
    found.extend(self.splits(source_id=source_id))
    found.extend(self.ndbranches(source_id=source_id))
    return found
845
def predecessors(self, target_id: NodeIdLike) -> list[Successor]:
    """Return all successors arriving at ``target_id``.

    The result lists edges first, then merged edges, covers, splits and
    non-deterministic branches.
    """
    found: list[KCFG.Successor] = []
    found.extend(self.edges(target_id=target_id))
    found.extend(self.merged_edges(target_id=target_id))
    found.extend(self.covers(target_id=target_id))
    found.extend(self.splits(target_id=target_id))
    found.extend(self.ndbranches(target_id=target_id))
    return found
def _check_no_successors(self, source_id: NodeIdLike) -> None:
    """Raise ``ValueError`` if ``source_id`` already has any successor."""
    existing = self.successors(source_id)
    if existing:
        raise ValueError(f'Node already has successors: {source_id} -> {existing}')

def _check_no_zero_loops(self, source_id: NodeIdLike, target_ids: Iterable[NodeIdLike]) -> None:
    """Raise ``ValueError`` if a new successor would close a loop of length zero.

    For each target, the shortest existing path from the target back to
    ``source_id`` is inspected; a path of length 0 is rejected.
    """
    for target_id in target_ids:
        back_path = self.shortest_path_between(target_id, source_id)
        if back_path is None:
            continue
        if KCFG.path_length(back_path) == 0:
            raise ValueError(
                f'Adding successor would create zero-length loop with backedge: {source_id} -> {target_id}'
            )
def add_successor(self, succ: KCFG.Successor) -> None:
    """Register ``succ`` in the table that matches its exact type.

    Raises:
        ValueError: If the source already has a successor, if adding ``succ``
            would close a zero-length loop, or if ``succ`` is not an edge,
            merged edge or cover and has fewer than two targets.
    """
    source_id = succ.source.id
    self._check_no_successors(source_id)
    self._check_no_zero_loops(source_id, succ.target_ids)

    if type(succ) is KCFG.Edge:
        self._edges[source_id] = succ
        return
    if type(succ) is KCFG.MergedEdge:
        self._merged_edges[source_id] = succ
        return
    if type(succ) is KCFG.Cover:
        self._covers[source_id] = succ
        return

    # Everything else must fan out to at least two targets.
    if len(succ.target_ids) <= 1:
        raise ValueError(
            f'Cannot create {type(succ)} node with less than 2 targets: {succ.source.id} -> {succ.target_ids}'
        )
    if type(succ) is KCFG.Split:
        self._splits[source_id] = succ
    elif type(succ) is KCFG.NDBranch:
        self._ndbranches[source_id] = succ
884
def edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> Edge | None:
    """Return the edge from ``source_id`` to ``target_id``, or ``None`` if there is none."""
    source_id = self._resolve(source_id)
    target_id = self._resolve(target_id)
    candidate = self._edges.get(source_id, None)
    if candidate is None or candidate.target.id != target_id:
        return None
    return candidate
890
def edges(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[Edge]:
    """Return the edges, optionally restricted to a given source and/or target."""
    wanted_source = None if source_id is None else self._resolve(source_id)
    wanted_target = None if target_id is None else self._resolve(target_id)
    found = []
    for edge in self._edges.values():
        if wanted_source is not None and wanted_source != edge.source.id:
            continue
        if wanted_target is not None and wanted_target != edge.target.id:
            continue
        found.append(edge)
    return found
899
def contains_edge(self, edge: Edge) -> bool:
    """Return whether an edge equal to ``edge`` is stored between the same source and target."""
    other = self.edge(source_id=edge.source.id, target_id=edge.target.id)
    if not other:
        return False
    return edge == other
904
def create_edge(self, source_id: NodeIdLike, target_id: NodeIdLike, depth: int, rules: Iterable[str] = ()) -> Edge:
    """Build an ``Edge`` between two existing nodes, register it as a successor and return it.

    Raises:
        ValueError: If ``depth`` is not strictly positive.
    """
    if depth <= 0:
        raise ValueError(f'Cannot build KCFG Edge with non-positive depth: {depth}')
    origin = self.node(source_id)
    destination = self.node(target_id)
    new_edge = KCFG.Edge(origin, destination, depth, tuple(rules))
    self.add_successor(new_edge)
    return new_edge
913
def remove_edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> None:
    """Delete the edge from ``source_id`` to ``target_id``.

    Raises:
        ValueError: If no such edge is stored.
    """
    source_id = self._resolve(source_id)
    target_id = self._resolve(target_id)
    if self.edge(source_id, target_id):
        del self._edges[source_id]
        return
    raise ValueError(f'Edge does not exist: {source_id} -> {target_id}')
921
def merged_edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> MergedEdge | None:
    """Return the merged edge stored for ``source_id`` if it ends at ``target_id``, otherwise ``None``."""
    source_id = self._resolve(source_id)
    target_id = self._resolve(target_id)
    candidate = self._merged_edges.get(source_id)
    if candidate is None:
        return None
    if candidate.target.id == target_id:
        return candidate
    return None
927
def merged_edges(
    self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None
) -> list[MergedEdge]:
    """List stored merged edges, optionally restricted to a given source and/or target node."""
    if source_id is not None:
        source_id = self._resolve(source_id)
    if target_id is not None:
        target_id = self._resolve(target_id)

    selected = []
    for merged_edge in self._merged_edges.values():
        if not (source_id is None or source_id == merged_edge.source.id):
            continue
        if not (target_id is None or target_id == merged_edge.target.id):
            continue
        selected.append(merged_edge)
    return selected
939
def contains_merged_edge(self, edge: MergedEdge) -> bool:
    """Return whether a merged edge equal to ``edge`` is stored between the same source and target."""
    other = self.merged_edge(source_id=edge.source.id, target_id=edge.target.id)
    if not other:
        return False
    return edge == other
944
def create_merged_edge(self, source_id: NodeIdLike, target_id: NodeIdLike, edges: Iterable[Edge]) -> MergedEdge:
    """Build a ``MergedEdge`` between two existing nodes, register it as a successor and return it.

    Raises:
        ValueError: If ``edges`` yields no elements.
    """
    # Materialize once: ``edges`` may be a one-shot iterator, and it is needed both for the
    # emptiness check and for constructing the merged edge.
    edge_tuple = tuple(edges)
    if not edge_tuple:
        raise ValueError(f'Cannot build KCFG MergedEdge with no edges: {edges}')
    source = self.node(source_id)
    target = self.node(target_id)
    merged_edge = KCFG.MergedEdge(source, target, edge_tuple)
    self.add_successor(merged_edge)
    return merged_edge
953
def remove_merged_edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> None:
    """Delete the merged edge from ``source_id`` to ``target_id``.

    Raises:
        ValueError: If no such merged edge is stored.
    """
    source_id = self._resolve(source_id)
    target_id = self._resolve(target_id)
    if self.merged_edge(source_id, target_id):
        del self._merged_edges[source_id]
        return
    raise ValueError(f'MergedEdge does not exist: {source_id} -> {target_id}')
961
def cover(self, source_id: NodeIdLike, target_id: NodeIdLike) -> Cover | None:
    """Return the cover stored for ``source_id`` if it points at ``target_id``, otherwise ``None``."""
    source_id = self._resolve(source_id)
    target_id = self._resolve(target_id)
    candidate = self._covers.get(source_id)
    if candidate is not None and candidate.target.id == target_id:
        return candidate
    return None
967
def covers(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[Cover]:
    """List stored covers, optionally restricted to a given source and/or target node."""
    if source_id is not None:
        source_id = self._resolve(source_id)
    if target_id is not None:
        target_id = self._resolve(target_id)

    selected = []
    for cover in self._covers.values():
        if not (source_id is None or source_id == cover.source.id):
            continue
        if not (target_id is None or target_id == cover.target.id):
            continue
        selected.append(cover)
    return selected
977
def contains_cover(self, cover: Cover) -> bool:
    """Return whether a cover equal to ``cover`` is stored between the same source and target."""
    other = self.cover(source_id=cover.source.id, target_id=cover.target.id)
    if not other:
        return False
    return cover == other
982
def create_cover(self, source_id: NodeIdLike, target_id: NodeIdLike, csubst: CSubst | None = None) -> Cover:
    """Build a ``Cover`` from ``source_id`` to ``target_id``, register it as a successor and return it.

    When ``csubst`` is omitted it is computed by matching the target's cterm against the source's cterm.

    Raises:
        ValueError: If no ``csubst`` was given and the match yields ``None``.
    """
    source = self.node(source_id)
    target = self.node(target_id)
    matched = csubst if csubst is not None else target.cterm.match_with_constraint(source.cterm)
    if matched is None:
        raise ValueError(f'No matching between: {source.id} and {target.id}')
    new_cover = KCFG.Cover(source, target, csubst=matched)
    self.add_successor(new_cover)
    return new_cover
993
def remove_cover(self, source_id: NodeIdLike, target_id: NodeIdLike) -> None:
    """Delete the cover from ``source_id`` to ``target_id``.

    Raises:
        ValueError: If no such cover is stored.
    """
    source_id = self._resolve(source_id)
    target_id = self._resolve(target_id)
    if self.cover(source_id, target_id):
        del self._covers[source_id]
        return
    raise ValueError(f'Cover does not exist: {source_id} -> {target_id}')
1001
def edge_likes(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[EdgeLike]:
    """Return the matching edges, then covers, then merged edges as one new list."""
    plain = cast('List[KCFG.EdgeLike]', self.edges(source_id=source_id, target_id=target_id))
    covering = cast('List[KCFG.EdgeLike]', self.covers(source_id=source_id, target_id=target_id))
    combined = plain + covering
    merged = cast('List[KCFG.EdgeLike]', self.merged_edges(source_id=source_id, target_id=target_id))
    return combined + merged
1008
def add_vacuous(self, node_id: NodeIdLike) -> None:
    """Attach the ``VACUOUS`` attribute to the node via ``add_attr``."""
    vacuous = KCFGNodeAttr.VACUOUS
    self.add_attr(node_id, vacuous)
1011
def remove_vacuous(self, node_id: NodeIdLike) -> None:
    """Take the ``VACUOUS`` attribute off the node via ``remove_attr``."""
    vacuous = KCFGNodeAttr.VACUOUS
    self.remove_attr(node_id, vacuous)
1014
def discard_vacuous(self, node_id: NodeIdLike) -> None:
    """Take the ``VACUOUS`` attribute off the node via ``discard_attr``."""
    vacuous = KCFGNodeAttr.VACUOUS
    self.discard_attr(node_id, vacuous)
1017
def add_stuck(self, node_id: NodeIdLike) -> None:
    """Attach the ``STUCK`` attribute to the node via ``add_attr``."""
    stuck = KCFGNodeAttr.STUCK
    self.add_attr(node_id, stuck)
1020
def remove_stuck(self, node_id: NodeIdLike) -> None:
    """Take the ``STUCK`` attribute off the node via ``remove_attr``."""
    stuck = KCFGNodeAttr.STUCK
    self.remove_attr(node_id, stuck)
1023
def discard_stuck(self, node_id: NodeIdLike) -> None:
    """Take the ``STUCK`` attribute off the node via ``discard_attr``."""
    stuck = KCFGNodeAttr.STUCK
    self.discard_attr(node_id, stuck)
1026
def splits(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[Split]:
    """List stored splits, optionally restricted to a source node and/or to splits having a given target."""
    if source_id is not None:
        source_id = self._resolve(source_id)
    if target_id is not None:
        target_id = self._resolve(target_id)

    selected = []
    for split in self._splits.values():
        if not (source_id is None or source_id == split.source.id):
            continue
        if not (target_id is None or target_id in split.target_ids):
            continue
        selected.append(split)
    return selected
1035
def contains_split(self, split: Split) -> bool:
    """Return whether ``split`` is among the stored splits."""
    stored = self._splits.values()
    return split in stored
1038
def create_split(self, source_id: NodeIdLike, splits: Iterable[tuple[NodeIdLike, CSubst]]) -> KCFG.Split:
    """Build a ``Split`` from ``source_id`` to the given ``(node id, csubst)`` pairs, register and return it."""
    source_id = self._resolve(source_id)
    source = self.node(source_id)
    pairs = [*splits]
    branches = tuple((self.node(nid), csubst) for nid, csubst in pairs)
    new_split = KCFG.Split(source, branches)
    self.add_successor(new_split)
    return new_split
1044
def create_split_by_nodes(self, source_id: NodeIdLike, target_ids: Iterable[NodeIdLike]) -> KCFG.Split | None:
    """Create a split without crafting a CSubst.

    The substitution for each target is obtained with ``cterm_match`` on the source and target
    cterms. Returns ``None`` when computing them raises ``ValueError`` (presumably ``not_none``
    rejecting a failed match -- confirm against ``pyk.utils``).
    """
    source = self.node(source_id)
    # ``target_ids`` is consumed twice (node lookup and pairing with the substitutions), so a
    # one-shot iterable must be materialized first.
    target_ids = list(target_ids)
    targets = [self.node(nid) for nid in target_ids]
    try:
        csubsts = [not_none(cterm_match(source.cterm, target.cterm)) for target in targets]
    except ValueError:
        return None
    return self.create_split(source.id, zip(target_ids, csubsts, strict=True))
1054
def ndbranches(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[NDBranch]:
    """List stored ndbranches, optionally restricted to a source node and/or to ones having a given target."""
    if source_id is not None:
        source_id = self._resolve(source_id)
    if target_id is not None:
        target_id = self._resolve(target_id)

    selected = []
    for branch in self._ndbranches.values():
        if not (source_id is None or source_id == branch.source.id):
            continue
        if not (target_id is None or target_id in branch.target_ids):
            continue
        selected.append(branch)
    return selected
1063
def contains_ndbranch(self, ndbranch: NDBranch) -> bool:
    """Return whether ``ndbranch`` is among the stored ndbranches."""
    # ``_ndbranches`` is keyed by source node id, so membership must be tested on the values.
    return ndbranch in self._ndbranches.values()
1066
def create_ndbranch(
    self, source_id: NodeIdLike, ndbranches: Iterable[NodeIdLike], rules: Iterable[str] = ()
) -> KCFG.NDBranch:
    """Build an ``NDBranch`` from ``source_id`` to the given target nodes, register and return it."""
    source_id = self._resolve(source_id)
    source = self.node(source_id)
    target_ids = [*ndbranches]
    targets = tuple(self.node(nid) for nid in target_ids)
    new_branch = KCFG.NDBranch(source, targets, tuple(rules))
    self.add_successor(new_branch)
    return new_branch
1074
def split_on_constraints(self, source_id: NodeIdLike, constraints: Iterable[KInner]) -> list[int]:
    """Split ``source_id`` into one new node per constraint and return the new node ids.

    Each new node holds the source cterm with one constraint added. The substitution for each
    branch is the match of the source cterm against the new node's cterm, extended with every
    ``#And`` conjunct of that branch's constraint.
    """
    source = self.node(source_id)
    # ``constraints`` is consumed twice (node creation and pairing with the substitutions), so a
    # one-shot iterable must be materialized first.
    constraints = list(constraints)
    branch_node_ids = [self.create_node(source.cterm.add_constraint(c)).id for c in constraints]
    csubsts = [not_none(source.cterm.match_with_constraint(self.node(nid).cterm)) for nid in branch_node_ids]
    csubsts = [
        reduce(CSubst.add_constraint, flatten_label('#And', constraint), csubst)
        for (csubst, constraint) in zip(csubsts, constraints, strict=True)
    ]
    self.create_split(source.id, zip(branch_node_ids, csubsts, strict=True))
    return branch_node_ids
1085
def add_alias(self, alias: str, node_id: NodeIdLike) -> None:
    """Record ``alias`` as a name for the resolved ``node_id``.

    Raises:
        ValueError: If ``alias`` contains ``@`` or is already in use.
    """
    if '@' in alias:
        raise ValueError('Alias may not contain "@"')
    if alias in self._aliases:
        raise ValueError(f'Duplicate alias: {alias}')
    self._aliases[alias] = self._resolve(node_id)
1093
def remove_alias(self, alias: str) -> None:
    """Forget ``alias``.

    Raises:
        ValueError: If ``alias`` is not registered.
    """
    if alias in self._aliases:
        del self._aliases[alias]
        return
    raise ValueError(f'Alias does not exist: {alias}')
1098
def is_root(self, node_id: NodeIdLike) -> bool:
    """Return whether the node has no predecessors."""
    resolved = self._resolve(node_id)
    incoming = self.predecessors(resolved)
    return len(incoming) == 0
1102
def is_vacuous(self, node_id: NodeIdLike) -> bool:
    """Return whether the node carries the ``VACUOUS`` attribute."""
    node_attrs = self.node(node_id).attrs
    return KCFGNodeAttr.VACUOUS in node_attrs
1105
def is_stuck(self, node_id: NodeIdLike) -> bool:
    """Return whether the node carries the ``STUCK`` attribute."""
    node_attrs = self.node(node_id).attrs
    return KCFGNodeAttr.STUCK in node_attrs
1108
def is_split(self, node_id: NodeIdLike) -> bool:
    """Return whether the node is the source of a stored split."""
    resolved = self._resolve(node_id)
    return resolved in self._splits
1112
def is_ndbranch(self, node_id: NodeIdLike) -> bool:
    """Return whether the node is the source of a stored ndbranch."""
    resolved = self._resolve(node_id)
    return resolved in self._ndbranches
1116
def is_leaf(self, node_id: NodeIdLike) -> bool:
    """Return whether the node has no successors."""
    outgoing = self.successors(node_id)
    return len(outgoing) == 0
1119
def is_covered(self, node_id: NodeIdLike) -> bool:
    """Return whether the node is the source of a stored cover."""
    resolved = self._resolve(node_id)
    return resolved in self._covers
1123
def prune(self, node_id: NodeIdLike, keep_nodes: Iterable[NodeIdLike] = ()) -> list[int]:
    """Remove every node reachable from ``node_id`` except those listed in ``keep_nodes``.

    Returns:
        The ids of the removed nodes, in the order they were removed.
    """
    nodes = self.reachable_nodes(node_id)
    # Resolve once into a set so each membership test below is O(1).
    kept_ids = {self._resolve(nid) for nid in keep_nodes}
    pruned_nodes: list[int] = []
    for node in nodes:
        if node.id in kept_ids:
            continue
        self.remove_node(node.id)
        pruned_nodes.append(node.id)
    return pruned_nodes
1133
def shortest_path_between(
    self, source_node_id: NodeIdLike, target_node_id: NodeIdLike
) -> tuple[Successor, ...] | None:
    """Return a path from source to target with minimal ``KCFG.path_length``, or ``None`` if there is none.

    Among paths of equal minimal length, the first one returned by ``paths_between`` is chosen.
    """
    paths = self.paths_between(source_node_id, target_node_id)
    if len(paths) == 0:
        return None
    # ``min`` returns the first minimal element, as a stable sort followed by ``[0]`` would.
    return min(paths, key=KCFG.path_length)
1141
def shortest_distance_between(self, node_1_id: NodeIdLike, node_2_id: NodeIdLike) -> int | None:
    """Return the smaller of the shortest-path lengths in either direction, or ``None`` if neither exists."""
    forward = self.shortest_path_between(node_1_id, node_2_id)
    backward = self.shortest_path_between(node_2_id, node_1_id)
    lengths = [KCFG.path_length(path) for path in (forward, backward) if path is not None]
    if not lengths:
        return None
    return min(lengths)
1153
def zero_depth_between(self, node_1_id: NodeIdLike, node_2_id: NodeIdLike) -> bool:
    """Return whether the two nodes are identical or their shortest distance (either direction) is 0."""
    first = self._resolve(node_1_id)
    second = self._resolve(node_2_id)
    if first == second:
        return True

    # Cheap pre-check: skip the pathing algorithm unless some first step out of either node has length 0.
    first_steps = self.successors(first) + self.successors(second)
    step_lengths = [self.path_length([successor]) for successor in first_steps]
    if 0 not in step_lengths:
        return False

    shortest_distance = self.shortest_distance_between(first, second)
    if shortest_distance is None:
        return False
    return shortest_distance == 0
1169
def paths_between(self, source_id: NodeIdLike, target_id: NodeIdLike) -> list[tuple[Successor, ...]]:
    """Collect the paths leading from ``source_id`` to ``target_id``.

    Each path is a tuple of successors. Multi-target successors never appear as-is in a
    returned path: they are replaced by single-target copies made with ``with_single_target``.

    Returns:
        ``[()]`` (one empty path) when source and target resolve to the same node, ``[]`` when the
        source has no successors, otherwise the list of paths found.
    """
    source_id = self._resolve(source_id)
    target_id = self._resolve(target_id)

    if source_id == target_id:
        return [()]

    source_successors = list(self.successors(source_id))
    # The search is seeded from a single successor object, so more than one is not supported here.
    assert len(source_successors) <= 1
    if len(source_successors) == 0:
        return []

    paths: list[tuple[KCFG.Successor, ...]] = []
    # Each worklist entry is a partial path; entries are processed last-in, first-out.
    worklist: list[list[KCFG.Successor]] = [[source_successors[0]]]

    def _in_path(_nid: int, _path: list[KCFG.Successor]) -> bool:
        # A node is on the path if it is the source of any step, or a target of the last step.
        for succ in _path:
            if _nid == succ.source.id:
                return True
        if len(_path) > 0:
            if isinstance(_path[-1], KCFG.EdgeLike) and _path[-1].target.id == _nid:
                return True
            elif isinstance(_path[-1], KCFG.MultiEdge) and _nid in _path[-1].target_ids:
                return True
        return False

    while worklist:
        curr_path = worklist.pop()
        curr_successor = curr_path[-1]
        successors: list[KCFG.Successor] = []

        if isinstance(curr_successor, KCFG.EdgeLike):
            if curr_successor.target.id == target_id:
                # Reached the target: record the path and stop extending it.
                paths.append(tuple(curr_path))
                continue
            else:
                successors = list(self.successors(curr_successor.target.id))

        elif isinstance(curr_successor, KCFG.MultiEdge):
            if len(list(curr_successor.targets)) == 1:
                target = list(curr_successor.targets)[0]
                if target.id == target_id:
                    paths.append(tuple(curr_path))
                    continue
                else:
                    successors = list(self.successors(target.id))
            if len(list(curr_successor.targets)) > 1:
                # Drop the multi-target step and continue with one single-target copy per target.
                curr_path = curr_path[0:-1]
                successors = [curr_successor.with_single_target(target) for target in curr_successor.targets]

        for successor in successors:
            # Extend only with steps whose target is not already on the path.
            if isinstance(successor, KCFG.EdgeLike) and not _in_path(successor.target.id, curr_path):
                worklist.append(curr_path + [successor])
            elif isinstance(successor, KCFG.MultiEdge):
                if len(list(successor.targets)) == 1:
                    target = list(successor.targets)[0]
                    if not _in_path(target.id, curr_path):
                        worklist.append(curr_path + [successor])
                elif len(list(successor.targets)) > 1:
                    # Pushed unchecked; it is expanded into single-target copies when popped.
                    worklist.append(curr_path + [successor])

    return paths
1232
def reachable_nodes(self, source_id: NodeIdLike, *, reverse: bool = False) -> set[KCFG.Node]:
    """Return the set of nodes reachable from ``source_id``, the start node included.

    With ``reverse=True`` the graph is walked from targets back to sources instead.
    """
    seen: set[KCFG.Node] = set()
    pending: list[KCFG.Node] = [self.node(source_id)]

    while pending:
        current = pending.pop()
        if current not in seen:
            seen.add(current)
            if reverse:
                pending.extend(succ.source for succ in self.predecessors(target_id=current.id))
            else:
                pending.extend(target for succ in self.successors(source_id=current.id) for target in succ.targets)

    return seen
1251
def write_cfg_data(self) -> None:
    """Write the graph through the attached store, then clear the created/deleted node trackers.

    A store must be attached; this is asserted.
    """
    assert self._kcfg_store is not None
    self._kcfg_store.write_cfg_data(
        self,
        self.to_dict_no_nodes(),
        deleted_nodes=self._deleted_nodes,
        created_nodes=self._created_nodes,
    )
    for tracker in (self._deleted_nodes, self._created_nodes):
        tracker.clear()
1259
@staticmethod
def read_cfg_data(cfg_dir: Path) -> KCFG:
    """Load a ``KCFG`` from the store rooted at ``cfg_dir`` and attach that store to it."""
    backing_store = KCFGStore(cfg_dir)
    loaded = KCFG.from_dict(backing_store.read_cfg_data())
    loaded._kcfg_store = backing_store
    return loaded
1266
@staticmethod
def read_node_data(cfg_dir: Path, node_id: int) -> KCFG.Node:
    """Load the single node ``node_id`` from the store rooted at ``cfg_dir``."""
    backing_store = KCFGStore(cfg_dir)
    node_dict = backing_store.read_node_data(node_id)
    return KCFG.Node.from_dict(node_dict)
1271 1272
class KCFGExtendResult(ABC):
    """Common base class of the extension result types defined below."""
1274 1275
@final
@dataclass(frozen=True)
class Vacuous(KCFGExtendResult):
    """Extension result that carries no data."""
1279 1280
@final
@dataclass(frozen=True)
class Stuck(KCFGExtendResult):
    """Extension result that carries no data."""
1284 1285
@final
@dataclass(frozen=True)
class Abstract(KCFGExtendResult):
    """Extension result holding a single ``CTerm``."""

    cterm: CTerm
1290 1291
@final
@dataclass(frozen=True)
class Step(KCFGExtendResult):
    """Extension result holding a ``CTerm`` with a depth, log entries, rule labels, a cut flag and an info string."""

    cterm: CTerm
    depth: int
    logs: tuple[LogEntry, ...]
    rule_labels: list[str]
    cut: bool = False
    info: str = ''
1301 1302
@final
@dataclass(frozen=True)
class Branch(KCFGExtendResult):
    """Extension result holding branch constraints, a ``heuristic`` flag and an info string."""

    constraints: tuple[KInner, ...]
    heuristic: bool
    info: str = ''

    def __init__(self, constraints: Iterable[KInner], *, heuristic: bool = False, info: str = ''):
        """Store ``constraints`` as a tuple, together with ``heuristic`` and ``info``."""
        # The dataclass is frozen, so fields are set through ``object.__setattr__``.
        assign = object.__setattr__
        assign(self, 'constraints', tuple(constraints))
        assign(self, 'heuristic', heuristic)
        assign(self, 'info', info)
1314 1315
@final
@dataclass(frozen=True)
class NDBranch(KCFGExtendResult):
    """Extension result holding several ``CTerm`` values with their log entries and rule labels."""

    cterms: tuple[CTerm, ...]
    logs: tuple[LogEntry, ...]
    rule_labels: tuple[str, ...]

    def __init__(self, cterms: Iterable[CTerm], logs: Iterable[LogEntry], rule_labels: Iterable[str]):
        """Store each argument as a tuple."""
        # The dataclass is frozen, so fields are set through ``object.__setattr__``.
        object.__setattr__(self, 'cterms', tuple(cterms))
        object.__setattr__(self, 'logs', tuple(logs))
        object.__setattr__(self, 'rule_labels', tuple(rule_labels))