Source code for pyk.proof.prove_rpc

  1from __future__ import annotations
  2
  3import logging
  4from typing import TYPE_CHECKING
  5
  6from ..kast.manip import extract_lhs
  7from ..kast.outer import KApply
  8from . import APRProof, APRProver, EqualityProof, ImpliesProver
  9
 10if TYPE_CHECKING:
 11    from collections.abc import Callable
 12    from pathlib import Path
 13    from typing import ContextManager, Final
 14
 15    from ..cli.pyk import ProveOptions
 16    from ..kast.outer import KClaim
 17    from ..kcfg import KCFGExplore
 18    from ..ktool.kprove import KProve
 19    from . import Proof, Prover
 20
 21_LOGGER: Final = logging.getLogger(__name__)
 22
 23
[docs] 24class ProveRpc: 25 _kprove: KProve 26 _explore_context: Callable[[], ContextManager[KCFGExplore]] 27 28 def __init__( 29 self, 30 kprove: KProve, 31 explore_context: Callable[[], ContextManager[KCFGExplore]], 32 ): 33 self._kprove = kprove 34 self._explore_context = explore_context 35
[docs] 36 def prove_rpc(self, options: ProveOptions) -> list[Proof]: 37 all_claims = self._kprove.get_claims( 38 options.spec_file, 39 spec_module_name=options.spec_module, 40 claim_labels=options.claim_labels, 41 exclude_claim_labels=options.exclude_claim_labels, 42 type_inference_mode=options.type_inference_mode, 43 ) 44 45 if all_claims is None: 46 raise ValueError(f'No claims found in file: {options.spec_file}') 47 48 return [ 49 self._prove_claim_rpc( 50 claim, 51 assume_defined=options.assume_defined, 52 max_depth=options.max_depth, 53 save_directory=options.save_directory, 54 max_iterations=options.max_iterations, 55 step_timeout=options.step_timeout, 56 ) 57 for claim in all_claims 58 ]
59 60 def _prove_claim_rpc( 61 self, 62 claim: KClaim, 63 assume_defined: bool, 64 max_depth: int | None = None, 65 save_directory: Path | None = None, 66 max_iterations: int | None = None, 67 step_timeout: int | None = None, 68 ) -> Proof: 69 definition = self._kprove.definition 70 71 proof: Proof 72 prover: Prover 73 lhs_top = extract_lhs(claim.body) 74 is_functional_claim = type(lhs_top) is KApply and definition.symbols[lhs_top.label.name] in definition.functions 75 76 if is_functional_claim: 77 proof = EqualityProof.from_claim(claim, definition, proof_dir=save_directory) 78 if save_directory is not None and EqualityProof.proof_data_exists(proof.id, save_directory): 79 _LOGGER.info(f'Reloading from disk {proof.id}: {save_directory}') 80 proof = EqualityProof.read_proof_data(save_directory, proof.id) 81 82 else: 83 proof = APRProof.from_claim(definition, claim, {}, proof_dir=save_directory) 84 if save_directory is not None and APRProof.proof_data_exists(proof.id, save_directory): 85 _LOGGER.info(f'Reloading from disk {proof.id}: {save_directory}') 86 proof = APRProof.read_proof_data(save_directory, proof.id) 87 88 if not proof.passed and (max_iterations is None or max_iterations > 0): 89 with self._explore_context() as kcfg_explore: 90 if is_functional_claim: 91 assert type(proof) is EqualityProof 92 prover = ImpliesProver(proof, kcfg_explore, assume_defined=assume_defined) 93 else: 94 assert type(proof) is APRProof 95 prover = APRProver( 96 kcfg_explore, 97 execute_depth=max_depth, 98 assume_defined=assume_defined, 99 step_timeout=step_timeout, 100 ) 101 prover.advance_proof(proof, max_iterations=max_iterations) # type: ignore [arg-type] 102 103 if proof.passed: 104 _LOGGER.info(f'Proof passed: {proof.id}') 105 elif proof.failed: 106 _LOGGER.info(f'Proof failed: {proof.id}') 107 else: 108 _LOGGER.info(f'Proof pending: {proof.id}') 109 return proof