diff --git a/pex/common.py b/pex/common.py
index 446972b93..efbf5b90d 100644
--- a/pex/common.py
+++ b/pex/common.py
@@ -6,6 +6,7 @@
 import atexit
 import contextlib
 import errno
+import io
 import itertools
 import os
 import re
@@ -137,6 +138,26 @@ def do_copy():
         do_copy()
 
 
+_COPY_BUFSIZE = 64 * 1024
+
+
+def copy_file_range(source, destination, length, buffer_size=_COPY_BUFSIZE):
+    # type: (io.BufferedIOBase, io.BufferedIOBase, int, int) -> None
+    """Implementation of shutil.copyfileobj() that copies exactly `length` bytes."""
+    # We require a BufferedIOBase in order to avoid handling short reads or writes.
+    remaining_length = length
+    if buffer_size > length:
+        buffer_size = length
+    cur_buf = bytearray(buffer_size)
+    while remaining_length > buffer_size:
+        assert source.readinto(cur_buf) == buffer_size
+        assert destination.write(cur_buf) == buffer_size
+        remaining_length -= buffer_size
+    remainder = source.read(remaining_length)
+    assert len(remainder) == remaining_length
+    assert destination.write(remainder) == remaining_length
+
+
 # See http://stackoverflow.com/questions/2572172/referencing-other-modules-in-atexit
 class MktempTeardownRegistry(object):
     def __init__(self):
@@ -281,18 +302,32 @@ def safe_mkdir(directory, clean=False):
     return directory
 
 
+def _ensure_parent(filename):
+    # type: (str) -> None
+    parent_dir = os.path.dirname(filename)
+    if parent_dir:
+        safe_mkdir(parent_dir)
+
+
 def safe_open(filename, *args, **kwargs):
     """Safely open a file.
 
     ``safe_open`` ensures that the directory components leading up to the specified file have
     been created first.
     """
-    parent_dir = os.path.dirname(filename)
-    if parent_dir:
-        safe_mkdir(parent_dir)
+    _ensure_parent(filename)
     return open(filename, *args, **kwargs)  # noqa: T802
 
 
+def safe_io_open(filename, *args, **kwargs):
+    # type: (str, Any, Any) -> io.IOBase
+    """``safe_open()``, but using ``io.open()`` instead.
+
+    With the right arguments (notably a binary mode), this yields a buffered file handle even
+    on py2, where the builtin ``open()`` does not."""
+    _ensure_parent(filename)
+    return cast("io.IOBase", io.open(filename, *args, **kwargs))
+
+
 def safe_delete(filename):
     # type: (str) -> None
     """Delete a file safely.
@@ -606,9 +641,13 @@ def delete(self):
         # type: () -> None
         shutil.rmtree(self.chroot)
 
+    # This directory traversal, file I/O, and compression can be made faster with complex
+    # parallelism and pipelining in a compiled language, but the result is much harder to
+    # package, and is still less performant than effective caching. See the investigation in
+    # https://github.com/pantsbuild/pex/issues/2158 and https://github.com/pantsbuild/pex/pull/2175.
     def zip(
         self,
-        filename,  # type: str
+        output_file,  # type: Union[str, io.IOBase]
         mode="w",  # type: str
         deterministic_timestamp=False,  # type: bool
         exclude_file=lambda _: False,  # type: Callable[[str], bool]
@@ -626,7 +665,7 @@ def zip(
         selected_files = self.files()
 
         compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
-        with open_zip(filename, mode, compression) as zf:
+        with open_zip(output_file, mode, compression) as zf:
 
             def write_entry(
                 filename,  # type: str
@@ -640,6 +679,7 @@ def write_entry(
                     if deterministic_timestamp
                     else None,
                 )
+                # FIXME: this ignores the zinfo.compress_type value from zip_entry_from_file()!
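+                # A possible fix (untested sketch): omit the explicit `compression` argument,
+                # i.e. `zf.writestr(zip_entry.info, zip_entry.data)`, since ZipFile.writestr()
+                # honors the ZipInfo's own compress_type when no override is passed.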
                 zf.writestr(zip_entry.info, zip_entry.data, compression)
 
             def get_parent_dir(path):
diff --git a/pex/pex_builder.py b/pex/pex_builder.py
index 2a5433ee7..1c0660209 100644
--- a/pex/pex_builder.py
+++ b/pex/pex_builder.py
@@ -7,6 +7,7 @@
 import logging
 import os
 import shutil
+import zipfile
 
 from pex import pex_warnings
 from pex.atomic_directory import atomic_directory
@@ -17,9 +18,9 @@
     is_pyc_temporary_file,
     safe_copy,
     safe_delete,
+    safe_io_open,
     safe_mkdir,
     safe_mkdtemp,
-    safe_open,
 )
 from pex.compatibility import commonpath, to_bytes
 from pex.compiler import Compiler
@@ -37,9 +38,10 @@
 from pex.tracer import TRACER
 from pex.typing import TYPE_CHECKING
 from pex.util import CacheHelper
+from pex.ziputils import MergeableZipFile, buffered_zip_archive
 
 if TYPE_CHECKING:
-    from typing import Dict, Optional
+    from typing import ClassVar, Dict, Iterable, Optional, Tuple
 
 
 # N.B.: __file__ will be relative when this module is loaded from a "" `sys.path` entry under
@@ -95,14 +97,14 @@ def __maybe_run_venv__(pex, pex_root, pex_path):
     venv_dir = venv_dir(
         pex_file=pex,
-        pex_root=pex_root, 
+        pex_root=pex_root,
         pex_hash={pex_hash!r},
         has_interpreter_constraints={has_interpreter_constraints!r},
         pex_path=pex_path,
     )
     venv_pex = os.path.join(venv_dir, 'pex')
     if not __execute__ or not is_exe(venv_pex):
-        # Code in bootstrap_pex will (re)create the venv after selecting the correct interpreter. 
+        # Code in bootstrap_pex will (re)create the venv after selecting the correct interpreter.
         return venv_dir
 
     TRACER.log('Executing venv PEX for {{}} at {{}}'.format(pex, venv_pex))
@@ -441,6 +443,7 @@ def set_header(self, header):
         self._header = header
 
     def _add_dist_dir(self, path, dist_name, fingerprint=None):
+        # type: (str, str, Optional[str]) -> str
         target_dir = os.path.join(self._pex_info.internal_cache, dist_name)
         if self._copy_mode == CopyMode.SYMLINK:
             self._copy_or_link(path, target_dir, label=dist_name)
@@ -557,6 +560,7 @@ def _copy_or_link(self, src, dst, label=None):
         elif self._copy_mode == CopyMode.SYMLINK:
             self._chroot.symlink(src, dst, label)
         else:
+            assert self._copy_mode == CopyMode.LINK
            self._chroot.link(src, dst, label)
 
     def _prepare_bootstrap(self):
@@ -636,7 +640,7 @@ def build(
         """Package the PEX application.
 
         By default, the PEX is packaged as a zipapp for ease of shipping as a single file, but it
-        can also be packaged in spread mode for efficiency of syncing over the network
+        can also be packaged in a packed layout for efficiency of syncing over the network
         incrementally.
 
         :param path: The path where the PEX should be stored.
@@ -700,6 +704,28 @@ def set_sh_boot_script(
         )
         self.set_header(script)
 
+    def _setup_pex_info(self):
+        # type: () -> Tuple[PexInfo, str]
+        pex_info = self._pex_info.copy()
+        pex_info.update(PexInfo.from_env())
+
+        bootstrap_hash = pex_info.bootstrap_hash
+        if bootstrap_hash is None:
+            raise AssertionError(
+                "Expected bootstrap_hash to be populated for {}.".format(self._pex_info)
+            )
+
+        return (pex_info, bootstrap_hash)
+
+    _DIRECT_SOURCE_LABELS = (
+        "executable",
+        "importhook",
+        "main",
+        "manifest",
+        "resource",
+        "source",
+    )  # type: ClassVar[Iterable[str]]
+
     def _build_packedapp(
         self,
         dirname,  # type: str
@@ -707,75 +733,112 @@
         compress=True,  # type: bool
     ):
         # type: (...) -> None
-
-        pex_info = self._pex_info.copy()
-        pex_info.update(PexInfo.from_env())
+        pex_info, bootstrap_hash = self._setup_pex_info()
 
         # Include user sources, PEX-INFO and __main__ as loose files in src/.
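+        # (These filesets are the chroot labels in _DIRECT_SOURCE_LABELS above; everything else
+        # in the PEX is rebuilt below from the bootstrap and packed-wheel zip caches.)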
-        for fileset in ("executable", "importhook", "main", "manifest", "resource", "source"):
-            for f in self._chroot.filesets.get(fileset, ()):
-                dest = os.path.join(dirname, f)
-                safe_mkdir(os.path.dirname(dest))
-                safe_copy(os.path.realpath(os.path.join(self._chroot.chroot, f)), dest)
-
-        # Pex historically only supported compressed zips in packed layout, so we don't disturb the
-        # old cache structure for those zips and instead just use a subdir for un-compressed zips.
-        # This works for our two zip caches (we'll have no collisions with legacy compressed zips)
-        # since the bootstrap zip has a known name that is not "un-compressed" and "un-compressed"
-        # is not a valid wheel name either.
-        def zip_cache_dir(path):
-            # type: (str) -> str
-            if compress:
-                return path
-            return os.path.join(path, "un-compressed")
+        with TRACER.timed("copying over uncached sources", V=9):
+            for fileset in self._DIRECT_SOURCE_LABELS:
+                for f in self._chroot.filesets.get(fileset, ()):
+                    dest = os.path.join(dirname, f)
+                    safe_mkdir(os.path.dirname(dest))
+                    safe_copy(os.path.realpath(os.path.join(self._chroot.chroot, f)), dest)
 
         # Zip up the bootstrap which is constant for a given version of Pex.
-        bootstrap_hash = pex_info.bootstrap_hash
-        if bootstrap_hash is None:
-            raise AssertionError(
-                "Expected bootstrap_hash to be populated for {}.".format(self._pex_info)
-            )
-        cached_bootstrap_zip_dir = zip_cache_dir(
-            os.path.join(pex_info.pex_root, "bootstrap_zips", bootstrap_hash)
-        )
-        with atomic_directory(cached_bootstrap_zip_dir) as atomic_bootstrap_zip_dir:
-            if not atomic_bootstrap_zip_dir.is_finalized():
-                self._chroot.zip(
-                    os.path.join(atomic_bootstrap_zip_dir.work_dir, pex_info.bootstrap),
-                    deterministic_timestamp=deterministic_timestamp,
-                    exclude_file=is_pyc_temporary_file,
-                    strip_prefix=pex_info.bootstrap,
-                    labels=("bootstrap",),
-                    compress=compress,
-                )
-        safe_copy(
-            os.path.join(cached_bootstrap_zip_dir, pex_info.bootstrap),
-            os.path.join(dirname, pex_info.bootstrap),
+        cached_bootstrap_zip = self._get_or_insert_into_bootstrap_cache(
+            pex_info,
+            bootstrap_hash,
+            compress=compress,
         )
+        safe_copy(cached_bootstrap_zip, os.path.join(dirname, pex_info.bootstrap))
 
         # Zip up each installed wheel chroot, which is constant for a given version of a
         # wheel.
         if pex_info.distributions:
             internal_cache = os.path.join(dirname, pex_info.internal_cache)
             os.mkdir(internal_cache)
-            for location, fingerprint in pex_info.distributions.items():
-                cached_installed_wheel_zip_dir = zip_cache_dir(
-                    os.path.join(pex_info.pex_root, "packed_wheels", fingerprint)
-                )
-                with atomic_directory(cached_installed_wheel_zip_dir) as atomic_zip_dir:
-                    if not atomic_zip_dir.is_finalized():
-                        self._chroot.zip(
-                            os.path.join(atomic_zip_dir.work_dir, location),
-                            deterministic_timestamp=deterministic_timestamp,
-                            exclude_file=is_pyc_temporary_file,
-                            strip_prefix=os.path.join(pex_info.internal_cache, location),
-                            labels=(location,),
-                            compress=compress,
-                        )
-                safe_copy(
-                    os.path.join(cached_installed_wheel_zip_dir, location),
-                    os.path.join(internal_cache, location),
+            for dist_label, fingerprint in pex_info.distributions.items():
+                cached_packed_zip = self._get_or_insert_into_packed_wheel_cache(
+                    pex_info,
+                    dist_label,
+                    fingerprint,
+                    compress=compress,
                 )
+                safe_copy(cached_packed_zip, os.path.join(internal_cache, dist_label))
+
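+    # For orientation, the resulting --layout packed output looks roughly like this
+    # (wheel name illustrative):
+    #
+    #   app.pex/
+    #     PEX-INFO
+    #     __main__.py
+    #     <loose user sources>
+    #     .bootstrap                          <- one zip of the Pex bootstrap code
+    #     .deps/
+    #       cowsay-5.0-py2.py3-none-any.whl   <- one zip per installed wheel
+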
+    @staticmethod
+    def get_zip_cache_subdir(base_path, compress):
+        # type: (str, bool) -> str
+        """Get the cache location for the entry at `base_path`, depending on `compress`.
+
+        Pex historically only supported compressed zips in packed layout, so we don't disturb the
+        old cache structure for those zips and instead just use a subdir for un-compressed zips.
+        This works for our two zip caches (we'll have no collisions with legacy compressed zips)
+        since the bootstrap zip has a known name that is not "un-compressed" and "un-compressed"
+        is not a valid wheel name either.
+        """
+        if compress:
+            return base_path
+        return os.path.join(base_path, "un-compressed")
+
+    def _get_or_insert_into_bootstrap_cache(
+        self,
+        pex_info,  # type: PexInfo
+        bootstrap_hash,  # type: str
+        compress,  # type: bool
+    ):
+        # type: (...) -> str
+        cached_bootstrap_zip_dir = self.get_zip_cache_subdir(
+            os.path.join(pex_info.pex_root, "bootstrap_zips", bootstrap_hash),
+            compress=compress,
+        )
+        with atomic_directory(cached_bootstrap_zip_dir) as atomic_bootstrap_zip_dir:
+            if not atomic_bootstrap_zip_dir.is_finalized():
+                with TRACER.timed("generating bootstrap cache for hash {}".format(bootstrap_hash)):
+                    self._chroot.zip(
+                        os.path.join(atomic_bootstrap_zip_dir.work_dir, pex_info.bootstrap),
+                        deterministic_timestamp=True,
+                        exclude_file=is_pyc_temporary_file,
+                        strip_prefix=pex_info.bootstrap,
+                        labels=("bootstrap",),
+                        compress=compress,
+                    )
+        return os.path.join(cached_bootstrap_zip_dir, pex_info.bootstrap)
+
+    def _get_or_insert_into_packed_wheel_cache(
+        self,
+        pex_info,  # type: PexInfo
+        dist_label,  # type: str
+        fingerprint,  # type: str
+        compress,  # type: bool
+    ):
+        # type: (...) -> str
+        cached_installed_wheel_zip_dir = self.get_zip_cache_subdir(
+            os.path.join(pex_info.pex_root, "packed_wheels", fingerprint),
+            compress=compress,
+        )
+        with atomic_directory(cached_installed_wheel_zip_dir) as atomic_zip_dir:
+            if not atomic_zip_dir.is_finalized():
+                with TRACER.timed("generating packed wheel cache for {}".format(dist_label)):
+                    self._chroot.zip(
+                        os.path.join(atomic_zip_dir.work_dir, dist_label),
+                        # These entries will be served from a cache regardless of the value of
+                        # deterministic_timestamp in the caller, so explicitly mark them as
+                        # synthetic to keep their timestamps from varying with the cache contents.
+                        deterministic_timestamp=True,
+                        # When configured with a `copy_mode` of `CopyMode.SYMLINK`, we symlink
+                        # distributions as pointers to installed wheel directories in
+                        # ~/.pex/installed_wheels/... Since those installed wheels reside in
+                        # a shared cache, they can be in-use by other processes and so their code
+                        # may be in the process of being bytecode compiled as we attempt to zip up
+                        # our chroot. Bytecode compilation produces ephemeral temporary pyc files
+                        # that we should avoid copying since they are useless and inherently racy.
+                        exclude_file=is_pyc_temporary_file,
+                        strip_prefix=os.path.join(pex_info.internal_cache, dist_label),
+                        labels=(dist_label,),
+                        compress=compress,
+                    )
+        return os.path.join(cached_installed_wheel_zip_dir, dist_label)
 
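+    # NB: _build_zipapp below reuses the two zip caches populated above, merging the cached
+    # zips directly into the output zipapp via MergeableZipFile instead of re-zipping the
+    # chroot.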
     def _build_zipapp(
         self,
@@ -784,24 +847,87 @@
         compress=True,  # type: bool
     ):
         # type: (...) -> None
-        with safe_open(filename, "wb") as pexfile:
-            assert os.path.getsize(pexfile.name) == 0
-            pexfile.write(to_bytes("{}\n".format(self._shebang)))
-            if self._header:
-                pexfile.write(to_bytes(self._header))
+        pex_info, bootstrap_hash = self._setup_pex_info()
+
         with TRACER.timed("Zipping PEX file."):
-            self._chroot.zip(
-                filename,
-                mode="a",
-                deterministic_timestamp=deterministic_timestamp,
-                # When configured with a `copy_mode` of `CopyMode.SYMLINK`, we symlink distributions
-                # as pointers to installed wheel directories in ~/.pex/installed_wheels/... Since
-                # those installed wheels reside in a shared cache, they can be in-use by other
-                # processes and so their code may be in the process of being bytecode compiled as we
-                # attempt to zip up our chroot. Bytecode compilation produces ephemeral temporary
-                # pyc files that we should avoid copying since they are useless and inherently
-                # racy.
-                exclude_file=is_pyc_temporary_file,
-                compress=compress,
-            )
+            # We will be reusing the same file handle to wrap with ZipFile, which needs to be
+            # able to read the file contents when instantiated so it can check for existing zip
+            # data. Hence, we open with "w+b".
+            with safe_io_open(filename, "w+b") as pexfile:
+                # (1) Write the shebang line and any header script to a non-zip file handle.
+                pexfile.write(to_bytes("{}\n".format(self._shebang)))
+                if self._header:
+                    pexfile.write(to_bytes(self._header))
+
+                # (2) First, zip up everything that won't be retrieved from a cache.
+                # NB: This produces a slightly different output than if we had simply zipped up
+                # the entire chroot at once; the sources and resources will now be listed before
+                # the contents of the .bootstrap/ directory. This ordering isn't required, but it
+                # makes the output of `unzip -l` more useful for quickly scanning the contents of
+                # a PEX.
+                with TRACER.timed("zipping up uncached sources", V=9):
+                    # Reuse the file handle to zip into. This isn't necessary (we could close and
+                    # reopen it), but it avoids unnecessarily flushing to disk.
+                    self._chroot.zip(
+                        pexfile,
+                        mode="a",
+                        # These files are the only ones that might have a nonzero timestamp. All
+                        # entries copied from cache are assigned the zero timestamp, so their
+                        # checksums won't depend on the state of the pex cache.
+                        deterministic_timestamp=deterministic_timestamp,
+                        compress=compress,
+                        labels=self._DIRECT_SOURCE_LABELS,
+                    )
+
+                # (3) Engage our hacky solution to reuse the same caches as for --layout packed.
+                #
+                # This reads from a single compressed file per cached resource instead of
+                # traversing any directory trees. If compress=True, this will also significantly
+                # reduce the total number of bytes to read and avoid a lot of single-threaded
+                # computation, because the compressed entries are simply copied over as-is.
+                #
+                # Note that these cached zips are created in the --layout packed format, without
+                # the .bootstrap/ or .deps/ prefixes we need to form a proper zipapp. Our zip
+                # file merging solution edits each entry's .filename with the appropriate
+                # prefix, but we still need to generate intermediate directory entries
+                # before adding the prefixed files in order to unzip correctly.

+                # Reuse the file handle again. The ZipFile class will re-parse the central
+                # directory records at the end of the file and then reset the cursor to the
+                # beginning of the central directory records.
+                with MergeableZipFile(pexfile, mode="a") as zf:
+                    # (3.1) Add the single bootstrap dir.
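+                    # (Each merge below splices the already-compressed entry bytes into this
+                    # zip and only rewrites entry names with the new prefix; nothing is
+                    # re-deflated.)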
+                    with TRACER.timed("adding bootstrap dir", V=9):
+                        # Generate ".bootstrap/".
+                        zf.mkdir(pex_info.bootstrap)
+                        cached_bootstrap_zip = self._get_or_insert_into_bootstrap_cache(
+                            pex_info, bootstrap_hash, compress=compress
+                        )
+                        with buffered_zip_archive(cached_bootstrap_zip) as bootstrap_zf:
+                            zf.merge_archive(bootstrap_zf, name_prefix=pex_info.bootstrap)
+
+                    # (3.2) Add a subdirectory for each resolved dist.
+                    with TRACER.timed("adding dependencies", V=9):
+                        # Generate ".deps/".
+                        zf.mkdir(pex_info.internal_cache)
+
+                        # Sort the dict keys for a deterministic output. This also ensures that
+                        # the contents of the .deps/ subdirectory are lexicographically sorted,
+                        # which corresponds to the order they would have been added to the zip
+                        # without any caching.
+                        for dist_label in sorted(pex_info.distributions.keys()):
+                            fingerprint = pex_info.distributions[dist_label]
+
+                            dist_prefix = os.path.join(pex_info.internal_cache, dist_label)
+                            # Generate e.g. ".deps/Keras-2.4.3-py2.py3-none-any.whl/".
+                            zf.mkdir(dist_prefix)
+
+                            cached_packed_zip = self._get_or_insert_into_packed_wheel_cache(
+                                pex_info,
+                                dist_label,
+                                fingerprint,
+                                compress=compress,
+                            )
+                            with buffered_zip_archive(cached_packed_zip) as packed_zf:
+                                zf.merge_archive(packed_zf, name_prefix=dist_prefix)
 
         chmod_plus_x(filename)
diff --git a/pex/ziputils.py b/pex/ziputils.py
index 7e2cd0b1c..d43b99382 100644
--- a/pex/ziputils.py
+++ b/pex/ziputils.py
@@ -5,13 +5,18 @@
 
 import io
 import os
+import re
 import shutil
 import struct
+import subprocess
+import zipfile
 
+from pex.common import DETERMINISTIC_DATETIME, copy_file_range, safe_io_open
 from pex.typing import TYPE_CHECKING
 
 if TYPE_CHECKING:
-    from typing import BinaryIO, Optional
+    from io import BufferedIOBase
+    from typing import BinaryIO, Optional, Union
 
     import attr  # vendor:skip
 else:
@@ -242,3 +247,93 @@ def isolate_zip(self, out_fp):
         if self.has_header:
             in_fp.seek(self.header_size, os.SEEK_SET)
         shutil.copyfileobj(in_fp, out_fp)
+
+
+def buffered_zip_archive(filename):
+    # type: (str) -> zipfile.ZipFile
+    """Return a ``zipfile.ZipFile`` instance backed by a buffered file handle.
+
+    This is required by ``MergeableZipFile#merge_archive()`` to copy over exactly the right
+    number of bytes (via ``pex.common.copy_file_range()``)."""
+    buffered_handle = safe_io_open(filename, mode="rb")
+    return zipfile.ZipFile(buffered_handle, mode="r")  # type: ignore[arg-type]
+
+
+# TODO: merge this with PermPreservingZipFile in pex.common?
+class MergeableZipFile(zipfile.ZipFile):
+    """A zip file that can copy over the contents of other zips without decompression.
+
+    This is used to synthesize --layout zipapp PEX files from other zip files in the pex
+    cache."""
+
+    def __init__(self, *args, **kwargs):
+        kwargs.setdefault("allowZip64", True)
+        super(MergeableZipFile, self).__init__(*args, **kwargs)
+
+    def mkdir(self, name, mode=511):
+        # type: (str, int) -> None
+        """Polyfill for ZipFile#mkdir(), which was only added in Python 3.11.
+
+        Extracted from https://github.com/python/cpython/pull/32160."""
+        # End in a single slash.
+        arcname = re.sub(r"/*$", "/", name)
+        # Unlike PermPreservingZipFile#zip_entry_from_file(), this should never correspond to a
+        # file on disk, as this class is used to synthesize zip files from cache and the created
+        # directories are also virtual. Giving it a non-zero timestamp would be misleading.
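+        # (0o40000 is S_IFDIR; zip archives store Unix mode bits in the upper 16 bits of
+        # external_attr, and 0x10 is the MS-DOS directory attribute consulted on Windows.)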
+        zinfo = zipfile.ZipInfo(filename=arcname, date_time=DETERMINISTIC_DATETIME.timetuple()[:6])
+        zinfo.file_size = 0
+        zinfo.external_attr = ((0o40000 | mode) & 0xFFFF) << 16
+        zinfo.external_attr |= 0x10
+        zinfo.compress_type = zipfile.ZIP_STORED
+        self.writestr(zinfo, b"")
+
+    # This exists to placate mypy.
+    def __enter__(self):
+        # type: () -> MergeableZipFile
+        return self
+
+    def merge_archive(self, source_zf, name_prefix=None):
+        # type: (zipfile.ZipFile, Optional[str]) -> None
+        """Copy entries from `source_zf` into this archive without decompressing them.
+
+        If provided, `name_prefix` will be applied to the names of file and directory entries.
+
+        NB: This method is not aware of data descriptors, and will not copy over their contents!"""
+        assert self.fp is not None
+        assert isinstance(self.fp, io.BufferedIOBase)  # type: ignore[unreachable]
+        assert source_zf.fp is not None  # type: ignore[unreachable]
+        assert isinstance(source_zf.fp, io.BufferedIOBase)
+
+        for zinfo in source_zf.infolist():
+            # We will be mutating the ZipInfo and writing to the destination stream, so save this
+            # info upfront.
+            source_header_len = len(zinfo.FileHeader())
+            source_offset = zinfo.header_offset
+            start_of_destination_entry = self.fp.tell()
+
+            # (1) Modify the values which will affect the contents of the local file header for
+            # this entry.
+            if name_prefix:
+                zinfo.filename = os.path.join(name_prefix, zinfo.filename)
+
+            # (2) Modify the values which will affect this entry's central directory header.
+            # The new entry will begin at the very end of the existing file.
+            zinfo.header_offset = start_of_destination_entry
+
+            # (3) Generate the modified header, then copy over the header and contents.
+            # > (3.1) Write the re-serialized local file header (reflecting the mutations above)
+            #   into the output zip's underlying file handle.
+            self.fp.write(zinfo.FileHeader())
+
+            # > (3.2) Seek to the start of the source entry's file contents.
+            source_zf.fp.seek(source_offset + source_header_len, io.SEEK_SET)
+            # > (3.3) Copy over the file data verbatim from the source zip's underlying
+            #   file handle.
+            copy_file_range(source_zf.fp, self.fp, zinfo.compress_size)
+
+            # (4) Hack the synthesized ZipInfo onto the destination zip's infos.
+            self.filelist.append(zinfo)
+            self.NameToInfo[zinfo.filename] = zinfo
+
+        # Update the central directory start position so that zipfile.ZipFile writes the central
+        # directory records after all the data we just added.
+        self.start_dir = self.fp.tell()
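
Reviewer note: a minimal usage sketch (not part of the diff) of how the new zip-merging API
composes, mirroring the _build_zipapp flow above; the output name, cached zip path, and wheel
name are hypothetical.

    # Merge a cached zip into a zipapp without recompressing its entries.
    from pex.ziputils import MergeableZipFile, buffered_zip_archive

    dist_prefix = ".deps/cowsay-5.0-py2.py3-none-any.whl"
    with MergeableZipFile("out.pex", mode="a") as dest:
        # Create the virtual directory entry first so the archive unzips cleanly.
        dest.mkdir(dist_prefix)
        # merge_archive() requires a buffered handle under the source ZipFile, which
        # buffered_zip_archive() guarantees.
        with buffered_zip_archive("cached-wheel.zip") as src:
            # Splices the already-compressed bytes, rewriting entry names under the prefix.
            dest.merge_archive(src, name_prefix=dist_prefix)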