1from __future__ import annotations
2
3import json
4import logging
5from dataclasses import dataclass
6from typing import TYPE_CHECKING, Any, Final
7
8from ..cterm import CSubst, CTerm, build_claim
9from ..kast.inner import KApply, KInner, Subst
10from ..kast.manip import extract_lhs, extract_rhs, flatten_label
11from ..kast.prelude.k import GENERATED_TOP_CELL
12from ..kast.prelude.kbool import BOOL, FALSE, TRUE
13from ..kast.prelude.ml import is_bottom, is_top, mlAnd, mlEquals, mlEqualsFalse, mlEqualsTrue
14from ..kore.rpc import client_label
15from ..utils import ensure_dir_path
16from .proof import FailureInfo, Proof, ProofStatus, ProofSummary, Prover
17
18if TYPE_CHECKING:
19 from collections.abc import Iterable, Mapping
20 from pathlib import Path
21
22 from ..kast.inner import KSort
23 from ..kast.outer import KClaim, KDefinition
24 from ..kcfg import KCFGExplore
25 from ..ktool.kprint import KPrint
26
27_LOGGER: Final = logging.getLogger(__name__)
28
29
[docs]
@dataclass(frozen=True)
class ImpliesProofStep:
    """Immutable unit of work for the prover: the implication proof to advance."""

    # The proof this step operates on; the prover reads its antecedent,
    # consequent, id, status and ``bind_universally`` flag.
    proof: ImpliesProof
33
34
[docs]
@dataclass
class ImpliesProofResult:
    """Outcome of one proving step, later copied onto the proof by ``commit``."""

    # Substitution witnessing the implication, or ``None`` when none was found.
    csubst: CSubst | None
    # Simplified form of the proof's antecedent (``None`` if not computed).
    simplified_antecedent: KInner | None
    # Simplified form of the proof's consequent (``None`` if not computed).
    simplified_consequent: KInner | None
