diff --git a/README.md b/README.md
index cfc0f17..ed3c715 100644
--- a/README.md
+++ b/README.md
@@ -14,6 +14,7 @@ Enabling simple long time state storage (LTSS) for your sensor states in a Postg
 
 The following extensions are required for full functionality:
 * TimescaleDB
 * PostGIS
+* Ltree
 
 LTSS automatically detects the available extensions and creates the necessary table accordingly. A PostgeSQL instance without those extensions can be used but will lack some features: efficient storing and accessing time-series data (without TimescaleDB) and directly accessing geolocation data of logged data (without PostGis).
@@ -45,7 +46,7 @@ configuration.yaml
 
 ltss:
   db_url: postgresql://USER:PASSWORD@HOST_ADRESS/DB_NAME
-  chunk_time_interval: 2592000000000
+  chunk_time_interval: 604800000000
   include:
     domains:
       - sensor
@@ -66,7 +67,11 @@ configuration.yaml
 
 chunk_time_interval
 (int)(Optional)
-The time interval to be used for chunking in TimescaleDB in microseconds. Defaults to 2592000000000 (30 days). Ignored for databases without TimescaleDB extension.
+The time interval to be used for chunking in TimescaleDB in microseconds. Defaults to 604800000000 (7 days). Ignored for databases without TimescaleDB extension.
+
+chunk_compression_after
+(int)(Optional)
+The time interval after which TimescaleDB will compress old chunks in microseconds. Defaults to 1209600000000 (14 days). Ignored for databases without TimescaleDB extension.
 
 exclude
 (map)(Optional)
diff --git a/custom_components/ltss/__init__.py b/custom_components/ltss/__init__.py
index 56a5cc6..398eeb2 100644
--- a/custom_components/ltss/__init__.py
+++ b/custom_components/ltss/__init__.py
@@ -1,28 +1,22 @@
 """Support for recording details."""
 import asyncio
 import concurrent.futures
-from contextlib import contextmanager
-from datetime import datetime, timedelta
 import logging
 import queue
 import threading
 import time
 import json
 
-from typing import Any, Dict, Optional, Callable
+from typing import Any, Callable
 
 import voluptuous as vol
 from sqlalchemy import exc, create_engine, inspect, text
-from sqlalchemy.engine import Engine
 from sqlalchemy.orm import scoped_session, sessionmaker
+from sqlalchemy.dialects.postgresql import insert
 import psycopg2
 
 from homeassistant.const import (
     ATTR_ENTITY_ID,
-    CONF_DOMAINS,
-    CONF_ENTITIES,
-    CONF_EXCLUDE,
-    CONF_INCLUDE,
     EVENT_HOMEASSISTANT_START,
     EVENT_HOMEASSISTANT_STOP,
     EVENT_STATE_CHANGED,
@@ -39,7 +33,7 @@ import homeassistant.util.dt as dt_util
 
 from homeassistant.helpers.json import JSONEncoder
 
-from .models import Base, LTSS
+from .models import Base, LTSS, LTSS_ATTRIBUTES
 from .migrations import check_and_migrate
 
 _LOGGER = logging.getLogger(__name__)
@@ -48,6 +42,7 @@
 
 CONF_DB_URL = "db_url"
 CONF_CHUNK_TIME_INTERVAL = "chunk_time_interval"
+CONF_CHUNK_COMPRESSION_AFTER = "chunk_compression_after"
 
 CONNECT_RETRY_WAIT = 3
@@ -57,8 +52,11 @@
         {
             vol.Required(CONF_DB_URL): cv.string,
             vol.Optional(
-                CONF_CHUNK_TIME_INTERVAL, default=2592000000000
-            ): cv.positive_int,  # 30 days
+                CONF_CHUNK_TIME_INTERVAL, default=604800000000
+            ): cv.positive_int,  # 7 days
+            vol.Optional(
+                CONF_CHUNK_COMPRESSION_AFTER, default=1209600000000
+            ): cv.positive_int  # 14 days
         }
     )
 },
@@ -72,12 +70,14 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
 
     db_url = conf.get(CONF_DB_URL)
     chunk_time_interval = conf.get(CONF_CHUNK_TIME_INTERVAL)
