Skip to content

Commit

Permalink
fixup some syntax and other errors
Browse files Browse the repository at this point in the history
  • Loading branch information
weatherhead99 committed Dec 12, 2023
1 parent a162b0f commit 36f7666
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 59 deletions.
108 changes: 74 additions & 34 deletions python/lsst/ts/observatory/control/auxtel/atcalsys.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from typing import List, Optional, NamedTuple, TYPE_CHECKING
from ..base_calsys import BaseCalsys, HardcodeCalsysThroughput, CalibrationSequenceStepBase
from ..base_calsys import CalsysScriptIntention
from ..base_calsys import CalsysScriptIntention, _calsys_get_parameter
from lsst.ts import salobj
from lsst.ts.idl.enums import ATMonochromator
from lsst.ts.idl.enums import ATMonochromator, Electrometer
from lsst.ts.idl.enums import ATWhiteLight
import asyncio
import astropy.units as un
Expand Down Expand Up @@ -51,20 +51,25 @@ class ATCalsys(BaseCalsys, HardcodeCalsysThroughput):
WHITELIGHT_LAMP_WARMUP_TIMEOUT: Quantity[un.physical.time] = 15 << un.min

SHUTTER_OPEN_TIMEOUT: Quantity[un.physical.time] = 15 << un.min
CAL_PROGRAM_NAME: str = "AT_flats"

def __init__(self, intention: CalsysScriptIntention, **kwargs):
super().__init__(intention, components=self._AT_SAL_COMPONENTS, **kwargs)

#instance variables we'll set later
self._specsposure_time: Optional[float] = None
self._elecsposure_time: Optional[float] = None
self._n_spec_exps: Optional[int] = None
self._n_elec_exps: Optional[int] = None

async def setup_for_wavelength(
self, wavelen: float, nelec: float, spectral_res: float
self, wavelen: float, nelec: float, spectral_res: float, **override_kwargs
) -> None:


grating = self.calculate_grating_type(wavelen, spectral_res)
slit_widths = self.calculate_slit_widths(spectral_res, grating)

grating = _calsys_get_parameter(override_kwargs, "grating", self.calculate_grating_type,
wavelen, spectral_res)
slit_widths = _calsys_get_parameter(override_kwargs, "slit_widths", self.calculate_slit_widths,
wavelen, spectral_res, grating)
self.log.debug(
f"setting up monochromtor with wavlength {wavelen} nm and spectral resolution {spectral_res}"
)
Expand Down Expand Up @@ -93,12 +98,22 @@ async def setup_for_wavelength(
[monoch_fut, elect_fut, elect_fut2], return_when=asyncio.ALL_COMPLETED
)
self.log.debug("all SAL setup commands returned")
specsposure_time = self.spectrograph_exposure_time_for_nelectrons(nelec)
return
self._specsposure_time = _calsys_get_parameter(override_kwargs, "specsposure_time",
self.spectrograph_exposure_time_for_nelectrons,
nelec)
self._elecsposure_time = _calsys_get_parameter(override_kwargs, "elecsposure_time",
self.pd_exposure_time_for_nelectrons,
nelec)
self._n_spec_exps = _calsys_get_parameter(override_kwargs, "n_spec_exps",
self.spectrograph_n_exps_for_nelectrons, nelec)
self._n_elec_exps = _calsys_get_parameter(override_kwargs, "n_elec_exps",
self.pd_n_exps_for_nelectrons, nelec)


def calculate_slit_width(self, spectral_res: float, grating) -> ATSpectrographSlits:
# NOTE: this will either need to be derived by doing calculations on the Grating equation, or by loading in calibration data (which I couldn't find yet!)
pass

def calculate_slit_width(self,wavelen: float, spectral_res: float, grating) -> Optional[ATSpectrographSlits]:
# NOTE: this will either need to be derived by doing calculations on the Grating equation, or by loading in calibration data (which I couldn't find yet!). For now we just return the
raise NotImplementedError("calculation of slit widths not available yet, override in script parameters!")

def calculate_grating_type(self, wavelen: float, spectral_res: float) -> ATMonochromator.Grating:
# TODO: placeholder logic, in particular the exact numbers will be WRONG!
Expand All @@ -109,27 +124,35 @@ def calculate_grating_type(self, wavelen: float, spectral_res: float) -> ATMonoc
return ATMonochromator.Grating.BLUE
return ATMonochromator.Grating.RED

async def _setup_spectrograph(self, int_time: float) -> None:
pass

async def _setup_electrometer(self, int_time: float):
pass
async def _electrometer_expose(self) -> Awaitable[list[str]]:
assert self._n_elec_exps is not None
assert self._elecsposure_time is not None
out_urls: list[str] = []

for i in range(self._n_elec_exps):
await self._sal_cmd(self.Electrometer, "startScanDt", scanDuration=self._elecsposure_time)
lfa_obj_fut = await self._sal_waitevent(self.Electrometer, "largeFileObjectAvailable")
out_urls.append(lfa_obj_fut.url)
return out_urls

