Skip to content

Commit

Permalink
Fix regression in arrow_storage
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-thom committed Dec 13, 2024
1 parent 54b005c commit 9c3daba
Showing 1 changed file with 17 additions and 7 deletions.
24 changes: 17 additions & 7 deletions src/infrasys/arrow_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,14 @@ def _get_single_time_series(
base_ts = pa.ipc.open_file(source).get_record_batch(0)
logger.trace("Reading time series from {}", fpath)
index, length = metadata.get_range(start_time=start_time, length=length)
data = base_ts[str(metadata.time_series_uuid)][index : index + length]
columns = base_ts.column_names
if len(columns) != 1:
msg = f"Bug: expected a single column: {columns=}"
raise Exception(msg)
# This should be equal to metadata.time_series_uuid in versions
# v0.2.1 or later. Earlier versions used the time series variable name.
column = columns[0]
data = base_ts[column][index : index + length]
if metadata.quantity_metadata is not None:
np_array = metadata.quantity_metadata.quantity_type(
data, metadata.quantity_metadata.units
Expand All @@ -137,14 +144,17 @@ def get_raw_single_time_series(self, time_series_uuid: UUID) -> NDArray:
with pa.OSFile(str(fpath), "r") as source:
base_ts = pa.ipc.open_file(source).get_record_batch(0)
logger.trace("Reading time series from {}", fpath)
return base_ts[str(time_series_uuid)].to_numpy()

def _convert_to_record_batch(
self, time_series_array: NDArray, variable_name: str
) -> pa.RecordBatch:
columns = base_ts.column_names
if len(columns) != 1:
msg = f"Bug: expected a single column: {columns=}"
raise Exception(msg)
column = columns[0]
return base_ts[column].to_numpy()

def _convert_to_record_batch(self, time_series_array: NDArray, column: str) -> pa.RecordBatch:
"""Create record batch to save array to disk."""
pa_array = pa.array(time_series_array)
schema = pa.schema([pa.field(variable_name, pa_array.type)])
schema = pa.schema([pa.field(column, pa_array.type)])
return pa.record_batch([pa_array], schema=schema)


Expand Down

0 comments on commit 9c3daba

Please sign in to comment.