// Copyright 2023 schukai GmbH // SPDX-License-Identifier: AGPL-3.0 package jobqueue import ( "encoding/json" "fmt" "github.com/robfig/cron/v3" "gopkg.in/yaml.v3" "gorm.io/gorm" "io" "os" "strings" "time" ) type JobPersistence struct { ID JobID `yaml:"id" json:"id" gorm:"type:varchar(255);primaryKey"` Description string `yaml:"description" json:"description" gorm:"column:description"` Priority Priority `yaml:"priority" json:"priority" gorm:"column:priority"` Timeout time.Duration `yaml:"timeout" json:"timeout" gorm:"column:timeout"` MaxRetries uint `yaml:"maxRetries" json:"maxRetries" gorm:"column:max_retries"` RetryDelay time.Duration `yaml:"retryDelay" json:"retryDelay" gorm:"column:retry_delay"` Dependencies []JobID `yaml:"dependencies" json:"dependencies,omitempty" gorm:"column:dependencies;type:json"` Runnable RunnableImport `yaml:"runnable" json:"runnable" gorm:"embedded;embeddedPrefix:runnable_"` Scheduler SchedulerPersistence `yaml:"scheduler" json:"scheduler,omitempty" gorm:"embedded;embeddedPrefix:scheduler_"` Pause bool `yaml:"pause" json:"pause" gorm:"column:pause"` PauseReason string `yaml:"pauseReason" json:"pauseReason" gorm:"column:pause_reason"` PauseUntil *time.Time `yaml:"pauseUntil" json:"pauseUntil" gorm:"column:pause_until"` Logs []JobLog `gorm:"foreignKey:JobID;references:ID" json:"-" yaml:"-"` Stats JobStats `gorm:"foreignKey:JobID" json:"stats" yaml:"stats"` CreatedAt time.Time `gorm:"column:created_at" json:"createdAt" yaml:"createdAt"` UpdatedAt time.Time `gorm:"column:updated_at" json:"updatedAt" yaml:"updatedAt"` DeletedAt gorm.DeletedAt `gorm:"column:deleted_at;index" json:"-" yaml:"-"` } // UnmarshalJSON implements the json.Unmarshaler interface. func (jp *JobPersistence) UnmarshalJSON(data []byte) error { // Anonymous struct for unmarshalling with custom time format type Alias JobPersistence aux := &struct { PauseUntil *string `json:"pauseUntil,omitempty"` *Alias }{ Alias: (*Alias)(jp), } if err := json.Unmarshal(data, &aux); err != nil { return err } if aux.PauseUntil != nil { var t time.Time var err error for _, format := range SupportedTimeFormats { t, err = time.Parse(format, *aux.PauseUntil) if err == nil { break } } if err != nil { return err } jp.PauseUntil = &t } return nil } func (jp JobPersistence) GetLogs() []JobLog { return jp.Logs } func (jp JobPersistence) GetStats() JobStats { return jp.Stats } func (jp JobPersistence) GetID() JobID { return jp.ID } func (jp JobPersistence) GetPersistence() JobPersistence { return jp } func (jp JobPersistence) GetDescription() string { return jp.Description } func (jp JobPersistence) GetPriority() Priority { return jp.Priority } func (jp JobPersistence) GetTimeout() time.Duration { return jp.Timeout } func (JobPersistence) TableName() string { return globalTableNamePrefix + "jobs" } type RunnableImport struct { Type string `yaml:"type" json:"type" gorm:"column:type"` Data JSONMap `yaml:"data,omitempty" json:"data,omitempty" gorm:"column:data;type:json"` } func ReadYAML(r io.Reader) ([]JobPersistence, error) { var jobs []JobPersistence decoder := yaml.NewDecoder(r) if err := decoder.Decode(&jobs); err != nil { return nil, err } return jobs, nil } func ReadJSON(r io.Reader) ([]JobPersistence, error) { var jobs []JobPersistence decoder := json.NewDecoder(r) if err := decoder.Decode(&jobs); err != nil { return nil, err } return jobs, nil } func ReadYAMLFile(filePath string) ([]JobPersistence, error) { // #nosec file, err := os.Open(filePath) if err != nil { return nil, err } defer func() { _ = file.Close() }() return ReadYAML(file) } func ReadJsonFile(filePath string) ([]JobPersistence, error) { // #nosec file, err := os.Open(filePath) if err != nil { return nil, err } defer func() { _ = file.Close() }() return ReadJSON(file) } func ReadFromGORM(db *gorm.DB) ([]JobPersistence, error) { var jobs []JobPersistence err := db.Transaction(func(tx *gorm.DB) error { if err := tx.Find(&jobs).Error; err != nil { return err } var wrappedErr []error for i := range jobs { if err := tx.Model(&jobs[i]).Association("Stats").Find(&jobs[i].Stats); err != nil { wrappedErr = append(wrappedErr, err) } } if len(wrappedErr) > 0 { returnErr := ErrCannotLoadStatsFromDatabase for _, err := range wrappedErr { returnErr = fmt.Errorf("%w: %v", returnErr, err) } return returnErr } return nil }) return jobs, err } func CreateGenericJobFromPersistence[T any](jobImport JobPersistence, runner Runnable[T]) GenericJob { return &Job[T]{ id: jobImport.ID, description: jobImport.Description, priority: jobImport.Priority, timeout: jobImport.Timeout, maxRetries: jobImport.MaxRetries, RetryDelay: jobImport.RetryDelay, dependencies: jobImport.Dependencies, pause: jobImport.Pause, pauseReason: jobImport.PauseReason, pauseUntil: jobImport.PauseUntil, runner: runner, stats: jobImport.Stats, logs: jobImport.Logs, } } func CreateJobAndSchedulerFromPersistence(jobImport JobPersistence, manager *Manager) (GenericJob, Scheduler, error) { var job GenericJob rType := strings.ToLower(jobImport.Runnable.Type) runnableData := make(map[string]interface{}) for k, v := range jobImport.Runnable.Data { runnableData[strings.ToLower(k)] = v } switch rType { case "dummy": runner, err := NewDummyRunnableFromMap(runnableData) if err != nil { return nil, nil, err } job = CreateGenericJobFromPersistence[DummyResult](jobImport, runner) case "counter": runner, err := NewCounterRunnableFromMap(runnableData) if err != nil { return nil, nil, err } job = CreateGenericJobFromPersistence[CounterResult](jobImport, runner) case "fileoperation": runner, err := NewFileOperationRunnableFromMap(runnableData) if err != nil { return nil, nil, err } job = CreateGenericJobFromPersistence[FileOperationResult](jobImport, runner) case "db": runner, err := NewDBRunnableFromMap(runnableData) if err != nil { return nil, nil, err } job = CreateGenericJobFromPersistence[DBResult](jobImport, runner) case "http": runner, err := NewHTTPRunnableFromMap(runnableData) if err != nil { return nil, nil, err } job = CreateGenericJobFromPersistence[HTTPResult](jobImport, runner) case "mail": runner, err := NewMailRunnableFromMap(runnableData) if err != nil { return nil, nil, err } job = CreateGenericJobFromPersistence[MailResult](jobImport, runner) case "sftp": runner, err := NewSFTPRunnableFromMap(runnableData) if err != nil { return nil, nil, err } job = CreateGenericJobFromPersistence[SFTPResult](jobImport, runner) case "shell": runner, err := NewShellRunnableFromMap(runnableData) if err != nil { return nil, nil, err } job = CreateGenericJobFromPersistence[ShellResult](jobImport, runner) default: return nil, nil, fmt.Errorf("%w: %s, available types: dummy, counter, fileoperation, db, http, mail, sftp, shell", ErrUnknownRunnableType, rType) } sType := strings.ToLower(jobImport.Scheduler.Type) scheduleData := make(map[string]interface{}) for k, v := range jobImport.Runnable.Data { scheduleData[strings.ToLower(k)] = v } var scheduler Scheduler switch sType { case "interval": if jobImport.Scheduler.Interval == 0 { return nil, nil, fmt.Errorf("%w: interval is 0", ErrSchedulerMisconfiguration) } scheduler = &IntervalScheduler{Interval: jobImport.Scheduler.Interval} case "cron": if jobImport.Scheduler.Spec == "" { return nil, nil, fmt.Errorf("%w: spec is empty", ErrSchedulerMisconfiguration) } // check spec parser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor) _, err := parser.Parse(jobImport.Scheduler.Spec) if err != nil { return nil, nil, fmt.Errorf("%w: %v", ErrSchedulerMisconfiguration, err) } scheduler = &CronScheduler{ Spec: jobImport.Scheduler.Spec, } if manager != nil { scheduler.(*CronScheduler).cron = manager.GetCronInstance() } case "delay": if jobImport.Scheduler.Delay == 0 { return nil, nil, fmt.Errorf("%w: delay is 0", ErrSchedulerMisconfiguration) } scheduler = &DelayScheduler{Delay: jobImport.Scheduler.Delay} case "event": if jobImport.Scheduler.Event == "" { return nil, nil, fmt.Errorf("%w: event is empty", ErrSchedulerMisconfiguration) } scheduler = &EventScheduler{Event: EventName(jobImport.Scheduler.Event)} case "instant": scheduler = &InstantScheduler{} case "time": if jobImport.Scheduler.Time == nil { return nil, nil, fmt.Errorf("%w: time is nil", ErrSchedulerMisconfiguration) } scheduler = &TimeScheduler{Time: *jobImport.Scheduler.Time} default: return nil, nil, ErrUnknownSchedulerType } return job, scheduler, nil } // LoadJobsAndScheduleFromFile read jobs from a file and schedule them. (json/yaml) func LoadJobsAndScheduleFromFile(filePath string, manager *Manager) error { var err error var imp []JobPersistence if filePath[len(filePath)-4:] == "json" { imp, err = ReadJsonFile(filePath) } else if filePath[len(filePath)-4:] == "yaml" { imp, err = ReadYAMLFile(filePath) } else { return ErrUnknownFormat } if err != nil { return err } for _, imp := range imp { job, scheduler, err := CreateJobAndSchedulerFromPersistence(imp, manager) if err != nil { return err } err = manager.ScheduleJob(job, scheduler) if err != nil { return err } } return nil } func LoadJobsAndScheduleFromDatabase(db *gorm.DB, manager *Manager) error { jobs, err := ReadFromGORM(db) if err != nil { return err } var errs []error for _, job := range jobs { j, s, err := CreateJobAndSchedulerFromPersistence(job, manager) if err != nil { errs = append(errs, err) continue } err = manager.ScheduleJob(j, s) if err != nil { errs = append(errs, err) continue } } if len(errs) > 0 { returnErr := fmt.Errorf("errors while loading jobs from database") for _, err := range errs { returnErr = fmt.Errorf("%w: %v", returnErr, err) } return returnErr } return nil } // ImportJobsAndSchedule lädt Jobs aus einem Reader und plant sie ein. func ImportJobsAndSchedule(reader io.Reader, format string, manager *Manager) error { var err error var imp []JobPersistence // format to lowercase format = strings.ToLower(format) if format == "json" { imp, err = ReadJSON(reader) } else if format == "yaml" { imp, err = ReadYAML(reader) } else { return fmt.Errorf("%w: %s", ErrUnknownFormat, format) } if err != nil { return err } for _, imp := range imp { job, scheduler, err := CreateJobAndSchedulerFromPersistence(imp, manager) if err != nil { return err } err = manager.ScheduleJob(job, scheduler) if err != nil { return err } } return nil }