Have concurrent rac reading require an io.ReaderAt
diff --git a/lib/rac/reader.go b/lib/rac/reader.go
index 503809a..4e9710e 100644
--- a/lib/rac/reader.go
+++ b/lib/rac/reader.go
@@ -95,7 +95,10 @@
// Bigger values often lead to faster throughput, up to a
// hardware-dependent point, but also larger memory requirements.
//
- // Zero means a non-concurrent (single-goroutine) reader.
+ // If positive, then the ReadSeeker must also be an io.ReaderAt.
+ //
+ // Non-positive values (including zero) mean a non-concurrent
+ // (single-goroutine) reader.
Concurrency int
// err is the first error encountered. It is sticky: once a non-nil error
@@ -175,6 +178,12 @@
}
r.chunkReader.ReadSeeker = r.ReadSeeker
r.chunkReader.CompressedSize = r.CompressedSize
+ if r.Concurrency > 0 {
+ if _, ok := r.ReadSeeker.(io.ReaderAt); !ok {
+ r.err = fmt.Errorf("rac: Concurrency > 0 requires the ReadSeeker to be an io.ReaderAt")
+ return r.err
+ }
+ }
if err := r.chunkReader.initialize(); err != nil {
r.err = err
return r.err
diff --git a/lib/raczlib/raczlib_test.go b/lib/raczlib/raczlib_test.go
index 318f5c2..0bcee96 100644
--- a/lib/raczlib/raczlib_test.go
+++ b/lib/raczlib/raczlib_test.go
@@ -75,12 +75,13 @@
return buf.Bytes(), nil
}
-func racDecompress(compressed []byte) ([]byte, error) {
+func racDecompress(compressed []byte, concurrency int) ([]byte, error) {
buf := &bytes.Buffer{}
r := &rac.Reader{
ReadSeeker: bytes.NewReader(compressed),
CompressedSize: int64(len(compressed)),
CodecReaders: []rac.CodecReader{&CodecReader{}},
+ Concurrency: concurrency,
}
defer r.Close()
if _, err := io.Copy(buf, r); err != nil {
@@ -89,8 +90,8 @@
return buf.Bytes(), nil
}
-func testReader(tt *testing.T, decoded string, encoded string) {
- g, err := racDecompress([]byte(encoded))
+func testReader(tt *testing.T, decoded string, encoded string, concurrency int) {
+ g, err := racDecompress([]byte(encoded), concurrency)
if err != nil {
tt.Fatalf("racDecompress: %v", err)
}
@@ -99,8 +100,9 @@
}
}
-func TestReaderSansDictionary(tt *testing.T) { testReader(tt, decodedMore, encodedMore) }
-func TestReaderWithDictionary(tt *testing.T) { testReader(tt, decodedSheep, encodedSheep) }
+func TestReaderSansDictionary(tt *testing.T) { testReader(tt, decodedMore, encodedMore, 0) }
+func TestReaderWithDictionary(tt *testing.T) { testReader(tt, decodedSheep, encodedSheep, 0) }
+func TestConcurrentReader(tt *testing.T) { testReader(tt, decodedSheep, encodedSheep, 2) }
func TestReaderConcatenation(tt *testing.T) {
// Create a RAC file whose decoding is the concatenation of two other RAC
@@ -174,6 +176,7 @@
testReader(tt,
decodedSheep+decodedMore,
encodedSheep+encodedMore+string(buf[:]),
+ 0,
)
}
@@ -262,7 +265,7 @@
compressedLengths[i] = len(compressed)
// Decompress.
- decompressed, err := racDecompress(compressed)
+ decompressed, err := racDecompress(compressed, 0)
if err != nil {
tt.Fatalf("i=%d: racDecompress: %v", i, err)
}