diff --git a/errors.go b/errors.go index a159e2192f42c95797059afbe7babda48e42a98d..69b2f3e829df45b86db2d4a8276e328d7d15d51d 100644 --- a/errors.go +++ b/errors.go @@ -43,4 +43,5 @@ var ( ErrChannelAlreadyClosed = fmt.Errorf("channel is already closed") ErrUnknownScheduleType = fmt.Errorf("unknown schedule type") ErrNoManager = fmt.Errorf("no manager") + ErrCannotLoadStatsFromDatabase = fmt.Errorf("errors while loading stats from database") ) diff --git a/job.go b/job.go index 59777b7923cfe0ab6141bd65de344ebf36586ca2..5b7a8a6e195d6ae708e2e11d8dd086d28d32a095 100644 --- a/job.go +++ b/job.go @@ -26,8 +26,9 @@ const ( // Job is a job that can be executed type Job[T any] struct { - id JobID - priority Priority + id JobID + description string + priority Priority timeout time.Duration maxRetries uint @@ -35,8 +36,9 @@ type Job[T any] struct { scheduler Scheduler - pause bool - pauseUntil time.Time + pause bool + pauseReason string + pauseUntil time.Time dependencies []JobID @@ -87,6 +89,7 @@ func (j *Job[T]) GetPersistence() JobPersistence { job := JobPersistence{ ID: j.id, + Description: j.description, Priority: j.priority, Timeout: j.timeout, MaxRetries: j.maxRetries, @@ -94,6 +97,10 @@ func (j *Job[T]) GetPersistence() JobPersistence { Dependencies: j.dependencies, Runnable: j.runner.GetPersistence(), + Pause: j.pause, + PauseReason: j.pauseReason, + PauseUntil: j.pauseUntil, + Logs: j.logs, Stats: j.stats, } @@ -127,7 +134,7 @@ func (j *Job[T]) Pause() { j.mu.Lock() defer j.mu.Unlock() j.pause = true - + } // PauseUntil pauses the job until the given time diff --git a/persistence.go b/persistence.go index 045440005d698bbce80c1728051ae5ad9e1d0453..f5bb440668f7a77747dda1c0f3a5c07a0934699c 100644 --- a/persistence.go +++ b/persistence.go @@ -13,6 +13,7 @@ import ( type JobPersistence struct { ID JobID `yaml:"id" json:"id" gorm:"type:varchar(255);primaryKey"` + Description string `yaml:"description" json:"description" gorm:"column:description"` Priority Priority `yaml:"priority" json:"priority" gorm:"column:priority"` Timeout time.Duration `yaml:"timeout" json:"timeout" gorm:"column:timeout"` MaxRetries uint `yaml:"maxRetries" json:"maxRetries" gorm:"column:max_retries"` @@ -21,6 +22,10 @@ type JobPersistence struct { Runnable RunnableImport `yaml:"runnable" json:"runnable" gorm:"embedded;embeddedPrefix:runnable_"` Scheduler SchedulerPersistence `yaml:"scheduler" json:"scheduler,omitempty" gorm:"embedded;embeddedPrefix:scheduler_"` + Pause bool `yaml:"pause" json:"pause" gorm:"column:pause"` + PauseReason string `yaml:"pauseReason" json:"pauseReason" gorm:"column:pause_reason"` + PauseUntil time.Time `yaml:"pauseUntil" json:"pauseUntil" gorm:"column:pause_until"` + Logs []JobLog `gorm:"foreignKey:JobID;references:ID" json:"-" yaml:"-"` Stats JobStats `gorm:"foreignKey:JobID" json:"stats" yaml:"stats"` @@ -45,6 +50,18 @@ func (jp JobPersistence) GetPersistence() JobPersistence { return jp } +func (jp JobPersistence) GetDescription() string { + return jp.Description +} + +func (jp JobPersistence) GetPriority() Priority { + return jp.Priority +} + +func (jp JobPersistence) GetTimeout() time.Duration { + return jp.Timeout +} + func (JobPersistence) TableName() string { return globalTableNamePrefix + "jobs" } @@ -102,7 +119,7 @@ func ReadFromGORM(db *gorm.DB) ([]JobPersistence, error) { } var wrappedErr []error - // load stats too + for i := range jobs { if err := tx.Model(&jobs[i]).Association("Stats").Find(&jobs[i].Stats); err != nil { wrappedErr = append(wrappedErr, err) @@ -110,7 +127,7 @@ func ReadFromGORM(db *gorm.DB) ([]JobPersistence, error) { } if len(wrappedErr) > 0 { - returnErr := fmt.Errorf("errors while loading stats from database") + returnErr := ErrCannotLoadStatsFromDatabase for _, err := range wrappedErr { returnErr = fmt.Errorf("%w: %v", returnErr, err) } @@ -126,11 +143,15 @@ func ReadFromGORM(db *gorm.DB) ([]JobPersistence, error) { func CreateGenericJobFromPersistence[T any](jobImport JobPersistence, runner Runnable[T]) GenericJob { return &Job[T]{ id: jobImport.ID, + description: jobImport.Description, priority: jobImport.Priority, timeout: jobImport.Timeout, maxRetries: jobImport.MaxRetries, RetryDelay: jobImport.RetryDelay, dependencies: jobImport.Dependencies, + pause: jobImport.Pause, + pauseReason: jobImport.PauseReason, + pauseUntil: jobImport.PauseUntil, runner: runner, stats: jobImport.Stats, logs: jobImport.Logs,