1from __future__ import annotations
2
3import json
4import logging
5from abc import ABC, abstractmethod
6from collections.abc import Container
7from dataclasses import dataclass, field
8from threading import RLock
9from typing import TYPE_CHECKING, Final, List, Union, cast, final
10
11from ..cterm import CSubst, CTerm, cterm_build_claim, cterm_build_rule
12from ..kast import EMPTY_ATT
13from ..kast.inner import KApply
14from ..kast.manip import (
15 bool_to_ml_pred,
16 extract_lhs,
17 extract_rhs,
18 inline_cell_maps,
19 minimize_rule_like,
20 rename_generated_vars,
21 sort_ac_collections,
22)
23from ..kast.outer import KFlatModule
24from ..prelude.kbool import andBool
25from ..utils import ensure_dir_path, not_none
26
27if TYPE_CHECKING:
28 from collections.abc import Iterable, Mapping, MutableMapping
29 from pathlib import Path
30 from types import TracebackType
31 from typing import Any
32
33 from pyk.kore.rpc import LogEntry
34
35 from ..kast import KAtt
36 from ..kast.inner import KInner
37 from ..kast.outer import KClaim, KDefinition, KImport, KRuleLike
38
39
40NodeIdLike = int | str
41
42_LOGGER: Final = logging.getLogger(__name__)
43
44
[docs]
45@dataclass(frozen=True)
46class NodeAttr:
47 value: str
48
49
[docs]
50class KCFGNodeAttr(NodeAttr):
51 VACUOUS = NodeAttr('vacuous')
52 STUCK = NodeAttr('stuck')
53
54
[docs]
55class KCFGStore:
56 store_path: Path
57
58 def __init__(self, store_path: Path) -> None:
59 self.store_path = store_path
60 ensure_dir_path(store_path)
61 ensure_dir_path(self.kcfg_node_dir)
62
63 @property
64 def kcfg_json_path(self) -> Path:
65 return self.store_path / 'kcfg.json'
66
67 @property
68 def kcfg_node_dir(self) -> Path:
69 return self.store_path / 'nodes'
70
[docs]
71 def kcfg_node_path(self, node_id: int) -> Path:
72 return self.kcfg_node_dir / f'{node_id}.json'
73
[docs]
74 def write_cfg_data(
75 self, kcfg: KCFG, dct: dict[str, Any], deleted_nodes: Iterable[int] = (), created_nodes: Iterable[int] = ()
76 ) -> None:
77 vacuous_nodes = [
78 node_id for node_id in kcfg._nodes.keys() if KCFGNodeAttr.VACUOUS in kcfg._nodes[node_id].attrs
79 ]
80 stuck_nodes = [node_id for node_id in kcfg._nodes.keys() if KCFGNodeAttr.STUCK in kcfg._nodes[node_id].attrs]
81 dct['vacuous'] = vacuous_nodes
82 dct['stuck'] = stuck_nodes
83 for node_id in deleted_nodes:
84 self.kcfg_node_path(node_id).unlink(missing_ok=True)
85 for node_id in created_nodes:
86 self.kcfg_node_path(node_id).write_text(json.dumps(kcfg._nodes[node_id].to_dict()))
87 self.kcfg_json_path.write_text(json.dumps(dct))
88
[docs]
89 def read_cfg_data(self) -> dict[str, Any]:
90 dct = json.loads(self.kcfg_json_path.read_text())
91 nodes = [self.read_node_data(node_id) for node_id in dct.get('nodes') or []]
92 dct['nodes'] = nodes
93
94 new_nodes = []
95 for node in dct['nodes']:
96 attrs = []
97 if node['id'] in dct['vacuous']:
98 attrs.append(KCFGNodeAttr.VACUOUS.value)
99 if node['id'] in dct['stuck']:
100 attrs.append(KCFGNodeAttr.STUCK.value)
101 new_nodes.append({'id': node['id'], 'cterm': node['cterm'], 'attrs': attrs})
102
103 dct['nodes'] = new_nodes
104
105 del dct['vacuous']
106 del dct['stuck']
107
108 return dct
109
[docs]
110 def read_node_data(self, node_id: int) -> dict[str, Any]:
111 return json.loads(self.kcfg_node_path(node_id).read_text())
112
113
[docs]
114class KCFG(Container[Union['KCFG.Node', 'KCFG.Successor']]):
[docs]
115 @final
116 @dataclass(frozen=True, order=True)
117 class Node:
118 id: int
119 cterm: CTerm
120 attrs: frozenset[NodeAttr]
121
122 def __init__(self, id: int, cterm: CTerm, attrs: Iterable[NodeAttr] = ()) -> None:
123 object.__setattr__(self, 'id', id)
124 object.__setattr__(self, 'cterm', cterm)
125 object.__setattr__(self, 'attrs', frozenset(attrs))
126
[docs]
127 def to_dict(self) -> dict[str, Any]:
128 return {'id': self.id, 'cterm': self.cterm.to_dict(), 'attrs': [attr.value for attr in self.attrs]}
129
[docs]
130 @staticmethod
131 def from_dict(dct: dict[str, Any]) -> KCFG.Node:
132 return KCFG.Node(dct['id'], CTerm.from_dict(dct['cterm']), [NodeAttr(attr) for attr in dct['attrs']])
133
[docs]
134 def add_attr(self, attr: NodeAttr) -> KCFG.Node:
135 return KCFG.Node(self.id, self.cterm, list(self.attrs) + [attr])
136
[docs]
137 def remove_attr(self, attr: NodeAttr) -> KCFG.Node:
138 if attr not in self.attrs:
139 raise ValueError(f'Node {self.id} does not have attribute {attr.value}')
140 return KCFG.Node(self.id, self.cterm, self.attrs.difference([attr]))
141
[docs]
142 def discard_attr(self, attr: NodeAttr) -> KCFG.Node:
143 return KCFG.Node(self.id, self.cterm, self.attrs.difference([attr]))
144
[docs]
145 def let(self, cterm: CTerm | None = None, attrs: Iterable[KCFGNodeAttr] | None = None) -> KCFG.Node:
146 new_cterm = cterm if cterm is not None else self.cterm
147 new_attrs = attrs if attrs is not None else self.attrs
148 return KCFG.Node(self.id, new_cterm, new_attrs)
149
150 @property
151 def free_vars(self) -> frozenset[str]:
152 return frozenset(self.cterm.free_vars)
153
[docs]
154 class Successor(ABC):
155 source: KCFG.Node
156
157 def __lt__(self, other: Any) -> bool:
158 if not isinstance(other, KCFG.Successor):
159 return NotImplemented
160 return self.source < other.source
161
162 @property
163 def source_vars(self) -> frozenset[str]:
164 return frozenset(self.source.free_vars)
165
166 @property
167 @abstractmethod
168 def targets(self) -> tuple[KCFG.Node, ...]: ...
169
170 @property
171 def target_ids(self) -> list[int]:
172 return sorted(target.id for target in self.targets)
173
174 @property
175 def target_vars(self) -> frozenset[str]:
176 return frozenset(set.union(set(), *(target.free_vars for target in self.targets)))
177
[docs]
178 @abstractmethod
179 def replace_source(self, node: KCFG.Node) -> KCFG.Successor: ...
180
[docs]
181 @abstractmethod
182 def replace_target(self, node: KCFG.Node) -> KCFG.Successor: ...
183
[docs]
184 @abstractmethod
185 def to_dict(self) -> dict[str, Any]: ...
186
[docs]
187 @staticmethod
188 @abstractmethod
189 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Successor: ...
190
[docs]
191 class EdgeLike(Successor):
192 source: KCFG.Node
193 target: KCFG.Node
194
195 @property
196 def targets(self) -> tuple[KCFG.Node, ...]:
197 return (self.target,)
198
[docs]
199 @final
200 @dataclass(frozen=True)
201 class Edge(EdgeLike):
202 source: KCFG.Node
203 target: KCFG.Node
204 depth: int
205 rules: tuple[str, ...]
206
[docs]
207 def to_dict(self) -> dict[str, Any]:
208 return {
209 'source': self.source.id,
210 'target': self.target.id,
211 'depth': self.depth,
212 'rules': list(self.rules),
213 }
214
[docs]
215 @staticmethod
216 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Edge:
217 return KCFG.Edge(nodes[dct['source']], nodes[dct['target']], dct['depth'], tuple(dct['rules']))
218
[docs]
219 def to_rule(
220 self,
221 label: str,
222 claim: bool = False,
223 priority: int | None = None,
224 defunc_with: KDefinition | None = None,
225 minimize: bool = False,
226 ) -> KRuleLike:
227 def is_ceil_condition(kast: KInner) -> bool:
228 return type(kast) is KApply and kast.label.name == '#Ceil'
229
230 def _simplify_config(config: KInner) -> KInner:
231 return sort_ac_collections(inline_cell_maps(config))
232
233 sentence_id = f'{label}-{self.source.id}-TO-{self.target.id}'
234 init_constraints = [c for c in self.source.cterm.constraints if not is_ceil_condition(c)]
235 init_cterm = CTerm(_simplify_config(self.source.cterm.config), init_constraints)
236 target_constraints = [c for c in self.target.cterm.constraints if not is_ceil_condition(c)]
237 target_cterm = CTerm(_simplify_config(self.target.cterm.config), target_constraints)
238 rule: KRuleLike
239 if claim:
240 rule, _ = cterm_build_claim(sentence_id, init_cterm, target_cterm)
241 else:
242 rule, _ = cterm_build_rule(
243 sentence_id, init_cterm, target_cterm, priority=priority, defunc_with=defunc_with
244 )
245 if minimize:
246 rule = minimize_rule_like(rule)
247 return rule
248
[docs]
249 def replace_source(self, node: KCFG.Node) -> KCFG.Edge:
250 assert node.id == self.source.id
251 return KCFG.Edge(node, self.target, self.depth, self.rules)
252
[docs]
253 def replace_target(self, node: KCFG.Node) -> KCFG.Edge:
254 assert node.id == self.target.id
255 return KCFG.Edge(self.source, node, self.depth, self.rules)
256
[docs]
257 @final
258 @dataclass(frozen=True)
259 class MergedEdge(EdgeLike):
260 """Merged edge is a collection of edges that have been merged into a single edge."""
261
262 source: KCFG.Node
263 target: KCFG.Node
264 edges: tuple[KCFG.Edge, ...]
265
[docs]
266 def to_dict(self) -> dict[str, Any]:
267 return {
268 'source': self.source.id,
269 'target': self.target.id,
270 'edges': [edge.to_dict() for edge in self.edges],
271 }
272
[docs]
273 @staticmethod
274 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Successor:
275 return KCFG.MergedEdge(
276 nodes[dct['source']],
277 nodes[dct['target']],
278 tuple(KCFG.Edge.from_dict(edge, nodes) for edge in dct['edges']),
279 )
280
[docs]
281 def replace_source(self, node: KCFG.Node) -> KCFG.Successor:
282 assert node.id == self.source.id
283 return KCFG.MergedEdge(node, self.target, self.edges)
284
[docs]
285 def replace_target(self, node: KCFG.Node) -> KCFG.Successor:
286 assert node.id == self.target.id
287 return KCFG.MergedEdge(self.source, node, self.edges)
288
[docs]
289 def to_rule(self, label: str, claim: bool = False, priority: int | None = None) -> KRuleLike:
290 return KCFG.Edge(self.source, self.target, 1, ()).to_rule(label, claim, priority)
291
[docs]
292 @final
293 @dataclass(frozen=True)
294 class Cover(EdgeLike):
295 source: KCFG.Node
296 target: KCFG.Node
297 csubst: CSubst
298
[docs]
299 def to_dict(self) -> dict[str, Any]:
300 return {
301 'source': self.source.id,
302 'target': self.target.id,
303 'csubst': self.csubst.to_dict(),
304 }
305
[docs]
306 @staticmethod
307 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Cover:
308 return KCFG.Cover(nodes[dct['source']], nodes[dct['target']], CSubst.from_dict(dct['csubst']))
309
[docs]
310 def replace_source(self, node: KCFG.Node) -> KCFG.Cover:
311 assert node.id == self.source.id
312 return KCFG.Cover(node, self.target, self.csubst)
313
[docs]
314 def replace_target(self, node: KCFG.Node) -> KCFG.Cover:
315 assert node.id == self.target.id
316 return KCFG.Cover(self.source, node, self.csubst)
317
[docs]
318 @dataclass(frozen=True)
319 class MultiEdge(Successor):
320 source: KCFG.Node
321
322 def __lt__(self, other: Any) -> bool:
323 if not type(other) is type(self):
324 return NotImplemented
325 return (self.source, self.target_ids) < (other.source, other.target_ids)
326
[docs]
327 @abstractmethod
328 def with_single_target(self, target: KCFG.Node) -> KCFG.MultiEdge: ...
329
[docs]
330 @final
331 @dataclass(frozen=True)
332 class Split(MultiEdge):
333 source: KCFG.Node
334 _targets: tuple[tuple[KCFG.Node, CSubst], ...]
335
336 def __init__(self, source: KCFG.Node, _targets: Iterable[tuple[KCFG.Node, CSubst]]) -> None:
337 object.__setattr__(self, 'source', source)
338 object.__setattr__(self, '_targets', tuple(_targets))
339
340 @property
341 def targets(self) -> tuple[KCFG.Node, ...]:
342 return tuple(target for target, _ in self._targets)
343
344 @property
345 def splits(self) -> dict[int, CSubst]:
346 return {target.id: csubst for target, csubst in self._targets}
347
[docs]
348 def to_dict(self) -> dict[str, Any]:
349 return {
350 'source': self.source.id,
351 'targets': [
352 {
353 'target': target.id,
354 'csubst': csubst.to_dict(),
355 }
356 for target, csubst in self._targets
357 ],
358 }
359
[docs]
360 @staticmethod
361 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.Split:
362 _targets = [(nodes[target['target']], CSubst.from_dict(target['csubst'])) for target in dct['targets']]
363 return KCFG.Split(nodes[dct['source']], tuple(_targets))
364
[docs]
365 def with_single_target(self, target: KCFG.Node) -> KCFG.Split:
366 return KCFG.Split(self.source, ((target, self.splits[target.id]),))
367
368 @property
369 def covers(self) -> tuple[KCFG.Cover, ...]:
370 return tuple(KCFG.Cover(target, self.source, csubst) for target, csubst in self._targets)
371
[docs]
372 def replace_source(self, node: KCFG.Node) -> KCFG.Split:
373 assert node.id == self.source.id
374 return KCFG.Split(node, self._targets)
375
[docs]
376 def replace_target(self, node: KCFG.Node) -> KCFG.Split:
377 assert node.id in self.target_ids
378 new_targets = [
379 (node, csubst) if node.id == target_node.id else (target_node, csubst)
380 for target_node, csubst in self._targets
381 ]
382 return KCFG.Split(self.source, tuple(new_targets))
383
[docs]
384 @final
385 @dataclass(frozen=True)
386 class NDBranch(MultiEdge):
387 source: KCFG.Node
388 _targets: tuple[KCFG.Node, ...]
389 rules: tuple[str, ...]
390
391 def __init__(self, source: KCFG.Node, _targets: Iterable[KCFG.Node], rules: tuple[str, ...]) -> None:
392 object.__setattr__(self, 'source', source)
393 object.__setattr__(self, '_targets', tuple(_targets))
394 object.__setattr__(self, 'rules', rules)
395
396 @property
397 def targets(self) -> tuple[KCFG.Node, ...]:
398 return self._targets
399
[docs]
400 def to_dict(self) -> dict[str, Any]:
401 return {
402 'source': self.source.id,
403 'targets': [target.id for target in self.targets],
404 'rules': list(self.rules),
405 }
406
[docs]
407 @staticmethod
408 def from_dict(dct: dict[str, Any], nodes: Mapping[int, KCFG.Node]) -> KCFG.NDBranch:
409 return KCFG.NDBranch(
410 nodes[dct['source']], tuple([nodes[target] for target in dct['targets']]), tuple(dct['rules'])
411 )
412
[docs]
413 def with_single_target(self, target: KCFG.Node) -> KCFG.NDBranch:
414 return KCFG.NDBranch(self.source, (target,), ())
415
416 @property
417 def edges(self) -> tuple[KCFG.Edge, ...]:
418 return tuple(KCFG.Edge(self.source, target, 1, ()) for target in self.targets)
419
[docs]
420 def replace_source(self, node: KCFG.Node) -> KCFG.NDBranch:
421 assert node.id == self.source.id
422 return KCFG.NDBranch(node, self._targets, self.rules)
423
[docs]
424 def replace_target(self, node: KCFG.Node) -> KCFG.NDBranch:
425 assert node.id in self.target_ids
426 new_targets = [node if node.id == target_node.id else target_node for target_node in self._targets]
427 return KCFG.NDBranch(self.source, tuple(new_targets), self.rules)
428
429 _node_id: int
430 _nodes: MutableMapping[int, KCFG.Node]
431
432 _created_nodes: set[int]
433 _deleted_nodes: set[int]
434
435 _edges: dict[int, Edge]
436 _merged_edges: dict[int, MergedEdge]
437 _covers: dict[int, Cover]
438 _splits: dict[int, Split]
439 _ndbranches: dict[int, NDBranch]
440 _aliases: dict[str, int]
441 _lock: RLock
442
443 _kcfg_store: KCFGStore | None
444
445 def __init__(self, cfg_dir: Path | None = None, optimize_memory: bool = True) -> None:
446 self._node_id = 1
447 if optimize_memory:
448 from .store import OptimizedNodeStore
449
450 self._nodes = OptimizedNodeStore()
451 else:
452 self._nodes = {}
453 self._created_nodes = set()
454 self._deleted_nodes = set()
455 self._edges = {}
456 self._merged_edges = {}
457 self._covers = {}
458 self._splits = {}
459 self._ndbranches = {}
460 self._aliases = {}
461 self._lock = RLock()
462 if cfg_dir is not None:
463 self._kcfg_store = KCFGStore(cfg_dir)
464
465 def __contains__(self, item: object) -> bool:
466 if type(item) is KCFG.Node:
467 return self.contains_node(item)
468 if type(item) is KCFG.Edge:
469 return self.contains_edge(item)
470 if type(item) is KCFG.MergedEdge:
471 return self.contains_merged_edge(item)
472 if type(item) is KCFG.Cover:
473 return self.contains_cover(item)
474 if type(item) is KCFG.Split:
475 return self.contains_split(item)
476 if type(item) is KCFG.NDBranch:
477 return self.contains_ndbranch(item)
478 return False
479
480 def __enter__(self) -> KCFG:
481 self._lock.acquire()
482 return self
483
484 def __exit__(
485 self,
486 exc_type: type[BaseException] | None,
487 exc_value: BaseException | None,
488 traceback: TracebackType | None,
489 ) -> bool:
490 self._lock.release()
491 if exc_type is None:
492 return True
493 return False
494
495 @property
496 def nodes(self) -> list[KCFG.Node]:
497 return list(self._nodes.values())
498
499 @property
500 def root(self) -> list[KCFG.Node]:
501 return [node for node in self.nodes if self.is_root(node.id)]
502
503 @property
504 def vacuous(self) -> list[KCFG.Node]:
505 return [node for node in self.nodes if self.is_vacuous(node.id)]
506
507 @property
508 def stuck(self) -> list[KCFG.Node]:
509 return [node for node in self.nodes if self.is_stuck(node.id)]
510
511 @property
512 def leaves(self) -> list[KCFG.Node]:
513 return [node for node in self.nodes if self.is_leaf(node.id)]
514
515 @property
516 def covered(self) -> list[KCFG.Node]:
517 return [node for node in self.nodes if self.is_covered(node.id)]
518
519 @property
520 def uncovered(self) -> list[KCFG.Node]:
521 return [node for node in self.nodes if not self.is_covered(node.id)]
522
[docs]
523 @staticmethod
524 def from_claim(
525 defn: KDefinition, claim: KClaim, cfg_dir: Path | None = None, optimize_memory: bool = True
526 ) -> tuple[KCFG, NodeIdLike, NodeIdLike]:
527 cfg = KCFG(cfg_dir=cfg_dir, optimize_memory=optimize_memory)
528 claim_body = claim.body
529 claim_body = defn.instantiate_cell_vars(claim_body)
530 claim_body = rename_generated_vars(claim_body)
531
532 claim_lhs = CTerm.from_kast(extract_lhs(claim_body)).add_constraint(bool_to_ml_pred(claim.requires))
533 init_node = cfg.create_node(claim_lhs)
534
535 claim_rhs = CTerm.from_kast(extract_rhs(claim_body)).add_constraint(
536 bool_to_ml_pred(andBool([claim.requires, claim.ensures]))
537 )
538 target_node = cfg.create_node(claim_rhs)
539
540 return cfg, init_node.id, target_node.id
541
[docs]
542 @staticmethod
543 def path_length(_path: Iterable[KCFG.Successor]) -> int:
544 _path = tuple(_path)
545 if len(_path) == 0:
546 return 0
547 if type(_path[0]) is KCFG.Split or type(_path[0]) is KCFG.Cover:
548 return KCFG.path_length(_path[1:])
549 elif type(_path[0]) is KCFG.NDBranch:
550 return 1 + KCFG.path_length(_path[1:])
551 elif type(_path[0]) is KCFG.Edge:
552 return _path[0].depth + KCFG.path_length(_path[1:])
553 elif type(_path[0]) is KCFG.MergedEdge:
554 return min(edge.depth for edge in _path[0].edges) + KCFG.path_length(_path[1:]) # todo: check this
555 raise ValueError(f'Cannot handle Successor type: {type(_path[0])}')
556
[docs]
557 def extend(
558 self,
559 extend_result: KCFGExtendResult,
560 node: KCFG.Node,
561 logs: dict[int, tuple[LogEntry, ...]],
562 *,
563 optimize_kcfg: bool,
564 ) -> None:
565
566 def log(message: str, *, warning: bool = False) -> None:
567 result_info = extend_result.info if type(extend_result) is Step or type(extend_result) is Branch else ''
568 result_info_message = f': {result_info}' if result_info else ''
569 _LOGGER.log(
570 logging.WARNING if warning else logging.INFO,
571 f'Extending KCFG with: {message}{result_info_message}',
572 )
573
574 match extend_result:
575 case Vacuous():
576 self.add_vacuous(node.id)
577 log(f'vacuous node: {node.id}', warning=True)
578
579 case Stuck():
580 self.add_stuck(node.id)
581 log(f'stuck node: {node.id}')
582
583 case Abstract(cterm):
584 new_node = self.create_node(cterm)
585 self.create_cover(node.id, new_node.id)
586 log(f'abstraction node: {node.id}')
587
588 case Step(cterm, depth, next_node_logs, rule_labels, _):
589 node_id = node.id
590 next_node = self.create_node(cterm)
591 # Optimization for steps consists of on-the-fly merging of consecutive edges and can
592 # be performed only if the current node has a single predecessor connected by an Edge
593 if (
594 optimize_kcfg
595 and (len(predecessors := self.predecessors(target_id=node.id)) == 1)
596 and isinstance(in_edge := predecessors[0], KCFG.Edge)
597 ):
598 # The existing edge is removed and the step parameters are updated accordingly
599 self.remove_edge(in_edge.source.id, node.id)
600 node_id = in_edge.source.id
601 depth += in_edge.depth
602 rule_labels = list(in_edge.rules) + rule_labels
603 next_node_logs = logs[node.id] + next_node_logs if node.id in logs else next_node_logs
604 self.remove_node(node.id)
605 self.create_edge(node_id, next_node.id, depth, rule_labels)
606 logs[next_node.id] = next_node_logs
607 log(f'basic block at depth {depth}: {node_id} --> {next_node.id}')
608
609 case Branch(branches, _):
610 branch_node_ids = self.split_on_constraints(node.id, branches)
611 log(f'{len(branches)} branches: {node.id} --> {branch_node_ids}')
612
613 case NDBranch(cterms, next_node_logs, rule_labels):
614 next_ids = [self.create_node(cterm).id for cterm in cterms]
615 for i in next_ids:
616 logs[i] = next_node_logs
617 self.create_ndbranch(node.id, next_ids, rules=rule_labels)
618 log(f'{len(cterms)} non-deterministic branches: {node.id} --> {next_ids}')
619
620 case _:
621 raise AssertionError()
622
[docs]
623 def to_dict_no_nodes(self) -> dict[str, Any]:
624 nodes = list(self._nodes.keys())
625 edges = [edge.to_dict() for edge in self.edges()]
626 merged_edges = [merged_edge.to_dict() for merged_edge in self.merged_edges()]
627 covers = [cover.to_dict() for cover in self.covers()]
628 splits = [split.to_dict() for split in self.splits()]
629 ndbranches = [ndbranch.to_dict() for ndbranch in self.ndbranches()]
630
631 aliases = dict(sorted(self._aliases.items()))
632
633 res = {
634 'next': self._node_id,
635 'nodes': nodes,
636 'edges': edges,
637 'merged_edges': merged_edges,
638 'covers': covers,
639 'splits': splits,
640 'ndbranches': ndbranches,
641 'aliases': aliases,
642 }
643 return {k: v for k, v in res.items() if v}
644
[docs]
645 def to_dict(self) -> dict[str, Any]:
646 nodes = [node.to_dict() for node in self.nodes]
647 edges = [edge.to_dict() for edge in self.edges()]
648 merged_edges = [merged_edge.to_dict() for merged_edge in self.merged_edges()]
649 covers = [cover.to_dict() for cover in self.covers()]
650 splits = [split.to_dict() for split in self.splits()]
651 ndbranches = [ndbranch.to_dict() for ndbranch in self.ndbranches()]
652
653 aliases = dict(sorted(self._aliases.items()))
654
655 res = {
656 'next': self._node_id,
657 'nodes': nodes,
658 'edges': edges,
659 'merged_edges': merged_edges,
660 'covers': covers,
661 'splits': splits,
662 'ndbranches': ndbranches,
663 'aliases': aliases,
664 }
665 return {k: v for k, v in res.items() if v}
666
[docs]
667 @staticmethod
668 def from_dict(dct: Mapping[str, Any], optimize_memory: bool = True) -> KCFG:
669 cfg = KCFG(optimize_memory=optimize_memory)
670
671 for node_dict in dct.get('nodes') or []:
672 node = KCFG.Node.from_dict(node_dict)
673 cfg.add_node(node)
674
675 max_id = max([node.id for node in cfg.nodes], default=0)
676 cfg._node_id = dct.get('next', max_id + 1)
677
678 for edge_dict in dct.get('edges') or []:
679 edge = KCFG.Edge.from_dict(edge_dict, cfg._nodes)
680 cfg.add_successor(edge)
681
682 for edge_dict in dct.get('merged_edges') or []:
683 merged_edge = KCFG.MergedEdge.from_dict(edge_dict, cfg._nodes)
684 cfg.add_successor(merged_edge)
685
686 for cover_dict in dct.get('covers') or []:
687 cover = KCFG.Cover.from_dict(cover_dict, cfg._nodes)
688 cfg.add_successor(cover)
689
690 for split_dict in dct.get('splits') or []:
691 split = KCFG.Split.from_dict(split_dict, cfg._nodes)
692 cfg.add_successor(split)
693
694 for ndbranch_dict in dct.get('ndbranches') or []:
695 ndbranch = KCFG.NDBranch.from_dict(ndbranch_dict, cfg._nodes)
696 cfg.add_successor(ndbranch)
697
698 for alias, node_id in dct.get('aliases', {}).items():
699 cfg.add_alias(alias=alias, node_id=node_id)
700
701 return cfg
702
[docs]
703 def aliases(self, node_id: NodeIdLike) -> list[str]:
704 node_id = self._resolve(node_id)
705 return [alias for alias, value in self._aliases.items() if node_id == value]
706
[docs]
707 def to_json(self) -> str:
708 return json.dumps(self.to_dict(), sort_keys=True)
709
[docs]
710 @staticmethod
711 def from_json(s: str, optimize_memory: bool = True) -> KCFG:
712 return KCFG.from_dict(json.loads(s), optimize_memory=optimize_memory)
713
[docs]
714 def to_rules(
715 self,
716 _id: str | None = None,
717 priority: int = 20,
718 defunc_with: KDefinition | None = None,
719 ) -> list[KRuleLike]:
720 _id = 'BASIC-BLOCK' if _id is None else _id
721 return [e.to_rule(_id, priority=priority, defunc_with=defunc_with) for e in self.edges()] + [
722 m.to_rule(_id, priority=priority) for m in self.merged_edges()
723 ]
724
[docs]
725 def to_module(
726 self,
727 module_name: str | None = None,
728 imports: Iterable[KImport] = (),
729 priority: int = 20,
730 att: KAtt = EMPTY_ATT,
731 defunc_with: KDefinition | None = None,
732 ) -> KFlatModule:
733 module_name = 'KCFG' if module_name is None else module_name
734 return KFlatModule(
735 module_name, self.to_rules(priority=priority, defunc_with=defunc_with), imports=imports, att=att
736 )
737
738 def _resolve_or_none(self, id_like: NodeIdLike) -> int | None:
739 if type(id_like) is int:
740 if id_like in self._nodes:
741 return id_like
742
743 return None
744
745 if type(id_like) is not str:
746 raise TypeError(f'Expected int or str for id_like, got: {id_like}')
747
748 if id_like.startswith('@'):
749 if id_like[1:] in self._aliases:
750 return self._aliases[id_like[1:]]
751 raise ValueError(f'Unknown alias: {id_like}')
752
753 return None
754
755 def _resolve(self, id_like: NodeIdLike) -> int:
756 match = self._resolve_or_none(id_like)
757 if not match:
758 raise ValueError(f'Unknown node: {id_like}')
759 return match
760
[docs]
761 def node(self, node_id: NodeIdLike) -> KCFG.Node:
762 node_id = self._resolve(node_id)
763 return self._nodes[node_id]
764
[docs]
765 def get_node(self, node_id: NodeIdLike) -> KCFG.Node | None:
766 resolved_id = self._resolve_or_none(node_id)
767 if resolved_id is None:
768 return None
769 return self._nodes[resolved_id]
770
[docs]
771 def contains_node(self, node: KCFG.Node) -> bool:
772 return bool(self.get_node(node.id))
773
[docs]
774 def add_node(self, node: KCFG.Node) -> None:
775 if node.id in self._nodes:
776 raise ValueError(f'Node with id already exists: {node.id}')
777 self._nodes[node.id] = node
778 self._created_nodes.add(node.id)
779
[docs]
780 def create_node(self, cterm: CTerm) -> KCFG.Node:
781 node = KCFG.Node(self._node_id, cterm)
782 self._node_id += 1
783 self._nodes[node.id] = node
784 self._created_nodes.add(node.id)
785 return node
786
[docs]
787 def remove_node(self, node_id: NodeIdLike) -> None:
788 self.remove_edges_around(node_id)
789
790 node_id = self._resolve(node_id)
791 self._nodes.pop(node_id)
792 self._deleted_nodes.add(node_id)
793 self._created_nodes.discard(node_id)
794
[docs]
795 def remove_edges_around(self, node_id: NodeIdLike) -> None:
796 node_id = self._resolve(node_id)
797
798 self._edges = {k: s for k, s in self._edges.items() if k != node_id and node_id not in s.target_ids}
799 self._merged_edges = {
800 k: s for k, s in self._merged_edges.items() if k != node_id and node_id not in s.target_ids
801 }
802 self._covers = {k: s for k, s in self._covers.items() if k != node_id and node_id not in s.target_ids}
803
804 self._splits = {k: s for k, s in self._splits.items() if k != node_id and node_id not in s.target_ids}
805 self._ndbranches = {k: b for k, b in self._ndbranches.items() if k != node_id and node_id not in b.target_ids}
806
807 for alias in [alias for alias, id in self._aliases.items() if id == node_id]:
808 self.remove_alias(alias)
809
810 def _update_refs(self, node_id: int) -> None:
811 node = self.node(node_id)
812 for succ in self.successors(node_id):
813 new_succ = succ.replace_source(node)
814 if type(new_succ) is KCFG.Edge:
815 self._edges[new_succ.source.id] = new_succ
816 if type(new_succ) is KCFG.MergedEdge:
817 self._merged_edges[new_succ.source.id] = new_succ
818 if type(new_succ) is KCFG.Cover:
819 self._covers[new_succ.source.id] = new_succ
820 if type(new_succ) is KCFG.Split:
821 self._splits[new_succ.source.id] = new_succ
822 if type(new_succ) is KCFG.NDBranch:
823 self._ndbranches[new_succ.source.id] = new_succ
824
825 for pred in self.predecessors(node_id):
826 new_pred = pred.replace_target(node)
827 if type(new_pred) is KCFG.Edge:
828 self._edges[new_pred.source.id] = new_pred
829 if type(new_pred) is KCFG.MergedEdge:
830 self._merged_edges[new_pred.source.id] = new_pred
831 if type(new_pred) is KCFG.Cover:
832 self._covers[new_pred.source.id] = new_pred
833 if type(new_pred) is KCFG.Split:
834 self._splits[new_pred.source.id] = new_pred
835 if type(new_pred) is KCFG.NDBranch:
836 self._ndbranches[new_pred.source.id] = new_pred
837
[docs]
838 def remove_attr(self, node_id: NodeIdLike, attr: NodeAttr) -> None:
839 node = self.node(node_id)
840 new_node = node.remove_attr(attr)
841 self.replace_node(new_node)
842
[docs]
843 def discard_attr(self, node_id: NodeIdLike, attr: NodeAttr) -> None:
844 node = self.node(node_id)
845 new_node = node.discard_attr(attr)
846 self.replace_node(new_node)
847
[docs]
848 def add_attr(self, node_id: NodeIdLike, attr: NodeAttr) -> None:
849 node = self.node(node_id)
850 new_node = node.add_attr(attr)
851 self.replace_node(new_node)
852
[docs]
853 def let_node(
854 self, node_id: NodeIdLike, cterm: CTerm | None = None, attrs: Iterable[KCFGNodeAttr] | None = None
855 ) -> None:
856 node = self.node(node_id)
857 new_node = node.let(cterm=cterm, attrs=attrs)
858 self.replace_node(new_node)
859
[docs]
860 def replace_node(self, node: KCFG.Node) -> None:
861 self._nodes[node.id] = node
862 self._created_nodes.add(node.id)
863 self._update_refs(node.id)
864
[docs]
865 def successors(self, source_id: NodeIdLike) -> list[Successor]:
866 out_edges: Iterable[KCFG.Successor] = self.edges(source_id=source_id)
867 out_merged_edges: Iterable[KCFG.Successor] = self.merged_edges(source_id=source_id)
868 out_covers: Iterable[KCFG.Successor] = self.covers(source_id=source_id)
869 out_splits: Iterable[KCFG.Successor] = self.splits(source_id=source_id)
870 out_ndbranches: Iterable[KCFG.Successor] = self.ndbranches(source_id=source_id)
871 return list(out_edges) + list(out_merged_edges) + list(out_covers) + list(out_splits) + list(out_ndbranches)
872
[docs]
873 def predecessors(self, target_id: NodeIdLike) -> list[Successor]:
874 in_edges: Iterable[KCFG.Successor] = self.edges(target_id=target_id)
875 in_merged_edges: Iterable[KCFG.Successor] = self.merged_edges(target_id=target_id)
876 in_covers: Iterable[KCFG.Successor] = self.covers(target_id=target_id)
877 in_splits: Iterable[KCFG.Successor] = self.splits(target_id=target_id)
878 in_ndbranches: Iterable[KCFG.Successor] = self.ndbranches(target_id=target_id)
879 return list(in_edges) + list(in_merged_edges) + list(in_covers) + list(in_splits) + list(in_ndbranches)
880
881 def _check_no_successors(self, source_id: NodeIdLike) -> None:
882 if len(self.successors(source_id)) > 0:
883 raise ValueError(f'Node already has successors: {source_id} -> {self.successors(source_id)}')
884
885 def _check_no_zero_loops(self, source_id: NodeIdLike, target_ids: Iterable[NodeIdLike]) -> None:
886 for target_id in target_ids:
887 path = self.shortest_path_between(target_id, source_id)
888 if path is not None and KCFG.path_length(path) == 0:
889 raise ValueError(
890 f'Adding successor would create zero-length loop with backedge: {source_id} -> {target_id}'
891 )
892
[docs]
893 def add_successor(self, succ: KCFG.Successor) -> None:
894 self._check_no_successors(succ.source.id)
895 self._check_no_zero_loops(succ.source.id, succ.target_ids)
896 if type(succ) is KCFG.Edge:
897 self._edges[succ.source.id] = succ
898 elif type(succ) is KCFG.MergedEdge:
899 self._merged_edges[succ.source.id] = succ
900 elif type(succ) is KCFG.Cover:
901 self._covers[succ.source.id] = succ
902 else:
903 if len(succ.target_ids) <= 1:
904 raise ValueError(
905 f'Cannot create {type(succ)} node with less than 2 targets: {succ.source.id} -> {succ.target_ids}'
906 )
907 if type(succ) is KCFG.Split:
908 self._splits[succ.source.id] = succ
909 elif type(succ) is KCFG.NDBranch:
910 self._ndbranches[succ.source.id] = succ
911
[docs]
912 def edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> Edge | None:
913 source_id = self._resolve(source_id)
914 target_id = self._resolve(target_id)
915 edge = self._edges.get(source_id, None)
916 return edge if edge is not None and edge.target.id == target_id else None
917
[docs]
918 def edges(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[Edge]:
919 source_id = self._resolve(source_id) if source_id is not None else None
920 target_id = self._resolve(target_id) if target_id is not None else None
921 return [
922 edge
923 for edge in self._edges.values()
924 if (source_id is None or source_id == edge.source.id) and (target_id is None or target_id == edge.target.id)
925 ]
926
[docs]
927 def contains_edge(self, edge: Edge) -> bool:
928 if other := self.edge(source_id=edge.source.id, target_id=edge.target.id):
929 return edge == other
930 return False
931
[docs]
932 def create_edge(self, source_id: NodeIdLike, target_id: NodeIdLike, depth: int, rules: Iterable[str] = ()) -> Edge:
933 if depth <= 0:
934 raise ValueError(f'Cannot build KCFG Edge with non-positive depth: {depth}')
935 source = self.node(source_id)
936 target = self.node(target_id)
937 edge = KCFG.Edge(source, target, depth, tuple(rules))
938 self.add_successor(edge)
939 return edge
940
[docs]
941 def remove_edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> None:
942 source_id = self._resolve(source_id)
943 target_id = self._resolve(target_id)
944 edge = self.edge(source_id, target_id)
945 if not edge:
946 raise ValueError(f'Edge does not exist: {source_id} -> {target_id}')
947 self._edges.pop(source_id)
948
[docs]
949 def merged_edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> MergedEdge | None:
950 source_id = self._resolve(source_id)
951 target_id = self._resolve(target_id)
952 merged_edge = self._merged_edges.get(source_id, None)
953 return merged_edge if merged_edge is not None and merged_edge.target.id == target_id else None
954
[docs]
955 def merged_edges(
956 self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None
957 ) -> list[MergedEdge]:
958 source_id = self._resolve(source_id) if source_id is not None else None
959 target_id = self._resolve(target_id) if target_id is not None else None
960 return [
961 merged_edge
962 for merged_edge in self._merged_edges.values()
963 if (source_id is None or source_id == merged_edge.source.id)
964 and (target_id is None or target_id == merged_edge.target.id)
965 ]
966
[docs]
967 def contains_merged_edge(self, edge: MergedEdge) -> bool:
968 if other := self.merged_edge(source_id=edge.source.id, target_id=edge.target.id):
969 return edge == other
970 return False
971
[docs]
972 def create_merged_edge(
973 self, source_id: NodeIdLike, target_id: NodeIdLike, edges: Iterable[Edge | MergedEdge]
974 ) -> MergedEdge:
975 if len(list(edges)) == 0:
976 raise ValueError(f'Cannot build KCFG MergedEdge with no edges: {edges}')
977 source = self.node(source_id)
978 target = self.node(target_id)
979 flatten_edges: list[KCFG.Edge] = []
980 for edge in edges:
981 if isinstance(edge, KCFG.MergedEdge):
982 flatten_edges.extend(edge.edges)
983 else:
984 flatten_edges.append(edge)
985 merged_edge = KCFG.MergedEdge(source, target, tuple(flatten_edges))
986 self.add_successor(merged_edge)
987 return merged_edge
988
[docs]
989 def remove_merged_edge(self, source_id: NodeIdLike, target_id: NodeIdLike) -> None:
990 source_id = self._resolve(source_id)
991 target_id = self._resolve(target_id)
992 merged_edge = self.merged_edge(source_id, target_id)
993 if not merged_edge:
994 raise ValueError(f'MergedEdge does not exist: {source_id} -> {target_id}')
995 self._merged_edges.pop(source_id)
996
[docs]
997 def general_edges(
998 self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None
999 ) -> list[Edge | MergedEdge]:
1000 return self.edges(source_id=source_id, target_id=target_id) + self.merged_edges(
1001 source_id=source_id, target_id=target_id
1002 )
1003
[docs]
1004 def cover(self, source_id: NodeIdLike, target_id: NodeIdLike) -> Cover | None:
1005 source_id = self._resolve(source_id)
1006 target_id = self._resolve(target_id)
1007 cover = self._covers.get(source_id, None)
1008 return cover if cover is not None and cover.target.id == target_id else None
1009
[docs]
1010 def covers(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[Cover]:
1011 source_id = self._resolve(source_id) if source_id is not None else None
1012 target_id = self._resolve(target_id) if target_id is not None else None
1013 return [
1014 cover
1015 for cover in self._covers.values()
1016 if (source_id is None or source_id == cover.source.id)
1017 and (target_id is None or target_id == cover.target.id)
1018 ]
1019
[docs]
1020 def contains_cover(self, cover: Cover) -> bool:
1021 if other := self.cover(source_id=cover.source.id, target_id=cover.target.id):
1022 return cover == other
1023 return False
1024
[docs]
1025 def create_cover(self, source_id: NodeIdLike, target_id: NodeIdLike, csubst: CSubst | None = None) -> Cover:
1026 source = self.node(source_id)
1027 target = self.node(target_id)
1028 if csubst is None:
1029 csubst = target.cterm.match_with_constraint(source.cterm)
1030 if csubst is None:
1031 raise ValueError(f'No matching between: {source.id} and {target.id}')
1032 cover = KCFG.Cover(source, target, csubst=csubst)
1033 self.add_successor(cover)
1034 return cover
1035
[docs]
1036 def remove_cover(self, source_id: NodeIdLike, target_id: NodeIdLike) -> None:
1037 source_id = self._resolve(source_id)
1038 target_id = self._resolve(target_id)
1039 cover = self.cover(source_id, target_id)
1040 if not cover:
1041 raise ValueError(f'Cover does not exist: {source_id} -> {target_id}')
1042 self._covers.pop(source_id)
1043
[docs]
1044 def edge_likes(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[EdgeLike]:
1045 return (
1046 cast('List[KCFG.EdgeLike]', self.edges(source_id=source_id, target_id=target_id))
1047 + cast('List[KCFG.EdgeLike]', self.covers(source_id=source_id, target_id=target_id))
1048 + cast('List[KCFG.EdgeLike]', self.merged_edges(source_id=source_id, target_id=target_id))
1049 )
1050
[docs]
1051 def add_vacuous(self, node_id: NodeIdLike) -> None:
1052 self.add_attr(node_id, KCFGNodeAttr.VACUOUS)
1053
[docs]
1054 def remove_vacuous(self, node_id: NodeIdLike) -> None:
1055 self.remove_attr(node_id, KCFGNodeAttr.VACUOUS)
1056
[docs]
1057 def discard_vacuous(self, node_id: NodeIdLike) -> None:
1058 self.discard_attr(node_id, KCFGNodeAttr.VACUOUS)
1059
[docs]
1060 def add_stuck(self, node_id: NodeIdLike) -> None:
1061 self.add_attr(node_id, KCFGNodeAttr.STUCK)
1062
[docs]
1063 def remove_stuck(self, node_id: NodeIdLike) -> None:
1064 self.remove_attr(node_id, KCFGNodeAttr.STUCK)
1065
[docs]
1066 def discard_stuck(self, node_id: NodeIdLike) -> None:
1067 self.discard_attr(node_id, KCFGNodeAttr.STUCK)
1068
[docs]
1069 def splits(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[Split]:
1070 source_id = self._resolve(source_id) if source_id is not None else None
1071 target_id = self._resolve(target_id) if target_id is not None else None
1072 return [
1073 s
1074 for s in self._splits.values()
1075 if (source_id is None or source_id == s.source.id) and (target_id is None or target_id in s.target_ids)
1076 ]
1077
[docs]
1078 def contains_split(self, split: Split) -> bool:
1079 return split in self._splits.values()
1080
[docs]
1081 def create_split(self, source_id: NodeIdLike, splits: Iterable[tuple[NodeIdLike, CSubst]]) -> KCFG.Split:
1082 source_id = self._resolve(source_id)
1083 split = KCFG.Split(self.node(source_id), tuple((self.node(nid), csubst) for nid, csubst in list(splits)))
1084 self.add_successor(split)
1085 return split
1086
[docs]
1087 def create_split_by_nodes(self, source_id: NodeIdLike, target_ids: Iterable[NodeIdLike]) -> KCFG.Split | None:
1088 """Create a split without crafting a CSubst."""
1089 source = self.node(source_id)
1090 targets = [self.node(nid) for nid in target_ids]
1091 try:
1092 csubsts = [not_none(source.cterm.match_with_constraint(target.cterm)) for target in targets]
1093 except ValueError:
1094 return None
1095 return self.create_split(source.id, zip(target_ids, csubsts, strict=True))
1096
[docs]
1097 def ndbranches(self, *, source_id: NodeIdLike | None = None, target_id: NodeIdLike | None = None) -> list[NDBranch]:
1098 source_id = self._resolve(source_id) if source_id is not None else None
1099 target_id = self._resolve(target_id) if target_id is not None else None
1100 return [
1101 b
1102 for b in self._ndbranches.values()
1103 if (source_id is None or source_id == b.source.id) and (target_id is None or target_id in b.target_ids)
1104 ]
1105
[docs]
1106 def contains_ndbranch(self, ndbranch: NDBranch) -> bool:
1107 return ndbranch in self._ndbranches
1108
[docs]
1109 def create_ndbranch(
1110 self, source_id: NodeIdLike, ndbranches: Iterable[NodeIdLike], rules: Iterable[str] = ()
1111 ) -> KCFG.NDBranch:
1112 source_id = self._resolve(source_id)
1113 ndbranch = KCFG.NDBranch(self.node(source_id), tuple(self.node(nid) for nid in list(ndbranches)), tuple(rules))
1114 self.add_successor(ndbranch)
1115 return ndbranch
1116
[docs]
1117 def split_on_constraints(self, source_id: NodeIdLike, constraints: Iterable[KInner]) -> list[int]:
1118 source = self.node(source_id)
1119 branch_node_ids = [self.create_node(source.cterm.add_constraint(c)).id for c in constraints]
1120 csubsts = [not_none(source.cterm.match_with_constraint(self.node(id).cterm)) for id in branch_node_ids]
1121 self.create_split(source.id, zip(branch_node_ids, csubsts, strict=True))
1122 return branch_node_ids
1123
[docs]
1124 def add_alias(self, alias: str, node_id: NodeIdLike) -> None:
1125 if '@' in alias:
1126 raise ValueError('Alias may not contain "@"')
1127 if alias in self._aliases:
1128 raise ValueError(f'Duplicate alias: {alias}')
1129 node_id = self._resolve(node_id)
1130 self._aliases[alias] = node_id
1131
[docs]
1132 def remove_alias(self, alias: str) -> None:
1133 if alias not in self._aliases:
1134 raise ValueError(f'Alias does not exist: {alias}')
1135 self._aliases.pop(alias)
1136
[docs]
1137 def is_root(self, node_id: NodeIdLike) -> bool:
1138 node_id = self._resolve(node_id)
1139 return len(self.predecessors(node_id)) == 0
1140
[docs]
1141 def is_vacuous(self, node_id: NodeIdLike) -> bool:
1142 return KCFGNodeAttr.VACUOUS in self.node(node_id).attrs
1143
[docs]
1144 def is_stuck(self, node_id: NodeIdLike) -> bool:
1145 return KCFGNodeAttr.STUCK in self.node(node_id).attrs
1146
[docs]
1147 def is_split(self, node_id: NodeIdLike) -> bool:
1148 node_id = self._resolve(node_id)
1149 return node_id in self._splits
1150
[docs]
1151 def is_ndbranch(self, node_id: NodeIdLike) -> bool:
1152 node_id = self._resolve(node_id)
1153 return node_id in self._ndbranches
1154
[docs]
1155 def is_leaf(self, node_id: NodeIdLike) -> bool:
1156 return len(self.successors(node_id)) == 0
1157
[docs]
1158 def is_covered(self, node_id: NodeIdLike) -> bool:
1159 node_id = self._resolve(node_id)
1160 return node_id in self._covers
1161
[docs]
1162 def prune(self, node_id: NodeIdLike, keep_nodes: Iterable[NodeIdLike] = ()) -> list[int]:
1163 nodes = self.reachable_nodes(node_id)
1164 keep_nodes = [self._resolve(nid) for nid in keep_nodes]
1165 pruned_nodes: list[int] = []
1166 for node in nodes:
1167 if node.id not in keep_nodes:
1168 self.remove_node(node.id)
1169 pruned_nodes.append(node.id)
1170 return pruned_nodes
1171
[docs]
1172 def shortest_path_between(
1173 self, source_node_id: NodeIdLike, target_node_id: NodeIdLike
1174 ) -> tuple[Successor, ...] | None:
1175 paths = self.paths_between(source_node_id, target_node_id)
1176 if len(paths) == 0:
1177 return None
1178 return sorted(paths, key=(lambda path: KCFG.path_length(path)))[0]
1179
[docs]
1180 def shortest_distance_between(self, node_1_id: NodeIdLike, node_2_id: NodeIdLike) -> int | None:
1181 path_1 = self.shortest_path_between(node_1_id, node_2_id)
1182 path_2 = self.shortest_path_between(node_2_id, node_1_id)
1183 distance: int | None = None
1184 if path_1 is not None:
1185 distance = KCFG.path_length(path_1)
1186 if path_2 is not None:
1187 distance_2 = KCFG.path_length(path_2)
1188 if distance is None or distance_2 < distance:
1189 distance = distance_2
1190 return distance
1191
[docs]
1192 def zero_depth_between(self, node_1_id: NodeIdLike, node_2_id: NodeIdLike) -> bool:
1193 _node_1_id = self._resolve(node_1_id)
1194 _node_2_id = self._resolve(node_2_id)
1195 if _node_1_id == _node_2_id:
1196 return True
1197 # Short-circuit and don't run pathing algorithm if there is no 0 length path on the first step.
1198 path_lengths = [
1199 self.path_length([successor]) for successor in self.successors(_node_1_id) + self.successors(_node_2_id)
1200 ]
1201 if 0 not in path_lengths:
1202 return False
1203
1204 shortest_distance = self.shortest_distance_between(_node_1_id, _node_2_id)
1205
1206 return shortest_distance is not None and shortest_distance == 0
1207
[docs]
1208 def paths_between(self, source_id: NodeIdLike, target_id: NodeIdLike) -> list[tuple[Successor, ...]]:
1209 source_id = self._resolve(source_id)
1210 target_id = self._resolve(target_id)
1211
1212 if source_id == target_id:
1213 return [()]
1214
1215 source_successors = list(self.successors(source_id))
1216 assert len(source_successors) <= 1
1217 if len(source_successors) == 0:
1218 return []
1219
1220 paths: list[tuple[KCFG.Successor, ...]] = []
1221 worklist: list[list[KCFG.Successor]] = [[source_successors[0]]]
1222
1223 def _in_path(_nid: int, _path: list[KCFG.Successor]) -> bool:
1224 for succ in _path:
1225 if _nid == succ.source.id:
1226 return True
1227 if len(_path) > 0:
1228 if isinstance(_path[-1], KCFG.EdgeLike) and _path[-1].target.id == _nid:
1229 return True
1230 elif isinstance(_path[-1], KCFG.MultiEdge) and _nid in _path[-1].target_ids:
1231 return True
1232 return False
1233
1234 while worklist:
1235 curr_path = worklist.pop()
1236 curr_successor = curr_path[-1]
1237 successors: list[KCFG.Successor] = []
1238
1239 if isinstance(curr_successor, KCFG.EdgeLike):
1240 if curr_successor.target.id == target_id:
1241 paths.append(tuple(curr_path))
1242 continue
1243 else:
1244 successors = list(self.successors(curr_successor.target.id))
1245
1246 elif isinstance(curr_successor, KCFG.MultiEdge):
1247 if len(list(curr_successor.targets)) == 1:
1248 target = list(curr_successor.targets)[0]
1249 if target.id == target_id:
1250 paths.append(tuple(curr_path))
1251 continue
1252 else:
1253 successors = list(self.successors(target.id))
1254 if len(list(curr_successor.targets)) > 1:
1255 curr_path = curr_path[0:-1]
1256 successors = [curr_successor.with_single_target(target) for target in curr_successor.targets]
1257
1258 for successor in successors:
1259 if isinstance(successor, KCFG.EdgeLike) and not _in_path(successor.target.id, curr_path):
1260 worklist.append(curr_path + [successor])
1261 elif isinstance(successor, KCFG.MultiEdge):
1262 if len(list(successor.targets)) == 1:
1263 target = list(successor.targets)[0]
1264 if not _in_path(target.id, curr_path):
1265 worklist.append(curr_path + [successor])
1266 elif len(list(successor.targets)) > 1:
1267 worklist.append(curr_path + [successor])
1268
1269 return paths
1270
[docs]
1271 def reachable_nodes(self, source_id: NodeIdLike, *, reverse: bool = False) -> set[KCFG.Node]:
1272 visited: set[KCFG.Node] = set()
1273 worklist: list[KCFG.Node] = [self.node(source_id)]
1274
1275 while worklist:
1276 node = worklist.pop()
1277
1278 if node in visited:
1279 continue
1280
1281 visited.add(node)
1282
1283 if not reverse:
1284 worklist.extend(target for succ in self.successors(source_id=node.id) for target in succ.targets)
1285 else:
1286 worklist.extend(succ.source for succ in self.predecessors(target_id=node.id))
1287
1288 return visited
1289
[docs]
1290 def write_cfg_data(self) -> None:
1291 assert self._kcfg_store is not None
1292 self._kcfg_store.write_cfg_data(
1293 self, self.to_dict_no_nodes(), deleted_nodes=self._deleted_nodes, created_nodes=self._created_nodes
1294 )
1295 self._deleted_nodes.clear()
1296 self._created_nodes.clear()
1297
[docs]
1298 @staticmethod
1299 def read_cfg_data(cfg_dir: Path) -> KCFG:
1300 store = KCFGStore(cfg_dir)
1301 cfg = KCFG.from_dict(store.read_cfg_data())
1302 cfg._kcfg_store = store
1303 return cfg
1304
[docs]
1305 @staticmethod
1306 def read_node_data(cfg_dir: Path, node_id: int) -> KCFG.Node:
1307 store = KCFGStore(cfg_dir)
1308 return KCFG.Node.from_dict(store.read_node_data(node_id))
1309
1310
[docs]
1311class KCFGExtendResult(ABC): ...
1312
1313
[docs]
1314@final
1315@dataclass(frozen=True)
1316class Vacuous(KCFGExtendResult): ...
1317
1318
[docs]
1319@final
1320@dataclass(frozen=True)
1321class Stuck(KCFGExtendResult): ...
1322
1323
[docs]
1324@final
1325@dataclass(frozen=True)
1326class Abstract(KCFGExtendResult):
1327 cterm: CTerm
1328
1329
[docs]
1330@final
1331@dataclass(frozen=True)
1332class Step(KCFGExtendResult):
1333 cterm: CTerm
1334 depth: int
1335 logs: tuple[LogEntry, ...]
1336 rule_labels: list[str]
1337 cut: bool = field(default=False)
1338 info: str = field(default='')
1339
1340
[docs]
1341@final
1342@dataclass(frozen=True)
1343class Branch(KCFGExtendResult):
1344 constraints: tuple[KInner, ...]
1345 heuristic: bool
1346 info: str = field(default='')
1347
1348 def __init__(self, constraints: Iterable[KInner], *, heuristic: bool = False, info: str = ''):
1349 object.__setattr__(self, 'constraints', tuple(constraints))
1350 object.__setattr__(self, 'heuristic', heuristic)
1351 object.__setattr__(self, 'info', info)
1352
1353
[docs]
1354@final
1355@dataclass(frozen=True)
1356class NDBranch(KCFGExtendResult):
1357 cterms: tuple[CTerm, ...]
1358 logs: tuple[LogEntry, ...]
1359 rule_labels: tuple[str, ...]
1360
1361 def __init__(self, cterms: Iterable[CTerm], logs: Iterable[LogEntry,], rule_labels: Iterable[str]):
1362 object.__setattr__(self, 'cterms', tuple(cterms))
1363 object.__setattr__(self, 'logs', tuple(logs))
1364 object.__setattr__(self, 'rule_labels', tuple(rule_labels))