Skip to content

Commit

Permalink
Merge pull request #244 from nanglo123/stock_flow
Browse files Browse the repository at this point in the history
Initial implementation of Stock-and-flow model processing from ACSet
  • Loading branch information
bgyori authored Oct 5, 2023
2 parents 061ee9d + a3f36a6 commit 15f4588
Show file tree
Hide file tree
Showing 4 changed files with 270 additions and 96 deletions.
97 changes: 1 addition & 96 deletions mira/sources/askenet/petrinet.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
"model_from_url",
"model_from_json_file",
"template_model_from_askenet_json",
"get_sympy"
]

import json
Expand All @@ -23,6 +22,7 @@
import requests

from mira.metamodel import *
from mira.sources.util import get_sympy, transition_to_templates


def model_from_url(url: str) -> TemplateModel:
Expand Down Expand Up @@ -271,98 +271,3 @@ def parameter_to_mira(parameter):
"units": parameter.get('units')
}
return Parameter.from_json(data)


def transition_to_templates(transition_rate, input_concepts, output_concepts,
controller_concepts, symbols, transition_id):
"""Return a list of templates from a transition"""
rate_law = get_sympy(transition_rate, local_dict=symbols)

if not controller_concepts:
if not input_concepts:
for output_concept in output_concepts:
yield NaturalProduction(outcome=output_concept,
rate_law=rate_law,
name=transition_id)
elif not output_concepts:
for input_concept in input_concepts:
yield NaturalDegradation(subject=input_concept,
rate_law=rate_law,
name=transition_id)
else:
for input_concept in input_concepts:
for output_concept in output_concepts:
yield NaturalConversion(subject=input_concept,
outcome=output_concept,
rate_law=rate_law,
name=transition_id)
else:
if not (len(input_concepts) == 1 and len(output_concepts) == 1):
if len(input_concepts) == 1 and not output_concepts:
if len(controller_concepts) > 1:
yield GroupedControlledDegradation(controllers=controller_concepts,
subject=input_concepts[0],
rate_law=rate_law,
name=transition_id)
else:
yield ControlledDegradation(controller=controller_concepts[0],
subject=input_concepts[0],
rate_law=rate_law,
name=transition_id)
elif len(output_concepts) == 1 and not input_concepts:
if len(controller_concepts) > 1:
yield GroupedControlledProduction(controllers=controller_concepts,
outcome=output_concepts[0],
rate_law=rate_law,
name=transition_id)
else:
yield ControlledProduction(controller=controller_concepts[0],
outcome=output_concepts[0],
rate_law=rate_law,
name=transition_id)
else:
return []

elif len(controller_concepts) == 1:
yield ControlledConversion(controller=controller_concepts[0],
subject=input_concepts[0],
outcome=output_concepts[0],
rate_law=rate_law,
name=transition_id)
else:
yield GroupedControlledConversion(controllers=controller_concepts,
subject=input_concepts[0],
outcome=output_concepts[0],
rate_law=rate_law)


def get_sympy(expr_data, local_dict=None) -> Optional[sympy.Expr]:
"""Return a sympy expression from a dict with an expression or MathML
Sympy string expressions are prioritized over MathML.
Parameters
----------
expr_data :
A dict with an expression and/or MathML
local_dict :
A dict of local variables to use when parsing the expression
Returns
-------
:
A sympy expression or None if no expression was found
"""
if expr_data is None:
return None

# Sympy
if expr_data.get("expression"):
expr = safe_parse_expr(expr_data["expression"], local_dict=local_dict)
# MathML
elif expr_data.get("expression_mathml"):
expr = mathml_to_expression(expr_data["expression_mathml"])
# No expression found
else:
expr = None
return expr
135 changes: 135 additions & 0 deletions mira/sources/stock_and_flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
from typing import Optional
import re
import sympy
import requests

from mira.metamodel import *
from mira.metamodel.utils import safe_parse_expr
from mira.sources.util import get_sympy, transition_to_templates


def template_model_from_sf_json(model_json) -> TemplateModel:
stocks = model_json.get('Stock', [])

