1from __future__ import annotations
2
3import logging
4from typing import TYPE_CHECKING
5
6from ..kast.manip import extract_lhs
7from ..kast.outer import KApply
8from . import APRProof, APRProver, EqualityProof, ImpliesProver
9
10if TYPE_CHECKING:
11 from collections.abc import Callable
12 from pathlib import Path
13 from typing import ContextManager, Final
14
15 from ..cli.pyk import ProveOptions
16 from ..kast.outer import KClaim
17 from ..kcfg import KCFGExplore
18 from ..ktool.kprove import KProve
19 from . import Proof, Prover
20
# Module-level logger, named after this module through `__name__`.
_LOGGER: Final = logging.getLogger(__name__)
22
23
class ProveRpc:
    """Run proofs for the claims of a spec file.

    Claims are fetched through a ``KProve`` instance. Each claim is turned into a proof and,
    when it still needs work, advanced inside a ``KCFGExplore`` obtained from ``explore_context``.
    """

    # Used to load claims (`get_claims`) and to read the `definition`.
    _kprove: KProve
    # Zero-argument factory; its result is entered with `with` to obtain a `KCFGExplore`.
    _explore_context: Callable[[], ContextManager[KCFGExplore]]

    def __init__(
        self,
        kprove: KProve,
        explore_context: Callable[[], ContextManager[KCFGExplore]],
    ):
        """Store the two collaborators; no other work is done here.

        Args:
            kprove: Object used to load claims and to access the definition.
            explore_context: Callable returning a context manager that yields a ``KCFGExplore``.
        """
        self._explore_context = explore_context
        self._kprove = kprove

    def prove_rpc(self, options: ProveOptions) -> list[Proof]:
        """Load the claims selected by ``options`` and process each of them in turn.

        Args:
            options: Supplies the spec file, the claim selection settings forwarded to
                ``get_claims``, and the settings forwarded to ``_prove_claim_rpc``.

        Returns:
            One proof per claim, in the order in which ``get_claims`` returned the claims.

        Raises:
            ValueError: If ``get_claims`` returns ``None``.
        """
        claims = self._kprove.get_claims(
            options.spec_file,
            spec_module_name=options.spec_module,
            claim_labels=options.claim_labels,
            exclude_claim_labels=options.exclude_claim_labels,
            type_inference_mode=options.type_inference_mode,
        )
        # Only `None` is rejected; an empty collection yields an empty result list.
        if claims is None:
            raise ValueError(f'No claims found in file: {options.spec_file}')

        proofs: list[Proof] = []
        for claim in claims:
            proofs.append(
                self._prove_claim_rpc(
                    claim,
                    assume_defined=options.assume_defined,
                    max_depth=options.max_depth,
                    save_directory=options.save_directory,
                    max_iterations=options.max_iterations,
                )
            )
        return proofs

    def _prove_claim_rpc(
        self,
        claim: KClaim,
        assume_defined: bool,
        max_depth: int | None = None,
        save_directory: Path | None = None,
        max_iterations: int | None = None,
    ) -> Proof:
        """Create (or reload) the proof for ``claim``, advance it if needed, and log the outcome.

        A claim is handled as functional when the top of its left-hand side is exactly a
        ``KApply`` whose symbol, looked up by label name in ``definition.symbols``, is a member
        of ``definition.functions``. Functional claims use ``EqualityProof`` with
        ``ImpliesProver``; all other claims use ``APRProof`` with ``APRProver``.

        Args:
            claim: The claim to build the proof from.
            assume_defined: Forwarded to the prover constructor.
            max_depth: Forwarded to ``APRProver`` as ``execute_depth``; not used for functional claims.
            save_directory: Passed as ``proof_dir`` when creating the proof. When it is not ``None``
                and proof data for the proof id exists there, the proof is read back from it.
            max_iterations: Forwarded to ``advance_proof``. The prover runs only when this is
                ``None`` or greater than zero.

        Returns:
            The proof, freshly created or reloaded, after any advancing that took place.
        """
        definition = self._kprove.definition

        proof: Proof
        prover: Prover
        proof_cls: type[EqualityProof] | type[APRProof]

        # Exact type check first, so the symbol lookup only happens for a `KApply` top.
        lhs = extract_lhs(claim.body)
        functional = False
        if type(lhs) is KApply:
            functional = definition.symbols[lhs.label.name] in definition.functions

        # The two `from_claim` signatures differ, so only the construction is branch-specific.
        if functional:
            proof_cls = EqualityProof
            proof = EqualityProof.from_claim(claim, definition, proof_dir=save_directory)
        else:
            proof_cls = APRProof
            proof = APRProof.from_claim(definition, claim, {}, proof_dir=save_directory)

        # The freshly built proof provides the id under which saved data is looked up.
        if save_directory is not None and proof_cls.proof_data_exists(proof.id, save_directory):
            _LOGGER.info(f'Reloading from disk {proof.id}: {save_directory}')
            proof = proof_cls.read_proof_data(save_directory, proof.id)

        # Skip the prover for proofs that already passed and when no iterations are allowed.
        needs_work = not proof.passed and (max_iterations is None or max_iterations > 0)
        if needs_work:
            with self._explore_context() as explorer:
                if functional:
                    assert type(proof) is EqualityProof
                    prover = ImpliesProver(proof, explorer, assume_defined=assume_defined)
                else:
                    assert type(proof) is APRProof
                    prover = APRProver(explorer, execute_depth=max_depth, assume_defined=assume_defined)
                prover.advance_proof(proof, max_iterations=max_iterations)

        if proof.passed:
            _LOGGER.info(f'Proof passed: {proof.id}')
        elif proof.failed:
            _LOGGER.info(f'Proof failed: {proof.id}')
        else:
            _LOGGER.info(f'Proof pending: {proof.id}')
        return proof