Skip to content
Snippets Groups Projects
Select Git revision
  • 7d668250ca4b8c1b68b3660c2d9fb4294a85298e
  • master default protected
  • v1.23.2
  • v1.23.1
  • v1.23.0
  • v1.22.0
  • v1.21.1
  • v1.21.0
  • v1.20.3
  • v1.20.2
  • v1.20.1
  • v1.20.0
  • v1.19.4
  • v1.19.3
  • v1.19.2
  • v1.19.1
  • v1.19.0
  • v1.18.2
  • v1.18.1
  • v1.18.0
  • v1.17.0
  • v1.16.1
22 results

stat.go

Blame
  • persistence.go 8.22 KiB
    package jobqueue
    
    import (
    	"encoding/json"
    	"fmt"
    	"gopkg.in/yaml.v3"
    	"gorm.io/gorm"
    	"io"
    	"os"
    	"strings"
    	"time"
    )
    
    type JobPersistence struct {
    	ID           JobID                `yaml:"id" json:"id" gorm:"type:varchar(255);primaryKey"`
    	Priority     Priority             `yaml:"priority" json:"priority" gorm:"column:priority"`
    	Timeout      time.Duration        `yaml:"timeout" json:"timeout" gorm:"column:timeout"`
    	MaxRetries   uint                 `yaml:"maxRetries" json:"maxRetries" gorm:"column:max_retries"`
    	RetryDelay   time.Duration        `yaml:"retryDelay" json:"retryDelay" gorm:"column:retry_delay"`
    	Dependencies []JobID              `yaml:"dependencies" json:"dependencies,omitempty" gorm:"column:dependencies;type:json"`
    	Runnable     RunnableImport       `yaml:"runnable" json:"runnable" gorm:"embedded;embeddedPrefix:runnable_"`
    	Scheduler    SchedulerPersistence `yaml:"scheduler" json:"scheduler,omitempty" gorm:"embedded;embeddedPrefix:scheduler_"`
    
    	Logs  []JobLog `gorm:"foreignKey:JobID;references:ID"`
    	Stats JobStats `gorm:"foreignKey:JobID"`
    
    	CreatedAt time.Time      `gorm:"column:created_at"`
    	UpdatedAt time.Time      `gorm:"column:updated_at"`
    	DeletedAt gorm.DeletedAt `gorm:"column:deleted_at;index"`
    }
    
    func (jp JobPersistence) GetLogs() []JobLog {
    	return jp.Logs
    }
    
    func (jp JobPersistence) GetStats() JobStats {
    	return jp.Stats
    }
    
    func (jp JobPersistence) GetID() JobID {
    	return jp.ID
    }
    
    func (jp JobPersistence) GetPersistence() JobPersistence {
    	return jp
    }
    
    func (JobPersistence) TableName() string {
    	return globalTableNamePrefix + "jobs"
    }
    
    type RunnableImport struct {
    	Type string  `yaml:"type" json:"type" gorm:"column:type"`
    	Data JSONMap `yaml:"data,omitempty" json:"data,omitempty" gorm:"column:data;type:json"`
    }
    
    func ReadYAML(r io.Reader) ([]JobPersistence, error) {
    	var jobs []JobPersistence
    	decoder := yaml.NewDecoder(r)
    	if err := decoder.Decode(&jobs); err != nil {
    		return nil, err
    	}
    	return jobs, nil
    }
    
    func ReadJSON(r io.Reader) ([]JobPersistence, error) {
    	var jobs []JobPersistence
    	decoder := json.NewDecoder(r)
    	if err := decoder.Decode(&jobs); err != nil {
    		return nil, err
    	}
    	return jobs, nil
    }
    
    func ReadYAMLFile(filePath string) ([]JobPersistence, error) {
    	// #nosec
    	file, err := os.Open(filePath)
    	if err != nil {
    		return nil, err
    	}
    	defer file.Close()
    	return ReadYAML(file)
    }
    
    func ReadJsonFile(filePath string) ([]JobPersistence, error) {
    	// #nosec
    	file, err := os.Open(filePath)
    	if err != nil {
    		return nil, err
    	}
    	defer file.Close()
    	return ReadJSON(file)
    }
    
    func ReadFromGORM(db *gorm.DB) ([]JobPersistence, error) {
    	var jobs []JobPersistence
    
    	err := db.Transaction(func(tx *gorm.DB) error {
    
    		if err := tx.Find(&jobs).Error; err != nil {
    			return err
    		}
    
    		var wrappedErr []error
    		// load stats too
    		for i := range jobs {
    			if err := db.Model(&jobs[i]).Association("Stats").Find(&jobs[i].Stats); err != nil {
    				wrappedErr = append(wrappedErr, err)
    			}
    		}
    
    		if len(wrappedErr) > 0 {
    			returnErr := fmt.Errorf("errors while loading stats from database")
    			for _, err := range wrappedErr {
    				returnErr = fmt.Errorf("%w: %v", returnErr, err)
    			}
    			return returnErr
    		}
    
    		return nil
    	})
    
    	return jobs, err
    }
    
    func CreateGenericJobFromPersistence[T any](jobImport JobPersistence, runner Runnable[T]) GenericJob {
    	return &Job[T]{
    		id:         jobImport.ID,
    		priority:   jobImport.Priority,
    		timeout:    jobImport.Timeout,
    		maxRetries: jobImport.MaxRetries,
    		RetryDelay: jobImport.RetryDelay,
    		runner:     runner,
    	}
    }
    
    func CreateJobAndSchedulerFromPersistence(jobImport JobPersistence, manager *Manager) (GenericJob, Scheduler, error) {
    
    	var job GenericJob
    
    	rType := strings.ToLower(jobImport.Runnable.Type)
    
    	runnableData := make(map[string]interface{})
    	for k, v := range jobImport.Runnable.Data {
    		runnableData[strings.ToLower(k)] = v
    	}
    
    	switch rType {
    	case "dummy":
    
    		runner, err := NewDummyRunnableFromMap(runnableData)
    		if err != nil {
    			return nil, nil, err
    		}
    
    		job = CreateGenericJobFromPersistence[DummyResult](jobImport, runner)
    
    	case "counter":
    
    		runner, err := NewCounterRunnableFromMap(runnableData)
    		if err != nil {
    			return nil, nil, err
    		}
    
    		job = CreateGenericJobFromPersistence[CounterResult](jobImport, runner)
    
    	case "fileoperation":
    		runner, err := NewFileOperationRunnableFromMap(runnableData)
    		if err != nil {
    			return nil, nil, err
    		}
    
    		job = CreateGenericJobFromPersistence[FileOperationResult](jobImport, runner)
    
    	case "db":
    		runner, err := NewDBRunnableFromMap(runnableData)
    		if err != nil {
    			return nil, nil, err
    		}
    
    		job = CreateGenericJobFromPersistence[DBResult](jobImport, runner)
    
    	case "http":
    		runner, err := NewHTTPRunnableFromMap(runnableData)
    		if err != nil {
    			return nil, nil, err
    		}
    
    		job = CreateGenericJobFromPersistence[HTTPResult](jobImport, runner)
    
    	case "mail":
    		runner, err := NewMailRunnableFromMap(runnableData)
    		if err != nil {
    			return nil, nil, err
    		}
    
    		job = CreateGenericJobFromPersistence[MailResult](jobImport, runner)
    
    	case "sftp":
    		runner, err := NewSFTPRunnableFromMap(runnableData)
    		if err != nil {
    			return nil, nil, err
    		}
    
    		job = CreateGenericJobFromPersistence[SFTPResult](jobImport, runner)
    
    	case "shell":
    		runner, err := NewShellRunnableFromMap(runnableData)
    		if err != nil {
    			return nil, nil, err
    		}
    
    		job = CreateGenericJobFromPersistence[ShellResult](jobImport, runner)
    
    	default:
    		return nil, nil,
    			fmt.Errorf("%w: %s, available types: dummy, counter, fileoperation, db, http, mail, sftp, shell",
    				ErrUnknownRunnableType, rType)
    
    	}
    
    	sType := strings.ToLower(jobImport.Scheduler.Type)
    
    	scheduleData := make(map[string]interface{})
    	for k, v := range jobImport.Runnable.Data {
    		scheduleData[strings.ToLower(k)] = v
    	}
    
    	var scheduler Scheduler
    	switch sType {
    	case "interval":
    		scheduler = &IntervalScheduler{Interval: jobImport.Scheduler.Interval}
    
    	case "cron":
    		scheduler = &CronScheduler{
    			Spec: jobImport.Scheduler.Spec,
    		}
    
    		if manager != nil {
    			scheduler.(*CronScheduler).cron = manager.GetCronInstance()
    		}
    
    	case "delay":
    		scheduler = &DelayScheduler{Delay: jobImport.Scheduler.Delay}
    
    	case "event":
    		scheduler = &EventScheduler{Event: EventName(jobImport.Scheduler.Event)}
    
    	case "instant":
    		scheduler = &InstantScheduler{}
    
    	default:
    		return nil, nil, ErrUnknownSchedulerType
    	}
    
    	return job, scheduler, nil
    }
    
    // LoadJobsAndScheduleFromFile read jobs from a file and schedule them. (json/yaml)
    func LoadJobsAndScheduleFromFile(filePath string, manager *Manager) error {
    
    	var err error
    	var imp []JobPersistence
    
    	if filePath[len(filePath)-4:] == "json" {
    		imp, err = ReadJsonFile(filePath)
    	} else if filePath[len(filePath)-4:] == "yaml" {
    		imp, err = ReadYAMLFile(filePath)
    	} else {
    		return ErrUnknownFormat
    	}
    
    	if err != nil {
    		return err
    	}
    
    	for _, imp := range imp {
    		job, scheduler, err := CreateJobAndSchedulerFromPersistence(imp, manager)
    		if err != nil {
    			return err
    		}
    
    		err = manager.ScheduleJob(job, scheduler)
    		if err != nil {
    			return err
    		}
    	}
    
    	return nil
    
    }
    
    func LoadJobsAndScheduleFromDatabase(db *gorm.DB, manager *Manager) error {
    	jobs, err := ReadFromGORM(db)
    	if err != nil {
    		return err
    	}
    
    	var errs []error
    
    	for _, job := range jobs {
    		j, s, err := CreateJobAndSchedulerFromPersistence(job, manager)
    		if err != nil {
    			errs = append(errs, err)
    			continue
    
    		}
    
    		err = manager.ScheduleJob(j, s)
    		if err != nil {
    			errs = append(errs, err)
    			continue
    		}
    	}
    
    	if len(errs) > 0 {
    		returnErr := fmt.Errorf("errors while loading jobs from database")
    		for _, err := range errs {
    			returnErr = fmt.Errorf("%w: %v", returnErr, err)
    		}
    		return returnErr
    	}
    
    	return nil
    }
    
    // ImportJobsAndSchedule lädt Jobs aus einem Reader und plant sie ein.
    func ImportJobsAndSchedule(reader io.Reader, format string, manager *Manager) error {
    	var err error
    	var imp []JobPersistence
    
    	// format to lowercase
    	format = strings.ToLower(format)
    
    	if format == "json" {
    		imp, err = ReadJSON(reader)
    	} else if format == "yaml" {
    		imp, err = ReadYAML(reader)
    	} else {
    		return fmt.Errorf("%w: %s", ErrUnknownFormat, format)
    	}
    
    	if err != nil {
    		return err
    	}
    
    	for _, imp := range imp {
    		job, scheduler, err := CreateJobAndSchedulerFromPersistence(imp, manager)
    		if err != nil {
    			return err
    		}
    
    		err = manager.ScheduleJob(job, scheduler)
    		if err != nil {
    			return err
    		}
    	}
    
    	return nil
    }