package jobqueue

import (
	"context"
	"sync"
	"time"
)

type WorkerStatus int

const (
	WorkerStatusStopped = iota
	WorkerStatusRunning
)

type WorkerID string

// Worker is a worker
type Worker interface {
	Start() error
	Stop() error
	Status() WorkerStatus
	AssignJob(job GenericJob) error

	GetID() WorkerID
}

// GenericWorker is a generic worker
type GenericWorker struct {
	ID     WorkerID
	status WorkerStatus
}

// LocalWorker is a worker that runs jobs locally
type LocalWorker struct {
	GenericWorker
	jobChannels []chan GenericJob
	stopChans   []chan bool
	cancelChans []chan bool
	maxJobs     int
	mu          sync.Mutex
	wg          sync.WaitGroup
}

// GetID returns the ID of the worker
func (w *GenericWorker) GetID() WorkerID {
	return w.ID
}

// NewLocalWorker creates a new local worker
func NewLocalWorker(maxJobs int) *LocalWorker {
	w := &LocalWorker{maxJobs: maxJobs}
	w.jobChannels = make([]chan GenericJob, maxJobs)
	w.stopChans = make([]chan bool, maxJobs)
	w.cancelChans = make([]chan bool, maxJobs)
	return w
}

// Start starts the worker
func (w *LocalWorker) Start() error {
	w.mu.Lock()
	defer w.mu.Unlock()

	if w.status == WorkerStatusRunning {
		return ErrWorkerAlreadyRunning
	}

	for i := 0; i < w.maxJobs; i++ {
		w.wg.Add(1)
		w.jobChannels[i] = make(chan GenericJob)
		w.stopChans[i] = make(chan bool)
		w.cancelChans[i] = make(chan bool)
		go w.run(w.jobChannels[i], w.stopChans[i], w.cancelChans[i])
	}

	w.wg.Wait()
	w.status = WorkerStatusRunning

	return nil
}

// Stop stops the worker
func (w *LocalWorker) Stop() error {
	w.mu.Lock()
	defer w.mu.Unlock()

	if w.status == WorkerStatusStopped {
		return ErrWorkerNotRunning
	}

	w.status = WorkerStatusStopped
	for _, stopChan := range w.stopChans {
		stopChan <- true
	}

	return nil
}

func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancelChan chan bool) {
	w.wg.Done()

	for {
		select {
		case job := <-jobChannel:
			ctx, cancel := context.WithCancel(context.Background())
			retries := job.GetMaxRetries()
			retryDelay := job.GetRetryDelay()

			if retries == 0 {
				retries = 1
			}

			var err error
			for retries > 0 {

				timeout := job.GetTimeout()
				if timeout == 0 {
					timeout = 1 * time.Minute
				}

				ctxTimeout, cancelTimeout := context.WithTimeout(ctx, timeout)
				_, err = job.Execute(ctxTimeout)
				cancelTimeout()

				if err == nil || ctx.Err() == context.Canceled {
					break
				}

				if retryDelay > 0 {
					time.Sleep(retryDelay)
				}

				retries--
			}

			select {
			case <-cancelChan:
				cancel()

			case <-ctx.Done():
				cancel()

			default:
				cancel()
			}
		case <-stopChan:
			return
		}
	}
}

// AssignJob assigns a job to the worker
func (w *LocalWorker) AssignJob(job GenericJob) error {
	w.mu.Lock()
	defer w.mu.Unlock()

	if w.status != WorkerStatusRunning {
		return ErrWorkerNotRunning
	}

	for _, ch := range w.jobChannels {

		select {
		case ch <- job:
			return nil
		default:
			continue
		}
	}

	return ErrMaxJobsReached
}

// Status returns the status of the worker
func (w *LocalWorker) Status() WorkerStatus {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.status
}