Have ractool decode in parallel

The "middle of the decompressed file" numbers changed because the
programs were re-run on a desktop, not a laptop.
diff --git a/cmd/ractool/data.go b/cmd/ractool/data.go
index 55e63a1..0e4ce76 100644
--- a/cmd/ractool/data.go
+++ b/cmd/ractool/data.go
@@ -25,6 +25,11 @@
 
 The flags should include exactly one of -decode or -encode.
 
+By default, a RAC file's chunks are decoded in parallel, using more total CPU
+time to substantially reduce the real (wall clock) time taken. Batch (instead
+of interactive) processing of many RAC files may want to pass -singlethreaded
+to prefer minimizing total CPU time.
+
 When encoding, the input is partitioned into chunks and each chunk is
 compressed independently. You can specify the target chunk size in terms of
 either its compressed size or decompressed size. By default (if both
@@ -68,6 +73,8 @@
 
     -drange
         the "i:j" range to decompress, ":8" means the first 8 bytes
+    -singlethreaded
+        whether to decode on a single execution thread
 
 Encode-Related Flags:
 
@@ -130,13 +137,25 @@
     $ time unzip -p enwik8.zip | dd if=/dev/stdin status=none \
     >     iflag=skip_bytes,count_bytes skip=50000000 count=8
     Business
-    real    0m0.621s
-    user    0m0.623s
-    sys     0m0.105s
+    real    0m0.392s
+    user    0m0.407s
+    sys     0m0.118s
     $ time ractool -decode -drange=50000000:50000008 shared.rac
     Business
-    real    0m0.007s
+    real    0m0.003s
     user    0m0.004s
-    sys     0m0.004s
+    sys     0m0.000s
+
+    $ # A RAC file's chunks can be decoded in parallel, unlike ZIP,
+    $ # substantially reducing the real (wall clock) time taken even
+    $ # though both of these files use DEFLATE (RFC 1951) compression.
+    $ time unzip -p enwik8.zip        > /dev/null
+    real    0m0.737s
+    user    0m0.713s
+    sys     0m0.025s
+    $ time ractool -decode shared.rac > /dev/null
+    real    0m0.095s
+    user    0m1.316s
+    sys     0m0.069s
     --------
 `
diff --git a/cmd/ractool/main.go b/cmd/ractool/main.go
index c702b5a..44b0168 100644
--- a/cmd/ractool/main.go
+++ b/cmd/ractool/main.go
@@ -31,6 +31,11 @@
 
 The flags should include exactly one of -decode or -encode.
 
+By default, a RAC file's chunks are decoded in parallel, using more total CPU
+time to substantially reduce the real (wall clock) time taken. Batch (instead
+of interactive) processing of many RAC files may want to pass -singlethreaded
+to prefer minimizing total CPU time.
+
 When encoding, the input is partitioned into chunks and each chunk is
 compressed independently. You can specify the target chunk size in terms of
 either its compressed size or decompressed size. By default (if both
@@ -74,6 +79,8 @@
 
     -drange
         the "i:j" range to decompress, ":8" means the first 8 bytes
+    -singlethreaded
+        whether to decode on a single execution thread
 
 Encode-Related Flags:
 
@@ -136,14 +143,26 @@
     $ time unzip -p enwik8.zip | dd if=/dev/stdin status=none \
     >     iflag=skip_bytes,count_bytes skip=50000000 count=8
     Business
-    real    0m0.621s
-    user    0m0.623s
-    sys     0m0.105s
+    real    0m0.392s
+    user    0m0.407s
+    sys     0m0.118s
     $ time ractool -decode -drange=50000000:50000008 shared.rac
     Business
-    real    0m0.007s
+    real    0m0.003s
     user    0m0.004s
-    sys     0m0.004s
+    sys     0m0.000s
+
+    $ # A RAC file's chunks can be decoded in parallel, unlike ZIP,
+    $ # substantially reducing the real (wall clock) time taken even
+    $ # though both of these files use DEFLATE (RFC 1951) compression.
+    $ time unzip -p enwik8.zip        > /dev/null
+    real    0m0.737s
+    user    0m0.713s
+    sys     0m0.025s
+    $ time ractool -decode shared.rac > /dev/null
+    real    0m0.095s
+    user    0m1.316s
+    sys     0m0.069s
     --------
 */
 package main
@@ -156,6 +175,7 @@
 	"io"
 	"io/ioutil"
 	"os"
+	"runtime"
 	"strconv"
 	"strings"
 
@@ -172,6 +192,8 @@
 	// Decode-related flags.
 	drangeFlag = flag.String("drange", ":",
 		"the \"i:j\" range to decompress, \":8\" means the first 8 bytes")
+	singlethreadedFlag = flag.Bool("singlethreaded", false,
+		"whether to decode on a single execution thread")
 
 	// Encode-related flags.
 	codecFlag         = flag.String("codec", "zlib", "the compression codec")
@@ -199,7 +221,7 @@
 	flag.Usage = usage
 	flag.Parse()
 
-	r, usingStdin := io.Reader(os.Stdin), true
+	inFile := os.Stdin
 	switch flag.NArg() {
 	case 0:
 		// No-op.
@@ -209,16 +231,16 @@
 			return err
 		}
 		defer f.Close()
-		r, usingStdin = f, false
+		inFile = f
 	default:
 		return errors.New("too many filenames; the maximum is one")
 	}
 
 	if *decodeFlag && !*encodeFlag {
-		return decode(r, usingStdin)
+		return decode(inFile)
 	}
 	if *encodeFlag && !*decodeFlag {
-		return encode(r)
+		return encode(inFile)
 	}
 	return errors.New("must specify exactly one of -decode or -encode")
 }
@@ -283,43 +305,36 @@
 	return i, j, true
 }
 
-func decode(r io.Reader, usingStdin bool) error {
+func decode(inFile *os.File) error {
+	switch *codecFlag {
+	case "zlib":
+		// No-op.
+	default:
+		return errors.New("unsupported -codec")
+	}
 	i, j, ok := parseRange(*drangeFlag)
 	if !ok {
 		return fmt.Errorf("invalid -drange")
 	}
 
-	rs, ok := r.(io.ReadSeeker)
-	if !ok {
-		return fmt.Errorf("input is not seekable")
-	}
-
-	// Even if the os.File type is a ReadSeeker, it might not actually support
-	// seeking. Instead, read all of stdin into memory.
-	if usingStdin {
-		if input, err := ioutil.ReadAll(r); err != nil {
+	rs := io.ReadSeeker(inFile)
+	compressedSize, err := inFile.Seek(0, io.SeekEnd)
+	if err != nil {
+		// This seek-to-end error isn't fatal. The input might not actually be
+		// seekable, despite being an *os.File: "cat foo | ractool -decode".
+		// Instead, read all of the inFile into memory.
+		if inBytes, err := ioutil.ReadAll(inFile); err != nil {
 			return err
 		} else {
-			rs = bytes.NewReader(input)
+			rs = bytes.NewReader(inBytes)
+			compressedSize = int64(len(inBytes))
 		}
 	}
-
-	compressedSize, err := rs.Seek(0, io.SeekEnd)
-	if err != nil {
-		return err
-	}
-	racReader := &rac.Reader{
+	chunkReader := &rac.ChunkReader{
 		ReadSeeker:     rs,
 		CompressedSize: compressedSize,
 	}
-	switch *codecFlag {
-	case "zlib":
-		racReader.CodecReaders = []rac.CodecReader{&raczlib.CodecReader{}}
-	default:
-		return errors.New("unsupported -codec")
-	}
-
-	decompressedSize, err := racReader.Seek(0, io.SeekEnd)
+	decompressedSize, err := chunkReader.DecompressedSize()
 	if err != nil {
 		return err
 	}
@@ -332,17 +347,166 @@
 	if i >= j {
 		return nil
 	}
-	if _, err := racReader.Seek(i, io.SeekStart); err != nil {
-		return err
+
+	if *singlethreadedFlag {
+		racReader := mustMakeRACReader(rs, compressedSize)
+		return decodeSingleThreaded(os.Stdout, rac.Range{i, j}, racReader)
+	}
+	return decodeMultiThreaded(os.Stdout, rac.Range{i, j}, rs, compressedSize)
+}
+
+func decodeMultiThreaded(dst io.Writer, overallDRange rac.Range, rs io.ReadSeeker, compressedSize int64) error {
+	numWorkers := runtime.NumCPU()
+	// After 16 workers, we seem to hit diminishing returns.
+	if numWorkers > 16 {
+		numWorkers = 16
 	}
 
-	_, err = io.Copy(os.Stdout, &io.LimitedReader{
+	// Set up re-usable buffers to hold decoded bytes. The number of buffers is
+	// arbitrary, but is larger than the number of workers because individual
+	// pieces of work (the decodeWork type) may complete in a different order
+	// than they need to be written to dst. Having spare buffers means fewer
+	// idle workers.
+	bufc := make(chan *bytes.Buffer, 2*numWorkers)
+	for i := 0; i < cap(bufc); i++ {
+		bufc <- &bytes.Buffer{}
+	}
+
+	// Set up the workers, fed incomplete decodeWork's by issueDecodeWork.
+	reqc := make(chan decodeWork, numWorkers)
+	resc := make(chan decodeWork, numWorkers)
+	go issueDecodeWork(reqc, bufc, overallDRange, rs, compressedSize)
+	for i := 0; i < numWorkers; i++ {
+		go decodeWorker(resc, reqc, mustMakeRACReader(rs, compressedSize))
+	}
+
+	// m holds completed decodeWork's, which may arrive out of order. The next
+	// (in ascending DRange order) decodeWork starts at nextDRange0.
+	m := map[int64]*bytes.Buffer{}
+	nextDRange0 := overallDRange[0]
+
+	// Handle completed decodeWork's.
+	for work := range resc {
+		// Add the next arrival to m (if non-empty), then pop all of m's
+		// entries that are next in ascending DRange order, returning any used
+		// buffers to bufc to be recycled.
+		if work.buf == nil {
+			// No-op.
+		} else if work.buf.Len() == 0 {
+			bufc <- work.buf
+		} else {
+			m[work.dRange[0]] = work.buf
+			for {
+				buf, ok := m[nextDRange0]
+				if !ok {
+					break
+				}
+				delete(m, nextDRange0)
+				n, err := dst.Write(buf.Bytes())
+				if err != nil {
+					return err
+				}
+				nextDRange0 += int64(n)
+				bufc <- buf
+			}
+		}
+
+		if work.err == errEndOfWork {
+			numWorkers--
+			if numWorkers != 0 {
+				continue
+			}
+			return nil
+		} else if work.err != nil {
+			return work.err
+		}
+
+		// This shouldn't happen, but if it does, print a specific error
+		// message instead of the more general "deadlock" message.
+		if len(m) == cap(bufc) {
+			return errors.New("internal error: all workers idle but the next chunk is not found")
+		}
+	}
+	panic("unreachable")
+}
+
+var errEndOfWork = errors.New("end of work")
+
+// decodeWork is a unit of work for multi-threaded decoding. Workers receive
+// empty buffers and send filled buffers.
+type decodeWork struct {
+	dRange rac.Range
+	buf    *bytes.Buffer
+	err    error
+}
+
+func decodeWorker(resc chan<- decodeWork, reqc <-chan decodeWork, racReader *rac.Reader) {
+	for work := range reqc {
+		if work.err == nil {
+			work.err = decodeSingleThreaded(work.buf, work.dRange, racReader)
+		}
+		resc <- work
+		if work.err != nil {
+			break
+		}
+	}
+	resc <- decodeWork{err: errEndOfWork}
+}
+
+func issueDecodeWork(reqc chan<- decodeWork, bufc <-chan *bytes.Buffer, overallDRange rac.Range, rs io.ReadSeeker, compressedSize int64) {
+	defer close(reqc)
+	chunkReader := &rac.ChunkReader{
+		ReadSeeker:     rs,
+		CompressedSize: compressedSize,
+	}
+	if err := chunkReader.SeekToChunkContaining(overallDRange[0]); err != nil {
+		reqc <- decodeWork{err: err}
+		return
+	}
+	for {
+		chunk, err := chunkReader.NextChunk()
+		if err == io.EOF {
+			return
+		} else if err != nil {
+			reqc <- decodeWork{err: err}
+			return
+		}
+		if dr := chunk.DRange.Intersect(overallDRange); !dr.Empty() {
+			buf := <-bufc
+			buf.Reset()
+			reqc <- decodeWork{dRange: dr, buf: buf}
+		}
+		if chunk.DRange[1] >= overallDRange[1] {
+			return
+		}
+	}
+}
+
+func decodeSingleThreaded(dst io.Writer, dRange rac.Range, racReader *rac.Reader) error {
+	if _, err := racReader.Seek(dRange[0], io.SeekStart); err != nil {
+		return err
+	}
+	_, err := io.Copy(dst, &io.LimitedReader{
 		R: racReader,
-		N: j - i,
+		N: dRange.Size(),
 	})
 	return err
 }
 
+func mustMakeRACReader(rs io.ReadSeeker, compressedSize int64) *rac.Reader {
+	r := &rac.Reader{
+		ReadSeeker:     rs,
+		CompressedSize: compressedSize,
+	}
+	switch *codecFlag {
+	case "zlib":
+		r.CodecReaders = []rac.CodecReader{&raczlib.CodecReader{}}
+	default:
+		panic("unreachable")
+	}
+	return r
+}
+
 func encode(r io.Reader) error {
 	indexLocation := rac.IndexLocation(0)
 	switch *indexlocationFlag {
diff --git a/lib/rac/chunk_reader.go b/lib/rac/chunk_reader.go
index 4190379..28dc0ed 100644
--- a/lib/rac/chunk_reader.go
+++ b/lib/rac/chunk_reader.go
@@ -37,8 +37,21 @@
 // greater than high.
 type Range [2]int64
 
-func (r *Range) Empty() bool { return r[0] == r[1] }
-func (r *Range) Size() int64 { return r[1] - r[0] }
+func (r Range) Empty() bool { return r[0] == r[1] }
+func (r Range) Size() int64 { return r[1] - r[0] }
+
+func (r Range) Intersect(s Range) Range {
+	if r[0] < s[0] {
+		r[0] = s[0]
+	}
+	if r[1] > s[1] {
+		r[1] = s[1]
+	}
+	if r[0] > r[1] {
+		return Range{}
+	}
+	return r
+}
 
 // Chunk is a compressed chunk returned by a ChunkReader.
 //