From ebcd950ea97ff81d16b7cd6526d4e9d958fef9f0 Mon Sep 17 00:00:00 2001 From: Haili Hu Date: Wed, 18 Dec 2024 09:43:38 +0100 Subject: [PATCH 1/3] actors.py: continue when saving task to DB fails --- picas/actors.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/picas/actors.py b/picas/actors.py index e79b9a1..187f590 100644 --- a/picas/actors.py +++ b/picas/actors.py @@ -70,18 +70,12 @@ def _run(self, task, timeout): try: self.db.save(task) - except ResourceConflict as ex: + except Exception as ex: # simply overwrite changes - model results are more important - msg = f"Warning: {type(ex)} occurred while saving task to database: " + \ - "Document exists with different revision or was deleted" + msg = f"Warning: {type(ex)} occurred while saving task to database: {ex}" log.info(msg) new_task = self.db.get(task.id) task['_rev'] = new_task.rev - except Exception as ex: - # re-raise Exception - msg = f"Error: {type(ex)} occurred while saving task to database: {ex}" - log.info(msg) - raise self.cleanup_run() self.tasks_processed += 1 From fb3223d3174746e5246856830e5204ac09e93ad6 Mon Sep 17 00:00:00 2001 From: Haili Hu Date: Thu, 19 Dec 2024 15:09:41 +0100 Subject: [PATCH 2/3] In case of SSLEOFError, try to reconnect to database --- picas/actors.py | 30 ++++++++++++++++++++++++++++-- picas/iterators.py | 4 ++++ 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/picas/actors.py b/picas/actors.py index 187f590..2529cb6 100644 --- a/picas/actors.py +++ b/picas/actors.py @@ -5,6 +5,7 @@ @author: Jan Bot, Joris Borgdorff """ +import ssl import logging import signal import subprocess @@ -48,6 +49,12 @@ def __init__(self, db, iterator=None, view='todo', token_reset_values=[0, 0], ** else: self.iterator = iterator + + def reconnect(self): + self.db = self.db.copy() + self.iterator.reconnect(self.db) + + def _run(self, task, timeout): """ Execution of the work on the iterator used in the run method. @@ -70,12 +77,31 @@ def _run(self, task, timeout): try: self.db.save(task) - except Exception as ex: + except ResourceConflict as ex: # simply overwrite changes - model results are more important - msg = f"Warning: {type(ex)} occurred while saving task to database: {ex}" + msg = f"Warning: {type(ex)} occurred while saving task to database: " + \ + "Document exists with different revision or was deleted" log.info(msg) new_task = self.db.get(task.id) task['_rev'] = new_task.rev + except ssl.SSLEOFError as ex: + # SSLEOFError can occur for long-lived connections, re-establish connection + msg = f"Warning: {type(ex)} occurred while saving task to database: " + \ + "Trying ro reconnect to database" + log.info(msg) + self.reconnect() + try: + self.db.save(task) + except: + msg = f"Error: {type(ex)} occurred while saving task to database: " + \ + "Not able to reconnect to database" + log.info(msg) + raise + except Exception as ex: + #re-raise unknown exception, this will terminate the iterator + msg = f"Error: {type(ex)} occurred while saving task to database: {ex}" + log.info(msg) + raise self.cleanup_run() self.tasks_processed += 1 diff --git a/picas/iterators.py b/picas/iterators.py index b13eb5b..7b4963e 100644 --- a/picas/iterators.py +++ b/picas/iterators.py @@ -56,6 +56,10 @@ def __next__(self): def claim_task(self): """Get the first available task from a view.""" raise NotImplementedError("claim_task function not implemented.") + + def reconnect(self, database): + """Reconnect to database""" + self.database = database def _claim_task(database, view, allowed_failures=10, **view_params): From 24851467e84c59c7b3f84ca7fa8ced3685b51912 Mon Sep 17 00:00:00 2001 From: Haili Hu Date: Fri, 20 Dec 2024 11:51:33 +0100 Subject: [PATCH 3/3] Add unit test --- picas/actors.py | 12 +++++------- picas/iterators.py | 2 +- tests/test_actors.py | 11 +++++++++++ tests/test_mock.py | 3 +++ 4 files changed, 20 insertions(+), 8 deletions(-) diff --git a/picas/actors.py b/picas/actors.py index 2529cb6..a15d71a 100644 --- a/picas/actors.py +++ b/picas/actors.py @@ -49,12 +49,10 @@ def __init__(self, db, iterator=None, view='todo', token_reset_values=[0, 0], ** else: self.iterator = iterator - def reconnect(self): self.db = self.db.copy() self.iterator.reconnect(self.db) - def _run(self, task, timeout): """ Execution of the work on the iterator used in the run method. @@ -88,20 +86,20 @@ def _run(self, task, timeout): # SSLEOFError can occur for long-lived connections, re-establish connection msg = f"Warning: {type(ex)} occurred while saving task to database: " + \ "Trying ro reconnect to database" - log.info(msg) + log.info(msg) self.reconnect() try: self.db.save(task) - except: + except Exception as ex: msg = f"Error: {type(ex)} occurred while saving task to database: " + \ "Not able to reconnect to database" - log.info(msg) + log.info(msg) raise except Exception as ex: - #re-raise unknown exception, this will terminate the iterator + # re-raise unknown exception, this will terminate the iterator msg = f"Error: {type(ex)} occurred while saving task to database: {ex}" log.info(msg) - raise + raise self.cleanup_run() self.tasks_processed += 1 diff --git a/picas/iterators.py b/picas/iterators.py index 7b4963e..36b1f78 100644 --- a/picas/iterators.py +++ b/picas/iterators.py @@ -56,7 +56,7 @@ def __next__(self): def claim_task(self): """Get the first available task from a view.""" raise NotImplementedError("claim_task function not implemented.") - + def reconnect(self, database): """Reconnect to database""" self.database = database diff --git a/tests/test_actors.py b/tests/test_actors.py index a61a8b0..be05431 100644 --- a/tests/test_actors.py +++ b/tests/test_actors.py @@ -3,6 +3,7 @@ import subprocess import time import unittest +import ssl from test_mock import MockDB, MockEmptyDB from unittest.mock import patch @@ -72,6 +73,16 @@ def test_run_resourceconflict(self, mock_save): runner._run(task=Task({'_id': 'c', 'lock': None, 'done': None}), timeout=None) self.assertEqual(runner.tasks_processed, 1) + @patch('test_mock.MockDB.save') + def test_run_ssleoferror(self, mock_save): + """ + Test the _run function, in case the DB throws a an SSLEOFError + """ + with pytest.raises(ssl.SSLEOFError): + mock_save.side_effect = ssl.SSLEOFError + runner = ExampleRun(self._callback) + runner._run(task=Task({'_id': 'c', 'lock': None, 'done': None}), timeout=None) + @patch('test_mock.MockDB.save') def test_run_exception(self, mock_save): """ diff --git a/tests/test_mock.py b/tests/test_mock.py index 265ead6..5a97d81 100644 --- a/tests/test_mock.py +++ b/tests/test_mock.py @@ -41,6 +41,9 @@ def save(self, doc): return doc + def copy(self): + return self + class MockEmptyDB(MockDB): TASKS = []