Something went wrong on our end
Select Git revision
defaults.nix
-
Volker Schukai authoredVolker Schukai authored
persistence.go 9.13 KiB
package jobqueue
import (
"encoding/json"
"fmt"
"gopkg.in/yaml.v3"
"gorm.io/gorm"
"io"
"os"
"strings"
"time"
)
// JobPersistence is the serializable form of a job. The same struct carries
// YAML, JSON and GORM tags, so it is used for file import as well as for
// database storage (see ReadYAML, ReadJSON and ReadFromGORM).
type JobPersistence struct {
	// ID is the primary key of the job row (varchar(255)).
	ID JobID `yaml:"id" json:"id" gorm:"type:varchar(255);primaryKey"`
	// Description, Priority, Timeout, MaxRetries and RetryDelay are copied
	// one-to-one into the job by CreateGenericJobFromPersistence.
	Description string `yaml:"description" json:"description" gorm:"column:description"`
	Priority Priority `yaml:"priority" json:"priority" gorm:"column:priority"`
	Timeout time.Duration `yaml:"timeout" json:"timeout" gorm:"column:timeout"`
	MaxRetries uint `yaml:"maxRetries" json:"maxRetries" gorm:"column:max_retries"`
	RetryDelay time.Duration `yaml:"retryDelay" json:"retryDelay" gorm:"column:retry_delay"`
	// Dependencies lists other job IDs; it is stored in a JSON-typed column and
	// omitted from JSON output when empty.
	Dependencies []JobID `yaml:"dependencies" json:"dependencies,omitempty" gorm:"column:dependencies;type:json"`
	// Runnable is embedded into the job table with the column prefix "runnable_".
	Runnable RunnableImport `yaml:"runnable" json:"runnable" gorm:"embedded;embeddedPrefix:runnable_"`
	// Scheduler is embedded into the job table with the column prefix "scheduler_".
	// NOTE(review): if SchedulerPersistence is a struct, the json "omitempty"
	// option has no effect in encoding/json — confirm this is intended.
	Scheduler SchedulerPersistence `yaml:"scheduler" json:"scheduler,omitempty" gorm:"embedded;embeddedPrefix:scheduler_"`
	// Pause, PauseReason and PauseUntil carry the pause state of the job.
	Pause bool `yaml:"pause" json:"pause" gorm:"column:pause"`
	PauseReason string `yaml:"pauseReason" json:"pauseReason" gorm:"column:pause_reason"`
	PauseUntil time.Time `yaml:"pauseUntil" json:"pauseUntil" gorm:"column:pause_until"`
	// Logs is a GORM association (foreign key JobID referencing ID) and is
	// excluded from both JSON and YAML.
	Logs []JobLog `gorm:"foreignKey:JobID;references:ID" json:"-" yaml:"-"`
	// Stats is a GORM association keyed by JobID; ReadFromGORM loads it
	// explicitly for every job.
	Stats JobStats `gorm:"foreignKey:JobID" json:"stats" yaml:"stats"`
	// CreatedAt and UpdatedAt map to the created_at / updated_at columns.
	CreatedAt time.Time `gorm:"column:created_at" json:"created_at" yaml:"created_at"`
	UpdatedAt time.Time `gorm:"column:updated_at" json:"updated_at" yaml:"updated_at"`
	// DeletedAt is an indexed gorm.DeletedAt column and is excluded from both
	// JSON and YAML.
	DeletedAt gorm.DeletedAt `gorm:"column:deleted_at;index" json:"-" yaml:"-"`
}
// GetLogs returns the Logs field of the persisted job.
func (jp JobPersistence) GetLogs() []JobLog {
	return jp.Logs
}
// GetStats returns the Stats field of the persisted job.
func (jp JobPersistence) GetStats() JobStats {
	return jp.Stats
}
// GetID returns the ID field of the persisted job.
func (jp JobPersistence) GetID() JobID {
	return jp.ID
}
// GetPersistence returns the receiver itself. Because the receiver is passed
// by value, the caller gets its own copy of the struct.
func (jp JobPersistence) GetPersistence() JobPersistence {
	return jp
}
// GetDescription returns the Description field of the persisted job.
func (jp JobPersistence) GetDescription() string {
	return jp.Description
}
// GetPriority returns the Priority field of the persisted job.
func (jp JobPersistence) GetPriority() Priority {
	return jp.Priority
}
// GetTimeout returns the Timeout field of the persisted job.
func (jp JobPersistence) GetTimeout() time.Duration {
	return jp.Timeout
}
// TableName returns globalTableNamePrefix followed by "jobs".
// NOTE(review): this has the signature GORM looks for when a model overrides
// its table name — presumably that is its purpose; confirm.
func (JobPersistence) TableName() string {
	return globalTableNamePrefix + "jobs"
}
// RunnableImport describes the runnable of a persisted job.
type RunnableImport struct {
	// Type names the runnable kind. CreateJobAndSchedulerFromPersistence
	// lower-cases it before matching it against the known kinds.
	Type string `yaml:"type" json:"type" gorm:"column:type"`
	// Data holds the runnable's settings; it is stored in a JSON-typed column
	// and omitted from YAML and JSON when empty. Its keys are lower-cased by
	// CreateJobAndSchedulerFromPersistence before being handed to the
	// runnable constructor.
	Data JSONMap `yaml:"data,omitempty" json:"data,omitempty" gorm:"column:data;type:json"`
}
// ReadYAML decodes one YAML document from r into a slice of JobPersistence.
// If decoding fails, the decoder's error is returned unchanged together with
// a nil slice.
func ReadYAML(r io.Reader) ([]JobPersistence, error) {
	var parsed []JobPersistence

	if decodeErr := yaml.NewDecoder(r).Decode(&parsed); decodeErr != nil {
		return nil, decodeErr
	}

	return parsed, nil
}
// ReadJSON decodes one JSON value from r into a slice of JobPersistence.
// If decoding fails, the decoder's error is returned unchanged together with
// a nil slice.
func ReadJSON(r io.Reader) ([]JobPersistence, error) {
	var parsed []JobPersistence

	if decodeErr := json.NewDecoder(r).Decode(&parsed); decodeErr != nil {
		return nil, decodeErr
	}

	return parsed, nil
}
// ReadYAMLFile opens filePath and passes the open file to ReadYAML.
// An error from opening the file is returned as is; the file is closed when
// the function returns.
func ReadYAMLFile(filePath string) ([]JobPersistence, error) {
	// #nosec
	src, openErr := os.Open(filePath)
	if openErr != nil {
		return nil, openErr
	}
	defer src.Close()

	return ReadYAML(src)
}
// ReadJsonFile opens filePath and passes the open file to ReadJSON.
// An error from opening the file is returned as is; the file is closed when
// the function returns.
func ReadJsonFile(filePath string) ([]JobPersistence, error) {
	// #nosec
	src, openErr := os.Open(filePath)
	if openErr != nil {
		return nil, openErr
	}
	defer src.Close()

	return ReadJSON(src)
}
// ReadFromGORM loads all jobs and, for each of them, the associated Stats
// inside a single database transaction.
//
// A failure of the initial Find is returned as is. Failures while loading
// Stats are collected for all jobs and then chained onto
// ErrCannotLoadStatsFromDatabase. The jobs slice is returned alongside the
// error in every case.
func ReadFromGORM(db *gorm.DB) ([]JobPersistence, error) {
	var jobs []JobPersistence

	loadAll := func(tx *gorm.DB) error {
		if findErr := tx.Find(&jobs).Error; findErr != nil {
			return findErr
		}

		// Keep going after a failed association lookup so that every
		// failure is reported, not only the first one.
		var statErrs []error
		for idx := range jobs {
			entry := &jobs[idx]
			if assocErr := tx.Model(entry).Association("Stats").Find(&entry.Stats); assocErr != nil {
				statErrs = append(statErrs, assocErr)
			}
		}

		if len(statErrs) == 0 {
			return nil
		}

		combined := ErrCannotLoadStatsFromDatabase
		for _, statErr := range statErrs {
			combined = fmt.Errorf("%w: %v", combined, statErr)
		}
		return combined
	}

	txErr := db.Transaction(loadAll)
	return jobs, txErr
}
// CreateGenericJobFromPersistence builds a *Job[T] from the persisted values
// in jobImport and attaches runner to it. Scheduler and Runnable settings of
// jobImport are not consulted here.
func CreateGenericJobFromPersistence[T any](jobImport JobPersistence, runner Runnable[T]) GenericJob {
	job := new(Job[T])

	// Identity and execution settings.
	job.id = jobImport.ID
	job.description = jobImport.Description
	job.priority = jobImport.Priority
	job.timeout = jobImport.Timeout
	job.maxRetries = jobImport.MaxRetries
	job.RetryDelay = jobImport.RetryDelay
	job.dependencies = jobImport.Dependencies

	// Pause state.
	job.pause = jobImport.Pause
	job.pauseReason = jobImport.PauseReason
	job.pauseUntil = jobImport.PauseUntil

	// Behaviour and history.
	job.runner = runner
	job.stats = jobImport.Stats
	job.logs = jobImport.Logs

	return job
}
// CreateJobAndSchedulerFromPersistence turns a persisted job description into
// a runnable job and its scheduler.
//
// The runnable type and the keys of the runnable data are matched
// case-insensitively (both are lower-cased first). The runnable is resolved
// before the scheduler, so an unknown runnable type is reported even if the
// scheduler type is unknown as well.
//
// manager may be nil; in that case a cron scheduler is created without a cron
// instance attached.
//
// Errors:
//   - the error of the runnable constructor, unchanged;
//   - ErrUnknownRunnableType (wrapped, with the offending type) for an
//     unsupported runnable type;
//   - ErrUnknownSchedulerType for an unsupported scheduler type.
func CreateJobAndSchedulerFromPersistence(jobImport JobPersistence, manager *Manager) (GenericJob, Scheduler, error) {
	var job GenericJob

	rType := strings.ToLower(jobImport.Runnable.Type)

	// Runnable constructors expect lower-case keys.
	runnableData := make(map[string]any, len(jobImport.Runnable.Data))
	for k, v := range jobImport.Runnable.Data {
		runnableData[strings.ToLower(k)] = v
	}

	switch rType {
	case "dummy":
		runner, err := NewDummyRunnableFromMap(runnableData)
		if err != nil {
			return nil, nil, err
		}
		job = CreateGenericJobFromPersistence[DummyResult](jobImport, runner)

	case "counter":
		runner, err := NewCounterRunnableFromMap(runnableData)
		if err != nil {
			return nil, nil, err
		}
		job = CreateGenericJobFromPersistence[CounterResult](jobImport, runner)

	case "fileoperation":
		runner, err := NewFileOperationRunnableFromMap(runnableData)
		if err != nil {
			return nil, nil, err
		}
		job = CreateGenericJobFromPersistence[FileOperationResult](jobImport, runner)

	case "db":
		runner, err := NewDBRunnableFromMap(runnableData)
		if err != nil {
			return nil, nil, err
		}
		job = CreateGenericJobFromPersistence[DBResult](jobImport, runner)

	case "http":
		runner, err := NewHTTPRunnableFromMap(runnableData)
		if err != nil {
			return nil, nil, err
		}
		job = CreateGenericJobFromPersistence[HTTPResult](jobImport, runner)

	case "mail":
		runner, err := NewMailRunnableFromMap(runnableData)
		if err != nil {
			return nil, nil, err
		}
		job = CreateGenericJobFromPersistence[MailResult](jobImport, runner)

	case "sftp":
		runner, err := NewSFTPRunnableFromMap(runnableData)
		if err != nil {
			return nil, nil, err
		}
		job = CreateGenericJobFromPersistence[SFTPResult](jobImport, runner)

	case "shell":
		runner, err := NewShellRunnableFromMap(runnableData)
		if err != nil {
			return nil, nil, err
		}
		job = CreateGenericJobFromPersistence[ShellResult](jobImport, runner)

	default:
		return nil, nil,
			fmt.Errorf("%w: %s, available types: dummy, counter, fileoperation, db, http, mail, sftp, shell",
				ErrUnknownRunnableType, rType)
	}

	// The scheduler is configured from the typed fields of
	// jobImport.Scheduler only; no key/value map is needed here. (An earlier
	// version copied the runnable data into an unused "scheduleData" map.)
	var scheduler Scheduler
	switch strings.ToLower(jobImport.Scheduler.Type) {
	case "interval":
		scheduler = &IntervalScheduler{Interval: jobImport.Scheduler.Interval}

	case "cron":
		cronScheduler := &CronScheduler{Spec: jobImport.Scheduler.Spec}
		if manager != nil {
			cronScheduler.cron = manager.GetCronInstance()
		}
		scheduler = cronScheduler

	case "delay":
		scheduler = &DelayScheduler{Delay: jobImport.Scheduler.Delay}

	case "event":
		scheduler = &EventScheduler{Event: EventName(jobImport.Scheduler.Event)}

	case "instant":
		scheduler = &InstantScheduler{}

	default:
		return nil, nil, ErrUnknownSchedulerType
	}

	return job, scheduler, nil
}
// LoadJobsAndScheduleFromFile reads jobs from a file and schedules them on
// manager.
//
// The format is chosen from the end of filePath: a path ending in "json" is
// read with ReadJsonFile, a path ending in "yaml" with ReadYAMLFile. The match
// is case-sensitive. Any other path, including one shorter than four bytes,
// yields ErrUnknownFormat.
//
// Processing stops at the first error; jobs scheduled before that point stay
// scheduled. Errors from reading the file, from
// CreateJobAndSchedulerFromPersistence and from manager.ScheduleJob are
// returned unchanged.
func LoadJobsAndScheduleFromFile(filePath string, manager *Manager) error {
	var (
		entries []JobPersistence
		err     error
	)

	// strings.HasSuffix is safe for paths shorter than the suffix, unlike
	// slicing the last four bytes off the path.
	switch {
	case strings.HasSuffix(filePath, "json"):
		entries, err = ReadJsonFile(filePath)
	case strings.HasSuffix(filePath, "yaml"):
		entries, err = ReadYAMLFile(filePath)
	default:
		return ErrUnknownFormat
	}
	if err != nil {
		return err
	}

	for _, entry := range entries {
		job, scheduler, err := CreateJobAndSchedulerFromPersistence(entry, manager)
		if err != nil {
			return err
		}
		if err = manager.ScheduleJob(job, scheduler); err != nil {
			return err
		}
	}

	return nil
}
// LoadJobsAndScheduleFromDatabase reads all jobs via ReadFromGORM and
// schedules each of them on manager.
//
// An error from ReadFromGORM aborts the load and is returned as is. A job that
// cannot be created or scheduled is skipped and the remaining jobs are still
// processed. All such failures are then chained into one error that starts
// with "errors while loading jobs from database".
func LoadJobsAndScheduleFromDatabase(db *gorm.DB, manager *Manager) error {
	stored, readErr := ReadFromGORM(db)
	if readErr != nil {
		return readErr
	}

	var failures []error
	for _, record := range stored {
		job, scheduler, err := CreateJobAndSchedulerFromPersistence(record, manager)
		if err == nil {
			err = manager.ScheduleJob(job, scheduler)
		}
		if err != nil {
			failures = append(failures, err)
		}
	}

	if len(failures) == 0 {
		return nil
	}

	combined := fmt.Errorf("errors while loading jobs from database")
	for _, failure := range failures {
		combined = fmt.Errorf("%w: %v", combined, failure)
	}
	return combined
}
// ImportJobsAndSchedule loads jobs from a reader and schedules them on
// manager.
//
// format is compared case-insensitively: "json" selects ReadJSON and "yaml"
// selects ReadYAML. Any other value yields ErrUnknownFormat wrapped together
// with the lower-cased format. Processing stops at the first job that cannot
// be created or scheduled, and that error is returned unchanged.
func ImportJobsAndSchedule(reader io.Reader, format string, manager *Manager) error {
	var (
		entries []JobPersistence
		readErr error
	)

	switch normalized := strings.ToLower(format); normalized {
	case "json":
		entries, readErr = ReadJSON(reader)
	case "yaml":
		entries, readErr = ReadYAML(reader)
	default:
		return fmt.Errorf("%w: %s", ErrUnknownFormat, normalized)
	}
	if readErr != nil {
		return readErr
	}

	for _, entry := range entries {
		job, scheduler, err := CreateJobAndSchedulerFromPersistence(entry, manager)
		if err != nil {
			return err
		}
		if err = manager.ScheduleJob(job, scheduler); err != nil {
			return err
		}
	}

	return nil
}