package jobqueue

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
	"strings"
	"time"

	"gopkg.in/yaml.v3"
	"gorm.io/gorm"
)

// JobPersistence is the serializable description of a job. The same struct is
// used for YAML and JSON import (see the yaml/json tags) and as a GORM model
// (see the gorm tags).
type JobPersistence struct {
	ID           JobID                `yaml:"id" json:"id" gorm:"type:varchar(255);primaryKey"`
	Priority     Priority             `yaml:"priority" json:"priority" gorm:"column:priority"`
	Timeout      time.Duration        `yaml:"timeout" json:"timeout" gorm:"column:timeout"`
	MaxRetries   uint                 `yaml:"maxRetries" json:"maxRetries" gorm:"column:max_retries"`
	RetryDelay   time.Duration        `yaml:"retryDelay" json:"retryDelay" gorm:"column:retry_delay"`
	Dependencies []JobID              `yaml:"dependencies" json:"dependencies,omitempty" gorm:"column:dependencies;type:json"`
	Runnable     RunnableImport       `yaml:"runnable" json:"runnable" gorm:"embedded;embeddedPrefix:runnable_"`
	Scheduler    SchedulerPersistence `yaml:"scheduler" json:"scheduler,omitempty" gorm:"embedded;embeddedPrefix:scheduler_"`

	// Logs and Stats carry no yaml/json tags; they are related to the job
	// through the JobID foreign key.
	Logs  []JobLog `gorm:"foreignKey:JobID;references:ID"`
	Stats JobStats `gorm:"foreignKey:JobID"`

	CreatedAt time.Time      `gorm:"column:created_at"`
	UpdatedAt time.Time      `gorm:"column:updated_at"`
	DeletedAt gorm.DeletedAt `gorm:"column:deleted_at;index"`
}

// GetLogs returns the log entries stored on the persistence record.
func (jp JobPersistence) GetLogs() []JobLog {
	return jp.Logs
}

// GetStats returns the statistics stored on the persistence record.
func (jp JobPersistence) GetStats() JobStats {
	return jp.Stats
}

// GetID returns the job identifier.
func (jp JobPersistence) GetID() JobID {
	return jp.ID
}

// GetPersistence returns the record itself (as a copy, because the receiver
// is a value).
func (jp JobPersistence) GetPersistence() JobPersistence {
	return jp
}

// TableName returns the database table name for JobPersistence:
// globalTableNamePrefix followed by "jobs".
func (JobPersistence) TableName() string {
	return globalTableNamePrefix + "jobs"
}

// RunnableImport describes which runnable a job uses (Type) together with the
// free-form configuration (Data) that is handed to the runnable's constructor.
type RunnableImport struct {
	Type string  `yaml:"type" json:"type" gorm:"column:type"`
	Data JSONMap `yaml:"data,omitempty" json:"data,omitempty" gorm:"column:data;type:json"`
}

// ReadYAML decodes a YAML document containing a list of jobs from r.
// A decode error is returned unchanged together with a nil slice.
func ReadYAML(r io.Reader) ([]JobPersistence, error) {
	var jobs []JobPersistence

	if err := yaml.NewDecoder(r).Decode(&jobs); err != nil {
		return nil, err
	}

	return jobs, nil
}

// ReadJSON decodes a JSON array of jobs from r.
// A decode error is returned unchanged together with a nil slice.
func ReadJSON(r io.Reader) ([]JobPersistence, error) {
	var jobs []JobPersistence

	if err := json.NewDecoder(r).Decode(&jobs); err != nil {
		return nil, err
	}

	return jobs, nil
}

// ReadYAMLFile opens filePath and decodes its content with ReadYAML.
func ReadYAMLFile(filePath string) ([]JobPersistence, error) {
	// #nosec
	file, err := os.Open(filePath)
	if err != nil {
		return nil, err
	}
	defer file.Close()

	return ReadYAML(file)
}

// ReadJsonFile opens filePath and decodes its content with ReadJSON.
func ReadJsonFile(filePath string) ([]JobPersistence, error) {
	// #nosec
	file, err := os.Open(filePath)
	if err != nil {
		return nil, err
	}
	defer file.Close()

	return ReadJSON(file)
}

