Source code for pyk.proof.implies

  1from __future__ import annotations
  2
  3import json
  4import logging
  5from dataclasses import dataclass
  6from typing import TYPE_CHECKING, Any, Final
  7
  8from ..cterm import CSubst, CTerm, build_claim
  9from ..kast.inner import KApply, KInner, Subst
 10from ..kast.manip import extract_lhs, extract_rhs, flatten_label
 11from ..kast.prelude.k import GENERATED_TOP_CELL
 12from ..kast.prelude.kbool import BOOL, FALSE, TRUE
 13from ..kast.prelude.ml import is_bottom, is_top, mlAnd, mlEquals, mlEqualsFalse, mlEqualsTrue
 14from ..kore.rpc import client_label
 15from ..utils import ensure_dir_path
 16from .proof import FailureInfo, Proof, ProofStatus, ProofSummary, Prover
 17
 18if TYPE_CHECKING:
 19    from collections.abc import Iterable, Mapping
 20    from pathlib import Path
 21
 22    from ..kast.inner import KSort
 23    from ..kast.outer import KClaim, KDefinition
 24    from ..kcfg import KCFGExplore
 25    from ..ktool.kprint import KPrint
 26
 27_LOGGER: Final = logging.getLogger(__name__)
 28
 29
@dataclass(frozen=True)
class ImpliesProofStep:
    """Immutable description of one step of work on an implication proof.

    The step carries nothing but the proof it refers to; ``ImpliesProof.get_steps``
    builds it and ``ImpliesProver.step_proof`` consumes it.
    """

    # The proof whose antecedent/consequent this step will check.
    proof: ImpliesProof
33 34
@dataclass
class ImpliesProofResult:
    """Outcome of one implication check.

    ``ImpliesProof.commit`` copies all three fields onto the proof.
    """

    # Substitution obtained for the implication, or None if none was obtained.
    csubst: CSubst | None
    # Simplified form of the proof's antecedent.
    simplified_antecedent: KInner | None
    # Simplified form of the proof's consequent.
    simplified_consequent: KInner | None