# process stocks/states
concepts = {}
all_stocks = set()
for stock in stocks:
concept_stock = stock_to_concept(stock)
concepts[stock['_id']] = concept_stock
all_stocks.add(stock['_id'])

symbols, mira_parameters = {}, {}

# Store stocks as parameters
for stock_id, concept_item in concepts.items():
symbols[concept_item.display_name] = \
sympy.Symbol(concept_item.display_name)

used_stocks = set()
flows = model_json['Flow']
links = model_json['Link']
templates = []

for flow in flows:
# First identify parameters and stocks in the flow expression
params_in_expr = re.findall(r'p\.([^*+-/ ]+)', flow['ϕf'])
stocks_in_expr = re.findall(r'u\.([^*+-/ ]+)', flow['ϕf'])
# We can now remove the prefixes from the expression
expression_str = flow['ϕf'].replace('p.', '').replace('u.', '')

# Turn each str symbol into a sympy.Symbol and add to dict of symbols
# if not present before and also turn it into a Parameter object to be
# added to tm
for str_symbol in set(params_in_expr + stocks_in_expr):
if symbols.get(str_symbol) is None:
symbols[str_symbol] = sympy.Symbol(str_symbol)
mira_parameters[str_symbol] = \
parameter_to_mira({"id": str_symbol})

# Process flow and links
# Input stock to the flow is the 'u' field of the flow
# Output stock of the flow is the 'd' field of the flow
# Does not handle multiple inputs or outputs of a flow currently
# Doesn't use copy method as inputs/outputs of stock and flow diagram
# are non-mutable (e.g. int), not mutable (e.g. lists)
input = flow['u']
output = flow['d']
inputs = []
outputs = []

# flow_id or flow_name for template name?
flow_id = flow['_id'] # required
flow_name = flow.get('fname')

inputs.append(input)
outputs.append(output)

used_stocks |= (set(inputs) | set(outputs))

# A stock is considered a controller if it has a link to the given
# flow but is not an input to the flow
controllers = [link['s'] for link in links if (
link['t'] == flow_id and link['s'] != input)]

input_concepts = [concepts[i].copy(deep=True) for i in inputs]
output_concepts = [concepts[i].copy(deep=True) for i in outputs]
controller_concepts = [concepts[i].copy(deep=True) for i in controllers]

templates.extend(
transition_to_templates({'expression': expression_str},
input_concepts, output_concepts,
controller_concepts, symbols, flow_id))

static_stocks = all_stocks - used_stocks

for state in static_stocks:
concept = concepts[state].copy(deep=True)
templates.append(StaticConcept(subject=concept))

return TemplateModel(templates=templates,
parameters=mira_parameters)


def stock_to_concept(state):
name = state['_id']
display_name = state.get('sname')
grounding = state.get('grounding', {})
identifiers = grounding.get('identifiers', {})
context = grounding.get('modifiers', {})
units = state.get('units')
units_expr = get_sympy(units, UNIT_SYMBOLS)
units_obj = Unit(expression=units_expr) if units_expr else None
return Concept(name=name,
display_name=display_name,
identifiers=identifiers,
context=context,
units=units_obj)


def parameter_to_mira(parameter):
"""Return a MIRA parameter from a parameter"""
distr = Distribution(**parameter['distribution']) \
if parameter.get('distribution') else None
data = {
"name": parameter['id'],
"display_name": parameter.get('name'),
"description": parameter.get('description'),
"value": parameter.get('value'),
"distribution": distr,
"units": parameter.get('units')
}
return Parameter.from_json(data)


def main():
sfamr = requests.get(
'https://raw.githubusercontent.com/AlgebraicJulia/'
'py-acsets/jpfairbanks-patch-1/src/acsets/schemas/'
'examples/StockFlowp.json').json()
tm = template_model_from_sf_json(sfamr)
return tm


if __name__ == "__main__":
tm = main()
100 changes: 100 additions & 0 deletions mira/sources/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
__all__ = ['transition_to_templates', 'get_sympy']

import sympy
from typing import Optional
from mira.metamodel import *


def transition_to_templates(transition_rate, input_concepts, output_concepts,
controller_concepts, symbols, transition_id):
"""Return a list of templates from a transition"""
rate_law = get_sympy(transition_rate, local_dict=symbols)

