From e74c82347a46ff6c483d512641bf00a285027ebb Mon Sep 17 00:00:00 2001 From: noctarius aka Christoph Engelbert Date: Thu, 23 Feb 2023 12:27:24 +0100 Subject: [PATCH] initial work to redesign the timescale tables --- README.md | 9 ++- custom_components/ltss/__init__.py | 56 ++++++++++--- custom_components/ltss/manifest.json | 5 +- custom_components/ltss/migrations.py | 114 ++++++++++++++++----------- custom_components/ltss/models.py | 79 +++++++++++++++---- requirements.txt | 1 + 6 files changed, 185 insertions(+), 79 deletions(-) diff --git a/README.md b/README.md index cfc0f17..ed3c715 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ Enabling simple long time state storage (LTSS) for your sensor states in a Postg The following extensions are required for full functionality: * TimescaleDB * PostGIS +* Ltree LTSS automatically detects the available extensions and creates the necessary table accordingly. A PostgeSQL instance without those extensions can be used but will lack some features: efficient storing and accessing time-series data (without TimescaleDB) and directly accessing geolocation data of logged data (without PostGis). @@ -45,7 +46,7 @@ configuration.yaml ltss: db_url: postgresql://USER:PASSWORD@HOST_ADRESS/DB_NAME - chunk_time_interval: 2592000000000 + chunk_time_interval: 604800000000 include: domains: - sensor @@ -66,7 +67,11 @@ configuration.yaml chunk_time_interval (int)(Optional) - The time interval to be used for chunking in TimescaleDB in microseconds. Defaults to 2592000000000 (30 days). Ignored for databases without TimescaleDB extension. + The time interval to be used for chunking in TimescaleDB in microseconds. Defaults to 604800000000 (7 days). Ignored for databases without TimescaleDB extension. + + chunk_compression_after + (int)(Optional) + The time interval after which TimescaleDB will compress old chunks in microseconds. Defaults to 1209600000000 (14 days). Ignored for databases without TimescaleDB extension. exclude (map)(Optional) diff --git a/custom_components/ltss/__init__.py b/custom_components/ltss/__init__.py index 56a5cc6..2644b5b 100644 --- a/custom_components/ltss/__init__.py +++ b/custom_components/ltss/__init__.py @@ -1,28 +1,22 @@ """Support for recording details.""" import asyncio import concurrent.futures -from contextlib import contextmanager -from datetime import datetime, timedelta import logging import queue import threading import time import json -from typing import Any, Dict, Optional, Callable +from typing import Any, Callable import voluptuous as vol from sqlalchemy import exc, create_engine, inspect, text -from sqlalchemy.engine import Engine from sqlalchemy.orm import scoped_session, sessionmaker +from sqlalchemy.dialects.postgresql import insert import psycopg2 from homeassistant.const import ( ATTR_ENTITY_ID, - CONF_DOMAINS, - CONF_ENTITIES, - CONF_EXCLUDE, - CONF_INCLUDE, EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP, EVENT_STATE_CHANGED, @@ -39,7 +33,7 @@ import homeassistant.util.dt as dt_util from homeassistant.helpers.json import JSONEncoder -from .models import Base, LTSS +from .models import Base, LTSS, LTSS_ATTRIBUTES from .migrations import check_and_migrate _LOGGER = logging.getLogger(__name__) @@ -48,6 +42,7 @@ CONF_DB_URL = "db_url" CONF_CHUNK_TIME_INTERVAL = "chunk_time_interval" +CONF_CHUNK_COMPRESSION_AFTER = "chunk_compression_after" CONNECT_RETRY_WAIT = 3 @@ -57,8 +52,11 @@ { vol.Required(CONF_DB_URL): cv.string, vol.Optional( - CONF_CHUNK_TIME_INTERVAL, default=2592000000000 - ): cv.positive_int, # 30 days + CONF_CHUNK_TIME_INTERVAL, default=604800000000 + ): cv.positive_int, # 7 days + vol.Optional( + CONF_CHUNK_COMPRESSION_AFTER, default=1209600000000 + ): cv.positive_int # 14 days } ) }, @@ -72,12 +70,14 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: db_url = conf.get(CONF_DB_URL) chunk_time_interval = conf.get(CONF_CHUNK_TIME_INTERVAL) + chunk_compression_after = conf.get(CONF_CHUNK_COMPRESSION_AFTER) entity_filter = convert_include_exclude_filter(conf) instance = LTSS_DB( hass=hass, uri=db_url, chunk_time_interval=chunk_time_interval, + chunk_compression_after=chunk_compression_after, entity_filter=entity_filter, ) instance.async_initialize() @@ -94,6 +94,7 @@ def __init__( hass: HomeAssistant, uri: str, chunk_time_interval: int, + chunk_compression_after: int, entity_filter: Callable[[str], bool], ) -> None: """Initialize the ltss.""" @@ -104,6 +105,7 @@ def __init__( self.recording_start = dt_util.utcnow() self.db_url = uri self.chunk_time_interval = chunk_time_interval + self.chunk_compression_after = chunk_compression_after self.async_db_ready = asyncio.Future() self.engine: Any = None self.run_info: Any = None @@ -206,8 +208,15 @@ def notify_hass_started(event): with self.get_session() as session: with session.begin(): try: - row = LTSS.from_event(event) + row, attributes_row = LTSS.from_event(event) session.add(row) + session.execute( + LTSS_ATTRIBUTES.__table__.insert().values( + attributes_row + ).on_conflict_do_nothing( + [LTSS_ATTRIBUTES.attributes_key.name, LTSS_ATTRIBUTES.attributes.name] + ) + ) except (TypeError, ValueError): _LOGGER.warning( "State is not JSON serializable: %s", @@ -336,6 +345,29 @@ def _create_table(self, available_extensions): if_not_exists => TRUE);""" ) ) + con.execute( + text( + f"""ALTER TABLE {LTSS.__tablename__} SET (timescaledb.compress, + timescaledb.compress_orderby = 'time, entity_id'""" + ) + ) + con.execute( + text( + f"""SELECT add_compression_policy('{LTSS.__tablename__}', + {self.chunk_compression_after})""" + ) + ) + + if 'ltree' not in available_extensions: + _LOGGER.error("ltree extension is required, but not found...") + + if 'ltree' not in available_extensions: + _LOGGER.error("ltree extension is available, enabled it...") + con.execute( + text( + "CREATE EXTENSION IF NOT EXISTS ltree CASCADE" + ) + ) def _close_connection(self): """Close the connection.""" diff --git a/custom_components/ltss/manifest.json b/custom_components/ltss/manifest.json index 07c6a34..9db696e 100644 --- a/custom_components/ltss/manifest.json +++ b/custom_components/ltss/manifest.json @@ -1,12 +1,13 @@ { "domain": "ltss", - "version": "2.0.0", + "version": "2.0.1", "name": "Long Time State Storage (LTSS)", "documentation": "https://github.com/freol35241/ltss", "requirements": [ "sqlalchemy>=2.0,<3.0", "psycopg2>=2.8,<3.0", - "geoalchemy2>=0.8,<1.0" + "geoalchemy2>=0.13,<1.0", + "sqlalchemy-utils>=0.40.0,<1.0" ], "dependencies": [], "codeowners": [ diff --git a/custom_components/ltss/migrations.py b/custom_components/ltss/migrations.py index 2e1e00a..aeb0a70 100644 --- a/custom_components/ltss/migrations.py +++ b/custom_components/ltss/migrations.py @@ -2,43 +2,78 @@ from sqlalchemy import inspect, text, Text -from .models import LTSS, LTSS_attributes_index, LTSS_entityid_time_composite_index +from .models import LTSS, LTSS_ATTRIBUTES _LOGGER = logging.getLogger(__name__) + def check_and_migrate(engine): - #Inspect the DB - iengine = inspect(engine) - columns = iengine.get_columns(LTSS.__tablename__) - indexes = iengine.get_indexes(LTSS.__tablename__) - + # Inspect the DB + inspector = inspect(engine) + columns = inspector.get_columns('ltss') # old table name here + indexes = inspector.get_indexes(LTSS.__tablename__) + def index_exists(index_name): matches = [idx for idx in indexes if idx["name"] == index_name] return True if matches else False - - # Attributes column Text -> JSONB - attributes_column = next(col for col in columns if col["name"] == 'attributes') - if isinstance(attributes_column['type'], Text): - _LOGGER.warning('Migrating you LTSS table to the latest schema, this might take a couple of minutes!') - migrate_attributes_text_to_jsonb(engine) - _LOGGER.info('Migration completed successfully!') - - # Attributes Index? - if not index_exists('ltss_attributes_idx'): - _LOGGER.warning('Creating an index for the attributes column, this might take a couple of minutes!') - create_attributes_index(engine) - _LOGGER.info('Index created successfully!') - - # entity_id and time composite Index? - if not index_exists('ltss_entityid_time_composite_idx'): - _LOGGER.warning('Creating a composite index over entity_id and time columns, this might take a couple of minutes!') - create_entityid_time_index(engine) - _LOGGER.info('Index created successfully!') - - if index_exists('ix_ltss_entity_id'): - _LOGGER.warning('Index on entity_id no longer needed, dropping...') - drop_entityid_index(engine) + + if inspector.has_table('ltss'): + # Attributes column Text -> JSONB + attributes_column = next(col for col in columns if col["name"] == 'attributes') + if isinstance(attributes_column['type'], Text): + _LOGGER.warning('Migrating your LTSS table to the latest schema, this might take a couple of minutes!') + migrate_attributes_text_to_jsonb(engine) + _LOGGER.info('Migration completed successfully!') + + if not inspector.has_table('ltss_hass_attributes'): + _LOGGER.warning('Migrating your LTSS table to the latest schema, this might take a couple of minutes!') + with engine.begin() as con: + con.execute(text('ALTER TABLE ltss RENAME TO ltss_old')) + con.execute(text(f"""INSERT INTO {LTSS.__tablename__} (time, entity_id, state_text, state_float, + state_int, state_bool, location, attributes_key) + SELECT + l.time, + l.entity_id, + CASE + WHEN l.state !~ '^[0-9]*[0-9]?\\.[0-9]+$' + AND l.state !~ '^[0-9]+$' + AND l.state !~* 'off' + AND l.state !~* 'on' THEN + l.state + END, + CASE + WHEN l.state ~ '^[0-9]*[0-9]?\\.[0-9]+$' THEN + l.state::double precision + END, + CASE + WHEN l.state ~ '^[0-9]+$' THEN + l.state::int8 + END, + CASE + WHEN l.state ~* 'on' THEN + true + WHEN l.state ~* 'off' THEN + false + END, + l.location, + CASE + WHEN l.attributes IS NOT NULL THEN + text2ltree(l.entity_id || '.' || + encode( + sha256(regexp_replace(l.attributes::text, '\\\\n', '', 'ng')::bytea), + 'hex')) + END + FROM ltss_old l ON CONFLICT DO NOTHING""")) + con.execute(text(f"""INSERT INTO {LTSS_ATTRIBUTES.__tablename__} (attributes_key, attributes) + SELECT text2ltree(l.entity_id || '.' || + encode(sha256(regexp_replace(l.attributes::text, '\\\\n', '', 'ng')::bytea), + 'hex')), + l.attributes + FROM ltss_old l WHERE l.attributes IS NOT NULL ON CONFLICT DO NOTHING""")) + con.execute(text("DROP TABLE ltss_old")) + con.commit() + def migrate_attributes_text_to_jsonb(engine): @@ -46,24 +81,7 @@ def migrate_attributes_text_to_jsonb(engine): _LOGGER.info("Migrating attributes column from type text to type JSONB") con.execute(text( - f"""ALTER TABLE {LTSS.__tablename__} + f"""ALTER TABLE ltss ALTER COLUMN attributes TYPE JSONB USING attributes::JSONB;""" ).execution_options(autocommit=True)) - -def create_attributes_index(engine): - - _LOGGER.info("Creating GIN index on the attributes column") - LTSS_attributes_index.create(bind=engine) - -def create_entityid_time_index(engine): - - _LOGGER.info("Creating composite index over entity_id and time columns") - LTSS_entityid_time_composite_index.create(bind=engine) - -def drop_entityid_index(engine): - - with engine.connect() as con: - con.execute(text( - f"""DROP INDEX ix_ltss_entity_id;""" - ).execution_options(autocommit=True)) - + diff --git a/custom_components/ltss/models.py b/custom_components/ltss/models.py index 3fccf29..d82567e 100644 --- a/custom_components/ltss/models.py +++ b/custom_components/ltss/models.py @@ -1,20 +1,24 @@ """Models for SQLAlchemy.""" -import json from datetime import datetime import logging from sqlalchemy import ( Column, - BigInteger, DateTime, - String, Text, + Float, + Integer, + Boolean ) from sqlalchemy.schema import Index -from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.dialects.postgresql import JSONB, ExcludeConstraint +from sqlalchemy_utils import LtreeType, Ltree from geoalchemy2 import Geometry from sqlalchemy.orm import column_property, declarative_base +import json +import hashlib +import re # SQLAlchemy Schema # pylint: disable=invalid-name @@ -26,12 +30,14 @@ class LTSS(Base): # type: ignore """State change history.""" - __tablename__ = "ltss" - id = Column(BigInteger, primary_key=True, autoincrement=True) + __tablename__ = "ltss_hass" time = Column(DateTime(timezone=True), default=datetime.utcnow, primary_key=True) - entity_id = Column(String(255)) - state = Column(String(255), index=True) - attributes = Column(JSONB) + entity_id = Column(Text, primary_key=True) + state_text = Column(Text) + state_float = Column(Float(asdecimal=True)) + state_int = Column(Integer) + state_bool = Column(Boolean) + attributes_key = Column(LtreeType) location = None # when not activated, no location column will be added to the table/database @classmethod @@ -59,17 +65,60 @@ def from_event(cls, event): location = f'SRID=4326;POINT({lon} {lat})' if lon and lat else None + attributes_key_data = re.sub(r'\\n', '', json.dumps(attrs)) + attributes_key = Ltree(f"{entity_id}.{hashlib.sha256(attributes_key_data.encode()).hexdigest()}") + + state_text = None + state_float = None + state_int = None + state_bool = None + try: + state_int = int(state.state) + except ValueError: + try: + state_float = float(state.state) + except ValueError: + if 'on' == state.state.lower() or 'off' == state.state.lower(): + state_bool = 'on' == state.state.lower() + else: + state_text = state.state + row = LTSS( entity_id=entity_id, time=event.time_fired, - state=state.state, - attributes=attrs, + state_text=state_text, + state_float=state_float, + state_int=state_int, + state_bool=state_bool, + attributes_key=attributes_key, location=location ) - return row + attributes_row = { + "attributes_key": attributes_key, + "attributes": attrs + } + + return row, attributes_row + + +LTSS_time_entityid_composite_index = Index( + 'idx_ltss_hass_time_entity_id', LTSS.time.desc(), LTSS.entity_id, postgresql_using='btree' +) + + +class LTSS_ATTRIBUTES(Base): + __tablename__ = f'{LTSS.__tablename__}_attributes' + attributes_key = Column(LtreeType) + attributes = Column(JSONB) + __table_args__ = ( + ExcludeConstraint((attributes_key, '=')), + ) + __mapper_args__ = { + "primary_key": [attributes_key] + } + -LTSS_attributes_index = Index('ltss_attributes_idx', LTSS.attributes, postgresql_using='gin') -LTSS_entityid_time_composite_index = Index( - 'ltss_entityid_time_composite_idx', LTSS.entity_id, LTSS.time.desc() +LTSS_ATTRIBUTES_attributes_index = Index( + 'idx_ltss_hass_attributes_attributes', LTSS_ATTRIBUTES.attributes, postgresql_using='gin' ) diff --git a/requirements.txt b/requirements.txt index 5eb1b0e..b665885 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ GeoAlchemy2==0.13.1 psycopg2==2.9.5 +sqlalchemy-utils==0.40.0 SQLAlchemy # Pinned by HA version \ No newline at end of file