package util

import (
	"bytes"
	"encoding/gob"
	"sync"
)

const kNumDecoderGoroutines = 10

// GobEncoder encodes structs into bytes via GOB encoding. Not safe for
// concurrent use.
//
// Here's a template for writing a type-specific encoder:
//
// // FooEncoder encodes Foos into bytes via GOB encoding. Not safe for
// // concurrent use.
//
//	type FooEncoder {
//		util.GobEncoder
//	}
//
// // Next returns one of the Foox provided to Process (in arbitrary order) and
// // its serialized bytes. If any items remain, returns the item, the
// // serialized bytes, nil. If all items have been returned, returns nil, nil,
// // nil. If an error is encountered, returns nil, nil, error.
//
//	func (e *FooEncoder) Next() (*Foo, []byte, error) {
//		item, serialized, err := e.GobEncoder.Next()
//		if err != nil {
//			return nil, nil, err
//		} else if item == nil {
//			return nil, nil, nil
//		}
//		return item.(*Foo), serialized, nil
//	}
type GobEncoder struct {
	err    error
	items  []interface{}
	result [][]byte
}

// Process encodes the item into a byte slice that will be returned from
// Next() (in arbitrary order). Returns false if Next is certain to return an
// error. Caller must ensure item does not change until after the first call to
// Next(). May not be called after calling Next().
func (e *GobEncoder) Process(item interface{}) bool {
	if e.err != nil {
		return false
	}
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(item); err != nil {
		e.err = err
		e.items = nil
		e.result = nil
		return false
	}
	e.items = append(e.items, item)
	e.result = append(e.result, buf.Bytes())
	return true
}

// Next returns one of the items provided to Process (in arbitrary order) and
// its serialized bytes. If any items remain, returns the item, the serialized
// bytes, nil. If all items have been returned, returns nil, nil, nil. If an
// error is encountered, returns nil, nil, error.
func (e *GobEncoder) Next() (interface{}, []byte, error) {
	if e.err != nil {
		return nil, nil, e.err
	}
	if len(e.items) == 0 {
		return nil, nil, nil
	}
	c := e.items[0]
	e.items = e.items[1:]
	serialized := e.result[0]
	e.result = e.result[1:]
	return c, serialized, nil
}

// GobDecoder decodes bytes into structs via GOB decoding. Not safe for
// concurrent use.
//
// Here's a template for writing a type-specific decoder:
//
// FooDecoder decodes bytes into Foos via GOB decoding. Not safe for
// concurrent use.
//
//	type FooDecoder struct {
//		*util.GobDecoder
//	}
//
// // NewFooDecoder returns a FooDecoder instance.
//
//	func NewFooDecoder() *FooDecoder {
//		return &FooDecoder{
//			GobDecoder: util.NewGobDecoder(func() interface{} {
//				return &Foo{}
//			}, func(ch <-chan interface{}) interface{} {
//				items := []*Foo{}
//				for item := range ch {
//					items = append(items, item.(*Foo))
//				}
//				return items
//			}),
//		}
//	}
//
// // Result returns all decoded Foos provided to Process (in arbitrary order), or
// // any error encountered.
//
//	func (d *FooDecoder) Result() ([]*Foo, error) {
//		res, err := d.GobDecoder.Result()
//		if err != nil {
//			return nil, err
//		}
//		return res.([]*Foo), nil
//	}
type GobDecoder struct {
	// input contains the incoming byte slices. Process() sends on this
	// channel, decode() receives from it, and Result() closes it.
	input chan []byte
	// output contains decoded items. decode() sends on this channel,
	// collect() receives from it, and run() closes it when all decode()
	// goroutines have finished.
	output chan interface{}
	// result contains the return value of Result(). collect() sends a single
	// value on this channel and closes it. Result() receives from it.
	result chan interface{}
	// errors contains the first error from any goroutine. It's a channel in
	// case multiple goroutines experience an error at the same time.
	errors chan error

	newItem     func() interface{}
	collectImpl func(<-chan interface{}) interface{}
}

// NewGobDecoder returns a GobDecoder instance. The first argument is a
// goroutine-safe function which returns a zero-valued instance of the type
// being decoded, eg.
//
//	func() interface{} {
//		return &MyType{}
//	}
//
// The second argument is a function which collects decoded instances of that
// type from a channel and returns a slice, eg.
//
//	func(ch <-chan interface{}) interface{} {
//		items := []*MyType{}
//		for item := range ch {
//			items = append(items, item.(*MyType))
//		}
//		return items
//	}
func NewGobDecoder(newItem func() interface{}, collect func(<-chan interface{}) interface{}) *GobDecoder {
	d := &GobDecoder{
		input:       make(chan []byte, kNumDecoderGoroutines*2),
		output:      make(chan interface{}, kNumDecoderGoroutines),
		result:      make(chan interface{}, 1),
		errors:      make(chan error, kNumDecoderGoroutines),
		newItem:     newItem,
		collectImpl: collect,
	}
	go d.run()
	go d.collect()
	return d
}

// run starts the decode goroutines and closes d.output when they finish.
func (d *GobDecoder) run() {
	// Start decoders.
	wg := sync.WaitGroup{}
	for i := 0; i < kNumDecoderGoroutines; i++ {
		wg.Add(1)
		go d.decode(&wg)
	}
	// Wait for decoders to exit.
	wg.Wait()
	// Drain d.input in the case that errors were encountered, to avoid deadlock.
	for range d.input {
	}
	close(d.output)
}

// decode receives from d.input and sends to d.output until d.input is closed or
// d.errors is non-empty. Decrements wg when done.
func (d *GobDecoder) decode(wg *sync.WaitGroup) {
	for b := range d.input {
		item := d.newItem()
		if err := gob.NewDecoder(bytes.NewReader(b)).Decode(item); err != nil {
			d.errors <- err
			break
		}
		d.output <- item
		if len(d.errors) > 0 {
			break
		}
	}
	wg.Done()
}

// collect receives from d.output until it is closed, then sends on d.result.
func (d *GobDecoder) collect() {
	d.result <- d.collectImpl(d.output)
	close(d.result)
}

// Process decodes the byte slice and includes it in Result() (in arbitrary
// order). Returns false if Result is certain to return an error. Caller must
// ensure b does not change until after Result() returns.
func (d *GobDecoder) Process(b []byte) bool {
	d.input <- b
	return len(d.errors) == 0
}

// Result returns all decoded items provided to Process (in arbitrary order), or
// any error encountered.
func (d *GobDecoder) Result() (interface{}, error) {
	close(d.input)
	select {
	case err := <-d.errors:
		return nil, err
	case result := <-d.result:
		return result, nil
	}
}
