package jobqueue import ( "context" "os" "sync" "time" ) type JobID string // String returns the string representation of a JobID func (id JobID) String() string { return string(id) } // Priority is the priority of a job type Priority int const ( PriorityLow Priority = iota PriorityDefault PriorityHigh PriorityCritical ) // Job is a job that can be executed type Job[T any] struct { id JobID priority Priority timeout time.Duration maxRetries uint RetryDelay time.Duration scheduler Scheduler dependencies []JobID mu sync.Mutex runner Runnable[T] stats JobStats logs []JobLog } // NewJob creates a new job with the given id and runner func NewJob[T any](id JobID, runner Runnable[T]) *Job[T] { return &Job[T]{ id: id, runner: runner, priority: PriorityDefault, logs: make([]JobLog, 0), stats: JobStats{}, } } // GetLogs returns the logs of the job func (j *Job[T]) GetLogs() []JobLog { j.mu.Lock() defer j.mu.Unlock() logs := j.logs j.logs = make([]JobLog, 0) return logs } // GetStats returns the stats of the job func (j *Job[T]) GetStats() JobStats { j.mu.Lock() defer j.mu.Unlock() // workaround for gorm j.stats.JobID = j.id return j.stats } // GetPersistence returns the persistence of the job // and clears the logs. After this call, the logs are // no longer available in the job. func (j *Job[T]) GetPersistence() JobPersistence { j.mu.Lock() defer j.mu.Unlock() job := JobPersistence{ ID: j.id, Priority: j.priority, Timeout: j.timeout, MaxRetries: j.maxRetries, RetryDelay: j.RetryDelay, Dependencies: j.dependencies, Runnable: j.runner.GetPersistence(), Logs: j.logs, Stats: j.stats, } if j.scheduler != nil { job.Scheduler = j.scheduler.GetPersistence() } job.Stats.JobID = job.ID for i, _ := range job.Logs { job.Logs[i].JobID = job.ID } // Clear logs j.logs = make([]JobLog, 0) return job } // SetScheduler sets the scheduler of the job func (j *Job[T]) SetScheduler(scheduler Scheduler) { j.mu.Lock() defer j.mu.Unlock() j.scheduler = scheduler } // GetScheduler returns the scheduler of the job func (j *Job[T]) GetScheduler() Scheduler { j.mu.Lock() defer j.mu.Unlock() return j.scheduler } // Execute executes the job func (j *Job[T]) Execute(ctx context.Context) (RunGenericResult, error) { startTime := time.Now() r, runnerError := j.runner.Run(ctx) endTime := time.Now() elapsedTime := endTime.Sub(startTime) j.mu.Lock() defer j.mu.Unlock() // Update RunCount j.stats.RunCount++ // Update TimeMetrics j.stats.TimeMetrics.TotalRunTime += elapsedTime if j.stats.TimeMetrics.MinRunTime == 0 || elapsedTime < j.stats.TimeMetrics.MinRunTime { j.stats.TimeMetrics.MinRunTime = elapsedTime } if elapsedTime > j.stats.TimeMetrics.MaxRunTime { j.stats.TimeMetrics.MaxRunTime = elapsedTime } j.stats.TimeMetrics.AvgRunTime = j.stats.TimeMetrics.TotalRunTime / time.Duration(j.stats.RunCount) // Update SuccessCount or ErrorCount and codes if runnerError == nil { j.stats.SuccessCount++ } else { j.stats.ErrorCount++ } newLog := JobLog{ StartTime: startTime, } if runnerError == nil { newLog.IsSuccessful = true newLog.ExitCode = 0 } else { newLog.IsSuccessful = false newLog.ExitCode = 1 // Set to appropriate error code if applicable newLog.ErrorMsg = runnerError.Error() } newLog.StartTime = startTime newLog.EndTime = endTime newLog.ProcessID = os.Getpid() // Assuming you want the PID of the current process newLog.EndTime = time.Now() if runnerError != nil { newLog.ErrorMsg = runnerError.Error() } j.logs = append(j.logs, newLog) genericResult := RunGenericResult(r) return genericResult, runnerError } // Cancel cancels the job func (j *Job[T]) Cancel() error { return nil } // SetPriority sets the priority of the job func (j *Job[T]) SetPriority(priority Priority) *Job[T] { j.mu.Lock() defer j.mu.Unlock() j.priority = priority return j } // GetPriority returns the priority of the job func (j *Job[T]) GetPriority() Priority { j.mu.Lock() defer j.mu.Unlock() return j.priority } // SetTimeout sets the timeout of the job func (j *Job[T]) SetTimeout(timeout time.Duration) *Job[T] { j.mu.Lock() defer j.mu.Unlock() j.timeout = timeout return j } // GetTimeout returns the timeout of the job func (j *Job[T]) GetTimeout() time.Duration { j.mu.Lock() defer j.mu.Unlock() return j.timeout } // SetMaxRetries sets the max retries of the job func (j *Job[T]) SetMaxRetries(maxRetries uint) *Job[T] { j.mu.Lock() defer j.mu.Unlock() j.maxRetries = maxRetries return j } // GetMaxRetries returns the max retries of the job func (j *Job[T]) GetMaxRetries() uint { j.mu.Lock() defer j.mu.Unlock() return j.maxRetries } // SetRetryDelay sets the retry delay of the job func (j *Job[T]) SetRetryDelay(retryDelay time.Duration) *Job[T] { j.mu.Lock() defer j.mu.Unlock() j.RetryDelay = retryDelay return j } // GetRetryDelay returns the retry delay of the job func (j *Job[T]) GetRetryDelay() time.Duration { j.mu.Lock() defer j.mu.Unlock() return j.RetryDelay } // SetDependencies sets the dependencies of the job func (j *Job[T]) SetDependencies(dependencies []JobID) *Job[T] { j.mu.Lock() defer j.mu.Unlock() j.dependencies = dependencies return j } // AddDependency adds a dependency to the job func (j *Job[T]) AddDependency(dependency JobID) *Job[T] { j.mu.Lock() defer j.mu.Unlock() j.dependencies = append(j.dependencies, dependency) return j } // RemoveDependency removes the dependency of the job func (j *Job[T]) RemoveDependency(dependency JobID) *Job[T] { j.mu.Lock() defer j.mu.Unlock() for i, dep := range j.dependencies { if dep == dependency { j.dependencies = append(j.dependencies[:i], j.dependencies[i+1:]...) break } } return j } // GetDependencies returns the dependencies of the job func (j *Job[T]) GetDependencies() []JobID { j.mu.Lock() defer j.mu.Unlock() return j.dependencies } // GetID returns the id of the job func (j *Job[T]) GetID() JobID { j.mu.Lock() defer j.mu.Unlock() return j.id } // GetRunnable returns the runnable of the job func (j *Job[T]) GetRunnable() Runnable[T] { j.mu.Lock() defer j.mu.Unlock() return j.runner }