40
41
[docs]
class ImpliesProof(Proof[ImpliesProofStep, ImpliesProofResult]):
    """Proof obligation stating that ``antecedent`` implies ``consequent``.

    The proof is advanced by a single step. Once a result has been committed,
    both simplified terms are set and the proof can no longer progress; it has
    passed if a ``csubst`` was recorded and failed otherwise.
    """

    antecedent: KInner
    consequent: KInner
    bind_universally: bool
    simplified_antecedent: KInner | None
    simplified_consequent: KInner | None
    csubst: CSubst | None

    def __init__(
        self,
        id: str,
        antecedent: KInner,
        consequent: KInner,
        bind_universally: bool = False,
        simplified_antecedent: KInner | None = None,
        simplified_consequent: KInner | None = None,
        csubst: CSubst | None = None,
        proof_dir: Path | None = None,
        subproof_ids: Iterable[str] = (),
        admitted: bool = False,
    ):
        """Create the proof.

        :param id: Identifier of the proof; also used as its directory name under ``proof_dir``.
        :param antecedent: Left-hand side of the implication.
        :param consequent: Right-hand side of the implication.
        :param bind_universally: Forwarded to the implication check performed by the prover.
        :param simplified_antecedent: Previously computed simplification of ``antecedent``, if any.
        :param simplified_consequent: Previously computed simplification of ``consequent``, if any.
        :param csubst: Previously computed witness of the implication, if any.
        :param proof_dir: Directory in which proof data is stored, or ``None`` to keep it in memory only.
        :param subproof_ids: Identifiers of subproofs, forwarded to the base class.
        :param admitted: Whether the proof is admitted, forwarded to the base class.
        """
        super().__init__(id=id, proof_dir=proof_dir, subproof_ids=subproof_ids, admitted=admitted)
        self.antecedent = antecedent
        self.consequent = consequent
        self.bind_universally = bind_universally
        self.simplified_antecedent = simplified_antecedent
        self.simplified_consequent = simplified_consequent
        self.csubst = csubst

    def get_steps(self) -> list[ImpliesProofStep]:
        """Return the single remaining step, or an empty list if the proof cannot progress."""
        if not self.can_progress:
            return []
        return [ImpliesProofStep(self)]

    def commit(self, result: ImpliesProofResult) -> None:
        """Record the outcome of a proving step on this proof.

        :param result: Result produced by the prover.
        :raises ValueError: If ``result`` is not an ``ImpliesProofResult``.
        """
        if not isinstance(result, ImpliesProofResult):
            raise ValueError(f'Incorrect result type, expected ImpliesProofResult: {result}')
        proof_type = type(self).__name__
        self.csubst = result.csubst
        self.simplified_antecedent = result.simplified_antecedent
        self.simplified_consequent = result.simplified_consequent
        _LOGGER.info(f'{proof_type} finished {self.id}: {self.status}')

    @property
    def own_status(self) -> ProofStatus:
        """Status of this proof on its own.

        Admitted proofs pass unconditionally. Otherwise the proof is pending until both
        simplified terms are available, and then fails iff no ``csubst`` was recorded.
        """
        if self.admitted:
            return ProofStatus.PASSED
        if self.simplified_antecedent is None or self.simplified_consequent is None:
            return ProofStatus.PENDING
        if self.csubst is None:
            return ProofStatus.FAILED
        return ProofStatus.PASSED

    @property
    def can_progress(self) -> bool:
        """Whether a proving step is still outstanding (a simplified term is missing)."""
        return self.simplified_antecedent is None or self.simplified_consequent is None

    def write_proof_data(self, subproofs: bool = False) -> None:
        """Write ``proof.json`` under ``proof_dir / id`` unless the proof is already up to date.

        Does nothing beyond the base-class call when no ``proof_dir`` is set.

        :param subproofs: Not used by this implementation.
        """
        # NOTE(review): ``subproofs`` is not forwarded to the base class; confirm against
        # the signature of ``Proof.write_proof_data`` before changing this.
        super().write_proof_data()
        if not self.proof_dir:
            return
        ensure_dir_path(self.proof_dir)
        ensure_dir_path(self.proof_dir / self.id)
        proof_path = self.proof_dir / self.id / 'proof.json'
        if not self.up_to_date:
            proof_json = json.dumps(self.dict)
            proof_path.write_text(proof_json)
            _LOGGER.info(f'Updated proof file {self.id}: {proof_path}')

    @classmethod
    def from_dict(cls: type[ImpliesProof], dct: Mapping[str, Any], proof_dir: Path | None = None) -> ImpliesProof:
        """Rebuild a proof from the mapping produced by ``dict``.

        :param dct: Serialized proof. ``id``, ``antecedent``, ``consequent`` and ``subproof_ids``
            are required; the remaining keys are optional.
        :param proof_dir: Directory to associate with the loaded proof.
        :return: The reconstructed ``ImpliesProof``.
        """
        id = dct['id']
        antecedent = KInner.from_dict(dct['antecedent'])
        consequent = KInner.from_dict(dct['consequent'])
        simplified_antecedent = (
            KInner.from_dict(dct['simplified_antecedent']) if 'simplified_antecedent' in dct else None
        )
        simplified_consequent = (
            KInner.from_dict(dct['simplified_consequent']) if 'simplified_consequent' in dct else None
        )
        csubst = CSubst.from_dict(dct['csubst']) if 'csubst' in dct else None
        subproof_ids = dct['subproof_ids']
        admitted = dct.get('admitted', False)
        # Files written before the flag was serialized lack the key; fall back to the
        # constructor default so they keep loading exactly as before.
        bind_universally = dct.get('bind_universally', False)
        return ImpliesProof(
            id,
            antecedent,
            consequent,
            bind_universally=bind_universally,
            simplified_antecedent=simplified_antecedent,
            simplified_consequent=simplified_consequent,
            csubst=csubst,
            admitted=admitted,
            subproof_ids=subproof_ids,
            proof_dir=proof_dir,
        )

    @property
    def dict(self) -> dict[str, Any]:
        """Serializable representation of the proof; optional fields are omitted while ``None``."""
        dct = super().dict
        dct['type'] = 'ImpliesProof'
        dct['antecedent'] = self.antecedent.to_dict()
        dct['consequent'] = self.consequent.to_dict()
        # Persist the binding mode so that a proof reloaded while still pending is
        # checked the same way it would have been before it was written out.
        dct['bind_universally'] = self.bind_universally
        if self.simplified_antecedent is not None:
            dct['simplified_antecedent'] = self.simplified_antecedent.to_dict()
        if self.simplified_consequent is not None:
            dct['simplified_consequent'] = self.simplified_consequent.to_dict()
        if self.csubst is not None:
            dct['csubst'] = self.csubst.to_dict()
        return dct
