
[hailtop.saige] Implement SAIGE in QoB #13804

Open · jigold wants to merge 75 commits into base: main

Conversation

@jigold jigold commented Oct 12, 2023

@danking I have a mostly complete draft of SAIGE in QoB. Can you take a look? I'm mainly looking for enough feedback to get a green light to start testing this end to end, fill in the remaining unimplemented components, add documentation, add verbosity and possibly a dry-run feature, and support VEP annotations natively.

There are a couple of core concepts:

  1. Phenotypes - The set of phenotypes to test. I support the ability to group phenotypes together, in anticipation of a new version of SAIGE that Wei is going to release soon.
  2. VariantChunks - The set of variant intervals of data to test per job. For SAIGE-GENE, there are also the "groups" to actually test within each interval.
  3. io - A bunch of wrappers that handle input and output files, so all of that logic, combined with the checkpointing logic, is abstracted away from what is actually going on.
  4. steps - The SAIGE modules to run. They are all dataclasses with configuration options.
  5. saige - A class that can be instantiated in Python; I have also started writing the framework for a CLI. This has the code that builds the DAG end to end.

All configuration happens with a YAML file that can override default parameters for each step, such as whether to checkpoint or where the results should be written. For the CLI, I envision you can give a config file and/or specify --overrides step1_null_glmm.use_checkpoint=true. For every SAIGE run, I write the configuration used to a file in the output directory, along with information about the input data, the variant chunks, and the batch.
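The dotted --overrides idea above could be implemented with a small helper that walks a nested config. Everything in this sketch (names, the coercion rule) is illustrative, not the actual hailtop.saige implementation:

```python
# Hypothetical sketch: apply a dotted override like
# "step1_null_glmm.use_checkpoint=true" to a nested config dict.
from typing import Any, Dict

def apply_override(config: Dict[str, Any], override: str) -> None:
    """Split 'a.b.c=value' and set config['a']['b']['c'] = value."""
    path, _, raw = override.partition('=')
    keys = path.split('.')
    node = config
    for key in keys[:-1]:
        node = node.setdefault(key, {})
    # Coerce the common boolean literals; leave everything else a string.
    value: Any = {'true': True, 'false': False}.get(raw.lower(), raw)
    node[keys[-1]] = value

config = {'step1_null_glmm': {'use_checkpoint': False}}
apply_override(config, 'step1_null_glmm.use_checkpoint=true')
```

A real implementation would also need to validate that the dotted path names a known step and parameter.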

pyproject.toml Outdated
@@ -1,7 +1,7 @@
[tool.black]
line-length = 120
skip-string-normalization = true
force-exclude = 'hail|datasets|sql|\.mypy'
force-exclude = 'datasets|sql|\.mypy'
jigold (Contributor Author):

This is just so I can run black for now on my new code.

danking commented Oct 12, 2023

Very exciting!

danking commented Oct 16, 2023

Apologies for delay; I will look this afternoon.

@danking danking left a comment

Some high-level thoughts on structure. I think I'd generally bias towards just writing Python code rather than YAMLs; it feels like YAML adds complexity that doesn't justify itself with added value, but let me know if I'm off-base on that.


__all__ = [
'Backend'
'Backend',
'ServiceBackend',
danking (Contributor):

why this?

@@ -5,15 +5,19 @@
from .backend import LocalBackend, ServiceBackend, Backend
danking (Contributor):

I'm a bit skeptical of leaking these names into the top-level namespace; seems better to keep them a bit hidden.

from .constants import SaigeAnalysisType


class AliasedResourceGroup:
danking (Contributor):

This seems like it'll get a bit confusing. Why not consistently use the actual names or change the file names to be the new names? I generally dislike having two names for the exact same thing.

b: hb.Batch, config: CheckpointConfigMixin, bfile_root: str
) -> Optional['PlinkResourceGroup']:
if config.use_checkpoints:
all_files_exist = all(hfs.exists(f'{bfile_root}{ext}') for ext in ['.bed', '.bim', '.fam'])
danking (Contributor):

This kind of logic will start to get slow if there are lots of files. It'd be better to use async the whole way through so that you can check file existence massively in parallel.
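A minimal sketch of the parallel existence check suggested here, using a stub async filesystem in place of RouterAsyncFS (whose real API may differ):

```python
# Stub FS standing in for an async filesystem with an awaitable exists().
import asyncio
from typing import List

class StubFS:
    def __init__(self, existing: set):
        self._existing = existing

    async def exists(self, path: str) -> bool:
        await asyncio.sleep(0)  # stand-in for real network I/O
        return path in self._existing

async def all_exist(fs: StubFS, paths: List[str]) -> bool:
    # gather launches every exists() concurrently rather than one at a time
    results = await asyncio.gather(*(fs.exists(p) for p in paths))
    return all(results)

fs = StubFS({'data.bed', 'data.bim', 'data.fam'})
ok = asyncio.run(all_exist(fs, ['data.bed', 'data.bim', 'data.fam']))
missing = asyncio.run(all_exist(fs, ['data.bed', 'data.log']))
```

With a real FS, each `exists` call is a network round-trip, so the concurrent version's latency is roughly that of the slowest single call instead of the sum.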


def __iter__(self):
for group in self._grouped_phenotypes:
yield self._grouped_phenotypes[group]
danking (Contributor):

I'm a bit skeptical of using a class here; this is basically a list plus phenotype_names. Is the savings of having phenotype_names worth replacing a very familiar class, List[...], with an unfamiliar one, Phenotypes?

] = None,
):
if config is not None:
config = SaigeConfig.from_yaml_file(config)
danking (Contributor):

I'm a bit skeptical of YAML files for configuration. Why is this easier than using Python to configure it?

I think the YAML would be:

version: 1
name: 'saige'
attributes:
    foo: bar
    ...
prepare_inputs:
    ...
...

And the Python would be:

from ... import run_saige

config = SaigeConfig(
    version=1,
    name='Saige',
    attributes={
        'foo': 'bar',
        ...
    },
    prepare_inputs=...
)

run_saige(config)

In the YAML case, I don't have an IDE that can give me hints about types and parameter names. If I get a parameter name wrong in YAML, I probably don't find out until we parse it? Or maybe it just gets ignored?

In Python, the IDE gives me hints and can give me red squiggles exactly where there's an error. Also, if I want to do some kind of preprocessing, automation, etc., I can just use Python to do that.
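The typo argument can be made concrete with a tiny stand-in dataclass (not the real SaigeConfig): a misspelled keyword fails immediately with a TypeError, whereas an unknown YAML key is typically only caught if the parser layer validates it.

```python
# Illustrative stand-in config; the real SaigeConfig surely has more fields.
from dataclasses import dataclass

@dataclass
class SaigeConfig:
    version: int = 1
    name: str = 'saige'

ok = SaigeConfig(version=1, name='Saige')

try:
    SaigeConfig(verison=1)  # misspelled keyword fails loudly at call time
    caught = False
except TypeError:
    caught = True
```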

@jigold jigold Oct 17, 2023

I think it depends on how we want hailtop.saige (or similar) to be used. Do we want to support a command-line interface, or does the user have to use the Python functions directly, write their own wrappers, and use the Python config classes? If we do support a CLI, I think it's a pain to use a bunch of command-line options, especially with all of the different SAIGE steps having their own parameters, so I thought a configuration file would make specifying the configuration easier. I was also inspired by a machine learning library I've been playing with, where configuration is done this way since there are just so many parameters to specify. Maybe it's overkill for this use case, though. I can think about it some more.

@danking danking Oct 19, 2023

I agree a CLI is a nice way to invoke a simple tool. I'd rather run cp x y than python3 -c 'import cp; cp("x", "y")'. However, once the interface is more complex than a CLI, the choice is between a configuration file and a general-purpose programming language. Between those two, my preference is for the latter, but maybe I'm missing some clear win of the former. To be really specific, it feels like the CLI+config is:

./cli-tool --config config.yaml --maybe-another-option

And the python script is:

python3 config.py

Admittedly, if you want to pass extra command line options to config.py you have to use argparse or sys.argv.

'attributes': b.attributes,
'timestamp': str(datetime.datetime.now()),
}
)
danking (Contributor):

This kind of dump seems more appropriate for a log statement

return create_chunks_by_group(variants, group, max_count_per_chunk, max_span_per_chunk)


class VariantChunks:
danking (Contributor):

This class is just a List[VariantChunk], I think it's better to just pass around the List itself
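For illustration, a plain NamedTuple plus List covers the same ground; the field names here are hypothetical, not the actual VariantChunk definition:

```python
# A bare record type plus a plain list, instead of a bespoke container class.
from typing import List, NamedTuple

class VariantChunk(NamedTuple):
    contig: str
    start: int
    end: int

chunks: List[VariantChunk] = [
    VariantChunk('chr1', 1, 1_000_000),
    VariantChunk('chr1', 1_000_001, 2_000_000),
]

# Anything a wrapper class would provide is a one-liner over the list:
chunk_names = [f'{c.contig}:{c.start}-{c.end}' for c in chunks]
```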

class Step2SPAStep(CheckpointConfigMixin, JobConfigMixin):
base_name: str = '{{ phenotype["name"] }}-{{ chunk["idx"] }}'
save_stdout: bool = True
base_output_root: str = '{{ output_dir }}/results/{{ phenotype["name"] }}/{{ chunk["idx"] }}'
danking (Contributor):

I don't think we should use templates for this; these are just functions:

def base_name(phenotype: str, chunk_idx: int):
    return f'{phenotype}-{chunk_idx}'

if value in ('0', 'false', 'False'):
value = False
elif value in ('1', 'true', 'True'):
value = True
danking (Contributor):

Doesn't the YAML parser do this?


jigold commented Oct 19, 2023

@danking Can you take another look? The only thing I didn't address is the Phenotypes -> List[Phenotype], VariantChunks -> List[VariantChunk]. I don't want to rip it out yet in case Wei comes out with the new SAIGE implementation with phenotype groupings. Again, just looking for a green light to start testing it. We can figure out the question about the CLI later on.

@staticmethod
async def use_checkpoint_if_exists_and_requested(
fs: AsyncFS, pool: AsyncWorkerPool, b: hb.Batch, config: CheckpointConfigMixin, grm_root: str
) -> Optional['SaigeSparseGRMResourceGroup']:
jigold (Contributor Author):

Actually, I can probably consolidate this even more with more classmethods. Will do that in the next round.


jigold commented Oct 19, 2023

Just realized that I didn't entirely make this parallel enough yet. Will do tomorrow.

from .saige import SAIGE

__all__ = [
'SAIGE',
@danking danking Oct 19, 2023

Do you mean for people to create an object of this class and then call run_saige and run_saige_gene multiple times? If not, it seems cleaner to export functions, especially because SAIGE contains an AsyncWorkerPool and an FS, both of which need to live in context managers.

Concretely, my proposal is to expose two functions. Those functions can use with / async with to appropriately manage the lifetime of the FS and the AsyncWorkerPool.

   def run_saige(
        *,
        router_fs_args: Optional[Dict[str, Any]] = None, 
        parallelism: int = 10,
        config: Optional[SaigeConfig] = None,
        mt: Union[str, hl.MatrixTable],
        phenotypes: Union[List[str], Phenotypes],
        covariates: List[str],
        output_dir: str,
        b: Optional[hb.Batch] = None,
        checkpoint_dir: Optional[str] = None,
        variant_chunks: Optional[Union[str, VariantChunks]] = None,
        run_kwargs: Optional[dict] = None,
    ):

    def run_saige_gene(
        *,
        router_fs_args: Optional[Dict[str, Any]] = None, 
        parallelism: int = 10,
        config: Optional[SaigeConfig] = None,
        mt: Union[str, hl.MatrixTable],
        phenotypes: Union[List[str], Phenotypes],
        covariates: List[str],
        group_col: str,
        output_dir: str,
        b: Optional[hb.Batch] = None,
        checkpoint_dir: Optional[str] = None,
        variant_chunks: Optional[VariantChunks] = None,
        run_kwargs: Optional[dict] = None,
    ):

self.config = config

self.fs = RouterAsyncFS(**router_fs_args)
self.pool = AsyncWorkerPool(parallelism)
danking (Contributor):

Both of these need to be closed, which implies SAIGE needs to be closed. It should probably be a context manager. Otherwise, I think users are unlikely to close it appropriately.
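One possible shape for this, sketched with stub resources standing in for RouterAsyncFS and AsyncWorkerPool (the real classes' close semantics may differ):

```python
# An async context manager guarantees fs and pool are closed on exit,
# even if the body raises.
import asyncio

class StubResource:
    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True

class SAIGE:
    def __init__(self, parallelism: int = 10):
        self.fs = StubResource()    # stands in for RouterAsyncFS
        self.pool = StubResource()  # stands in for AsyncWorkerPool

    async def __aenter__(self) -> 'SAIGE':
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.fs.close()
        await self.pool.close()

async def main() -> 'SAIGE':
    async with SAIGE() as saige:
        pass  # run_saige / run_saige_gene calls would go here
    return saige

saige = asyncio.run(main())
```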

) -> Tuple[hl.MatrixTable, str, Phenotypes, VariantChunks]:
if isinstance(mt, hl.MatrixTable):
mt_path = f'{working_dir}/input.mt'
mt.checkpoint(mt_path)
danking (Contributor):

Checkpointing a matrix table writes all the genotypes, which is a huge amount of data. I really think we shouldn't do this under the covers.

I think we either should only accept paths or we should not checkpoint when given an MT. Why do we need to checkpoint? Do we compute on the matrix table twice?

danking (Contributor):

OK, I see that we export cols and the entry data separately. That means we use the MT twice. However, we still shouldn't checkpoint. This punishes the vast majority of cases, which just do read_matrix_table with filters or annotates, for the sake of the few cases that do a column aggregation to produce a phenotype (this really shouldn't happen; the phenotypes should really just be a table or some column annotations).

else:
results = results_tables[0].union(*results_tables[1:])

results.checkpoint(f'{output_dir}/saige_results.ht')
danking (Contributor):

We should not checkpoint before returning. If a user wants to save the results, they should write which uses higher compression. It also allows the user to perform some post-processing or filtering before writing the results.

for input in input_files:
await waitable_pool.call(fs.exists, input)
await waitable_pool.wait()
files_exist = waitable_pool.results()
danking (Contributor):

When there's only a handful of coroutines, I feel like asyncio.gather is easier to read

jigold (Contributor Author):

Agreed. I needed to parallelize at the SAIGE level, not at the io level.

return VCFResourceGroup._from_job_intermediate(j, inputs)


class BgenResourceGroup(BaseSaigeResource):
danking (Contributor):

These classes still seem very duplicative. What happens if you try to have just one class that you instantiate differently for each resource type? A la

def bgen_resource_group():
    return BaseSaigeResource(ResourceGroupInputs(...))

jigold (Contributor Author):

I couldn't quite tell if your comment below was intended for the io.py classes as well.

Do you like this better than having a class representing the specific type of file group? I can see the argument both ways. The reason I had it the other way was that it was more explicit to me what exactly was going on, given the function names use_checkpoint, checkpoint_output, and from_job_intermediate, as well as the explicit typing of the file type.

async def get_or_create_saige_sparse_grm_resource(fs: AsyncFS,
                                                  b: hb.Batch,
                                                  j: hb.BashJob,
                                                  config: CheckpointConfigMixin,
                                                  root: str,
                                                  output_path: str,
                                                  relatedness_cutoff: float,
                                                  num_markers: int) -> Tuple[hb.ResourceGroup, bool]:
    suffix = f'_relatednessCutoff_{relatedness_cutoff}_{num_markers}_randomMarkersUsed'
    if config.use_checkpoints and not config.overwrite:
        maybe_inputs = [f'{root}.{ext}{suffix}' for ext in ['sparseGRM.mtx', 'sparseGRM.mtx.sampleIDs.txt']]  # FIXME: is this correct path?
        all_files_exist = all(await asyncio.gather(*[fs.exists(input) for input in maybe_inputs]))
        if all_files_exist:            
            return (b.read_input_group(grm=f'{root}.sparseGRM.mtx', sample_ids=f'{root}.sparseGRM.mtx.sampleIDs.txt'), True)

    j.declare_resource_group(grm={'grm': '{root}.sparseGRM.mtx{suffix}', 'sample_ids': '{root}.sparseGRM.mtx.sampleIDs.txt{suffix}'})

    if config.checkpoint_output:
        b.write_output(j.grm, output_path)

    return (j.grm, False)


@dataclass
class SparseGRMStep(CheckpointConfigMixin, JobConfigMixin):
    relatedness_cutoff: float = ...
    num_markers: int = ...
    cpu: Union[str, float, int] = 1
    memory: str = 'highmem'
    storage: str = '10Gi'
    spot: bool = True

    def name(self) -> str:
        return 'sparse-grm'

    def output_root(self, output_dir: str) -> str:
        return f'{output_dir}/sparse-grm'

    def attributes(self) -> Optional[Dict]:
        return None

    async def call(self, fs: AsyncFS, b: hb.Batch, input_bfile: PlinkResourceGroup, output_dir: str):
        output_prefix = self.output_root(output_dir)

        create_sparse_grm_task = b.new_job(name=self.name(), attributes=self.attributes())

        (create_sparse_grm_task.cpu(self.cpu).storage(self.storage).image(self.image).spot(self.spot))

        sparse_grm_output, already_exists = await get_or_create_saige_sparse_grm_resource(
            fs,
            b,
            create_sparse_grm_task,
            self,
            ...,
            output_prefix,
            self.relatedness_cutoff,
            self.num_markers,
        )
        if already_exists:
            return sparse_grm_output

        command = f'''
createSparseGRM.R \
    --plinkFile={input_bfile} \
    --nThreads={self.cpu} \
    --outputPrefix={sparse_grm_output.grm} \
    --numRandomMarkerforSparseKin={self.num_markers} \
    --relatednessCutoff={self.relatedness_cutoff}
'''

        create_sparse_grm_task.command(command)

        return sparse_grm_output

jigold (Contributor Author):

Actually, you can't do this. There's a chicken-and-egg problem: we have to create a new job before calling this function, which defeats the purpose of using the checkpointed files and not running the job.

fs, pool, b, mt, f'{working_dir}/inputs/phenotypes.tsv', phenotypes.phenotype_names
)
input_data = await self._convert_mt_to_plink(fs, pool, b, mt, f'{working_dir}/inputs/input_data')
return (phenotypes_output, input_data)
@danking danking Oct 19, 2023

I still feel like there are more classes and lines of code than we need to express the logic here. You're ultimately trying to get some hailtop.batch.Resource objects that you can use with a job.command, right? What do things look like if you go in this kind of direction:

async def write_plink(
    fs: AsyncFS,
    cfg: CheckpointConfigMixin,
    b: Batch,
    mt: MatrixTable,
    fname: str
) -> Resource:
    files = dict(bed=f'{fname}.bed', bim=f'{fname}.bim', fam=f'{fname}.fam')
    if cfg.overwrite or not all(await asyncio.gather(*(fs.exists(x) for x in files.values()))):
        hl.export_plink(mt, fname)
    return b.read_input_group(**files)


async def export_tsv(
    fs: AsyncFS,
    cfg: CheckpointConfigMixin,
    b: Batch,
    ht: Table,
    fname: str
) -> Resource:
    if cfg.overwrite or not await fs.exists(fname):
        ht.export(fname, delimiter='\t')
    return b.read_input(fname)


async def prepare_inputs(
    fs: AsyncFS,
    cfg: CheckpointConfigMixin,
    b: hb.Batch,
    mt: hl.MatrixTable,
    working_dir: str,
    pheno_cols: List[str]
):
    pheno_file = f'{working_dir}/inputs/phenotypes.tsv'
    plink_file = f'{working_dir}/inputs/input_data'

    ht = mt.cols().select(*pheno_cols)

    return await asyncio.gather(
        export_tsv(fs, cfg, b, ht, pheno_file),
        write_plink(fs, cfg, b, mt, plink_file)
    )

In fact, restructuring this way reveals that we probably should have a class that contains an FS, a CheckpointConfig, and a Batch. That would save us a couple lines of code and some noise every time we call a function.
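That bundling idea might look like the following; all types here are illustrative stubs, not the real hailtop classes:

```python
# One small context object carrying the FS, checkpoint config, and Batch,
# so helper functions take a single argument instead of three.
from dataclasses import dataclass
from typing import Any

@dataclass
class CheckpointConfig:
    overwrite: bool = False
    use_checkpoints: bool = True

@dataclass
class SaigeContext:
    fs: Any               # e.g. an AsyncFS
    cfg: CheckpointConfig
    b: Any                # e.g. an hb.Batch

def should_rewrite(ctx: SaigeContext, exists: bool) -> bool:
    # Re-run an export when overwriting or when the checkpoint is absent.
    return ctx.cfg.overwrite or not exists

ctx = SaigeContext(fs=None, cfg=CheckpointConfig(), b=None)
fresh = should_rewrite(ctx, exists=False)
cached = should_rewrite(ctx, exists=True)
```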

jigold (Contributor Author):

Again, I can't quite tell what you are advocating for. Is the issue that you don't like the PrepareInputs step class? My intention was to make this as customizable as possible. Each "Step" in the pipeline is a class. This class must inherit from the CheckpointConfigMixin. The config for each step is present in the SaigeConfig. If a user wanted to prepare their inputs differently with custom matrix table filter functions etc., then they can inherit from the PrepareInputs class and override the appropriate function call (call). Same for all of the other steps.

@jigold jigold dismissed danking’s stale review October 23, 2023 17:10

see comments

@danking danking left a comment

I think we covered the comments in our conversation; if you want me to look again, dismiss!


jigold commented Oct 25, 2023

I don't think I addressed all of your comments, but I'd like you to take another look. I rewrote the io.py component and fixed the parallelism and context managers. I also figured out where to write files based on the checkpoint_dir argument. We need to results.checkpoint() at the end because I decided to treat the raw SAIGE output as temp files (or checkpoints), and the results, as a Hail table, are the thing people want saved to an output directory. I can also make the output directory optional and not checkpoint in that case. The issue is that if we write the raw SAIGE results to a temp dir, then when the context manager exits, the returned Hail table won't be able to read the data.


jigold commented Oct 26, 2023

I did not change the VariantChunks or Phenotypes classes into plain lists, but I would like your feedback on the rewritten io.py. It still might be overkill, but it's getting closer.


danking commented Nov 28, 2023

This all seems fine to me now. I'm skeptical the new types will catch non-trivial bugs, but they're also not syntactically obtrusive.

I'm strongly opposed to unnecessary writing and reading of data. In much the same way that import_table doesn't immediately write to a table and then read that, I don't think we should be writing the results. If you really feel strongly that the results should be saved to a file, then do not return a table/matrix-table. Just accept a file path and write to that file path.

@danking danking left a comment

dismiss if I should look again


jigold commented Mar 14, 2024

For reference, Wei's documentation for SAIGE is here: https://saigegit.github.io/SAIGE-doc/
The code is here: https://github.com/saigegit/SAIGE
Example files are here: https://github.com/saigegit/SAIGE/tree/main/extdata/input
Dockerhub image is here: https://hub.docker.com/r/wzhou88/saige/tags
