mcmas.ispl
Pydantic models for holding ISPL programs and program- fragments.
1""" 2mcmas.ispl: 3 4Pydantic models for holding ISPL programs and program- fragments. 5""" 6 7import inspect 8import json 9import os 10import sys 11from typing import Optional 12 13import pydantic 14from pydantic import Field 15 16import mcmas 17 18# from . import typing 19from mcmas import fmtk, typing, util 20from mcmas.logic import symbols # noqa 21from mcmas.models import spec 22from mcmas.sim import SimType, Simulation 23 24Actions = typing.ActionsType 25 26LOGGER = util.get_logger(__name__) 27 28 29def check_slice(*args): 30 assert len(args) in [0, 1] 31 slice_maybe = args and args[0] 32 if slice_maybe == Ellipsis or isinstance(slice_maybe, (slice, tuple, type(None))): 33 return {} 34 elif isinstance(slice_maybe, (dict,)): 35 return slice_maybe 36 else: 37 err = f"expected slice or dict for posargs, got {[args, type(args)]}" 38 LOGGER.warning(err) 39 raise ValueError(err) 40 41 42############################################################################### 43ObsvarsField = Field( 44 default={}, 45 description=("Observable variables. Can be seen by other agents"), 46) 47VarsField = Field( 48 default={}, 49 description=("Private variables. Only the Environment can access"), 50) 51ActionsField = Field( 52 default=[], 53 description=( 54 "Defines the set of actions an agent can perform." 55 "Visible to all other agents." 56 ), 57) 58ProtocolField = Field( 59 description=( 60 "Action selection rules; how and when an agent can perform " 61 "specific actions based on its current state" 62 ), 63 default=[], 64) 65EvolutionField = Field( 66 default=[], 67 description=( 68 "Defines how an agent's local variables change in " 69 "response to the actions performed by all agents." 70 ), 71) 72############################################################################### 73 74 75class Fragment(fmtk.Fragment): 76 """ 77 A piece of an ISPL Specification. 78 79 May or may not be "concrete" as of yet, i.e. this may not yet 80 be useful for actually running a simulation! 81 """ 82 83 PROMPT_HINTS: typing.ClassVar = "" 84 logger: typing.ClassVar = LOGGER 85 86 def __init__(self, *args, **kwargs): 87 """ 88 89 """ 90 if args and args[0] and args[0] == Ellipsis: 91 tmp = self.__class__.get_trivial().model_dump() 92 tmp.update(**kwargs) 93 super().__init__(**tmp) 94 else: 95 super().__init__(*args, **kwargs) 96 97 def __call__(self, **kwargs): 98 """ 99 100 """ 101 for k, v in kwargs.items(): 102 setattr(self, k, v) 103 return self 104 105 def __lt__(self, other) -> bool: 106 """ 107 Specification algebra. 108 """ 109 if not isinstance(other, self.__class__): 110 raise TypeError(f"Cannot compare {type(self)} and {type(other)}") 111 return self.COMPARATOR(self) < self.COMPARATOR(other) 112 113 def __gt__(self, other) -> bool: 114 """ 115 Specification algebra. 116 """ 117 return not self.__lt__(other) 118 119 def __class_getitem__(kls, other): 120 """ 121 122 """ 123 if other == Ellipsis: 124 return kls(other) 125 else: 126 mod_name = other.__class__.__module__ 127 cname = "_load_from_" + mod_name.replace(".", "_") 128 tname = "_load_from_" + type(other).__name__ 129 alt = getattr(kls, tname, None) 130 constructor = getattr(kls, cname, alt) 131 if constructor is None: 132 err = ( 133 f"could not find constructor for `{kls}.{cname}` or `{kls}.{tname}`" 134 ) 135 LOGGER.critical(err) 136 raise ValueError(err) 137 else: 138 LOGGER.info(f"using constructor: {constructor}") 139 return constructor(other) 140 141 @classmethod 142 def _load_from_prompt(kls, txt, **extra) -> typing.Self: 143 """ 144 Create ISPL agent from the given prompt. 145 """ 146 import pydantic_ai 147 148 from mcmas.ai import DEFAULT_PROMPT, config 149 150 hints = """""" 151 prompt = DEFAULT_PROMPT + kls.PROMPT_HINTS 152 LOGGER.info(f"Attempting to load object from prompt:\n\n{prompt}") 153 agent = ( 154 pydantic_ai.Agent( 155 config.DEFAULT_MODEL, 156 output_type=pydantic_ai.PromptedOutput( 157 [kls], 158 template=prompt, 159 ), 160 ) 161 .run_sync(txt) 162 .output 163 ) 164 # FIXME: agent-only post-processing; move to subclass 165 if hasattr(agent, "name"): 166 agent.name = agent.name.lower() 167 agent.metadata.parser = f"{kls.__module__}.{kls.__name__}._load_from_prompt" 168 return agent 169 170 _load_from_str = _load_from_prompt 171 172 def analyze_types(self) -> typing.List[str]: 173 """ 174 175 """ 176 types = [] 177 types += getattr(self, "vars", {}).values() 178 types += getattr(self, "obsvars", {}).values() 179 types = sorted(list({x.strip() for x in types})) 180 return types 181 182 def analyze_complexity(self) -> typing.Dict: 183 """ 184 185 """ 186 from mcmas.models import spec 187 188 return [spec.ComplexityAnalysis()] 189 190 @pydantic.validate_call 191 def update(self, data: dict) -> typing.Self: 192 """ 193 194 """ 195 for k, v in self.model_dump().items(): 196 setattr(self, k, v) 197 return self 198 199 @pydantic.validate_call 200 def spec_validate(self) -> typing.Dict: 201 """ 202 Full validation output for this fragment (not a simple 203 bool!) 204 """ 205 return dict( 206 metadata=self.metadata.model_dump(), 207 validates=self.validates, 208 advice=self.advice, 209 ) 210 211 def __invert__(self): 212 """Model interpolation: returns this model""" 213 if not self.advice: 214 LOGGER.warning(f"{self} has no advice and appears complete, returning it.") 215 return self 216 else: 217 err = f"Not sure how from complete object like {self}" 218 LOGGER.critical(err) 219 raise NotImplementedError(err) 220 221 222class Environment(Fragment): 223 """ 224 Python wrapper for ISPL Environments. Environments model the 225 shared information and boundary conditions that all other 226 agents can observe. 227 228 See also the relevant [ISPL reference](http://mattvonrocketstein.github.io/py-mcmas/isplref/#environment) 229 """ 230 231 REQUIRED: typing.ClassVar = ["protocol", "actions"] 232 DefaultEnvironment: typing.ClassVar 233 actions: typing.ActionsType = ActionsField 234 vars: typing.VarsType = VarsField 235 obsvars: typing.ObsVarsType = ObsvarsField 236 evolution: typing.EvolType = EvolutionField 237 protocol: typing.ProtocolType = ProtocolField 238 239 def __len__(self): 240 """ 241 Specification Algebra. 242 243 The length of an agent is the number of items in the 244 description length 245 """ 246 return sum( 247 map( 248 len, 249 [ 250 self.actions, 251 self.evolution, 252 self.obsvars, 253 self.protocol, 254 self.vars, 255 getattr(self, "lobsvars", []), 256 getattr(self, "red_states", []), 257 ], 258 ) 259 ) 260 261 @util.classproperty 262 def DefaultEnvironment(kls): 263 return kls(...) 264 265 @classmethod 266 def get_trivial(kls, *args, **kwargs): 267 kwargs.update(check_slice(*args)) 268 actions = kwargs.pop("actions", [symbols.tick]) 269 protocol = kwargs.pop("protocol", dict(Other=[symbols.tick])) 270 vars = kwargs.pop("vars", dict(ticking="boolean")) 271 return kls(vars=vars, actions=actions, protocol=protocol, **kwargs) 272 273 @classmethod 274 @pydantic.validate_call 275 def load_from_source(kls, txt: str) -> typing.Self: 276 """ 277 Creates an Environment from string. 278 """ 279 from mcmas import parser 280 281 agents = parser.extract_agents(txt) 282 env = agents.pop("Environment", None) 283 assert env is not None 284 return Environment(**env) 285 286 287class Agent(Fragment): 288 """ 289 Python wrapper for ISPL Agents. 290 291 See also the relevant [ISPL reference](http://mattvonrocketstein.github.io/py-mcmas/isplref/#agent) 292 """ 293 294 COMPARATOR: typing.ClassVar = len 295 DefaultAgent: typing.ClassVar 296 REQUIRED: typing.ClassVar = ["protocol", "evolution", "actions"] 297 parser: typing.ClassVar 298 299 name: str = Field( 300 default="player", 301 description=("Name of this agent"), 302 ) 303 actions: typing.ActionsType = ActionsField 304 evolution: typing.EvolType = EvolutionField 305 obsvars: typing.ObsVarsType = ObsvarsField 306 protocol: typing.ProtocolType = ProtocolField 307 vars: typing.VarsType = VarsField 308 309 lobsvars: typing.LobsvarsType = Field( 310 description="", 311 default=[], 312 ) 313 red_states: typing.List[str] = Field( 314 description="", 315 default=[], 316 ) 317 318 __len__ = Environment.__len__ 319 320 @util.classproperty 321 def DefaultAgent(kls): 322 """ 323 Returns the trivial agent. 324 """ 325 return kls(...) 326 327 @classmethod 328 def _load_from_pydantic_ai_agent(kls, pagent, **extra) -> typing.Self: 329 """ 330 Create ISPL agent from the given pydantic agent. 331 """ 332 kls.logger.info(f"_load_from_pydantic_ai_agent: {pagent}") 333 from mcmas import util 334 335 name = pagent.name or f"Agent_{id(pagent)}" 336 actions = list(pagent._function_toolset.tools.keys()) 337 tools = list(pagent._function_toolset.tools.values()) 338 tool = tools[0] 339 if len(tools) > 1: 340 kls.logger.warning(f"pydantic agent `{name}` has multiple tools!") 341 kls.logger.warning(f"using just the first one: {tool}") 342 # naive conversion from function signature to ISPL types. 343 vars = util.fxn_sig.as_ispl_types(tool.function) 344 vars.pop("ctx", None) 345 return Agent( 346 name=name, 347 actions=actions, 348 vars=vars, 349 metadata=dict( 350 parser=f"{kls.__module__}.{kls.__name__}._load_from_pydantic_ai_agent", 351 file=inspect.getfile(pagent.__class__), 352 ), 353 **extra, 354 ).model_completion() 355 356 def analyze_symbols(self) -> typing.List[str]: 357 """ 358 Returns symbol-related metadata including agent-names, 359 actions, variables, and type info. 360 """ 361 return spec.SymbolMetadata( 362 agents=[self.name], 363 actions=self.actions, 364 vars=[k.strip() for k in list(self.lobsvars) + list(self.vars)], 365 types=self.analyze_types(), 366 ) 367 368 @property 369 @pydantic.validate_call 370 def analysis(self) -> spec.Analysis: 371 """ 372 Static-analysis for this Agent specification. 373 374 Returns details about symbols and logical operators. 375 """ 376 return spec.Analysis( 377 symbols=self.analyze_symbols(), 378 types=self.analyze_types(), 379 complexity=self.analyze_complexity(), 380 ) 381 # out.actions = [getattr(symbols, x) for x in sorted(list(set(out.actions)))] 382 # out.vars = [getattr(symbols, x) for x in sorted(list(set(out.vars)))] 383 # out.agents = [getattr(symbols, x) for x in sorted(list(set(out.agents)))] 384 # return meta 385 386 def __pow__(self, other: float = 0.1) -> typing.Self: 387 """ 388 389 """ 390 from mcmas import ai 391 392 return ai.model_mutation(obj=self, model_settings=dict(top_p=other)) 393 394 def __invert__(self) -> typing.Self: 395 """ 396 Trigger completion for this Agent. 397 398 This is NOT backed by an LLM; see instead `mcmas.ai.agent_completion`. 399 """ 400 if not self.advice: 401 err = f"{self} is already valid, returning it instead of completing" 402 LOGGER.warning(err) 403 return self 404 else: 405 tmp = self 406 trivial = DefaultAgent 407 defaults = dict( 408 protocol=trivial.protocol, 409 vars=trivial.vars, 410 evolution=trivial.evolution, 411 ) 412 for x in ["protocol", "vars", "evolution"]: 413 if getattr(self, x, None): 414 pass 415 else: 416 LOGGER.warning(f"could not find required {x}") 417 tmp = tmp.model_copy(update={x: defaults[x]}) 418 return tmp.model_copy( 419 update=dict(actions=list(set(tmp.actions + trivial.actions))) 420 ) 421 422 model_completion = __invert__ 423 424 @classmethod 425 def get_trivial(kls, *args, **kwargs): 426 """ 427 Smallest legal agent. 428 """ 429 kwargs.update(check_slice(*args)) 430 return kls( 431 name="trivial", 432 vars=dict(ticking="boolean"), 433 actions=["tick"], 434 protocol=["Other : {tick}"], 435 evolution=["ticking=true if Action=tick;"], 436 ) 437 438 def model_dump_source(self): 439 """ 440 Dump the source-code for this piece of the specification. 441 """ 442 from mcmas import rendering 443 444 return rendering.get_template("Agent.j2").render( 445 name=self.name, agent=self.model_dump() 446 ) 447 448 @util.classproperty 449 def parser(self) -> typing.Callable: 450 """ 451 Return an appropriate parser for this spec-fragment. 452 """ 453 from mcmas import parser 454 455 return parser.extract_agents 456 457 @classmethod 458 @pydantic.validate_call 459 def load_from_source(kls, txt, strict: bool = False) -> typing.Self: 460 """ 461 Load ISPL agent from string. 462 """ 463 agents = kls.parser(txt) 464 agents.pop("Environment", None) 465 if len(agents) != 1: 466 LOGGER.critical("load_from_source: more than 1 agent! returning first..") 467 return Agent(**list(agents.values())[0]) 468 469 @property 470 def local_advice(self) -> list: 471 return [] 472 473 474DefaultAgent = Agent.DefaultAgent 475DefaultEnvironment = Environment.DefaultEnvironment 476 477 478class ISpec(fmtk.Specification): 479 """ 480 481 """ 482 483 484import functools 485import operator 486 487 488class ISPL(Fragment): 489 """ 490 Python wrapper for ISPL specifications. 491 492 This permits partials or "fragments", i.e. the specification need not be complete and ready to run. 493 494 See the ISPL reference here: http://mattvonrocketstein.github.io/py-mcmas/isplref 495 """ 496 497 PROMPT_HINTS: typing.ClassVar = "Individual proper nouns refer to separate agents." 498 499 def __str__(self): 500 return f"<ISPL: agents={len(self.agents)} vars={len(self.environment.vars)}>" 501 502 def analyze_types(self): 503 types = super().analyze_types() 504 for agent in self.agents.values(): 505 types += agent.analyze_types() 506 return list(set(types)) 507 508 @classmethod 509 def get_trivial(kls, *args, **kwargs): 510 """ 511 ISPL(...) notation. Unlike ISPL(..) 512 513 This returns the trivial specification, with just enough 514 structure to validate and run, plus any optional 515 overrides. 516 """ 517 kwargs.update(check_slice(*args)) 518 title = kwargs.pop("title", "Minimal valid ISPL specification") 519 agents = kwargs.pop("agents", {"DefaultAgent": DefaultAgent}) 520 environment = kwargs.pop("environment", Environment(...)) 521 evaluation = kwargs.pop("evaluation", ["ticking if Environment.ticking=true"]) 522 init_states = kwargs.pop("init_states", ["Environment.ticking=true"]) 523 formulae = kwargs.pop("formulae", ["ticking"]) 524 return kls( 525 title=title, 526 environment=environment, 527 evaluation=evaluation, 528 init_states=init_states, 529 formulae=formulae, 530 agents=agents, 531 **kwargs, 532 ) 533 534 # Metadata: typing.ClassVar = fmtk.SpecificationMetadata 535 COMPARATOR: typing.ClassVar = abs 536 REQUIRED: typing.ClassVar = ["agents", "evaluation", "formulae"] 537 parser: typing.ClassVar 538 title: str = Field( 539 default="Untitled Model", 540 description="Optional title. Used as a comment at the top of the file", 541 ) 542 agents: typing.Dict[str, Agent] = Field( 543 default={}, 544 description="All agents involved in this specification. A map of {name: agent_object}", 545 ) 546 environment: Environment = Field( 547 default={}, 548 description="The environment for this specification", 549 ) 550 fairness: typing.Dict[str, typing.List[str]] = Field( 551 default={}, 552 description=( 553 "Specifies conditions that must hold infinitely often along " 554 "all execution paths, used to rule out unrealistic behaviors." 555 ), 556 ) 557 init_states: typing.InitStateType = Field( 558 default=[], 559 description="Initial global states of the system when verification begins.", 560 ) 561 evaluation: typing.EvalType = Field( 562 default=[], 563 description="Calculations, composites, aggregates that can be referenced in formulae", 564 ) 565 groups: typing.GroupsType = Field( 566 default={}, 567 description=( 568 "A map of {group_name: [member1, .. ]}" 569 "Defines collections of agents for use in group-based verification formulae." 570 ), 571 ) 572 573 formulae: typing.FormulaeType = Field( 574 description=( 575 "A list of formulas.\n\n" 576 "These will be partitioned into true/false categories, per the rest of the model" 577 ), 578 default=[], 579 ) 580 simulation: SimType = Field( 581 default=None, 582 description=( 583 "The result of simulating this specification. " 584 "Empty if simulation has never been run" 585 ), 586 ) 587 source_code: typing.Union[str, None] = Field( 588 default=None, 589 description=( 590 "Source code for this specification. " 591 "Only available if the specification was loaded from raw ISPL" 592 ), 593 ) 594 595 def __contains__(self, other): 596 """ 597 Specification algebra. 598 """ 599 if isinstance(other, Agent): 600 return other in self.agents.values() 601 else: 602 raise TypeError(f"__contains__ undefined for {[type(self), type(other)]}") 603 604 def __abs__(self) -> float: 605 """ 606 Specification algebra. 607 608 Used in __lt__ and __gt__. For ISPL specifications this 609 is the cumulative sum of formulae complexity 610 """ 611 return sum([c.score for c in self.analyze_complexity()]) 612 613 def __iadd__(self, other): 614 """ 615 Specification algebra. 616 617 This is for in-place addition, i.e. `spec+=Agent(..)` 618 """ 619 upd = self + other 620 self.update(upd.model_dump()) 621 return self 622 623 def __sub__(self, other): 624 """ 625 Specification algebra. 626 627 ISPL - agent => removes agent from this agent list, if present. 628 """ 629 # if isinstance(other, (ISPL,)): 630 # return self.model_copy(update=other.model_dump()) 631 # if isinstance(other, (Environment,)): 632 # return self.model_copy(update=dict(environment=other)) 633 if isinstance(other, (Agent,)): 634 agents = {} 635 for a in self.agents.values(): 636 if a.name != other.name: 637 agents[a.name] = a 638 return self.model_copy(update=dict(agents=agents)) 639 raise TypeError(f"Cannot add {type(self)} and {type(other)}") 640 641 def __add__(self, other): 642 """ 643 Specification algebra. 644 645 ISPL + agent => adds agent to spec ISPL + ISPL => 646 overrides first spec with values from 2nd 647 """ 648 if isinstance(other, (ISPL,)): 649 return self.model_copy(update=other.model_dump()) 650 if isinstance(other, (Environment,)): 651 return self.model_copy(update=dict(environment=other)) 652 if isinstance(other, (Agent,)): 653 agents = self.agents 654 agents.update(**{other.name: other}) 655 return self.model_copy(update=dict(agents=agents)) 656 raise TypeError(f"Cannot add {type(self)} and {type(other)}") 657 658 def model_dump_source(self): 659 """ 660 Dump the source-code for this piece of the specification. 661 """ 662 # from mcmas.engine import dict2ispl 663 return util.dict2ispl(self.model_dump()) 664 665 @util.classproperty 666 def parser(self): 667 """ 668 Shortcut for `mcmas.parser.parse` 669 """ 670 from mcmas import parser 671 672 return parser.parse 673 674 @property 675 def local_advice(self) -> list: 676 """ 677 678 """ 679 out = [] 680 for agent in self.agents: 681 agentish = self.agents[agent] 682 out += agentish.advice 683 return out 684 685 @classmethod 686 @pydantic.validate_call 687 def load_from_source( 688 kls, txt, strict: bool = False 689 ) -> typing.Dict[str, typing.Self]: 690 """ 691 Return ISPL object from given string. 692 """ 693 # from mcmas import parser 694 return kls.parser(txt) 695 696 @classmethod 697 @pydantic.validate_call 698 def load_from_ispl_file( 699 kls, 700 file: Optional[str] = None, 701 ): 702 """ 703 Return ISPL object from contents of given file. 704 """ 705 LOGGER.debug(f"ISPL.load_from_ispl_file: {file}") 706 metadata = dict(file=file) 707 if file: 708 assert os.path.exists(file), f"no such file: {file}" 709 if file.endswith("ispl"): 710 with open(file) as fhandle: 711 text = fhandle.read() 712 tmp = kls.parser(text, file=file) 713 data = tmp.model_dump(exclude="metadata") 714 return ISPL(metadata=ISPL.Metadata(**metadata), **data) 715 # return ISPL(metadata=dict(file=file, engine="bonk"), **data) 716 elif file.endswith("json"): 717 raise ValueError("refusing to work with ispl file") 718 elif file in ["-", "/dev/stdin"]: 719 metadata.update(file="<<stream>>") 720 text = sys.stdin.read().strip() 721 data = kls.parser(text).model_dump(exclude="metadata") 722 return ISPL(metadata=ISPL.Metadata(**metadata), **data) 723 else: 724 LOGGER.critical(f"could not create model from {file}") 725 raise Exception(file) 726 if text: 727 LOGGER.critical("NIY") 728 raise Exception(text) 729 730 load_from_file = load_from_ispl_file 731 732 @classmethod 733 @pydantic.validate_call 734 def load_from_json_file(kls, file: Optional[str] = None, text=None): 735 """ 736 Return ISPL object from the contents of given file. 737 738 File *must* be JSON encoded. 739 """ 740 LOGGER.critical(f"ISPL.load_from_json_file: {file}") 741 metadata = dict(file=file) 742 if file: 743 assert os.path.exists(file), f"no such file: {file}" 744 if file.endswith("json"): 745 with open(file) as fhandle: 746 data = json.loads(fhandle.read()) 747 return ISPL(metadata=ISPL.Metadata(**metadata), **data) 748 elif file.endswith("ispl"): 749 raise ValueError("refusing to work with ispl file") 750 elif file in ["-", "/dev/stdin"]: 751 metadata.update(file="<<stream>>") 752 text = sys.stdin.read().strip() 753 data = json.loads(text) 754 return ISPL( 755 metadata=ISPL.Metadata(**{**data.pop("metadata", {}), **metadata}), 756 **data, 757 ) 758 else: 759 LOGGER.critical(f"could not create model from {file}") 760 raise Exception(file) 761 762 if text: 763 LOGGER.critical("NIY") 764 raise Exception(text) 765 766 @property 767 @pydantic.validate_call 768 def validates(self) -> bool: 769 """ 770 Asks the engine directly whether this spec validates. 771 772 Note that this is ground-truth and not heuristic like the `valid` property elsewhere! 773 """ 774 return mcmas.engine.validate(model=self) 775 776 @property 777 @pydantic.validate_call 778 def analysis(self) -> spec.Analysis: 779 """ 780 Static-analysis for this ISPL specification. 781 782 Returns details about symbols and logical operators. 783 """ 784 ents = list(self.agents.values()) 785 ents += [self.environment] 786 vars = [] 787 for agent in ents: 788 if isinstance(agent, (Agent,)): 789 vars += [k.strip() for k in (agent.lobsvars or []) if k.strip()] 790 vars += [k.strip() for k in (agent.vars or []) if k.strip()] 791 vars = list(set(vars)) 792 meta = dict( 793 symbols=spec.SymbolMetadata( 794 agents=[agent for agent in self.agents], 795 vars=vars, 796 actions=list( 797 set( 798 functools.reduce( 799 operator.add, 800 [self.agents[agent].actions for agent in self.agents], 801 ) 802 ) 803 ), 804 ), 805 types=self.analyze_types(), 806 complexity=self.analyze_complexity(), 807 ) 808 meta = spec.Analysis(**meta) 809 return meta 810 811 def analyze_complexity(self) -> typing.List: 812 from mcmas.logic import complexity 813 814 return [ 815 complexity.analyzer.analyze(f.lstrip().rstrip(), index=i) 816 for i, f in enumerate(self.formulae) 817 ] 818 819 def exec(self, strict: bool = False, **kwargs) -> typing.Self: 820 """ 821 Execute this ISPL specification. 822 """ 823 required = ["init_states"] 824 self.logger.debug(f"validating: {self.source_code or self.model_dump_source()}") 825 826 # check advice before execution 827 if self.advice: 828 msg = "exec: Model has non-empty advice!" 829 LOGGER.critical(msg) 830 if strict: 831 raise RuntimeError(msg) 832 833 for k in required: 834 if not getattr(self, k): 835 err = f"Validation failed. Required key `{k}` is missing." 836 return self.model_copy( 837 update={ 838 # "source_code": src, 839 "simulation": Simulation( 840 error=err, 841 metadata=Simulation.Metadata(parsed=False, validates=False), 842 ), 843 } 844 ) 845 846 self.logger.debug("starting..") 847 sim = mcmas.engine(text=self.model_dump_source(), output_format="model") 848 out = self.model_copy( 849 update=dict( 850 source_code=self.source_code or self.model_dump_source(), 851 simulation=sim, 852 metadata=self.metadata.model_dump(), 853 ) 854 ) 855 self.logger.debug("done") 856 return out 857 858 run_sim = run_simulation = exec 859 860 def repl(self): 861 """ 862 Start a REPL shell with this object available as `spec`. 863 """ 864 result_model = self.exec() 865 return util.repl(spec=result_model) 866 867 868DefaultISPL = ISPL(...)
30def check_slice(*args): 31 assert len(args) in [0, 1] 32 slice_maybe = args and args[0] 33 if slice_maybe == Ellipsis or isinstance(slice_maybe, (slice, tuple, type(None))): 34 return {} 35 elif isinstance(slice_maybe, (dict,)): 36 return slice_maybe 37 else: 38 err = f"expected slice or dict for posargs, got {[args, type(args)]}" 39 LOGGER.warning(err) 40 raise ValueError(err)
76class Fragment(fmtk.Fragment): 77 """ 78 A piece of an ISPL Specification. 79 80 May or may not be "concrete" as of yet, i.e. this may not yet 81 be useful for actually running a simulation! 82 """ 83 84 PROMPT_HINTS: typing.ClassVar = "" 85 logger: typing.ClassVar = LOGGER 86 87 def __init__(self, *args, **kwargs): 88 """ 89 90 """ 91 if args and args[0] and args[0] == Ellipsis: 92 tmp = self.__class__.get_trivial().model_dump() 93 tmp.update(**kwargs) 94 super().__init__(**tmp) 95 else: 96 super().__init__(*args, **kwargs) 97 98 def __call__(self, **kwargs): 99 """ 100 101 """ 102 for k, v in kwargs.items(): 103 setattr(self, k, v) 104 return self 105 106 def __lt__(self, other) -> bool: 107 """ 108 Specification algebra. 109 """ 110 if not isinstance(other, self.__class__): 111 raise TypeError(f"Cannot compare {type(self)} and {type(other)}") 112 return self.COMPARATOR(self) < self.COMPARATOR(other) 113 114 def __gt__(self, other) -> bool: 115 """ 116 Specification algebra. 117 """ 118 return not self.__lt__(other) 119 120 def __class_getitem__(kls, other): 121 """ 122 123 """ 124 if other == Ellipsis: 125 return kls(other) 126 else: 127 mod_name = other.__class__.__module__ 128 cname = "_load_from_" + mod_name.replace(".", "_") 129 tname = "_load_from_" + type(other).__name__ 130 alt = getattr(kls, tname, None) 131 constructor = getattr(kls, cname, alt) 132 if constructor is None: 133 err = ( 134 f"could not find constructor for `{kls}.{cname}` or `{kls}.{tname}`" 135 ) 136 LOGGER.critical(err) 137 raise ValueError(err) 138 else: 139 LOGGER.info(f"using constructor: {constructor}") 140 return constructor(other) 141 142 @classmethod 143 def _load_from_prompt(kls, txt, **extra) -> typing.Self: 144 """ 145 Create ISPL agent from the given prompt. 146 """ 147 import pydantic_ai 148 149 from mcmas.ai import DEFAULT_PROMPT, config 150 151 hints = """""" 152 prompt = DEFAULT_PROMPT + kls.PROMPT_HINTS 153 LOGGER.info(f"Attempting to load object from prompt:\n\n{prompt}") 154 agent = ( 155 pydantic_ai.Agent( 156 config.DEFAULT_MODEL, 157 output_type=pydantic_ai.PromptedOutput( 158 [kls], 159 template=prompt, 160 ), 161 ) 162 .run_sync(txt) 163 .output 164 ) 165 # FIXME: agent-only post-processing; move to subclass 166 if hasattr(agent, "name"): 167 agent.name = agent.name.lower() 168 agent.metadata.parser = f"{kls.__module__}.{kls.__name__}._load_from_prompt" 169 return agent 170 171 _load_from_str = _load_from_prompt 172 173 def analyze_types(self) -> typing.List[str]: 174 """ 175 176 """ 177 types = [] 178 types += getattr(self, "vars", {}).values() 179 types += getattr(self, "obsvars", {}).values() 180 types = sorted(list({x.strip() for x in types})) 181 return types 182 183 def analyze_complexity(self) -> typing.Dict: 184 """ 185 186 """ 187 from mcmas.models import spec 188 189 return [spec.ComplexityAnalysis()] 190 191 @pydantic.validate_call 192 def update(self, data: dict) -> typing.Self: 193 """ 194 195 """ 196 for k, v in self.model_dump().items(): 197 setattr(self, k, v) 198 return self 199 200 @pydantic.validate_call 201 def spec_validate(self) -> typing.Dict: 202 """ 203 Full validation output for this fragment (not a simple 204 bool!) 205 """ 206 return dict( 207 metadata=self.metadata.model_dump(), 208 validates=self.validates, 209 advice=self.advice, 210 ) 211 212 def __invert__(self): 213 """Model interpolation: returns this model""" 214 if not self.advice: 215 LOGGER.warning(f"{self} has no advice and appears complete, returning it.") 216 return self 217 else: 218 err = f"Not sure how from complete object like {self}" 219 LOGGER.critical(err) 220 raise NotImplementedError(err)
A piece of an ISPL Specification.
May or may not be "concrete" as of yet, i.e. this may not yet be useful for actually running a simulation!
200 @pydantic.validate_call 201 def spec_validate(self) -> typing.Dict: 202 """ 203 Full validation output for this fragment (not a simple 204 bool!) 205 """ 206 return dict( 207 metadata=self.metadata.model_dump(), 208 validates=self.validates, 209 advice=self.advice, 210 )
Full validation output for this fragment (not a simple bool!)
223class Environment(Fragment): 224 """ 225 Python wrapper for ISPL Environments. Environments model the 226 shared information and boundary conditions that all other 227 agents can observe. 228 229 See also the relevant [ISPL reference](http://mattvonrocketstein.github.io/py-mcmas/isplref/#environment) 230 """ 231 232 REQUIRED: typing.ClassVar = ["protocol", "actions"] 233 DefaultEnvironment: typing.ClassVar 234 actions: typing.ActionsType = ActionsField 235 vars: typing.VarsType = VarsField 236 obsvars: typing.ObsVarsType = ObsvarsField 237 evolution: typing.EvolType = EvolutionField 238 protocol: typing.ProtocolType = ProtocolField 239 240 def __len__(self): 241 """ 242 Specification Algebra. 243 244 The length of an agent is the number of items in the 245 description length 246 """ 247 return sum( 248 map( 249 len, 250 [ 251 self.actions, 252 self.evolution, 253 self.obsvars, 254 self.protocol, 255 self.vars, 256 getattr(self, "lobsvars", []), 257 getattr(self, "red_states", []), 258 ], 259 ) 260 ) 261 262 @util.classproperty 263 def DefaultEnvironment(kls): 264 return kls(...) 265 266 @classmethod 267 def get_trivial(kls, *args, **kwargs): 268 kwargs.update(check_slice(*args)) 269 actions = kwargs.pop("actions", [symbols.tick]) 270 protocol = kwargs.pop("protocol", dict(Other=[symbols.tick])) 271 vars = kwargs.pop("vars", dict(ticking="boolean")) 272 return kls(vars=vars, actions=actions, protocol=protocol, **kwargs) 273 274 @classmethod 275 @pydantic.validate_call 276 def load_from_source(kls, txt: str) -> typing.Self: 277 """ 278 Creates an Environment from string. 279 """ 280 from mcmas import parser 281 282 agents = parser.extract_agents(txt) 283 env = agents.pop("Environment", None) 284 assert env is not None 285 return Environment(**env)
Python wrapper for ISPL Environments. Environments model the shared information and boundary conditions that all other agents can observe.
See also the relevant ISPL reference
266 @classmethod 267 def get_trivial(kls, *args, **kwargs): 268 kwargs.update(check_slice(*args)) 269 actions = kwargs.pop("actions", [symbols.tick]) 270 protocol = kwargs.pop("protocol", dict(Other=[symbols.tick])) 271 vars = kwargs.pop("vars", dict(ticking="boolean")) 272 return kls(vars=vars, actions=actions, protocol=protocol, **kwargs)
Subclassers must implement this.
Returns the trivial fragment for this type.
274 @classmethod 275 @pydantic.validate_call 276 def load_from_source(kls, txt: str) -> typing.Self: 277 """ 278 Creates an Environment from string. 279 """ 280 from mcmas import parser 281 282 agents = parser.extract_agents(txt) 283 env = agents.pop("Environment", None) 284 assert env is not None 285 return Environment(**env)
Creates an Environment from string.
288class Agent(Fragment): 289 """ 290 Python wrapper for ISPL Agents. 291 292 See also the relevant [ISPL reference](http://mattvonrocketstein.github.io/py-mcmas/isplref/#agent) 293 """ 294 295 COMPARATOR: typing.ClassVar = len 296 DefaultAgent: typing.ClassVar 297 REQUIRED: typing.ClassVar = ["protocol", "evolution", "actions"] 298 parser: typing.ClassVar 299 300 name: str = Field( 301 default="player", 302 description=("Name of this agent"), 303 ) 304 actions: typing.ActionsType = ActionsField 305 evolution: typing.EvolType = EvolutionField 306 obsvars: typing.ObsVarsType = ObsvarsField 307 protocol: typing.ProtocolType = ProtocolField 308 vars: typing.VarsType = VarsField 309 310 lobsvars: typing.LobsvarsType = Field( 311 description="", 312 default=[], 313 ) 314 red_states: typing.List[str] = Field( 315 description="", 316 default=[], 317 ) 318 319 __len__ = Environment.__len__ 320 321 @util.classproperty 322 def DefaultAgent(kls): 323 """ 324 Returns the trivial agent. 325 """ 326 return kls(...) 327 328 @classmethod 329 def _load_from_pydantic_ai_agent(kls, pagent, **extra) -> typing.Self: 330 """ 331 Create ISPL agent from the given pydantic agent. 332 """ 333 kls.logger.info(f"_load_from_pydantic_ai_agent: {pagent}") 334 from mcmas import util 335 336 name = pagent.name or f"Agent_{id(pagent)}" 337 actions = list(pagent._function_toolset.tools.keys()) 338 tools = list(pagent._function_toolset.tools.values()) 339 tool = tools[0] 340 if len(tools) > 1: 341 kls.logger.warning(f"pydantic agent `{name}` has multiple tools!") 342 kls.logger.warning(f"using just the first one: {tool}") 343 # naive conversion from function signature to ISPL types. 344 vars = util.fxn_sig.as_ispl_types(tool.function) 345 vars.pop("ctx", None) 346 return Agent( 347 name=name, 348 actions=actions, 349 vars=vars, 350 metadata=dict( 351 parser=f"{kls.__module__}.{kls.__name__}._load_from_pydantic_ai_agent", 352 file=inspect.getfile(pagent.__class__), 353 ), 354 **extra, 355 ).model_completion() 356 357 def analyze_symbols(self) -> typing.List[str]: 358 """ 359 Returns symbol-related metadata including agent-names, 360 actions, variables, and type info. 361 """ 362 return spec.SymbolMetadata( 363 agents=[self.name], 364 actions=self.actions, 365 vars=[k.strip() for k in list(self.lobsvars) + list(self.vars)], 366 types=self.analyze_types(), 367 ) 368 369 @property 370 @pydantic.validate_call 371 def analysis(self) -> spec.Analysis: 372 """ 373 Static-analysis for this Agent specification. 374 375 Returns details about symbols and logical operators. 376 """ 377 return spec.Analysis( 378 symbols=self.analyze_symbols(), 379 types=self.analyze_types(), 380 complexity=self.analyze_complexity(), 381 ) 382 # out.actions = [getattr(symbols, x) for x in sorted(list(set(out.actions)))] 383 # out.vars = [getattr(symbols, x) for x in sorted(list(set(out.vars)))] 384 # out.agents = [getattr(symbols, x) for x in sorted(list(set(out.agents)))] 385 # return meta 386 387 def __pow__(self, other: float = 0.1) -> typing.Self: 388 """ 389 390 """ 391 from mcmas import ai 392 393 return ai.model_mutation(obj=self, model_settings=dict(top_p=other)) 394 395 def __invert__(self) -> typing.Self: 396 """ 397 Trigger completion for this Agent. 398 399 This is NOT backed by an LLM; see instead `mcmas.ai.agent_completion`. 400 """ 401 if not self.advice: 402 err = f"{self} is already valid, returning it instead of completing" 403 LOGGER.warning(err) 404 return self 405 else: 406 tmp = self 407 trivial = DefaultAgent 408 defaults = dict( 409 protocol=trivial.protocol, 410 vars=trivial.vars, 411 evolution=trivial.evolution, 412 ) 413 for x in ["protocol", "vars", "evolution"]: 414 if getattr(self, x, None): 415 pass 416 else: 417 LOGGER.warning(f"could not find required {x}") 418 tmp = tmp.model_copy(update={x: defaults[x]}) 419 return tmp.model_copy( 420 update=dict(actions=list(set(tmp.actions + trivial.actions))) 421 ) 422 423 model_completion = __invert__ 424 425 @classmethod 426 def get_trivial(kls, *args, **kwargs): 427 """ 428 Smallest legal agent. 429 """ 430 kwargs.update(check_slice(*args)) 431 return kls( 432 name="trivial", 433 vars=dict(ticking="boolean"), 434 actions=["tick"], 435 protocol=["Other : {tick}"], 436 evolution=["ticking=true if Action=tick;"], 437 ) 438 439 def model_dump_source(self): 440 """ 441 Dump the source-code for this piece of the specification. 442 """ 443 from mcmas import rendering 444 445 return rendering.get_template("Agent.j2").render( 446 name=self.name, agent=self.model_dump() 447 ) 448 449 @util.classproperty 450 def parser(self) -> typing.Callable: 451 """ 452 Return an appropriate parser for this spec-fragment. 453 """ 454 from mcmas import parser 455 456 return parser.extract_agents 457 458 @classmethod 459 @pydantic.validate_call 460 def load_from_source(kls, txt, strict: bool = False) -> typing.Self: 461 """ 462 Load ISPL agent from string. 463 """ 464 agents = kls.parser(txt) 465 agents.pop("Environment", None) 466 if len(agents) != 1: 467 LOGGER.critical("load_from_source: more than 1 agent! returning first..") 468 return Agent(**list(agents.values())[0]) 469 470 @property 471 def local_advice(self) -> list: 472 return []
Python wrapper for ISPL Agents.
See also the relevant ISPL reference
357 def analyze_symbols(self) -> typing.List[str]: 358 """ 359 Returns symbol-related metadata including agent-names, 360 actions, variables, and type info. 361 """ 362 return spec.SymbolMetadata( 363 agents=[self.name], 364 actions=self.actions, 365 vars=[k.strip() for k in list(self.lobsvars) + list(self.vars)], 366 types=self.analyze_types(), 367 )
Returns symbol-related metadata including agent-names, actions, variables, and type info.
369 @property 370 @pydantic.validate_call 371 def analysis(self) -> spec.Analysis: 372 """ 373 Static-analysis for this Agent specification. 374 375 Returns details about symbols and logical operators. 376 """ 377 return spec.Analysis( 378 symbols=self.analyze_symbols(), 379 types=self.analyze_types(), 380 complexity=self.analyze_complexity(), 381 ) 382 # out.actions = [getattr(symbols, x) for x in sorted(list(set(out.actions)))] 383 # out.vars = [getattr(symbols, x) for x in sorted(list(set(out.vars)))] 384 # out.agents = [getattr(symbols, x) for x in sorted(list(set(out.agents)))] 385 # return meta
Static-analysis for this Agent specification.
Returns details about symbols and logical operators.
395 def __invert__(self) -> typing.Self: 396 """ 397 Trigger completion for this Agent. 398 399 This is NOT backed by an LLM; see instead `mcmas.ai.agent_completion`. 400 """ 401 if not self.advice: 402 err = f"{self} is already valid, returning it instead of completing" 403 LOGGER.warning(err) 404 return self 405 else: 406 tmp = self 407 trivial = DefaultAgent 408 defaults = dict( 409 protocol=trivial.protocol, 410 vars=trivial.vars, 411 evolution=trivial.evolution, 412 ) 413 for x in ["protocol", "vars", "evolution"]: 414 if getattr(self, x, None): 415 pass 416 else: 417 LOGGER.warning(f"could not find required {x}") 418 tmp = tmp.model_copy(update={x: defaults[x]}) 419 return tmp.model_copy( 420 update=dict(actions=list(set(tmp.actions + trivial.actions))) 421 )
Trigger completion for this Agent.
This is NOT backed by an LLM; see instead mcmas.ai.agent_completion.
425 @classmethod 426 def get_trivial(kls, *args, **kwargs): 427 """ 428 Smallest legal agent. 429 """ 430 kwargs.update(check_slice(*args)) 431 return kls( 432 name="trivial", 433 vars=dict(ticking="boolean"), 434 actions=["tick"], 435 protocol=["Other : {tick}"], 436 evolution=["ticking=true if Action=tick;"], 437 )
Smallest legal agent.
439 def model_dump_source(self): 440 """ 441 Dump the source-code for this piece of the specification. 442 """ 443 from mcmas import rendering 444 445 return rendering.get_template("Agent.j2").render( 446 name=self.name, agent=self.model_dump() 447 )
Dump the source-code for this piece of the specification.
458 @classmethod 459 @pydantic.validate_call 460 def load_from_source(kls, txt, strict: bool = False) -> typing.Self: 461 """ 462 Load ISPL agent from string. 463 """ 464 agents = kls.parser(txt) 465 agents.pop("Environment", None) 466 if len(agents) != 1: 467 LOGGER.critical("load_from_source: more than 1 agent! returning first..") 468 return Agent(**list(agents.values())[0])
Load ISPL agent from string.
489class ISPL(Fragment): 490 """ 491 Python wrapper for ISPL specifications. 492 493 This permits partials or "fragments", i.e. the specification need not be complete and ready to run. 494 495 See the ISPL reference here: http://mattvonrocketstein.github.io/py-mcmas/isplref 496 """ 497 498 PROMPT_HINTS: typing.ClassVar = "Individual proper nouns refer to separate agents." 499 500 def __str__(self): 501 return f"<ISPL: agents={len(self.agents)} vars={len(self.environment.vars)}>" 502 503 def analyze_types(self): 504 types = super().analyze_types() 505 for agent in self.agents.values(): 506 types += agent.analyze_types() 507 return list(set(types)) 508 509 @classmethod 510 def get_trivial(kls, *args, **kwargs): 511 """ 512 ISPL(...) notation. Unlike ISPL(..) 513 514 This returns the trivial specification, with just enough 515 structure to validate and run, plus any optional 516 overrides. 517 """ 518 kwargs.update(check_slice(*args)) 519 title = kwargs.pop("title", "Minimal valid ISPL specification") 520 agents = kwargs.pop("agents", {"DefaultAgent": DefaultAgent}) 521 environment = kwargs.pop("environment", Environment(...)) 522 evaluation = kwargs.pop("evaluation", ["ticking if Environment.ticking=true"]) 523 init_states = kwargs.pop("init_states", ["Environment.ticking=true"]) 524 formulae = kwargs.pop("formulae", ["ticking"]) 525 return kls( 526 title=title, 527 environment=environment, 528 evaluation=evaluation, 529 init_states=init_states, 530 formulae=formulae, 531 agents=agents, 532 **kwargs, 533 ) 534 535 # Metadata: typing.ClassVar = fmtk.SpecificationMetadata 536 COMPARATOR: typing.ClassVar = abs 537 REQUIRED: typing.ClassVar = ["agents", "evaluation", "formulae"] 538 parser: typing.ClassVar 539 title: str = Field( 540 default="Untitled Model", 541 description="Optional title. Used as a comment at the top of the file", 542 ) 543 agents: typing.Dict[str, Agent] = Field( 544 default={}, 545 description="All agents involved in this specification. A map of {name: agent_object}", 546 ) 547 environment: Environment = Field( 548 default={}, 549 description="The environment for this specification", 550 ) 551 fairness: typing.Dict[str, typing.List[str]] = Field( 552 default={}, 553 description=( 554 "Specifies conditions that must hold infinitely often along " 555 "all execution paths, used to rule out unrealistic behaviors." 556 ), 557 ) 558 init_states: typing.InitStateType = Field( 559 default=[], 560 description="Initial global states of the system when verification begins.", 561 ) 562 evaluation: typing.EvalType = Field( 563 default=[], 564 description="Calculations, composites, aggregates that can be referenced in formulae", 565 ) 566 groups: typing.GroupsType = Field( 567 default={}, 568 description=( 569 "A map of {group_name: [member1, .. ]}" 570 "Defines collections of agents for use in group-based verification formulae." 571 ), 572 ) 573 574 formulae: typing.FormulaeType = Field( 575 description=( 576 "A list of formulas.\n\n" 577 "These will be partitioned into true/false categories, per the rest of the model" 578 ), 579 default=[], 580 ) 581 simulation: SimType = Field( 582 default=None, 583 description=( 584 "The result of simulating this specification. " 585 "Empty if simulation has never been run" 586 ), 587 ) 588 source_code: typing.Union[str, None] = Field( 589 default=None, 590 description=( 591 "Source code for this specification. " 592 "Only available if the specification was loaded from raw ISPL" 593 ), 594 ) 595 596 def __contains__(self, other): 597 """ 598 Specification algebra. 599 """ 600 if isinstance(other, Agent): 601 return other in self.agents.values() 602 else: 603 raise TypeError(f"__contains__ undefined for {[type(self), type(other)]}") 604 605 def __abs__(self) -> float: 606 """ 607 Specification algebra. 608 609 Used in __lt__ and __gt__. For ISPL specifications this 610 is the cumulative sum of formulae complexity 611 """ 612 return sum([c.score for c in self.analyze_complexity()]) 613 614 def __iadd__(self, other): 615 """ 616 Specification algebra. 617 618 This is for in-place addition, i.e. `spec+=Agent(..)` 619 """ 620 upd = self + other 621 self.update(upd.model_dump()) 622 return self 623 624 def __sub__(self, other): 625 """ 626 Specification algebra. 627 628 ISPL - agent => removes agent from this agent list, if present. 629 """ 630 # if isinstance(other, (ISPL,)): 631 # return self.model_copy(update=other.model_dump()) 632 # if isinstance(other, (Environment,)): 633 # return self.model_copy(update=dict(environment=other)) 634 if isinstance(other, (Agent,)): 635 agents = {} 636 for a in self.agents.values(): 637 if a.name != other.name: 638 agents[a.name] = a 639 return self.model_copy(update=dict(agents=agents)) 640 raise TypeError(f"Cannot add {type(self)} and {type(other)}") 641 642 def __add__(self, other): 643 """ 644 Specification algebra. 645 646 ISPL + agent => adds agent to spec ISPL + ISPL => 647 overrides first spec with values from 2nd 648 """ 649 if isinstance(other, (ISPL,)): 650 return self.model_copy(update=other.model_dump()) 651 if isinstance(other, (Environment,)): 652 return self.model_copy(update=dict(environment=other)) 653 if isinstance(other, (Agent,)): 654 agents = self.agents 655 agents.update(**{other.name: other}) 656 return self.model_copy(update=dict(agents=agents)) 657 raise TypeError(f"Cannot add {type(self)} and {type(other)}") 658 659 def model_dump_source(self): 660 """ 661 Dump the source-code for this piece of the specification. 662 """ 663 # from mcmas.engine import dict2ispl 664 return util.dict2ispl(self.model_dump()) 665 666 @util.classproperty 667 def parser(self): 668 """ 669 Shortcut for `mcmas.parser.parse` 670 """ 671 from mcmas import parser 672 673 return parser.parse 674 675 @property 676 def local_advice(self) -> list: 677 """ 678 679 """ 680 out = [] 681 for agent in self.agents: 682 agentish = self.agents[agent] 683 out += agentish.advice 684 return out 685 686 @classmethod 687 @pydantic.validate_call 688 def load_from_source( 689 kls, txt, strict: bool = False 690 ) -> typing.Dict[str, typing.Self]: 691 """ 692 Return ISPL object from given string. 693 """ 694 # from mcmas import parser 695 return kls.parser(txt) 696 697 @classmethod 698 @pydantic.validate_call 699 def load_from_ispl_file( 700 kls, 701 file: Optional[str] = None, 702 ): 703 """ 704 Return ISPL object from contents of given file. 705 """ 706 LOGGER.debug(f"ISPL.load_from_ispl_file: {file}") 707 metadata = dict(file=file) 708 if file: 709 assert os.path.exists(file), f"no such file: {file}" 710 if file.endswith("ispl"): 711 with open(file) as fhandle: 712 text = fhandle.read() 713 tmp = kls.parser(text, file=file) 714 data = tmp.model_dump(exclude="metadata") 715 return ISPL(metadata=ISPL.Metadata(**metadata), **data) 716 # return ISPL(metadata=dict(file=file, engine="bonk"), **data) 717 elif file.endswith("json"): 718 raise ValueError("refusing to work with ispl file") 719 elif file in ["-", "/dev/stdin"]: 720 metadata.update(file="<<stream>>") 721 text = sys.stdin.read().strip() 722 data = kls.parser(text).model_dump(exclude="metadata") 723 return ISPL(metadata=ISPL.Metadata(**metadata), **data) 724 else: 725 LOGGER.critical(f"could not create model from {file}") 726 raise Exception(file) 727 if text: 728 LOGGER.critical("NIY") 729 raise Exception(text) 730 731 load_from_file = load_from_ispl_file 732 733 @classmethod 734 @pydantic.validate_call 735 def load_from_json_file(kls, file: Optional[str] = None, text=None): 736 """ 737 Return ISPL object from the contents of given file. 738 739 File *must* be JSON encoded. 740 """ 741 LOGGER.critical(f"ISPL.load_from_json_file: {file}") 742 metadata = dict(file=file) 743 if file: 744 assert os.path.exists(file), f"no such file: {file}" 745 if file.endswith("json"): 746 with open(file) as fhandle: 747 data = json.loads(fhandle.read()) 748 return ISPL(metadata=ISPL.Metadata(**metadata), **data) 749 elif file.endswith("ispl"): 750 raise ValueError("refusing to work with ispl file") 751 elif file in ["-", "/dev/stdin"]: 752 metadata.update(file="<<stream>>") 753 text = sys.stdin.read().strip() 754 data = json.loads(text) 755 return ISPL( 756 metadata=ISPL.Metadata(**{**data.pop("metadata", {}), **metadata}), 757 **data, 758 ) 759 else: 760 LOGGER.critical(f"could not create model from {file}") 761 raise Exception(file) 762 763 if text: 764 LOGGER.critical("NIY") 765 raise Exception(text) 766 767 @property 768 @pydantic.validate_call 769 def validates(self) -> bool: 770 """ 771 Asks the engine directly whether this spec validates. 772 773 Note that this is ground-truth and not heuristic like the `valid` property elsewhere! 774 """ 775 return mcmas.engine.validate(model=self) 776 777 @property 778 @pydantic.validate_call 779 def analysis(self) -> spec.Analysis: 780 """ 781 Static-analysis for this ISPL specification. 782 783 Returns details about symbols and logical operators. 784 """ 785 ents = list(self.agents.values()) 786 ents += [self.environment] 787 vars = [] 788 for agent in ents: 789 if isinstance(agent, (Agent,)): 790 vars += [k.strip() for k in (agent.lobsvars or []) if k.strip()] 791 vars += [k.strip() for k in (agent.vars or []) if k.strip()] 792 vars = list(set(vars)) 793 meta = dict( 794 symbols=spec.SymbolMetadata( 795 agents=[agent for agent in self.agents], 796 vars=vars, 797 actions=list( 798 set( 799 functools.reduce( 800 operator.add, 801 [self.agents[agent].actions for agent in self.agents], 802 ) 803 ) 804 ), 805 ), 806 types=self.analyze_types(), 807 complexity=self.analyze_complexity(), 808 ) 809 meta = spec.Analysis(**meta) 810 return meta 811 812 def analyze_complexity(self) -> typing.List: 813 from mcmas.logic import complexity 814 815 return [ 816 complexity.analyzer.analyze(f.lstrip().rstrip(), index=i) 817 for i, f in enumerate(self.formulae) 818 ] 819 820 def exec(self, strict: bool = False, **kwargs) -> typing.Self: 821 """ 822 Execute this ISPL specification. 823 """ 824 required = ["init_states"] 825 self.logger.debug(f"validating: {self.source_code or self.model_dump_source()}") 826 827 # check advice before execution 828 if self.advice: 829 msg = "exec: Model has non-empty advice!" 830 LOGGER.critical(msg) 831 if strict: 832 raise RuntimeError(msg) 833 834 for k in required: 835 if not getattr(self, k): 836 err = f"Validation failed. Required key `{k}` is missing." 837 return self.model_copy( 838 update={ 839 # "source_code": src, 840 "simulation": Simulation( 841 error=err, 842 metadata=Simulation.Metadata(parsed=False, validates=False), 843 ), 844 } 845 ) 846 847 self.logger.debug("starting..") 848 sim = mcmas.engine(text=self.model_dump_source(), output_format="model") 849 out = self.model_copy( 850 update=dict( 851 source_code=self.source_code or self.model_dump_source(), 852 simulation=sim, 853 metadata=self.metadata.model_dump(), 854 ) 855 ) 856 self.logger.debug("done") 857 return out 858 859 run_sim = run_simulation = exec 860 861 def repl(self): 862 """ 863 Start a REPL shell with this object available as `spec`. 864 """ 865 result_model = self.exec() 866 return util.repl(spec=result_model)
Python wrapper for ISPL specifications.
This permits partials or "fragments", i.e. the specification need not be complete and ready to run.
See the ISPL reference here: http://mattvonrocketstein.github.io/py-mcmas/isplref
509 @classmethod 510 def get_trivial(kls, *args, **kwargs): 511 """ 512 ISPL(...) notation. Unlike ISPL(..) 513 514 This returns the trivial specification, with just enough 515 structure to validate and run, plus any optional 516 overrides. 517 """ 518 kwargs.update(check_slice(*args)) 519 title = kwargs.pop("title", "Minimal valid ISPL specification") 520 agents = kwargs.pop("agents", {"DefaultAgent": DefaultAgent}) 521 environment = kwargs.pop("environment", Environment(...)) 522 evaluation = kwargs.pop("evaluation", ["ticking if Environment.ticking=true"]) 523 init_states = kwargs.pop("init_states", ["Environment.ticking=true"]) 524 formulae = kwargs.pop("formulae", ["ticking"]) 525 return kls( 526 title=title, 527 environment=environment, 528 evaluation=evaluation, 529 init_states=init_states, 530 formulae=formulae, 531 agents=agents, 532 **kwargs, 533 )
ISPL(...) notation. Unlike ISPL(..)
This returns the trivial specification, with just enough structure to validate and run, plus any optional overrides.
659 def model_dump_source(self): 660 """ 661 Dump the source-code for this piece of the specification. 662 """ 663 # from mcmas.engine import dict2ispl 664 return util.dict2ispl(self.model_dump())
Dump the source-code for this piece of the specification.
686 @classmethod 687 @pydantic.validate_call 688 def load_from_source( 689 kls, txt, strict: bool = False 690 ) -> typing.Dict[str, typing.Self]: 691 """ 692 Return ISPL object from given string. 693 """ 694 # from mcmas import parser 695 return kls.parser(txt)
Return ISPL object from given string.
697 @classmethod 698 @pydantic.validate_call 699 def load_from_ispl_file( 700 kls, 701 file: Optional[str] = None, 702 ): 703 """ 704 Return ISPL object from contents of given file. 705 """ 706 LOGGER.debug(f"ISPL.load_from_ispl_file: {file}") 707 metadata = dict(file=file) 708 if file: 709 assert os.path.exists(file), f"no such file: {file}" 710 if file.endswith("ispl"): 711 with open(file) as fhandle: 712 text = fhandle.read() 713 tmp = kls.parser(text, file=file) 714 data = tmp.model_dump(exclude="metadata") 715 return ISPL(metadata=ISPL.Metadata(**metadata), **data) 716 # return ISPL(metadata=dict(file=file, engine="bonk"), **data) 717 elif file.endswith("json"): 718 raise ValueError("refusing to work with ispl file") 719 elif file in ["-", "/dev/stdin"]: 720 metadata.update(file="<<stream>>") 721 text = sys.stdin.read().strip() 722 data = kls.parser(text).model_dump(exclude="metadata") 723 return ISPL(metadata=ISPL.Metadata(**metadata), **data) 724 else: 725 LOGGER.critical(f"could not create model from {file}") 726 raise Exception(file) 727 if text: 728 LOGGER.critical("NIY") 729 raise Exception(text)
Return ISPL object from contents of given file.
697 @classmethod 698 @pydantic.validate_call 699 def load_from_ispl_file( 700 kls, 701 file: Optional[str] = None, 702 ): 703 """ 704 Return ISPL object from contents of given file. 705 """ 706 LOGGER.debug(f"ISPL.load_from_ispl_file: {file}") 707 metadata = dict(file=file) 708 if file: 709 assert os.path.exists(file), f"no such file: {file}" 710 if file.endswith("ispl"): 711 with open(file) as fhandle: 712 text = fhandle.read() 713 tmp = kls.parser(text, file=file) 714 data = tmp.model_dump(exclude="metadata") 715 return ISPL(metadata=ISPL.Metadata(**metadata), **data) 716 # return ISPL(metadata=dict(file=file, engine="bonk"), **data) 717 elif file.endswith("json"): 718 raise ValueError("refusing to work with ispl file") 719 elif file in ["-", "/dev/stdin"]: 720 metadata.update(file="<<stream>>") 721 text = sys.stdin.read().strip() 722 data = kls.parser(text).model_dump(exclude="metadata") 723 return ISPL(metadata=ISPL.Metadata(**metadata), **data) 724 else: 725 LOGGER.critical(f"could not create model from {file}") 726 raise Exception(file) 727 if text: 728 LOGGER.critical("NIY") 729 raise Exception(text)
Return ISPL object from contents of given file.
733 @classmethod 734 @pydantic.validate_call 735 def load_from_json_file(kls, file: Optional[str] = None, text=None): 736 """ 737 Return ISPL object from the contents of given file. 738 739 File *must* be JSON encoded. 740 """ 741 LOGGER.critical(f"ISPL.load_from_json_file: {file}") 742 metadata = dict(file=file) 743 if file: 744 assert os.path.exists(file), f"no such file: {file}" 745 if file.endswith("json"): 746 with open(file) as fhandle: 747 data = json.loads(fhandle.read()) 748 return ISPL(metadata=ISPL.Metadata(**metadata), **data) 749 elif file.endswith("ispl"): 750 raise ValueError("refusing to work with ispl file") 751 elif file in ["-", "/dev/stdin"]: 752 metadata.update(file="<<stream>>") 753 text = sys.stdin.read().strip() 754 data = json.loads(text) 755 return ISPL( 756 metadata=ISPL.Metadata(**{**data.pop("metadata", {}), **metadata}), 757 **data, 758 ) 759 else: 760 LOGGER.critical(f"could not create model from {file}") 761 raise Exception(file) 762 763 if text: 764 LOGGER.critical("NIY") 765 raise Exception(text)
Return ISPL object from the contents of given file.
File must be JSON encoded.
767 @property 768 @pydantic.validate_call 769 def validates(self) -> bool: 770 """ 771 Asks the engine directly whether this spec validates. 772 773 Note that this is ground-truth and not heuristic like the `valid` property elsewhere! 774 """ 775 return mcmas.engine.validate(model=self)
Asks the engine directly whether this spec validates.
Note that this is ground-truth and not heuristic like the valid property elsewhere!
777 @property 778 @pydantic.validate_call 779 def analysis(self) -> spec.Analysis: 780 """ 781 Static-analysis for this ISPL specification. 782 783 Returns details about symbols and logical operators. 784 """ 785 ents = list(self.agents.values()) 786 ents += [self.environment] 787 vars = [] 788 for agent in ents: 789 if isinstance(agent, (Agent,)): 790 vars += [k.strip() for k in (agent.lobsvars or []) if k.strip()] 791 vars += [k.strip() for k in (agent.vars or []) if k.strip()] 792 vars = list(set(vars)) 793 meta = dict( 794 symbols=spec.SymbolMetadata( 795 agents=[agent for agent in self.agents], 796 vars=vars, 797 actions=list( 798 set( 799 functools.reduce( 800 operator.add, 801 [self.agents[agent].actions for agent in self.agents], 802 ) 803 ) 804 ), 805 ), 806 types=self.analyze_types(), 807 complexity=self.analyze_complexity(), 808 ) 809 meta = spec.Analysis(**meta) 810 return meta
Static-analysis for this ISPL specification.
Returns details about symbols and logical operators.
820 def exec(self, strict: bool = False, **kwargs) -> typing.Self: 821 """ 822 Execute this ISPL specification. 823 """ 824 required = ["init_states"] 825 self.logger.debug(f"validating: {self.source_code or self.model_dump_source()}") 826 827 # check advice before execution 828 if self.advice: 829 msg = "exec: Model has non-empty advice!" 830 LOGGER.critical(msg) 831 if strict: 832 raise RuntimeError(msg) 833 834 for k in required: 835 if not getattr(self, k): 836 err = f"Validation failed. Required key `{k}` is missing." 837 return self.model_copy( 838 update={ 839 # "source_code": src, 840 "simulation": Simulation( 841 error=err, 842 metadata=Simulation.Metadata(parsed=False, validates=False), 843 ), 844 } 845 ) 846 847 self.logger.debug("starting..") 848 sim = mcmas.engine(text=self.model_dump_source(), output_format="model") 849 out = self.model_copy( 850 update=dict( 851 source_code=self.source_code or self.model_dump_source(), 852 simulation=sim, 853 metadata=self.metadata.model_dump(), 854 ) 855 ) 856 self.logger.debug("done") 857 return out
Execute this ISPL specification.
861 def repl(self): 862 """ 863 Start a REPL shell with this object available as `spec`. 864 """ 865 result_model = self.exec() 866 return util.repl(spec=result_model)
Start a REPL shell with this object available as spec.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
820 def exec(self, strict: bool = False, **kwargs) -> typing.Self: 821 """ 822 Execute this ISPL specification. 823 """ 824 required = ["init_states"] 825 self.logger.debug(f"validating: {self.source_code or self.model_dump_source()}") 826 827 # check advice before execution 828 if self.advice: 829 msg = "exec: Model has non-empty advice!" 830 LOGGER.critical(msg) 831 if strict: 832 raise RuntimeError(msg) 833 834 for k in required: 835 if not getattr(self, k): 836 err = f"Validation failed. Required key `{k}` is missing." 837 return self.model_copy( 838 update={ 839 # "source_code": src, 840 "simulation": Simulation( 841 error=err, 842 metadata=Simulation.Metadata(parsed=False, validates=False), 843 ), 844 } 845 ) 846 847 self.logger.debug("starting..") 848 sim = mcmas.engine(text=self.model_dump_source(), output_format="model") 849 out = self.model_copy( 850 update=dict( 851 source_code=self.source_code or self.model_dump_source(), 852 simulation=sim, 853 metadata=self.metadata.model_dump(), 854 ) 855 ) 856 self.logger.debug("done") 857 return out
Execute this ISPL specification.
820 def exec(self, strict: bool = False, **kwargs) -> typing.Self: 821 """ 822 Execute this ISPL specification. 823 """ 824 required = ["init_states"] 825 self.logger.debug(f"validating: {self.source_code or self.model_dump_source()}") 826 827 # check advice before execution 828 if self.advice: 829 msg = "exec: Model has non-empty advice!" 830 LOGGER.critical(msg) 831 if strict: 832 raise RuntimeError(msg) 833 834 for k in required: 835 if not getattr(self, k): 836 err = f"Validation failed. Required key `{k}` is missing." 837 return self.model_copy( 838 update={ 839 # "source_code": src, 840 "simulation": Simulation( 841 error=err, 842 metadata=Simulation.Metadata(parsed=False, validates=False), 843 ), 844 } 845 ) 846 847 self.logger.debug("starting..") 848 sim = mcmas.engine(text=self.model_dump_source(), output_format="model") 849 out = self.model_copy( 850 update=dict( 851 source_code=self.source_code or self.model_dump_source(), 852 simulation=sim, 853 metadata=self.metadata.model_dump(), 854 ) 855 ) 856 self.logger.debug("done") 857 return out
Execute this ISPL specification.