151
152
[docs]
class EqualityProof(ImpliesProof):
    """Implication proof whose consequent is an equality ``lhs_body #Equals rhs_body``.

    The antecedent is the conjunction of the given constraints, and the implication
    is always checked with ``bind_universally=True``.
    """

    def __init__(
        self,
        id: str,
        lhs_body: KInner,
        rhs_body: KInner,
        sort: KSort,
        constraints: Iterable[KInner] = (),
        simplified_constraints: KInner | None = None,
        simplified_equality: KInner | None = None,
        csubst: CSubst | None = None,
        proof_dir: Path | None = None,
        subproof_ids: Iterable[str] = (),
        admitted: bool = False,
    ):
        """Create the proof and log a warning about the known soundness issue.

        :param id: Identifier of the proof.
        :param lhs_body: Left-hand side of the equality.
        :param rhs_body: Right-hand side of the equality.
        :param sort: Sort of the two sides (used as the argument sort of the equality).
        :param constraints: Constraints whose conjunction forms the antecedent.
        :param simplified_constraints: Previously simplified antecedent, if any.
        :param simplified_equality: Previously simplified consequent, if any.
        :param csubst: Previously computed witness of the implication, if any.
        :param proof_dir: Directory in which proof data is stored.
        :param subproof_ids: Identifiers of subproofs.
        :param admitted: Whether the proof is admitted.
        """
        antecedent = mlAnd(constraints)
        consequent = mlEquals(lhs_body, rhs_body, arg_sort=sort, sort=GENERATED_TOP_CELL)
        super().__init__(
            id,
            antecedent,
            consequent,
            bind_universally=True,
            simplified_antecedent=simplified_constraints,
            simplified_consequent=simplified_equality,
            csubst=csubst,
            proof_dir=proof_dir,
            subproof_ids=subproof_ids,
            admitted=admitted,
        )
        _LOGGER.warning(
            'Building an EqualityProof that has known soundness issues: See https://github.com/runtimeverification/haskell-backend/issues/3605.'
        )

    @staticmethod
    def read_proof_data(proof_dir: Path, id: str) -> EqualityProof:
        """Load the proof stored at ``proof_dir / id / 'proof.json'``.

        :raises ValueError: If no proof data exists for ``id`` in ``proof_dir``.
        """
        proof_path = proof_dir / id / 'proof.json'
        if Proof.proof_data_exists(id, proof_dir):
            proof_dict = json.loads(proof_path.read_text())
            return EqualityProof.from_dict(proof_dict, proof_dir)

        raise ValueError(f'Could not load Proof from file {id}: {proof_path}')

    @staticmethod
    def from_claim(claim: KClaim, defn: KDefinition, proof_dir: Path | None = None) -> EqualityProof:
        """Build an equality proof from a claim.

        The two sides come from the claim body, and each conjunct of the ``requires``
        clause becomes one constraint.

        :raises ValueError: If the claim has an ``ensures`` clause other than ``TRUE``.
        """
        claim_body = defn.add_sort_params(claim.body)
        sort = defn.sort_strict(claim_body)
        lhs_body = extract_lhs(claim_body)
        rhs_body = extract_rhs(claim_body)
        if not (claim.ensures is None or claim.ensures == TRUE):
            raise ValueError(f'Cannot convert claim to EqualityProof due to non-trival ensures clause {claim.ensures}')
        constraints = [mlEquals(TRUE, c, arg_sort=BOOL) for c in flatten_label('_andBool_', claim.requires)]
        return EqualityProof(claim.label, lhs_body, rhs_body, sort, constraints=constraints, proof_dir=proof_dir)

    @property
    def equality(self) -> KApply:
        """The consequent, which must be a ``KApply`` (the equality term)."""
        assert type(self.consequent) is KApply
        return self.consequent

    @property
    def lhs_body(self) -> KInner:
        """First argument of the equality."""
        return self.equality.args[0]

    @property
    def rhs_body(self) -> KInner:
        """Second argument of the equality."""
        return self.equality.args[1]

    @property
    def sort(self) -> KSort:
        """First sort parameter of the equality's label."""
        return self.equality.label.params[0]

    @property
    def constraint(self) -> KInner:
        """The antecedent as a single term."""
        return self.antecedent

    @property
    def constraints(self) -> list[KInner]:
        """The antecedent split into its ``#And`` conjuncts."""
        return flatten_label('#And', self.constraint)

    @property
    def simplified_constraints(self) -> KInner | None:
        """Alias of ``simplified_antecedent``."""
        return self.simplified_antecedent

    @property
    def simplified_equality(self) -> KInner | None:
        """Alias of ``simplified_consequent``."""
        return self.simplified_consequent

    @classmethod
    def from_dict(cls: type[EqualityProof], dct: Mapping[str, Any], proof_dir: Path | None = None) -> EqualityProof:
        """Rebuild an equality proof from its serialized form.

        The mapping is first read as a plain ``ImpliesProof`` and then taken apart
        into the constructor arguments of this class.
        """
        implies_proof = ImpliesProof.from_dict(dct, proof_dir=proof_dir)
        assert type(implies_proof.consequent) is KApply
        return EqualityProof(
            id=implies_proof.id,
            lhs_body=implies_proof.consequent.args[0],
            rhs_body=implies_proof.consequent.args[1],
            sort=implies_proof.consequent.label.params[0],
            constraints=flatten_label('#And', implies_proof.antecedent),
            simplified_constraints=implies_proof.simplified_antecedent,
            simplified_equality=implies_proof.simplified_consequent,
            csubst=implies_proof.csubst,
            proof_dir=implies_proof.proof_dir,
            subproof_ids=implies_proof.subproof_ids,
            admitted=implies_proof.admitted,
        )

    @property
    def dict(self) -> dict[str, Any]:
        """Serialized form of the parent class, tagged with type ``EqualityProof``."""
        dct = super().dict
        dct['type'] = 'EqualityProof'
        return dct

    def pretty(self, kprint: KPrint) -> Iterable[str]:
        """Return human-readable lines describing the proof and its current status."""
        lines = [
            f'LHS: {kprint.pretty_print(self.lhs_body)}',
            f'RHS: {kprint.pretty_print(self.rhs_body)}',
            f'Constraints: {kprint.pretty_print(mlAnd(self.constraints))}',
            f'Equality: {kprint.pretty_print(self.equality)}',
        ]
        # Test the optional terms for presence, not truthiness, so that a term which
        # happens to evaluate as falsy is still reported (matches the csubst check).
        if self.simplified_constraints is not None:
            lines.append(f'Simplified constraints: {kprint.pretty_print(self.simplified_constraints)}')
        if self.simplified_equality is not None:
            lines.append(f'Simplified equality: {kprint.pretty_print(self.simplified_equality)}')
        if self.csubst is not None:
            lines.append(f'Implication csubst: {self.csubst}')
        lines.append(f'Status: {self.status}')
        return lines

    @property
    def summary(self) -> EqualitySummary:
        """Short summary carrying the proof's id, status and admitted flag."""
        return EqualitySummary(self.id, self.status, self.admitted)
