Skip to content

Commit

Permalink
added redis sentinel support
Browse files Browse the repository at this point in the history
  • Loading branch information
RaphaelVRossi committed May 6, 2022
1 parent e4dea46 commit f859e51
Show file tree
Hide file tree
Showing 18 changed files with 1,033 additions and 95 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/unittest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
python-version: ${{ matrix.python-version }}
architecture: x64
- name: Install apt dependencies
run: sudo apt install -y libcurl4-openssl-dev libssl-dev libjpeg-dev
run: sudo apt update && sudo apt install -y libcurl4-openssl-dev libssl-dev libjpeg-dev
- name: Install pip dependencies
run: make setup
- run: make test
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ repos:
rev: 22.1.0
hooks:
- id: black
additional_dependencies: ['click==8.0.4']
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
REDIS_CONTAINER := redis-test
REDIS_CONTAINER := redis-test redis-sentinel-test

test: run-redis unit stop-redis

Expand Down
29 changes: 29 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ To use redis as a storage or result storage some values must be configured in `t

##### Redis Storage

###### Single Node
```python
STORAGE = "tc_redis.storages.redis_storage"

Expand All @@ -24,10 +25,25 @@ REDIS_STORAGE_SERVER_PORT = 6379
REDIS_STORAGE_SERVER_HOST = "localhost"
REDIS_STORAGE_SERVER_DB = 0
REDIS_STORAGE_SERVER_PASSWORD = None
REDIS_STORAGE_MODE = "single_node"
```

###### Sentinel
```python
STORAGE = "tc_redis.storages.redis_storage"

REDIS_STORAGE_IGNORE_ERRORS = True
REDIS_SENTINEL_STORAGE_INSTANCES = "localhost:26379,localhost:26380"
REDIS_SENTINEL_STORAGE_MASTER_INSTANCE = "redismaster"
REDIS_SENTINEL_STORAGE_MASTER_PASSWORD = "dummy"
REDIS_SENTINEL_STORAGE_PASSWORD = "dummy"
REDIS_SENTINEL_STORAGE_SOCKET_TIMEOUT = 1.0
REDIS_STORAGE_MODE = "sentinel"
```

##### Redis Result Storage

###### Single Node
```python
RESULT_STORAGE = "tc_redis.result_storages.redis_result_storage"

Expand All @@ -36,8 +52,21 @@ REDIS_RESULT_STORAGE_SERVER_PORT = 6379
REDIS_RESULT_STORAGE_SERVER_HOST = "localhost"
REDIS_RESULT_STORAGE_SERVER_DB = 0
REDIS_RESULT_STORAGE_SERVER_PASSWORD = None
REDIS_RESULT_STORAGE_MODE = "single_node"
```

###### Sentinel
```python
RESULT_STORAGE = "tc_redis.result_storages.redis_result_storage"

