Skip to content

Commit

Permalink
backend: Fetch online status of nodes from PFS
Browse files Browse the repository at this point in the history
  • Loading branch information
manuelwedler committed May 27, 2021
1 parent ed1c7ef commit bea462f
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 98 deletions.
44 changes: 21 additions & 23 deletions backend/metrics_backend/metrics_cli.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from gevent import monkey, config # isort:skip # noqa
config.resolver = ['dnspython', 'ares', 'block']
from gevent import monkey # isort:skip # noqa
monkey.patch_all() # isort:skip # noqa

import contextlib
Expand All @@ -13,8 +12,9 @@
import click
import gevent
import requests
from eth_utils import is_checksum_address
from eth_utils import is_checksum_address, to_canonical_address
from raiden_contracts.constants import (
CONTRACT_SERVICE_REGISTRY,
CONTRACT_TOKEN_NETWORK_REGISTRY,
CONTRACTS_VERSION
)
Expand All @@ -25,7 +25,6 @@
)
from requests.exceptions import ConnectionError
from web3 import HTTPProvider, Web3
from web3.middleware import geth_poa_middleware

from metrics_backend.api.rest import NetworkInfoAPI
from metrics_backend.metrics_service import MetricsService
Expand All @@ -41,7 +40,6 @@
REQUIRED_CONFIRMATIONS = 5
PRODUCTION_CONTRACTS_VERSION = '0.37.0'
DEMOENV_CONTRACTS_VERSION = '0.37.0'
DEMOENV_MATRIX_SERVER = 'https://transport.demo001.env.raiden.network'


