|  | package util | 
|  |  | 
|  | import ( | 
|  | "bytes" | 
|  | "encoding/gob" | 
|  | "sync" | 
|  | ) | 
|  |  | 
|  | const kNumDecoderGoroutines = 10 | 
|  |  | 
|  | // GobEncoder encodes structs into bytes via GOB encoding. Not safe for | 
|  | // concurrent use. | 
|  | // | 
|  | // Here's a template for writing a type-specific encoder: | 
|  | // | 
|  | // // FooEncoder encodes Foos into bytes via GOB encoding. Not safe for | 
|  | // // concurrent use. | 
|  | // | 
|  | //	type FooEncoder { | 
|  | //		util.GobEncoder | 
|  | //	} | 
|  | // | 
|  | // // Next returns one of the Foox provided to Process (in arbitrary order) and | 
|  | // // its serialized bytes. If any items remain, returns the item, the | 
|  | // // serialized bytes, nil. If all items have been returned, returns nil, nil, | 
|  | // // nil. If an error is encountered, returns nil, nil, error. | 
|  | // | 
|  | //	func (e *FooEncoder) Next() (*Foo, []byte, error) { | 
|  | //		item, serialized, err := e.GobEncoder.Next() | 
|  | //		if err != nil { | 
|  | //			return nil, nil, err | 
|  | //		} else if item == nil { | 
|  | //			return nil, nil, nil | 
|  | //		} | 
|  | //		return item.(*Foo), serialized, nil | 
|  | //	} | 
|  | type GobEncoder struct { | 
|  | err    error | 
|  | items  []interface{} | 
|  | result [][]byte | 
|  | } | 
|  |  | 
|  | // Process encodes the item into a byte slice that will be returned from | 
|  | // Next() (in arbitrary order). Returns false if Next is certain to return an | 
|  | // error. Caller must ensure item does not change until after the first call to | 
|  | // Next(). May not be called after calling Next(). | 
|  | func (e *GobEncoder) Process(item interface{}) bool { | 
|  | if e.err != nil { | 
|  | return false | 
|  | } | 
|  | var buf bytes.Buffer | 
|  | if err := gob.NewEncoder(&buf).Encode(item); err != nil { | 
|  | e.err = err | 
|  | e.items = nil | 
|  | e.result = nil | 
|  | return false | 
|  | } | 
|  | e.items = append(e.items, item) | 
|  | e.result = append(e.result, buf.Bytes()) | 
|  | return true | 
|  | } | 
|  |  | 
|  | // Next returns one of the items provided to Process (in arbitrary order) and | 
|  | // its serialized bytes. If any items remain, returns the item, the serialized | 
|  | // bytes, nil. If all items have been returned, returns nil, nil, nil. If an | 
|  | // error is encountered, returns nil, nil, error. | 
|  | func (e *GobEncoder) Next() (interface{}, []byte, error) { | 
|  | if e.err != nil { | 
|  | return nil, nil, e.err | 
|  | } | 
|  | if len(e.items) == 0 { | 
|  | return nil, nil, nil | 
|  | } | 
|  | c := e.items[0] | 
|  | e.items = e.items[1:] | 
|  | serialized := e.result[0] | 
|  | e.result = e.result[1:] | 
|  | return c, serialized, nil | 
|  | } | 
|  |  | 
|  | // GobDecoder decodes bytes into structs via GOB decoding. Not safe for | 
|  | // concurrent use. | 
|  | // | 
|  | // Here's a template for writing a type-specific decoder: | 
|  | // | 
|  | // FooDecoder decodes bytes into Foos via GOB decoding. Not safe for | 
|  | // concurrent use. | 
|  | // | 
|  | //	type FooDecoder struct { | 
|  | //		*util.GobDecoder | 
|  | //	} | 
|  | // | 
|  | // // NewFooDecoder returns a FooDecoder instance. | 
|  | // | 
|  | //	func NewFooDecoder() *FooDecoder { | 
|  | //		return &FooDecoder{ | 
|  | //			GobDecoder: util.NewGobDecoder(func() interface{} { | 
|  | //				return &Foo{} | 
|  | //			}, func(ch <-chan interface{}) interface{} { | 
|  | //				items := []*Foo{} | 
|  | //				for item := range ch { | 
|  | //					items = append(items, item.(*Foo)) | 
|  | //				} | 
|  | //				return items | 
|  | //			}), | 
|  | //		} | 
|  | //	} | 
|  | // | 
|  | // // Result returns all decoded Foos provided to Process (in arbitrary order), or | 
|  | // // any error encountered. | 
|  | // | 
|  | //	func (d *FooDecoder) Result() ([]*Foo, error) { | 
|  | //		res, err := d.GobDecoder.Result() | 
|  | //		if err != nil { | 
|  | //			return nil, err | 
|  | //		} | 
|  | //		return res.([]*Foo), nil | 
|  | //	} | 
|  | type GobDecoder struct { | 
|  | // input contains the incoming byte slices. Process() sends on this | 
|  | // channel, decode() receives from it, and Result() closes it. | 
|  | input chan []byte | 
|  | // output contains decoded items. decode() sends on this channel, | 
|  | // collect() receives from it, and run() closes it when all decode() | 
|  | // goroutines have finished. | 
|  | output chan interface{} | 
|  | // result contains the return value of Result(). collect() sends a single | 
|  | // value on this channel and closes it. Result() receives from it. | 
|  | result chan interface{} | 
|  | // errors contains the first error from any goroutine. It's a channel in | 
|  | // case multiple goroutines experience an error at the same time. | 
|  | errors chan error | 
|  |  | 
|  | newItem     func() interface{} | 
|  | collectImpl func(<-chan interface{}) interface{} | 
|  | } | 
|  |  | 
|  | // NewGobDecoder returns a GobDecoder instance. The first argument is a | 
|  | // goroutine-safe function which returns a zero-valued instance of the type | 
|  | // being decoded, eg. | 
|  | // | 
|  | //	func() interface{} { | 
|  | //		return &MyType{} | 
|  | //	} | 
|  | // | 
|  | // The second argument is a function which collects decoded instances of that | 
|  | // type from a channel and returns a slice, eg. | 
|  | // | 
|  | //	func(ch <-chan interface{}) interface{} { | 
|  | //		items := []*MyType{} | 
|  | //		for item := range ch { | 
|  | //			items = append(items, item.(*MyType)) | 
|  | //		} | 
|  | //		return items | 
|  | //	} | 
|  | func NewGobDecoder(newItem func() interface{}, collect func(<-chan interface{}) interface{}) *GobDecoder { | 
|  | d := &GobDecoder{ | 
|  | input:       make(chan []byte, kNumDecoderGoroutines*2), | 
|  | output:      make(chan interface{}, kNumDecoderGoroutines), | 
|  | result:      make(chan interface{}, 1), | 
|  | errors:      make(chan error, kNumDecoderGoroutines), | 
|  | newItem:     newItem, | 
|  | collectImpl: collect, | 
|  | } | 
|  | go d.run() | 
|  | go d.collect() | 
|  | return d | 
|  | } | 
|  |  | 
|  | // run starts the decode goroutines and closes d.output when they finish. | 
|  | func (d *GobDecoder) run() { | 
|  | // Start decoders. | 
|  | wg := sync.WaitGroup{} | 
|  | for i := 0; i < kNumDecoderGoroutines; i++ { | 
|  | wg.Add(1) | 
|  | go d.decode(&wg) | 
|  | } | 
|  | // Wait for decoders to exit. | 
|  | wg.Wait() | 
|  | // Drain d.input in the case that errors were encountered, to avoid deadlock. | 
|  | for range d.input { | 
|  | } | 
|  | close(d.output) | 
|  | } | 
|  |  | 
|  | // decode receives from d.input and sends to d.output until d.input is closed or | 
|  | // d.errors is non-empty. Decrements wg when done. | 
|  | func (d *GobDecoder) decode(wg *sync.WaitGroup) { | 
|  | for b := range d.input { | 
|  | item := d.newItem() | 
|  | if err := gob.NewDecoder(bytes.NewReader(b)).Decode(item); err != nil { | 
|  | d.errors <- err | 
|  | break | 
|  | } | 
|  | d.output <- item | 
|  | if len(d.errors) > 0 { | 
|  | break | 
|  | } | 
|  | } | 
|  | wg.Done() | 
|  | } | 
|  |  | 
|  | // collect receives from d.output until it is closed, then sends on d.result. | 
|  | func (d *GobDecoder) collect() { | 
|  | d.result <- d.collectImpl(d.output) | 
|  | close(d.result) | 
|  | } | 
|  |  | 
|  | // Process decodes the byte slice and includes it in Result() (in arbitrary | 
|  | // order). Returns false if Result is certain to return an error. Caller must | 
|  | // ensure b does not change until after Result() returns. | 
|  | func (d *GobDecoder) Process(b []byte) bool { | 
|  | d.input <- b | 
|  | return len(d.errors) == 0 | 
|  | } | 
|  |  | 
|  | // Result returns all decoded items provided to Process (in arbitrary order), or | 
|  | // any error encountered. | 
|  | func (d *GobDecoder) Result() (interface{}, error) { | 
|  | close(d.input) | 
|  | select { | 
|  | case err := <-d.errors: | 
|  | return nil, err | 
|  | case result := <-d.result: | 
|  | return result, nil | 
|  | } | 
|  | } |