"""zstd streaming compression + 7z extraction. Ported from: JoshSteam CDN/Utils/compression.py No behavior changes; just type hints and path-objects. """ from __future__ import annotations import os import tarfile from pathlib import Path from typing import BinaryIO import py7zr import zstandard as zstd def generate_tar_zstd_stream(depot_path: Path | str, output_stream: BinaryIO) -> None: """Stream tarball of `depot_path` contents into an output binary stream.""" depot_path = Path(depot_path) with tarfile.open(fileobj=output_stream, mode="w|") as tar: for root, _, files in os.walk(depot_path): for file in files: file_path = Path(root) / file relative_path = file_path.relative_to(depot_path) tar.add(file_path, arcname=str(relative_path)) def compress_and_save_zstd( depot_path: Path | str, temp_zstd_path: Path | str, final_zstd_path: Path | str ) -> None: """Tar+zstd `depot_path` to a temp file, then atomically rename to final. No intermediate uncompressed .tar is created on disk. """ temp_zstd_path = Path(temp_zstd_path) final_zstd_path = Path(final_zstd_path) temp_zstd_path.parent.mkdir(parents=True, exist_ok=True) with open(temp_zstd_path, "wb") as out_file: cctx = zstd.ZstdCompressor(level=3) with cctx.stream_writer(out_file) as compressor: generate_tar_zstd_stream(depot_path, compressor) os.replace(temp_zstd_path, final_zstd_path) def extract_7z(archive_path: Path | str, destination_path: Path | str) -> None: """Extract a .7z archive into `destination_path`.""" archive_path = Path(archive_path) destination_path = Path(destination_path) if not archive_path.exists(): raise FileNotFoundError(f"The archive {archive_path} does not exist.") destination_path.mkdir(parents=True, exist_ok=True) with py7zr.SevenZipFile(archive_path, mode="r") as archive: archive.extractall(path=destination_path)