// ReadFromGORM loads all job records from db inside a single transaction and
// then loads the Stats association of every job. The Logs association is not
// loaded here.
//
// If loading stats fails for one or more jobs, the individual errors are
// folded into one error. Note that the jobs slice is returned even when the
// error is non-nil, so it may be partially populated.
func ReadFromGORM(db *gorm.DB) ([]JobPersistence, error) {
	var jobs []JobPersistence

	err := db.Transaction(func(tx *gorm.DB) error {
		if err := tx.Find(&jobs).Error; err != nil {
			return err
		}

		// Load the stats for every job; keep going on failure so that all
		// problems are reported at once.
		var statErrs []error
		for i := range jobs {
			if err := tx.Model(&jobs[i]).Association("Stats").Find(&jobs[i].Stats); err != nil {
				statErrs = append(statErrs, err)
			}
		}

		if len(statErrs) == 0 {
			return nil
		}

		returnErr := fmt.Errorf("errors while loading stats from database")
		for _, statErr := range statErrs {
			returnErr = fmt.Errorf("%w: %v", returnErr, statErr)
		}
		return returnErr
	})

	return jobs, err
}

// CreateGenericJobFromPersistence builds a Job[T] from the persisted settings
// in jobImport and attaches runner to it. Stats and logs stored on the record
// are carried over to the new job.
func CreateGenericJobFromPersistence[T any](jobImport JobPersistence, runner Runnable[T]) GenericJob {
	return &Job[T]{
		id:           jobImport.ID,
		priority:     jobImport.Priority,
		timeout:      jobImport.Timeout,
		maxRetries:   jobImport.MaxRetries,
		RetryDelay:   jobImport.RetryDelay,
		dependencies: jobImport.Dependencies,
		runner:       runner,
		stats:        jobImport.Stats,
		logs:         jobImport.Logs,
	}
}

// CreateJobAndSchedulerFromPersistence turns a persisted job description into
// a runnable job and its scheduler.
//
// The runnable type and the scheduler type are matched case-insensitively,
// and the keys of the runnable data are lower-cased before they are passed to
// the runnable's constructor. manager may be nil; when it is non-nil, a cron
// scheduler receives the manager's cron instance.
//
// It returns an error wrapping ErrUnknownRunnableType for an unknown runnable
// type, the constructor's error if the runnable cannot be built, and
// ErrUnknownSchedulerType for an unknown scheduler type.
func CreateJobAndSchedulerFromPersistence(jobImport JobPersistence, manager *Manager) (GenericJob, Scheduler, error) {
	var job GenericJob

	rType := strings.ToLower(jobImport.Runnable.Type)

	// Normalize the keys so the runnable constructors can look them up
	// independent of the spelling used in the import.
	runnableData := make(map[string]any)
	for k, v := range jobImport.Runnable.Data {
		runnableData[strings.ToLower(k)] = v
	}

	switch rType {
	case "dummy":
		runner, err := NewDummyRunnableFromMap(runnableData)
		if err != nil {
			return nil, nil, err
		}
		job = CreateGenericJobFromPersistence[DummyResult](jobImport, runner)

	case "counter":
		runner, err := NewCounterRunnableFromMap(runnableData)
		if err != nil {
			return nil, nil, err
		}
		job = CreateGenericJobFromPersistence[CounterResult](jobImport, runner)

	case "fileoperation":
		runner, err := NewFileOperationRunnableFromMap(runnableData)
		if err != nil {
			return nil, nil, err
		}
		job = CreateGenericJobFromPersistence[FileOperationResult](jobImport, runner)

	case "db":
		runner, err := NewDBRunnableFromMap(runnableData)
		if err != nil {
			return nil, nil, err
		}
		job = CreateGenericJobFromPersistence[DBResult](jobImport, runner)

	case "http":
		runner, err := NewHTTPRunnableFromMap(runnableData)
		if err != nil {
			return nil, nil, err
		}
		job = CreateGenericJobFromPersistence[HTTPResult](jobImport, runner)

	case "mail":
		runner, err := NewMailRunnableFromMap(runnableData)
		if err != nil {
			return nil, nil, err
		}
		job = CreateGenericJobFromPersistence[MailResult](jobImport, runner)

	case "sftp":
		runner, err := NewSFTPRunnableFromMap(runnableData)
		if err != nil {
			return nil, nil, err
		}
		job = CreateGenericJobFromPersistence[SFTPResult](jobImport, runner)

	case "shell":
		runner, err := NewShellRunnableFromMap(runnableData)
		if err != nil {
			return nil, nil, err
		}
		job = CreateGenericJobFromPersistence[ShellResult](jobImport, runner)

	default:
		return nil, nil,
			fmt.Errorf("%w: %s, available types: dummy, counter, fileoperation, db, http, mail, sftp, shell",
				ErrUnknownRunnableType, rType)
	}

	var scheduler Scheduler

	switch strings.ToLower(jobImport.Scheduler.Type) {
	case "interval":
		scheduler = &IntervalScheduler{Interval: jobImport.Scheduler.Interval}

	case "cron":
		cronScheduler := &CronScheduler{Spec: jobImport.Scheduler.Spec}
		if manager != nil {
			cronScheduler.cron = manager.GetCronInstance()
		}
		scheduler = cronScheduler

	case "delay":
		scheduler = &DelayScheduler{Delay: jobImport.Scheduler.Delay}

	case "event":
		scheduler = &EventScheduler{Event: EventName(jobImport.Scheduler.Event)}

	case "instant":
		scheduler = &InstantScheduler{}

	default:
		return nil, nil, ErrUnknownSchedulerType
	}

	return job, scheduler, nil
}

