| # Copyright 2016 The Brotli Authors. All rights reserved. |
| # |
| # Distributed under MIT license. |
| # See file LICENSE for detail or copy at https://opensource.org/licenses/MIT |
| |
| import functools |
| import unittest |
| |
| from . import _test_utils |
| import brotli |
| |
| |
| def _get_original_name(test_data): |
| return test_data.split('.compressed')[0] |
| |
| |
| class TestDecompressor(_test_utils.TestCase): |
| |
| CHUNK_SIZE = 1 |
| |
| def setUp(self): |
| self.decompressor = brotli.Decompressor() |
| |
| def tearDown(self): |
| self.decompressor = None |
| |
| def _check_decompression(self, test_data): |
| # Verify decompression matches the original. |
| temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) |
| original = _get_original_name(test_data) |
| self.assertFilesMatch(temp_uncompressed, original) |
| |
| def _decompress(self, test_data): |
| temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) |
| with open(temp_uncompressed, 'wb') as out_file: |
| with open(test_data, 'rb') as in_file: |
| read_chunk = functools.partial(in_file.read, self.CHUNK_SIZE) |
| for data in iter(read_chunk, b''): |
| out_file.write(self.decompressor.process(data)) |
| self.assertTrue(self.decompressor.is_finished()) |
| |
| def _test_decompress(self, test_data): |
| self._decompress(test_data) |
| self._check_decompression(test_data) |
| |
| def test_garbage_appended(self): |
| with self.assertRaises(brotli.error): |
| self.decompressor.process(brotli.compress(b'a') + b'a') |
| |
| def test_already_finished(self): |
| self.decompressor.process(brotli.compress(b'a')) |
| with self.assertRaises(brotli.error): |
| self.decompressor.process(b'a') |
| |
| |
| _test_utils.generate_test_methods(TestDecompressor, for_decompression=True) |
| |
| if __name__ == '__main__': |
| unittest.main() |