Skip to content
Snippets Groups Projects
Select Git revision
  • fafc34f8f9295ac0b8bcd6903b33e89d6ab6fb09
  • master default protected
  • v1.23.2
  • v1.23.1
  • v1.23.0
  • v1.22.0
  • v1.21.1
  • v1.21.0
  • v1.20.3
  • v1.20.2
  • v1.20.1
  • v1.20.0
  • v1.19.4
  • v1.19.3
  • v1.19.2
  • v1.19.1
  • v1.19.0
  • v1.18.2
  • v1.18.1
  • v1.18.0
  • v1.17.0
  • v1.16.1
22 results

worker.go

Blame
  • worker.go 4.44 KiB
    package jobqueue
    
    import (
    	"context"
    	"fmt"
    	"github.com/google/uuid"
    	"sync"
    	"time"
    )
    
    type WorkerStatus int
    
    const (
    	WorkerStatusStopped = iota
    	WorkerStatusRunning
    )
    
    type WorkerID string
    
    func (id WorkerID) String() string {
    	return string(id)
    }
    
    // Worker is a worker
    type Worker interface {
    	Start() error
    	Stop() error
    	Status() WorkerStatus
    	AssignJob(job GenericJob) error
    
    	GetID() WorkerID
    
    	SetManager(manager *Manager)
    }
    
    // GenericWorker is a generic worker
    type GenericWorker struct {
    	ID     WorkerID
    	status WorkerStatus
    }
    
    // LocalWorker is a worker that runs jobs locally
    type LocalWorker struct {
    	GenericWorker
    	jobChannels []chan GenericJob
    	stopChans   []chan bool
    	cancelChans []chan bool
    	maxJobs     int
    	mu          sync.Mutex
    	wg          sync.WaitGroup
    	manager     *Manager
    }
    
    // GetID returns the ID of the worker
    func (w *GenericWorker) GetID() WorkerID {
    	return w.ID
    }
    
    // NewLocalWorker creates a new local worker
    func NewLocalWorker(maxJobs int) *LocalWorker {
    	w := &LocalWorker{maxJobs: maxJobs}
    	w.jobChannels = make([]chan GenericJob, maxJobs)
    	w.stopChans = make([]chan bool, maxJobs)
    	w.cancelChans = make([]chan bool, maxJobs)
    	w.ID = WorkerID(uuid.New().String())
    	return w
    }
    
    // Start starts the worker
    func (w *LocalWorker) Start() error {
    	w.mu.Lock()
    	defer w.mu.Unlock()
    
    	if w.status == WorkerStatusRunning {
    		return ErrWorkerAlreadyRunning
    	}
    
    	for i := 0; i < w.maxJobs; i++ {
    		w.wg.Add(1)
    		w.jobChannels[i] = make(chan GenericJob)
    		w.stopChans[i] = make(chan bool)
    		w.cancelChans[i] = make(chan bool)
    		go w.run(w.jobChannels[i], w.stopChans[i], w.cancelChans[i])
    	}
    
    	time.Sleep(200 * time.Millisecond) // wait go routine until select
    	w.wg.Wait()
    	w.status = WorkerStatusRunning
    
    	if w.manager != nil && w.manager.logger != nil {
    		w.manager.logger.Info("Worker started", "worker", w.ID)
    	}
    
    	return nil
    }
    
    func (w *LocalWorker) SetManager(manager *Manager) {
    	w.mu.Lock()
    	defer w.mu.Unlock()
    	w.manager = manager
    }
    
    // Stop stops the worker
    func (w *LocalWorker) Stop() error {
    	w.mu.Lock()
    	defer w.mu.Unlock()
    
    	if w.status == WorkerStatusStopped {
    		return ErrWorkerNotRunning
    	}
    
    	w.status = WorkerStatusStopped
    	for _, stopChan := range w.stopChans {
    		stopChan <- true
    	}
    
    	if w.manager != nil && w.manager.logger != nil {
    		w.manager.logger.Info("Worker stopped", "worker", w.ID)
    	}
    
    	return nil
    }
    
    func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancelChan chan bool) {
    
    	workerThreadID := w.ID.String() + "-" + fmt.Sprintf("%p", &w)
    
    	if w.manager != nil && w.manager.logger != nil {
    		w.manager.logger.Info("Worker thread with id started", "worker", w.ID, "thread_id", workerThreadID)
    	}
    
    	stopFlag := false
    	w.wg.Done()
    
    	for {
    
    		select {
    		case job := <-jobChannel:
    			ctx, cancel := context.WithCancel(context.Background())
    			retries := job.GetMaxRetries()
    			retryDelay := job.GetRetryDelay()
    
    			if retries == 0 {
    				retries = 1
    			}
    
    			var err error
    			for retries > 0 {
    
    				var cancel context.CancelFunc
    
    				timeout := job.GetTimeout()
    				if timeout > 0 {
    					ctx, cancel = context.WithTimeout(ctx, timeout)
    				}
    
    				if w.manager != nil && w.manager.logger != nil {
    					w.manager.logger.Info("Executing job on worker thread", "worker", w.ID, "thread_id", workerThreadID, "job_id", job.GetID())
    				}
    
    				_, err = job.Execute(ctx)
    
    				if cancel != nil {
    					cancel()
    				}
    
    				if err == nil || ctx.Err() == context.Canceled {
    					break
    				}
    
    				if retryDelay > 0 {
    					time.Sleep(retryDelay)
    				}
    
    				retries--
    			}
    
    			cancel()
    
    			if w.manager != nil && w.manager.dbSaver != nil {
    				err = w.manager.dbSaver.SaveJob(job)
    				if err != nil {
    					if w.manager.logger != nil {
    						w.manager.logger.Error("Error while saving job", "job_id", job.GetID(), "error", err)
    					}
    				}
    			}
    
    		case <-stopChan:
    			stopFlag = true
    			break
    		}
    
    		if stopFlag {
    			break
    		}
    	}
    
    	if w.manager != nil && w.manager.logger != nil {
    		w.manager.logger.Info("Worker thread with id stopped", "worker", w.ID, "thread_id", workerThreadID)
    	}
    
    }
    
    // AssignJob assigns a job to the worker
    func (w *LocalWorker) AssignJob(job GenericJob) error {
    	w.mu.Lock()
    	defer w.mu.Unlock()
    
    	if w.status != WorkerStatusRunning {
    		return ErrWorkerNotRunning
    	}
    
    	for _, ch := range w.jobChannels {
    		select {
    		case ch <- job:
    			return nil
    		default:
    			continue
    		}
    	}
    
    	return ErrMaxJobsReached
    }
    
    // Status returns the status of the worker
    func (w *LocalWorker) Status() WorkerStatus {
    	w.mu.Lock()
    	defer w.mu.Unlock()
    	return w.status
    }