// LoadJobsAndScheduleFromFile reads jobs from a file and schedules them on
// manager.
//
// The format is selected by the suffix of filePath: "json" or "yaml"
// (case-sensitive). Any other path, including one that is shorter than the
// suffix, yields ErrUnknownFormat. Processing stops at the first job that
// cannot be created or scheduled; this function does not undo jobs that were
// scheduled before the failure.
func LoadJobsAndScheduleFromFile(filePath string, manager *Manager) error {
	var (
		imported []JobPersistence
		err      error
	)

	// HasSuffix is safe for paths of any length; slicing the last four bytes
	// by hand panics on short paths.
	switch {
	case strings.HasSuffix(filePath, "json"):
		imported, err = ReadJsonFile(filePath)
	case strings.HasSuffix(filePath, "yaml"):
		imported, err = ReadYAMLFile(filePath)
	default:
		return ErrUnknownFormat
	}

	if err != nil {
		return err
	}

	for i := range imported {
		job, scheduler, err := CreateJobAndSchedulerFromPersistence(imported[i], manager)
		if err != nil {
			return err
		}

		if err := manager.ScheduleJob(job, scheduler); err != nil {
			return err
		}
	}

	return nil
}

// LoadJobsAndScheduleFromDatabase reads all jobs from db via ReadFromGORM and
// schedules them on manager.
//
// A failure to read from the database aborts immediately. Failures for
// individual jobs (creation or scheduling) do not stop the loop: the
// remaining jobs are still processed and all collected errors are folded into
// the returned error.
func LoadJobsAndScheduleFromDatabase(db *gorm.DB, manager *Manager) error {
	jobs, err := ReadFromGORM(db)
	if err != nil {
		return err
	}

	var errs []error
	for i := range jobs {
		job, scheduler, err := CreateJobAndSchedulerFromPersistence(jobs[i], manager)
		if err != nil {
			errs = append(errs, err)
			continue
		}

		if err := manager.ScheduleJob(job, scheduler); err != nil {
			errs = append(errs, err)
		}
	}

	if len(errs) == 0 {
		return nil
	}

	returnErr := fmt.Errorf("errors while loading jobs from database")
	for _, jobErr := range errs {
		returnErr = fmt.Errorf("%w: %v", returnErr, jobErr)
	}
	return returnErr
}

// ImportJobsAndSchedule loads jobs from a reader and schedules them on
// manager.
//
// format is matched case-insensitively and must be "json" or "yaml";
// otherwise an error wrapping ErrUnknownFormat is returned. Processing stops
// at the first job that cannot be created or scheduled.
func ImportJobsAndSchedule(reader io.Reader, format string, manager *Manager) error {
	var (
		imported []JobPersistence
		err      error
	)

	format = strings.ToLower(format)

	switch format {
	case "json":
		imported, err = ReadJSON(reader)
	case "yaml":
		imported, err = ReadYAML(reader)
	default:
		return fmt.Errorf("%w: %s", ErrUnknownFormat, format)
	}

	if err != nil {
		return err
	}

	for i := range imported {
		job, scheduler, err := CreateJobAndSchedulerFromPersistence(imported[i], manager)
		if err != nil {
			return err
		}

		if err := manager.ScheduleJob(job, scheduler); err != nil {
			return err
		}
	}

	return nil
}