+    chunk_compression_after = conf.get(CONF_CHUNK_COMPRESSION_AFTER)
     entity_filter = convert_include_exclude_filter(conf)
 
     instance = LTSS_DB(
         hass=hass,
         uri=db_url,
         chunk_time_interval=chunk_time_interval,
+        chunk_compression_after=chunk_compression_after,
         entity_filter=entity_filter,
     )
     instance.async_initialize()
@@ -94,6 +94,7 @@ def __init__(
         hass: HomeAssistant,
         uri: str,
         chunk_time_interval: int,
+        chunk_compression_after: int,
         entity_filter: Callable[[str], bool],
     ) -> None:
         """Initialize the ltss."""
@@ -104,6 +105,7 @@ def __init__(
         self.recording_start = dt_util.utcnow()
         self.db_url = uri
         self.chunk_time_interval = chunk_time_interval
+        self.chunk_compression_after = chunk_compression_after
        self.async_db_ready = asyncio.Future()
         self.engine: Any = None
         self.run_info: Any = None
@@ -206,11 +208,22 @@ def notify_hass_started(event):
             with self.get_session() as session:
                 with session.begin():
                     try:
-                        row = LTSS.from_event(event)
+                        row, attributes_row = LTSS.from_event(event)
+                        # Insert the actual attributes first for the
+                        # internal trigger to work
+                        stmt = insert(
+                            LTSS_ATTRIBUTES.__table__
+                        ).values(
+                            attributes_row
+                        )
+                        session.execute(
+                            stmt.on_conflict_do_nothing()
+                        )
                         session.add(row)
-                    except (TypeError, ValueError):
+                    except (TypeError, ValueError) as e:
                         _LOGGER.warning(
-                            "State is not JSON serializable: %s",
+                            "State is not JSON serializable: %s => %s",
+                            e,
                             event.data.get("new_state"),
                         )
@@ -319,6 +332,17 @@ def _create_table(self, available_extensions):
                 # activate location extraction in model/ORM to add necessary column when calling create_all()
                 LTSS.activate_location_extraction()
 
+            if 'ltree' not in available_extensions:
+                _LOGGER.error("ltree extension is required, but not found...")
+
+            if 'ltree' in available_extensions:
+                _LOGGER.info("ltree extension is available, enabling it...")
+                con.execute(
+                    text(
+                        "CREATE EXTENSION IF NOT EXISTS ltree CASCADE"
+                    )
+                )
+
         Base.metadata.create_all(self.engine)
 
         if "timescaledb" in available_extensions:
@@ -336,6 +360,18 @@ def _create_table(self, available_extensions):
                         if_not_exists => TRUE);"""
                     )
                 )
+                con.execute(
+                    text(
+                        f"""ALTER TABLE {LTSS.__tablename__} SET (timescaledb.compress,
+                            timescaledb.compress_orderby = 'time, entity_id')"""
+                    )
+                )
+                con.execute(
+                    text(
+                        f"""SELECT add_compression_policy('{LTSS.__tablename__}',
+                            (interval '1us') * {self.chunk_compression_after})"""
+                    )
+                )
 
     def _close_connection(self):
         """Close the connection."""
diff --git a/custom_components/ltss/manifest.json b/custom_components/ltss/manifest.json
index a9e6aa2..206d5f2 100644
--- a/custom_components/ltss/manifest.json
+++ b/custom_components/ltss/manifest.json
@@ -1,12 +1,13 @@
 {
   "domain": "ltss",
-  "version": "2.0.1",
+  "version": "2.1.0",
   "name": "Long Time State Storage (LTSS)",
   "documentation": "https://github.com/freol35241/ltss",
   "requirements": [
     "sqlalchemy>=2.0,<3.0",
     "psycopg2>=2.8,<3.0",
-    "geoalchemy2>=0.13,<1.0"
+    "geoalchemy2>=0.13,<1.0",
+    "sqlalchemy-utils>=0.40.0,<1.0"
   ],
   "dependencies": [],
   "codeowners": [
diff --git a/custom_components/ltss/migrations.py b/custom_components/ltss/migrations.py
index 2e1e00a..3e858c8 100644
--- a/custom_components/ltss/migrations.py
+++ b/custom_components/ltss/migrations.py
@@ -1,69 +1,153 @@
 import logging
 
-from sqlalchemy import inspect, text, Text
+from sqlalchemy import inspect, text
 
-from .models import LTSS, LTSS_attributes_index, LTSS_entityid_time_composite_index
+from .models import LTSS, LTSS_ATTRIBUTES
 
 _LOGGER = logging.getLogger(__name__)
 
+
 def check_and_migrate(engine):
-    #Inspect the DB
-    iengine = inspect(engine)
-    columns = iengine.get_columns(LTSS.__tablename__)
-    indexes = iengine.get_indexes(LTSS.__tablename__)
-
+    # Inspect the DB
+    inspector = inspect(engine)
+    indexes = inspector.get_indexes(LTSS.__tablename__)
+
     def index_exists(index_name):
         matches = [idx for idx in indexes if idx["name"] == index_name]
         return True if matches else False
-
-    # Attributes column Text -> JSONB
-    attributes_column = next(col for col in columns if col["name"] == 'attributes')
-    if isinstance(attributes_column['type'], Text):
-        _LOGGER.warning('Migrating you LTSS table to the latest schema, this might take a couple of minutes!')
-        migrate_attributes_text_to_jsonb(engine)
-        _LOGGER.info('Migration completed successfully!')
-
-    # Attributes Index?
-    if not index_exists('ltss_attributes_idx'):
-        _LOGGER.warning('Creating an index for the attributes column, this might take a couple of minutes!')
-        create_attributes_index(engine)
-        _LOGGER.info('Index created successfully!')
-
-    # entity_id and time composite Index?
-    if not index_exists('ltss_entityid_time_composite_idx'):
-        _LOGGER.warning('Creating a composite index over entity_id and time columns, this might take a couple of minutes!')
-        create_entityid_time_index(engine)
-        _LOGGER.info('Index created successfully!')
-
-    if index_exists('ix_ltss_entity_id'):
-        _LOGGER.warning('Index on entity_id no longer needed, dropping...')
-        drop_entityid_index(engine)
 
-def migrate_attributes_text_to_jsonb(engine):
-
-    with engine.connect() as con:
-
-        _LOGGER.info("Migrating attributes column from type text to type JSONB")
-        con.execute(text(
-            f"""ALTER TABLE {LTSS.__tablename__}
-            ALTER COLUMN attributes TYPE JSONB USING attributes::JSONB;"""
-        ).execution_options(autocommit=True))
-
-def create_attributes_index(engine):
-
-    _LOGGER.info("Creating GIN index on the attributes column")
-    LTSS_attributes_index.create(bind=engine)
-
-def create_entityid_time_index(engine):
-
-    _LOGGER.info("Creating composite index over entity_id and time columns")
-    LTSS_entityid_time_composite_index.create(bind=engine)
-
-def drop_entityid_index(engine):
-
-    with engine.connect() as con:
-        con.execute(text(
-            f"""DROP INDEX ix_ltss_entity_id;"""
-        ).execution_options(autocommit=True))
-
+    def function_exists(func_name):
+        with engine.connect() as conn:
+            res = conn.execute(text(f"SELECT true FROM pg_catalog.pg_proc WHERE proname='{func_name}'"))
+            for _ in res:
+                return True
+        return False
+
+    def trigger_exists(trg_name):
+        with engine.connect() as conn:
+            res = conn.execute(text(f"SELECT true FROM pg_catalog.pg_trigger WHERE tgname='{trg_name}'"))
+            for _ in res:
+                return True
+        return False
+
+    fn_ref_count_increment_exists = function_exists('ltss_hass_attributes_ref_count_increment')
+    fn_ref_count_decrement_exists = function_exists('ltss_hass_attributes_ref_count_decrement')
+    trg_ref_count_increment_exists = trigger_exists('trg_ltss_hass_attributes_ref_count_increment')
+    trg_ref_count_decrement_exists = trigger_exists('trg_ltss_hass_attributes_ref_count_decrement')
+
+    if inspector.has_table('ltss'):
+        # If we find an old column, not yet transformed into JSONB, we ignore it and force the cast on migration
+        # to the new two table schema. No need to run the transform beforehand.
+        if not inspector.has_table('ltss_hass') and not inspector.has_table('ltss_hass_attributes'):
+            _LOGGER.warning(
+                'Migrating your old LTSS table to the new 2 table schema, this might take a couple of minutes!'
+            )
+
+            with engine.begin() as con:
+                con.execute(text('ALTER TABLE ltss RENAME TO ltss_old'))
+                con.execute(
+                    text(
+                        f"""INSERT INTO {LTSS.__tablename__} (time, entity_id, state, location, attributes_key)
+                            SELECT
+                                l.time,
+                                l.entity_id,
+                                l.state,
+                                l.location,
+                                CASE
+                                    WHEN l.attributes IS NOT NULL THEN
+                                        text2ltree(l.entity_id || '.' ||
+                                            encode(
+                                                sha256(regexp_replace(l.attributes::text, '\\\\n', '', 'ng')::bytea), 'hex')
+                                        )
+                                END
+                            FROM ltss_old l ON CONFLICT DO NOTHING"""
+                    )
+                )
+                con.execute(
+                    text(
+                        f"""INSERT INTO {LTSS_ATTRIBUTES.__tablename__} (attributes_key, attributes)
+                            SELECT text2ltree(l.entity_id || '.' ||
+                                       encode(
+                                           sha256(regexp_replace(l.attributes::text, '\\\\n', '', 'ng')::bytea), 'hex')
+                                   ),
+                                   l.attributes::jsonb
+                            FROM ltss_old l WHERE l.attributes IS NOT NULL ON CONFLICT DO NOTHING"""
+                    )
+                )
+                con.execute(
+                    text(
+                        f"""create or replace view ltss as
+                            select
+                                row_number() over (rows unbounded preceding) as id,
+                                l.time,
+                                l.entity_id,
+                                l.state,
+                                l.location,
+                                a.attributes
+                            from {LTSS.__tablename__} l
+                            left join {LTSS_ATTRIBUTES.__tablename__} a
+                                on l.attributes_key is not null
+                                and l.attributes_key = a.attributes_key"""
+                    )
+                )
+                if not fn_ref_count_increment_exists:
+                    con.execute(
+                        text(
+                            f"""create or replace function ltss_hass_attributes_ref_count_increment() returns trigger
+                                language plpgsql as $$
+                                begin
+                                    update {LTSS_ATTRIBUTES.__tablename__}
+                                    set ref_count = ref_count + 1
+                                    where attributes_key = NEW.attributes_key;
+                                    return null;
+                                end; $$"""
+                        )
+                    )
+                if not fn_ref_count_decrement_exists:
+                    con.execute(
+                        text(
+                            f"""create or replace function ltss_hass_attributes_ref_count_decrement() returns trigger
+                                language plpgsql as $$
+                                declare
+                                    remaining bigint;
+                                begin
+                                    if OLD.attributes_key is null then
+                                        return null;
+                                    end if;
+
+                                    update {LTSS_ATTRIBUTES.__tablename__}
+                                    set ref_count = ref_count - 1
+                                    where attributes_key = OLD.attributes_key
+                                    returning ref_count
+                                    into remaining;
+
+                                    if remaining <= 0 then
+                                        -- orphaned attributes row, deleting
+                                        delete from {LTSS_ATTRIBUTES.__tablename__}
+                                        where attributes_key = OLD.attributes_key;
+                                    end if;
+                                    return null;
+                                end; $$"""
+                        )
+                    )
+                if not trg_ref_count_increment_exists:
+                    con.execute(text(
+                        f"""create trigger trg_ltss_hass_attributes_ref_count_increment
+                            after insert or update on {LTSS.__tablename__}
+                            for each row execute function ltss_hass_attributes_ref_count_increment()"""
+                    ))
+                if not trg_ref_count_decrement_exists:
+                    con.execute(text(
+                        f"""create trigger trg_ltss_hass_attributes_ref_count_decrement
+                            after delete on {LTSS.__tablename__}
+                            for each row execute function ltss_hass_attributes_ref_count_decrement()"""
+                    ))
+                # Not yet executed automatically:
+                # con.execute(text("DROP TABLE ltss_old"))
+                _LOGGER.warning(
+                    'The old table has been renamed to \'ltss_old\' and all data is migrated. The old table is not ' +
+                    'deleted though. If everything works please run the following command manually: \n' +
+                    'DROP TABLE ltss_old;'
+                )
+                con.commit()
+            _LOGGER.info('Migration completed successfully!')
+
diff --git a/custom_components/ltss/models.py b/custom_components/ltss/models.py
index 3fccf29..62edb70 100644
--- a/custom_components/ltss/models.py
+++ b/custom_components/ltss/models.py
@@ -1,20 +1,22 @@
 """Models for SQLAlchemy."""
-import json
-from datetime import datetime
+from datetime import datetime, date
 import logging
 
 from sqlalchemy import (
     Column,
-    BigInteger,
     DateTime,
-    String,
     Text,
+    BIGINT
 )
 from sqlalchemy.schema import Index
-from sqlalchemy.dialects.postgresql import JSONB
+from sqlalchemy.dialects.postgresql import JSONB, ExcludeConstraint
+from sqlalchemy_utils import LtreeType, Ltree
 from geoalchemy2 import Geometry
 from sqlalchemy.orm import column_property, declarative_base
+import json
+import hashlib
+import re
 
 
 # SQLAlchemy Schema
 # pylint: disable=invalid-name
@@ -23,15 +25,20 @@
 _LOGGER = logging.getLogger(__name__)
 
 
+def datetime_json_encoder(o):
+    if isinstance(o, (datetime, date)):
+        return o.isoformat()
+    raise TypeError("Type %s not serializable" % type(o))
+
+
 class LTSS(Base):  # type: ignore
     """State change history."""
 
-    __tablename__ = "ltss"
-    id = Column(BigInteger, primary_key=True, autoincrement=True)
+    __tablename__ = "ltss_hass"
     time = Column(DateTime(timezone=True), default=datetime.utcnow, primary_key=True)
-    entity_id = Column(String(255))
-    state = Column(String(255), index=True)
-    attributes = Column(JSONB)
+    entity_id = Column(Text, primary_key=True)
+    state = Column(Text)
+    attributes_key = Column(LtreeType)
     location = None  # when not activated, no location column will be added to the table/database
 
     @classmethod
@@ -59,17 +66,44 @@ def from_event(cls, event):
 
         location = f'SRID=4326;POINT({lon} {lat})' if lon and lat else None
 
+        state_json = json.dumps(attrs, default=datetime_json_encoder)
+        attributes_key_data = re.sub(r'\\n', '', state_json)
+        attributes_key = Ltree(f"{entity_id}.{hashlib.sha256(attributes_key_data.encode()).hexdigest()}")
+
         row = LTSS(
             entity_id=entity_id,
             time=event.time_fired,
             state=state.state,
-            attributes=attrs,
+            attributes_key=attributes_key,
             location=location
         )
 
-        return row
+        attributes_row = {
+            "attributes_key": attributes_key,
+            "attributes": state_json
+        }
+
+        return row, attributes_row
+
+
+LTSS_time_entityid_composite_index = Index(
+    'ltss_hass_time_entity_id_idx', LTSS.time.desc(), LTSS.entity_id, postgresql_using='btree'
+)
+
+
+class LTSS_ATTRIBUTES(Base):
+    __tablename__ = f'{LTSS.__tablename__}_attributes'
+    attributes_key = Column(LtreeType)
+    attributes = Column(JSONB)
+    ref_count = Column(BIGINT)
+    __table_args__ = (
+        ExcludeConstraint((attributes_key, '=')),
+    )
+    __mapper_args__ = {
+        "primary_key": [attributes_key]
+    }
+
 
-LTSS_attributes_index = Index('ltss_attributes_idx', LTSS.attributes, postgresql_using='gin')
-LTSS_entityid_time_composite_index = Index(
-    'ltss_entityid_time_composite_idx', LTSS.entity_id, LTSS.time.desc()
+LTSS_ATTRIBUTES_attributes_index = Index(
+    'ltss_hass_attributes_attributes_idx', LTSS_ATTRIBUTES.attributes, postgresql_using='gin'
 )
diff --git a/requirements.txt b/requirements.txt
index 5eb1b0e..b665885 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,4 @@
 GeoAlchemy2==0.13.1
 psycopg2==2.9.5
+sqlalchemy-utils==0.40.0
 SQLAlchemy # Pinned by HA version
\ No newline at end of file
diff --git a/tests/test_databases.py b/tests/test_databases.py
index 298f7ca..2997b83 100644
--- a/tests/test_databases.py
+++ b/tests/test_databases.py
@@ -51,6 +51,7 @@ def ltss_init_wrapper(container):
         "postgresql://postgres@localhost:"
         + container.ports["5432/tcp"][0]["HostPort"],
         123,
+        456,
         lambda x: False,
     )
@@ -70,7 +71,6 @@ def test_lite(self):
     @pytest.mark.parametrize(
         "version",
         [
-            "latest-pg9.6",
             "latest-pg10",
             "latest-pg11",
             "latest-pg12",