From 9c3daba01bbe5be8644ca66932482959c8ccebce Mon Sep 17 00:00:00 2001
From: Daniel Thom
Date: Thu, 12 Dec 2024 17:08:46 -0700
Subject: [PATCH] Fix regression in arrow_storage

---
 src/infrasys/arrow_storage.py | 24 +++++++++++++++++-------
 1 file changed, 17 insertions(+), 7 deletions(-)

diff --git a/src/infrasys/arrow_storage.py b/src/infrasys/arrow_storage.py
index a1dc7ce..4bf2f02 100644
--- a/src/infrasys/arrow_storage.py
+++ b/src/infrasys/arrow_storage.py
@@ -116,7 +116,14 @@ def _get_single_time_series(
             base_ts = pa.ipc.open_file(source).get_record_batch(0)
             logger.trace("Reading time series from {}", fpath)
             index, length = metadata.get_range(start_time=start_time, length=length)
-            data = base_ts[str(metadata.time_series_uuid)][index : index + length]
+            columns = base_ts.column_names
+            if len(columns) != 1:
+                msg = f"Bug: expected a single column: {columns=}"
+                raise Exception(msg)
+            # This should be equal to metadata.time_series_uuid in versions
+            # v0.2.1 or later. Earlier versions used the time series variable name.
+            column = columns[0]
+            data = base_ts[column][index : index + length]
             if metadata.quantity_metadata is not None:
                 np_array = metadata.quantity_metadata.quantity_type(
                     data, metadata.quantity_metadata.units
@@ -137,14 +144,17 @@ def get_raw_single_time_series(self, time_series_uuid: UUID) -> NDArray:
         with pa.OSFile(str(fpath), "r") as source:
             base_ts = pa.ipc.open_file(source).get_record_batch(0)
             logger.trace("Reading time series from {}", fpath)
-            return base_ts[str(time_series_uuid)].to_numpy()
-
-    def _convert_to_record_batch(
-        self, time_series_array: NDArray, variable_name: str
-    ) -> pa.RecordBatch:
+            columns = base_ts.column_names
+            if len(columns) != 1:
+                msg = f"Bug: expected a single column: {columns=}"
+                raise Exception(msg)
+            column = columns[0]
+            return base_ts[column].to_numpy()
+
+    def _convert_to_record_batch(self, time_series_array: NDArray, column: str) -> pa.RecordBatch:
         """Create record batch to save array to disk."""
         pa_array = pa.array(time_series_array)
-        schema = pa.schema([pa.field(variable_name, pa_array.type)])
+        schema = pa.schema([pa.field(column, pa_array.type)])
         return pa.record_batch([pa_array], schema=schema)
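
For context on what the patched read path relies on: each infrasys time series is stored as an Arrow IPC file holding one single-column record batch, so reading the column by position works whether it was named with the time series UUID (v0.2.1 and later) or the variable name (earlier releases). The following is a minimal standalone sketch of that round trip using only pyarrow; the file name, column name, and values are invented for illustration and this code is not part of the patch.

import pyarrow as pa

# Hypothetical path and column name, for illustration only.
fpath = "example_time_series.arrow"
array = pa.array([1.0, 2.0, 3.0, 4.0])
schema = pa.schema([pa.field("example-uuid-or-variable-name", array.type)])
batch = pa.record_batch([array], schema=schema)

# Write one single-column record batch, matching the on-disk layout
# that _convert_to_record_batch produces.
with pa.OSFile(fpath, "w") as sink:
    with pa.ipc.new_file(sink, schema) as writer:
        writer.write_batch(batch)

# Read it back by position instead of by name, mirroring the patched logic:
# old files key the column by variable name, new files by time series UUID.
with pa.OSFile(fpath, "r") as source:
    base_ts = pa.ipc.open_file(source).get_record_batch(0)
    columns = base_ts.column_names
    assert len(columns) == 1, f"expected a single column: {columns=}"
    data = base_ts[columns[0]][1:3]  # analogous to base_ts[column][index : index + length]
    print(data.to_pylist())  # [2.0, 3.0]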