Skip to content

Commit

Permalink
Merge pull request #495 from gipert/dev
Browse files Browse the repository at this point in the history
Fix several `FileDB.from_disk()` issues
  • Loading branch information
gipert authored May 16, 2023
2 parents 9115270 + 9243670 commit fe79201
Show file tree
Hide file tree
Showing 7 changed files with 324 additions and 97 deletions.
53 changes: 36 additions & 17 deletions src/pygama/flow/file_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,11 +508,26 @@ def from_disk(self, path: str | list[str]) -> None:
raise FileNotFoundError(path)

sto = lh5.LH5Store()
# objects that will be used to configure the FileDB at the end
# objects/accumulators that will be used to configure the FileDB at the end
_cfg = None
_df = None
_columns = None

# function needed later in the loop
def _replace_idx(row, trans, tier):
col = row[f"{tier}_col_idx"]
if col is None:
return None

col = np.array(col)
new_col = np.copy(col)

for idx, new_idx in trans.items():
new_col[np.where(col == idx)] = new_idx

return new_col.tolist()

# loop over the files
for p in paths:
cfg, _ = sto.read_object("config", p)
cfg = json.loads(cfg.value.decode())
Expand All @@ -538,31 +553,35 @@ def from_disk(self, path: str | list[str]) -> None:
_columns = columns
_df = df
continue

elif _columns != columns:
log.debug("found inconsistent FileDB, trying to merge")
# if columns are not the same, need to merge the two dataframes
# in the right way
# in the right way. loop over new columns
idx_trans = {}
for idx, cols in enumerate(columns):
new_idx = None

# the columns might be a new entry...
if cols not in _columns:
# add the new column at the end and save its index
_columns += [cols]
new_idx = len(_columns) - 1
# ...or just located (at a different index?) in the
# existing column list
else:
new_idx = _columns.index(cols)

def _replace_idx(row, idx, new_idx, tier):
col = row[f"{tier}_col_idx"]
if col is None:
return None
else:
return [new_idx if x == idx else x for x in col]

# now go through the new dataframe and update the old index
# everywhere in the {tier}_col_idx columns
for tier in list(_cfg["tier_dirs"].keys()):
df[f"{tier}_col_idx"] = df.apply(
_replace_idx,
args=(idx, new_idx, tier),
axis=1,
)
idx_trans[idx] = new_idx

# now go through the new dataframe and update the old index
# everywhere in the {tier}_col_idx columns
for tier in list(_cfg["tier_dirs"].keys()):
df[f"{tier}_col_idx"] = df.apply(
_replace_idx,
args=(idx_trans, tier),
axis=1,
)

# now we can safely concat the dataframes
_df = pd.concat([_df, df], ignore_index=True, copy=False)
Expand Down
21 changes: 20 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
import inspect
import os
import re
import shutil
import uuid
from getpass import getuser
from pathlib import Path
from tempfile import gettempdir

import numpy as np
import pytest
Expand All @@ -12,12 +16,27 @@
from pygama.raw import build_raw

config_dir = Path(__file__).parent / "dsp" / "configs"
# Session-wide scratch directory; the uuid suffix keeps parallel or stale
# test runs from colliding inside the shared system temp directory.
_tmptestdir = os.path.join(gettempdir(), f"pygama-tests-{getuser()}{uuid.uuid4()}")


@pytest.fixture(scope="session")
def tmptestdir():
    """Create the session-wide temporary test directory and yield its path.

    The directory is created once per pytest session and removed, together
    with all of its contents, at fixture teardown.
    """
    os.mkdir(_tmptestdir)
    # yield (not return) so the cleanup below runs at session teardown
    yield _tmptestdir
    # NOTE(review): teardown removes outputs even when tests failed -- confirm
    # that keeping artifacts for post-mortem debugging is not desired here
    shutil.rmtree(_tmptestdir)


def pytest_sessionfinish(session, exitstatus):
    """Remove the session scratch directory after a fully successful run.

    ``exitstatus`` is pytest's integer exit code (0 means all tests passed).
    The previous ``exitstatus is True`` comparison could never succeed -- an
    int is never the ``True`` singleton -- so the directory was never removed
    here.  On failure the directory is deliberately left in place for
    post-mortem inspection.
    """
    if exitstatus == 0:
        # ignore_errors: the tmptestdir fixture may already have removed the
        # directory, and a leftover non-empty tree must not fail the session
        # (the old os.rmdir would raise on both counts)
        shutil.rmtree(_tmptestdir, ignore_errors=True)


@pytest.fixture(scope="session")
def lgnd_test_data():
    """Provide the LEGEND test-data repository checked out at a fixed ref."""
    ldata = LegendTestData()
    # NOTE(review): the two checkout calls below look like diff residue (old
    # ref followed by new ref from the same changed line); presumably only
    # one checkout -- the newer "c089a59" -- is intended. Verify against the
    # actual file before relying on this.
    ldata.checkout("39f9927")
    ldata.checkout("c089a59")
    return ldata


