mcmas.engine

mcmas.engine.

The main wrapper for the MCMAS engine. Core functionality to run the CLI inside a docker container, then parse the output to return JSON. See also

  1"""
  2mcmas.engine.
  3
  4The main wrapper for the MCMAS engine. Core functionality to run
  5the CLI inside a docker container, then parse the output to
  6return JSON.  See also
  7"""
  8
  9import atexit
 10import os
 11import pathlib
 12import re
 13import tempfile
 14import time
 15from pathlib import Path, PosixPath
 16from typing import Dict, Union
 17
 18import docker
 19import pydantic
 20
 21from mcmas import ispl, parser, sim, util
 22
 23LOGGER = util.get_logger(__name__)
 24DEFAULT_IMG = "ghcr.io/mattvonrocketstein/mcmas:v1.3.0"
 25DEFAULT_V = os.environ.get("MCMAS_VERBOSITY", "3")
 26MCMAS_DEBUG = os.environ.get("MCMAS_DEBUG", "0")
 27MCMAS_DEBUG = MCMAS_DEBUG == "1"
 28MCMAS_VERBOSE = os.environ.get("MCMAS_VERBOSE", "0")
 29MCMAS_VERBOSE = MCMAS_VERBOSE != "0"
 30
 31try:
 32    docker_client = docker.from_env()
 33except (docker.errors.DockerException,) as exc:
 34    LOGGER.critical(f"ERROR: could not create docker client: {exc}")
 35    LOGGER.critical("Simulations will not be able to run!")
 36    docker_client = None
 37
 38
 39def relpath(fname):
 40    try:
 41        return str(Path(fname).relative_to(os.getcwd()))
 42    except ValueError:
 43        return fname
 44
 45
 46@pydantic.validate_call
 47def parse_engine_output(text: str, file=None, exit_code=None) -> sim.Simulation:
 48    """
 49    Parses raw engine output to Simulation.
 50    """
 51    LOGGER.critical(f"file={file} exit_code={exit_code}")
 52
 53    formula_lines = re.findall(r"^\s*Formula number.*$", text, re.MULTILINE)
 54    true_props = [
 55        x[x.find(": ") + 2 : -len(", is TRUE in the model")].replace(" && ", " and ")
 56        for x in formula_lines
 57        if x.endswith("is TRUE in the model")
 58    ]
 59    false_props = [
 60        x[x.find(": ") + 2 : -len(", is FALSE in the model")].replace(" && ", " and ")
 61        for x in formula_lines
 62        if x.endswith("is FALSE in the model")
 63    ]
 64
 65    # trim the extra parens output includes
 66    # so that these *exactly* match input formulae
 67    # true_props=[x.lstrip().rstrip()[1:-1] for x in true_props]
 68    # false_props=[x.lstrip().rstrip()[1:-1] for x in false_props]
 69    # true_props = [x.replace(' && ',' and ') for x in true_props]
 70
 71    match = re.search(r"BDD memory in use = (\d+)", text)
 72    bdd_memory = int(match.group(1)) if match else 0
 73
 74    match = re.search(r"There is no deadlock state in the model!", text)
 75    deadlock = not bool(match)
 76
 77    match = re.search(r"execution time = ([\d.]+)", text)
 78    execution_time = float(match.group(1)) if match else None
 79
 80    match = re.search(r"It took ([\d.]+) seconds to generate state space.", text)
 81    gen_state_space = float(match.group(1)) if match else None
 82
 83    match = re.search(r"It took ([\d.]+) seconds to encode transition relation.", text)
 84    enc_time = float(match.group(1)) if match else None
 85    match = re.search(r"number of reachable states = ([\d]+)", text)
 86    reachable_states = int(match.group(1)) if match else None
 87
 88    match = re.search(r" has been parsed successfully.\n", text)
 89    parsed = bool(match)  # True if match else False
 90
 91    match = re.search(r"(.*) has error\(s\)[.]", text)
 92    parsed = False if match else parsed
 93
 94    spec_validates = parsed and (len(formula_lines) == len(true_props))
 95    error = exit_code not in [0, 139]
 96    error and LOGGER.critical(f"error {error}")
 97    metadata = sim.Simulation.Metadata(
 98        parsed=parsed,
 99        file=file,
100        exit_code=exit_code,
101        validates=parsed and spec_validates,
102        deadlock=deadlock if parsed else None,
103    )
104    data = {"error": error and text, "metadata": metadata}
105    if not error:
106        metadata = metadata.model_copy(
107            update={
108                "file": relpath(file),
109                "timing": {
110                    "generate_time": gen_state_space,
111                    "execution_time": execution_time,
112                    "encoding_time": enc_time,
113                },
114            }
115        )
116        if not metadata.validates:
117            _hint = f"{metadata.file}:"
118            errors = [
119                ":".join(line[line.find(_hint) :].split(":")[1:])
120                for line in text.split("\n")
121                if _hint in line
122            ]
123            del metadata.timing
124        else:
125            errors = []
126            data.update(
127                facts={"true": true_props, "false": false_props},
128                state_space={
129                    "reachable_states": reachable_states,
130                    "memory": {"bdd": bdd_memory},
131                },
132            )
133        data.update(
134            metadata=metadata,
135            text=text if not error else None,
136            error=errors,
137        )
138    out = sim.Simulation(**data)
139
140    return out
141
142
143@pydantic.validate_call
144def get_help() -> str:
145    return (
146        docker_client.containers.run(
147            DEFAULT_IMG,
148            stdout=True,
149            stderr=True,
150            detach=True,
151        )
152        .logs()
153        .decode("utf-8")
154    )
155
156
157@pydantic.validate_call
158def show_help() -> None:
159    """
160    
161    """
162    print(get_help())
163
164
165@pydantic.validate_call
166def mcmas(
167    img: str = DEFAULT_IMG,
168    cmd: str = "",
169    # force: bool = False,
170    output_format: str = "data",
171    fname: Union[str, PosixPath] = "",
172    witness: bool = False,
173    model=None,
174    # raw: bool = False,
175    strict: bool = False,
176    validate_only: bool = False,
177) -> Union[Dict, str, bool]:
178    """
179    Proxies an invocation of the mcmas engine through to the
180    containerized CLI.
181    """
182
183    def create_witness_folder(tmpd, img):
184        container = docker_client.containers.run(
185            img,
186            entrypoint="bash",
187            command=f"-x -c 'mkdir {tmpd}'",
188            working_dir="/workspace",
189            volumes=volumes,
190            stdout=True,
191            stderr=True,
192            detach=True,
193        )
194        container.wait()
195        container.reload()
196        exit_code = container.attrs["State"]["ExitCode"]
197        assert exit_code == 0, "failed creating {tmpd} from container {img}!"
198
199    def clean_witness_folder(tmpd, img):
200        """
201        Registered with atexit module.
202
203        Removes the .info and .dot files created by `mcmas`,
204        using the same container to avoid permissions issues
205        """
206        LOGGER.info("cleaning folder for witnesses")
207        docker_client.containers.run(
208            img,
209            volumes=volumes,
210            entrypoint="bash",
211            working_dir="/workspace",
212            stdout=True,
213            stderr=True,
214            command=f"-c 'mv {tmpd} /tmp'",
215        )
216
217    LOGGER.debug(f"running mcmas with img={img} fname={fname} model={model}")
218    cmd = f"-v {DEFAULT_V} " + cmd if "-v" not in cmd else cmd
219    cmd = "-a " + cmd if "-a " not in cmd else cmd
220    cmd = "-k " + cmd if "-k " not in cmd else cmd
221    volumes = [
222        f"{os.getcwd()}:/workspace",
223    ]
224    if witness:
225        tmpd = f".tmp.mcmas_{time.time()}"
226        cmd = "-c 2 " + cmd if "-c " not in cmd else cmd
227        cmd = f"-p ./{tmpd} " + cmd if "-p " not in cmd else cmd
228        LOGGER.debug(f"paving tmpdir for witnesses: {tmpd}")
229        create_witness_folder(tmpd, img)
230        LOGGER.debug("registering cleanup at exit")
231        atexit.register(clean_witness_folder, tmpd, img)
232
233    validate_only = validate_only or "--validate-only" in cmd
234    if "--strict" in cmd:
235        strict = True
236        cmd = cmd.replace("--strict", "")
237    if validate_only:
238        LOGGER.info(f"Requested validation (strict={strict})")
239        cmd = f"-v {DEFAULT_V} -s"
240    else:
241        LOGGER.info(f"cmd={cmd}")
242    assert fname or model
243
244    if fname and not str(fname).startswith("<<"):
245        LOGGER.debug(f"fname={fname}")
246        abspath = pathlib.Path(fname).absolute()
247        command = f"{cmd} {abspath}"
248        if abspath.exists():
249            dirname = abspath.parent
250            volumes += [f"{dirname}:{dirname}"]
251            LOGGER.debug(f"file on host @{dirname}.. adding volume {volumes}")
252        else:
253            err = f"{abspath} does not exist on host"
254            LOGGER.critical(err)
255            raise SystemExit(1)
256    else:
257        raise NotImplementedError("expected an argument for filename")
258    LOGGER.debug(f"command={command}")
259    text = None
260    exit_code = -1
261    try:
262        container = docker_client.containers.run(
263            img,
264            entrypoint="mcmas",
265            command=command,
266            working_dir="/workspace",
267            volumes=volumes,
268            stdout=True,
269            stderr=True,
270            detach=True,
271        )
272        container.wait()
273        container.reload()
274        while container.status != "exited":
275            time.sleep(0.1)
276            container.reload()
277        text = container.logs(stdout=True, stderr=True).decode()
278        exit_code = container.attrs["State"]["ExitCode"]
279    except (docker.errors.ContainerError,) as exc:
280        err = f"failed trying to use the container: stderr=\n\n{exc.stderr.decode('utf-8')}"
281        LOGGER.debug(err)
282        if strict:
283            raise
284        else:
285            text = str(exc)
286    if any([exit_code != 0, validate_only]):
287        LOGGER.critical(f"exit_code={exit_code} validate_only={validate_only}")
288        if validate_only:
289            pattern = r"Global syntax checking.*?(?:\n1)*\nDone"
290            match = re.search(pattern, text, re.DOTALL)
291            if match:
292                return True
293            else:
294                LOGGER.critical(f"Global syntax checking failed: {match}")
295                LOGGER.critical(f"Cannot validate {fname}")
296                return False
297        try:
298            sim = parse_engine_output(text, file=fname, exit_code=exit_code)
299        except (Exception,) as exc:
300            LOGGER.critical(text)
301            LOGGER.critical(exc)
302            raise
303    elif exit_code != 0:
304        LOGGER.critical(f"exit_code={exit_code}")
305        if strict:
306            raise SystemExit(1)
307        else:
308            tmp = parse_engine_output(
309                text, file=fname, exit_code=exit_code
310            ).model_dump()
311            tmp.pop("error"), tmp.pop("text"), tmp.pop("exit_code")
312            sim = sim.Simulation(
313                text=None,
314                metadata=tmp.metadata.model_copy(update={"exit_code": exit_code}),
315                error=text,
316                **tmp,
317            )
318    else:
319        LOGGER.critical(
320            f"requested sim, not validation, file={fname} exit_code={exit_code}"
321        )
322        sim = parse_engine_output(text, file=fname, exit_code=exit_code)
323        if model:
324            model = model.model_copy(update={"metadata": sim.metadata})
325    if not text:
326        err = "nothing returned from engine"
327        LOGGER.critical(err)
328        raise Exception(container)
329
330    if witness and sim:
331        sim = load_witnesses(tmpd=tmpd, fname=fname, model=model, sim=sim)
332
333    if output_format in ["data"]:
334        return sim.model_dump()
335    elif output_format in ["model"]:
336        return sim
337    elif output_format in ["json"]:
338        return sim.model_dump_json(
339            # exclude_none=True,
340            indent=2
341        )
342    elif output_format in ["text"]:
343        return "\n".join(sim.error or sim.text)
344    else:
345        raise Exception(f"unrecognized output format {output_format}")
346
347
348def load_witnesses(tmpd=None, model=None, fname: Union[str, PosixPath] = "", sim=None):
349    """
350    
351    """
352    LOGGER.warning("loading witnesses..")
353    witnesses = dict()
354    for wfile in Path(tmpd).iterdir():
355        if str(wfile).endswith(".info"):
356            with open(wfile) as fhandle:
357                content = fhandle.read()
358                witnesses[wfile.stem] = parser.extract_witnesses(content, fname=wfile)
359    ordered = sorted([k for k in witnesses])
360    tmp = {}
361    for k in ordered:
362        tmp[k] = witnesses[k]
363    witnesses = {}
364
365    model = model or ispl.ISPL.load_from_ispl_file(file=str(fname))
366    forms = model.formulae
367    for i, fname_stem in enumerate(tmp.keys()):
368        witnesses[forms[i]] = tmp[fname_stem]
369    if len(forms) > len(witnesses):
370        LOGGER.warning("could not find witnesses for some formulae.")
371        missing = []
372        present = []
373        for fname_stem in tmp:
374            match = re.search(r"(\d+)$", fname_stem)
375            i = int(match.group(1))
376            present.append(i)
377        missing = [forms[i] for i in [x for x in range(len(tmp)) if x not in present]]
378        for m in missing:
379            LOGGER.warning(f" - {m}")
380            witnesses[m] = []
381        sim = sim.model_copy(update=dict(witnesses=witnesses))
382    sim = sim.model_copy(update=dict(witnesses=witnesses))
383    LOGGER.critical(
384        f"found witnesses for {len(witnesses)} of {len(model.formulae)} formulae"
385    )
386    return sim
387
388
389@pydantic.validate_call
390def validator(**kwargs) -> bool:
391    return engine(validate_only=True, **kwargs)
392
393
394@pydantic.validate_call
395def engine(
396    fname: Union[str, PosixPath] = "",
397    text: str = "",
398    model=None,
399    data: dict = {},
400    file: Union[str, PosixPath] = "",
401    **kwargs,
402) -> Union[Dict, str]:
403    """
404    Runs the engine on either a filename, a block of ISPL text,
405    or an ISPL- Specification object.
406    """
407    if text:
408        LOGGER.debug("------------")
409        LOGGER.debug(text)
410        LOGGER.debug("------------")
411        with tempfile.NamedTemporaryFile(
412            mode="w+", suffix=".ispl", delete=True, dir="."
413        ) as temp_file:
414            temp_file.write(text)
415            temp_file.flush()
416            result = mcmas(fname=temp_file.name, **kwargs)
417            # raise Exception(result.metadata)
418            return result
419    # elif file and file=='/dev/stdin':
420    #     import sys
421    #     raise Exception(sys.stdin.read())
422    #     return engine(text=sys.stdin.read())
423    elif model:
424        return engine(data=model.model_dump(), **kwargs)
425    elif fname or file:
426        return mcmas(fname=fname or file, **kwargs)
427    elif data:
428        return engine(text=util.dict2ispl(data), **kwargs)
429    else:
430        err = "No input, expected one of {text|model|data}"
431        raise Exception(err + f"\n{[fname,text,model,data,file]}")
432
433
434mcmas.validate = validator
435engine.validate = validator
436engine.show_help = show_help
437
438__all__ = [engine, validator]
@pydantic.validate_call
def engine( fname: Union[str, pathlib.PosixPath] = '', text: str = '', model=None, data: dict = {}, file: Union[str, pathlib.PosixPath] = '', **kwargs) -> Union[Dict, str]:
395@pydantic.validate_call
396def engine(
397    fname: Union[str, PosixPath] = "",
398    text: str = "",
399    model=None,
400    data: dict = {},
401    file: Union[str, PosixPath] = "",
402    **kwargs,
403) -> Union[Dict, str]:
404    """
405    Runs the engine on either a filename, a block of ISPL text,
406    or an ISPL- Specification object.
407    """
408    if text:
409        LOGGER.debug("------------")
410        LOGGER.debug(text)
411        LOGGER.debug("------------")
412        with tempfile.NamedTemporaryFile(
413            mode="w+", suffix=".ispl", delete=True, dir="."
414        ) as temp_file:
415            temp_file.write(text)
416            temp_file.flush()
417            result = mcmas(fname=temp_file.name, **kwargs)
418            # raise Exception(result.metadata)
419            return result
420    # elif file and file=='/dev/stdin':
421    #     import sys
422    #     raise Exception(sys.stdin.read())
423    #     return engine(text=sys.stdin.read())
424    elif model:
425        return engine(data=model.model_dump(), **kwargs)
426    elif fname or file:
427        return mcmas(fname=fname or file, **kwargs)
428    elif data:
429        return engine(text=util.dict2ispl(data), **kwargs)
430    else:
431        err = "No input, expected one of {text|model|data}"
432        raise Exception(err + f"\n{[fname,text,model,data,file]}")

Runs the engine on either a filename, a block of ISPL text, or an ISPL- Specification object.

@pydantic.validate_call
def validator(**kwargs) -> bool:
390@pydantic.validate_call
391def validator(**kwargs) -> bool:
392    return engine(validate_only=True, **kwargs)