REDIS_RESULT_STORAGE_IGNORE_ERRORS = True
REDIS_SENTINEL_RESULT_STORAGE_INSTANCES = "localhost:26379,localhost:26380"
REDIS_SENTINEL_RESULT_STORAGE_MASTER_INSTANCE = "redismaster"
REDIS_SENTINEL_RESULT_STORAGE_MASTER_PASSWORD = "dummy"
REDIS_SENTINEL_RESULT_STORAGE_PASSWORD = "dummy"
REDIS_SENTINEL_RESULT_STORAGE_SOCKET_TIMEOUT = 1.0
REDIS_RESULT_STORAGE_MODE = "sentinel"
```
## Contribute

To build tc_redis locally use
Expand Down
11 changes: 11 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,14 @@ services:
image: "redis:6.2-alpine"
ports:
- 6379:6379

redis-sentinel-test:
build:
context: ./tests/fixtures/redis-sentinel
ports:
- 6380:6380
- 6381:6381
- 6382:6382
- 6383:6383
- 26379:26379
- 26380:26380
182 changes: 182 additions & 0 deletions tc_redis/base_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
# -*- coding: utf-8 -*-

# Licensed under the MIT license:
# http://www.opensource.org/licenses/mit-license
# Copyright (c) 2014 PopKey <[email protected]>
# Copyright (c) 2022 Raphael Rossi <[email protected]>

from redis import Redis, Sentinel

SINGLE_NODE = "single_node"
SENTINEL = "sentinel"


class RedisBaseStorage:

single_node_storage = None
sentinel_storage = None

def __init__(self, context, storage_type):
self.storage = None
self.context = context
self.storage_type = storage_type

self.storage_values = {
"storage": {
"db": self.context.config.get("REDIS_STORAGE_SERVER_DB"),
"host": self.context.config.get("REDIS_STORAGE_SERVER_HOST"),
"instances": self.context.config.get(
"REDIS_SENTINEL_STORAGE_INSTANCES"
),
"master_instance": self.context.config.get(
"REDIS_SENTINEL_STORAGE_MASTER_INSTANCE"
),
"master_password": self.context.config.get(
"REDIS_SENTINEL_STORAGE_MASTER_PASSWORD"
),
"password": self.context.config.get("REDIS_STORAGE_SERVER_PASSWORD"),
"port": self.context.config.get("REDIS_STORAGE_SERVER_PORT"),
"sentinel_password": self.context.config.get(
"REDIS_SENTINEL_STORAGE_PASSWORD"
),
"socket_timeout": self.context.config.get(
"REDIS_SENTINEL_STORAGE_SOCKET_TIMEOUT", 2.0
),
"mode": self.context.config.get(
"REDIS_STORAGE_MODE", SINGLE_NODE
).lower(),
},
"result_storage": {
"db": self.context.config.get("REDIS_RESULT_STORAGE_SERVER_DB"),
"host": self.context.config.get("REDIS_RESULT_STORAGE_SERVER_HOST"),
"instances": self.context.config.get(
"REDIS_SENTINEL_RESULT_STORAGE_INSTANCES"
),
"master_instance": self.context.config.get(
"REDIS_SENTINEL_RESULT_STORAGE_MASTER_INSTANCE"
),
"master_password": self.context.config.get(
"REDIS_SENTINEL_RESULT_STORAGE_MASTER_PASSWORD"
),
"password": self.context.config.get(
"REDIS_RESULT_STORAGE_SERVER_PASSWORD"
),
"port": self.context.config.get("REDIS_RESULT_STORAGE_SERVER_PORT"),
"sentinel_password": self.context.config.get(
"REDIS_SENTINEL_RESULT_STORAGE_PASSWORD"
),
"socket_timeout": self.context.config.get(
"REDIS_SENTINEL_RESULT_STORAGE_SOCKET_TIMEOUT", 2.0
),
"mode": self.context.config.get(
"REDIS_RESULT_STORAGE_MODE", SINGLE_NODE
).lower(),
},
}

def get_storage(self):
"""Get the storage instance.
:return Redis: Redis instance
"""

if self.storage:
return self.storage
self.storage = self.reconnect_redis()

return self.storage

def connect_redis_sentinel(self):
instances_split = self.storage_values[self.storage_type]["instances"].split(",")
instances = [tuple(instance.split(":")) for instance in instances_split]

if self.storage_values[self.storage_type]["sentinel_password"]:
sentinel_instance = Sentinel(
instances,
socket_timeout=self.storage_values[self.storage_type]["socket_timeout"],
sentinel_kwargs={
"password": self.storage_values[self.storage_type][
"sentinel_password"
]
},
)
else:
sentinel_instance = Sentinel(
instances,
socket_timeout=self.storage_values[self.storage_type]["socket_timeout"],
)

return sentinel_instance.master_for(
self.storage_values[self.storage_type]["master_instance"],
socket_timeout=self.storage_values[self.storage_type]["socket_timeout"],
password=self.storage_values[self.storage_type]["master_password"],
)

def connect_redis_single_node(self):
if self.storage_values[self.storage_type]["password"] is None:
return Redis(
port=self.storage_values[self.storage_type]["port"],
host=self.storage_values[self.storage_type]["host"],
db=self.storage_values[self.storage_type]["db"],
)

return Redis(
port=self.storage_values[self.storage_type]["port"],
host=self.storage_values[self.storage_type]["host"],
db=self.storage_values[self.storage_type]["db"],
password=self.storage_values[self.storage_type]["password"],
)

def reconnect_redis(self):
shared_client = self.get_shared_storage()
if shared_client:
return shared_client

storage = self.get_storage_redis()
self.set_shared_storage(storage)

return storage

def get_storage_redis(self):
redis_mode = self.storage_values[self.storage_type]["mode"]
if redis_mode == SINGLE_NODE:
return self.connect_redis_single_node()
if redis_mode == SENTINEL:
return self.connect_redis_sentinel()

if self.storage_type == "storage":
redis_mode_var = "REDIS_STORAGE_MODE"
else:
redis_mode_var = "REDIS_RESULT_STORAGE_MODE"

raise AttributeError(
f"Unknow value for {redis_mode_var} {redis_mode}. See README for more information."
)

def get_shared_storage(self):
redis_mode = self.storage_values[self.storage_type]["mode"]

if not self.shared_client:
return None

if redis_mode == SENTINEL and RedisBaseStorage.sentinel_storage:
return RedisBaseStorage.sentinel_storage

if redis_mode == SINGLE_NODE and RedisBaseStorage.single_node_storage:
return RedisBaseStorage.single_node_storage

return None

def set_shared_storage(self, storage):
redis_mode = self.storage_values[self.storage_type]["mode"]

if not self.shared_client:
return None

if redis_mode == SENTINEL:
RedisBaseStorage.sentinel_storage = storage

if redis_mode == SINGLE_NODE:
RedisBaseStorage.single_node_storage = storage

return None
59 changes: 10 additions & 49 deletions tc_redis/result_storages/redis_result_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,21 @@

import time
from datetime import datetime, timedelta
from redis import Redis, RedisError
from tc_redis.utils import on_exception

from redis import RedisError
from thumbor.result_storages import BaseStorage
from thumbor.utils import logger

from tc_redis.base_storage import RedisBaseStorage
from tc_redis.utils import on_exception

class Storage(BaseStorage):

storage = None
class Storage(BaseStorage, RedisBaseStorage):

"""start_time is used to calculate the last modified value when an item
has no expiration date.
"""

start_time = None

def __init__(self, context, shared_client=True):
Expand All @@ -30,52 +32,13 @@ def __init__(self, context, shared_client=True):
"""

BaseStorage.__init__(self, context)
RedisBaseStorage.__init__(self, context, "result_storage")
self.shared_client = shared_client
self.storage = self.reconnect_redis()
self.storage = self.get_storage()

if not Storage.start_time:
Storage.start_time = time.time()

def get_storage(self):
"""Get the storage instance.
:return Redis: Redis instance
"""

if self.storage:
return self.storage
self.storage = self.reconnect_redis()

return self.storage

def reconnect_redis(self):
"""Reconnect to redis.
:return: Redis client instance
:rettype: redis.Redis
"""

if self.shared_client and Storage.storage:
return Storage.storage

if self.context.config.REDIS_RESULT_STORAGE_SERVER_PASSWORD is None:
storage = Redis(
port=self.context.config.REDIS_RESULT_STORAGE_SERVER_PORT,
host=self.context.config.REDIS_RESULT_STORAGE_SERVER_HOST,
db=self.context.config.REDIS_RESULT_STORAGE_SERVER_DB,
)
else:
storage = Redis(
port=self.context.config.REDIS_RESULT_STORAGE_SERVER_PORT,
host=self.context.config.REDIS_RESULT_STORAGE_SERVER_HOST,
db=self.context.config.REDIS_RESULT_STORAGE_SERVER_DB,
password=self.context.config.REDIS_RESULT_STORAGE_SERVER_PASSWORD,
)

if self.shared_client:
Storage.storage = storage
return storage

def on_redis_error(self, fname, exc_type, exc_value):
"""Callback executed when there is a redis error.
Expand All @@ -85,10 +48,8 @@ def on_redis_error(self, fname, exc_type, exc_value):
:returns: Default value or raise the current exception
"""

if self.shared_client:
Storage.storage = None
else:
self.storage = None
self.storage = None
self.set_shared_storage(None)

if self.context.config.REDIS_RESULT_STORAGE_IGNORE_ERRORS is True:
logger.error(f"Redis result storage failure: {exc_value}")
Expand Down
Loading

0 comments on commit f859e51

Please sign in to comment.