if not controller_concepts:
if not input_concepts:
for output_concept in output_concepts:
yield NaturalProduction(outcome=output_concept,
rate_law=rate_law,
name=transition_id)
elif not output_concepts:
for input_concept in input_concepts:
yield NaturalDegradation(subject=input_concept,
rate_law=rate_law,
name=transition_id)
else:
for input_concept in input_concepts:
for output_concept in output_concepts:
yield NaturalConversion(subject=input_concept,
outcome=output_concept,
rate_law=rate_law,
name=transition_id)
else:
if not (len(input_concepts) == 1 and len(output_concepts) == 1):
if len(input_concepts) == 1 and not output_concepts:
if len(controller_concepts) > 1:
yield GroupedControlledDegradation(controllers=controller_concepts,
subject=input_concepts[0],
rate_law=rate_law,
name=transition_id)
else:
yield ControlledDegradation(controller=controller_concepts[0],
subject=input_concepts[0],
rate_law=rate_law,
name=transition_id)
elif len(output_concepts) == 1 and not input_concepts:
if len(controller_concepts) > 1:
yield GroupedControlledProduction(controllers=controller_concepts,
outcome=output_concepts[0],
rate_law=rate_law,
name=transition_id)
else:
yield ControlledProduction(controller=controller_concepts[0],
outcome=output_concepts[0],
rate_law=rate_law,
name=transition_id)
else:
return []

elif len(controller_concepts) == 1:
yield ControlledConversion(controller=controller_concepts[0],
subject=input_concepts[0],
outcome=output_concepts[0],
rate_law=rate_law,
name=transition_id)
else:
yield GroupedControlledConversion(controllers=controller_concepts,
subject=input_concepts[0],
outcome=output_concepts[0],
rate_law=rate_law)


def get_sympy(expr_data, local_dict=None) -> Optional[sympy.Expr]:
"""Return a sympy expression from a dict with an expression or MathML
Sympy string expressions are prioritized over MathML.
Parameters
----------
expr_data :
A dict with an expression and/or MathML
local_dict :
A dict of local variables to use when parsing the expression
Returns
-------
:
A sympy expression or None if no expression was found
"""
if expr_data is None:
return None

# Sympy
if expr_data.get("expression"):
expr = safe_parse_expr(expr_data["expression"], local_dict=local_dict)
# MathML
elif expr_data.get("expression_mathml"):
expr = mathml_to_expression(expr_data["expression_mathml"])
# No expression found
else:
expr = None
return expr
34 changes: 34 additions & 0 deletions tests/test_stock_flow_source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from copy import deepcopy as _d
from mira.sources.stock_and_flow import *
import requests

def set_up_file():
return requests.get(
'https://raw.githubusercontent.com/AlgebraicJulia/'
'py-acsets/jpfairbanks-patch-1/src/acsets/schemas/examples/StockFlowp.json').json()


def test_stock_to_concept():
stock = {
"_id": 1,
"sname": "S"
}
concept = stock_to_concept(stock)
assert concept.name == str(stock['_id'])
assert concept.display_name == stock['sname']


def test_flow_to_template():
sf_ascet = _d(set_up_file())
tm = template_model_from_sf_json(sf_ascet)

assert len(tm.templates) == 2
assert isinstance(tm.templates[0], ControlledConversion)
assert isinstance(tm.templates[1], NaturalConversion)
assert tm.templates[0].name == str(sf_ascet['Flow'][0]['_id'])
assert tm.templates[1].name == str(sf_ascet['Flow'][1]['_id'])
assert tm.templates[0].subject.name == str(sf_ascet['Flow'][0]['u'])
assert tm.templates[0].outcome.name == str(sf_ascet['Flow'][0]['d'])
assert tm.templates[0].controller.name == str(sf_ascet['Flow'][0]['d'])
assert tm.templates[1].subject.name == str(sf_ascet['Flow'][1]['u'])
assert tm.templates[1].outcome.name == str(sf_ascet['Flow'][1]['d'])

0 comments on commit 15f4588

Please sign in to comment.