282
283
[docs]
@dataclass(frozen=True)
class EqualitySummary(ProofSummary):
    """Immutable summary of an ``EqualityProof``: its id, status and admitted flag."""

    id: str
    status: ProofStatus
    admitted: bool

    @property
    def lines(self) -> list[str]:
        """Summary rendered as lines: a header followed by one line per field."""
        header = f'EqualityProof: {self.id}'
        details = [
            f' status: {self.status}',
            f' admitted: {self.admitted}',
        ]
        return [header, *details]
297
298
[docs]
class RefutationProof(ImpliesProof):
    """Implication proof that a last constraint is false under a set of prior constraints.

    The antecedent is the conjunction of the prior constraints, each wrapped with
    ``mlEqualsTrue``. The consequent is the last constraint wrapped with
    ``mlEqualsFalse``. The implication is always checked with ``bind_universally=True``.
    """

    def __init__(
        self,
        id: str,
        pre_constraints: Iterable[KInner],
        last_constraint: KInner,
        simplified_antecedent: KInner | None = None,
        simplified_consequent: KInner | None = None,
        csubst: CSubst | None = None,
        proof_dir: Path | None = None,
        subproof_ids: Iterable[str] = (),
        admitted: bool = False,
    ):
        """Create the proof and log a warning about the known soundness issue.

        :param id: Identifier of the proof.
        :param pre_constraints: Terms that are each wrapped with ``mlEqualsTrue`` and conjoined.
        :param last_constraint: Term that is wrapped with ``mlEqualsFalse`` to form the consequent.
        :param simplified_antecedent: Previously simplified antecedent, if any.
        :param simplified_consequent: Previously simplified consequent, if any.
        :param csubst: Previously computed witness of the implication, if any.
        :param proof_dir: Directory in which proof data is stored.
        :param subproof_ids: Identifiers of subproofs.
        :param admitted: Whether the proof is admitted.
        """
        antecedent = mlAnd(mlEqualsTrue(c) for c in pre_constraints)
        consequent = mlEqualsFalse(last_constraint)
        super().__init__(
            id,
            antecedent,
            consequent,
            bind_universally=True,
            simplified_antecedent=simplified_antecedent,
            simplified_consequent=simplified_consequent,
            csubst=csubst,
            subproof_ids=subproof_ids,
            proof_dir=proof_dir,
            admitted=admitted,
        )
        _LOGGER.warning(
            'Building a RefutationProof that has known soundness issues: See https://github.com/runtimeverification/haskell-backend/issues/3605.'
        )

    @staticmethod
    def read_proof_data(proof_dir: Path, id: str) -> RefutationProof:
        """Load the proof stored at ``proof_dir / id / 'proof.json'``.

        :raises ValueError: If no proof data exists for ``id`` in ``proof_dir``.
        """
        proof_path = proof_dir / id / 'proof.json'
        if Proof.proof_data_exists(id, proof_dir):
            proof_dict = json.loads(proof_path.read_text())
            return RefutationProof.from_dict(proof_dict, proof_dir)

        raise ValueError(f'Could not load Proof from file {id}: {proof_path}')

    @property
    def pre_constraints(self) -> list[KInner]:
        """The antecedent split into its ``#And`` conjuncts.

        These are the conjuncts as stored, i.e. in the form built by ``__init__``,
        not the raw terms that were passed to it.
        """
        return flatten_label('#And', self.antecedent)

    @property
    def last_constraint(self) -> KInner:
        """Second argument of the consequent, which must be a ``KApply``."""
        assert type(self.consequent) is KApply
        return self.consequent.args[1]

    @property
    def simplified_constraints(self) -> KInner | None:
        """Alias of ``simplified_antecedent``."""
        return self.simplified_antecedent

    @classmethod
    def from_dict(cls: type[RefutationProof], dct: Mapping[str, Any], proof_dir: Path | None = None) -> RefutationProof:
        """Rebuild a refutation proof from its serialized form.

        The stored antecedent and consequent are restored verbatim, so serializing
        the loaded proof yields the same terms that were read.
        """
        implies_proof = ImpliesProof.from_dict(dct, proof_dir=proof_dir)
        assert type(implies_proof.consequent) is KApply
        proof = RefutationProof(
            id=implies_proof.id,
            # The stored conjuncts must not be fed back through the constructor: it
            # would wrap each of them with ``mlEqualsTrue`` a second time.
            pre_constraints=(),
            last_constraint=implies_proof.consequent.args[1],
            simplified_antecedent=implies_proof.simplified_antecedent,
            simplified_consequent=implies_proof.simplified_consequent,
            csubst=implies_proof.csubst,
            proof_dir=implies_proof.proof_dir,
            subproof_ids=implies_proof.subproof_ids,
            admitted=implies_proof.admitted,
        )
        # Reinstate exactly what was persisted instead of what the constructor derived.
        proof.antecedent = implies_proof.antecedent
        proof.consequent = implies_proof.consequent
        return proof

    @property
    def dict(self) -> dict[str, Any]:
        """Serialized form of the parent class, tagged with type ``RefutationProof``."""
        dct = super().dict
        dct['type'] = 'RefutationProof'
        return dct

    @property
    def summary(self) -> RefutationSummary:
        """Short summary carrying the proof's id and status."""
        return RefutationSummary(self.id, self.status)

    def pretty(self, kprint: KPrint) -> Iterable[str]:
        """Return human-readable lines describing the proof and its current status."""
        lines = [
            f'Constraints: {kprint.pretty_print(mlAnd(self.pre_constraints))}',
            f'Last constraint: {kprint.pretty_print(self.last_constraint)}',
        ]
        if self.csubst is not None:
            lines.append(f'Implication csubst: {self.csubst}')
        lines.append(f'Status: {self.status}')
        return lines

    def to_claim(self, claim_id: str) -> tuple[KClaim, Subst]:
        """Build a claim via ``build_claim`` from ``last_constraint`` to ``FALSE``.

        The stored ``pre_constraints`` are passed as the initial constraints, and
        there are no final constraints.
        """
        return build_claim(
            claim_id,
            init_config=self.last_constraint,
            final_config=FALSE,
            init_constraints=self.pre_constraints,
            final_constraints=[],
        )
