diff --git a/mira/sources/askenet/petrinet.py b/mira/sources/askenet/petrinet.py index eae507fa6..414f3b1f2 100644 --- a/mira/sources/askenet/petrinet.py +++ b/mira/sources/askenet/petrinet.py @@ -12,7 +12,6 @@ "model_from_url", "model_from_json_file", "template_model_from_askenet_json", - "get_sympy" ] import json @@ -23,6 +22,7 @@ import requests from mira.metamodel import * +from mira.sources.util import get_sympy, transition_to_templates def model_from_url(url: str) -> TemplateModel: @@ -271,98 +271,3 @@ def parameter_to_mira(parameter): "units": parameter.get('units') } return Parameter.from_json(data) - - -def transition_to_templates(transition_rate, input_concepts, output_concepts, - controller_concepts, symbols, transition_id): - """Return a list of templates from a transition""" - rate_law = get_sympy(transition_rate, local_dict=symbols) - - if not controller_concepts: - if not input_concepts: - for output_concept in output_concepts: - yield NaturalProduction(outcome=output_concept, - rate_law=rate_law, - name=transition_id) - elif not output_concepts: - for input_concept in input_concepts: - yield NaturalDegradation(subject=input_concept, - rate_law=rate_law, - name=transition_id) - else: - for input_concept in input_concepts: - for output_concept in output_concepts: - yield NaturalConversion(subject=input_concept, - outcome=output_concept, - rate_law=rate_law, - name=transition_id) - else: - if not (len(input_concepts) == 1 and len(output_concepts) == 1): - if len(input_concepts) == 1 and not output_concepts: - if len(controller_concepts) > 1: - yield GroupedControlledDegradation(controllers=controller_concepts, - subject=input_concepts[0], - rate_law=rate_law, - name=transition_id) - else: - yield ControlledDegradation(controller=controller_concepts[0], - subject=input_concepts[0], - rate_law=rate_law, - name=transition_id) - elif len(output_concepts) == 1 and not input_concepts: - if len(controller_concepts) > 1: - yield GroupedControlledProduction(controllers=controller_concepts, - outcome=output_concepts[0], - rate_law=rate_law, - name=transition_id) - else: - yield ControlledProduction(controller=controller_concepts[0], - outcome=output_concepts[0], - rate_law=rate_law, - name=transition_id) - else: - return [] - - elif len(controller_concepts) == 1: - yield ControlledConversion(controller=controller_concepts[0], - subject=input_concepts[0], - outcome=output_concepts[0], - rate_law=rate_law, - name=transition_id) - else: - yield GroupedControlledConversion(controllers=controller_concepts, - subject=input_concepts[0], - outcome=output_concepts[0], - rate_law=rate_law) - - -def get_sympy(expr_data, local_dict=None) -> Optional[sympy.Expr]: - """Return a sympy expression from a dict with an expression or MathML - - Sympy string expressions are prioritized over MathML. - - Parameters - ---------- - expr_data : - A dict with an expression and/or MathML - local_dict : - A dict of local variables to use when parsing the expression - - Returns - ------- - : - A sympy expression or None if no expression was found - """ - if expr_data is None: - return None - - # Sympy - if expr_data.get("expression"): - expr = safe_parse_expr(expr_data["expression"], local_dict=local_dict) - # MathML - elif expr_data.get("expression_mathml"): - expr = mathml_to_expression(expr_data["expression_mathml"]) - # No expression found - else: - expr = None - return expr diff --git a/mira/sources/stock_and_flow.py b/mira/sources/stock_and_flow.py new file mode 100644 index 000000000..8f0e26359 --- /dev/null +++ b/mira/sources/stock_and_flow.py @@ -0,0 +1,135 @@ +from typing import Optional +import re +import sympy +import requests + +from mira.metamodel import * +from mira.metamodel.utils import safe_parse_expr +from mira.sources.util import get_sympy, transition_to_templates + + +def template_model_from_sf_json(model_json) -> TemplateModel: + stocks = model_json.get('Stock', []) + + # process stocks/states + concepts = {} + all_stocks = set() + for stock in stocks: + concept_stock = stock_to_concept(stock) + concepts[stock['_id']] = concept_stock + all_stocks.add(stock['_id']) + + symbols, mira_parameters = {}, {} + + # Store stocks as parameters + for stock_id, concept_item in concepts.items(): + symbols[concept_item.display_name] = \ + sympy.Symbol(concept_item.display_name) + + used_stocks = set() + flows = model_json['Flow'] + links = model_json['Link'] + templates = [] + + for flow in flows: + # First identify parameters and stocks in the flow expression + params_in_expr = re.findall(r'p\.([^*+-/ ]+)', flow['ϕf']) + stocks_in_expr = re.findall(r'u\.([^*+-/ ]+)', flow['ϕf']) + # We can now remove the prefixes from the expression + expression_str = flow['ϕf'].replace('p.', '').replace('u.', '') + + # Turn each str symbol into a sympy.Symbol and add to dict of symbols + # if not present before and also turn it into a Parameter object to be + # added to tm + for str_symbol in set(params_in_expr + stocks_in_expr): + if symbols.get(str_symbol) is None: + symbols[str_symbol] = sympy.Symbol(str_symbol) + mira_parameters[str_symbol] = \ + parameter_to_mira({"id": str_symbol}) + + # Process flow and links + # Input stock to the flow is the 'u' field of the flow + # Output stock of the flow is the 'd' field of the flow + # Does not handle multiple inputs or outputs of a flow currently + # Doesn't use copy method as inputs/outputs of stock and flow diagram + # are non-mutable (e.g. int), not mutable (e.g. lists) + input = flow['u'] + output = flow['d'] + inputs = [] + outputs = [] + + # flow_id or flow_name for template name? + flow_id = flow['_id'] # required + flow_name = flow.get('fname') + + inputs.append(input) + outputs.append(output) + + used_stocks |= (set(inputs) | set(outputs)) + + # A stock is considered a controller if it has a link to the given + # flow but is not an input to the flow + controllers = [link['s'] for link in links if ( + link['t'] == flow_id and link['s'] != input)] + + input_concepts = [concepts[i].copy(deep=True) for i in inputs] + output_concepts = [concepts[i].copy(deep=True) for i in outputs] + controller_concepts = [concepts[i].copy(deep=True) for i in controllers] + + templates.extend( + transition_to_templates({'expression': expression_str}, + input_concepts, output_concepts, + controller_concepts, symbols, flow_id)) + + static_stocks = all_stocks - used_stocks + + for state in static_stocks: + concept = concepts[state].copy(deep=True) + templates.append(StaticConcept(subject=concept)) + + return TemplateModel(templates=templates, + parameters=mira_parameters) + + +def stock_to_concept(state): + name = state['_id'] + display_name = state.get('sname') + grounding = state.get('grounding', {}) + identifiers = grounding.get('identifiers', {}) + context = grounding.get('modifiers', {}) + units = state.get('units') + units_expr = get_sympy(units, UNIT_SYMBOLS) + units_obj = Unit(expression=units_expr) if units_expr else None + return Concept(name=name, + display_name=display_name, + identifiers=identifiers, + context=context, + units=units_obj) + + +def parameter_to_mira(parameter): + """Return a MIRA parameter from a parameter""" + distr = Distribution(**parameter['distribution']) \ + if parameter.get('distribution') else None + data = { + "name": parameter['id'], + "display_name": parameter.get('name'), + "description": parameter.get('description'), + "value": parameter.get('value'), + "distribution": distr, + "units": parameter.get('units') + } + return Parameter.from_json(data) + + +def main(): + sfamr = requests.get( + 'https://raw.githubusercontent.com/AlgebraicJulia/' + 'py-acsets/jpfairbanks-patch-1/src/acsets/schemas/' + 'examples/StockFlowp.json').json() + tm = template_model_from_sf_json(sfamr) + return tm + + +if __name__ == "__main__": + tm = main() diff --git a/mira/sources/util.py b/mira/sources/util.py new file mode 100644 index 000000000..45d8827ed --- /dev/null +++ b/mira/sources/util.py @@ -0,0 +1,100 @@ +__all__ = ['transition_to_templates', 'get_sympy'] + +import sympy +from typing import Optional +from mira.metamodel import * + + +def transition_to_templates(transition_rate, input_concepts, output_concepts, + controller_concepts, symbols, transition_id): + """Return a list of templates from a transition""" + rate_law = get_sympy(transition_rate, local_dict=symbols) + + if not controller_concepts: + if not input_concepts: + for output_concept in output_concepts: + yield NaturalProduction(outcome=output_concept, + rate_law=rate_law, + name=transition_id) + elif not output_concepts: + for input_concept in input_concepts: + yield NaturalDegradation(subject=input_concept, + rate_law=rate_law, + name=transition_id) + else: + for input_concept in input_concepts: + for output_concept in output_concepts: + yield NaturalConversion(subject=input_concept, + outcome=output_concept, + rate_law=rate_law, + name=transition_id) + else: + if not (len(input_concepts) == 1 and len(output_concepts) == 1): + if len(input_concepts) == 1 and not output_concepts: + if len(controller_concepts) > 1: + yield GroupedControlledDegradation(controllers=controller_concepts, + subject=input_concepts[0], + rate_law=rate_law, + name=transition_id) + else: + yield ControlledDegradation(controller=controller_concepts[0], + subject=input_concepts[0], + rate_law=rate_law, + name=transition_id) + elif len(output_concepts) == 1 and not input_concepts: + if len(controller_concepts) > 1: + yield GroupedControlledProduction(controllers=controller_concepts, + outcome=output_concepts[0], + rate_law=rate_law, + name=transition_id) + else: + yield ControlledProduction(controller=controller_concepts[0], + outcome=output_concepts[0], + rate_law=rate_law, + name=transition_id) + else: + return [] + + elif len(controller_concepts) == 1: + yield ControlledConversion(controller=controller_concepts[0], + subject=input_concepts[0], + outcome=output_concepts[0], + rate_law=rate_law, + name=transition_id) + else: + yield GroupedControlledConversion(controllers=controller_concepts, + subject=input_concepts[0], + outcome=output_concepts[0], + rate_law=rate_law) + + +def get_sympy(expr_data, local_dict=None) -> Optional[sympy.Expr]: + """Return a sympy expression from a dict with an expression or MathML + + Sympy string expressions are prioritized over MathML. + + Parameters + ---------- + expr_data : + A dict with an expression and/or MathML + local_dict : + A dict of local variables to use when parsing the expression + + Returns + ------- + : + A sympy expression or None if no expression was found + """ + if expr_data is None: + return None + + # Sympy + if expr_data.get("expression"): + expr = safe_parse_expr(expr_data["expression"], local_dict=local_dict) + # MathML + elif expr_data.get("expression_mathml"): + expr = mathml_to_expression(expr_data["expression_mathml"]) + # No expression found + else: + expr = None + return expr diff --git a/tests/test_stock_flow_source.py b/tests/test_stock_flow_source.py new file mode 100644 index 000000000..1e2797e47 --- /dev/null +++ b/tests/test_stock_flow_source.py @@ -0,0 +1,34 @@ +from copy import deepcopy as _d +from mira.sources.stock_and_flow import * +import requests + +def set_up_file(): + return requests.get( + 'https://raw.githubusercontent.com/AlgebraicJulia/' + 'py-acsets/jpfairbanks-patch-1/src/acsets/schemas/examples/StockFlowp.json').json() + + +def test_stock_to_concept(): + stock = { + "_id": 1, + "sname": "S" + } + concept = stock_to_concept(stock) + assert concept.name == str(stock['_id']) + assert concept.display_name == stock['sname'] + + +def test_flow_to_template(): + sf_ascet = _d(set_up_file()) + tm = template_model_from_sf_json(sf_ascet) + + assert len(tm.templates) == 2 + assert isinstance(tm.templates[0], ControlledConversion) + assert isinstance(tm.templates[1], NaturalConversion) + assert tm.templates[0].name == str(sf_ascet['Flow'][0]['_id']) + assert tm.templates[1].name == str(sf_ascet['Flow'][1]['_id']) + assert tm.templates[0].subject.name == str(sf_ascet['Flow'][0]['u']) + assert tm.templates[0].outcome.name == str(sf_ascet['Flow'][0]['d']) + assert tm.templates[0].controller.name == str(sf_ascet['Flow'][0]['d']) + assert tm.templates[1].subject.name == str(sf_ascet['Flow'][1]['u']) + assert tm.templates[1].outcome.name == str(sf_ascet['Flow'][1]['d'])