|  | #! /usr/bin/env python | 
|  | """Compression/decompression utility using the Brotli algorithm.""" | 
|  |  | 
# Note: Python 2 was deprecated long ago, but some projects out in the
# wide world may still use it nevertheless. This should not deprive
# them of the ability to run Brotli.
|  | from __future__ import print_function | 
|  |  | 
|  | import argparse | 
|  | import os | 
|  | import platform | 
|  | import sys | 
|  |  | 
|  | import brotli | 
|  |  | 
|  |  | 
# Encoder settings installed as the argument parser's defaults, i.e. the
# values used when the corresponding command-line option is not given.
_DEFAULT_PARAMS = dict(
    mode=brotli.MODE_GENERIC,
    quality=11,
    lgwin=22,
    lgblock=0,
)
|  |  | 
|  |  | 
def get_binary_stdio(stream):
    """Look up a standard stream by name and return it in binary form.

    On Python 3 this is the `.buffer` of the current
    `sys.(stdin|stdout|stderr)` object. If that object has no `.buffer`
    attribute (because the stream was replaced by something else), the
    buffer of the original stream, `sys.__(stdin|stdout|stderr)__.buffer`,
    is returned instead.

    On Python 2 the stream object itself is returned; on Windows it is
    switched to binary mode first.

    Args:
      stream: One of 'stdin', 'stdout', 'stderr'.

    Returns:
      The stream, as a 'raw' buffer object (i.e. io.BufferedIOBase subclass
      instance such as io.BufferedReader/io.BufferedWriter), suitable for
      reading/writing binary data from/to it.

    Raises:
      ValueError: If `stream` is not one of the three accepted names.
    """
    if stream not in ('stdin', 'stdout', 'stderr'):
        raise ValueError('invalid stream name: %s' % (stream,))
    current = getattr(sys, stream)

    if sys.version_info[0] >= 3:
        try:
            return current.buffer
        except AttributeError:
            # Per the Python reference
            # (https://docs.python.org/3/library/sys.html#sys.stdin) the
            # `.buffer` attribute may be missing when the standard stream
            # has been swapped for another object, e.g. an `io.StringIO()`
            # installed by `contextlib.redirect_stdout()`. In that case use
            # the interpreter's original stream.
            return getattr(sys, '__' + stream + '__').buffer

    # Python 2: only Windows needs the stream switched to binary mode.
    if sys.platform == 'win32':
        if platform.python_implementation() == 'PyPy':
            # The msvcrt trick below does not work in PyPy; reopen the file
            # descriptor unbuffered in binary mode instead.
            fd_mode = 'rb' if stream == 'stdin' else 'wb'
            current = os.fdopen(current.fileno(), fd_mode, 0)
        else:
            # Works with CPython -- untested on other implementations.
            import msvcrt
            msvcrt.setmode(current.fileno(), os.O_BINARY)
    return current
|  |  | 
|  |  | 
def _build_parser():
    """Create the argparse parser describing the command-line interface."""
    parser = argparse.ArgumentParser(
        prog=os.path.basename(__file__), description=__doc__)
    parser.add_argument(
        '--version', action='version', version=brotli.version)
    parser.add_argument(
        '-i',
        '--input',
        metavar='FILE',
        type=str,
        dest='infile',
        help='Input file',
        default=None)
    parser.add_argument(
        '-o',
        '--output',
        metavar='FILE',
        type=str,
        dest='outfile',
        help='Output file',
        default=None)
    parser.add_argument(
        '-f',
        '--force',
        action='store_true',
        help='Overwrite existing output file',
        default=False)
    parser.add_argument(
        '-d',
        '--decompress',
        action='store_true',
        help='Decompress input file',
        default=False)
    params = parser.add_argument_group('optional encoder parameters')
    params.add_argument(
        '-m',
        '--mode',
        metavar='MODE',
        type=int,
        choices=[0, 1, 2],
        help='The compression mode can be 0 for generic input, '
        '1 for UTF-8 encoded text, or 2 for WOFF 2.0 font data. '
        'Defaults to 0.')
    params.add_argument(
        '-q',
        '--quality',
        metavar='QUALITY',
        type=int,
        choices=list(range(0, 12)),
        help='Controls the compression-speed vs compression-density '
        'tradeoff. The higher the quality, the slower the '
        'compression. Range is 0 to 11. Defaults to 11.')
    params.add_argument(
        '--lgwin',
        metavar='LGWIN',
        type=int,
        choices=list(range(10, 25)),
        help='Base 2 logarithm of the sliding window size. Range is '
        '10 to 24. Defaults to 22.')
    params.add_argument(
        '--lgblock',
        metavar='LGBLOCK',
        type=int,
        choices=[0] + list(range(16, 25)),
        help='Base 2 logarithm of the maximum input block size. '
        'Range is 16 to 24. If set to 0, the value will be set based '
        'on the quality. Defaults to 0.')
    # set default values using global _DEFAULT_PARAMS dictionary
    parser.set_defaults(**_DEFAULT_PARAMS)
    return parser


def main(args=None):
    """Compress or decompress one input with Brotli, per command-line options.

    The whole input is read into memory (from --input, or from binary stdin),
    transformed with `brotli.compress` / `brotli.decompress`, and the result
    is written to --output, or to binary stdout when no output file is given.

    The output file is only opened after the transformation succeeded, so a
    failed run does not truncate an existing file.

    Args:
      args: Optional list of argument strings, passed to
        `ArgumentParser.parse_args`. With None, argparse reads the process
        command line.

    Raises:
      SystemExit: Raised by argparse (`parser.error` / `parser.exit`) on
        invalid options, unreadable input, missing input on an interactive
        terminal, an existing output file without --force, or a Brotli error.
    """
    parser = _build_parser()
    options = parser.parse_args(args=args)

    if options.infile:
        try:
            with open(options.infile, 'rb') as infile:
                data = infile.read()
        except (IOError, OSError):
            # On Python 2 open() raises IOError, which is not an OSError
            # there; on Python 3 the two names are the same class.
            # Report the path: `infile` is unbound when open() itself fails.
            parser.error('Could not read --infile: %s' % (options.infile,))
    else:
        if sys.stdin.isatty():
            # interactive console, just quit
            parser.error('No input (called from interactive terminal).')
        data = get_binary_stdio('stdin').read()

    # Caution! os.path.exists() is False for a broken symlink, so such an
    # --outfile passes this check and the later open() follows the symlink.
    if (options.outfile and os.path.exists(options.outfile)
            and not options.force):
        parser.error(('Target --outfile=%s already exists, '
                      'but --force was not requested.') % (options.outfile,))

    # Transform before touching the output so that a Brotli failure cannot
    # leave behind an empty or truncated output file.
    try:
        if options.decompress:
            data = brotli.decompress(data)
        else:
            data = brotli.compress(
                data,
                mode=options.mode,
                quality=options.quality,
                lgwin=options.lgwin,
                lgblock=options.lgblock)
    except brotli.error as e:
        # parser.exit() writes the message verbatim, hence the explicit
        # trailing newline.
        parser.exit(1,
                    'bro: error: %s: %s\n' % (e, options.infile or '{stdin}'))

    if options.outfile:
        with open(options.outfile, 'wb') as outfile:
            outfile.write(data)
    else:
        get_binary_stdio('stdout').write(data)
|  |  | 
|  |  | 
# Run the command-line interface only when this file is executed as a
# script; importing it as a module does not call main().
if __name__ == '__main__':
    main()