Skip to content
Snippets Groups Projects
Select Git revision
  • 7efff3740b24450abf6b658ba6fb8aeb0d82b05b
  • master default protected
  • v1.23.2
  • v1.23.1
  • v1.23.0
  • v1.22.0
  • v1.21.1
  • v1.21.0
  • v1.20.3
  • v1.20.2
  • v1.20.1
  • v1.20.0
  • v1.19.4
  • v1.19.3
  • v1.19.2
  • v1.19.1
  • v1.19.0
  • v1.18.2
  • v1.18.1
  • v1.18.0
  • v1.17.0
  • v1.16.1
22 results

worker.go

Blame
  • worker.go 3.09 KiB
    package jobqueue
    
    import (
    	"context"
    	"sync"
    	"time"
    )
    
    // WorkerStatus enumerates the lifecycle states a worker can report.
    type WorkerStatus int
    
    // Worker lifecycle states. The constants are explicitly typed as WorkerStatus
    // (rather than left as untyped integers) so the compiler rejects accidental
    // mixing with unrelated integer values and so they keep their type when
    // stored in an interface or formatted with %T.
    const (
    	// WorkerStatusStopped is the zero value of WorkerStatus, so a worker whose
    	// status field has never been set reports itself as stopped.
    	WorkerStatusStopped WorkerStatus = iota
    	// WorkerStatusRunning indicates that the worker is running.
    	WorkerStatusRunning
    )
    
    // WorkerID is the string-based identifier of a worker, as returned by
    // Worker.GetID.
    type WorkerID string
    
    // Worker is the interface implemented by job workers: a worker can be started
    // and stopped, reports its current status, accepts jobs, and exposes its ID.
    type Worker interface {
    	// Start starts the worker. LocalWorker, for example, returns
    	// ErrWorkerAlreadyRunning when it is already running.
    	Start() error
    	// Stop stops the worker.
    	// NOTE(review): error conditions are implementation-defined; confirm
    	// against the concrete implementations.
    	Stop() error
    	// Status reports the worker's current WorkerStatus.
    	Status() WorkerStatus
    	// AssignJob hands job to the worker.
    	// NOTE(review): presumably a non-nil error means the job was not
    	// accepted; confirm against the concrete implementations.
    	AssignJob(job GenericJob) error
    
    	// GetID returns the worker's identifier.
    	GetID() WorkerID
    }
    
    // GenericWorker holds the state shared by worker implementations: an
    // identifier and the current status. LocalWorker embeds it.
    type GenericWorker struct {
    	// ID is the worker's identifier; GetID returns it.
    	ID     WorkerID
    	// status is the current lifecycle state. Its zero value is
    	// WorkerStatusStopped.
    	status WorkerStatus
    }
    
    // LocalWorker is a worker that runs jobs locally. It embeds GenericWorker for
    // its ID and status and keeps one set of channels per job slot; the number of
    // slots is maxJobs.
    type LocalWorker struct {
    	GenericWorker
    	// jobChannels has length maxJobs (allocated by NewLocalWorker); Start
    	// creates one unbuffered channel per slot.
    	jobChannels []chan GenericJob
    	// stopChans has length maxJobs (allocated by NewLocalWorker).
    	// NOTE(review): presumably one stop signal per slot; confirm in Start/Stop.
    	stopChans   []chan bool
    	// cancelChans has length maxJobs (allocated by NewLocalWorker).
    	// NOTE(review): presumably one job-cancel signal per slot; confirm.
    	cancelChans []chan bool
    	// maxJobs is the number of job slots, fixed at construction time.
    	maxJobs     int
    	// mu is held by Start for the duration of the call.
    	mu          sync.Mutex
    	// wg is incremented once per job slot by Start.
    	wg          sync.WaitGroup
    }
    
    // GetID reports the identifier stored in the worker's ID field.
    func (w *GenericWorker) GetID() WorkerID {
    	id := w.ID
    	return id
    }
    
    // NewLocalWorker builds a LocalWorker with maxJobs job slots. The per-slot
    // channel slices are allocated with length maxJobs here; their elements stay
    // nil until the worker is started.
    func NewLocalWorker(maxJobs int) *LocalWorker {
    	return &LocalWorker{
    		jobChannels: make([]chan GenericJob, maxJobs),
    		stopChans:   make([]chan bool, maxJobs),
    		cancelChans: make([]chan bool, maxJobs),
    		maxJobs:     maxJobs,
    	}
    }
    
    // Start starts the worker
    func (w *LocalWorker) Start() error {
    	w.mu.Lock()
    	defer w.mu.Unlock()
    
    	if w.status == WorkerStatusRunning {
    		return ErrWorkerAlreadyRunning
    	}
    
    	for i := 0; i < w.maxJobs; i++ {
    		w.wg.Add(1)
    		w.jobChannels[i] = make(chan GenericJob)