1from __future__ import annotations
2
3import json
4import logging
5from abc import ABC, abstractmethod
6from collections.abc import Container
7from dataclasses import dataclass, field
8from threading import RLock
9from typing import TYPE_CHECKING, Final, List, Union, cast, final
10
11from ..cterm import CSubst, CTerm, cterm_build_claim, cterm_build_rule
12from ..kast import EMPTY_ATT
13from ..kast.inner import KApply
14from ..kast.manip import (
15 bool_to_ml_pred,
16 extract_lhs,
17 extract_rhs,
18 inline_cell_maps,
19 minimize_rule_like,
20 rename_generated_vars,
21 sort_ac_collections,
22)
23from ..kast.outer import KFlatModule
24from ..kast.prelude.kbool import andBool
25from ..utils import ensure_dir_path, not_none
26
27if TYPE_CHECKING:
28 from collections.abc import Iterable, Mapping, MutableMapping
29 from pathlib import Path
30 from types import TracebackType
31 from typing import Any
32
33 from ..kast import KAtt
34 from ..kast.inner import KInner
35 from ..kast.outer import KClaim, KDefinition, KImport, KRuleLike
36 from ..kore.rpc import LogEntry
37
38
39NodeIdLike = int | str
40
41_LOGGER: Final = logging.getLogger(__name__)
42
43
[docs]
44@dataclass(frozen=True)
45class NodeAttr:
46 value: str
47
48
[docs]
49class KCFGNodeAttr(NodeAttr):
50 VACUOUS = NodeAttr('vacuous')
51 STUCK = NodeAttr('stuck')
52
53
[docs]
54class KCFGStore:
55 store_path: Path
56
57 def __init__(self, store_path: Path) -> None:
58 self.store_path = store_path
59 ensure_dir_path(store_path)
60 ensure_dir_path(self.kcfg_node_dir)
61
62 @property
63 def kcfg_json_path(self) -> Path:
64 return self.store_path / 'kcfg.json'
65
66 @property
67 def kcfg_node_dir(self) -> Path:
68 return self.store_path / 'nodes'
69
[docs]
70 def kcfg_node_path(self, node_id: int) -> Path:
71 return self.kcfg_node_dir / f'{node_id}.json'
72
[docs]
73 def write_cfg_data(
74 self, kcfg: KCFG, dct: dict[str, Any], deleted_nodes: Iterable[int] = (), created_nodes: Iterable[int] = ()
75 ) -> None:
76 vacuous_nodes = [
77 node_id for node_id in kcfg._nodes.keys() if KCFGNodeAttr.VACUOUS in kcfg._nodes[node_id].attrs
78 ]
79 stuck_nodes = [node_id for node_id in kcfg._nodes.keys() if KCFGNodeAttr.STUCK in kcfg._nodes[node_id].attrs]
80 dct['vacuous'] = vacuous_nodes
81 dct['stuck'] = stuck_nodes
82 for node_id in deleted_nodes:
83 self.kcfg_node_path(node_id).unlink(missing_ok=True)
84 for node_id in created_nodes:
85 self.kcfg_node_path(node_id).write_text(json.dumps(kcfg._nodes[node_id].to_dict()))
86 self.kcfg_json_path.write_text(json.dumps(dct))
87
[docs]
88 def read_cfg_data(self) -> dict[str, Any]:
89 dct = json.loads(self.kcfg_json_path.read_text())
90 nodes = [self.read_node_data(node_id) for node_id in dct.get('nodes') or []]
91 dct['nodes'] = nodes
92
93 new_nodes = []
94 for node in dct['nodes']:
95 attrs = []
96 if node['id'] in dct['vacuous']:
97 attrs.append(KCFGNodeAttr.VACUOUS.value)
98 if node['id'] in dct['stuck']:
99 attrs.append(KCFGNodeAttr.STUCK.value)
100 new_nodes.append({'id': node['id'], 'cterm': node['cterm'], 'attrs': attrs})
101
102 dct['nodes'] = new_nodes
103
104 del dct['vacuous']
105 del dct['stuck']
106
107 return dct
108
[docs]
109 def read_node_data(self, node_id: int) -> dict[str, Any]:
110 return json.loads(self.kcfg_node_path(node_id).read_text())
111
112
[docs]
113class KCFG(Container[Union['KCFG.Node', 'KCFG.Successor']]):
[docs]
114 @final
115 @dataclass(frozen=True, order=True)
116 class Node:
117 id: int
118 cterm: CTerm
119 attrs: frozenset[NodeAttr]
120
121 def __init__(self, id: int, cterm: CTerm, attrs: Iterable[NodeAttr] = ()) -> None:
122 object.__setattr__(self, 'id', id)
123 object.__setattr__(self, 'cterm', cterm)
124 object.__setattr__(self, 'attrs', frozenset(attrs))
125
[docs]
126 def to_dict(self) -> dict[str, Any]:
127 return {'id': self.id, 'cterm': self.cterm.to_dict(), 'attrs': [attr.value for attr in self.attrs]}
128
[docs]
129 @staticmethod
130 def from_dict(dct: dict[str, Any]) -> KCFG.Node:
131 return KCFG.Node(dct['id'], CTerm.from_dict(dct['cterm']), [NodeAttr(attr) for attr in dct['attrs']])
132
[docs]
133 def add_attr(self, attr: NodeAttr) -> KCFG.Node:
134 return KCFG.Node(self.id, self.cterm, list(self.attrs) + [attr])
135
[docs]
136 def remove_attr(self, attr: NodeAttr) -> KCFG.Node:
137 if attr not in self.attrs:
138 raise ValueError(f'Node {self.id} does not have attribute {attr.value}')
139 return KCFG.Node(self.id, self.cterm, self.attrs.difference([attr]))
140
[docs]
141 def discard_attr(self, attr: NodeAttr) -> KCFG.Node:
142 return KCFG.Node(self.id, self.cterm, self.attrs.difference([attr]))
143
[docs]
144 def let(self, cterm: CTerm | None = None, attrs: Iterable[KCFGNodeAttr] | None = None) -> KCFG.Node:
145 new_cterm = cterm if cterm is not None else self.cterm
146 new_attrs = attrs if attrs is not None else self.attrs
147 return KCFG.Node(self.id, new_cterm, new_attrs)
148
149 @property
150 def free_vars(self) -> frozenset[str]:
151 return frozenset(self.cterm.free_vars)
152
[docs]
153 class Successor(ABC):
154 source: KCFG.Node
155
156 def __lt__(self, other: Any) -> bool:
157 if not isinstance(other, KCFG.Successor):
158 return NotImplemented
159 return self.source < other.source
160
161 @property
162 def source_vars(self) -> frozenset[str]:
163 return frozenset(self.source.free_vars)
164
165 @property
166 @abstractmethod
167 def targets(self) -> tuple[KCFG.Node, ...]: ...
168
169 @property
170 def target_ids(self) -> list[int]:
171 return sorted(target.id for target in self.targets)
172
173 @property
174 def target_vars(self) -> frozenset[str]:
175 return frozenset(set.union(set(), *(target.free_vars for target in self.targets)))
176
[docs]
177 @abstractmethod
178 def replace_source(self, node: KCFG.Node) -> KCFG.Successor: ...
179
[docs]
180 @abstractmethod
181 def replace_target(self, node: KCFG.Node) -> KCFG.Successor: ...
182
[docs]
183 @abstractmethod
184 def to_dict(self) -> dict[str, Any]: ...
185
[docs]
186 @staticmethod
187 @abstractmethod
188 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Successor: ...
189
[docs]
190 class EdgeLike(Successor):
191 source: KCFG.Node
192 target: KCFG.Node
193
194 @property
195 def targets(self) -> tuple[KCFG.Node, ...]:
196 return (self.target,)
197
[docs]
198 @final
199 @dataclass(frozen=True)
200 class Edge(EdgeLike):
201 source: KCFG.Node
202 target: KCFG.Node
203 depth: int
204 rules: tuple[str, ...]
205
[docs]
206 def to_dict(self) -> dict[str, Any]:
207 return {
208 'source': self.source.id,
209 'target': self.target.id,
210 'depth': self.depth,
211 'rules': list(self.rules),
212 }
213
[docs]
214 @staticmethod
215 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Edge:
216 return KCFG.Edge(nodes[dct['source']], nodes[dct['target']], dct['depth'], tuple(dct['rules']))
217
[docs]
218 def to_rule(
219 self,
220 label: str,
221 claim: bool = False,
222 priority: int | None = None,
223 defunc_with: KDefinition | None = None,
224 minimize: bool = False,
225 ) -> KRuleLike:
226 def is_ceil_condition(kast: KInner) -> bool:
227 return type(kast) is KApply and kast.label.name == '#Ceil'
228
229 def _simplify_config(config: KInner) -> KInner:
230 return sort_ac_collections(inline_cell_maps(config))
231
232 sentence_id = f'{label}-{self.source.id}-TO-{self.target.id}'
233 init_constraints = [c for c in self.source.cterm.constraints if not is_ceil_condition(c)]
234 init_cterm = CTerm(_simplify_config(self.source.cterm.config), init_constraints)
235 target_constraints = [c for c in self.target.cterm.constraints if not is_ceil_condition(c)]
236 target_cterm = CTerm(_simplify_config(self.target.cterm.config), target_constraints)
237 rule: KRuleLike
238 if claim:
239 rule, _ = cterm_build_claim(sentence_id, init_cterm, target_cterm)
240 else:
241 rule, _ = cterm_build_rule(
242 sentence_id, init_cterm, target_cterm, priority=priority, defunc_with=defunc_with
243 )
244 if minimize:
245 rule = minimize_rule_like(rule)
246 return rule
247
[docs]
248 def replace_source(self, node: KCFG.Node) -> KCFG.Edge:
249 assert node.id == self.source.id
250 return KCFG.Edge(node, self.target, self.depth, self.rules)
251
[docs]
252 def replace_target(self, node: KCFG.Node) -> KCFG.Edge:
253 assert node.id == self.target.id
254 return KCFG.Edge(self.source, node, self.depth, self.rules)
255
[docs]
256 @final
257 @dataclass(frozen=True)
258 class MergedEdge(EdgeLike):
259 """Merged edge is a collection of edges that have been merged into a single edge."""
260
261 source: KCFG.Node
262 target: KCFG.Node
263 edges: tuple[KCFG.Edge, ...]
264
[docs]
265 def to_dict(self) -> dict[str, Any]:
266 return {
267 'source': self.source.id,
268 'target': self.target.id,
269 'edges': [edge.to_dict() for edge in self.edges],
270 }
271
[docs]
272 @staticmethod
273 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Successor:
274 return KCFG.MergedEdge(
275 nodes[dct['source']],
276 nodes[dct['target']],
277 tuple(KCFG.Edge.from_dict(edge, nodes) for edge in dct['edges']),
278 )
279
[docs]
280 def replace_source(self, node: KCFG.Node) -> KCFG.Successor:
281 assert node.id == self.source.id
282 return KCFG.MergedEdge(node, self.target, self.edges)
283
[docs]
284 def replace_target(self, node: KCFG.Node) -> KCFG.Successor:
285 assert node.id == self.target.id
286 return KCFG.MergedEdge(self.source, node, self.edges)
287
[docs]
288 def to_rule(self, label: str, claim: bool = False, priority: int | None = None) -> KRuleLike:
289 return KCFG.Edge(self.source, self.target, 1, ()).to_rule(label, claim, priority)
290
[docs]
291 @final
292 @dataclass(frozen=True)
293 class Cover(EdgeLike):
294 source: KCFG.Node
295 target: KCFG.Node
296 csubst: CSubst
297
[docs]
298 def to_dict(self) -> dict[str, Any]:
299 return {
300 'source': self.source.id,
301 'target': self.target.id,
302 'csubst': self.csubst.to_dict(),
303 }
304
[docs]
305 @staticmethod
306 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Cover:
307 return KCFG.Cover(nodes[dct['source']], nodes[dct['target']], CSubst.from_dict(dct['csubst']))
308
[docs]
309 def replace_source(self, node: KCFG.Node) -> KCFG.Cover:
310 assert node.id == self.source.id
311 return KCFG.Cover(node, self.target, self.csubst)
312
[docs]
313 def replace_target(self, node: KCFG.Node) -> KCFG.Cover:
314 assert node.id == self.target.id
315 return KCFG.Cover(self.source, node, self.csubst)
316
[docs]
317 @dataclass(frozen=True)
318 class MultiEdge(Successor):
319 source: KCFG.Node
320
321 def __lt__(self, other: Any) -> bool:
322 if not type(other) is type(self):
323 return NotImplemented
324 return (self.source, self.target_ids) < (other.source, other.target_ids)
325
[docs]
326 @abstractmethod
327 def with_single_target(self, target: KCFG.Node) -> KCFG.MultiEdge: ...
328
[docs]
329 @final
330 @dataclass(frozen=True)
331 class Split(MultiEdge):
332 source: KCFG.Node
333 _targets: tuple[tuple[KCFG.Node, CSubst], ...]
334
335 def __init__(self, source: KCFG.Node, _targets: Iterable[tuple[KCFG.Node, CSubst]]) -> None:
336 object.__setattr__(self, 'source', source)
337 object.__setattr__(self, '_targets', tuple(_targets))
338
339 @property
340 def targets(self) -> tuple[KCFG.Node, ...]:
341 return tuple(target for target, _ in self._targets)
342
343 @property
344 def splits(self) -> dict[int, CSubst]:
345 return {target.id: csubst for target, csubst in self._targets}
346
[docs]
347 def to_dict(self) -> dict[str, Any]:
348 return {
349 'source': self.source.id,
350 'targets': [
351 {
352 'target': target.id,
353 'csubst': csubst.to_dict(),
354 }
355 for target, csubst in self._targets
356 ],
357 }
358
[docs]
359 @staticmethod
360 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Split:
361 _targets = [(nodes[target['target']], CSubst.from_dict(target['csubst'])) for target in dct['targets']]
362 return KCFG.Split(nodes[dct['source']], tuple(_targets))
363
[docs]
364 def with_single_target(self, target: KCFG.Node) -> KCFG.Split:
365 return KCFG.Split(self.source, ((target, self.splits[target.id]),))
366
367 @property
368 def covers(self) -> tuple[KCFG.Cover, ...]:
369 return tuple(KCFG.Cover(target, self.source, csubst) for target, csubst in self._targets)
370
[docs]
371 def replace_source(self, node: KCFG.Node) -> KCFG.Split:
372 assert node.id == self.source.id
373 return KCFG.Split(node, self._targets)
374
[docs]
375 def replace_target(self, node: KCFG.Node) -> KCFG.Split:
376 assert node.id in self.target_ids
377 new_targets = [
378 (node, csubst) if node.id == target_node.id else (target_node, csubst)
379 for target_node, csubst in self._targets
380 ]
381 return KCFG.Split(self.source, tuple(new_targets))
382
[docs]
383 @final
384 @dataclass(frozen=True)
385 class NDBranch(MultiEdge):
386 source: KCFG.Node
387 _targets: tuple[KCFG.Node, ...]
388 rules: tuple[str, ...]
389
390 def __init__(self, source: KCFG.Node, _targets: Iterable[KCFG.Node], rules: tuple[str, ...]) -> None:
391 object.__setattr__(self, 'source', source)
392 object.__setattr__(self, '_targets', tuple(_targets))
393 object.__setattr__(self, 'rules', rules)
394
395 @property
396 def targets(self) -> tuple[KCFG.Node, ...]:
397 return self._targets
398
[docs]
399 def to_dict(self) -> dict[str, Any]:
400 return {
401 'source': self.source.id,
402 'targets': [target.id for target in self.targets],
403 'rules': list(self.rules),
404 }
405
[docs]
406 @staticmethod
407 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.NDBranch:
408 return KCFG.NDBranch(
409 nodes[dct['source']], tuple([nodes[target] for target in dct['targets']]), tuple(dct['rules'])
410 )
411
[docs]
412 def with_single_target(self, target: KCFG.Node) -> KCFG.NDBranch:
413 return KCFG.NDBranch(self.source, (target,), ())
414
415 @property
416 def edges(self) -> tuple[KCFG.Edge, ...]:
417 return tuple(KCFG.Edge(self.source, target, 1, ()) for target in self.targets)
418
[docs]
419 def replace_source(self, node: KCFG.Node) -> KCFG.NDBranch:
420 assert node.id == self.source.id
421 return KCFG.NDBranch(node, self._targets, self.rules)
422
[docs]
423 def replace_target(self, node: KCFG.Node) -> KCFG.NDBranch:
424 assert node.id in self.target_ids
425 new_targets = [node if node.id == target_node.id else target_node for target_node in self._targets]
426 return KCFG.NDBranch(self.source, tuple(new_targets), self.rules)
427
428 _node_id: int
429 _nodes: MutableMapping[int, KCFG.Node]
430
431 _created_nodes: set[int]
432 _deleted_nodes: set[int]
433
434 _edges: dict[int, Edge]
435 _merged_edges: dict[int, MergedEdge]
436 _covers: dict[int, Cover]
437 _splits: dict[int, Split]
438 _ndbranches: dict[int, NDBranch]
439 _aliases: dict[str, int]
440 _lock: RLock
441
442 _kcfg_store: KCFGStore | None
443
444 def __init__(self, cfg_dir: Path | None = None, optimize_memory: bool = True) -> None:
445 self._node_id = 1
446 if optimize_memory:
447 from .store import OptimizedNodeStore
448
449 self._nodes = OptimizedNodeStore()
450 else:
451 self._nodes = {}
452 self._created_nodes = set()
453 self._deleted_nodes = set()
454 self._edges = {}
455 self._merged_edges = {}
456 self._covers = {}
457 self._splits = {}
458 self._ndbranches = {}
459 self._aliases = {}
460 self._lock = RLock()
461 if cfg_dir is not None:
462 self._kcfg_store = KCFGStore(cfg_dir)
463
464 def __contains__(self, item: object) -> bool:
465 if type(item) is KCFG.Node:
466 return self.contains_node(item)
467 if type(item) is KCFG.Edge:
468 return self.contains_edge(item)
469 if type(item) is KCFG.MergedEdge:
470 return self.contains_merged_edge(item)
471 if type(item) is KCFG.Cover:
472 return self.contains_cover(item)
473 if type(item) is KCFG.Split:
474 return self.contains_split(item)
475 if type(item) is KCFG.NDBranch:
476 return self.contains_ndbranch(item)
477 return False
478
479 def __enter__(self) -> KCFG:
480 self._lock.acquire()
481 return self
482
483 def __exit__(
484 self,
485 exc_type: type[BaseException] | None,
486 exc_value: BaseException | None,
487 traceback: TracebackType | None,
488 ) -> bool:
489 self._lock.release()
490 if exc_type is None:
491 return True
492 return False
493
494 @property
495 def nodes(self) -> list[KCFG.Node]:
496 return list(self._nodes.values())
497
498 @property
499 def root(self) -> list[KCFG.Node]:
500 return [node for node in self.nodes if self.is_root(node.id)]
501
502 @property
503 def vacuous(self) -> list[KCFG.Node]:
504 return [node for node in self.nodes if self.is_vacuous(node.id)]
505
506 @property
507 def stuck(self) -> list[KCFG.Node]:
508 return [node for node in self.nodes if self.is_stuck(node.id)]
509
510 @property
511 def leaves(self) -> list[KCFG.Node]:
512 return [node for node in self.nodes if self.is_leaf(node.id)]
513
514 @property
515 def covered(self) -> list[KCFG.Node]:
516 return [node for node in self.nodes if self.is_covered(node.id)]
517
518 @property
519 def uncovered(self) -> list[KCFG.Node]:
520 return [node for node in self.nodes if not self.is_covered(node.id)]
521
[docs]
522 @staticmethod
523 def from_claim(
524 defn: KDefinition, claim: KClaim, cfg_dir: Path | None = None, optimize_memory: bool = True
525 ) -> tuple[KCFG, NodeIdLike, NodeIdLike]:
526 cfg = KCFG(cfg_dir=cfg_dir, optimize_memory=optimize_memory)
527 claim_body = claim.body
528 claim_body = defn.instantiate_cell_vars(claim_body)
529 claim_body = rename_generated_vars(claim_body)
530
531 claim_lhs = CTerm.from_kast(extract_lhs(claim_body)).add_constraint(bool_to_ml_pred(claim.requires))
532 init_node = cfg.create_node(claim_lhs)
533
534 claim_rhs = CTerm.from_kast(extract_rhs(claim_body)).add_constraint(
535 bool_to_ml_pred(andBool([claim.requires, claim.ensures]))
536 )
537 target_node = cfg.create_node(claim_rhs)
538
539 return cfg, init_node.id, target_node.id
540
[docs]
541 @staticmethod
542 def path_length(_path: Iterable[KCFG.Successor]) -> int:
543 _path = tuple(_path)
544 if len(_path) == 0:
545 return 0
546 if type(_path[0]) is KCFG.Split or type(_path[0]) is KCFG.Cover:
547 return KCFG.path_length(_path[1:])
548 elif type(_path[0]) is KCFG.NDBranch:
549 return 1 + KCFG.path_length(_path[1:])
550 elif type(_path[0]) is KCFG.Edge:
551 return _path[0].depth + KCFG.path_length(_path[1:])
552 elif type(_path[0]) is KCFG.MergedEdge:
553 return min(edge.depth for edge in _path[0].edges) + KCFG.path_length(_path[1:]) # todo: check this
554 raise ValueError(f'Cannot handle Successor type: {type(_path[0])}')
555
[docs]
556 def extend(
557 self,
558 extend_result: KCFGExtendResult,
559 node: KCFG.Node,
560 logs: dict[int, tuple[LogEntry, ...]],
561 *,
562 optimize_kcfg: bool,
563 ) -> None:
564
565 def log(message: str, *, warning: bool = False) -> None:
566 result_info = extend_result.info if type(extend_result) is Step or type(extend_result) is Branch else ''
567 result_info_message = f': {result_info}' if result_info else ''
568 _LOGGER.log(
569 logging.WARNING if warning else logging.INFO,
570 f'Extending KCFG with: {message}{result_info_message}',
571 )
572
573 match extend_result:
574 case Vacuous():
575 self.add_vacuous(node.id)
576 log(f'vacuous node: {node.id}', warning=True)
577
578 case Stuck():
579 self.add_stuck(node.id)
580 log(f'stuck node: {node.id}')
581
582 case Abstract(cterm):
583 new_node = self.create_node(cterm)
584 self.create_cover(node.id, new_node.id)
585 log(f'abstraction node: {node.id}')
586
587 case Step(cterm, depth, next_node_logs, rule_labels, _):
588 node_id = node.id
589 next_node = self.create_node(cterm)
590 # Optimization for steps consists of on-the-fly merging of consecutive edges and can
591 # be performed only if the current node has a single predecessor connected by an Edge
592 if (
593 optimize_kcfg
594 and (len(predecessors := self.predecessors(target_id=node.id)) == 1)
595 and isinstance(in_edge := predecessors[0], KCFG.Edge)
596 ):
597 # The existing edge is removed and the step parameters are updated accordingly
598 self.remove_edge(in_edge.source.id, node.id)
599 node_id = in_edge.source.id
600 depth += in_edge.depth
601 rule_labels = list(in_edge.rules) + rule_labels
602 next_node_logs = logs[node.id] + next_node_logs if node.id in logs else next_node_logs
603 self.remove_node(node.id)
604 self.create_edge(node_id, next_node.id, depth, rule_labels)
605 logs[next_node.id] = next_node_logs
606 log(f'basic block at depth {depth}: {node_id} --> {next_node.id}')
607
608 case Branch(branches, _):
609 branch_node_ids = self.split_on_constraints(node.id, branches)
610 log(f'{len(branches)} branches: {node.id} --> {branch_node_ids}')
611
612 case NDBranch(cterms, next_node_logs, rule_labels):
613 next_ids = [self.create_node(cterm).id for cterm in cterms]
614 for i in next_ids:
615 logs[i] = next_node_logs
616 self.create_ndbranch(node.id, next_ids, rules=rule_labels)
617 log(f'{len(cterms)} non-deterministic branches: {node.id} --> {next_ids}')
618
619 case _:
620 raise AssertionError()
621
[docs]
622 def to_dict_no_nodes(self) -> dict[str, Any]:
623 nodes = list(self._nodes.keys())
624 edges = [edge.to_dict() for edge in self.edges()]
625 merged_edges = [merged_edge.to_dict() for merged_edge in self.merged_edges()]
626 covers = [cover.to_dict() for cover in self.covers()]
627 splits = [split.to_dict() for split in self.splits()]
628 ndbranches = [ndbranch.to_dict() for ndbranch in self.ndbranches()]
629
630 aliases = dict(sorted(self._aliases.items()))
631
632 res = {
633 'next': self._node_id,
634 'nodes': nodes,
635 'edges': edges,
636 'merged_edges': merged_edges,
637 'covers': covers,
638 'splits': splits,
639 'ndbranches': ndbranches,
640 'aliases': aliases,
641 }
642 return {k: v for k, v in res.items() if v}
643
[docs]
644 def to_dict(self) -> dict[str, Any]:
645 nodes = [node.to_dict() for node in self.nodes]
646 edges = [edge.to_dict() for edge in self.edges()]
647 merged_edges = [merged_edge.to_dict() for merged_edge in self.merged_edges()]
648 covers = [cover.to_dict() for cover in self.covers()]
649 splits = [split.to_dict() for split in self.splits()]
650 ndbranches = [ndbranch.to_dict() for ndbranch in self.ndbranches()]
651
652 aliases = dict(sorted(self._aliases.items()))
653
654 res = {
655 'next': self._node_id,
656 'nodes': nodes,
657 'edges': edges,
658 'merged_edges': merged_edges,
659 'covers': covers,
660 'splits': splits,
661 'ndbranches': ndbranches,
662 'aliases': aliases,
663 }
664 return {k: v for k, v in res.items() if v}
665
[docs]
666 @staticmethod
667 def from_dict(dct: Mapping[str, Any], optimize_memory: bool = True) -> KCFG:
668 cfg = KCFG(optimize_memory=optimize_memory)
669
670 for node_dict in dct.get('nodes') or []:
671 node = KCFG.Node.from_dict(node_dict)
672 cfg.add_node(node)
673
674 max_id = max([node.id for node in cfg.nodes], default=0)
675 cfg._node_id = dct.get('next', max_id + 1)
676
677 for edge_dict in dct.get('edges') or []:
678 edge = KCFG.Edge.from_dict(edge_dict, cfg._nodes)
679 cfg.add_successor(edge)
680
681 for edge_dict in dct.get('merged_edges') or []:
682 merged_edge = KCFG.MergedEdge.from_dict(edge_dict, cfg._nodes)
683 cfg.add_successor(merged_edge)
684
685 for cover_dict in dct.get('covers') or []:
686 cover = KCFG.Cover.from_dict(cover_dict, cfg._nodes)
687 cfg.add_successor(cover)
688
689 for split_dict in dct.get('splits') or []:
690 split = KCFG.Split.from_dict(split_dict, cfg._nodes)
691 cfg.add_successor(split)
692
693 for ndbranch_dict in dct.get('ndbranches') or []:
694 ndbranch = KCFG.NDBranch.from_dict(ndbranch_dict, cfg._nodes)
695 cfg.add_successor(ndbranch)
696
697 for alias, node_id in dct.get('aliases', {}).items():
698 cfg.add_alias(alias=alias, node_id=node_id)
699
700 return cfg
701
[docs]
702 def aliases(self, node_id: NodeIdLike) -> list[str]:
703 node_id = self._resolve(node_id)
704 return [alias for alias, value in self._aliases.items() if node_id == value]
705
[docs]
706 def to_json(self) -> str:
707 return json.dumps(self.to_dict(), sort_keys=True)
708
[docs]
709 @staticmethod
710 def from_json(s: str, optimize_memory: bool = True) -> KCFG:
711 return KCFG.from_dict(json.loads(s), optimize_memory=optimize_memory)
712
[docs]
713 def to_rules(
714 self,
715 _id: str | None = None,
716 priority: int = 20,
717 defunc_with: KDefinition | None = None,
718 ) -> list[KRuleLike]:
719 _id = 'BASIC-BLOCK' if _id is None else _id
720 return [e.to_rule(_id, priority=priority, defunc_with=defunc_with) for e in self.edges()] + [
721 m.to_rule(_id, priority=priority) for m in self.merged_edges()
722 ]
723
[docs]
724 def to_module(
725 self,
726 module_name: str | None = None,
727 imports: Iterable[KImport] = (),
728 priority: int = 20,
729 att: KAtt = EMPTY_ATT,
730 defunc_with: KDefinition | None = None,
731 ) -> KFlatModule:
732 module_name = 'KCFG' if module_name is None else module_name
733 return KFlatModule(
734 module_name, self.to_rules(priority=priority, defunc_with=defunc_with), imports=imports, att=att
735 )
736
737 def _resolve_or_none(self, id_like: NodeIdLike) -> int | None:
738 if type(id_like) is int:
739 if id_like in self._nodes:
740 return id_like
741
742 return None
743
744 if type(id_like) is not str:
745 raise TypeError(f'Expected int or str for id_like, got: {id_like}')
746
747 if id_like.startswith('@'):
748 if id_like[1:] in self._aliases:
749 return self._aliases[id_like[1:]]
750 raise ValueError(f'Unknown alias: {id_like}')
751
752 return None
753
754 def _resolve(self, id_like: NodeIdLike) -> int:
755 match = self._resolve_or_none(id_like)
756 if not match:
757 raise ValueError(f'Unknown node: {id_like}')
758 return match
759
[docs]
760 def node(self, node_id: NodeIdLike) -> KCFG.Node:
761 node_id = self._resolve(node_id)
762 return self._nodes[node_id]
763
[docs]
764 def get_node(self, node_id: NodeIdLike) -> KCFG.Node | None:
765 resolved_id = self._resolve_or_none(node_id)
766 if resolved_id is None:
767 return None
768 return self._nodes[resolved_id]
769
[docs]
770 def contains_node(self, node: KCFG.Node) -> bool:
771 return bool(self.get_node(node.id))
772
[docs]
773 def add_node(self, node: KCFG.Node) -> None:
774 if node.id in self._nodes:
775 raise ValueError(f'Node with id already exists: {node.id}')
776 self._nodes[node.id] = node
777 self._created_nodes.add(node.id)
778
[docs]
779 def create_node(self, cterm: CTerm) -> KCFG.Node:
780 node = KCFG.Node(self._node_id, cterm)
781 self._node_id += 1
782 self._nodes[node.id] = node
783 self._created_nodes.add(node.id)
784 return node
785
[docs]
786 def remove_node(self, node_id: NodeIdLike) -> None:
787 self.remove_edges_around(node_id)
788
789 node_id = self._resolve(node_id)
790 self._nodes.pop(node_id)
791 self._deleted_nodes.add(node_id)
792 self._created_nodes.discard(node_id)
793
[docs]
794 def remove_edges_around(self, node_id: NodeIdLike) -> None:
795 node_id = self._resolve(node_id)
796
797 self._edges = {k: s for k, s in self._edges.items() if k != node_id and node_id not in s.target_ids}
798 self._merged_edges = {
799 k: s for k, s in self._merged_edges.items() if k != node_id and node_id not in s.target_ids
800 }
801 self._covers = {k: s for k, s in self._covers.items() if k != node_id and node_id not in s.target_ids}
802
803 self._splits = {k: s for k, s in self._splits.items() if k != node_id and node_id not in s.target_ids}
804 self._ndbranches = {k: b for k, b in self._ndbranches.items() if k != node_id and node_id not in b.target_ids}
805
806 for alias in [alias for alias, id in self._aliases.items() if id == node_id]:
807 self.remove_alias(alias)
808
809 def _update_refs(self, node_id: int) -> None:
810 node = self.node(node_id)
811 for succ in self.successors(node_id):
812 new_succ = succ.replace_source(node)
813 if type(new_succ) is KCFG.Edge:
814 self._edges[new_succ.source.id] = new_succ
815 if type(new_succ) is KCFG.MergedEdge:
816 self._merged_edges[new_succ.source.id] = new_succ
817 if type(new_succ) is KCFG.Cover:
818 self._covers[new_succ.source.id] = new_succ
819 if type(new_succ) is KCFG.Split:
820 self._splits[new_succ.source.id] = new_succ
821 if type(new_succ) is KCFG.NDBranch:
822 self._ndbranches[new_succ.source.id] = new_succ
823
824 for pred in self.predecessors(node_id):
825 new_pred = pred.replace_target(node)
826 if type(new_pred) is KCFG.Edge:
827 self._edges[new_pred.source.id] = new_pred
828 if type(new_pred) is KCFG.MergedEdge:
829 self._merged_edges[new_pred.source.id] = new_pred
830 if type(new_pred) is KCFG.Cover:
831 self._covers[new_pred.source.id] = new_pred
832 if type(new_pred) is KCFG.Split:
833 self._splits[new_pred.source.id] = new_pred
834 if type(new_pred) is KCFG.NDBranch:
835 self._ndbranches[new_pred.source.id] = new_pred
836
[docs]
837 def remove_attr(self, node_id: NodeIdLike, attr: NodeAttr) -> None:
838 node = self.node(node_id)
839 new_node = node.remove_attr(attr)
840 self.replace_node(new_node)
841
[docs]
842 def discard_attr(self, node_id: NodeIdLike, attr: NodeAttr) -> None:
843 node = self.node(node_id)
844 new_node = node.discard_attr(attr)
845 self.replace_node(new_node)
846
[docs]
847 def add_attr(self, node_id: NodeIdLike, attr: NodeAttr) -> None:
848 node = self.node(node_id)
849 new_node = node.add_attr(attr)
850 self.replace_node(new_node)
851
[docs]
852 def let_node(
853 self, node_id: NodeIdLike, cterm: CTerm | None = None, attrs: Iterable[KCFGNodeAttr] | None = None
854 ) -> None:
855 node = self.node(node_id)
856 new_node = node.let(cterm=cterm, attrs=attrs)
857 self.replace_node(new_node)
858
[docs]
859 def replace_node(self, node: KCFG.Node) -> None:
860 self._nodes[node.id] = node
861 self._created_nodes.add(node.id)
862 self._update_refs(node.id)
863
[docs]
864 def successors(self, source_id: NodeIdLike) -> list[Successor]:
865 out_edges: Iterable[KCFG.Successor] = self.edges(source_id=source_id)
866 out_merged_edges: Iterable[KCFG.Successor] = self.merged_edges(source_id=source_id)
867 out_covers: Iterable[KCFG.Successor] = self.covers(source_id=source_id)
868 out_splits: Iterable[KCFG.Successor] = self.splits(source_id=source_id)
869 out_ndbranches: Iterable[KCFG.Successor] = self.ndbranches(source_id=source_id)
870 return list(out_edges) + list(out_merged_edges) + list(out_covers) + list(out_splits) + list(out_ndbranches)
871
[docs]
872 def predecessors(self, target_id: NodeIdLike) -> list[Successor]:
873 in_edges: Iterable[KCFG.Successor] = self.edges(target_id=target_id)
874 in_merged_edges: Iterable[KCFG.Successor] = self.merged_edges(target_id=target_id)
875 in_covers: Iterable[KCFG.Successor] = self.covers(target_id=target_id)
876 in_splits: Iterable[KCFG.Successor] = self.splits(target_id=target_id)
877 in_ndbranches: Iterable[KCFG.Successor] = self.ndbranches(target_id=target_id)
878 return list(in_edges) + list(in_merged_edges) + list(in_covers) + list(in_splits) + list(in_ndbranches)
879
880 def _check_no_successors(self, source_id: NodeIdLike) -> None:
881 if len(self.successors(source_id)) > 0:
882 raise ValueError(f'Node already has successors: {source_id} -> {self.successors(source_id)}')
883
884 def _check_no_zero_loops(self, source_id: NodeIdLike, target_ids: Iterable[NodeIdLike]) -> None:
885 for target_id in target_ids:
886 path = self.shortest_path_between(target_id, source_id)
887 if path is not None and KCFG.path_length(path) == 0:
888 raise ValueError(
889 f'Adding successor would create zero-length loop with backedge: {source_id} -> {target_id}'
890 )
891
[docs]
892 def add_successor(self, succ: KCFG.Successor) -> None:
893 self._check_no_successors(succ.source.id)
894 self._check_no_zero_loops(succ.source.id, succ.target_ids)
895 if type(succ) is KCFG.Edge:
896 self._edges[succ.source.id] = succ
897 elif type(succ) is KCFG.MergedEdge:
898 self._merged_edges[succ.source.id] = succ
899 elif type(succ) is KCFG.Cover:
900 self._covers[succ.source.id] = succ
901 else:
902 if len(succ.target_ids) <= 1:
903 raise ValueError(
904 f'Cannot create {type(succ)} node with less than 2 targets: {succ.source.id} -> {succ.target_ids}'
905 )
906 if type(succ) is KCFG.Split:
907 self._splits[succ.source.id] = succ
908 elif type(succ) is KCFG.NDBranch:
909 self._ndbranches[succ.source.id] = succ
910
[docs]
911 def edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> Edge | None:
912 source_id = self._resolve(source_id)
913 target_id = self._resolve(target_id)
914 edge = self._edges.get(source_id, None)
915 return edge if edge is not None and edge.target.id == target_id else None
916
[docs]
917 def edges(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[Edge]:
918 source_id = self._resolve(source_id) if source_id is not None else None
919 target_id = self._resolve(target_id) if target_id is not None else None
920 return [
921 edge
922 for edge in self._edges.values()
923 if (source_id is None or source_id == edge.source.id) and (target_id is None or target_id == edge.target.id)
924 ]
925
[docs]
926 def contains_edge(self, edge: Edge) -> bool:
927 if other := self.edge(source_id=edge.source.id, target_id=edge.target.id):
928 return edge == other
929 return False
930
[docs]
931 def create_edge(self, source_id: NodeIdLike, target_id: NodeIdLike, depth: int, rules: Iterable[str] = ()) -> Edge:
932 if depth <= 0:
933 raise ValueError(f'Cannot build KCFG Edge with non-positive depth: {depth}')
934 source = self.node(source_id)
935 target = self.node(target_id)
936 edge = KCFG.Edge(source, target, depth, tuple(rules))
937 self.add_successor(edge)
938 return edge
939
[docs]
940 def remove_edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> None:
941 source_id = self._resolve(source_id)
942 target_id = self._resolve(target_id)
943 edge = self.edge(source_id, target_id)
944 if not edge:
945 raise ValueError(f'Edge does not exist: {source_id} -> {target_id}')
946 self._edges.pop(source_id)
947
[docs]
948 def merged_edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> MergedEdge | None:
949 source_id = self._resolve(source_id)
950 target_id = self._resolve(target_id)
951 merged_edge = self._merged_edges.get(source_id, None)
952 return merged_edge if merged_edge is not None and merged_edge.target.id == target_id else None
953
[docs]
954 def merged_edges(
955 self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None
956 ) -> list[MergedEdge]:
957 source_id = self._resolve(source_id) if source_id is not None else None
958 target_id = self._resolve(target_id) if target_id is not None else None
959 return [
960 merged_edge
961 for merged_edge in self._merged_edges.values()
962 if (source_id is None or source_id == merged_edge.source.id)
963 and (target_id is None or target_id == merged_edge.target.id)
964 ]
965
[docs]
966 def contains_merged_edge(self, edge: MergedEdge) -> bool:
967 if other := self.merged_edge(source_id=edge.source.id, target_id=edge.target.id):
968 return edge == other
969 return False
970
[docs]
971 def create_merged_edge(
972 self, source_id: NodeIdLike, target_id: NodeIdLike, edges: Iterable[Edge | MergedEdge]
973 ) -> MergedEdge:
974 if len(list(edges)) == 0:
975 raise ValueError(f'Cannot build KCFG MergedEdge with no edges: {edges}')
976 source = self.node(source_id)
977 target = self.node(target_id)
978 flatten_edges: list[KCFG.Edge] = []
979 for edge in edges:
980 if isinstance(edge, KCFG.MergedEdge):
981 flatten_edges.extend(edge.edges)
982 else:
983 flatten_edges.append(edge)
984 merged_edge = KCFG.MergedEdge(source, target, tuple(flatten_edges))
985 self.add_successor(merged_edge)
986 return merged_edge
987
[docs]
988 def remove_merged_edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> None:
989 source_id = self._resolve(source_id)
990 target_id = self._resolve(target_id)
991 merged_edge = self.merged_edge(source_id, target_id)
992 if not merged_edge:
993 raise ValueError(f'MergedEdge does not exist: {source_id} -> {target_id}')
994 self._merged_edges.pop(source_id)
995
[docs]
996 def general_edges(
997 self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None
998 ) -> list[Edge | MergedEdge]:
999 return self.edges(source_id=source_id, target_id=target_id) + self.merged_edges(
1000 source_id=source_id, target_id=target_id
1001 )
1002
[docs]
1003 def cover(self, source_id: NodeIdLike, target_id: NodeIdLike) -> Cover | None:
1004 source_id = self._resolve(source_id)
1005 target_id = self._resolve(target_id)
1006 cover = self._covers.get(source_id, None)
1007 return cover if cover is not None and cover.target.id == target_id else None
1008
[docs]
1009 def covers(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[Cover]:
1010 source_id = self._resolve(source_id) if source_id is not None else None
1011 target_id = self._resolve(target_id) if target_id is not None else None
1012 return [
1013 cover
1014 for cover in self._covers.values()
1015 if (source_id is None or source_id == cover.source.id)
1016 and (target_id is None or target_id == cover.target.id)
1017 ]
1018
[docs]
1019 def contains_cover(self, cover: Cover) -> bool:
1020 if other := self.cover(source_id=cover.source.id, target_id=cover.target.id):
1021 return cover == other
1022 return False
1023
[docs]
1024 def create_cover(self, source_id: NodeIdLike, target_id: NodeIdLike, csubst: CSubst | None = None) -> Cover:
1025 source = self.node(source_id)
1026 target = self.node(target_id)
1027 if csubst is None:
1028 csubst = target.cterm.match_with_constraint(source.cterm)
1029 if csubst is None:
1030 raise ValueError(f'No matching between: {source.id} and {target.id}')
1031 cover = KCFG.Cover(source, target, csubst=csubst)
1032 self.add_successor(cover)
1033 return cover
1034
[docs]
1035 def remove_cover(self, source_id: NodeIdLike, target_id: NodeIdLike) -> None:
1036 source_id = self._resolve(source_id)
1037 target_id = self._resolve(target_id)
1038 cover = self.cover(source_id, target_id)
1039 if not cover:
1040 raise ValueError(f'Cover does not exist: {source_id} -> {target_id}')
1041 self._covers.pop(source_id)
1042
[docs]
1043 def edge_likes(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[EdgeLike]:
1044 return (
1045 cast('List[KCFG.EdgeLike]', self.edges(source_id=source_id, target_id=target_id))
1046 + cast('List[KCFG.EdgeLike]', self.covers(source_id=source_id, target_id=target_id))
1047 + cast('List[KCFG.EdgeLike]', self.merged_edges(source_id=source_id, target_id=target_id))
1048 )
1049
[docs]
1050 def add_vacuous(self, node_id: NodeIdLike) -> None:
1051 self.add_attr(node_id, KCFGNodeAttr.VACUOUS)
1052
[docs]
1053 def remove_vacuous(self, node_id: NodeIdLike) -> None:
1054 self.remove_attr(node_id, KCFGNodeAttr.VACUOUS)
1055
[docs]
1056 def discard_vacuous(self, node_id: NodeIdLike) -> None:
1057 self.discard_attr(node_id, KCFGNodeAttr.VACUOUS)
1058
[docs]
1059 def add_stuck(self, node_id: NodeIdLike) -> None:
1060 self.add_attr(node_id, KCFGNodeAttr.STUCK)
1061
[docs]
1062 def remove_stuck(self, node_id: NodeIdLike) -> None:
1063 self.remove_attr(node_id, KCFGNodeAttr.STUCK)
1064
[docs]
1065 def discard_stuck(self, node_id: NodeIdLike) -> None:
1066 self.discard_attr(node_id, KCFGNodeAttr.STUCK)
1067
[docs]
1068 def splits(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[Split]:
1069 source_id = self._resolve(source_id) if source_id is not None else None
1070 target_id = self._resolve(target_id) if target_id is not None else None
1071 return [
1072 s
1073 for s in self._splits.values()
1074 if (source_id is None or source_id == s.source.id) and (target_id is None or target_id in s.target_ids)
1075 ]
1076
[docs]
1077 def contains_split(self, split: Split) -> bool:
1078 return split in self._splits.values()
1079
[docs]
1080 def create_split(self, source_id: NodeIdLike, splits: Iterable[tuple[NodeIdLike, CSubst]]) -> KCFG.Split:
1081 source_id = self._resolve(source_id)
1082 split = KCFG.Split(self.node(source_id), tuple((self.node(nid), csubst) for nid, csubst in list(splits)))
1083 self.add_successor(split)
1084 return split
1085
[docs]
1086 def create_split_by_nodes(self, source_id: NodeIdLike, target_ids: Iterable[NodeIdLike]) -> KCFG.Split | None:
1087 """Create a split without crafting a CSubst."""
1088 source = self.node(source_id)
1089 targets = [self.node(nid) for nid in target_ids]
1090 try:
1091 csubsts = [not_none(source.cterm.match_with_constraint(target.cterm)) for target in targets]
1092 except ValueError:
1093 return None
1094 return self.create_split(source.id, zip(target_ids, csubsts, strict=True))
1095
[docs]
1096 def ndbranches(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[NDBranch]:
1097 source_id = self._resolve(source_id) if source_id is not None else None
1098 target_id = self._resolve(target_id) if target_id is not None else None
1099 return [
1100 b
1101 for b in self._ndbranches.values()
1102 if (source_id is None or source_id == b.source.id) and (target_id is None or target_id in b.target_ids)
1103 ]
1104
[docs]
1105 def contains_ndbranch(self, ndbranch: NDBranch) -> bool:
1106 return ndbranch in self._ndbranches
1107
[docs]
1108 def create_ndbranch(
1109 self, source_id: NodeIdLike, ndbranches: Iterable[NodeIdLike], rules: Iterable[str] = ()
1110 ) -> KCFG.NDBranch:
1111 source_id = self._resolve(source_id)
1112 ndbranch = KCFG.NDBranch(self.node(source_id), tuple(self.node(nid) for nid in list(ndbranches)), tuple(rules))
1113 self.add_successor(ndbranch)
1114 return ndbranch
1115
[docs]
1116 def split_on_constraints(self, source_id: NodeIdLike, constraints: Iterable[KInner]) -> list[int]:
1117 source = self.node(source_id)
1118 branch_node_ids = [self.create_node(source.cterm.add_constraint(c)).id for c in constraints]
1119 csubsts = [not_none(source.cterm.match_with_constraint(self.node(id).cterm)) for id in branch_node_ids]
1120 self.create_split(source.id, zip(branch_node_ids, csubsts, strict=True))
1121 return branch_node_ids
1122
[docs]
1123 def add_alias(self, alias: str, node_id: NodeIdLike) -> None:
1124 if '@' in alias:
1125 raise ValueError('Alias may not contain "@"')
1126 if alias in self._aliases:
1127 raise ValueError(f'Duplicate alias: {alias}')
1128 node_id = self._resolve(node_id)
1129 self._aliases[alias] = node_id
1130
[docs]
1131 def remove_alias(self, alias: str) -> None:
1132 if alias not in self._aliases:
1133 raise ValueError(f'Alias does not exist: {alias}')
1134 self._aliases.pop(alias)
1135
[docs]
1136 def is_root(self, node_id: NodeIdLike) -> bool:
1137 node_id = self._resolve(node_id)
1138 return len(self.predecessors(node_id)) == 0
1139
[docs]
1140 def is_vacuous(self, node_id: NodeIdLike) -> bool:
1141 return KCFGNodeAttr.VACUOUS in self.node(node_id).attrs
1142
[docs]
1143 def is_stuck(self, node_id: NodeIdLike) -> bool:
1144 return KCFGNodeAttr.STUCK in self.node(node_id).attrs
1145
[docs]
1146 def is_split(self, node_id: NodeIdLike) -> bool:
1147 node_id = self._resolve(node_id)
1148 return node_id in self._splits
1149
[docs]
1150 def is_ndbranch(self, node_id: NodeIdLike) -> bool:
1151 node_id = self._resolve(node_id)
1152 return node_id in self._ndbranches
1153
[docs]
1154 def is_leaf(self, node_id: NodeIdLike) -> bool:
1155 return len(self.successors(node_id)) == 0
1156
[docs]
1157 def is_covered(self, node_id: NodeIdLike) -> bool:
1158 node_id = self._resolve(node_id)
1159 return node_id in self._covers
1160
[docs]
1161 def prune(self, node_id: NodeIdLike, keep_nodes: Iterable[NodeIdLike] = ()) -> list[int]:
1162 nodes = self.reachable_nodes(node_id)
1163 keep_nodes = [self._resolve(nid) for nid in keep_nodes]
1164 pruned_nodes: list[int] = []
1165 for node in nodes:
1166 if node.id not in keep_nodes:
1167 self.remove_node(node.id)
1168 pruned_nodes.append(node.id)
1169 return pruned_nodes
1170
[docs]
1171 def shortest_path_between(
1172 self, source_node_id: NodeIdLike, target_node_id: NodeIdLike
1173 ) -> tuple[Successor, ...] | None:
1174 paths = self.paths_between(source_node_id, target_node_id)
1175 if len(paths) == 0:
1176 return None
1177 return sorted(paths, key=(lambda path: KCFG.path_length(path)))[0]
1178
[docs]
1179 def shortest_distance_between(self, node_1_id: NodeIdLike, node_2_id: NodeIdLike) -> int | None:
1180 path_1 = self.shortest_path_between(node_1_id, node_2_id)
1181 path_2 = self.shortest_path_between(node_2_id, node_1_id)
1182 distance: int | None = None
1183 if path_1 is not None:
1184 distance = KCFG.path_length(path_1)
1185 if path_2 is not None:
1186 distance_2 = KCFG.path_length(path_2)
1187 if distance is None or distance_2 < distance:
1188 distance = distance_2
1189 return distance
1190
[docs]
1191 def zero_depth_between(self, node_1_id: NodeIdLike, node_2_id: NodeIdLike) -> bool:
1192 _node_1_id = self._resolve(node_1_id)
1193 _node_2_id = self._resolve(node_2_id)
1194 if _node_1_id == _node_2_id:
1195 return True
1196 # Short-circuit and don't run pathing algorithm if there is no 0 length path on the first step.
1197 path_lengths = [
1198 self.path_length([successor]) for successor in self.successors(_node_1_id) + self.successors(_node_2_id)
1199 ]
1200 if 0 not in path_lengths:
1201 return False
1202
1203 shortest_distance = self.shortest_distance_between(_node_1_id, _node_2_id)
1204
1205 return shortest_distance is not None and shortest_distance == 0
1206
[docs]
1207 def paths_between(self, source_id: NodeIdLike, target_id: NodeIdLike) -> list[tuple[Successor, ...]]:
1208 source_id = self._resolve(source_id)
1209 target_id = self._resolve(target_id)
1210
1211 if source_id == target_id:
1212 return [()]
1213
1214 source_successors = list(self.successors(source_id))
1215 assert len(source_successors) <= 1
1216 if len(source_successors) == 0:
1217 return []
1218
1219 paths: list[tuple[KCFG.Successor, ...]] = []
1220 worklist: list[list[KCFG.Successor]] = [[source_successors[0]]]
1221
1222 def _in_path(_nid: int, _path: list[KCFG.Successor]) -> bool:
1223 for succ in _path:
1224 if _nid == succ.source.id:
1225 return True
1226 if len(_path) > 0:
1227 if isinstance(_path[-1], KCFG.EdgeLike) and _path[-1].target.id == _nid:
1228 return True
1229 elif isinstance(_path[-1], KCFG.MultiEdge) and _nid in _path[-1].target_ids:
1230 return True
1231 return False
1232
1233 while worklist:
1234 curr_path = worklist.pop()
1235 curr_successor = curr_path[-1]
1236 successors: list[KCFG.Successor] = []
1237
1238 if isinstance(curr_successor, KCFG.EdgeLike):
1239 if curr_successor.target.id == target_id:
1240 paths.append(tuple(curr_path))
1241 continue
1242 else:
1243 successors = list(self.successors(curr_successor.target.id))
1244
1245 elif isinstance(curr_successor, KCFG.MultiEdge):
1246 if len(list(curr_successor.targets)) == 1:
1247 target = list(curr_successor.targets)[0]
1248 if target.id == target_id:
1249 paths.append(tuple(curr_path))
1250 continue
1251 else:
1252 successors = list(self.successors(target.id))
1253 if len(list(curr_successor.targets)) > 1:
1254 curr_path = curr_path[0:-1]
1255 successors = [curr_successor.with_single_target(target) for target in curr_successor.targets]
1256
1257 for successor in successors:
1258 if isinstance(successor, KCFG.EdgeLike) and not _in_path(successor.target.id, curr_path):
1259 worklist.append(curr_path + [successor])
1260 elif isinstance(successor, KCFG.MultiEdge):
1261 if len(list(successor.targets)) == 1:
1262 target = list(successor.targets)[0]
1263 if not _in_path(target.id, curr_path):
1264 worklist.append(curr_path + [successor])
1265 elif len(list(successor.targets)) > 1:
1266 worklist.append(curr_path + [successor])
1267
1268 return paths
1269
[docs]
1270 def reachable_nodes(self, source_id: NodeIdLike, *, reverse: bool = False) -> set[KCFG.Node]:
1271 visited: set[KCFG.Node] = set()
1272 worklist: list[KCFG.Node] = [self.node(source_id)]
1273
1274 while worklist:
1275 node = worklist.pop()
1276
1277 if node in visited:
1278 continue
1279
1280 visited.add(node)
1281
1282 if not reverse:
1283 worklist.extend(target for succ in self.successors(source_id=node.id) for target in succ.targets)
1284 else:
1285 worklist.extend(succ.source for succ in self.predecessors(target_id=node.id))
1286
1287 return visited
1288
[docs]
1289 def write_cfg_data(self) -> None:
1290 assert self._kcfg_store is not None
1291 self._kcfg_store.write_cfg_data(
1292 self, self.to_dict_no_nodes(), deleted_nodes=self._deleted_nodes, created_nodes=self._created_nodes
1293 )
1294 self._deleted_nodes.clear()
1295 self._created_nodes.clear()
1296
[docs]
1297 @staticmethod
1298 def read_cfg_data(cfg_dir: Path) -> KCFG:
1299 store = KCFGStore(cfg_dir)
1300 cfg = KCFG.from_dict(store.read_cfg_data())
1301 cfg._kcfg_store = store
1302 return cfg
1303
[docs]
1304 @staticmethod
1305 def read_node_data(cfg_dir: Path, node_id: int) -> KCFG.Node:
1306 store = KCFGStore(cfg_dir)
1307 return KCFG.Node.from_dict(store.read_node_data(node_id))
1308
1309
[docs]
1310class KCFGExtendResult(ABC): ...
1311
1312
[docs]
1313@final
1314@dataclass(frozen=True)
1315class Vacuous(KCFGExtendResult): ...
1316
1317
[docs]
1318@final
1319@dataclass(frozen=True)
1320class Stuck(KCFGExtendResult): ...
1321
1322
[docs]
1323@final
1324@dataclass(frozen=True)
1325class Abstract(KCFGExtendResult):
1326 cterm: CTerm
1327
1328
[docs]
1329@final
1330@dataclass(frozen=True)
1331class Step(KCFGExtendResult):
1332 cterm: CTerm
1333 depth: int
1334 logs: tuple[LogEntry, ...]
1335 rule_labels: list[str]
1336 cut: bool = field(default=False)
1337 info: str = field(default='')
1338
1339
[docs]
1340@final
1341@dataclass(frozen=True)
1342class Branch(KCFGExtendResult):
1343 constraints: tuple[KInner, ...]
1344 heuristic: bool
1345 info: str = field(default='')
1346
1347 def __init__(self, constraints: Iterable[KInner], *, heuristic: bool = False, info: str = ''):
1348 object.__setattr__(self, 'constraints', tuple(constraints))
1349 object.__setattr__(self, 'heuristic', heuristic)
1350 object.__setattr__(self, 'info', info)
1351
1352
[docs]
1353@final
1354@dataclass(frozen=True)
1355class NDBranch(KCFGExtendResult):
1356 cterms: tuple[CTerm, ...]
1357 logs: tuple[LogEntry, ...]
1358 rule_labels: tuple[str, ...]
1359
1360 def __init__(self, cterms: Iterable[CTerm], logs: Iterable[LogEntry,], rule_labels: Iterable[str]):
1361 object.__setattr__(self, 'cterms', tuple(cterms))
1362 object.__setattr__(self, 'logs', tuple(logs))
1363 object.__setattr__(self, 'rule_labels', tuple(rule_labels))