blob: 01d9c6bf2e52f9beba1d8359b9bb8fc150e0e69e [file]
#!/usr/bin/env python3
# Copyright 2026 The Brotli Authors. All rights reserved.
#
# Distributed under MIT license.
# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
"""Regression suite for brotli CLI CopyStat behavior.
Usage:
python3 tests/regression/t01/copystat_regression_test.py /path/to/brotli
"""
import argparse
import contextlib
import hashlib
import os
from pathlib import Path
import shutil
import stat
import subprocess
import tempfile
import unittest
PLAIN_BYTES = b"A" * 65537
TARGET_BYTES = b"TARGET\n"
@contextlib.contextmanager
def umask(mask):
previous = os.umask(mask)
try:
yield
finally:
os.umask(previous)
def file_mode(path):
return stat.S_IMODE(os.stat(path).st_mode)
def sha256(path):
h = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(65536), b""):
h.update(chunk)
return h.hexdigest()
class CopyStatRegressionTest(unittest.TestCase):
brotli = None
def setUp(self):
self.tmpdir = tempfile.TemporaryDirectory()
self.addCleanup(self.tmpdir.cleanup)
self.workdir = Path(self.tmpdir.name)
def write_plain(self, path):
path.write_bytes(PLAIN_BYTES)
def write_target(self, path):
path.write_bytes(TARGET_BYTES)
def run_brotli(self, *args, input_bytes=None, env=None, check=True):
proc = subprocess.run(
[str(self.brotli)] + [str(arg) for arg in args],
input=input_bytes,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
check=False)
if check and proc.returncode != 0:
self.fail(
"brotli exited with %d\nstdout:\n%s\nstderr:\n%s" % (
proc.returncode,
proc.stdout.decode("utf-8", "replace"),
proc.stderr.decode("utf-8", "replace")))
return proc
def compress(self, src, dst, no_copy_stat=False):
args = ["-f", "-k"]
if no_copy_stat:
args.append("-n")
args.extend([src, "-o", dst])
return self.run_brotli(*args)
def decompress(self, src, dst, no_copy_stat=False):
args = ["-d", "-f"]
if no_copy_stat:
args.append("-n")
args.extend([src, "-o", dst])
return self.run_brotli(*args)
def test_modes_propagate_on_compress_and_decompress(self):
modes = [0o600, 0o640, 0o644, 0o664, 0o660,
0o400, 0o444, 0o755, 0o700, 0o604]
with umask(0o022):
for mode in modes:
with self.subTest(mode="%03o" % mode):
case_dir = self.workdir / ("%03o" % mode)
case_dir.mkdir()
src = case_dir / "in.bin"
compressed = case_dir / "in.bin.br"
decoded = case_dir / "out.bin"
self.write_plain(src)
os.chmod(src, mode)
self.compress(src, compressed)
self.decompress(compressed, decoded)
self.assertEqual(file_mode(compressed), mode)
self.assertEqual(file_mode(decoded), mode)
self.assertEqual(decoded.read_bytes(), PLAIN_BYTES)
def test_special_mode_bits_are_masked(self):
with umask(0o022):
src = self.workdir / "in.bin"
compressed = self.workdir / "in.bin.br"
self.write_plain(src)
os.chmod(src, 0o3644)
actual_input_mode = file_mode(src)
if actual_input_mode == 0o644:
self.skipTest("filesystem stripped special mode bits from input")
self.compress(src, compressed)
compressed_mode = file_mode(compressed)
self.assertEqual(compressed_mode & 0o7000, 0)
self.assertEqual(compressed_mode & 0o777, actual_input_mode & 0o777)
def test_no_copy_stat_flag_suppresses_mode_copy(self):
with umask(0o022):
src = self.workdir / "in.bin"
without_copy = self.workdir / "without_copy.br"
with_copy = self.workdir / "with_copy.br"
self.write_plain(src)
os.chmod(src, 0o606)
self.compress(src, without_copy, no_copy_stat=True)
self.compress(src, with_copy)
self.assertNotEqual(file_mode(without_copy), 0o606)
self.assertEqual(file_mode(with_copy), 0o606)
def test_timestamp_is_copied(self):
with umask(0o022):
src = self.workdir / "in.bin"
compressed = self.workdir / "in.bin.br"
decoded = self.workdir / "out.bin"
self.write_plain(src)
expected_mtime = 1577934245
os.utime(src, (expected_mtime, expected_mtime))
self.compress(src, compressed)
self.decompress(compressed, decoded)
self.assertEqual(int(os.stat(compressed).st_mtime), expected_mtime)
self.assertEqual(int(os.stat(decoded).st_mtime), expected_mtime)
def test_stdin_input_has_no_path_to_copy_from(self):
payload = b"stdin -> compressed file\n"
compressed = self.run_brotli("-c", input_bytes=payload).stdout
decoded = self.workdir / "decoded.txt"
self.run_brotli("-d", "-o", decoded, input_bytes=compressed)
self.assertEqual(decoded.read_bytes(), payload)
self.assertEqual(file_mode(decoded) & ~0o666, 0)
def test_stdout_path_skips_copystat(self):
payload = b"hello brotli stdout pipeline\n"
compressed = self.run_brotli("-c", input_bytes=payload).stdout
decoded = self.run_brotli("-d", "-c", input_bytes=compressed).stdout
self.assertEqual(decoded, payload)
def test_content_roundtrip(self):
cases = {
"small-ascii": b"hello brotli\n",
"empty": b"",
"plain-65537": PLAIN_BYTES,
"aligned-65536": b"\0" * 65536,
"deterministic-binary": bytes((i * 73 + 41) % 256
for i in range(12288)),
}
with umask(0o022):
for label, data in cases.items():
with self.subTest(label=label):
case_dir = self.workdir / label
case_dir.mkdir()
src = case_dir / "in.bin"
compressed = case_dir / "in.bin.br"
decoded = case_dir / "out.bin"
src.write_bytes(data)
os.chmod(src, 0o644)
self.compress(src, compressed)
if data:
self.assertGreater(compressed.stat().st_size, 0)
self.decompress(compressed, decoded)
self.assertEqual(decoded.read_bytes(), data)
def test_fclose_swap_does_not_redirect_copystat(self):
compressed, out_path, target_path = self.prepare_attack_files()
pre_hash = sha256(target_path)
self.run_brotli_with_swap(compressed, out_path, target_path)
self.assertTrue(out_path.is_symlink())
self.assertEqual(file_mode(target_path), 0o600)
self.assertEqual(sha256(target_path), pre_hash)
def test_fclose_swap_with_no_copy_stat_leaves_target_unchanged(self):
compressed, out_path, target_path = self.prepare_attack_files()
pre_hash = sha256(target_path)
self.run_brotli_with_swap(compressed, out_path, target_path,
no_copy_stat=True)
self.assertTrue(out_path.is_symlink())
self.assertEqual(file_mode(target_path), 0o600)
self.assertEqual(sha256(target_path), pre_hash)
def prepare_attack_files(self):
src = self.workdir / "plain.bin"
compressed = self.workdir / "plain.bin.br"
out_path = self.workdir / "out"
target_path = self.workdir / "target.txt"
self.write_plain(src)
self.write_target(target_path)
os.chmod(src, 0o644)
os.chmod(target_path, 0o600)
self.compress(src, compressed)
os.chmod(compressed, 0o644)
return compressed, out_path, target_path
def run_brotli_with_swap(self, compressed, out_path, target_path,
no_copy_stat=False):
env = dict(os.environ)
env["BROTLI_SWAP_OUTPUT_ABS"] = str(out_path)
env["BROTLI_SWAP_TARGET_ABS"] = str(target_path)
env["LD_PRELOAD"] = self.ld_preload_value()
args = ["-d", "-f"]
if no_copy_stat:
args.append("-n")
args.extend(["-o", out_path, compressed])
proc = self.run_brotli(*args, env=env, check=False)
if proc.returncode != 0:
self.fail(
"brotli with fclose_swap exited with %d\nstdout:\n%s\nstderr:\n%s" % (
proc.returncode,
proc.stdout.decode("utf-8", "replace"),
proc.stderr.decode("utf-8", "replace")))
def ld_preload_value(self):
preloads = []
asan_runtime = self.asan_runtime()
if asan_runtime:
preloads.append(asan_runtime)
preloads.append(str(self.swap_helper()))
existing = os.environ.get("LD_PRELOAD")
if existing:
preloads.append(existing)
return ":".join(preloads)
def swap_helper(self):
if not Path("/proc/self/fd").is_dir():
self.skipTest("fclose_swap helper requires /proc/self/fd")
cc = shutil.which("cc")
if cc is None:
self.skipTest("cc is required to build fclose_swap.so")
helper_src = Path(__file__).with_name("fclose_swap.c")
helper_out = self.workdir / "libfclose_swap.so"
proc = subprocess.run(
[cc, "-shared", "-fPIC", str(helper_src), "-o", str(helper_out),
"-ldl"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=False)
if proc.returncode != 0:
self.fail(
"failed to build fclose_swap.so\nstdout:\n%s\nstderr:\n%s" % (
proc.stdout.decode("utf-8", "replace"),
proc.stderr.decode("utf-8", "replace")))
return helper_out
def asan_runtime(self):
try:
proc = subprocess.run(["ldd", str(self.brotli)],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=False)
except FileNotFoundError:
return None
if b"libasan" not in proc.stdout + proc.stderr:
return None
cc = shutil.which("cc")
if cc is None:
return None
proc = subprocess.run([cc, "-print-file-name=libasan.so"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=False)
candidate = proc.stdout.strip()
if candidate and Path(candidate).is_file():
return candidate
return None
def main():
parser = argparse.ArgumentParser()
parser.add_argument("brotli", help="path to the brotli CLI binary to test")
parser.add_argument("-v", "--verbose", action="store_true")
args = parser.parse_args()
brotli = Path(args.brotli).resolve()
if not brotli.is_file():
parser.error("%s is not a file" % brotli)
CopyStatRegressionTest.brotli = brotli
unittest.main(argv=[__file__], verbosity=2 if args.verbose else 1)
if __name__ == "__main__":
main()