Skip to content

Commit

Permalink
initial work to redesign the timescale tables
Browse files Browse the repository at this point in the history
  • Loading branch information
noctarius committed Mar 2, 2023
1 parent 2f0362c commit e74c823
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 79 deletions.
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Enabling simple long time state storage (LTSS) for your sensor states in a Postg
The following extensions are required for full functionality:
* TimescaleDB
* PostGIS
* Ltree

LTSS automatically detects the available extensions and creates the necessary table accordingly. A PostgeSQL instance without those extensions can be used but will lack some features: efficient storing and accessing time-series data (without TimescaleDB) and directly accessing geolocation data of logged data (without PostGis).

Expand Down Expand Up @@ -45,7 +46,7 @@ configuration.yaml

ltss:
db_url: postgresql://USER:PASSWORD@HOST_ADRESS/DB_NAME
chunk_time_interval: 2592000000000
chunk_time_interval: 604800000000
include:
domains:
- sensor
Expand All @@ -66,7 +67,11 @@ configuration.yaml

chunk_time_interval
(int)(Optional)
The time interval to be used for chunking in TimescaleDB in microseconds. Defaults to 2592000000000 (30 days). Ignored for databases without TimescaleDB extension.
The time interval to be used for chunking in TimescaleDB in microseconds. Defaults to 604800000000 (7 days). Ignored for databases without TimescaleDB extension.

chunk_compression_after
(int)(Optional)
The time interval after which TimescaleDB will compress old chunks in microseconds. Defaults to 1209600000000 (14 days). Ignored for databases without TimescaleDB extension.

exclude
(map)(Optional)
Expand Down
56 changes: 44 additions & 12 deletions custom_components/ltss/__init__.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,22 @@
"""Support for recording details."""
import asyncio
import concurrent.futures
from contextlib import contextmanager
from datetime import datetime, timedelta
import logging
import queue
import threading
import time
import json
from typing import Any, Dict, Optional, Callable
from typing import Any, Callable

import voluptuous as vol
from sqlalchemy import exc, create_engine, inspect, text
from sqlalchemy.engine import Engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.dialects.postgresql import insert

import psycopg2

from homeassistant.const import (
ATTR_ENTITY_ID,
CONF_DOMAINS,
CONF_ENTITIES,
CONF_EXCLUDE,
CONF_INCLUDE,
EVENT_HOMEASSISTANT_START,
EVENT_HOMEASSISTANT_STOP,
EVENT_STATE_CHANGED,
Expand All @@ -39,7 +33,7 @@
import homeassistant.util.dt as dt_util
from homeassistant.helpers.json import JSONEncoder

from .models import Base, LTSS
from .models import Base, LTSS, LTSS_ATTRIBUTES
from .migrations import check_and_migrate

_LOGGER = logging.getLogger(__name__)
Expand All @@ -48,6 +42,7 @@

CONF_DB_URL = "db_url"
CONF_CHUNK_TIME_INTERVAL = "chunk_time_interval"
CONF_CHUNK_COMPRESSION_AFTER = "chunk_compression_after"

CONNECT_RETRY_WAIT = 3

Expand All @@ -57,8 +52,11 @@
{
vol.Required(CONF_DB_URL): cv.string,
vol.Optional(
CONF_CHUNK_TIME_INTERVAL, default=2592000000000
): cv.positive_int, # 30 days
CONF_CHUNK_TIME_INTERVAL, default=604800000000
): cv.positive_int, # 7 days
vol.Optional(
CONF_CHUNK_COMPRESSION_AFTER, default=1209600000000
): cv.positive_int # 14 days
}
)
},
Expand All @@ -72,12 +70,14 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:

db_url = conf.get(CONF_DB_URL)
chunk_time_interval = conf.get(CONF_CHUNK_TIME_INTERVAL)
chunk_compression_after = conf.get(CONF_CHUNK_COMPRESSION_AFTER)
entity_filter = convert_include_exclude_filter(conf)