@contextlib.contextmanager
Expand Down Expand Up @@ -117,7 +115,6 @@ def main(
try:
log.info(f'Starting Web3 client for node at {eth_rpc}')
web3 = Web3(HTTPProvider(eth_rpc))
web3.middleware_onion.inject(geth_poa_middleware, layer=0)
except ConnectionError:
log.error(
'Can not connect to the Ethereum client. Please check that it is running and that '
Expand All @@ -135,36 +132,37 @@ def main(

with no_ssl_verification():
valid_params_given = is_checksum_address(registry_address) and start_block >= 0
if not valid_params_given:
try:
contract_data = get_contracts_deployment_info(web3.eth.chainId, contracts_version)
try:
contract_data = get_contracts_deployment_info(web3.eth.chainId, contracts_version)
service_registry_address = contract_data['contracts'][CONTRACT_SERVICE_REGISTRY]['address']
if not valid_params_given:
token_network_registry_info = contract_data['contracts'][CONTRACT_TOKEN_NETWORK_REGISTRY] # noqa
registry_address = token_network_registry_info['address']
start_block = max(0, token_network_registry_info['block_number'])
except ValueError:
log.error(
'Provided registry address or start block are not valid and '
'no deployed contracts were found'
)
sys.exit(1)
except ValueError:
log.error(
'Provided registry address or start block are not valid and '
'no deployed contracts were found'
)
sys.exit(1)

try:
contract_manager = ContractManager(contracts_precompiled_path(contracts_version))

metrics_service = MetricsService(
web3=web3,
contract_manager=ContractManager(contracts_precompiled_path(contracts_version)),
contract_manager=contract_manager,
registry_address=registry_address,
sync_start_block=start_block,
required_confirmations=confirmations,
)

matrix_server = None
if environment == 'demo':
matrix_server = DEMOENV_MATRIX_SERVER
presence_service = PresenceService(
f'EXPLORER_{web3.eth.chainId}',
web3.eth.chainId,
environment == 'production',
matrix_server
privkey_seed=f'EXPLORER_{web3.eth.chainId}',
contract_manager=contract_manager,
web3=web3,
block_confirmations=REQUIRED_CONFIRMATIONS,
service_registry_address=to_canonical_address(service_registry_address),
)

# re-enable once deployment works
Expand Down
135 changes: 68 additions & 67 deletions backend/metrics_backend/presence_service.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
import hashlib
import logging
from typing import Any, Dict
from urllib.parse import urlparse
import math
from typing import Dict, List

import gevent
from raiden.constants import DISCOVERY_DEFAULT_ROOM, Environment
from raiden.network.transport.matrix.utils import (
USER_PRESENCE_TO_ADDRESS_REACHABILITY,
AddressReachability,
UserPresence,
address_from_userid,
join_broadcast_room,
login,
make_client,
make_room_alias
)
from raiden.settings import DEFAULT_MATRIX_KNOWN_SERVERS
from raiden.utils.cli import get_matrix_servers
from raiden.utils.signer import LocalSigner, Signer
from raiden_contracts.utils.type_aliases import ChainID
import requests
from eth_utils.address import to_canonical_address, to_checksum_address
from raiden.constants import BLOCK_ID_LATEST
from raiden.network.pathfinding import get_random_pfs
from raiden.network.proxies.service_registry import ServiceRegistry
from raiden.network.rpc.client import JSONRPCClient
from requests import ConnectionError, HTTPError, Timeout
from web3 import Web3

from metrics_backend.utils import Address

log = logging.getLogger(__name__)

Expand All @@ -27,66 +22,72 @@ class PresenceService(gevent.Greenlet):
def __init__(
self,
privkey_seed: str,
chain_id: ChainID,
production_environment: bool,
server: str = None
contract_manager,
web3: Web3,
block_confirmations: int,
service_registry_address: Address,
poll_interval: int = 30,
error_poll_interval: int = 600,
) -> None:
""" Creates a new presence service listening on matrix presence updates
in the discovery room
Args:
privkey_seed: Seed for generating a private key for matrix login
chain_id: Chain id to listen on presence
production_environment: Determines which matrix server to use if none is explicitly set
server: Matrix server
"""
"""Creates a new presence service getting the online status of nodes from a PFS"""
super().__init__()
self.signer = LocalSigner(hashlib.sha256(privkey_seed.encode()).digest())
self.server = server
self.chain_id = chain_id
self.production_environment = production_environment

self.is_running = gevent.event.Event()

self.running = False
self.poll_interval = poll_interval
self.error_poll_interval = error_poll_interval
jsonrpc_client = JSONRPCClient(
web3=web3,
privkey=hashlib.sha256(privkey_seed.encode()).digest(),
block_num_confirmations=block_confirmations,
)
self.service_registry = ServiceRegistry(
jsonrpc_client=jsonrpc_client,
service_registry_address=service_registry_address,
contract_manager=contract_manager,
block_identifier=BLOCK_ID_LATEST,
)
self.nodes_presence_status: Dict[bytes, bool] = {}
log.info('Using address %s for matrix login', self.signer.address_hex)

def _run(self):
available_servers = [self.server]
if not self.server:
environment_type = Environment.PRODUCTION if self.production_environment else Environment.DEVELOPMENT
available_servers_url = DEFAULT_MATRIX_KNOWN_SERVERS[environment_type]
available_servers = get_matrix_servers(available_servers_url)

client = make_client(lambda x: False, lambda x: None, available_servers)
self.server = client.api.base_url
server_name = urlparse(self.server).netloc
login(client=client, signer=self.signer)
client.add_presence_listener(self.handle_presence_update)
client.start_listener_thread(30_000, 1_000)
self.running = True

discovery_room_alias = make_room_alias(
self.chain_id, DISCOVERY_DEFAULT_ROOM
pfs_url = get_random_pfs(
service_registry=self.service_registry,
block_identifier=BLOCK_ID_LATEST,
pathfinding_max_fee=math.inf,
)
join_broadcast_room(client, f'#{discovery_room_alias}:{server_name}')
if pfs_url is None:
self.running = False
log.warning(
"Could not get a PFS from ServiceRegistry %s. Disabling presence monitoring.",
to_checksum_address(self.service_registry.address),
)
return

log.info('Presence monitoring started, server: %s', self.server)
self.is_running.wait()
client.stop()
log.info("Presence service started, PFS: %s", pfs_url)
log.info("Presence polling interval: %ss", self.poll_interval)
while self.running:
try:
response = requests.get(f"{pfs_url}/api/v1/online_addresses")
response.raise_for_status()
self.update_presence(response.json())
gevent.sleep(self.poll_interval)
except (ConnectionError, HTTPError, Timeout):
log.warning(
"Error while trying to request from the PFS. Retrying in %d seconds.",
self.error_poll_interval,
)
gevent.sleep(self.error_poll_interval)

def stop(self):
self.is_running.set()
log.info("Stopped presence service")

def handle_presence_update(self, event: Dict[str, Any], update_id: int) -> None:
presence = UserPresence(event["content"]["presence"])
reachable = USER_PRESENCE_TO_ADDRESS_REACHABILITY[presence] is AddressReachability.REACHABLE
node_address = address_from_userid(event["sender"])
self.nodes_presence_status[node_address] = reachable
def stop(self):
self.running = False

def update_presence(self, online_addresses: List[str]):
self.nodes_presence_status = {
to_canonical_address(address): True for address in online_addresses
}
log.info(
'Presence update, server: %s, user_id: %s, presence: %s, update_id: %d',
self.server,
event["sender"],
presence,
update_id
"Presence update, number of online nodes: %d",
len(online_addresses),
)
16 changes: 8 additions & 8 deletions backend/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
Click==7.0
Click==8.0.1

web3
web3==5.19.0
eth-utils

flask==1.1.2
flask_restful==0.3.8
flask-cors==3.0.9
gevent==20.6.2
requests==2.24.0
cachetools==4.1.1
flask-cors==3.0.10
gevent==21.1.2
requests==2.25.1
cachetools==4.2.2

raiden==1.2.0
raiden-contracts==0.37.1
raiden==2.0.0rc0
raiden-contracts==0.37.5
mypy-extensions

0 comments on commit bea462f

Please sign in to comment.