Modernize tests. Avoid file IO. Drive-by: drop bro.py and bro_test.py; we do not support it well and likely no one uses it. PiperOrigin-RevId: 834206605
diff --git a/.github/workflows/build_test.yml b/.github/workflows/build_test.yml index 0e45a94..29f02fb 100644 --- a/.github/workflows/build_test.yml +++ b/.github/workflows/build_test.yml
@@ -137,7 +137,6 @@ build_system: python python_version: "3.10" # TODO: investigate why win-builds can't run tests - py_setuptools_cmd: build_ext os: windows-2022 - name: maven @@ -333,8 +332,9 @@ run: | python -VV python -c "import sys; sys.exit('Invalid python version') if '.'.join(map(str,sys.version_info[0:2])) != '${{ matrix.python_version }}' else True" - pip install setuptools==51.3.3 - python setup.py ${{ matrix.py_setuptools_cmd || 'test'}} + pip install setuptools==51.3.3 pytest + python setup.py build_ext --inplace + pytest ./python/tests build_test_dotnet: name: Build and test with .NET
diff --git a/MANIFEST.in b/MANIFEST.in index ff8d600..9843392 100644 --- a/MANIFEST.in +++ b/MANIFEST.in
@@ -9,7 +9,6 @@ include LICENSE include MANIFEST.in include python/_brotli.cc -include python/bro.py include python/brotli.py include python/README.md include python/tests/*
diff --git a/python/README.md b/python/README.md index 5ea135b..750e0ac 100644 --- a/python/README.md +++ b/python/README.md
@@ -1,7 +1,7 @@ This directory contains the code for the Python `brotli` module, -`bro.py` tool, and roundtrip tests. +and roundtrip tests. -Only Python 2.7+ is supported. +Only Python 3.10+ is supported. We provide a `Makefile` to simplify common development commands. @@ -17,13 +17,17 @@ $ make install -If you already have native Brotli installed on your system and want to use this one instead of the vendored sources, you -should set the `USE_SYSTEM_BROTLI=1` environment variable when building the wheel, like this: +If you already have native Brotli installed on your system and want to use +this one instead of the vendored sources, you should set +the `USE_SYSTEM_BROTLI=1` environment variable when building the wheel, +like this: $ USE_SYSTEM_BROTLI=1 pip install brotli --no-binary brotli -Brotli is found via the `pkg-config` utility. Moreover, you must build all 3 `brotlicommon`, `brotlienc`, and `brotlidec` -components. If you're installing brotli from the package manager, you need the development package, like this on Fedora: +Brotli is found via the `pkg-config` utility. Moreover, you must build +all 3 `brotlicommon`, `brotlienc`, and `brotlidec` components. If you're +installing brotli from the package manager, you need the development package, +like this on Fedora: $ dnf install brotli brotli-devel @@ -45,8 +49,8 @@ ### Code Style -Brotli's code follows the [Google Python Style Guide][]. To -automatically format your code, first install [YAPF][]: +Brotli code follows the [Google Python Style Guide][]. +To automatically format your code, first install [YAPF][]: $ pip install yapf @@ -56,7 +60,6 @@ See the [YAPF usage][] documentation for more information. - [PyPI]: https://pypi.org/project/Brotli/ [development mode]: https://setuptools.readthedocs.io/en/latest/setuptools.html#development-mode [Google Python Style Guide]: https://google.github.io/styleguide/pyguide.html
diff --git a/python/bro.py b/python/bro.py deleted file mode 100755 index a624d3f..0000000 --- a/python/bro.py +++ /dev/null
@@ -1,194 +0,0 @@ -#! /usr/bin/env python -"""Compression/decompression utility using the Brotli algorithm.""" - -# Note: Python2 has been deprecated long ago, but some projects out in -# the wide world may still use it nevertheless. This should not -# deprive them from being able to run Brotli. -from __future__ import print_function - -import argparse -import os -import platform -import sys - -import brotli - - -# default values of encoder parameters -_DEFAULT_PARAMS = { - 'mode': brotli.MODE_GENERIC, - 'quality': 11, - 'lgwin': 22, - 'lgblock': 0, -} - - -def get_binary_stdio(stream): - """Return the specified stdin/stdout/stderr stream. - - If the stdio stream requested (i.e. sys.(stdin|stdout|stderr)) - has been replaced with a stream object that does not have a `.buffer` - attribute, this will return the original stdio stream's buffer, i.e. - `sys.__(stdin|stdout|stderr)__.buffer`. - - Args: - stream: One of 'stdin', 'stdout', 'stderr'. - - Returns: - The stream, as a 'raw' buffer object (i.e. io.BufferedIOBase subclass - instance such as io.Bufferedreader/io.BufferedWriter), suitable for - reading/writing binary data from/to it. - """ - if stream == 'stdin': stdio = sys.stdin - elif stream == 'stdout': stdio = sys.stdout - elif stream == 'stderr': stdio = sys.stderr - else: - raise ValueError('invalid stream name: %s' % (stream,)) - if sys.version_info[0] < 3: - if sys.platform == 'win32': - # set I/O stream binary flag on python2.x (Windows) - runtime = platform.python_implementation() - if runtime == 'PyPy': - # the msvcrt trick doesn't work in pypy, so use fdopen(). - mode = 'rb' if stream == 'stdin' else 'wb' - stdio = os.fdopen(stdio.fileno(), mode, 0) - else: - # this works with CPython -- untested on other implementations - import msvcrt - msvcrt.setmode(stdio.fileno(), os.O_BINARY) - return stdio - else: - try: - return stdio.buffer - except AttributeError: - # The Python reference explains - # (-> https://docs.python.org/3/library/sys.html#sys.stdin) - # that the `.buffer` attribute might not exist, since - # the standard streams might have been replaced by something else - # (such as an `io.StringIO()` - perhaps via - # `contextlib.redirect_stdout()`). - # We fall back to the original stdio in these cases. - if stream == 'stdin': return sys.__stdin__.buffer - if stream == 'stdout': return sys.__stdout__.buffer - if stream == 'stderr': return sys.__stderr__.buffer - assert False, 'Impossible Situation.' - - -def main(args=None): - - parser = argparse.ArgumentParser( - prog=os.path.basename(__file__), description=__doc__) - parser.add_argument( - '--version', action='version', version=brotli.version) - parser.add_argument( - '-i', - '--input', - metavar='FILE', - type=str, - dest='infile', - help='Input file', - default=None) - parser.add_argument( - '-o', - '--output', - metavar='FILE', - type=str, - dest='outfile', - help='Output file', - default=None) - parser.add_argument( - '-f', - '--force', - action='store_true', - help='Overwrite existing output file', - default=False) - parser.add_argument( - '-d', - '--decompress', - action='store_true', - help='Decompress input file', - default=False) - params = parser.add_argument_group('optional encoder parameters') - params.add_argument( - '-m', - '--mode', - metavar='MODE', - type=int, - choices=[0, 1, 2], - help='The compression mode can be 0 for generic input, ' - '1 for UTF-8 encoded text, or 2 for WOFF 2.0 font data. ' - 'Defaults to 0.') - params.add_argument( - '-q', - '--quality', - metavar='QUALITY', - type=int, - choices=list(range(0, 12)), - help='Controls the compression-speed vs compression-density ' - 'tradeoff. The higher the quality, the slower the ' - 'compression. Range is 0 to 11. Defaults to 11.') - params.add_argument( - '--lgwin', - metavar='LGWIN', - type=int, - choices=list(range(10, 25)), - help='Base 2 logarithm of the sliding window size. Range is ' - '10 to 24. Defaults to 22.') - params.add_argument( - '--lgblock', - metavar='LGBLOCK', - type=int, - choices=[0] + list(range(16, 25)), - help='Base 2 logarithm of the maximum input block size. ' - 'Range is 16 to 24. If set to 0, the value will be set based ' - 'on the quality. Defaults to 0.') - # set default values using global _DEFAULT_PARAMS dictionary - parser.set_defaults(**_DEFAULT_PARAMS) - - options = parser.parse_args(args=args) - - if options.infile: - try: - with open(options.infile, 'rb') as infile: - data = infile.read() - except OSError: - parser.error('Could not read --infile: %s' % (infile,)) - else: - if sys.stdin.isatty(): - # interactive console, just quit - parser.error('No input (called from interactive terminal).') - infile = get_binary_stdio('stdin') - data = infile.read() - - if options.outfile: - # Caution! If `options.outfile` is a broken symlink, will try to - # redirect the write according to symlink. - if os.path.exists(options.outfile) and not options.force: - parser.error(('Target --outfile=%s already exists, ' - 'but --force was not requested.') % (options.outfile,)) - outfile = open(options.outfile, 'wb') - did_open_outfile = True - else: - outfile = get_binary_stdio('stdout') - did_open_outfile = False - try: - try: - if options.decompress: - data = brotli.decompress(data) - else: - data = brotli.compress( - data, - mode=options.mode, - quality=options.quality, - lgwin=options.lgwin, - lgblock=options.lgblock) - outfile.write(data) - finally: - if did_open_outfile: outfile.close() - except brotli.error as e: - parser.exit(1, - 'bro: error: %s: %s' % (e, options.infile or '{stdin}')) - - -if __name__ == '__main__': - main()
diff --git a/python/tests/_test_utils.py b/python/tests/_test_utils.py index c451968..d94ebd0 100644 --- a/python/tests/_test_utils.py +++ b/python/tests/_test_utils.py
@@ -1,27 +1,21 @@ """Common utilities for Brotli tests.""" from __future__ import print_function -import filecmp import glob -import itertools import os import pathlib import sys import sysconfig -import tempfile -import unittest project_dir = str(pathlib.PurePath(__file__).parent.parent.parent) +runtime_dir = os.getenv('TEST_SRCDIR') test_dir = os.getenv('BROTLI_TESTS_PATH') -BRO_ARGS = [os.getenv('BROTLI_WRAPPER')] # Fallbacks -if test_dir is None: +if test_dir and runtime_dir: + test_dir = os.path.join(runtime_dir, test_dir) +elif test_dir is None: test_dir = os.path.join(project_dir, 'tests') -if BRO_ARGS[0] is None: - python_exe = sys.executable or 'python' - bro_path = os.path.join(project_dir, 'python', 'bro.py') - BRO_ARGS = [python_exe, bro_path] # Get the platform/version-specific build folder. # By default, the distutils build base is in the same location as setup.py. @@ -41,113 +35,49 @@ TESTDATA_DIR = os.path.join(test_dir, 'testdata') -TESTDATA_FILES = [ - 'empty', # Empty file - '10x10y', # Small text - 'alice29.txt', # Large text - 'random_org_10k.bin', # Small data - 'mapsdatazrh', # Large data - 'ukkonooa', # Poem - 'cp1251-utf16le', # Codepage 1251 table saved in UTF16-LE encoding - 'cp852-utf8', # Codepage 852 table saved in UTF8 encoding - # TODO(eustas): add test on already compressed content -] -# Some files might be missing in a lightweight sources pack. -TESTDATA_PATH_CANDIDATES = [ - os.path.join(TESTDATA_DIR, f) for f in TESTDATA_FILES -] - -TESTDATA_PATHS = [ - path for path in TESTDATA_PATH_CANDIDATES if os.path.isfile(path) -] - -TESTDATA_PATHS_FOR_DECOMPRESSION = glob.glob( - os.path.join(TESTDATA_DIR, '*.compressed') -) - -TEMP_DIR = tempfile.mkdtemp() +def gather_text_inputs(): + """Discover inputs for decompression tests.""" + all_inputs = [ + 'empty', # Empty file + '10x10y', # Small text + 'alice29.txt', # Large text + 'random_org_10k.bin', # Small data + 'mapsdatazrh', # Large data + 'ukkonooa', # Poem + 'cp1251-utf16le', # Codepage 1251 table saved in UTF16-LE encoding + 'cp852-utf8', # Codepage 852 table saved in UTF8 encoding + # TODO(eustas): add test on already compressed content + ] + # Filter out non-existing files; e.g. in lightweight sources pack. + return [ + f for f in all_inputs if os.path.isfile(os.path.join(TESTDATA_DIR, f)) + ] -def get_temp_compressed_name(filename): - return os.path.join(TEMP_DIR, os.path.basename(filename + '.bro')) +def gather_compressed_inputs(): + """Discover inputs for compression tests.""" + candidates = glob.glob(os.path.join(TESTDATA_DIR, '*.compressed')) + pairs = [(f, f.split('.compressed')[0]) for f in candidates] + existing = [ + pair + for pair in pairs + if os.path.isfile(pair[0]) and os.path.isfile(pair[1]) + ] + return [ + (os.path.basename(pair[0]), (os.path.basename(pair[1]))) + for pair in existing + ] -def get_temp_uncompressed_name(filename): - return os.path.join(TEMP_DIR, os.path.basename(filename + '.unbro')) +def take_input(input_name): + with open(os.path.join(TESTDATA_DIR, input_name), 'rb') as f: + return f.read() -def bind_method_args(method, *args, **kwargs): - return lambda self: method(self, *args, **kwargs) +def has_input(input_name): + return os.path.isfile(os.path.join(TESTDATA_DIR, input_name)) -# TODO(eustas): migrate to absl.testing.parameterized. -def generate_test_methods( - test_case_class, for_decompression=False, variants=None -): - """Adds test methods for each test data file and each variant. - - This makes identifying problems with specific compression scenarios easier. - - Args: - test_case_class: The test class to add methods to. - for_decompression: If True, uses compressed test data files. - variants: A dictionary where keys are option names and values are lists of - possible values for that option. Each combination of variants will - generate a separate test method. - """ - if for_decompression: - paths = [ - path for path in TESTDATA_PATHS_FOR_DECOMPRESSION - if os.path.exists(path.replace('.compressed', '')) - ] - else: - paths = TESTDATA_PATHS - opts = [] - if variants: - opts_list = [] - for k, v in variants.items(): - opts_list.append([r for r in itertools.product([k], v)]) - for o in itertools.product(*opts_list): - opts_name = '_'.join([str(i) for i in itertools.chain(*o)]) - opts_dict = dict(o) - opts.append([opts_name, opts_dict]) - else: - opts.append(['', {}]) - for method in [m for m in dir(test_case_class) if m.startswith('_test')]: - for testdata in paths: - for opts_name, opts_dict in opts: - f = os.path.splitext(os.path.basename(testdata))[0] - name = 'test_{method}_{options}_{file}'.format( - method=method, options=opts_name, file=f - ) - func = bind_method_args( - getattr(test_case_class, method), testdata, **opts_dict - ) - setattr(test_case_class, name, func) - - -class TestCase(unittest.TestCase): - """Base class for Brotli test cases. - - Provides common setup and teardown logic, including cleaning up temporary - files and a utility for comparing file contents. - """ - - def tearDown(self): - for f in TESTDATA_PATHS: - try: - os.unlink(get_temp_compressed_name(f)) - except OSError: - pass - try: - os.unlink(get_temp_uncompressed_name(f)) - except OSError: - pass - # super().tearDown() # Requires Py3+ - - def assert_files_match(self, first, second): - self.assertTrue( - filecmp.cmp(first, second, shallow=False), - 'File {} differs from {}'.format(first, second), - ) +def chunk_input(data, chunk_size): + return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
diff --git a/python/tests/bro_test.py b/python/tests/bro_test.py deleted file mode 100644 index fa056ed..0000000 --- a/python/tests/bro_test.py +++ /dev/null
@@ -1,104 +0,0 @@ -# Copyright 2016 The Brotli Authors. All rights reserved. -# -# Distributed under MIT license. -# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT - -import subprocess -import unittest - -from . import _test_utils - -BRO_ARGS = _test_utils.BRO_ARGS -TEST_ENV = _test_utils.TEST_ENV - - -def _get_original_name(test_data): - return test_data.split('.compressed')[0] - - -class TestBroDecompress(_test_utils.TestCase): - - def _check_decompression(self, test_data): - # Verify decompression matches the original. - temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) - original = _get_original_name(test_data) - self.assert_files_match(temp_uncompressed, original) - - def _decompress_file(self, test_data): - temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) - args = BRO_ARGS + ['-f', '-d', '-i', test_data, '-o', temp_uncompressed] - subprocess.check_call(args, env=TEST_ENV) - - def _decompress_pipe(self, test_data): - temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) - args = BRO_ARGS + ['-d'] - with open(temp_uncompressed, 'wb') as out_file: - with open(test_data, 'rb') as in_file: - subprocess.check_call( - args, stdin=in_file, stdout=out_file, env=TEST_ENV - ) - - def _test_decompress_file(self, test_data): - self._decompress_file(test_data) - self._check_decompression(test_data) - - def _test_decompress_pipe(self, test_data): - self._decompress_pipe(test_data) - self._check_decompression(test_data) - - -_test_utils.generate_test_methods(TestBroDecompress, for_decompression=True) - - -class TestBroCompress(_test_utils.TestCase): - - VARIANTS = {'quality': (1, 6, 9, 11), 'lgwin': (10, 15, 20, 24)} - - def _check_decompression(self, test_data): - # Write decompression to temp file and verify it matches the original. - temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) - temp_compressed = _test_utils.get_temp_compressed_name(test_data) - original = test_data - args = BRO_ARGS + ['-f', '-d'] - args.extend(['-i', temp_compressed, '-o', temp_uncompressed]) - subprocess.check_call(args, env=TEST_ENV) - self.assert_files_match(temp_uncompressed, original) - - def _compress_file(self, test_data, **kwargs): - temp_compressed = _test_utils.get_temp_compressed_name(test_data) - args = BRO_ARGS + ['-f'] - if 'quality' in kwargs: - args.extend(['-q', str(kwargs['quality'])]) - if 'lgwin' in kwargs: - args.extend(['--lgwin', str(kwargs['lgwin'])]) - args.extend(['-i', test_data, '-o', temp_compressed]) - subprocess.check_call(args, env=TEST_ENV) - - def _compress_pipe(self, test_data, **kwargs): - temp_compressed = _test_utils.get_temp_compressed_name(test_data) - args = BRO_ARGS - if 'quality' in kwargs: - args.extend(['-q', str(kwargs['quality'])]) - if 'lgwin' in kwargs: - args.extend(['--lgwin', str(kwargs['lgwin'])]) - with open(temp_compressed, 'wb') as out_file: - with open(test_data, 'rb') as in_file: - subprocess.check_call( - args, stdin=in_file, stdout=out_file, env=TEST_ENV - ) - - def _test_compress_file(self, test_data, **kwargs): - self._compress_file(test_data, **kwargs) - self._check_decompression(test_data) - - def _test_compress_pipe(self, test_data, **kwargs): - self._compress_pipe(test_data, **kwargs) - self._check_decompression(test_data) - - -_test_utils.generate_test_methods( - TestBroCompress, variants=TestBroCompress.VARIANTS -) - -if __name__ == '__main__': - unittest.main()
diff --git a/python/tests/compress_test.py b/python/tests/compress_test.py index 7ad9d46..b3297c2 100644 --- a/python/tests/compress_test.py +++ b/python/tests/compress_test.py
@@ -3,39 +3,17 @@ # Distributed under MIT license. # See file LICENSE for detail or copy at https://opensource.org/licenses/MIT -import unittest - import brotli +import pytest from . import _test_utils -class TestCompress(_test_utils.TestCase): - - VARIANTS = {'quality': (1, 6, 9, 11), 'lgwin': (10, 15, 20, 24)} - - def _check_decompression(self, test_data): - # Write decompression to temp file and verify it matches the original. - temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) - temp_compressed = _test_utils.get_temp_compressed_name(test_data) - original = test_data - with open(temp_uncompressed, 'wb') as out_file: - with open(temp_compressed, 'rb') as in_file: - out_file.write(brotli.decompress(in_file.read())) - self.assert_files_match(temp_uncompressed, original) - - def _compress(self, test_data, **kwargs): - temp_compressed = _test_utils.get_temp_compressed_name(test_data) - with open(temp_compressed, 'wb') as out_file: - with open(test_data, 'rb') as in_file: - out_file.write(brotli.compress(in_file.read(), **kwargs)) - - def _test_compress(self, test_data, **kwargs): - self._compress(test_data, **kwargs) - self._check_decompression(test_data) - - -_test_utils.generate_test_methods(TestCompress, variants=TestCompress.VARIANTS) - -if __name__ == '__main__': - unittest.main() +@pytest.mark.parametrize("quality", [1, 6, 9, 11]) +@pytest.mark.parametrize("lgwin", [10, 15, 20, 24]) +@pytest.mark.parametrize("text_name", _test_utils.gather_text_inputs()) +def test_compress(quality, lgwin, text_name): + original = _test_utils.take_input(text_name) + compressed = brotli.compress(original, quality=quality, lgwin=lgwin) + decompressed = brotli.decompress(compressed) + assert original == decompressed
diff --git a/python/tests/compressor_test.py b/python/tests/compressor_test.py index 2c08f8f..dd8fd62 100644 --- a/python/tests/compressor_test.py +++ b/python/tests/compressor_test.py
@@ -3,98 +3,49 @@ # Distributed under MIT license. # See file LICENSE for detail or copy at https://opensource.org/licenses/MIT -import functools -import unittest - import brotli +import pytest from . import _test_utils -# Do not inherit from TestCase here to ensure that test methods -# are not run automatically and instead are run as part of a specific -# configuration below. -class _TestCompressor(object): - - CHUNK_SIZE = 2048 - - def tearDown(self): - self.compressor = None - super().tearDown() - - def _check_decompression(self, test_data): - # Write decompression to temp file and verify it matches the original. - temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) - temp_compressed = _test_utils.get_temp_compressed_name(test_data) - original = test_data - with open(temp_uncompressed, 'wb') as out_file: - with open(temp_compressed, 'rb') as in_file: - out_file.write(brotli.decompress(in_file.read())) - self.assert_files_match(temp_uncompressed, original) - - def _test_single_process(self, test_data): - # Write single-shot compression to temp file. - temp_compressed = _test_utils.get_temp_compressed_name(test_data) - with open(temp_compressed, 'wb') as out_file: - with open(test_data, 'rb') as in_file: - out_file.write(self.compressor.process(in_file.read())) - out_file.write(self.compressor.finish()) - self._check_decompression(test_data) - - def _test_multiple_process(self, test_data): - # Write chunked compression to temp file. - temp_compressed = _test_utils.get_temp_compressed_name(test_data) - with open(temp_compressed, 'wb') as out_file: - with open(test_data, 'rb') as in_file: - read_chunk = functools.partial(in_file.read, self.CHUNK_SIZE) - for data in iter(read_chunk, b''): - out_file.write(self.compressor.process(data)) - out_file.write(self.compressor.finish()) - self._check_decompression(test_data) - - def _test_multiple_process_and_flush(self, test_data): - # Write chunked and flushed compression to temp file. - temp_compressed = _test_utils.get_temp_compressed_name(test_data) - with open(temp_compressed, 'wb') as out_file: - with open(test_data, 'rb') as in_file: - read_chunk = functools.partial(in_file.read, self.CHUNK_SIZE) - for data in iter(read_chunk, b''): - out_file.write(self.compressor.process(data)) - out_file.write(self.compressor.flush()) - out_file.write(self.compressor.finish()) - self._check_decompression(test_data) +@pytest.mark.parametrize("quality", [1, 6, 9, 11]) +@pytest.mark.parametrize("text_name", _test_utils.gather_text_inputs()) +def test_single_process(quality, text_name): + original = _test_utils.take_input(text_name) + compressor = brotli.Compressor(quality=quality) + compressed = compressor.process(original) + compressed += compressor.finish() + decompressed = brotli.decompress(compressed) + assert original == decompressed -_test_utils.generate_test_methods(_TestCompressor) +@pytest.mark.parametrize("quality", [1, 6, 9, 11]) +@pytest.mark.parametrize("text_name", _test_utils.gather_text_inputs()) +def test_multiple_process(quality, text_name): + original = _test_utils.take_input(text_name) + chunk_size = 2048 + chunks = _test_utils.chunk_input(original, chunk_size) + compressor = brotli.Compressor(quality=quality) + compressed = b'' + for chunk in chunks: + compressed += compressor.process(chunk) + compressed += compressor.finish() + decompressed = brotli.decompress(compressed) + assert original == decompressed -class TestCompressorQuality1(_TestCompressor, _test_utils.TestCase): - - def setUp(self): - super().setUp() - self.compressor = brotli.Compressor(quality=1) - - -class TestCompressorQuality6(_TestCompressor, _test_utils.TestCase): - - def setUp(self): - super().setUp() - self.compressor = brotli.Compressor(quality=6) - - -class TestCompressorQuality9(_TestCompressor, _test_utils.TestCase): - - def setUp(self): - super().setUp() - self.compressor = brotli.Compressor(quality=9) - - -class TestCompressorQuality11(_TestCompressor, _test_utils.TestCase): - - def setUp(self): - super().setUp() - self.compressor = brotli.Compressor(quality=11) - - -if __name__ == '__main__': - unittest.main() +@pytest.mark.parametrize("quality", [1, 6, 9, 11]) +@pytest.mark.parametrize("text_name", _test_utils.gather_text_inputs()) +def test_multiple_process_and_flush(quality, text_name): + original = _test_utils.take_input(text_name) + chunk_size = 2048 + chunks = _test_utils.chunk_input(original, chunk_size) + compressor = brotli.Compressor(quality=quality) + compressed = b'' + for chunk in chunks: + compressed += compressor.process(chunk) + compressed += compressor.flush() + compressed += compressor.finish() + decompressed = brotli.decompress(compressed) + assert original == decompressed
diff --git a/python/tests/decompress_test.py b/python/tests/decompress_test.py index 9289aa7..e59c96d 100644 --- a/python/tests/decompress_test.py +++ b/python/tests/decompress_test.py
@@ -3,41 +3,22 @@ # Distributed under MIT license. # See file LICENSE for detail or copy at https://opensource.org/licenses/MIT -import unittest - import brotli +import pytest from . import _test_utils -def _get_original_name(test_data): - return test_data.split('.compressed')[0] +@pytest.mark.parametrize( + 'compressed_name, original_name', _test_utils.gather_compressed_inputs() +) +def test_decompress(compressed_name, original_name): + compressed = _test_utils.take_input(compressed_name) + original = _test_utils.take_input(original_name) + decompressed = brotli.decompress(compressed) + assert decompressed == original -class TestDecompress(_test_utils.TestCase): - - def _check_decompression(self, test_data): - # Verify decompression matches the original. - temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) - original = _get_original_name(test_data) - self.assert_files_match(temp_uncompressed, original) - - def _decompress(self, test_data): - temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) - with open(temp_uncompressed, 'wb') as out_file: - with open(test_data, 'rb') as in_file: - out_file.write(brotli.decompress(in_file.read())) - - def _test_decompress(self, test_data): - self._decompress(test_data) - self._check_decompression(test_data) - - def test_garbage_appended(self): - with self.assertRaises(brotli.error): - brotli.decompress(brotli.compress(b'a') + b'a') - - -_test_utils.generate_test_methods(TestDecompress, for_decompression=True) - -if __name__ == '__main__': - unittest.main() +def test_garbage_appended(): + with pytest.raises(brotli.error): + brotli.decompress(brotli.compress(b'a') + b'a')
diff --git a/python/tests/decompressor_test.py b/python/tests/decompressor_test.py index 1aa9ecb..7d00f14 100644 --- a/python/tests/decompressor_test.py +++ b/python/tests/decompressor_test.py
@@ -3,116 +3,89 @@ # Distributed under MIT license. # See file LICENSE for detail or copy at https://opensource.org/licenses/MIT -import functools -import os -import unittest - import brotli +import pytest from . import _test_utils - -def _get_original_name(test_data): - return test_data.split('.compressed')[0] +MIN_OUTPUT_BUFFER_SIZE = 32768 # Actually, several bytes less. -class TestDecompressor(_test_utils.TestCase): +@pytest.mark.parametrize( + 'compressed_name, original_name', _test_utils.gather_compressed_inputs() +) +def test_decompress(compressed_name, original_name): + decompressor = brotli.Decompressor() + compressed = _test_utils.take_input(compressed_name) + original = _test_utils.take_input(original_name) + chunk_size = 1 + chunks = _test_utils.chunk_input(compressed, chunk_size) + decompressed = b'' + for chunk in chunks: + decompressed += decompressor.process(chunk) + assert decompressor.is_finished() + assert original == decompressed - CHUNK_SIZE = 1 - MIN_OUTPUT_BUFFER_SIZE = 32768 # Actually, several bytes less. - def setUp(self): - super().setUp() - self.decompressor = brotli.Decompressor() - - def tearDown(self): - self.decompressor = None - super().tearDown() - - def _check_decompression(self, test_data): - # Verify decompression matches the original. - temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) - original = _get_original_name(test_data) - self.assert_files_match(temp_uncompressed, original) - - def _decompress(self, test_data): - temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) - with open(temp_uncompressed, 'wb') as out_file: - with open(test_data, 'rb') as in_file: - read_chunk = functools.partial(in_file.read, self.CHUNK_SIZE) - for data in iter(read_chunk, b''): - out_file.write(self.decompressor.process(data)) - self.assertTrue(self.decompressor.is_finished()) - - def _decompress_with_limit(self, test_data): - output_buffer_limit = 10922 - temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) - with open(temp_uncompressed, 'wb') as out_file: - with open(test_data, 'rb') as in_file: - chunk_iter = iter(functools.partial(in_file.read, 10 * 1024), b'') - while not self.decompressor.is_finished(): - data = b'' - if self.decompressor.can_accept_more_data(): - data = next(chunk_iter, b'') - decompressed_data = self.decompressor.process( - data, output_buffer_limit=output_buffer_limit - ) - self.assertLessEqual( - len(decompressed_data), self.MIN_OUTPUT_BUFFER_SIZE - ) - out_file.write(decompressed_data) - self.assertIsNone(next(chunk_iter, None)) - - def _test_decompress(self, test_data): - self._decompress(test_data) - self._check_decompression(test_data) - - def _test_decompress_with_limit(self, test_data): - self._decompress_with_limit(test_data) - self._check_decompression(test_data) - - def test_too_much_input(self): - with open( - os.path.join(_test_utils.TESTDATA_DIR, 'zerosukkanooa.compressed'), 'rb' - ) as in_file: - compressed = in_file.read() - self.decompressor.process(compressed[:-1], output_buffer_limit=10240) - # the following assertion checks whether the test setup is correct - self.assertFalse(self.decompressor.can_accept_more_data()) - with self.assertRaises(brotli.error): - self.decompressor.process(compressed[-1:]) - - def test_changing_limit(self): - test_data = os.path.join( - _test_utils.TESTDATA_DIR, 'zerosukkanooa.compressed' +@pytest.mark.parametrize( + 'compressed_name, original_name', _test_utils.gather_compressed_inputs() +) +def test_decompress_with_limit(compressed_name, original_name): + decompressor = brotli.Decompressor() + compressed = _test_utils.take_input(compressed_name) + original = _test_utils.take_input(original_name) + chunk_size = 10 * 1024 + output_buffer_limit = 10922 + chunks = _test_utils.chunk_input(compressed, chunk_size) + decompressed = b'' + while not decompressor.is_finished(): + data = b'' + if decompressor.can_accept_more_data() and chunks: + data = chunks.pop(0) + decompressed_chunk = decompressor.process( + data, output_buffer_limit=output_buffer_limit ) - check_output = os.path.exists(test_data.replace('.compressed', '')) - temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) - with open(temp_uncompressed, 'wb') as out_file: - with open(test_data, 'rb') as in_file: - compressed = in_file.read() - uncompressed = self.decompressor.process( - compressed[:-1], output_buffer_limit=10240 - ) - self.assertLessEqual(len(uncompressed), self.MIN_OUTPUT_BUFFER_SIZE) - out_file.write(uncompressed) - while not self.decompressor.can_accept_more_data(): - out_file.write(self.decompressor.process(b'')) - out_file.write(self.decompressor.process(compressed[-1:])) - if check_output: - self._check_decompression(test_data) - - def test_garbage_appended(self): - with self.assertRaises(brotli.error): - self.decompressor.process(brotli.compress(b'a') + b'a') - - def test_already_finished(self): - self.decompressor.process(brotli.compress(b'a')) - with self.assertRaises(brotli.error): - self.decompressor.process(b'a') + assert len(decompressed_chunk) <= MIN_OUTPUT_BUFFER_SIZE + decompressed += decompressed_chunk + assert not chunks + assert original == decompressed -_test_utils.generate_test_methods(TestDecompressor, for_decompression=True) +def test_too_much_input(): + decompressor = brotli.Decompressor() + compressed = _test_utils.take_input('zerosukkanooa.compressed') + decompressor.process(compressed[:-1], output_buffer_limit=10240) + # The following assertion checks whether the test setup is correct. + assert not decompressor.can_accept_more_data() + with pytest.raises(brotli.error): + decompressor.process(compressed[-1:]) -if __name__ == '__main__': - unittest.main() + +def test_changing_limit(): + decompressor = brotli.Decompressor() + input_name = 'zerosukkanooa' + compressed = _test_utils.take_input(input_name + '.compressed') + check_output = _test_utils.has_input(input_name) + decompressed = decompressor.process( + compressed[:-1], output_buffer_limit=10240 + ) + assert len(decompressed) <= MIN_OUTPUT_BUFFER_SIZE + while not decompressor.can_accept_more_data(): + decompressed += decompressor.process(b'') + decompressed += decompressor.process(compressed[-1:]) + if check_output: + original = _test_utils.take_input(input_name) + assert original == decompressed + + +def test_garbage_appended(): + decompressor = brotli.Decompressor() + with pytest.raises(brotli.error): + decompressor.process(brotli.compress(b'a') + b'a') + + +def test_already_finished(): + decompressor = brotli.Decompressor() + decompressor.process(brotli.compress(b'a')) + with pytest.raises(brotli.error): + decompressor.process(b'a')