Select Git revision
pnpm-lock.yaml
job.go 4.49 KiB
package jobqueue
import (
"context"
"os"
"sync"
"time"
)
// JobID is the string identifier of a job.
type JobID string

// String returns the identifier as a plain string, which makes JobID satisfy
// fmt.Stringer.
func (jid JobID) String() string {
	plain := string(jid)
	return plain
}
// Priority is an integer priority level for a job. The predefined levels
// increase numerically from PriorityLow to PriorityCritical.
type Priority int

// Predefined priority levels, spelled out with explicit values.
const (
	PriorityLow      Priority = 0
	PriorityDefault  Priority = 1
	PriorityHigh     Priority = 2
	PriorityCritical Priority = 3
)
// GenericJob is the view of a job without a result type parameter: it exposes
// the job's identity, scheduling settings and execution entry points.
// Job[T] in this file provides all of these methods.
type GenericJob interface {
	// Identity and scheduling.
	GetID() JobID
	GetPriority() Priority
	GetDependencies() []JobID

	// Execution.
	Execute(ctx context.Context) (RunGenericResult, error)
	Cancel() error

	// Retry and timeout settings.
	GetMaxRetries() uint
	GetRetryDelay() time.Duration
	GetTimeout() time.Duration
}
// Job couples a Runnable[T] with its scheduling settings, run statistics and
// per-run log entries. Because it embeds a sync.Mutex by value, a Job must be
// used through a pointer and never copied.
type Job[T any] struct {
	// id identifies the job; it is assigned in NewJob and returned by GetID.
	id JobID
	// priority is PriorityDefault after NewJob and can be changed with SetPriority.
	priority Priority
	// timout (sic) holds the value managed by SetTimeout and GetTimeout.
	timout time.Duration
	// maxRetries holds the value managed by SetMaxRetries and GetMaxRetries.
	maxRetries uint
	// RetryDelay holds the value managed by SetRetryDelay and GetRetryDelay.
	// NOTE(review): unlike its sibling fields this one is exported — confirm
	// whether that is intentional.
	RetryDelay time.Duration
	// dependencies lists the job IDs managed by SetDependencies,
	// AddDependency and RemoveDependency.
	dependencies []JobID
	// mu is locked by every accessor and by Execute while it updates stats
	// and logs.
	mu sync.Mutex
	// runner is the Runnable invoked by Execute.
	runner Runnable[T]
	// stats accumulates run counts and timing metrics; updated by Execute.
	stats JobStats
	// logs receives one JobLog entry per Execute call.
	logs []JobLog
}
// NewJob builds a Job identified by id that will execute runner. The job
// starts at PriorityDefault; every other setting is left at its zero value.
func NewJob[T any](id JobID, runner Runnable[T]) *Job[T] {
	job := new(Job[T])
	job.id = id
	job.runner = runner
	job.priority = PriorityDefault
	return job
}
// Execute runs the job's runner once, records the outcome in the job's
// statistics and log history, and returns the runner's result (converted to
// RunGenericResult) together with the runner's error.
//
// The runner is invoked without holding j.mu; the mutex is taken only while
// stats and logs are updated. The log entry's StartTime and EndTime are the
// timestamps taken immediately around the runner call, so they agree with the
// elapsed time added to the statistics.
//
// NOTE(review): ctx is accepted to satisfy GenericJob but is not used here,
// because the runner's Run method is called without arguments; cancellation
// and timeouts are therefore not enforced by this method.
func (j *Job[T]) Execute(ctx context.Context) (RunGenericResult, error) {
	startTime := time.Now()
	r, runnerError := j.runner.Run()
	endTime := time.Now()
	elapsedTime := endTime.Sub(startTime)

	j.mu.Lock()
	defer j.mu.Unlock()

	// Aggregate timing statistics. A MinRunTime of zero is treated as
	// "not recorded yet".
	j.stats.RunCount++
	j.stats.TimeMetrics.TotalRunTime += elapsedTime
	if j.stats.TimeMetrics.MinRunTime == 0 || elapsedTime < j.stats.TimeMetrics.MinRunTime {
		j.stats.TimeMetrics.MinRunTime = elapsedTime
	}
	if elapsedTime > j.stats.TimeMetrics.MaxRunTime {
		j.stats.TimeMetrics.MaxRunTime = elapsedTime
	}
	j.stats.TimeMetrics.AvgRunTime = j.stats.TimeMetrics.TotalRunTime / time.Duration(j.stats.RunCount)

	// Build the log entry from the timestamps measured around the run, not a
	// fresh time.Now(), which would also include the wait for the mutex.
	newLog := JobLog{
		ProcessID:    os.Getpid(), // PID of the current process
		StartTime:    startTime,
		EndTime:      endTime,
		IsSuccessful: runnerError == nil,
	}

	if runnerError != nil {
		j.stats.ErrorCount++
		// Generic failure code; no more specific code is derived from the error.
		newLog.ExitCode = 1
		newLog.ErrorMsg = runnerError.Error()
	} else {
		j.stats.SuccessCount++
		newLog.ExitCode = 0
	}

	j.logs = append(j.logs, newLog)

	return RunGenericResult(r), runnerError
}
// Cancel is currently a no-op: it performs no cancellation and always
// returns nil.
func (j *Job[T]) Cancel() error {
	return nil
}
// SetPriority stores priority on the job while holding the job's mutex and
// returns the job so calls can be chained.
func (j *Job[T]) SetPriority(priority Priority) *Job[T] {
	j.mu.Lock()
	j.priority = priority
	j.mu.Unlock()

	return j
}
// GetPriority returns the job's current priority, read under the job's mutex.
func (j *Job[T]) GetPriority() Priority {
	j.mu.Lock()
	current := j.priority
	j.mu.Unlock()

	return current
}
// SetTimeout stores timeout on the job while holding the job's mutex and
// returns the job so calls can be chained.
func (j *Job[T]) SetTimeout(timeout time.Duration) *Job[T] {
	j.mu.Lock()
	j.timout = timeout
	j.mu.Unlock()

	return j
}
// GetTimeout returns the job's timeout, read under the job's mutex. The value
// is zero unless SetTimeout has been called.
func (j *Job[T]) GetTimeout() time.Duration {
	j.mu.Lock()
	current := j.timout
	j.mu.Unlock()

	return current
}
// SetMaxRetries stores maxRetries on the job while holding the job's mutex
// and returns the job so calls can be chained.
func (j *Job[T]) SetMaxRetries(maxRetries uint) *Job[T] {
	j.mu.Lock()
	j.maxRetries = maxRetries
	j.mu.Unlock()

	return j
}
// GetMaxRetries returns the job's max-retries setting, read under the job's
// mutex. The value is zero unless SetMaxRetries has been called.
func (j *Job[T]) GetMaxRetries() uint {
	j.mu.Lock()
	current := j.maxRetries
	j.mu.Unlock()

	return current
}
// SetRetryDelay stores retryDelay on the job while holding the job's mutex
// and returns the job so calls can be chained.
func (j *Job[T]) SetRetryDelay(retryDelay time.Duration) *Job[T] {
	j.mu.Lock()
	j.RetryDelay = retryDelay
	j.mu.Unlock()

	return j
}
// GetRetryDelay returns the job's retry delay, read under the job's mutex.
// The value is zero unless SetRetryDelay has been called.
func (j *Job[T]) GetRetryDelay() time.Duration {
	j.mu.Lock()
	current := j.RetryDelay
	j.mu.Unlock()

	return current
}
// SetDependencies replaces the job's dependency list with a copy of
// dependencies and returns the job so calls can be chained.
//
// The slice is copied rather than stored directly: if the job kept the
// caller's backing array, a later AddDependency could append into the
// caller's spare capacity, and the caller could modify the job's list without
// holding the job's mutex. A nil argument leaves the list nil; an empty
// non-nil argument leaves it empty and non-nil.
func (j *Job[T]) SetDependencies(dependencies []JobID) *Job[T] {
	var owned []JobID
	if dependencies != nil {
		owned = make([]JobID, len(dependencies))
		copy(owned, dependencies)
	}

	j.mu.Lock()
	defer j.mu.Unlock()
	j.dependencies = owned
	return j
}
// AddDependency appends dependency to the end of the job's dependency list
// while holding the job's mutex, and returns the job so calls can be chained.
// Duplicates are not filtered out.
func (j *Job[T]) AddDependency(dependency JobID) *Job[T] {
	j.mu.Lock()
	extended := append(j.dependencies, dependency)
	j.dependencies = extended
	j.mu.Unlock()

	return j
}
// RemoveDependency deletes the first occurrence of dependency from the job's
// dependency list, if present, and returns the job so calls can be chained.
// The remaining entries keep their relative order; the removal is done in
// place on the existing backing array.
func (j *Job[T]) RemoveDependency(dependency JobID) *Job[T] {
	j.mu.Lock()
	defer j.mu.Unlock()

	for idx := 0; idx < len(j.dependencies); idx++ {
		if j.dependencies[idx] != dependency {
			continue
		}
		// Shift the tail left over the removed slot, then drop the last element.
		copy(j.dependencies[idx:], j.dependencies[idx+1:])
		j.dependencies = j.dependencies[:len(j.dependencies)-1]
		break
	}
	return j
}
// GetDependencies returns a snapshot of the job's dependency list.
//
// A copy is returned instead of the internal slice: RemoveDependency shifts
// elements in place under the mutex, so handing out the internal backing
// array would let callers observe (or cause) changes without holding the
// lock. The result is nil when the internal list is nil.
func (j *Job[T]) GetDependencies() []JobID {
	j.mu.Lock()
	defer j.mu.Unlock()

	if j.dependencies == nil {
		return nil
	}
	snapshot := make([]JobID, len(j.dependencies))
	copy(snapshot, j.dependencies)
	return snapshot
}
// GetID returns the identifier the job was created with, read under the
// job's mutex.
func (j *Job[T]) GetID() JobID {
	j.mu.Lock()
	ident := j.id
	j.mu.Unlock()

	return ident
}
// GetRunnable returns the Runnable the job executes, read under the job's
// mutex.
func (j *Job[T]) GetRunnable() Runnable[T] {
	j.mu.Lock()
	run := j.runner
	j.mu.Unlock()

	return run
}