From ce13f3fc9922bbe75ffac9f6e8ca71b535fafd97 Mon Sep 17 00:00:00 2001 From: Volker Schukai <volker.schukai@schukai.com> Date: Sun, 12 Nov 2023 13:08:47 +0100 Subject: [PATCH] feat: add pause to persistence #17 --- errors.go | 1 + job.go | 17 ++++++++++++----- persistence.go | 25 +++++++++++++++++++++++-- 3 files changed, 36 insertions(+), 7 deletions(-) diff --git a/errors.go b/errors.go index a159e21..69b2f3e 100644 --- a/errors.go +++ b/errors.go @@ -43,4 +43,5 @@ var ( ErrChannelAlreadyClosed = fmt.Errorf("channel is already closed") ErrUnknownScheduleType = fmt.Errorf("unknown schedule type") ErrNoManager = fmt.Errorf("no manager") + ErrCannotLoadStatsFromDatabase = fmt.Errorf("errors while loading stats from database") ) diff --git a/job.go b/job.go index 59777b7..5b7a8a6 100644 --- a/job.go +++ b/job.go @@ -26,8 +26,9 @@ const ( // Job is a job that can be executed type Job[T any] struct { - id JobID - priority Priority + id JobID + description string + priority Priority timeout time.Duration maxRetries uint @@ -35,8 +36,9 @@ type Job[T any] struct { scheduler Scheduler - pause bool - pauseUntil time.Time + pause bool + pauseReason string + pauseUntil time.Time dependencies []JobID @@ -87,6 +89,7 @@ func (j *Job[T]) GetPersistence() JobPersistence { job := JobPersistence{ ID: j.id, + Description: j.description, Priority: j.priority, Timeout: j.timeout, MaxRetries: j.maxRetries, @@ -94,6 +97,10 @@ func (j *Job[T]) GetPersistence() JobPersistence { Dependencies: j.dependencies, Runnable: j.runner.GetPersistence(), + Pause: j.pause, + PauseReason: j.pauseReason, + PauseUntil: j.pauseUntil, + Logs: j.logs, Stats: j.stats, } @@ -127,7 +134,7 @@ func (j *Job[T]) Pause() { j.mu.Lock() defer j.mu.Unlock() j.pause = true - + } // PauseUntil pauses the job until the given time diff --git a/persistence.go b/persistence.go index 0454400..f5bb440 100644 --- a/persistence.go +++ b/persistence.go @@ -13,6 +13,7 @@ import ( type JobPersistence struct { ID JobID `yaml:"id" json:"id" gorm:"type:varchar(255);primaryKey"` + Description string `yaml:"description" json:"description" gorm:"column:description"` Priority Priority `yaml:"priority" json:"priority" gorm:"column:priority"` Timeout time.Duration `yaml:"timeout" json:"timeout" gorm:"column:timeout"` MaxRetries uint `yaml:"maxRetries" json:"maxRetries" gorm:"column:max_retries"` @@ -21,6 +22,10 @@ type JobPersistence struct { Runnable RunnableImport `yaml:"runnable" json:"runnable" gorm:"embedded;embeddedPrefix:runnable_"` Scheduler SchedulerPersistence `yaml:"scheduler" json:"scheduler,omitempty" gorm:"embedded;embeddedPrefix:scheduler_"` + Pause bool `yaml:"pause" json:"pause" gorm:"column:pause"` + PauseReason string `yaml:"pauseReason" json:"pauseReason" gorm:"column:pause_reason"` + PauseUntil time.Time `yaml:"pauseUntil" json:"pauseUntil" gorm:"column:pause_until"` + Logs []JobLog `gorm:"foreignKey:JobID;references:ID" json:"-" yaml:"-"` Stats JobStats `gorm:"foreignKey:JobID" json:"stats" yaml:"stats"` @@ -45,6 +50,18 @@ func (jp JobPersistence) GetPersistence() JobPersistence { return jp } +func (jp JobPersistence) GetDescription() string { + return jp.Description +} + +func (jp JobPersistence) GetPriority() Priority { + return jp.Priority +} + +func (jp JobPersistence) GetTimeout() time.Duration { + return jp.Timeout +} + func (JobPersistence) TableName() string { return globalTableNamePrefix + "jobs" } @@ -102,7 +119,7 @@ func ReadFromGORM(db *gorm.DB) ([]JobPersistence, error) { } var wrappedErr []error - // load stats too + for i := range jobs { if err := tx.Model(&jobs[i]).Association("Stats").Find(&jobs[i].Stats); err != nil { wrappedErr = append(wrappedErr, err) @@ -110,7 +127,7 @@ func ReadFromGORM(db *gorm.DB) ([]JobPersistence, error) { } if len(wrappedErr) > 0 { - returnErr := fmt.Errorf("errors while loading stats from database") + returnErr := ErrCannotLoadStatsFromDatabase for _, err := range wrappedErr { returnErr = fmt.Errorf("%w: %v", returnErr, err) } @@ -126,11 +143,15 @@ func ReadFromGORM(db *gorm.DB) ([]JobPersistence, error) { func CreateGenericJobFromPersistence[T any](jobImport JobPersistence, runner Runnable[T]) GenericJob { return &Job[T]{ id: jobImport.ID, + description: jobImport.Description, priority: jobImport.Priority, timeout: jobImport.Timeout, maxRetries: jobImport.MaxRetries, RetryDelay: jobImport.RetryDelay, dependencies: jobImport.Dependencies, + pause: jobImport.Pause, + pauseReason: jobImport.PauseReason, + pauseUntil: jobImport.PauseUntil, runner: runner, stats: jobImport.Stats, logs: jobImport.Logs, -- GitLab