Skip to content

Commit

Permalink
Feature/mongodb jobmanager (#1186)
Browse files Browse the repository at this point in the history
* MongoDB Jobmanager

* Add MongoDB-Jobmanager

Example configuration included

* Requested Fixes

Fixes Issues mentioned in latest Review

* Flake8 fixes

Code is fixed on the basis of a flake8 report

* Update ogcapi-processes.rst

* Update ogcapi-processes.rst

* Flake8 fixes

Fixes of flake8 errors
  • Loading branch information
xcomagent95 authored Mar 30, 2023
1 parent 466a1df commit d9c377e
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 2 deletions.
15 changes: 15 additions & 0 deletions docs/source/data-publishing/ogcapi-processes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,21 @@ advanced job management capabilities (e.g. Kubernetes, databases, etc.).
connection: /tmp/pygeoapi-process-manager.db
output_dir: /tmp/
MongoDB
--------------------
As an alternative to the default a manager employing `MongoDB`_ can be used.
The connection to an installed `MongoDB`_ instance must be provided in the configuration.
`MongoDB`_ uses the localhost and port 27017 by default. Jobs are stored in a collection named
job_manager_pygeoapi.

.. code-block:: yaml
server:
manager:
name: MongoDB
connection: mongodb://host:port
output_dir: /tmp/
Putting it all together
-----------------------
Expand Down
2 changes: 1 addition & 1 deletion pygeoapi-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ server:
# connection: /tmp/pygeoapi-process-manager.db
# output_dir: /tmp/
# ogc_schemas_location: /opt/schemas.opengis.net

logging:
level: ERROR
#logfile: /tmp/pygeoapi.log
Expand Down
1 change: 1 addition & 0 deletions pygeoapi/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
},
'process_manager': {
'Dummy': 'pygeoapi.process.manager.dummy.DummyManager',
'MongoDB': 'pygeoapi.process.manager.mongodb_.MongoDBManager',
'TinyDB': 'pygeoapi.process.manager.tinydb_.TinyDBManager'
}
}
Expand Down
156 changes: 156 additions & 0 deletions pygeoapi/process/manager/mongodb_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
# =================================================================
#
# Authors: Alexander Pilz <[email protected]>
#
# Copyright (c) 2023 Alexander Pilz
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================
import json
import logging
import traceback

from pymongo import MongoClient

from pygeoapi.process.manager.base import BaseManager

LOGGER = logging.getLogger(__name__)


class MongoDBManager(BaseManager):
def __init__(self, manager_def):
super().__init__(manager_def)
self.is_async = True

def _connect(self):
try:
client = MongoClient(self.connection)
self.db = client
LOGGER.info("JOBMANAGER - MongoDB connected")
return True
except Exception:
self.destroy()
LOGGER.error("JOBMANAGER - connect error",
exc_info=(traceback))
return False

def destroy(self):
try:
self.db.close()
LOGGER.info("JOBMANAGER - MongoDB disconnected")
return True
except Exception:
self.destroy()
LOGGER.error("JOBMANAGER - destroy error",
exc_info=(traceback))
return False

def get_jobs(self, status=None):
try:
self._connect()
database = self.db.job_manager_pygeoapi
collection = database.jobs
if status is not None:
jobs = list(collection.find({}, {"status": status}))
else:
jobs = list(collection.find({}))
LOGGER.info("JOBMANAGER - MongoDB jobs queried")
return jobs
except Exception:
LOGGER.error("JOBMANAGER - get_jobs error",
exc_info=(traceback))
return False

def add_job(self, job_metadata):
try:
self._connect()
database = self.db.job_manager_pygeoapi
collection = database.jobs
doc_id = collection.insert_one(job_metadata)
LOGGER.info("JOBMANAGER - MongoDB job added")
return doc_id
except Exception:
LOGGER.error("JOBMANAGER - add_job error",
exc_info=(traceback))
return False

def update_job(self, job_id, update_dict):
try:
self._connect()
database = self.db.job_manager_pygeoapi
collection = database.jobs
entry = collection.find_one({"identifier": job_id})
collection.update_one(entry, {"$set": update_dict})
LOGGER.info("JOBMANAGER - MongoDB job updated")
return True
except Exception:
LOGGER.error("JOBMANAGER - MongoDB update_job error",
exc_info=(traceback))
return False

def delete_job(self, job_id):
try:
self._connect()
database = self.db.job_manager_pygeoapi
collection = database.jobs
collection.delete_one({"identifier": job_id})
LOGGER.info("JOBMANAGER - MongoDB job deleted")
return True
except Exception:
LOGGER.error("JOBMANAGER - MongoDB delete_job error",
exc_info=(traceback))
return False

def get_job(self, job_id):
try:
self._connect()
database = self.db.job_manager_pygeoapi
collection = database.jobs
entry = collection.find_one({"identifier": job_id})
LOGGER.info("JOBMANAGER - MongoDB job queried")
return entry
except Exception:
LOGGER.error("JOBMANAGER - MongoDB get_job error",
exc_info=(traceback))
return False

def get_job_result(self, job_id):
try:
self._connect()
database = self.db.job_manager_pygeoapi
collection = database.jobs
entry = collection.find_one({"identifier": job_id})
if entry["status"] != "successful":
LOGGER.info("JOBMANAGER - job not finished or failed")
return (None,)
with open(entry["location"], "r") as file:
data = json.load(file)
LOGGER.info("JOBMANAGER - MongoDB job result queried")
return entry["mimetype"], data
except Exception:
LOGGER.error("JOBMANAGER - MongoDB get_job_result error",
exc_info=(traceback))
return False

def __repr__(self):
return f'<MongoDBManager> {self.name}'
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ requests
shapely<2.0
SQLAlchemy<2.0.0
tinydb
unicodecsv
unicodecsv

0 comments on commit d9c377e

Please sign in to comment.