package jobqueue import ( "context" "fmt" "github.com/google/uuid" "sync" "time" ) type WorkerStatus int const ( WorkerStatusStopped = iota WorkerStatusRunning ) type WorkerID string func (id WorkerID) String() string { return string(id) } // Worker is a worker type Worker interface { Start() error Stop() error Status() WorkerStatus AssignJob(job GenericJob) error GetID() WorkerID SetManager(manager *Manager) } // GenericWorker is a generic worker type GenericWorker struct { ID WorkerID status WorkerStatus } // LocalWorker is a worker that runs jobs locally type LocalWorker struct { GenericWorker jobChannels []chan GenericJob stopChans []chan bool cancelChans []chan bool maxJobs int mu sync.Mutex wg sync.WaitGroup manager *Manager } // GetID returns the ID of the worker func (w *GenericWorker) GetID() WorkerID { return w.ID } // NewLocalWorker creates a new local worker func NewLocalWorker(maxJobs int) *LocalWorker { w := &LocalWorker{maxJobs: maxJobs} w.jobChannels = make([]chan GenericJob, maxJobs) w.stopChans = make([]chan bool, maxJobs) w.cancelChans = make([]chan bool, maxJobs) w.ID = WorkerID(uuid.New().String()) return w } // Start starts the worker func (w *LocalWorker) Start() error { w.mu.Lock() defer w.mu.Unlock() if w.status == WorkerStatusRunning { return ErrWorkerAlreadyRunning } for i := 0; i < w.maxJobs; i++ { w.wg.Add(1) w.jobChannels[i] = make(chan GenericJob) w.stopChans[i] = make(chan bool) w.cancelChans[i] = make(chan bool) go w.run(w.jobChannels[i], w.stopChans[i], w.cancelChans[i]) } time.Sleep(200 * time.Millisecond) // wait go routine until select w.wg.Wait() w.status = WorkerStatusRunning if w.manager != nil && w.manager.logger != nil { w.manager.logger.Info("Worker started", "worker", w.ID) } return nil } func (w *LocalWorker) SetManager(manager *Manager) { w.mu.Lock() defer w.mu.Unlock() w.manager = manager } // Stop stops the worker func (w *LocalWorker) Stop() error { w.mu.Lock() defer w.mu.Unlock() if w.status == WorkerStatusStopped { return ErrWorkerNotRunning } w.status = WorkerStatusStopped for _, stopChan := range w.stopChans { stopChan <- true } if w.manager != nil && w.manager.logger != nil { w.manager.logger.Info("Worker stopped", "worker", w.ID) } return nil } func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancelChan chan bool) { workerThreadID := w.ID.String() + "-" + fmt.Sprintf("%p", &w) if w.manager != nil && w.manager.logger != nil { w.manager.logger.Info("Worker thread with id started", "worker", w.ID, "thread_id", workerThreadID) } stopFlag := false w.wg.Done() for { select { case job := <-jobChannel: ctx, cancel := context.WithCancel(context.Background()) retries := job.GetMaxRetries() retryDelay := job.GetRetryDelay() if retries == 0 { retries = 1 } var err error for retries > 0 { var cancel context.CancelFunc timeout := job.GetTimeout() if timeout > 0 { ctx, cancel = context.WithTimeout(ctx, timeout) } if w.manager != nil && w.manager.logger != nil { w.manager.logger.Info("Executing job on worker thread", "worker", w.ID, "thread_id", workerThreadID, "job_id", job.GetID()) } _, err = job.Execute(ctx) if cancel != nil { cancel() } if err == nil || ctx.Err() == context.Canceled { break } if retryDelay > 0 { time.Sleep(retryDelay) } retries-- } cancel() if w.manager != nil && w.manager.dbSaver != nil { err = w.manager.dbSaver.SaveJob(job) if err != nil { if w.manager.logger != nil { w.manager.logger.Error("Error while saving job", "job_id", job.GetID(), "error", err) } } } case <-stopChan: stopFlag = true break } if stopFlag { break } } if w.manager != nil && w.manager.logger != nil { w.manager.logger.Info("Worker thread with id stopped", "worker", w.ID, "thread_id", workerThreadID) } } // AssignJob assigns a job to the worker func (w *LocalWorker) AssignJob(job GenericJob) error { w.mu.Lock() defer w.mu.Unlock() if w.status != WorkerStatusRunning { return ErrWorkerNotRunning } for _, ch := range w.jobChannels { select { case ch <- job: return nil default: continue } } return ErrMaxJobsReached } // Status returns the status of the worker func (w *LocalWorker) Status() WorkerStatus { w.mu.Lock() defer w.mu.Unlock() return w.status }