package jobqueue import ( "context" "sync" "time" ) type WorkerStatus int const ( WorkerStatusStopped = iota WorkerStatusRunning ) type WorkerID string // Worker is a worker type Worker interface { Start() error Stop() error Status() WorkerStatus AssignJob(job GenericJob) error GetID() WorkerID } // GenericWorker is a generic worker type GenericWorker struct { ID WorkerID status WorkerStatus } // LocalWorker is a worker that runs jobs locally type LocalWorker struct { GenericWorker jobChannels []chan GenericJob stopChans []chan bool cancelChans []chan bool maxJobs int mu sync.Mutex wg sync.WaitGroup } // GetID returns the ID of the worker func (w *GenericWorker) GetID() WorkerID { return w.ID } // NewLocalWorker creates a new local worker func NewLocalWorker(maxJobs int) *LocalWorker { w := &LocalWorker{maxJobs: maxJobs} w.jobChannels = make([]chan GenericJob, maxJobs) w.stopChans = make([]chan bool, maxJobs) w.cancelChans = make([]chan bool, maxJobs) return w } // Start starts the worker func (w *LocalWorker) Start() error { w.mu.Lock() defer w.mu.Unlock() if w.status == WorkerStatusRunning { return ErrWorkerAlreadyRunning } for i := 0; i < w.maxJobs; i++ { w.wg.Add(1) w.jobChannels[i] = make(chan GenericJob) w.stopChans[i] = make(chan bool) w.cancelChans[i] = make(chan bool) go w.run(w.jobChannels[i], w.stopChans[i], w.cancelChans[i]) } w.wg.Wait() w.status = WorkerStatusRunning return nil } // Stop stops the worker func (w *LocalWorker) Stop() error { w.mu.Lock() defer w.mu.Unlock() if w.status == WorkerStatusStopped { return ErrWorkerNotRunning } w.status = WorkerStatusStopped for _, stopChan := range w.stopChans { stopChan <- true } return nil } func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancelChan chan bool) { w.wg.Done() for { select { case job := <-jobChannel: ctx, cancel := context.WithCancel(context.Background()) retries := job.GetMaxRetries() retryDelay := job.GetRetryDelay() if retries == 0 { retries = 1 } var err error for retries > 0 { timeout := job.GetTimeout() if timeout == 0 { timeout = 1 * time.Minute } ctxTimeout, cancelTimeout := context.WithTimeout(ctx, timeout) _, err = job.Execute(ctxTimeout) cancelTimeout() if err == nil || ctx.Err() == context.Canceled { break } if retryDelay > 0 { time.Sleep(retryDelay) } retries-- } select { case <-cancelChan: cancel() case <-ctx.Done(): cancel() default: cancel() } case <-stopChan: return } } } // AssignJob assigns a job to the worker func (w *LocalWorker) AssignJob(job GenericJob) error { w.mu.Lock() defer w.mu.Unlock() if w.status != WorkerStatusRunning { return ErrWorkerNotRunning } for _, ch := range w.jobChannels { select { case ch <- job: return nil default: continue } } return ErrMaxJobsReached } // Status returns the status of the worker func (w *LocalWorker) Status() WorkerStatus { w.mu.Lock() defer w.mu.Unlock() return w.status }