Skip to content

Commit

Permalink
Merge pull request #2 from pritchardn/drop_hash
Browse files Browse the repository at this point in the history
Drop hash
  • Loading branch information
pritchardn authored May 29, 2020
2 parents efaa438 + c26b2d7 commit 79e97e8
Show file tree
Hide file tree
Showing 178 changed files with 53,054 additions and 8,908 deletions.
24 changes: 12 additions & 12 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,28 +43,28 @@ matrix:

# We want to use docker during the tests
services:
- docker
- docker

# Try to speed up builds by caching our dependencies
cache: pip

before_install:
- pip install -U coveralls pytest pytest-cov
- pip install -U setuptools pip wheel
- pip install -U coveralls pytest pytest-cov
- pip install -U setuptools pip wheel

install:
- pip install -e daliuge-common/
- test -n "$NO_DLG_TRANSLATOR" || pip install -e daliuge-translator/
- test -n "$NO_DLG_RUNTIME" || pip install -e daliuge-runtime/
- pip install -e daliuge-common/
- test -n "$NO_DLG_TRANSLATOR" || pip install -e daliuge-translator/
- test -n "$NO_DLG_RUNTIME" || pip install -e daliuge-runtime/

# run the tests, making sure subprocesses generate coverage information
script:
- COVFILES=
- test -n "$NO_DLG_TRANSLATOR" || { (cd daliuge-translator && py.test --cov) && COVFILES+=" daliuge-translator/.coverage"; }
- test -n "$NO_DLG_RUNTIME" || { (cd daliuge-runtime && py.test --cov) && COVFILES+=" daliuge-runtime/.coverage"; }
- coverage combine $COVFILES
- test -z "$TEST_OPENAPI" || (cd OpenAPI/tests && ./test_managers_openapi.sh)
- COVFILES=
- test -n "$NO_DLG_TRANSLATOR" || { (cd daliuge-translator && py.test --cov) && COVFILES+=" daliuge-translator/.coverage"; }
- test -n "$NO_DLG_RUNTIME" || { (cd daliuge-runtime && py.test --cov) && COVFILES+=" daliuge-runtime/.coverage"; }
- coverage combine $COVFILES
- test -z "$TEST_OPENAPI" || (cd OpenAPI/tests && ./test_managers_openapi.sh)

# Publish to coveralls (only once per commit, so only using one environment)
after_success:
- coveralls
- coveralls
1 change: 0 additions & 1 deletion OpenAPI/tests/managers_test_client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import composite_manager_client as cmc
import node_manager_client as nmc


nm_config = nmc.Configuration()
nm_config.host = "127.0.0.1:8000"
dim_config = cmc.Configuration()
Expand Down
3 changes: 1 addition & 2 deletions OpenAPI/tests/translator_test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import translator_client as tc


translator_config = tc.Configuration()
translator_config.host = "127.0.0.1:8084"

Expand All @@ -12,7 +11,7 @@
with tc.ApiClient(translator_config) as translator_client:
translator = tc.DefaultApi(translator_client)
html_content = translator.gen_pgt(json_data=graph,
lg_name='test', algo='metis', num_islands=1)
lg_name='test', algo='metis', num_islands=1)
print(html_content)
html_content = translator.gen_pg(
pgt_id='test', dlg_mgr_host='127.0.0.1', dlg_mgr_port=8001)
Empty file added __init__.py
Empty file.
2 changes: 1 addition & 1 deletion daliuge-common/dlg/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@
#

# Declaring this as a namespace package
__path__ = __import__('pkgutil').extend_path(__path__, __name__) # @ReservedAssignment
__path__ = __import__('pkgutil').extend_path(__path__, __name__) # @ReservedAssignment
14 changes: 11 additions & 3 deletions daliuge-common/dlg/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
from . import constants
from .restutils import RestClient


logger = logging.getLogger(__name__)
compress = os.environ.get('DALIUGE_COMPRESSED_JSON', True)


class BaseDROPManagerClient(RestClient):
"""
Base class for REST clients that talk to the DROP managers.
Expand Down Expand Up @@ -97,7 +97,8 @@ def graph(self, sessionId):
the DROP specifications.
"""
graph = self._get_json('/sessions/%s/graph' % (urllib.quote(sessionId),))
logger.debug('Successfully read graph (%d nodes) from session %s on %s:%s', len(graph), sessionId, self.host, self.port)
logger.debug('Successfully read graph (%d nodes) from session %s on %s:%s', len(graph), sessionId, self.host,
self.port)
return graph

def sessions(self):
Expand Down Expand Up @@ -142,10 +143,12 @@ def graph_size(self, sessionId):
getGraphSize = graph_size
getGraph = graph


class NodeManagerClient(BaseDROPManagerClient):
"""
A NodeManager REST client
"""

def __init__(self, host='localhost', port=constants.NODE_DEFAULT_REST_PORT, timeout=10):
super(NodeManagerClient, self).__init__(host=host, port=port, timeout=timeout)

Expand All @@ -158,6 +161,7 @@ def trigger_drops(self, sessionId, drop_uids):
def shutdown_node_manager(self):
self._GET('/shutdown')


