Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix SSLEOFError when connection to DB is lost #34

Merged
merged 4 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion picas/actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
@author: Jan Bot, Joris Borgdorff
"""

import ssl
import logging
import signal
import subprocess
Expand Down Expand Up @@ -48,6 +49,10 @@ def __init__(self, db, iterator=None, view='todo', token_reset_values=[0, 0], **
else:
self.iterator = iterator

def reconnect(self):
self.db = self.db.copy()
self.iterator.reconnect(self.db)

def _run(self, task, timeout):
"""
Execution of the work on the iterator used in the run method.
Expand Down Expand Up @@ -77,8 +82,21 @@ def _run(self, task, timeout):
log.info(msg)
new_task = self.db.get(task.id)
task['_rev'] = new_task.rev
except ssl.SSLEOFError as ex:
# SSLEOFError can occur for long-lived connections, re-establish connection
msg = f"Warning: {type(ex)} occurred while saving task to database: " + \
"Trying ro reconnect to database"
log.info(msg)
self.reconnect()
try:
self.db.save(task)
except Exception as ex:
msg = f"Error: {type(ex)} occurred while saving task to database: " + \
"Not able to reconnect to database"
log.info(msg)
raise
except Exception as ex:
# re-raise Exception
# re-raise unknown exception, this will terminate the iterator
msg = f"Error: {type(ex)} occurred while saving task to database: {ex}"
log.info(msg)
raise
Expand Down
4 changes: 4 additions & 0 deletions picas/iterators.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ def claim_task(self):
"""Get the first available task from a view."""
raise NotImplementedError("claim_task function not implemented.")

def reconnect(self, database):
"""Reconnect to database"""
self.database = database


def _claim_task(database, view, allowed_failures=10, **view_params):
for _ in range(allowed_failures):
Expand Down
11 changes: 11 additions & 0 deletions tests/test_actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import subprocess
import time
import unittest
import ssl

from test_mock import MockDB, MockEmptyDB
from unittest.mock import patch
Expand Down Expand Up @@ -72,6 +73,16 @@ def test_run_resourceconflict(self, mock_save):
runner._run(task=Task({'_id': 'c', 'lock': None, 'done': None}), timeout=None)
self.assertEqual(runner.tasks_processed, 1)

@patch('test_mock.MockDB.save')
def test_run_ssleoferror(self, mock_save):
"""
Test the _run function, in case the DB throws a an SSLEOFError
"""
with pytest.raises(ssl.SSLEOFError):
mock_save.side_effect = ssl.SSLEOFError
runner = ExampleRun(self._callback)
runner._run(task=Task({'_id': 'c', 'lock': None, 'done': None}), timeout=None)

@patch('test_mock.MockDB.save')
def test_run_exception(self, mock_save):
"""
Expand Down
3 changes: 3 additions & 0 deletions tests/test_mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ def save(self, doc):

return doc

def copy(self):
return self


class MockEmptyDB(MockDB):
TASKS = []
Expand Down
Loading