Skip to content
Snippets Groups Projects
Select Git revision
  • ba5135276d252fd9905fcd6c2e0dc4c6706dd0c9
  • master default protected
  • 1.31
  • 4.28.0
  • 4.27.0
  • 4.26.0
  • 4.25.5
  • 4.25.4
  • 4.25.3
  • 4.25.2
  • 4.25.1
  • 4.25.0
  • 4.24.3
  • 4.24.2
  • 4.24.1
  • 4.24.0
  • 4.23.6
  • 4.23.5
  • 4.23.4
  • 4.23.3
  • 4.23.2
  • 4.23.1
  • 4.23.0
23 results

project.properties

Blame
  • job.go 8.12 KiB
    // Copyright 2023 schukai GmbH
    // SPDX-License-Identifier: AGPL-3.0
    
    package jobqueue
    
    import (
    	"context"
    	"os"
    	"sync"
    	"time"
    )
    
    // JobID is the string-based identifier type used for jobs.
    type JobID string
    
    // String converts the JobID back to a plain string, which also makes
    // JobID usable wherever a value with a String() method is expected.
    func (id JobID) String() string {
    	plain := string(id)
    	return plain
    }
    
    // GetResultAndError is an optional interface for run result data.
    // Job.Execute checks whether a pointer to the runner's result data
    // implements it and, if so, uses it to fill the job log entry.
    type GetResultAndError interface {
    	// GetResult returns the textual result; Execute stores it in the
    	// Result field of the log entry.
    	GetResult() string
    	// GetError returns an error message and an integer code; Execute
    	// stores them in the ErrorMsg and ExitCode fields of the log entry
    	// when a run was not successful.
    	GetError() (string, int)
    }
    
    // Priority is the priority of a job. The constants below are numbered
    // in ascending order, so a numerically larger value is a higher priority.
    type Priority int
    
    const (
    	// PriorityLow is the lowest priority (value 0).
    	PriorityLow Priority = iota
    	// PriorityDefault is the priority NewJob assigns to new jobs (value 1).
    	PriorityDefault
    	// PriorityHigh ranks above PriorityDefault (value 2).
    	PriorityHigh
    	// PriorityCritical is the highest priority defined here (value 3).
    	PriorityCritical
    )
    
    // Job is a job that can be executed. It bundles a runner with its
    // configuration (priority, timeout, retries, dependencies, scheduler),
    // its pause state and the statistics and logs collected by Execute.
    // The accessor methods take mu before touching the fields, so a Job
    // must be used through a pointer and never copied.
    type Job[T any] struct {
    	// id identifies the job; it is also stamped onto stats and logs.
    	id          JobID
    	// description is copied into the persistence snapshot.
    	description string
    	// priority defaults to PriorityDefault (see NewJob).
    	priority    Priority
    
    	// timeout, maxRetries and retryDelay are only stored and returned
    	// here; NOTE(review): presumably they are enforced by the component
    	// that runs the job — confirm against the caller.
    	timeout    *time.Duration
    	maxRetries uint
    	retryDelay *time.Duration
    
    	// scheduler is optional; nil means no scheduler has been set.
    	scheduler Scheduler
    
    	// pause is the paused flag; pauseUntil, when set and non-zero,
    	// limits the pause to that point in time (see IsPaused).
    	pause       bool
    	// pauseReason is only copied into the persistence snapshot here.
    	pauseReason string
    	pauseUntil  *time.Time
    
    	// dependencies lists the IDs of jobs this job depends on.
    	dependencies []JobID
    
    	// mu guards the fields of this struct.
    	mu sync.Mutex
    
    	// runner is what Execute runs.
    	runner Runnable[T]
    
    	// stats accumulates run counters and timings; it may be nil and is
    	// then created lazily by GetStats and Execute.
    	stats *JobStats
    	// logs buffers one entry per Execute call until they are drained by
    	// GetLogs or GetPersistence.
    	logs  []JobLog
    }
    
    // NewJob builds a Job for the given id and runner. The job starts with
    // PriorityDefault, an empty log buffer and zeroed statistics; all other
    // settings keep their zero values until changed via the setters.
    func NewJob[T any](id JobID, runner Runnable[T]) *Job[T] {
    	job := new(Job[T])
    	job.id = id
    	job.runner = runner
    	job.priority = PriorityDefault
    	job.stats = new(JobStats)
    	job.logs = []JobLog{}
    	return job
    }
    
    // GetLogs hands out the log entries collected so far and resets the
    // job's log buffer to an empty slice, so each entry is returned by at
    // most one call.
    func (j *Job[T]) GetLogs() []JobLog {
    	j.mu.Lock()
    	drained := j.logs
    	j.logs = []JobLog{}
    	j.mu.Unlock()
    
    	return drained
    }
    
    // GetStats returns a copy of the job's statistics with JobID set to the
    // job's id.
    func (j *Job[T]) GetStats() JobStats {
    	j.mu.Lock()
    	defer j.mu.Unlock()
    
    	// stats can be nil (kept as a workaround for gorm); create it on
    	// demand so the dereference below is always safe.
    	if j.stats == nil {
    		j.stats = new(JobStats)
    	}
    
    	current := j.stats
    	current.JobID = j.id
    	return *current
    }
    
    // GetPersistence produces a JobPersistence snapshot of the job's
    // configuration, pause state, statistics and buffered logs. The logs are
    // moved into the snapshot: after this call the job's own log buffer is
    // empty again.
    func (j *Job[T]) GetPersistence() JobPersistence {
    	j.mu.Lock()
    	defer j.mu.Unlock()
    
    	var snapshot JobPersistence
    
    	// Configuration.
    	snapshot.ID = j.id
    	snapshot.Description = j.description
    	snapshot.Priority = j.priority
    	snapshot.Timeout = j.timeout
    	snapshot.MaxRetries = j.maxRetries
    	snapshot.RetryDelay = j.retryDelay
    	snapshot.Dependencies = j.dependencies
    	snapshot.Runnable = j.runner.GetPersistence()
    
    	// Pause state.
    	snapshot.Pause = j.pause
    	snapshot.PauseReason = j.pauseReason
    	snapshot.PauseUntil = j.pauseUntil
    
    	// Collected data.
    	snapshot.Logs = j.logs
    	snapshot.Stats = j.stats
    
    	// The scheduler is optional.
    	if j.scheduler != nil {
    		snapshot.Scheduler = j.scheduler.GetPersistence()
    	}
    
    	// Stamp the job id onto the stats and every log entry.
    	if snapshot.Stats != nil {
    		snapshot.Stats.JobID = snapshot.ID
    	}
    	for idx := range snapshot.Logs {
    		snapshot.Logs[idx].JobID = snapshot.ID
    	}
    
    	// The snapshot now owns the log entries; start a fresh buffer.
    	j.logs = []JobLog{}
    
    	return snapshot
    }
    
    // SetScheduler replaces the job's scheduler with the given one.
    func (j *Job[T]) SetScheduler(scheduler Scheduler) {
    	j.mu.Lock()
    	j.scheduler = scheduler
    	j.mu.Unlock()
    }
    
    // Pause pauses the job without a time limit; it stays paused until
    // Resume is called (or until a later PauseUntil sets a new deadline).
    func (j *Job[T]) Pause() {
    	j.mu.Lock()
    	defer j.mu.Unlock()
    
    	j.pause = true
    	// Drop any deadline left over from an earlier PauseUntil call.
    	// IsPaused treats a non-nil pauseUntil as the end of the pause, so a
    	// stale deadline that already passed would otherwise make this call
    	// a no-op.
    	j.pauseUntil = nil
    }
    
    // PauseUntil marks the job as paused and records until as the point in
    // time at which the pause ends.
    func (j *Job[T]) PauseUntil(until time.Time) {
    	j.mu.Lock()
    	j.pause, j.pauseUntil = true, &until
    	j.mu.Unlock()
    }
    
    // Resume ends a pause: it clears the paused flag and removes any
    // pause deadline.
    func (j *Job[T]) Resume() {
    	j.mu.Lock()
    	j.pause, j.pauseUntil = false, nil
    	j.mu.Unlock()
    }
    
    // ResetStats discards the collected statistics and replaces them with a
    // fresh JobStats that carries only the job's id; every counter and time
    // metric starts again at its zero value.
    func (j *Job[T]) ResetStats() {
    	j.mu.Lock()
    	defer j.mu.Unlock()
    
    	fresh := new(JobStats)
    	fresh.JobID = j.id
    	j.stats = fresh
    }
    
    // IsPaused reports whether the job is currently paused. A job with the
    // paused flag set counts as paused indefinitely when no (or a zero)
    // deadline is stored, and otherwise only while the deadline lies in the
    // future.
    func (j *Job[T]) IsPaused() bool {
    	j.mu.Lock()
    	defer j.mu.Unlock()
    
    	if !j.pause {
    		return false
    	}
    
    	deadline := j.pauseUntil
    	if deadline == nil || deadline.IsZero() {
    		return true
    	}
    	return deadline.After(time.Now())
    }
    
    // GetScheduler returns the scheduler currently attached to the job,
    // or nil if none has been set.
    func (j *Job[T]) GetScheduler() Scheduler {
    	j.mu.Lock()
    	current := j.scheduler
    	j.mu.Unlock()
    
    	return current
    }
    
    // Execute runs the job's runner once with ctx and records the outcome.
    //
    // The runner is called without holding the job's mutex; the mutex is
    // taken only afterwards to update the statistics (run/success/error
    // counters, total/min/max/average run time) and to append one entry to
    // the job's log buffer.
    //
    // A run counts as successful only if the runner returned no error and
    // its result status is ResultStatusSuccess. For unsuccessful runs the
    // log entry gets an error message and a non-zero exit code: if a pointer
    // to the result data implements GetResultAndError, its GetError values
    // are used first; a missing message falls back to the runner's error
    // (when there is one) and a zero exit code to DefaultErrorExitCode.
    //
    // Execute returns the runner's result converted to RunGenericResult
    // together with the runner's error, unchanged.
    func (j *Job[T]) Execute(ctx context.Context) (RunGenericResult, error) {
    	startTime := time.Now()
    	r, runnerError := j.runner.Run(ctx)
    	endTime := time.Now()
    	elapsedTime := endTime.Sub(startTime)
    
    	isSuccessful := runnerError == nil && r.Status == ResultStatusSuccess
    
    	j.mu.Lock()
    	defer j.mu.Unlock()
    
    	if j.stats == nil {
    		j.stats = &JobStats{
    			JobID: j.id,
    		}
    	}
    
    	j.stats.RunCount++
    
    	metrics := &j.stats.TimeMetrics
    
    	// Saturate instead of wrapping around. The comparison must be ">="
    	// and not ">": a run measured as zero duration (coarse clock) leaves
    	// the total unchanged, which is not an overflow.
    	newTotalRunTime := metrics.TotalRunTime + elapsedTime
    	if newTotalRunTime >= metrics.TotalRunTime {
    		metrics.TotalRunTime = newTotalRunTime
    	} else {
    		// largest representable duration
    		metrics.TotalRunTime = time.Duration(^uint64(0) >> 1)
    	}
    
    	// A MinRunTime of zero means "no minimum recorded yet".
    	if metrics.MinRunTime == 0 || elapsedTime < metrics.MinRunTime {
    		metrics.MinRunTime = elapsedTime
    	}
    
    	if elapsedTime > metrics.MaxRunTime {
    		metrics.MaxRunTime = elapsedTime
    	}
    
    	// RunCount can only be zero here if the counter wrapped around;
    	// guard the division anyway.
    	if j.stats.RunCount == 0 {
    		metrics.AvgRunTime = 0
    	} else {
    		metrics.AvgRunTime = metrics.TotalRunTime / time.Duration(j.stats.RunCount)
    	}
    
    	if isSuccessful {
    		j.stats.SuccessCount++
    	} else {
    		j.stats.ErrorCount++
    	}
    
    	// Use the timestamps taken around the runner call so the log entry
    	// agrees with the run time fed into the metrics and does not include
    	// time spent waiting for the mutex.
    	newLog := JobLog{
    		StartTime: startTime,
    		EndTime:   endTime,
    	}
    	newLog.ProcessID = os.Getpid() // PID of the current process
    
    	// The result data may optionally expose result and error details.
    	reporter, hasReporter := any(&r.Data).(GetResultAndError)
    
    	if isSuccessful {
    		newLog.IsSuccessful = true
    		newLog.ExitCode = 0
    	} else {
    		if hasReporter {
    			newLog.ErrorMsg, newLog.ExitCode = reporter.GetError()
    		}
    
    		// runnerError is nil when the runner merely reported a
    		// non-success status, so it must be checked before use.
    		if newLog.ErrorMsg == "" && runnerError != nil {
    			newLog.ErrorMsg = runnerError.Error()
    		}
    
    		newLog.IsSuccessful = false
    		if newLog.ExitCode == 0 {
    			newLog.ExitCode = DefaultErrorExitCode
    		}
    	}
    
    	if hasReporter {
    		newLog.Result = reporter.GetResult()
    	}
    
    	j.logs = append(j.logs, newLog)
    
    	genericResult := RunGenericResult(r)
    	return genericResult, runnerError
    }
    
    // Cancel cancels the job, currently a no-op: it changes no state and
    // always returns nil.
    func (j *Job[T]) Cancel() error {
    	return nil
    }
    
    // SetPriority stores the given priority and returns the job itself so
    // that setter calls can be chained.
    func (j *Job[T]) SetPriority(priority Priority) *Job[T] {
    	j.mu.Lock()
    	j.priority = priority
    	j.mu.Unlock()
    
    	return j
    }
    
    // GetPriority reports the job's current priority.
    func (j *Job[T]) GetPriority() Priority {
    	j.mu.Lock()
    	current := j.priority
    	j.mu.Unlock()
    
    	return current
    }
    
    // SetTimeout stores the given timeout and returns the job itself so
    // that setter calls can be chained.
    func (j *Job[T]) SetTimeout(timeout time.Duration) *Job[T] {
    	j.mu.Lock()
    	j.timeout = &timeout
    	j.mu.Unlock()
    
    	return j
    }
    
    // GetTimeout returns the job's timeout, or nil if none has been set.
    func (j *Job[T]) GetTimeout() *time.Duration {
    	j.mu.Lock()
    	current := j.timeout
    	j.mu.Unlock()
    
    	return current
    }
    
    // SetMaxRetries stores the maximum number of retries and returns the
    // job itself so that setter calls can be chained.
    func (j *Job[T]) SetMaxRetries(maxRetries uint) *Job[T] {
    	j.mu.Lock()
    	j.maxRetries = maxRetries
    	j.mu.Unlock()
    
    	return j
    }
    
    // GetMaxRetries reports the configured maximum number of retries.
    func (j *Job[T]) GetMaxRetries() uint {
    	j.mu.Lock()
    	current := j.maxRetries
    	j.mu.Unlock()
    
    	return current
    }
    
    // SetRetryDelay stores the given retry delay and returns the job itself
    // so that setter calls can be chained.
    func (j *Job[T]) SetRetryDelay(retryDelay time.Duration) *Job[T] {
    	j.mu.Lock()
    	j.retryDelay = &retryDelay
    	j.mu.Unlock()
    
    	return j
    }
    
    // GetRetryDelay returns the job's retry delay, or nil if none has been
    // set.
    func (j *Job[T]) GetRetryDelay() *time.Duration {
    	j.mu.Lock()
    	current := j.retryDelay
    	j.mu.Unlock()
    
    	return current
    }
    
    // SetDependencies replaces the job's dependency list with a copy of the
    // given IDs and returns the job itself so that setter calls can be
    // chained.
    //
    // The input is copied so that later AddDependency/RemoveDependency calls
    // never write into the caller's backing array, and so that the caller
    // modifying its slice afterwards cannot change the job behind the mutex.
    // A nil argument is stored as nil.
    func (j *Job[T]) SetDependencies(dependencies []JobID) *Job[T] {
    	var owned []JobID
    	if dependencies != nil {
    		owned = make([]JobID, len(dependencies))
    		copy(owned, dependencies)
    	}
    
    	j.mu.Lock()
    	defer j.mu.Unlock()
    
    	j.dependencies = owned
    	return j
    }
    
    // AddDependency appends the given job ID to the dependency list and
    // returns the job itself so that setter calls can be chained. No check
    // for duplicates is made.
    func (j *Job[T]) AddDependency(dependency JobID) *Job[T] {
    	j.mu.Lock()
    	j.dependencies = append(j.dependencies, dependency)
    	j.mu.Unlock()
    
    	return j
    }
    
    // RemoveDependency removes the first occurrence of the given job ID from
    // the dependency list and returns the job itself so that calls can be
    // chained. If the ID is not in the list, nothing changes.
    func (j *Job[T]) RemoveDependency(dependency JobID) *Job[T] {
    	j.mu.Lock()
    	defer j.mu.Unlock()
    
    	for i, dep := range j.dependencies {
    		if dep != dependency {
    			continue
    		}
    
    		// Build the shortened list in a fresh backing array. Shifting the
    		// elements in place would silently rewrite every other slice that
    		// still shares the old array (e.g. one handed out earlier by
    		// GetDependencies or GetPersistence, or passed to SetDependencies).
    		remaining := make([]JobID, 0, len(j.dependencies)-1)
    		remaining = append(remaining, j.dependencies[:i]...)
    		remaining = append(remaining, j.dependencies[i+1:]...)
    		j.dependencies = remaining
    		break
    	}
    	return j
    }
    
    // GetDependencies returns a copy of the job's dependency list.
    //
    // A copy is returned because the caller reads the result without holding
    // the job's mutex; handing out the internal slice would let those reads
    // race with AddDependency/RemoveDependency and would let the caller
    // modify the job's state by accident. A nil list is returned as nil.
    func (j *Job[T]) GetDependencies() []JobID {
    	j.mu.Lock()
    	defer j.mu.Unlock()
    
    	if j.dependencies == nil {
    		return nil
    	}
    
    	deps := make([]JobID, len(j.dependencies))
    	copy(deps, j.dependencies)
    	return deps
    }
    
    // GetID reports the job's identifier.
    func (j *Job[T]) GetID() JobID {
    	j.mu.Lock()
    	current := j.id
    	j.mu.Unlock()
    
    	return current
    }
    
    // GetRunnable returns the runner that Execute invokes for this job.
    func (j *Job[T]) GetRunnable() Runnable[T] {
    	j.mu.Lock()
    	current := j.runner
    	j.mu.Unlock()
    
    	return current
    }