40 41
class ImpliesProof(Proof[ImpliesProofStep, ImpliesProofResult]):
    """Proof obligation stating that ``antecedent`` implies ``consequent``.

    The proof has a single step. Once a result has been committed, both
    simplified terms are set and the proof can no longer progress; whether it
    passed is decided by the presence of ``csubst``.
    """

    antecedent: KInner
    consequent: KInner
    bind_universally: bool
    simplified_antecedent: KInner | None
    simplified_consequent: KInner | None
    csubst: CSubst | None

    def __init__(
        self,
        id: str,
        antecedent: KInner,
        consequent: KInner,
        bind_universally: bool = False,
        simplified_antecedent: KInner | None = None,
        simplified_consequent: KInner | None = None,
        csubst: CSubst | None = None,
        proof_dir: Path | None = None,
        subproof_ids: Iterable[str] = (),
        admitted: bool = False,
    ):
        """Create the proof.

        Args:
            id: Proof identifier; also used as the on-disk directory name.
            antecedent: Left-hand side of the implication.
            consequent: Right-hand side of the implication.
            bind_universally: Forwarded to the implication check by the prover.
            simplified_antecedent: Previously computed simplification, if any.
            simplified_consequent: Previously computed simplification, if any.
            csubst: Previously computed implication substitution, if any.
            proof_dir: Directory under which proof data is stored, if any.
            subproof_ids: Identifiers of subproofs, forwarded to the base class.
            admitted: Whether the proof is admitted without being checked.
        """
        super().__init__(id=id, proof_dir=proof_dir, subproof_ids=subproof_ids, admitted=admitted)
        self.antecedent = antecedent
        self.consequent = consequent
        self.bind_universally = bind_universally
        self.simplified_antecedent = simplified_antecedent
        self.simplified_consequent = simplified_consequent
        self.csubst = csubst

    def get_steps(self) -> list[ImpliesProofStep]:
        """Return the single pending step, or an empty list if nothing is left to do."""
        if not self.can_progress:
            return []
        return [ImpliesProofStep(self)]

    def commit(self, result: ImpliesProofResult) -> None:
        """Record the outcome of a step on this proof.

        Raises:
            ValueError: If ``result`` is not an ``ImpliesProofResult``.
        """
        proof_type = type(self).__name__
        if isinstance(result, ImpliesProofResult):
            self.csubst = result.csubst
            self.simplified_antecedent = result.simplified_antecedent
            self.simplified_consequent = result.simplified_consequent
            _LOGGER.info(f'{proof_type} finished {self.id}: {self.status}')
        else:
            raise ValueError(f'Incorrect result type, expected ImpliesProofResult: {result}')

    @property
    def own_status(self) -> ProofStatus:
        """Status of this proof alone.

        Admitted proofs pass unconditionally. Otherwise the proof is pending until
        both simplified terms are recorded, then fails if no ``csubst`` was found.
        """
        if self.admitted:
            return ProofStatus.PASSED
        if self.simplified_antecedent is None or self.simplified_consequent is None:
            return ProofStatus.PENDING
        if self.csubst is None:
            return ProofStatus.FAILED
        return ProofStatus.PASSED

    @property
    def can_progress(self) -> bool:
        """Whether a step still has to be run (a simplified term is still missing)."""
        return self.simplified_antecedent is None or self.simplified_consequent is None

    def write_proof_data(self, subproofs: bool = False) -> None:
        """Write ``<proof_dir>/<id>/proof.json`` if the proof is not up to date.

        Does nothing beyond the base-class call when no ``proof_dir`` is set.
        ``subproofs`` is part of the signature but is not consulted in this body.
        """
        super().write_proof_data()
        if not self.proof_dir:
            return
        ensure_dir_path(self.proof_dir)
        ensure_dir_path(self.proof_dir / self.id)
        proof_path = self.proof_dir / self.id / 'proof.json'
        if not self.up_to_date:
            proof_json = json.dumps(self.dict)
            proof_path.write_text(proof_json)
            _LOGGER.info(f'Updated proof file {self.id}: {proof_path}')

    @classmethod
    def from_dict(cls: type[ImpliesProof], dct: Mapping[str, Any], proof_dir: Path | None = None) -> ImpliesProof:
        """Rebuild a proof from the mapping produced by ``dict``.

        Optional keys (``simplified_antecedent``, ``simplified_consequent``,
        ``csubst``, ``admitted``, ``bind_universally``) fall back to their
        constructor defaults when absent, so data written before
        ``bind_universally`` was serialized still loads.
        """
        id = dct['id']
        antecedent = KInner.from_dict(dct['antecedent'])
        consequent = KInner.from_dict(dct['consequent'])
        simplified_antecedent = (
            KInner.from_dict(dct['simplified_antecedent']) if 'simplified_antecedent' in dct else None
        )
        simplified_consequent = (
            KInner.from_dict(dct['simplified_consequent']) if 'simplified_consequent' in dct else None
        )
        csubst = CSubst.from_dict(dct['csubst']) if 'csubst' in dct else None
        subproof_ids = dct['subproof_ids']
        admitted = dct.get('admitted', False)
        # Without this the flag silently reverted to False on reload, changing
        # how the prover checks the implication.
        bind_universally = dct.get('bind_universally', False)
        return ImpliesProof(
            id,
            antecedent,
            consequent,
            bind_universally=bind_universally,
            simplified_antecedent=simplified_antecedent,
            simplified_consequent=simplified_consequent,
            csubst=csubst,
            admitted=admitted,
            subproof_ids=subproof_ids,
            proof_dir=proof_dir,
        )

    @property
    def dict(self) -> dict[str, Any]:
        """JSON-serializable form of the proof; optional fields are omitted when unset."""
        dct = super().dict
        dct['type'] = 'ImpliesProof'
        dct['antecedent'] = self.antecedent.to_dict()
        dct['consequent'] = self.consequent.to_dict()
        # Persist the flag so that ``from_dict`` can restore it.
        dct['bind_universally'] = self.bind_universally
        if self.simplified_antecedent is not None:
            dct['simplified_antecedent'] = self.simplified_antecedent.to_dict()
        if self.simplified_consequent is not None:
            dct['simplified_consequent'] = self.simplified_consequent.to_dict()
        if self.csubst is not None:
            dct['csubst'] = self.csubst.to_dict()
        return dct
