Skip to content
Snippets Groups Projects
Select Git revision
  • 9e4f7dc34d470a34272ab3f7eb89a8b18d5f8d3e
  • master default protected
  • v1.23.2
  • v1.23.1
  • v1.23.0
  • v1.22.0
  • v1.21.1
  • v1.21.0
  • v1.20.3
  • v1.20.2
  • v1.20.1
  • v1.20.0
  • v1.19.4
  • v1.19.3
  • v1.19.2
  • v1.19.1
  • v1.19.0
  • v1.18.2
  • v1.18.1
  • v1.18.0
  • v1.17.0
  • v1.16.1
22 results

worker.go

Blame
  • job.go 4.49 KiB
    package jobqueue
    
    import (
    	"context"
    	"os"
    	"sync"
    	"time"
    )
    
    // JobID is the string-based identifier of a job. Jobs also refer to the jobs
    // they depend on by JobID.
    type JobID string
    
    // String returns the identifier converted to a plain string.
    func (id JobID) String() string {
    	return string(id)
    }
    
    // Priority is the integer priority level attached to a job.
    type Priority int
    
    // Priority levels in ascending numeric order, starting at zero:
    // PriorityLow (0), PriorityDefault (1), PriorityHigh (2), PriorityCritical (3).
    const (
    	PriorityLow = Priority(iota)
    	PriorityDefault
    	PriorityHigh
    	PriorityCritical
    )
    
    // GenericJob is the non-generic view of a job: it exposes identity,
    // scheduling attributes and execution without the result type parameter.
    // *Job[T] defines a method for every entry of this interface.
    type GenericJob interface {
    	// GetID returns the job's identifier.
    	GetID() JobID
    	// GetDependencies returns the identifiers of the jobs this job depends on.
    	GetDependencies() []JobID
    
    	// GetPriority returns the job's priority level.
    	GetPriority() Priority
    
    	// Execute runs the job and returns its type-erased result and error.
    	Execute(ctx context.Context) (RunGenericResult, error)
    
    	// Cancel requests cancellation of the job.
    	Cancel() error
    
    	// GetMaxRetries returns the configured maximum number of retries.
    	// NOTE(review): the retry policy itself is applied by the caller, not in
    	// this file — confirm the exact semantics there.
    	GetMaxRetries() uint
    
    	// GetRetryDelay returns the configured delay between retries.
    	GetRetryDelay() time.Duration
    
    	// GetTimeout returns the configured timeout.
    	GetTimeout() time.Duration
    }
    
    // Job couples a Runnable[T] with its scheduling attributes and the
    // statistics and logs collected from its runs. The accessor methods take mu
    // before touching the fields, so a Job must be used through a pointer and
    // never copied.
    type Job[T any] struct {
    	// id is the job's identifier, set at construction.
    	id JobID
    	// priority is the job's priority level.
    	priority Priority
    
    	// timout holds the configured timeout (the field name is misspelt; it is
    	// read and written via GetTimeout/SetTimeout).
    	timout time.Duration
    	// maxRetries is the configured maximum number of retries.
    	maxRetries uint
    	// RetryDelay is the configured delay between retries. Unlike its sibling
    	// fields it is exported; prefer GetRetryDelay/SetRetryDelay, which lock mu.
    	RetryDelay time.Duration
    
    	// dependencies lists the identifiers of the jobs this job depends on.
    	dependencies []JobID
    
    	// mu is locked by the accessor methods and by Execute while it updates
    	// stats and logs.
    	mu sync.Mutex
    
    	// runner is the Runnable invoked by Execute.
    	runner Runnable[T]
    
    	// stats accumulates run counters and timing metrics across Execute calls.
    	stats JobStats
    	// logs receives one JobLog entry per Execute call.
    	logs []JobLog
    }
    
    // NewJob returns a pointer to a new Job that carries the given id and
    // runner and starts out with PriorityDefault. All remaining fields (timeout,
    // retry settings, dependencies, stats and logs) keep their zero values.
    func NewJob[T any](id JobID, runner Runnable[T]) *Job[T] {
    	job := new(Job[T])
    	job.id = id
    	job.runner = runner
    	job.priority = PriorityDefault
    	return job
    }
    
    // Execute runs the job's runner once and records the outcome.
    //
    // The runner is invoked before the job mutex is taken. Afterwards the mutex
    // is held while the run statistics (run count, total/min/max/average run
    // time, success and error counters) are updated and one JobLog entry for
    // this run is appended to the job's logs.
    //
    // NOTE(review): ctx is accepted but not used; the runner is called without
    // it, so cancellation and deadlines carried by ctx are not observed here.
    //
    // It returns the runner's result converted to RunGenericResult together with
    // the runner's error, unchanged.
    func (j *Job[T]) Execute(ctx context.Context) (RunGenericResult, error) {
    	startTime := time.Now()
    	r, runnerError := j.runner.Run()
    	endTime := time.Now()
    	elapsedTime := endTime.Sub(startTime)
    
    	// Build the log entry from the timestamps measured around the run, so that
    	// EndTime-StartTime equals the duration recorded in the statistics and is
    	// not inflated by time spent waiting for the mutex below.
    	newLog := JobLog{
    		StartTime:    startTime,
    		EndTime:      endTime,
    		ProcessID:    os.Getpid(), // PID of the current process
    		IsSuccessful: runnerError == nil,
    	}
    	if runnerError != nil {
    		newLog.ExitCode = 1 // generic failure code; success keeps the zero value
    		newLog.ErrorMsg = runnerError.Error()
    	}
    
    	j.mu.Lock()
    	defer j.mu.Unlock()
    
    	j.stats.RunCount++
    	if runnerError == nil {
    		j.stats.SuccessCount++
    	} else {
    		j.stats.ErrorCount++
    	}
    
    	// Timing metrics. A MinRunTime of zero is treated as "not recorded yet".
    	j.stats.TimeMetrics.TotalRunTime += elapsedTime
    	if j.stats.TimeMetrics.MinRunTime == 0 || elapsedTime < j.stats.TimeMetrics.MinRunTime {
    		j.stats.TimeMetrics.MinRunTime = elapsedTime
    	}
    	if elapsedTime > j.stats.TimeMetrics.MaxRunTime {
    		j.stats.TimeMetrics.MaxRunTime = elapsedTime
    	}
    	// RunCount was incremented above, so the divisor is at least one.
    	j.stats.TimeMetrics.AvgRunTime = j.stats.TimeMetrics.TotalRunTime / time.Duration(j.stats.RunCount)
    
    	j.logs = append(j.logs, newLog)
    
    	return RunGenericResult(r), runnerError
    }
    
    // Cancel is currently a no-op: it performs no work and always returns nil.
    // NOTE(review): it does not interrupt a run started by Execute — confirm
    // whether cancellation is meant to be implemented here.
    func (j *Job[T]) Cancel() error {
    	return nil
    }
    
    // SetPriority stores priority as the job's priority while holding the job
    // mutex. It returns the receiver so that setter calls can be chained.
    func (j *Job[T]) SetPriority(priority Priority) *Job[T] {
    	j.mu.Lock()
    	j.priority = priority
    	j.mu.Unlock()
    
    	return j
    }
    
    // GetPriority reads the job's priority while holding the job mutex and
    // returns it.
    func (j *Job[T]) GetPriority() Priority {
    	j.mu.Lock()
    	current := j.priority
    	j.mu.Unlock()
    
    	return current
    }
    
    // SetTimeout stores timeout as the job's timeout while holding the job
    // mutex. It returns the receiver so that setter calls can be chained.
    func (j *Job[T]) SetTimeout(timeout time.Duration) *Job[T] {
    	j.mu.Lock()
    	j.timout = timeout
    	j.mu.Unlock()
    
    	return j
    }
    
    // GetTimeout reads the job's timeout while holding the job mutex and
    // returns it.
    func (j *Job[T]) GetTimeout() time.Duration {
    	j.mu.Lock()
    	current := j.timout
    	j.mu.Unlock()
    
    	return current
    }
    
    // SetMaxRetries stores maxRetries as the job's retry limit while holding the
    // job mutex. It returns the receiver so that setter calls can be chained.
    func (j *Job[T]) SetMaxRetries(maxRetries uint) *Job[T] {
    	j.mu.Lock()
    	j.maxRetries = maxRetries
    	j.mu.Unlock()
    
    	return j
    }
    
    // GetMaxRetries reads the job's retry limit while holding the job mutex and
    // returns it.
    func (j *Job[T]) GetMaxRetries() uint {
    	j.mu.Lock()
    	current := j.maxRetries
    	j.mu.Unlock()
    
    	return current
    }
    
    // SetRetryDelay stores retryDelay in the job's RetryDelay field while
    // holding the job mutex. It returns the receiver so that setter calls can be
    // chained.
    func (j *Job[T]) SetRetryDelay(retryDelay time.Duration) *Job[T] {
    	j.mu.Lock()
    	j.RetryDelay = retryDelay
    	j.mu.Unlock()
    
    	return j
    }
    
    // GetRetryDelay reads the job's RetryDelay field while holding the job mutex
    // and returns it.
    func (j *Job[T]) GetRetryDelay() time.Duration {
    	j.mu.Lock()
    	current := j.RetryDelay
    	j.mu.Unlock()
    
    	return current
    }
    
    // SetDependencies replaces the job's dependency list with the given IDs and
    // returns the receiver so that setter calls can be chained.
    //
    // The job keeps its own copy of the slice: retaining the caller's backing
    // array would let later writes by the caller change mutex-guarded state
    // without holding the lock. A nil argument is stored as nil, an empty
    // non-nil slice as an empty non-nil slice.
    func (j *Job[T]) SetDependencies(dependencies []JobID) *Job[T] {
    	var owned []JobID
    	if dependencies != nil {
    		owned = make([]JobID, len(dependencies))
    		copy(owned, dependencies)
    	}
    
    	j.mu.Lock()
    	defer j.mu.Unlock()
    
    	j.dependencies = owned
    	return j
    }
    
    // AddDependency appends dependency to the end of the job's dependency list
    // while holding the job mutex. No duplicate check is performed. It returns
    // the receiver so that calls can be chained.
    func (j *Job[T]) AddDependency(dependency JobID) *Job[T] {
    	j.mu.Lock()
    	j.dependencies = append(j.dependencies, dependency)
    	j.mu.Unlock()
    
    	return j
    }
    
    // RemoveDependency removes the first occurrence of dependency from the job's
    // dependency list and returns the receiver so that calls can be chained. If
    // dependency is not in the list, the list is left untouched.
    //
    // The surviving entries are copied into a fresh slice instead of being
    // shifted in place: the current backing array may be shared with slices held
    // by callers, and an in-place delete would silently rewrite their contents.
    func (j *Job[T]) RemoveDependency(dependency JobID) *Job[T] {
    	j.mu.Lock()
    	defer j.mu.Unlock()
    
    	for i, dep := range j.dependencies {
    		if dep != dependency {
    			continue
    		}
    		remaining := make([]JobID, 0, len(j.dependencies)-1)
    		remaining = append(remaining, j.dependencies[:i]...)
    		remaining = append(remaining, j.dependencies[i+1:]...)
    		j.dependencies = remaining
    		break
    	}
    	return j
    }
    
    // GetDependencies returns the identifiers of the jobs this job depends on.
    //
    // The result is a copy taken while holding the job mutex, so the caller may
    // keep, iterate or modify it without touching the job's guarded state. A nil
    // dependency list is returned as nil.
    func (j *Job[T]) GetDependencies() []JobID {
    	j.mu.Lock()
    	defer j.mu.Unlock()
    
    	if j.dependencies == nil {
    		return nil
    	}
    	deps := make([]JobID, len(j.dependencies))
    	copy(deps, j.dependencies)
    	return deps
    }
    
    // GetID reads the job's identifier while holding the job mutex and returns
    // it.
    func (j *Job[T]) GetID() JobID {
    	j.mu.Lock()
    	current := j.id
    	j.mu.Unlock()
    
    	return current
    }
    
    // GetRunnable reads the job's runner while holding the job mutex and returns
    // it.
    func (j *Job[T]) GetRunnable() Runnable[T] {
    	j.mu.Lock()
    	current := j.runner
    	j.mu.Unlock()
    
    	return current
    }