class CompositeManagerClient(BaseDROPManagerClient):

def nodes(self):
Expand All @@ -169,19 +173,23 @@ def add_node(self, node):
def remove_node(self, node):
self._DELETE('/nodes/%s' % (node,))


class DataIslandManagerClient(CompositeManagerClient):
"""
A DataIslandManager REST client
"""

def __init__(self, host='localhost', port=constants.ISLAND_DEFAULT_REST_PORT, timeout=10):
super(DataIslandManagerClient, self).__init__(host=host, port=port, timeout=timeout)


class MasterManagerClient(CompositeManagerClient):
"""
A MasterManager REST client
"""

def __init__(self, host='localhost', port=constants.MASTER_DEFAULT_REST_PORT, timeout=10):
super(MasterManagerClient, self).__init__(host=host, port=port, timeout=timeout)

def create_island(self, island_host, nodes):
self._post_json('/managers/%s/dataisland' % (urllib.quote(island_host)), {'nodes': nodes})
self._post_json('/managers/%s/dataisland' % (urllib.quote(island_host)), {'nodes': nodes})
10 changes: 8 additions & 2 deletions daliuge-common/dlg/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
"""Common utilities used by daliuge packages"""
import sys

from .osutils import terminate_or_kill, wait_or_kill
from .network import check_port, connect_to, portIsClosed, portIsOpen, write_to
from .osutils import terminate_or_kill, wait_or_kill
from .streams import ZlibCompressedStream, JSONStream

STORAGE_TYPES = {'memory', 'file', 'ngas', 'null', 'json'}
Expand Down Expand Up @@ -59,6 +59,7 @@ class dropdict(dict):
repositories where graph templates are expected to be found by the
DROPManager.
"""

def _addSomething(self, other, key):
if key not in self:
self[key] = []
Expand All @@ -67,14 +68,19 @@ def _addSomething(self, other, key):

def addConsumer(self, other):
self._addSomething(other, 'consumers')

def addStreamingConsumer(self, other):
self._addSomething(other, 'streamingConsumers')

def addInput(self, other):
self._addSomething(other, 'inputs')

def addStreamingInput(self, other):
self._addSomething(other, 'streamingInputs')

def addOutput(self, other):
self._addSomething(other, 'outputs')

def addProducer(self, other):
self._addSomething(other, 'producers')

Expand Down Expand Up @@ -138,4 +144,4 @@ def get_leaves(pg_spec):
if dropspec.get('consumers', None) or dropspec.get('streamingConsumers', None):
nonleaves.add(oid)

return all_oids - nonleaves
return all_oids - nonleaves
9 changes: 6 additions & 3 deletions daliuge-common/dlg/common/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#
import contextlib
import errno
import logging
import socket
import time
import contextlib


logger = logging.getLogger(__name__)


def check_port(host, port, timeout=0, checking_open=True, return_socket=False):
"""
Checks that the port specified by ``host``:``port`` is either open or
Expand Down Expand Up @@ -127,12 +127,14 @@ def portIsClosed(host, port, timeout):
"""
return check_port(host, port, timeout=timeout, checking_open=False)


def portIsOpen(host, port, timeout=0):
"""
Checks if a given ``host``/``port`` is open, with a given ``timeout``.
"""
return check_port(host, port, timeout=timeout, checking_open=True)


def connect_to(host, port, timeout=None):
"""
Connects to ``host``:``port`` within the given timeout and return the
Expand All @@ -144,11 +146,12 @@ def connect_to(host, port, timeout=None):
raise socket.timeout()
return s


def write_to(host, port, data, timeout=None):
"""
Connects to ``host``:``port`` within the given timeout and write the given
piece of ``data`` into the connected socket.
"""
sock = connect_to(host, port, timeout=timeout)
with contextlib.closing(sock):
sock.send(data)
sock.send(data)
3 changes: 2 additions & 1 deletion daliuge-common/dlg/common/osutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
import math
import time


logger = logging.getLogger(__name__)