Expand Down
8 changes: 4 additions & 4 deletions tests/flow/configs/filedb-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
"tcm": "/{type}/{period}/{run}/{exp}-{period}-{run}-{type}-{timestamp}-tier_tcm.lh5"
},
"table_format": {
"raw": "ch{ch:03d}/raw",
"dsp": "ch{ch:03d}/dsp",
"hit": "ch{ch:03d}/hit",
"raw": "ch{ch:07d}/raw",
"dsp": "ch{ch:07d}/dsp",
"hit": "ch{ch:07d}/hit",
"evt": "{grp}/evt",
"tcm": "hardware_tcm"
"tcm": "hardware_tcm_1"
}
}
48 changes: 26 additions & 22 deletions tests/flow/test_data_loader.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from pathlib import Path

import numpy as np
import pandas as pd
import pytest

Expand Down Expand Up @@ -77,14 +78,15 @@ def test_no_merge(test_dl):

assert isinstance(data, dict)
assert isinstance(data[0], lgdo.Table)
assert len(data) == 2
assert len(data) == 4 # 4 files
assert list(data[0].keys()) == ["hit_table", "hit_idx", "timestamp"]


def test_outputs(test_dl):
test_dl.set_files("all")
test_dl.set_files("type == 'phy'")
test_dl.set_datastreams([1057600, 1059201], "ch")
test_dl.set_output(
fmt="pd.DataFrame", columns=["timestamp", "channel", "bl_mean", "hit_par1"]
fmt="pd.DataFrame", columns=["timestamp", "channel", "energies", "energy_in_pe"]
)
data = test_dl.load()

Expand All @@ -95,59 +97,61 @@ def test_outputs(test_dl):
"file",
"timestamp",
"channel",
"bl_mean",
"hit_par1",
"energies",
"energy_in_pe",
]


def test_any_mode(test_dl):
test_dl.filedb.scan_tables_columns()
test_dl.set_files("all")
test_dl.set_cuts({"hit": "daqenergy == 634"})
test_dl.set_files("type == 'phy'")
test_dl.set_cuts({"hit": "daqenergy == 10221"})
el = test_dl.build_entry_list(tcm_level="tcm", mode="any")

assert len(el) == 42
assert len(el) == 6


def test_set_files(test_dl):
test_dl.set_files("timestamp == '20220716T104550Z'")
test_dl.set_files("timestamp == '20230318T012144Z'")
test_dl.set_output(columns=["timestamp"], merge_files=False)
data = test_dl.load()

assert len(data) == 1


def test_set_keylist(test_dl):
test_dl.set_files(["20220716T104550Z", "20220716T104550Z"])
test_dl.set_files(["20230318T012144Z", "20230318T012228Z"])
test_dl.set_output(columns=["timestamp"], merge_files=False)
data = test_dl.load()

assert len(data) == 1
assert len(data) == 2


def test_set_datastreams(test_dl):
test_dl.set_files("all")
test_dl.set_datastreams([1, 3, 8], "ch")
test_dl.set_output(columns=["channel"], merge_files=False)
channels = [1084803, 1084804, 1121600]
test_dl.set_files("timestamp == '20230318T012144Z'")
test_dl.set_datastreams(channels, "ch")
test_dl.set_output(columns=["eventnumber"], fmt="pd.DataFrame", merge_files=False)
data = test_dl.load()

assert (data[0]["hit_table"].nda == [1, 3, 8]).all()
assert (data[0]["channel"].nda == [1, 3, 8]).all()
assert np.array_equal(data[0]["hit_table"].unique(), channels)


def test_set_cuts(test_dl):
test_dl.set_files("all")
test_dl.set_cuts({"hit": "card == 3"})
test_dl.set_output(columns=["card"])
test_dl.set_files("type == 'cal'")
test_dl.set_cuts({"hit": "is_valid_cal == False"})
test_dl.set_datastreams([1084803], "ch")
test_dl.set_output(columns=["is_valid_cal"], fmt="pd.DataFrame")
data = test_dl.load()

assert (data["hit_table"].nda == [12, 13, 12, 13]).all()
assert (data.is_valid_cal == False).all() # noqa: E712


def test_browse(test_dl):
test_dl.set_files("all")
test_dl.set_files("type == 'phy'")
test_dl.set_datastreams([1057600, 1059201], "ch")
test_dl.set_output(
fmt="pd.DataFrame", columns=["timestamp", "channel", "bl_mean", "hit_par1"]
fmt="pd.DataFrame", columns=["timestamp", "channel", "energies", "energy_in_pe"]
)
wb = test_dl.browse()

Expand Down
Loading

0 comments on commit fe79201

Please sign in to comment.