151 152
class EqualityProof(ImpliesProof):
    """Implication proof whose consequent is an equality between two terms.

    The antecedent is the conjunction of the given constraints and the
    consequent is ``lhs_body #Equals rhs_body``; ``bind_universally`` is always
    enabled. Constructing an instance logs a warning about known soundness
    issues.
    """

    def __init__(
        self,
        id: str,
        lhs_body: KInner,
        rhs_body: KInner,
        sort: KSort,
        constraints: Iterable[KInner] = (),
        simplified_constraints: KInner | None = None,
        simplified_equality: KInner | None = None,
        csubst: CSubst | None = None,
        proof_dir: Path | None = None,
        subproof_ids: Iterable[str] = (),
        admitted: bool = False,
    ):
        """Create the proof.

        Args:
            id: Proof identifier.
            lhs_body: Left-hand side of the equality.
            rhs_body: Right-hand side of the equality.
            sort: Sort of the two compared terms.
            constraints: Conjuncts of the antecedent.
            simplified_constraints: Previously simplified antecedent, if any.
            simplified_equality: Previously simplified consequent, if any.
            csubst: Previously computed implication substitution, if any.
            proof_dir: Directory under which proof data is stored, if any.
            subproof_ids: Identifiers of subproofs.
            admitted: Whether the proof is admitted without being checked.
        """
        antecedent = mlAnd(constraints)
        consequent = mlEquals(lhs_body, rhs_body, arg_sort=sort, sort=GENERATED_TOP_CELL)
        super().__init__(
            id,
            antecedent,
            consequent,
            bind_universally=True,
            simplified_antecedent=simplified_constraints,
            simplified_consequent=simplified_equality,
            csubst=csubst,
            proof_dir=proof_dir,
            subproof_ids=subproof_ids,
            admitted=admitted,
        )
        _LOGGER.warning(
            'Building an EqualityProof that has known soundness issues: See https://github.com/runtimeverification/haskell-backend/issues/3605.'
        )

    @staticmethod
    def read_proof_data(proof_dir: Path, id: str) -> EqualityProof:
        """Load the proof stored at ``<proof_dir>/<id>/proof.json``.

        Raises:
            ValueError: If no proof data exists for ``id`` in ``proof_dir``.
        """
        proof_path = proof_dir / id / 'proof.json'
        if Proof.proof_data_exists(id, proof_dir):
            proof_dict = json.loads(proof_path.read_text())
            return EqualityProof.from_dict(proof_dict, proof_dir)

        raise ValueError(f'Could not load Proof from file {id}: {proof_path}')

    @staticmethod
    def from_claim(claim: KClaim, defn: KDefinition, proof_dir: Path | None = None) -> EqualityProof:
        """Build an equality proof from a claim.

        The claim body supplies both sides of the equality and the ``requires``
        clause supplies the constraints.

        Raises:
            ValueError: If the claim has an ``ensures`` clause other than ``TRUE``.
        """
        claim_body = defn.add_sort_params(claim.body)
        sort = defn.sort_strict(claim_body)
        lhs_body = extract_lhs(claim_body)
        rhs_body = extract_rhs(claim_body)
        if not (claim.ensures is None or claim.ensures == TRUE):
            raise ValueError(f'Cannot convert claim to EqualityProof due to non-trival ensures clause {claim.ensures}')
        # Each boolean conjunct of ``requires`` becomes its own ``true #Equals c`` constraint.
        constraints = [mlEquals(TRUE, c, arg_sort=BOOL) for c in flatten_label('_andBool_', claim.requires)]
        return EqualityProof(claim.label, lhs_body, rhs_body, sort, constraints=constraints, proof_dir=proof_dir)

    @property
    def equality(self) -> KApply:
        """The consequent, which is expected to be an application."""
        assert type(self.consequent) is KApply
        return self.consequent

    @property
    def lhs_body(self) -> KInner:
        """First argument of the equality."""
        return self.equality.args[0]

    @property
    def rhs_body(self) -> KInner:
        """Second argument of the equality."""
        return self.equality.args[1]

    @property
    def sort(self) -> KSort:
        """First sort parameter of the equality's label."""
        return self.equality.label.params[0]

    @property
    def constraint(self) -> KInner:
        """The antecedent as a single term."""
        return self.antecedent

    @property
    def constraints(self) -> list[KInner]:
        """The antecedent split into its ``#And`` conjuncts."""
        return flatten_label('#And', self.constraint)

    @property
    def simplified_constraints(self) -> KInner | None:
        """Alias for ``simplified_antecedent``."""
        return self.simplified_antecedent

    @property
    def simplified_equality(self) -> KInner | None:
        """Alias for ``simplified_consequent``."""
        return self.simplified_consequent

    @classmethod
    def from_dict(cls: type[EqualityProof], dct: Mapping[str, Any], proof_dir: Path | None = None) -> EqualityProof:
        """Rebuild an equality proof from the mapping produced by ``dict``.

        The mapping is first parsed as a plain ``ImpliesProof``; the equality's
        parts are then read back out of its consequent.
        """
        implies_proof = ImpliesProof.from_dict(dct, proof_dir=proof_dir)
        assert type(implies_proof.consequent) is KApply
        return EqualityProof(
            id=implies_proof.id,
            lhs_body=implies_proof.consequent.args[0],
            rhs_body=implies_proof.consequent.args[1],
            sort=implies_proof.consequent.label.params[0],
            constraints=flatten_label('#And', implies_proof.antecedent),
            simplified_constraints=implies_proof.simplified_antecedent,
            simplified_equality=implies_proof.simplified_consequent,
            csubst=implies_proof.csubst,
            proof_dir=implies_proof.proof_dir,
            subproof_ids=implies_proof.subproof_ids,
            admitted=implies_proof.admitted,
        )

    @property
    def dict(self) -> dict[str, Any]:
        """Same as the parent's mapping, tagged with type ``EqualityProof``."""
        dct = super().dict
        dct['type'] = 'EqualityProof'
        return dct

    def pretty(self, kprint: KPrint) -> Iterable[str]:
        """Return a human-readable, line-by-line rendering of the proof."""
        lines = [
            f'LHS: {kprint.pretty_print(self.lhs_body)}',
            f'RHS: {kprint.pretty_print(self.rhs_body)}',
            f'Constraints: {kprint.pretty_print(mlAnd(self.constraints))}',
            f'Equality: {kprint.pretty_print(self.equality)}',
        ]
        # Compare against None explicitly: the values are optional terms, and a
        # term that happens to be falsy must still be printed.
        if self.simplified_constraints is not None:
            lines.append(f'Simplified constraints: {kprint.pretty_print(self.simplified_constraints)}')
        if self.simplified_equality is not None:
            lines.append(f'Simplified equality: {kprint.pretty_print(self.simplified_equality)}')
        if self.csubst is not None:
            lines.append(f'Implication csubst: {self.csubst}')
        lines.append(f'Status: {self.status}')
        return lines

    @property
    def summary(self) -> EqualitySummary:
        """Short summary of the proof: id, status and admitted flag."""
        return EqualitySummary(self.id, self.status, self.admitted)
