forked from apache/cassandra-dtest
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathread_failures_test.py
130 lines (112 loc) · 5.3 KB
/
read_failures_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import logging
import pytest
from cassandra import ConsistencyLevel, ReadFailure, ReadTimeout
from cassandra.policies import FallthroughRetryPolicy
from cassandra.query import SimpleStatement
from distutils.version import LooseVersion
from dtest import Tester
since = pytest.mark.since
logger = logging.getLogger(__name__)
KEYSPACE = "readfailures"
class TestReadFailures(Tester):
"""
Tests for read failures in the replicas, introduced as a part of
@jira_ticket CASSANDRA-12311.
"""
@pytest.fixture(autouse=True)
def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
fixture_dtest_setup.ignore_log_patterns = (
# These are expected when testing read failures due to tombstones,
"Scanned over [1-9][0-9]* tombstones",
"Scanned over [1-9][0-9]* tombstone rows",
)
return fixture_dtest_setup
@pytest.fixture(scope='function', autouse=True)
def fixture_dtest_setup_params(self):
self.tombstone_failure_threshold = 500
self.replication_factor = 3
self.consistency_level = ConsistencyLevel.ALL
self.expected_expt = ReadFailure
def _prepare_cluster(self):
if self.supports_guardrails():
self.cluster.set_configuration_options(
values={'guardrails': {'tombstone_warn_threshold': -1,
'tombstone_failure_threshold': self.tombstone_failure_threshold}}
)
else:
self.cluster.set_configuration_options(
values={'tombstone_failure_threshold': self.tombstone_failure_threshold,
'tombstone_warn_threshold': -1 }
)
self.cluster.populate(3)
self.cluster.start()
self.nodes = list(self.cluster.nodes.values())
session = self.patient_exclusive_cql_connection(self.nodes[0], protocol_version=self.protocol_version)
session.execute("""
CREATE KEYSPACE IF NOT EXISTS %s
WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '%s' }
""" % (KEYSPACE, self.replication_factor))
session.set_keyspace(KEYSPACE)
session.execute("CREATE TABLE IF NOT EXISTS tombstonefailure (id int, c int, value text, primary key(id, c))")
return session
def _insert_tombstones(self, session, number_of_tombstones):
for num_id in range(number_of_tombstones):
session.execute(SimpleStatement("DELETE value FROM tombstonefailure WHERE id = 0 and c = {}".format(num_id),
consistency_level=self.consistency_level, retry_policy=FallthroughRetryPolicy()))
def _perform_cql_statement(self, session, text_statement):
statement = SimpleStatement(text_statement,
consistency_level=self.consistency_level,
retry_policy=FallthroughRetryPolicy())
if self.expected_expt is None:
session.execute(statement)
else:
with pytest.raises(self.expected_expt) as cm:
# On 2.1, we won't return the ReadTimeout from coordinator until actual timeout,
# so we need to up the default timeout of the driver session
session.execute(statement, timeout=15)
return cm._excinfo[1]
def _assert_error_code_map_exists_with_code(self, exception, expected_code):
"""
Asserts that the given exception contains an error code map
where at least one node responded with some expected code.
This is meant for testing failure exceptions on protocol v5.
"""
assert exception is not None
assert exception.error_code_map is not None
expected_code_found = False
for error_code in list(exception.error_code_map.values()):
if error_code == expected_code:
expected_code_found = True
break
assert expected_code_found, "The error code map did not contain " + str(expected_code)
@since('2.1')
def test_tombstone_failure_v3(self):
"""
A failed read due to tombstones at v3 should result in a ReadTimeout
"""
self.protocol_version = 3
self.expected_expt = ReadTimeout
session = self._prepare_cluster()
self._insert_tombstones(session, 600)
self._perform_cql_statement(session, "SELECT value FROM tombstonefailure")
@since('2.2')
def test_tombstone_failure_v4(self):
"""
A failed read due to tombstones at v4 should result in a ReadFailure
"""
self.protocol_version = 4
session = self._prepare_cluster()
self._insert_tombstones(session, 600)
self._perform_cql_statement(session, "SELECT value FROM tombstonefailure")
@since('4.0')
def test_tombstone_failure_v5(self):
"""
A failed read due to tombstones at v5 should result in a ReadFailure with
an error code map containing error code 0x0001 (indicating that the replica(s)
read too many tombstones)
"""
self.protocol_version = 5
session = self._prepare_cluster()
self._insert_tombstones(session, 600)
exc = self._perform_cql_statement(session, "SELECT value FROM tombstonefailure")
self._assert_error_code_map_exists_with_code(exc, 0x0001)