async def _electrometer_expose(self, exp_time: float) -> Awaitable[str]:
await self._sal_cmd(self.Electrometer, "startScanDt", scanDuration=exp_time,
run_immediate=False)
lfa_obj = self._sal_waitevent(self.Electrometer, "largeFileObjectAvailable",
run_immediate=False)
return lfa_obj.url
async def _spectrograph_expose(self) -> Awaitable[list[str]]:
assert self._n_spec_exps is not None
assert self._specsposure_time is not None

async def _spectrograph_expose(self, exp_time: float, numExposures: int) -> Awaitable[str]:
await self._sal_cmd(self.ATSpectrograph, "expose", numExposures = numExposures)
lfa_obj = await self._sal_waitevent(self.ATSpectrograph, "largeFileObjectAvailable",
run_immediate=False)
return lfa_obj.url
out_urls: list[str] = []
for i in range(self._n_spec_exps):
await self._sal_cmd(self.ATSpectrograph, "expose", numExposures = numExposures)
lfa_obj_fut = await self._sal_waitevent(self.ATSpectrograph, "largeFileObjectAvailable",
run_immediate=true)

out_urls.append(lfa_obj_fut.url)
return out_urls

@property
def _electrometer_object(self):
return self.Electrometer

@property
def script_time_estimate_s(self) -> float:
"""Property that returns the estimated time for the script to run in units of seconds
Expand Down Expand Up @@ -185,12 +208,12 @@ async def power_sequence_run(self, scriptobj: salobj.BaseScript):
self.WHITELIGHT_LAMP_WARMUP_TIMEOUT)
await scriptobj.checkpoint("lamp has cooled down")
await self._chiller_power(False)
self.log.info("chiller has been turned off, ATCalsys is powered down"!)
self.log.info("chiller has been turned off, ATCalsys is powered down")

case _:
raise NotImplementedError("don't know how to handle this script intention")

async def validate_hardware_status_for_acquisition(self) -> Awaitable[float]:
async def validate_hardware_status_for_acquisition(self) -> Awaitable:
shutter_fut = self._sal_waitevent(self.ATWhiteLight, "shutterState")
lamp_fut = self._sal_waitevent(self.ATWhiteLight, "lampState")

Expand All @@ -201,7 +224,7 @@ async def validate_hardware_status_for_acquisition(self) -> Awaitable[float]:
raise RuntimeError(errmsg)

if shutter_state.actualState != ATWhiteLight.ShutterState.OPEN:
errmsg = f"shutter is not open, its state is reported as {repr(shutter_state.actualState)}")
errmsg = f"shutter is not open, its state is reported as {repr(shutter_state.actualState)}"
self.log.error(errmsg)
raise RuntimeError(errmsg)

Expand All @@ -211,11 +234,9 @@ async def validate_hardware_status_for_acquisition(self) -> Awaitable[float]:
self.log.error(errmsg)
raise RuntimeError(errmsg)

if !lamp_state.lightDetected:
if not lamp_state.lightDetected:
self.log.warning(f"all states seem fine, but lamp is not reporting light detected!")

lamp_power: float = lamp_state.setPower
return lamp_power

def _chiller_temp_check(self, temps) -> bool:
self.log.debug(f"Chiller supply temperature: {temps.supplyTemperature:0.1f} C "
Expand Down Expand Up @@ -251,7 +272,8 @@ async def _lamp_power(self, onoff:bool) -> Awaitable:
shutter_task = self._sal_cmd(self.ATWhiteLight, shutter_cmd_target, run_immediate=False)

#now start the lamp
lamp_start_task = self._sal_cmd(self.ATWhiteLight, lamp_cmd_target, run_immediate=False)
lamp_start_task = self._sal_cmd(self.ATWhiteLight, lamp_cmd_target, run_immediate=False,
power=self.WHITELIGHT_POWER)

await asyncio.wait([shutter_task, lamp_start_task], timeout=self._cmd_timeout,
return_when=asyncio.FIRST_EXCEPTION)
Expand Down Expand Up @@ -283,4 +305,22 @@ def lamp_verify(evt):
return self._long_wait_err_handle(lamp_evt_gen, lamp_settle_timeout,
lamp_verify, lamp_transition_name)


async def take_calibration_data(self) -> Awaitable[dict[str, list[str]]]:
spec_fut = self._spectrograph_expose()
elec_fut = self._electrometer_expose()

spec_results, elec_results = await asyncio.gather(spec_fut, elec_fut)
return {"spectrometer_urls" : spec_results,
"electrometer_urls" : elec_results}


@property
def program_reason(self) -> str:
return "AT_flats"

@property
def prgoram_note(self) -> str:
return "TODO"


77 changes: 52 additions & 25 deletions python/lsst/ts/observatory/control/base_calsys.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass
from .remote_group import RemoteGroup
from typing import Iterable, Optional, Tuple, List, Union
from typing import Sequence, Mapping, TypeAlias, Any
from typing import TypeVar
from typing import Iterable, Optional, Union
from typing import Sequence, Mapping, TypeAlias, Any, Awaitable
from functools import reduce
from collections.abc import Coroutine, Callable, AsyncGenerator
from operator import mul
from lsst.ts import salobj
from lsst.ts.idl.enums import Electrometer
import logging
import asyncio
from importlib.resources import files
Expand All @@ -17,9 +16,17 @@
import astropy.units as un
import enum
from datetime import datetime
from itertools import count


Responsivity: TypeAlias = Quantity[ampere / watt]

def _calsys_get_parameter(indct: dict[str, Any], key: str, factory_callable: Callable,
*factory_args, **factory_kwargs):
if indct.get(key, None) is None:
return factory_callable(*factory_args, **factory_kwargs)


@dataclass
class CalibrationSequenceStepBase:
wavelength: float
Expand Down Expand Up @@ -173,12 +180,13 @@ class BaseCalsys(RemoteGroup, metaclass=ABCMeta):
CMD_TIMEOUT: Quantity[un.physical.time] = 30 << un.s
EVT_TIMEOUT: Quantity[un.physical.time] = 30 << un.s
TELEM_TIMEOUT: Quantity[un.physical.time] = 30 << un.s
CAL_PROGRAM_NAME: str = "flats"

def __init__(
self,
intention: CalsysScriptIntention,
components: Iterable[str],
domain: Optional[salobj.domain] = None,
domain: Optional[salobj.domain.Domain] = None,
cmd_timeout: Optional[int] = 10,
log: Optional[logging.Logger] = None,
):
Expand Down Expand Up @@ -269,12 +277,11 @@ def log_event_timings(cls, logger, time_evt_name: str,
logstr2 = f"the duration was: {duration}, and our timeout allowance was: {expt_duration}"
logger.info(logstr2)



async def take_electrometer_exposures(
self, electrobj, exp_time_s: float, n: int
) -> List[str]:
urlout: List[str] = []
) -> list[str]:
urlout: list[str] = []
for i in range(n):
await electrobj.cmd_StartScanDt.set_start(scanDuration=exp_time)
lfaurl = await self._lfa_event_helper(electrobj)
Expand Down Expand Up @@ -305,21 +312,31 @@ def detector_exposure_time_for_nelectrons(
assert issubclass(type(self), CalsysThroughputCalculationMixin)

def spectrograph_exposure_time_for_nelectrons(self, nelec: float) -> float:
pass
raise NotImplementerError("throughput calc for spectrograph not implemented yet!")

def spectrograph_n_exps_for_nelectrons(self, nelec: float) -> int:
raise NotImplementedError("throughput calc for spectrograph not implemented yet!")

def pd_exposure_time_for_nelectrons(self, nelec: float) -> float:
pass
raise NotImplementedError("throughput calc for electrometer not implemented yet!")

def pd_n_exps_for_nelectrons(self, nelec: float) -> int:
raise NotImplementedError("throughput calc for electrometer not implemented yet")


@property
@abstractmethod
def _electrometer_object(self) -> Electrometer:
pass

@abstractmethod
async def validate_hardware_status_for_acquisition(self) -> Awaitable:
pass

@abstractmethod
async def power_sequence_run(self, scriptobj, **kwargs):
async def power_sequence_run(self, scriptobj, **kwargs) -> Awaitable:
pass


@abstractmethod
async def setup_for_wavelength(self, wavelen: float, **extra_params) -> None:
"""awaitable which sets up the various remote components of a calibration system
Expand All @@ -339,25 +356,35 @@ async def setup_for_wavelength(self, wavelen: float, **extra_params) -> None:
pass

