Skip to content
Snippets Groups Projects
Verified Commit ce13f3fc authored by Volker Schukai's avatar Volker Schukai :alien:
Browse files

feat: add pause to persistence #17

parent e0d03ed3
No related branches found
No related tags found
No related merge requests found
...@@ -43,4 +43,5 @@ var ( ...@@ -43,4 +43,5 @@ var (
ErrChannelAlreadyClosed = fmt.Errorf("channel is already closed") ErrChannelAlreadyClosed = fmt.Errorf("channel is already closed")
ErrUnknownScheduleType = fmt.Errorf("unknown schedule type") ErrUnknownScheduleType = fmt.Errorf("unknown schedule type")
ErrNoManager = fmt.Errorf("no manager") ErrNoManager = fmt.Errorf("no manager")
ErrCannotLoadStatsFromDatabase = fmt.Errorf("errors while loading stats from database")
) )
...@@ -27,6 +27,7 @@ const ( ...@@ -27,6 +27,7 @@ const (
// Job is a job that can be executed // Job is a job that can be executed
type Job[T any] struct { type Job[T any] struct {
id JobID id JobID
description string
priority Priority priority Priority
timeout time.Duration timeout time.Duration
...@@ -36,6 +37,7 @@ type Job[T any] struct { ...@@ -36,6 +37,7 @@ type Job[T any] struct {
scheduler Scheduler scheduler Scheduler
pause bool pause bool
pauseReason string
pauseUntil time.Time pauseUntil time.Time
dependencies []JobID dependencies []JobID
...@@ -87,6 +89,7 @@ func (j *Job[T]) GetPersistence() JobPersistence { ...@@ -87,6 +89,7 @@ func (j *Job[T]) GetPersistence() JobPersistence {
job := JobPersistence{ job := JobPersistence{
ID: j.id, ID: j.id,
Description: j.description,
Priority: j.priority, Priority: j.priority,
Timeout: j.timeout, Timeout: j.timeout,
MaxRetries: j.maxRetries, MaxRetries: j.maxRetries,
...@@ -94,6 +97,10 @@ func (j *Job[T]) GetPersistence() JobPersistence { ...@@ -94,6 +97,10 @@ func (j *Job[T]) GetPersistence() JobPersistence {
Dependencies: j.dependencies, Dependencies: j.dependencies,
Runnable: j.runner.GetPersistence(), Runnable: j.runner.GetPersistence(),
Pause: j.pause,
PauseReason: j.pauseReason,
PauseUntil: j.pauseUntil,
Logs: j.logs, Logs: j.logs,
Stats: j.stats, Stats: j.stats,
} }
......
...@@ -13,6 +13,7 @@ import ( ...@@ -13,6 +13,7 @@ import (
type JobPersistence struct { type JobPersistence struct {
ID JobID `yaml:"id" json:"id" gorm:"type:varchar(255);primaryKey"` ID JobID `yaml:"id" json:"id" gorm:"type:varchar(255);primaryKey"`
Description string `yaml:"description" json:"description" gorm:"column:description"`
Priority Priority `yaml:"priority" json:"priority" gorm:"column:priority"` Priority Priority `yaml:"priority" json:"priority" gorm:"column:priority"`
Timeout time.Duration `yaml:"timeout" json:"timeout" gorm:"column:timeout"` Timeout time.Duration `yaml:"timeout" json:"timeout" gorm:"column:timeout"`
MaxRetries uint `yaml:"maxRetries" json:"maxRetries" gorm:"column:max_retries"` MaxRetries uint `yaml:"maxRetries" json:"maxRetries" gorm:"column:max_retries"`
...@@ -21,6 +22,10 @@ type JobPersistence struct { ...@@ -21,6 +22,10 @@ type JobPersistence struct {
Runnable RunnableImport `yaml:"runnable" json:"runnable" gorm:"embedded;embeddedPrefix:runnable_"` Runnable RunnableImport `yaml:"runnable" json:"runnable" gorm:"embedded;embeddedPrefix:runnable_"`
Scheduler SchedulerPersistence `yaml:"scheduler" json:"scheduler,omitempty" gorm:"embedded;embeddedPrefix:scheduler_"` Scheduler SchedulerPersistence `yaml:"scheduler" json:"scheduler,omitempty" gorm:"embedded;embeddedPrefix:scheduler_"`
Pause bool `yaml:"pause" json:"pause" gorm:"column:pause"`
PauseReason string `yaml:"pauseReason" json:"pauseReason" gorm:"column:pause_reason"`
PauseUntil time.Time `yaml:"pauseUntil" json:"pauseUntil" gorm:"column:pause_until"`
Logs []JobLog `gorm:"foreignKey:JobID;references:ID" json:"-" yaml:"-"` Logs []JobLog `gorm:"foreignKey:JobID;references:ID" json:"-" yaml:"-"`
Stats JobStats `gorm:"foreignKey:JobID" json:"stats" yaml:"stats"` Stats JobStats `gorm:"foreignKey:JobID" json:"stats" yaml:"stats"`
...@@ -45,6 +50,18 @@ func (jp JobPersistence) GetPersistence() JobPersistence { ...@@ -45,6 +50,18 @@ func (jp JobPersistence) GetPersistence() JobPersistence {
return jp return jp
} }
func (jp JobPersistence) GetDescription() string {
return jp.Description
}
func (jp JobPersistence) GetPriority() Priority {
return jp.Priority
}
func (jp JobPersistence) GetTimeout() time.Duration {
return jp.Timeout
}
func (JobPersistence) TableName() string { func (JobPersistence) TableName() string {
return globalTableNamePrefix + "jobs" return globalTableNamePrefix + "jobs"
} }
...@@ -102,7 +119,7 @@ func ReadFromGORM(db *gorm.DB) ([]JobPersistence, error) { ...@@ -102,7 +119,7 @@ func ReadFromGORM(db *gorm.DB) ([]JobPersistence, error) {
} }
var wrappedErr []error var wrappedErr []error
// load stats too
for i := range jobs { for i := range jobs {
if err := tx.Model(&jobs[i]).Association("Stats").Find(&jobs[i].Stats); err != nil { if err := tx.Model(&jobs[i]).Association("Stats").Find(&jobs[i].Stats); err != nil {
wrappedErr = append(wrappedErr, err) wrappedErr = append(wrappedErr, err)
...@@ -110,7 +127,7 @@ func ReadFromGORM(db *gorm.DB) ([]JobPersistence, error) { ...@@ -110,7 +127,7 @@ func ReadFromGORM(db *gorm.DB) ([]JobPersistence, error) {
} }
if len(wrappedErr) > 0 { if len(wrappedErr) > 0 {
returnErr := fmt.Errorf("errors while loading stats from database") returnErr := ErrCannotLoadStatsFromDatabase
for _, err := range wrappedErr { for _, err := range wrappedErr {
returnErr = fmt.Errorf("%w: %v", returnErr, err) returnErr = fmt.Errorf("%w: %v", returnErr, err)
} }
...@@ -126,11 +143,15 @@ func ReadFromGORM(db *gorm.DB) ([]JobPersistence, error) { ...@@ -126,11 +143,15 @@ func ReadFromGORM(db *gorm.DB) ([]JobPersistence, error) {
func CreateGenericJobFromPersistence[T any](jobImport JobPersistence, runner Runnable[T]) GenericJob { func CreateGenericJobFromPersistence[T any](jobImport JobPersistence, runner Runnable[T]) GenericJob {
return &Job[T]{ return &Job[T]{
id: jobImport.ID, id: jobImport.ID,
description: jobImport.Description,
priority: jobImport.Priority, priority: jobImport.Priority,
timeout: jobImport.Timeout, timeout: jobImport.Timeout,
maxRetries: jobImport.MaxRetries, maxRetries: jobImport.MaxRetries,
RetryDelay: jobImport.RetryDelay, RetryDelay: jobImport.RetryDelay,
dependencies: jobImport.Dependencies, dependencies: jobImport.Dependencies,
pause: jobImport.Pause,
pauseReason: jobImport.PauseReason,
pauseUntil: jobImport.PauseUntil,
runner: runner, runner: runner,
stats: jobImport.Stats, stats: jobImport.Stats,
logs: jobImport.Logs, logs: jobImport.Logs,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment