From 28d9e12ccbe2a4f419765ad11b8bbc61c51521d8 Mon Sep 17 00:00:00 2001 From: Dan King Date: Mon, 23 Oct 2023 10:32:17 -0400 Subject: [PATCH] [fs] expose hl.fast_stat and hl.hadoop_fast_stat to users CHANGELOG: Introduce `hl.fs.fast_stat` and `hl.hadoop_fast_stat` which use cheaper Class B Operations in Google Cloud Storage rather than Class A Operations. Users of `hl.hadoop_stat` and `hl.fs.stat` should consider switching. This PR extends #13885 into the public API. --- hail/python/hail/utils/hadoop_utils.py | 42 +++++++++++++++++++++++++ hail/python/hailtop/fs/fs.py | 6 +++- hail/python/hailtop/fs/fs_utils.py | 43 +++++++++++++++++++++++++- hail/python/hailtop/fs/router_fs.py | 28 +++++++++++++---- 4 files changed, 111 insertions(+), 8 deletions(-) diff --git a/hail/python/hail/utils/hadoop_utils.py b/hail/python/hail/utils/hadoop_utils.py index f87203d5a2e..e79b63bafcc 100644 --- a/hail/python/hail/utils/hadoop_utils.py +++ b/hail/python/hail/utils/hadoop_utils.py @@ -179,6 +179,12 @@ def hadoop_is_dir(path: str) -> bool: def hadoop_stat(path: str) -> Dict[str, Any]: """Returns information about the file or directory at a given path. + Warning + ------- + + This function requires the use of more expensive cloud object storage operations than + :func:`.hadoop_fast_stat`. See :func:`.hadoop_fast_stat` for details. + Notes ----- Raises an error if `path` does not exist. @@ -199,10 +205,46 @@ def hadoop_stat(path: str) -> Dict[str, Any]: Returns ------- :obj:`dict` + """ return Env.fs().stat(path).to_legacy_dict() +def hadoop_fast_stat(path: str) -> Dict[str, Any]: + """Returns information about the file or directory at a given path. + + Notes + ----- + + In cloud object stores, `path` could be a file (aka object), a directory (aka prefix), or + both. Determining if a path is or is not a directory is typically more expensive than retrieving + file metadata such as the size or creation time. For example, Google Cloud Storage charges a + list-by-prefix operation as a "Class A Operation" and an object-metadata operation as a "Class B + Operation". + + This function does not determine if `path` is a directory, it only returns metadata about the + file at `path`, if a file exists. + + The resulting dictionary contains the following data: + + - size_bytes (:obj:`int`) -- Size in bytes. + - size (:class:`str`) -- Size as a readable string. + - modification_time (:class:`str`) -- Time of last file modification. + - owner (:class:`str`) -- Owner. + - path (:class:`str`) -- Path. + + Parameters + ---------- + path : :class:`str` + + Returns + ------- + :obj:`dict` + + """ + return Env.fs().fast_stat(path).to_legacy_dict() + + def hadoop_ls(path: str) -> List[Dict[str, Any]]: """Returns information about files at `path`. diff --git a/hail/python/hailtop/fs/fs.py b/hail/python/hailtop/fs/fs.py index a11942f582d..49e1303557b 100644 --- a/hail/python/hailtop/fs/fs.py +++ b/hail/python/hailtop/fs/fs.py @@ -1,7 +1,7 @@ import abc from typing import IO, List -from .stat_result import FileListEntry +from .stat_result import FileListEntry, FileStatus class FS(abc.ABC): @@ -29,6 +29,10 @@ def is_dir(self, path: str) -> bool: def stat(self, path: str) -> FileListEntry: raise NotImplementedError + @abc.abstractmethod + def fast_stat(self, path: str) -> FileStatus: + raise NotImplementedError + @abc.abstractmethod def ls(self, path: str) -> List[FileListEntry]: raise NotImplementedError diff --git a/hail/python/hailtop/fs/fs_utils.py b/hail/python/hailtop/fs/fs_utils.py index 1b6c42089ad..e7746ae4286 100644 --- a/hail/python/hailtop/fs/fs_utils.py +++ b/hail/python/hailtop/fs/fs_utils.py @@ -2,7 +2,7 @@ from typing import List from .router_fs import RouterFS -from .stat_result import FileListEntry +from .stat_result import FileListEntry, FileStatus _router_fs = None @@ -153,6 +153,12 @@ def is_dir(path: str) -> bool: def stat(path: str) -> FileListEntry: """Returns information about the file or directory at a given path. + Warning + ------- + + This function requires the use of more expensive cloud object storage operations than + :func:`.fast_stat`. See :func:`.fast_stat` for details. + Notes ----- Raises an error if `path` does not exist. @@ -177,6 +183,41 @@ def stat(path: str) -> FileListEntry: return _fs().stat(path) +def fast_stat(path: str) -> FileStatus: + """Returns information about the file or directory at a given path. + + Notes + ----- + In cloud object stores, `path` could be a file (aka object), a directory (aka prefix), or + both. Determining if a path is or is not a directory is typically more expensive than retrieving + file metadata such as the size or creation time. For example, Google Cloud Storage charges a + list-by-prefix operation as a "Class A Operation" and an object-metadata operation as a "Class B + Operation". + + This function does not determine if `path` is a directory, it only returns metadata about the + file at `path`, if a file exists. + + If no file exists at `path`, this function raises an error. + + The resulting dictionary contains the following data: + + - size_bytes (:obj:`int`) -- Size in bytes. + - size (:class:`str`) -- Size as a readable string. + - modification_time (:class:`str`) -- Time of last file modification. + - owner (:class:`str`) -- Owner. + - path (:class:`str`) -- Path. + + Parameters + ---------- + path : :class:`str` + + Returns + ------- + :obj:`dict` + """ + return _fs().fast_stat(path) + + def ls(path: str) -> List[FileListEntry]: """Returns information about files at `path`. diff --git a/hail/python/hailtop/fs/router_fs.py b/hail/python/hailtop/fs/router_fs.py index 217fdb372b3..8f91e0b8e17 100644 --- a/hail/python/hailtop/fs/router_fs.py +++ b/hail/python/hailtop/fs/router_fs.py @@ -13,7 +13,7 @@ from hailtop.utils import bounded_gather2, async_to_blocking from .fs import FS -from .stat_result import FileType, FileListEntry +from .stat_result import FileType, FileListEntry, FileStatus class SyncReadableStream(io.RawIOBase, BinaryIO): # type: ignore # https://github.com/python/typeshed/blob/a40d79a4e63c4e750a8d3a8012305da942251eb4/stdlib/http/client.pyi#L81 @@ -152,7 +152,7 @@ def write(self, b): return async_to_blocking(self.aws.write(b)) -def _stat_result(is_dir: bool, size_bytes_and_time_modified: Optional[Tuple[int, float]], path: str) -> FileListEntry: +def _file_list_entry_result(is_dir: bool, size_bytes_and_time_modified: Optional[Tuple[int, float]], path: str) -> FileListEntry: if size_bytes_and_time_modified: size_bytes, time_modified = size_bytes_and_time_modified else: @@ -161,9 +161,17 @@ def _stat_result(is_dir: bool, size_bytes_and_time_modified: Optional[Tuple[int, return FileListEntry( path=path.rstrip('/'), + owner=None, size=size_bytes, typ=FileType.DIRECTORY if is_dir else FileType.FILE, + modification_time=time_modified) + + +def _file_status_result(size_bytes: int, time_modified: float, path: str) -> FileStatus: + return FileStatus( + path=path.rstrip('/'), owner=None, + size=size_bytes, modification_time=time_modified) @@ -244,8 +252,8 @@ def stat(self, path: str) -> FileListEntry: if maybe_sb_and_t is None: if not is_dir: raise FileNotFoundError(path) - return _stat_result(True, None, path) - return _stat_result(is_dir, maybe_sb_and_t, path) + return _file_list_entry_result(True, None, path) + return _file_list_entry_result(is_dir, maybe_sb_and_t, path) async def _size_bytes_and_time_modified_or_none(self, path: str) -> Optional[Tuple[int, float]]: try: @@ -255,6 +263,14 @@ async def _size_bytes_and_time_modified_or_none(self, path: str) -> Optional[Tup except FileNotFoundError: return None + def fast_stat(self, path: str) -> FileStatus: + file_status = async_to_blocking(self.afs.statfile(path)) + return _file_status_result( + async_to_blocking(file_status.size()), + file_status.time_modified().timestamp(), + path + ) + async def _aiofle_to_fle(self, fle: AIOFileListEntry) -> FileListEntry: async def maybe_status() -> Optional[Tuple[int, float]]: try: @@ -262,7 +278,7 @@ async def maybe_status() -> Optional[Tuple[int, float]]: return (await file_status.size(), file_status.time_modified().timestamp()) except IsADirectoryError: return None - return _stat_result( + return _file_list_entry_result( *await asyncio.gather(fle.is_dir(), maybe_status(), fle.url())) def ls(self, @@ -385,7 +401,7 @@ async def ls_as_dir() -> Optional[List[FileListEntry]]: self._size_bytes_and_time_modified_or_none(path), ls_as_dir()) if maybe_sb_and_t is not None: - file_stat = _stat_result(False, maybe_sb_and_t, path) + file_stat = _file_list_entry_result(False, maybe_sb_and_t, path) if maybe_contents is not None: if error_when_file_and_directory: raise ValueError(f'{path} is both a file and a directory')