Something went wrong on our end
Select Git revision
project.properties
-
Marina Dali authoredMarina Dali authored
job.go 8.12 KiB
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
"context"
"os"
"sync"
"time"
)
type JobID string
// String returns the string representation of a JobID
func (id JobID) String() string {
return string(id)
}
type GetResultAndError interface {
GetResult() string
GetError() (string, int)
}
// Priority is the priority of a job
type Priority int
const (
PriorityLow Priority = iota
PriorityDefault
PriorityHigh
PriorityCritical
)
// Job is a job that can be executed
type Job[T any] struct {
id JobID
description string
priority Priority
timeout *time.Duration
maxRetries uint
retryDelay *time.Duration
scheduler Scheduler
pause bool
pauseReason string
pauseUntil *time.Time
dependencies []JobID
mu sync.Mutex
runner Runnable[T]
stats *JobStats
logs []JobLog
}
// NewJob creates a new job with the given id and runner
func NewJob[T any](id JobID, runner Runnable[T]) *Job[T] {
return &Job[T]{
id: id,
runner: runner,
priority: PriorityDefault,
logs: make([]JobLog, 0),
stats: &JobStats{},
}
}
// GetLogs returns the logs of the job
func (j *Job[T]) GetLogs() []JobLog {
j.mu.Lock()
defer j.mu.Unlock()
logs := j.logs
j.logs = make([]JobLog, 0)
return logs
}
// GetStats returns the stats of the job
func (j *Job[T]) GetStats() JobStats {
j.mu.Lock()
defer j.mu.Unlock()
// workaround for gorm
if j.stats == nil {
j.stats = &JobStats{}
}
j.stats.JobID = j.id
return *j.stats
}
// GetPersistence returns the persistence of the job
// and clears the logs. After this call, the logs are
// no longer available in the job.
func (j *Job[T]) GetPersistence() JobPersistence {
j.mu.Lock()
defer j.mu.Unlock()
job := JobPersistence{
ID: j.id,
Description: j.description,
Priority: j.priority,
Timeout: j.timeout,
MaxRetries: j.maxRetries,
RetryDelay: j.retryDelay,
Dependencies: j.dependencies,
Runnable: j.runner.GetPersistence(),
Pause: j.pause,
PauseReason: j.pauseReason,
PauseUntil: j.pauseUntil,
Logs: j.logs,
Stats: j.stats,
}
if j.scheduler != nil {
job.Scheduler = j.scheduler.GetPersistence()
}
if job.Stats != nil {
job.Stats.JobID = job.ID
}
for i := range job.Logs {
job.Logs[i].JobID = job.ID
}
// Clear logs
j.logs = make([]JobLog, 0)
return job
}
// SetScheduler sets the scheduler of the job
func (j *Job[T]) SetScheduler(scheduler Scheduler) {
j.mu.Lock()
defer j.mu.Unlock()
j.scheduler = scheduler
}
// Pause pauses the job
func (j *Job[T]) Pause() {
j.mu.Lock()
defer j.mu.Unlock()
j.pause = true
}
// PauseUntil pauses the job until the given time
func (j *Job[T]) PauseUntil(until time.Time) {
j.mu.Lock()
defer j.mu.Unlock()
j.pause = true
j.pauseUntil = &until
}
func (j *Job[T]) Resume() {
j.mu.Lock()
defer j.mu.Unlock()
j.pause = false
j.pauseUntil = nil
}
func (j *Job[T]) ResetStats() {
j.mu.Lock()
defer j.mu.Unlock()
j.stats = &JobStats{
JobID: j.id,
RunCount: 0,
SuccessCount: 0,
ErrorCount: 0,
TimeMetrics: TimeMetrics{
TotalRunTime: 0,
MinRunTime: 0,
MaxRunTime: 0,
AvgRunTime: 0,
},
}
}
// IsPaused returns true if the job is paused
func (j *Job[T]) IsPaused() bool {
j.mu.Lock()
defer j.mu.Unlock()
if j.pause {
if j.pauseUntil == nil || j.pauseUntil.IsZero() {
return true
} else {
return j.pauseUntil.After(time.Now())
}
}
return false
}
// GetScheduler returns the scheduler of the job
func (j *Job[T]) GetScheduler() Scheduler {
j.mu.Lock()
defer j.mu.Unlock()
return j.scheduler
}
// Execute executes the job
func (j *Job[T]) Execute(ctx context.Context) (RunGenericResult, error) {
startTime := time.Now()
r, runnerError := j.runner.Run(ctx)
endTime := time.Now()
elapsedTime := endTime.Sub(startTime)
isSuccessful := runnerError == nil
if r.Status != ResultStatusSuccess {
isSuccessful = false
}
j.mu.Lock()
defer j.mu.Unlock()
// Update RunCount
if j.stats == nil {
j.stats = &JobStats{
JobID: j.id,
}
}
j.stats.RunCount++
// Update TimeMetrics
newTotalRunTime := j.stats.TimeMetrics.TotalRunTime + elapsedTime
if newTotalRunTime > j.stats.TimeMetrics.TotalRunTime { // no overflow happened
j.stats.TimeMetrics.TotalRunTime = newTotalRunTime
} else {
// set to max
j.stats.TimeMetrics.TotalRunTime = time.Duration(^uint64(0) >> 1)
}
if j.stats.TimeMetrics.MinRunTime == 0 || elapsedTime < j.stats.TimeMetrics.MinRunTime {
j.stats.TimeMetrics.MinRunTime = elapsedTime
}
if elapsedTime > j.stats.TimeMetrics.MaxRunTime {
j.stats.TimeMetrics.MaxRunTime = elapsedTime
}
if j.stats.RunCount == 0 {
j.stats.TimeMetrics.AvgRunTime = 0
} else {
j.stats.TimeMetrics.AvgRunTime = j.stats.TimeMetrics.TotalRunTime / time.Duration(j.stats.RunCount)
}
if isSuccessful {
j.stats.SuccessCount++
} else {
j.stats.ErrorCount++
}
newLog := JobLog{
StartTime: startTime,
}
if isSuccessful {
newLog.IsSuccessful = true
newLog.ExitCode = 0
} else {
switch v := any(&r.Data).(type) {
case GetResultAndError:
newLog.ErrorMsg, newLog.ExitCode = v.GetError()
}
if newLog.ErrorMsg == "" {
newLog.ErrorMsg = runnerError.Error()
}
newLog.IsSuccessful = false
if newLog.ExitCode == 0 {
newLog.ExitCode = DefaultErrorExitCode
}
}
newLog.StartTime = startTime
newLog.EndTime = endTime
newLog.ProcessID = os.Getpid() // Assuming you want the PID of the current process
newLog.EndTime = time.Now()
switch v := any(&r.Data).(type) {
case GetResultAndError:
newLog.Result = v.GetResult()
}
j.logs = append(j.logs, newLog)
genericResult := RunGenericResult(r)
return genericResult, runnerError
}
// Cancel cancels the job, currently a no-op
func (j *Job[T]) Cancel() error {
return nil
}
// SetPriority sets the priority of the job
func (j *Job[T]) SetPriority(priority Priority) *Job[T] {
j.mu.Lock()
defer j.mu.Unlock()
j.priority = priority
return j
}
// GetPriority returns the priority of the job
func (j *Job[T]) GetPriority() Priority {
j.mu.Lock()
defer j.mu.Unlock()
return j.priority
}
// SetTimeout sets the timeout of the job
func (j *Job[T]) SetTimeout(timeout time.Duration) *Job[T] {
j.mu.Lock()
defer j.mu.Unlock()
j.timeout = &timeout
return j
}
// GetTimeout returns the timeout of the job
func (j *Job[T]) GetTimeout() *time.Duration {
j.mu.Lock()
defer j.mu.Unlock()
return j.timeout
}
// SetMaxRetries sets the max retries of the job
func (j *Job[T]) SetMaxRetries(maxRetries uint) *Job[T] {
j.mu.Lock()
defer j.mu.Unlock()
j.maxRetries = maxRetries
return j
}
// GetMaxRetries returns the max retries of the job
func (j *Job[T]) GetMaxRetries() uint {
j.mu.Lock()
defer j.mu.Unlock()
return j.maxRetries
}
// SetRetryDelay sets the retry delay of the job
func (j *Job[T]) SetRetryDelay(retryDelay time.Duration) *Job[T] {
j.mu.Lock()
defer j.mu.Unlock()
j.retryDelay = &retryDelay
return j
}
// GetRetryDelay returns the retry delay of the job
func (j *Job[T]) GetRetryDelay() *time.Duration {
j.mu.Lock()
defer j.mu.Unlock()
return j.retryDelay
}
// SetDependencies sets the dependencies of the job
func (j *Job[T]) SetDependencies(dependencies []JobID) *Job[T] {
j.mu.Lock()
defer j.mu.Unlock()
j.dependencies = dependencies
return j
}
// AddDependency adds a dependency to the job
func (j *Job[T]) AddDependency(dependency JobID) *Job[T] {
j.mu.Lock()
defer j.mu.Unlock()
j.dependencies = append(j.dependencies, dependency)
return j
}
// RemoveDependency removes the dependency of the job
func (j *Job[T]) RemoveDependency(dependency JobID) *Job[T] {
j.mu.Lock()
defer j.mu.Unlock()
for i, dep := range j.dependencies {
if dep == dependency {
j.dependencies = append(j.dependencies[:i], j.dependencies[i+1:]...)
break
}
}
return j
}
// GetDependencies returns the dependencies of the job
func (j *Job[T]) GetDependencies() []JobID {
j.mu.Lock()
defer j.mu.Unlock()
return j.dependencies
}
// GetID returns the id of the job
func (j *Job[T]) GetID() JobID {
j.mu.Lock()
defer j.mu.Unlock()
return j.id
}
// GetRunnable returns the runnable of the job
func (j *Job[T]) GetRunnable() Runnable[T] {
j.mu.Lock()
defer j.mu.Unlock()
return j.runner
}