blob: 1979e7c915614bbdea977a499a57133a35c984be [file] [log] [blame]
# Copyright 2016 The Brotli Authors. All rights reserved.
#
# Distributed under MIT license.
# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
import queue
import random
import threading
import time
import brotli
import pytest
from . import _test_utils
@pytest.mark.parametrize("quality", [1, 6, 9, 11])
@pytest.mark.parametrize("text_name", _test_utils.gather_text_inputs())
def test_single_process(quality, text_name):
original = _test_utils.take_input(text_name)
compressor = brotli.Compressor(quality=quality)
compressed = compressor.process(original)
compressed += compressor.finish()
decompressed = brotli.decompress(compressed)
assert original == decompressed
@pytest.mark.parametrize("quality", [1, 6, 9, 11])
@pytest.mark.parametrize("text_name", _test_utils.gather_text_inputs())
def test_multiple_process(quality, text_name):
original = _test_utils.take_input(text_name)
chunk_size = 2048
chunks = _test_utils.chunk_input(original, chunk_size)
compressor = brotli.Compressor(quality=quality)
compressed = b""
for chunk in chunks:
compressed += compressor.process(chunk)
compressed += compressor.finish()
decompressed = brotli.decompress(compressed)
assert original == decompressed
@pytest.mark.parametrize("quality", [1, 6, 9, 11])
@pytest.mark.parametrize("text_name", _test_utils.gather_text_inputs())
def test_multiple_process_and_flush(quality, text_name):
original = _test_utils.take_input(text_name)
chunk_size = 2048
chunks = _test_utils.chunk_input(original, chunk_size)
compressor = brotli.Compressor(quality=quality)
compressed = b""
for chunk in chunks:
compressed += compressor.process(chunk)
compressed += compressor.flush()
compressed += compressor.finish()
decompressed = brotli.decompress(compressed)
assert original == decompressed
def make_input(size):
abc = [bytes([b]) for b in b"abcdefghijklmnopqrstuvwxyz"]
abc_cap = [bytes([b]) for b in b"ABCDEFGHIJKLMNOPQRSTUVWXYZ"]
num_words_by_len = [0, 25, 100, 175, 1700, 1000, 1000, 1000]
word_set = set()
rng = random.Random()
rng.seed(2025)
words_by_len = [[]]
for word_len in range(1, len(num_words_by_len)):
num_words = num_words_by_len[word_len]
words = []
for _ in range(num_words):
while True:
word = b"".join(
[rng.choice(abc_cap)]
+ [rng.choice(abc) for _ in range(word_len - 1)]
)
if word not in word_set:
word_set.add(word)
words.append(word)
break
words_by_len.append(words)
total_size = 0
out = []
while total_size < size:
word_len = rng.choice(range(1, len(num_words_by_len)))
word = rng.choice(words_by_len[word_len])
total_size += len(word)
out.append(word)
return b"".join(out)
def _thread_compress(original, compressor, results):
compressed = compressor.process(original)
compressed += compressor.finish()
results.put(1)
def _thread_concurrent_process(compressor, results):
time.sleep(0.01)
try:
_ = compressor.process(b"whatever")
except brotli.error:
results.put(2)
def _thread_concurrent_flush(compressor, results):
time.sleep(0.02)
try:
_ = compressor.flush()
except brotli.error:
results.put(3)
def _thread_concurrent_finish(compressor, results):
time.sleep(0.03)
try:
_ = compressor.finish()
except brotli.error:
results.put(4)
def test_concurrency():
original = make_input(2 * 1024 * 1024)
compressor = brotli.Compressor(quality=9)
results = queue.Queue()
threads = []
threads.append(
threading.Thread(
target=_thread_compress, args=(original, compressor, results)
)
)
threads.append(
threading.Thread(
target=_thread_concurrent_process, args=(compressor, results)
)
)
threads.append(
threading.Thread(
target=_thread_concurrent_flush, args=(compressor, results)
)
)
threads.append(
threading.Thread(
target=_thread_concurrent_finish, args=(compressor, results)
)
)
for thread in threads:
thread.start()
for thread in threads:
thread.join()
assert sorted(list(results.queue)) == [1, 2, 3, 4]