from __future__ import annotations

import logging
from typing import TYPE_CHECKING

from ..kast.manip import extract_lhs
from ..kast.outer import KApply
from . import APRProof, APRProver, EqualityProof, ImpliesProver

# These names are only needed for annotations; importing them under
# TYPE_CHECKING keeps them out of the runtime import graph.
if TYPE_CHECKING:
    from collections.abc import Callable
    from pathlib import Path
    from typing import ContextManager, Final

    from ..cli.pyk import ProveOptions
    from ..kast.outer import KClaim
    from ..kcfg import KCFGExplore
    from ..ktool.kprove import KProve
    from . import Proof, Prover

# Module-level logger, named after this module.
_LOGGER: Final = logging.getLogger(__name__)
22
23
class ProveRpc:
    """Prove the claims of a specification file.

    Holds a ``KProve`` instance, used to load claims and to obtain the
    definition, and a zero-argument factory returning a context manager that
    yields a ``KCFGExplore``. The factory is invoked once for every claim
    whose proof still needs to be advanced.
    """

    _kprove: KProve
    _explore_context: Callable[[], ContextManager[KCFGExplore]]

    def __init__(
        self,
        kprove: KProve,
        explore_context: Callable[[], ContextManager[KCFGExplore]],
    ):
        """Store the claim loader and the exploration-context factory.

        Args:
            kprove: Object used to read claims and to access the definition.
            explore_context: Zero-argument callable returning a context
                manager that yields the ``KCFGExplore`` handed to the provers.
        """
        self._kprove = kprove
        self._explore_context = explore_context

    def prove_rpc(self, options: ProveOptions) -> list[Proof]:
        """Load the claims selected by ``options`` and prove each one in turn.

        Args:
            options: Supplies the spec file, the claim selection
                (``spec_module``, ``claim_labels``, ``exclude_claim_labels``,
                ``type_inference_mode``) and the per-claim proving settings
                (``assume_defined``, ``max_depth``, ``save_directory``,
                ``max_iterations``, ``step_timeout``).

        Returns:
            One proof per loaded claim, in the order the claims were returned.

        Raises:
            ValueError: If the claim loader returns ``None``.
        """
        all_claims = self._kprove.get_claims(
            options.spec_file,
            spec_module_name=options.spec_module,
            claim_labels=options.claim_labels,
            exclude_claim_labels=options.exclude_claim_labels,
            type_inference_mode=options.type_inference_mode,
        )

        # Only ``None`` is rejected here; an empty list yields an empty result.
        if all_claims is None:
            raise ValueError(f'No claims found in file: {options.spec_file}')

        return [
            self._prove_claim_rpc(
                claim,
                assume_defined=options.assume_defined,
                max_depth=options.max_depth,
                save_directory=options.save_directory,
                max_iterations=options.max_iterations,
                step_timeout=options.step_timeout,
            )
            for claim in all_claims
        ]

    def _prove_claim_rpc(
        self,
        claim: KClaim,
        assume_defined: bool,
        max_depth: int | None = None,
        save_directory: Path | None = None,
        max_iterations: int | None = None,
        step_timeout: int | None = None,
    ) -> Proof:
        """Build (or reload) the proof for a single claim and advance it.

        A claim whose left-hand side is a ``KApply`` of a symbol listed in
        ``definition.functions`` is handled as an ``EqualityProof`` with an
        ``ImpliesProver``; every other claim is handled as an ``APRProof``
        with an ``APRProver``.

        Args:
            claim: The claim to prove.
            assume_defined: Forwarded to the prover's constructor.
            max_depth: Forwarded to ``APRProver`` as ``execute_depth``.
            save_directory: Used as the proof directory. When proof data for
                this proof id already exists there, it is reloaded instead of
                starting from the freshly built proof.
            max_iterations: Forwarded to ``advance_proof``. The prover is only
                run when this is ``None`` or greater than zero.
            step_timeout: Forwarded to ``APRProver``.

        Returns:
            The proof object, whether it passed, failed, or is still pending.
        """
        definition = self._kprove.definition

        proof: Proof
        prover: Prover
        lhs_top = extract_lhs(claim.body)
        is_functional_claim = type(lhs_top) is KApply and definition.symbols[lhs_top.label.name] in definition.functions

        if is_functional_claim:
            proof = EqualityProof.from_claim(claim, definition, proof_dir=save_directory)
            # Prefer previously saved proof data over the freshly built proof.
            if save_directory is not None and EqualityProof.proof_data_exists(proof.id, save_directory):
                _LOGGER.info(f'Reloading from disk {proof.id}: {save_directory}')
                proof = EqualityProof.read_proof_data(save_directory, proof.id)

        else:
            proof = APRProof.from_claim(definition, claim, {}, proof_dir=save_directory)
            # Prefer previously saved proof data over the freshly built proof.
            if save_directory is not None and APRProof.proof_data_exists(proof.id, save_directory):
                _LOGGER.info(f'Reloading from disk {proof.id}: {save_directory}')
                proof = APRProof.read_proof_data(save_directory, proof.id)

        # Skip the exploration context entirely when there is nothing to do:
        # the proof already passed, or the caller allowed zero iterations.
        if not proof.passed and (max_iterations is None or max_iterations > 0):
            with self._explore_context() as kcfg_explore:
                if is_functional_claim:
                    # Narrows the declared ``Proof`` type for the type checker.
                    assert type(proof) is EqualityProof
                    prover = ImpliesProver(proof, kcfg_explore, assume_defined=assume_defined)
                else:
                    # Narrows the declared ``Proof`` type for the type checker.
                    assert type(proof) is APRProof
                    prover = APRProver(
                        kcfg_explore,
                        execute_depth=max_depth,
                        assume_defined=assume_defined,
                        step_timeout=step_timeout,
                    )
                prover.advance_proof(proof, max_iterations=max_iterations)  # type: ignore [arg-type]

        if proof.passed:
            _LOGGER.info(f'Proof passed: {proof.id}')
        elif proof.failed:
            _LOGGER.info(f'Proof failed: {proof.id}')
        else:
            _LOGGER.info(f'Proof pending: {proof.id}')
        return proof