def terminate_or_kill(proc, timeout):
"""
Terminates a process and waits until it has completed its execution within
Expand All @@ -41,6 +41,7 @@ def terminate_or_kill(proc, timeout):
proc.terminate()
wait_or_kill(proc, timeout)


def wait_or_kill(proc, timeout, period=0.1):
waitLoops = 0
max_loops = math.ceil(timeout / period)
Expand Down
Empty file.
14 changes: 14 additions & 0 deletions daliuge-common/dlg/common/reproducibility/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from enum import Enum


class ReproduciblityFlags(Enum):
NOTHING = 0
RERUN = 1
REPEAT = 2
REPRODUCE = 3
REPLICATE_COMP = 4
REPLICATE_SCI = 5
EXPERIMENTAL = 6


REPRO_DEFAULT = ReproduciblityFlags.RERUN
4 changes: 2 additions & 2 deletions daliuge-common/dlg/common/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,12 @@ def read(self, n=-1):

if self.isiter:
try:
i,obj = next(self.objects)
i, obj = next(self.objects)
json_out = b'[' if i == 0 else b','
json_out += json.dumps(obj).encode('latin1')
except StopIteration:
json_out = b']'
self.isiter = False # not nice, but prevents more reads
self.isiter = False # not nice, but prevents more reads
else:
json_out = json.dumps(self.objects).encode('latin1')

Expand Down
17 changes: 11 additions & 6 deletions daliuge-common/dlg/common/tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import pkg_resources


logger = logging.getLogger(__name__)


Expand All @@ -42,7 +41,6 @@ def add_logging_options(parser):


def setup_logging(opts):

levels = [
logging.NOTSET,
logging.DEBUG,
Expand All @@ -63,7 +61,8 @@ def setup_logging(opts):
# Let's configure logging now
# We use stderr for loggin because stdout is the default output file
# for several operations
fmt = logging.Formatter("%(asctime)-15s [%(levelname)5.5s] [%(threadName)15.15s] %(name)s#%(funcName)s:%(lineno)s %(message)s")
fmt = logging.Formatter(
"%(asctime)-15s [%(levelname)5.5s] [%(threadName)15.15s] %(name)s#%(funcName)s:%(lineno)s %(message)s")
fmt.converter = time.gmtime
streamHdlr = logging.StreamHandler(sys.stderr)
streamHdlr.setFormatter(fmt)
Expand All @@ -72,8 +71,9 @@ def setup_logging(opts):


commands = {}
def cmdwrap(cmdname, desc, f):


def cmdwrap(cmdname, desc, f):
# If it's not a callable we assume it's a string
# in which case we lazy-load the module:function when it gets called
if not callable(f):
Expand All @@ -84,11 +84,13 @@ def __call__(self, *args, **kwargs):
modname, fname = orig_f.split(':')
module = importlib.import_module(modname)
return getattr(module, fname)(*args, **kwargs)

f = Importer()

def wrapped(*args, **kwargs):
parser = optparse.OptionParser(description=desc)
f(parser, *args, **kwargs)

commands[cmdname] = (desc, wrapped)


Expand All @@ -97,22 +99,25 @@ def version(parser, args):
print("Version: %s" % version)
print("Git version: %s" % git_version)


cmdwrap('version', 'Reports the DALiuGE version and exits', version)


def _load_commands():
for entry_point in pkg_resources.iter_entry_points('dlg.tool_commands'):
entry_point.load().register_commands()


def print_usage(prgname):
print('Usage: %s [command] [options]' % (prgname))
print('')
print('\n'.join(['Commands are:'] + ['\t%-25.25s%s' % (cmdname,desc_and_f[0]) for cmdname,desc_and_f in sorted(commands.items())]))
print('\n'.join(['Commands are:'] + ['\t%-25.25s%s' % (cmdname, desc_and_f[0]) for cmdname, desc_and_f in
sorted(commands.items())]))
print('')
print('Try %s [command] --help for more details' % (prgname))


def run(args=sys.argv):

_load_commands()

# Manually parse the first argument, which will be
Expand Down
9 changes: 4 additions & 5 deletions daliuge-common/dlg/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,23 @@
#

# Constants used throughout the manager package
DEFAULT_PORTS = {'NODE_DEFAULT_REST_PORT': 8000,
DEFAULT_PORTS = {'NODE_DEFAULT_REST_PORT': 8000,
'ISLAND_DEFAULT_REST_PORT': 8001,
'MASTER_DEFAULT_REST_PORT': 8002,
'REPLAY_DEFAULT_REST_PORT': 8500,
'DAEMON_DEFAULT_REST_PORT': 9000,
'NODE_DEFAULT_EVENTS_PORT': 5555,
'NODE_DEFAULT_RPC_PORT': 6666}
'NODE_DEFAULT_RPC_PORT': 6666}

# just for backwards compatibility
NODE_DEFAULT_REST_PORT = DEFAULT_PORTS['NODE_DEFAULT_REST_PORT']
NODE_DEFAULT_REST_PORT = DEFAULT_PORTS['NODE_DEFAULT_REST_PORT']
ISLAND_DEFAULT_REST_PORT = DEFAULT_PORTS['ISLAND_DEFAULT_REST_PORT']
MASTER_DEFAULT_REST_PORT = DEFAULT_PORTS['MASTER_DEFAULT_REST_PORT']

REPLAY_DEFAULT_REST_PORT = DEFAULT_PORTS['REPLAY_DEFAULT_REST_PORT']

DAEMON_DEFAULT_REST_PORT = DEFAULT_PORTS['DAEMON_DEFAULT_REST_PORT']


# Others ports used by the Node Managers
NODE_DEFAULT_EVENTS_PORT = DEFAULT_PORTS['NODE_DEFAULT_EVENTS_PORT']
NODE_DEFAULT_RPC_PORT = DEFAULT_PORTS['NODE_DEFAULT_RPC_PORT']
NODE_DEFAULT_RPC_PORT = DEFAULT_PORTS['NODE_DEFAULT_RPC_PORT']
Loading

0 comments on commit 79e97e8

Please sign in to comment.