282 283
@dataclass(frozen=True)
class EqualitySummary(ProofSummary):
    """Immutable summary of an ``EqualityProof``."""

    id: str
    status: ProofStatus
    admitted: bool

    @property
    def lines(self) -> list[str]:
        """Return the summary as a header line followed by one line per field."""
        # NOTE(review): the leading whitespace inside the two detail literals is
        # reproduced as it appears in the extracted source, which collapsed runs
        # of whitespace - confirm the intended indent width against upstream.
        header = f'EqualityProof: {self.id}'
        details = [
            f' status: {self.status}',
            f' admitted: {self.admitted}',
        ]
        return [header, *details]
297 298
class RefutationProof(ImpliesProof):
    """Implication proof that a set of constraints refutes one further constraint.

    The antecedent is the conjunction of ``mlEqualsTrue(c)`` for every
    pre-constraint ``c`` and the consequent is ``mlEqualsFalse(last_constraint)``;
    ``bind_universally`` is always enabled. Constructing an instance logs a
    warning about known soundness issues.
    """

    def __init__(
        self,
        id: str,
        pre_constraints: Iterable[KInner],
        last_constraint: KInner,
        simplified_antecedent: KInner | None = None,
        simplified_consequent: KInner | None = None,
        csubst: CSubst | None = None,
        proof_dir: Path | None = None,
        subproof_ids: Iterable[str] = (),
        admitted: bool = False,
    ):
        """Create the proof.

        Args:
            id: Proof identifier.
            pre_constraints: Terms that are each wrapped with ``mlEqualsTrue``
                and conjoined to form the antecedent.
            last_constraint: Term wrapped with ``mlEqualsFalse`` to form the consequent.
            simplified_antecedent: Previously simplified antecedent, if any.
            simplified_consequent: Previously simplified consequent, if any.
            csubst: Previously computed implication substitution, if any.
            proof_dir: Directory under which proof data is stored, if any.
            subproof_ids: Identifiers of subproofs.
            admitted: Whether the proof is admitted without being checked.
        """
        antecedent = mlAnd(mlEqualsTrue(c) for c in pre_constraints)
        consequent = mlEqualsFalse(last_constraint)
        super().__init__(
            id,
            antecedent,
            consequent,
            bind_universally=True,
            simplified_antecedent=simplified_antecedent,
            simplified_consequent=simplified_consequent,
            csubst=csubst,
            subproof_ids=subproof_ids,
            proof_dir=proof_dir,
            admitted=admitted,
        )
        _LOGGER.warning(
            'Building a RefutationProof that has known soundness issues: See https://github.com/runtimeverification/haskell-backend/issues/3605.'
        )

    @staticmethod
    def read_proof_data(proof_dir: Path, id: str) -> RefutationProof:
        """Load the proof stored at ``<proof_dir>/<id>/proof.json``.

        Raises:
            ValueError: If no proof data exists for ``id`` in ``proof_dir``.
        """
        proof_path = proof_dir / id / 'proof.json'
        if Proof.proof_data_exists(id, proof_dir):
            proof_dict = json.loads(proof_path.read_text())
            return RefutationProof.from_dict(proof_dict, proof_dir)

        raise ValueError(f'Could not load Proof from file {id}: {proof_path}')

    @property
    def pre_constraints(self) -> list[KInner]:
        """The antecedent split into its ``#And`` conjuncts.

        These are the conjuncts as stored in the antecedent, i.e. after the
        constructor has applied ``mlEqualsTrue`` to each input term.
        """
        return flatten_label('#And', self.antecedent)

    @property
    def last_constraint(self) -> KInner:
        """Second argument of the consequent application."""
        assert type(self.consequent) is KApply
        return self.consequent.args[1]

    @property
    def simplified_constraints(self) -> KInner | None:
        """Alias for ``simplified_antecedent``."""
        return self.simplified_antecedent

    @classmethod
    def from_dict(cls: type[RefutationProof], dct: Mapping[str, Any], proof_dir: Path | None = None) -> RefutationProof:
        """Rebuild a refutation proof from the mapping produced by ``dict``.

        The stored antecedent is restored verbatim instead of being rebuilt
        through the constructor.
        """
        implies_proof = ImpliesProof.from_dict(dct, proof_dir=proof_dir)
        assert type(implies_proof.consequent) is KApply
        proof = RefutationProof(
            id=implies_proof.id,
            # Placeholder: the real antecedent is assigned below.
            pre_constraints=(),
            last_constraint=implies_proof.consequent.args[1],
            simplified_antecedent=implies_proof.simplified_antecedent,
            simplified_consequent=implies_proof.simplified_consequent,
            csubst=implies_proof.csubst,
            proof_dir=implies_proof.proof_dir,
            subproof_ids=implies_proof.subproof_ids,
            admitted=implies_proof.admitted,
        )
        # The serialized antecedent already consists of ``mlEqualsTrue``-wrapped
        # conjuncts. Feeding those conjuncts back through ``__init__`` would wrap
        # each of them a second time, so a save/load round trip would change the
        # proof obligation. Assigning the stored term keeps it exactly as written.
        proof.antecedent = implies_proof.antecedent
        return proof

    @property
    def dict(self) -> dict[str, Any]:
        """Same as the parent's mapping, tagged with type ``RefutationProof``."""
        dct = super().dict
        dct['type'] = 'RefutationProof'
        return dct

    @property
    def summary(self) -> RefutationSummary:
        """Short summary of the proof: id and status."""
        return RefutationSummary(self.id, self.status)

    def pretty(self, kprint: KPrint) -> Iterable[str]:
        """Return a human-readable, line-by-line rendering of the proof."""
        lines = [
            f'Constraints: {kprint.pretty_print(mlAnd(self.pre_constraints))}',
            f'Last constraint: {kprint.pretty_print(self.last_constraint)}',
        ]
        if self.csubst is not None:
            lines.append(f'Implication csubst: {self.csubst}')
        lines.append(f'Status: {self.status}')
        return lines

    def to_claim(self, claim_id: str) -> tuple[KClaim, Subst]:
        """Build a claim from this refutation via ``build_claim``.

        The last constraint is used as the initial configuration, ``FALSE`` as
        the final configuration, and the pre-constraints as initial constraints.
        """
        return build_claim(
            claim_id,
            init_config=self.last_constraint,
            final_config=FALSE,
            init_constraints=self.pre_constraints,
            final_constraints=[],
        )