instance = LTSS_DB(
hass=hass,
uri=db_url,
chunk_time_interval=chunk_time_interval,
chunk_compression_after=chunk_compression_after,
entity_filter=entity_filter,
)
instance.async_initialize()
Expand All @@ -94,6 +94,7 @@ def __init__(
hass: HomeAssistant,
uri: str,
chunk_time_interval: int,
chunk_compression_after: int,
entity_filter: Callable[[str], bool],
) -> None:
"""Initialize the ltss."""
Expand All @@ -104,6 +105,7 @@ def __init__(
self.recording_start = dt_util.utcnow()
self.db_url = uri
self.chunk_time_interval = chunk_time_interval
self.chunk_compression_after = chunk_compression_after
self.async_db_ready = asyncio.Future()
self.engine: Any = None
self.run_info: Any = None
Expand Down Expand Up @@ -206,8 +208,15 @@ def notify_hass_started(event):
with self.get_session() as session:
with session.begin():
try:
row = LTSS.from_event(event)
row, attributes_row = LTSS.from_event(event)
session.add(row)
session.execute(
LTSS_ATTRIBUTES.__table__.insert().values(
attributes_row
).on_conflict_do_nothing(
[LTSS_ATTRIBUTES.attributes_key.name, LTSS_ATTRIBUTES.attributes.name]
)
)
except (TypeError, ValueError):
_LOGGER.warning(
"State is not JSON serializable: %s",
Expand Down Expand Up @@ -336,6 +345,29 @@ def _create_table(self, available_extensions):
if_not_exists => TRUE);"""
)
)
con.execute(
text(
f"""ALTER TABLE {LTSS.__tablename__} SET (timescaledb.compress,
timescaledb.compress_orderby = 'time, entity_id'"""
)
)
con.execute(
text(
f"""SELECT add_compression_policy('{LTSS.__tablename__}',
{self.chunk_compression_after})"""
)
)

if 'ltree' not in available_extensions:
_LOGGER.error("ltree extension is required, but not found...")

if 'ltree' not in available_extensions:
_LOGGER.error("ltree extension is available, enabled it...")
con.execute(
text(
"CREATE EXTENSION IF NOT EXISTS ltree CASCADE"
)
)

def _close_connection(self):
"""Close the connection."""
Expand Down
5 changes: 3 additions & 2 deletions custom_components/ltss/manifest.json
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
{
"domain": "ltss",
"version": "2.0.0",
"version": "2.0.1",
"name": "Long Time State Storage (LTSS)",
"documentation": "https://github.com/freol35241/ltss",
"requirements": [
"sqlalchemy>=2.0,<3.0",
"psycopg2>=2.8,<3.0",
"geoalchemy2>=0.8,<1.0"
"geoalchemy2>=0.13,<1.0",
"sqlalchemy-utils>=0.40.0,<1.0"
],
"dependencies": [],
"codeowners": [
Expand Down
114 changes: 66 additions & 48 deletions custom_components/ltss/migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,68 +2,86 @@

from sqlalchemy import inspect, text, Text

from .models import LTSS, LTSS_attributes_index, LTSS_entityid_time_composite_index
from .models import LTSS, LTSS_ATTRIBUTES

_LOGGER = logging.getLogger(__name__)


def check_and_migrate(engine):

#Inspect the DB
iengine = inspect(engine)
columns = iengine.get_columns(LTSS.__tablename__)
indexes = iengine.get_indexes(LTSS.__tablename__)
# Inspect the DB
inspector = inspect(engine)
columns = inspector.get_columns('ltss') # old table name here
indexes = inspector.get_indexes(LTSS.__tablename__)

def index_exists(index_name):
matches = [idx for idx in indexes if idx["name"] == index_name]
return True if matches else False

# Attributes column Text -> JSONB
attributes_column = next(col for col in columns if col["name"] == 'attributes')
if isinstance(attributes_column['type'], Text):
_LOGGER.warning('Migrating you LTSS table to the latest schema, this might take a couple of minutes!')
migrate_attributes_text_to_jsonb(engine)
_LOGGER.info('Migration completed successfully!')

# Attributes Index?
if not index_exists('ltss_attributes_idx'):
_LOGGER.warning('Creating an index for the attributes column, this might take a couple of minutes!')
create_attributes_index(engine)
_LOGGER.info('Index created successfully!')

# entity_id and time composite Index?
if not index_exists('ltss_entityid_time_composite_idx'):
_LOGGER.warning('Creating a composite index over entity_id and time columns, this might take a couple of minutes!')
create_entityid_time_index(engine)
_LOGGER.info('Index created successfully!')

if index_exists('ix_ltss_entity_id'):
_LOGGER.warning('Index on entity_id no longer needed, dropping...')
drop_entityid_index(engine)

if inspector.has_table('ltss'):
# Attributes column Text -> JSONB
attributes_column = next(col for col in columns if col["name"] == 'attributes')
if isinstance(attributes_column['type'], Text):
_LOGGER.warning('Migrating your LTSS table to the latest schema, this might take a couple of minutes!')
migrate_attributes_text_to_jsonb(engine)
_LOGGER.info('Migration completed successfully!')

if not inspector.has_table('ltss_hass_attributes'):
_LOGGER.warning('Migrating your LTSS table to the latest schema, this might take a couple of minutes!')
with engine.begin() as con:
con.execute(text('ALTER TABLE ltss RENAME TO ltss_old'))
con.execute(text(f"""INSERT INTO {LTSS.__tablename__} (time, entity_id, state_text, state_float,
state_int, state_bool, location, attributes_key)
SELECT
l.time,
l.entity_id,
CASE
WHEN l.state !~ '^[0-9]*[0-9]?\\.[0-9]+$'
AND l.state !~ '^[0-9]+$'
AND l.state !~* 'off'
AND l.state !~* 'on' THEN
l.state
END,
CASE
WHEN l.state ~ '^[0-9]*[0-9]?\\.[0-9]+$' THEN
l.state::double precision
END,
CASE
WHEN l.state ~ '^[0-9]+$' THEN
l.state::int8
END,
CASE
WHEN l.state ~* 'on' THEN
true
WHEN l.state ~* 'off' THEN
false
END,
l.location,
CASE
WHEN l.attributes IS NOT NULL THEN
text2ltree(l.entity_id || '.' ||
encode(
sha256(regexp_replace(l.attributes::text, '\\\\n', '', 'ng')::bytea),
'hex'))
END
FROM ltss_old l ON CONFLICT DO NOTHING"""))
con.execute(text(f"""INSERT INTO {LTSS_ATTRIBUTES.__tablename__} (attributes_key, attributes)
SELECT text2ltree(l.entity_id || '.' ||
encode(sha256(regexp_replace(l.attributes::text, '\\\\n', '', 'ng')::bytea),
'hex')),
l.attributes
FROM ltss_old l WHERE l.attributes IS NOT NULL ON CONFLICT DO NOTHING"""))
con.execute(text("DROP TABLE ltss_old"))
con.commit()


def migrate_attributes_text_to_jsonb(engine):

with engine.connect() as con:

_LOGGER.info("Migrating attributes column from type text to type JSONB")
con.execute(text(
f"""ALTER TABLE {LTSS.__tablename__}
f"""ALTER TABLE ltss
ALTER COLUMN attributes TYPE JSONB USING attributes::JSONB;"""
).execution_options(autocommit=True))

def create_attributes_index(engine):

_LOGGER.info("Creating GIN index on the attributes column")
LTSS_attributes_index.create(bind=engine)

def create_entityid_time_index(engine):

_LOGGER.info("Creating composite index over entity_id and time columns")
LTSS_entityid_time_composite_index.create(bind=engine)

def drop_entityid_index(engine):

with engine.connect() as con:
con.execute(text(
f"""DROP INDEX ix_ltss_entity_id;"""
).execution_options(autocommit=True))


Loading

0 comments on commit e74c823

Please sign in to comment.