396
397
[docs]
@dataclass(frozen=True)
class RefutationSummary(ProofSummary):
    """Immutable summary of a ``RefutationProof``: its id and status."""

    id: str
    status: ProofStatus

    @property
    def lines(self) -> list[str]:
        """Summary rendered as lines: a header followed by the status line."""
        header = f'RefutationProof: {self.id}'
        status_line = f' status: {self.status}'
        return [header, status_line]
409
410
[docs]
class ImpliesProver(Prover[ImpliesProof, ImpliesProofStep, ImpliesProofResult]):
    """Prover that discharges an ``ImpliesProof`` through a ``KCFGExplore`` instance."""

    proof: ImpliesProof
    kcfg_explore: KCFGExplore
    assume_defined: bool

    def __init__(self, proof: ImpliesProof, kcfg_explore: KCFGExplore, assume_defined: bool = False):
        """Store the proof, the explorer used for symbolic queries, and the ``assume_defined`` flag."""
        self.proof = proof
        self.kcfg_explore = kcfg_explore
        self.assume_defined = assume_defined

    def close(self) -> None:
        """Close the kore client held by the explorer's symbolic backend."""
        self.kcfg_explore.cterm_symbolic._kore_client.close()

    def step_proof(self, step: ImpliesProofStep) -> list[ImpliesProofResult]:
        """Simplify both sides of the implication and check it.

        :param step: Step carrying the proof to advance.
        :return: An empty list if the proof is not pending; otherwise a single result
            holding both simplified terms and the witnessing ``csubst`` (``None`` when
            the implication check yields none).
        """
        target = step.proof
        symbolic = self.kcfg_explore.cterm_symbolic
        proof_type = type(target).__name__
        _LOGGER.info(f'Attempting {proof_type} {target.id}')

        if target.status is not ProofStatus.PENDING:
            _LOGGER.info(f'{proof_type} finished {target.id}: {target.status}')
            return []

        # to prove the equality, we check the implication of the form `constraints #Implies LHS #Equals RHS`, i.e.
        # "LHS equals RHS under these constraints"
        simplified_antecedent, _ = symbolic.kast_simplify(target.antecedent)
        simplified_consequent, _ = symbolic.kast_simplify(target.consequent)
        _LOGGER.debug(f'Simplified antecedent: {self.kcfg_explore.pretty_print(simplified_antecedent)}')
        _LOGGER.debug(f'Simplified consequent: {self.kcfg_explore.pretty_print(simplified_consequent)}')

        csubst: CSubst | None
        if is_bottom(simplified_antecedent):
            # The implication holds vacuously; record an empty witness.
            _LOGGER.warning(f'Antecedent of implication (proof constraints) simplifies to #Bottom {target.id}')
            csubst = CSubst(Subst({}), ())
        elif is_top(simplified_consequent):
            # The implication holds trivially; record an empty witness.
            _LOGGER.warning(f'Consequent of implication (proof equality) simplifies to #Top {target.id}')
            csubst = CSubst(Subst({}), ())
        else:
            # TODO: we should not be forced to include the dummy configuration in the antecedent and consequent
            dummy_config = symbolic._definition.empty_config(sort=GENERATED_TOP_CELL)
            implication = symbolic.implies(
                antecedent=CTerm(config=dummy_config, constraints=[simplified_antecedent]),
                consequent=CTerm(config=dummy_config, constraints=[simplified_consequent]),
                bind_universally=target.bind_universally,
                assume_defined=self.assume_defined,
            )
            csubst = implication.csubst

        _LOGGER.info(f'{proof_type} finished {target.id}: {target.status}')
        outcome = ImpliesProofResult(
            csubst=csubst,
            simplified_antecedent=simplified_antecedent,
            simplified_consequent=simplified_consequent,
        )
        return [outcome]

    def init_proof(self, proof: ImpliesProof) -> None:
        """Label subsequent kore-RPC requests from this thread with the proof's id."""
        # Stamp proof.id on every subsequent kore-RPC request from this thread so
        # booster's `{request: ...}` log lines self-identify the originating
        # claim. Worker-thread consumers must `client_label.set(...)` inside
        # their own closure — `ContextVar` is not propagated by ThreadPoolExecutor.
        client_label.set(proof.id)

    def failure_info(self, proof: ImpliesProof) -> FailureInfo:
        """Return an empty ``FailureInfo``; no details are collected yet."""
        # TODO add implementation
        return FailureInfo()