// Copyright 2023 schukai GmbH // SPDX-License-Identifier: AGPL-3.0 package jobqueue import ( "context" "os" "sync" "time" ) type JobID string // String returns the string representation of a JobID func (id JobID) String() string { return string(id) } type GetResultAndError interface { GetResult() string GetError() (string, int) } // Priority is the priority of a job type Priority int const ( PriorityLow Priority = iota PriorityDefault PriorityHigh PriorityCritical ) // Job is a job that can be executed type Job[T any] struct { id JobID description string priority Priority timeout *time.Duration maxRetries uint retryDelay *time.Duration scheduler Scheduler pause bool pauseReason string pauseUntil *time.Time dependencies []JobID mu sync.Mutex runner Runnable[T] stats *JobStats logs []JobLog } // NewJob creates a new job with the given id and runner func NewJob[T any](id JobID, runner Runnable[T]) *Job[T] { return &Job[T]{ id: id, runner: runner, priority: PriorityDefault, logs: make([]JobLog, 0), stats: &JobStats{}, } } // GetLogs returns the logs of the job func (j *Job[T]) GetLogs() []JobLog { j.mu.Lock() defer j.mu.Unlock() logs := j.logs j.logs = make([]JobLog, 0) return logs } // GetStats returns the stats of the job func (j *Job[T]) GetStats() JobStats { j.mu.Lock() defer j.mu.Unlock() // workaround for gorm if j.stats == nil { j.stats = &JobStats{} } j.stats.JobID = j.id return *j.stats } // GetPersistence returns the persistence of the job // and clears the logs. After this call, the logs are // no longer available in the job. func (j *Job[T]) GetPersistence() JobPersistence { j.mu.Lock() defer j.mu.Unlock() job := JobPersistence{ ID: j.id, Description: j.description, Priority: j.priority, Timeout: j.timeout, MaxRetries: j.maxRetries, RetryDelay: j.retryDelay, Dependencies: j.dependencies, Pause: j.pause, PauseReason: j.pauseReason, PauseUntil: j.pauseUntil, Logs: j.logs, Stats: j.stats, } if j.runner != nil { job.Runnable = j.runner.GetPersistence() } if j.scheduler != nil { job.Scheduler = j.scheduler.GetPersistence() } if job.Stats != nil { job.Stats.JobID = job.ID } for i := range job.Logs { job.Logs[i].JobID = job.ID } // Clear logs j.logs = make([]JobLog, 0) return job } // SetScheduler sets the scheduler of the job func (j *Job[T]) SetScheduler(scheduler Scheduler) { j.mu.Lock() defer j.mu.Unlock() j.scheduler = scheduler } // Pause pauses the job func (j *Job[T]) Pause() { j.mu.Lock() defer j.mu.Unlock() j.pause = true } // PauseUntil pauses the job until the given time func (j *Job[T]) PauseUntil(until time.Time) { j.mu.Lock() defer j.mu.Unlock() j.pause = true j.pauseUntil = &until } func (j *Job[T]) Resume() { j.mu.Lock() defer j.mu.Unlock() j.pause = false j.pauseUntil = nil } func (j *Job[T]) ResetStats() { j.mu.Lock() defer j.mu.Unlock() j.stats = &JobStats{ JobID: j.id, RunCount: 0, SuccessCount: 0, ErrorCount: 0, TimeMetrics: TimeMetrics{ TotalRunTime: 0, MinRunTime: 0, MaxRunTime: 0, AvgRunTime: 0, }, } } // IsPaused returns true if the job is paused func (j *Job[T]) IsPaused() bool { j.mu.Lock() defer j.mu.Unlock() if j.pause { if j.pauseUntil == nil || j.pauseUntil.IsZero() { return true } else { return j.pauseUntil.After(time.Now()) } } return false } // GetScheduler returns the scheduler of the job func (j *Job[T]) GetScheduler() Scheduler { j.mu.Lock() defer j.mu.Unlock() return j.scheduler } // Execute executes the job func (j *Job[T]) Execute(ctx context.Context) (RunGenericResult, error) { startTime := time.Now() r, runnerError := j.runner.Run(ctx) endTime := time.Now() elapsedTime := endTime.Sub(startTime) isSuccessful := runnerError == nil if r.Status != ResultStatusSuccess { isSuccessful = false } j.mu.Lock() defer j.mu.Unlock() // Update RunCount if j.stats == nil { j.stats = &JobStats{ JobID: j.id, } } j.stats.RunCount++ // Update TimeMetrics newTotalRunTime := j.stats.TimeMetrics.TotalRunTime + elapsedTime if newTotalRunTime > j.stats.TimeMetrics.TotalRunTime { // no overflow happened j.stats.TimeMetrics.TotalRunTime = newTotalRunTime } else { // set to max j.stats.TimeMetrics.TotalRunTime = time.Duration(^uint64(0) >> 1) } if j.stats.TimeMetrics.MinRunTime == 0 || elapsedTime < j.stats.TimeMetrics.MinRunTime { j.stats.TimeMetrics.MinRunTime = elapsedTime } if elapsedTime > j.stats.TimeMetrics.MaxRunTime { j.stats.TimeMetrics.MaxRunTime = elapsedTime } if j.stats.RunCount == 0 { j.stats.TimeMetrics.AvgRunTime = 0 } else { j.stats.TimeMetrics.AvgRunTime = j.stats.TimeMetrics.TotalRunTime / time.Duration(j.stats.RunCount) } if isSuccessful { j.stats.SuccessCount++ } else { j.stats.ErrorCount++ } newLog := JobLog{ StartTime: startTime, } if isSuccessful { newLog.IsSuccessful = true newLog.ExitCode = 0 } else { switch v := any(&r.Data).(type) { case GetResultAndError: newLog.ErrorMsg, newLog.ExitCode = v.GetError() } if newLog.ErrorMsg == "" { newLog.ErrorMsg = runnerError.Error() } newLog.IsSuccessful = false if newLog.ExitCode == 0 { newLog.ExitCode = DefaultErrorExitCode } } newLog.StartTime = startTime newLog.EndTime = endTime newLog.ProcessID = os.Getpid() // Assuming you want the PID of the current process newLog.EndTime = time.Now() switch v := any(&r.Data).(type) { case GetResultAndError: newLog.Result = v.GetResult() } j.logs = append(j.logs, newLog) genericResult := RunGenericResult(r) return genericResult, runnerError } // Cancel cancels the job, currently a no-op func (j *Job[T]) Cancel() error { return nil } // SetPriority sets the priority of the job func (j *Job[T]) SetPriority(priority Priority) *Job[T] { j.mu.Lock() defer j.mu.Unlock() j.priority = priority return j } // GetPriority returns the priority of the job func (j *Job[T]) GetPriority() Priority { j.mu.Lock() defer j.mu.Unlock() return j.priority } // SetTimeout sets the timeout of the job func (j *Job[T]) SetTimeout(timeout time.Duration) *Job[T] { j.mu.Lock() defer j.mu.Unlock() j.timeout = &timeout return j } // GetTimeout returns the timeout of the job func (j *Job[T]) GetTimeout() *time.Duration { j.mu.Lock() defer j.mu.Unlock() return j.timeout } // SetMaxRetries sets the max retries of the job func (j *Job[T]) SetMaxRetries(maxRetries uint) *Job[T] { j.mu.Lock() defer j.mu.Unlock() j.maxRetries = maxRetries return j } // GetMaxRetries returns the max retries of the job func (j *Job[T]) GetMaxRetries() uint { j.mu.Lock() defer j.mu.Unlock() return j.maxRetries } // SetRetryDelay sets the retry delay of the job func (j *Job[T]) SetRetryDelay(retryDelay time.Duration) *Job[T] { j.mu.Lock() defer j.mu.Unlock() j.retryDelay = &retryDelay return j } // GetRetryDelay returns the retry delay of the job func (j *Job[T]) GetRetryDelay() *time.Duration { j.mu.Lock() defer j.mu.Unlock() return j.retryDelay } // SetDependencies sets the dependencies of the job func (j *Job[T]) SetDependencies(dependencies []JobID) *Job[T] { j.mu.Lock() defer j.mu.Unlock() j.dependencies = dependencies return j } // AddDependency adds a dependency to the job func (j *Job[T]) AddDependency(dependency JobID) *Job[T] { j.mu.Lock() defer j.mu.Unlock() j.dependencies = append(j.dependencies, dependency) return j } // RemoveDependency removes the dependency of the job func (j *Job[T]) RemoveDependency(dependency JobID) *Job[T] { j.mu.Lock() defer j.mu.Unlock() for i, dep := range j.dependencies { if dep == dependency { j.dependencies = append(j.dependencies[:i], j.dependencies[i+1:]...) break } } return j } // GetDependencies returns the dependencies of the job func (j *Job[T]) GetDependencies() []JobID { j.mu.Lock() defer j.mu.Unlock() return j.dependencies } // GetID returns the id of the job func (j *Job[T]) GetID() JobID { j.mu.Lock() defer j.mu.Unlock() return j.id } // GetRunnable returns the runnable of the job func (j *Job[T]) GetRunnable() Runnable[T] { j.mu.Lock() defer j.mu.Unlock() return j.runner }