@abstractmethod
async def take_calibration_instr_exposures(self) -> None:
async def take_calibration_data(self):
"""awaitable which starts the exposures for the calibration instruments (i.e the spectrographs, electrometers etc) according to the setup. It does not take images with the instrument under test, it is intended that script components which use this class do that themselves"""
pass

@property
@abstractmethod
def wavelen(self) -> Quantity[nm]:
"""returns the currently configured wavelength"""

@abstractmethod
async def take_detector_data(self):
"""This will fire off all async tasks to take calibration data in sequence, and return locations a
nd metadata about the files supplied etc"""
async def generate_data_flats(self, instrobj, scriptobj, exposure_time_s: float, n_iter: Optional[int] = None):
"""returns an async generator which yields sets of exposures from """

# Run forever if n_iter was not given, don't worry it's just a generator
nrange = count() if n_iter is None else range(n_iter)

for i in nrange:
self.log.info("taking flats number %d", i)
instr_task = instrobj.take_flats(exposure_time_s, nflats=1, groupid=scriptobj.group_id,
program = self.CAL_PROGRAM_NAME,
reason = self.program_reason,
note = self.program_note
)
instr_fut = asyncio.create_task(instr_task)
aux_cal_fut = self.take_calibration_data()
instr_results, aux_cal_results = await asyncio.gather(aux_cal_fut, instr_fut)
yield instr_results, aux_cal_results

@property
@abstractmethod
async def gen_calibration_auxiliaries(self):
pass
def program_reason(self) -> str: ...

@property
@abstractmethod
def program_note(self) -> str: ...

async def wait_ready(self):
""" Method to wait for prepared state for taking data - e.g. lamps on, warmed up, laser warmed up etc"""

0 comments on commit 36f7666

Please sign in to comment.