396 397
@dataclass(frozen=True)
class RefutationSummary(ProofSummary):
    """Immutable summary of a ``RefutationProof``."""

    id: str
    status: ProofStatus

    @property
    def lines(self) -> list[str]:
        """Return the summary as a header line followed by the status line."""
        # NOTE(review): the leading whitespace inside the status literal is
        # reproduced as it appears in the extracted source, which collapsed runs
        # of whitespace - confirm the intended indent width against upstream.
        header = f'RefutationProof: {self.id}'
        status_line = f' status: {self.status}'
        return [header, status_line]
409 410
class ImpliesProver(Prover[ImpliesProof, ImpliesProofStep, ImpliesProofResult]):
    """Prover that discharges an ``ImpliesProof`` through a ``KCFGExplore``."""

    proof: ImpliesProof
    kcfg_explore: KCFGExplore
    assume_defined: bool

    def __init__(self, proof: ImpliesProof, kcfg_explore: KCFGExplore, assume_defined: bool = False):
        """Store the proof, the explorer used for the checks, and the ``assume_defined`` flag."""
        self.proof = proof
        self.kcfg_explore = kcfg_explore
        self.assume_defined = assume_defined

    def close(self) -> None:
        """Close the Kore client held by the explorer's symbolic backend."""
        self.kcfg_explore.cterm_symbolic._kore_client.close()

    def step_proof(self, step: ImpliesProofStep) -> list[ImpliesProofResult]:
        """Run the implication check for ``step``.

        Returns an empty list when the proof is not pending, otherwise a
        single-element list holding the result to commit.
        """
        target = step.proof
        kind = type(target).__name__
        _LOGGER.info(f'Attempting {kind} {target.id}')

        # Nothing to do for a proof that already has a final status.
        if target.status is not ProofStatus.PENDING:
            _LOGGER.info(f'{kind} finished {target.id}: {target.status}')
            return []

        # to prove the equality, we check the implication of the form `constraints #Implies LHS #Equals RHS`, i.e.
        # "LHS equals RHS under these constraints"
        symbolic = self.kcfg_explore.cterm_symbolic
        simple_ante, _ = symbolic.kast_simplify(target.antecedent)
        simple_cons, _ = symbolic.kast_simplify(target.consequent)
        _LOGGER.debug(f'Simplified antecedent: {self.kcfg_explore.pretty_print(simple_ante)}')
        _LOGGER.debug(f'Simplified consequent: {self.kcfg_explore.pretty_print(simple_cons)}')

        csubst: CSubst | None
        if is_bottom(simple_ante):
            # A #Bottom antecedent is recorded as success with an empty substitution.
            _LOGGER.warning(f'Antecedent of implication (proof constraints) simplifies to #Bottom {target.id}')
            csubst = CSubst(Subst({}), ())
        elif is_top(simple_cons):
            # A #Top consequent is likewise recorded with an empty substitution.
            _LOGGER.warning(f'Consequent of implication (proof equality) simplifies to #Top {target.id}')
            csubst = CSubst(Subst({}), ())
        else:
            # TODO: we should not be forced to include the dummy configuration in the antecedent and consequent
            dummy_config = symbolic._definition.empty_config(sort=GENERATED_TOP_CELL)
            implication = symbolic.implies(
                antecedent=CTerm(config=dummy_config, constraints=[simple_ante]),
                consequent=CTerm(config=dummy_config, constraints=[simple_cons]),
                bind_universally=target.bind_universally,
                assume_defined=self.assume_defined,
            )
            # None here means the check produced no substitution.
            csubst = implication.csubst

        _LOGGER.info(f'{kind} finished {target.id}: {target.status}')
        outcome = ImpliesProofResult(
            csubst=csubst,
            simplified_antecedent=simple_ante,
            simplified_consequent=simple_cons,
        )
        return [outcome]

    def init_proof(self, proof: ImpliesProof) -> None:
        """Label subsequent kore-RPC requests from this thread with the proof id."""
        # Stamp proof.id on every subsequent kore-RPC request from this thread so
        # booster's `{request: ...}` log lines self-identify the originating
        # claim. Worker-thread consumers must `client_label.set(...)` inside
        # their own closure — `ContextVar` is not propagated by ThreadPoolExecutor.
        client_label.set(proof.id)

    def failure_info(self, proof: ImpliesProof) -> FailureInfo:
        """Return an empty ``FailureInfo``; no details are collected yet."""
        # TODO add implementation
        return FailureInfo()