Skip to content
Snippets Groups Projects
Select Git revision
  • edf339623825e442cdad411cc5edb565e4a8b09f
  • master default protected
  • 1.31
  • 4.38.8
  • 4.38.7
  • 4.38.6
  • 4.38.5
  • 4.38.4
  • 4.38.3
  • 4.38.2
  • 4.38.1
  • 4.38.0
  • 4.37.2
  • 4.37.1
  • 4.37.0
  • 4.36.0
  • 4.35.0
  • 4.34.1
  • 4.34.0
  • 4.33.1
  • 4.33.0
  • 4.32.2
  • 4.32.1
23 results

readme-standard.mk

Blame
  • import.go 4.16 KiB
    package jobqueue
    
    import (
    	"fmt"
    	"os"
    	"path/filepath"
    	"time"

    	"gopkg.in/yaml.v3"
    )
    
    // JobImport is the serialized description of one job as read from an import
    // file by ReadYAMLFile or ReadJsonFile. CreateJobAndSchedulerFromImport turns
    // it into a job and its scheduler.
    type JobImport struct {
    	// ID is converted to the job's JobID.
    	ID           string          `yaml:"id" json:"id"`
    	// Priority is converted to the job's Priority.
    	Priority     int             `yaml:"priority" json:"priority"`
    	// Timeout is copied into the created job unchanged.
    	Timeout      time.Duration   `yaml:"timeout" json:"timeout"`
    	// MaxRetries is copied into the created job unchanged.
    	MaxRetries   uint            `yaml:"maxRetries" json:"maxRetries"`
    	// RetryDelay is copied into the created job unchanged.
    	RetryDelay   time.Duration   `yaml:"retryDelay" json:"retryDelay"`
    	// Dependencies is decoded but not read by CreateJobAndSchedulerFromImport.
    	// NOTE(review): presumably IDs of other jobs; confirm where it is consumed.
    	Dependencies []string        `yaml:"dependencies" json:"dependencies,omitempty"`
    	// Runnable selects and configures the work the job performs.
    	Runnable     RunnableImport  `yaml:"runnable" json:"runnable"`
    	// Scheduler selects and configures when the job runs. An empty or
    	// unrecognised Scheduler.Type makes CreateJobAndSchedulerFromImport fail
    	// with ErrUnknownSchedulerType.
    	Scheduler    SchedulerImport `yaml:"scheduler" json:"scheduler,omitempty"`
    }
    
    // RunnableImport describes the runnable part of an imported job.
    type RunnableImport struct {
    	// Type names the runnable implementation. CreateJobAndSchedulerFromImport
    	// recognises "Shell", "Counter", "HTTP" and "DB"; any other value yields
    	// ErrUnknownRunnableType.
    	Type string         `yaml:"type" json:"type"`
    	// Data carries the type-specific settings. The keys read are "ScriptPath"
    	// (string) for Shell, "Count" (int) for Counter, "URL" (string) for HTTP
    	// and "Query" (string) for DB.
    	Data map[string]any `yaml:"data,omitempty" json:"data,omitempty"`
    }
    
    // SchedulerImport describes the scheduler part of an imported job. Only the
    // field matching Type is read by CreateJobAndSchedulerFromImport.
    type SchedulerImport struct {
    	// Type names the scheduler implementation: "Interval", "Cron", "Delay" or
    	// "Event". Any other value yields ErrUnknownSchedulerType.
    	Type     string        `yaml:"type" json:"type"`
    	// Interval is passed to IntervalScheduler when Type is "Interval".
    	Interval time.Duration `yaml:"interval,omitempty" json:"interval,omitempty"`
    	// Spec is passed to CronScheduler when Type is "Cron".
    	Spec     string        `yaml:"spec,omitempty" json:"spec,omitempty"`
    	// Delay is passed to DelayScheduler when Type is "Delay".
    	Delay    time.Duration `yaml:"delay,omitempty" json:"delay,omitempty"`
    	// Event is converted to an EventName for EventScheduler when Type is "Event".
    	Event    string        `yaml:"event,omitempty" json:"event,omitempty"`
    }
    
    // ReadYAMLFile reads the file at filePath and decodes its content as a YAML
    // list of job definitions.
    //
    // It returns the decoded jobs, or a nil slice together with the error from
    // os.ReadFile or yaml.Unmarshal if either step fails.
    func ReadYAMLFile(filePath string) ([]JobImport, error) {
    	raw, readErr := os.ReadFile(filePath)
    	if readErr != nil {
    		return nil, readErr
    	}

    	var imports []JobImport
    	if decodeErr := yaml.Unmarshal(raw, &imports); decodeErr != nil {
    		// Drop anything that was partially decoded; callers only get a
    		// result when the whole document parsed.
    		return nil, decodeErr
    	}
    	return imports, nil
    }
    
    // ReadJsonFile reads the file at filePath and decodes its content as a list
    // of job definitions.
    //
    // Note that the content is decoded with yaml.Unmarshal rather than a JSON
    // decoder, exactly as ReadYAMLFile does.
    //
    // It returns the decoded jobs, or a nil slice together with the error from
    // os.ReadFile or yaml.Unmarshal if either step fails.
    func ReadJsonFile(filePath string) ([]JobImport, error) {
    	content, err := os.ReadFile(filePath)
    	if err != nil {
    		return nil, err
    	}

    	var parsed []JobImport
    	if err = yaml.Unmarshal(content, &parsed); err != nil {
    		return nil, err
    	}

    	return parsed, nil
    }
    
    // CreateJobAndSchedulerFromImport builds the job and the scheduler described
    // by jobImport.
    //
    // The runnable is chosen by jobImport.Runnable.Type ("Shell", "Counter",
    // "HTTP" or "DB") and configured from jobImport.Runnable.Data. The scheduler
    // is chosen by jobImport.Scheduler.Type ("Interval", "Cron", "Delay" or
    // "Event").
    //
    // Errors:
    //   - ErrUnknownRunnableType if the runnable type is not recognised.
    //   - ErrUnknownSchedulerType if the scheduler type is not recognised.
    //   - a descriptive error if a required entry in Runnable.Data is missing or
    //     has the wrong type (previously this panicked).
    //
    // On error both the job and the scheduler are nil.
    func CreateJobAndSchedulerFromImport(jobImport JobImport) (GenericJob, Scheduler, error) {
    	var job GenericJob

    	switch jobImport.Runnable.Type {
    	case "Shell":
    		scriptPath, err := runnableImportValue[string](jobImport, "ScriptPath")
    		if err != nil {
    			return nil, nil, err
    		}
    		job = GenericJob(&Job[ShellResult]{
    			id:         JobID(jobImport.ID),
    			priority:   Priority(jobImport.Priority),
    			timout:     jobImport.Timeout,
    			maxRetries: jobImport.MaxRetries,
    			RetryDelay: jobImport.RetryDelay,
    			runner:     &ShellRunnable{ScriptPath: scriptPath},
    		})

    	case "Counter":
    		count, err := runnableImportValue[int](jobImport, "Count")
    		if err != nil {
    			return nil, nil, err
    		}
    		job = GenericJob(&Job[CounterResult]{
    			id:         JobID(jobImport.ID),
    			priority:   Priority(jobImport.Priority),
    			timout:     jobImport.Timeout,
    			maxRetries: jobImport.MaxRetries,
    			RetryDelay: jobImport.RetryDelay,
    			runner:     &CounterRunnable{Count: count},
    		})

    	case "HTTP":
    		url, err := runnableImportValue[string](jobImport, "URL")
    		if err != nil {
    			return nil, nil, err
    		}
    		job = GenericJob(&Job[HTTPResult]{
    			id:         JobID(jobImport.ID),
    			priority:   Priority(jobImport.Priority),
    			timout:     jobImport.Timeout,
    			maxRetries: jobImport.MaxRetries,
    			RetryDelay: jobImport.RetryDelay,
    			runner:     &HTTPRunnable{URL: url},
    		})

    	case "DB":
    		query, err := runnableImportValue[string](jobImport, "Query")
    		if err != nil {
    			return nil, nil, err
    		}
    		job = GenericJob(&Job[DBResult]{
    			id:         JobID(jobImport.ID),
    			priority:   Priority(jobImport.Priority),
    			timout:     jobImport.Timeout,
    			maxRetries: jobImport.MaxRetries,
    			RetryDelay: jobImport.RetryDelay,
    			runner:     &DBRunnable{Query: query},
    		})

    	default:
    		return nil, nil, ErrUnknownRunnableType
    	}

    	var scheduler Scheduler
    	switch jobImport.Scheduler.Type {
    	case "Interval":
    		scheduler = &IntervalScheduler{Interval: jobImport.Scheduler.Interval}

    	case "Cron":
    		scheduler = &CronScheduler{Spec: jobImport.Scheduler.Spec}

    	case "Delay":
    		scheduler = &DelayScheduler{Delay: jobImport.Scheduler.Delay}

    	case "Event":
    		scheduler = &EventScheduler{Event: EventName(jobImport.Scheduler.Event)}

    	default:
    		return nil, nil, ErrUnknownSchedulerType
    	}

    	return job, scheduler, nil
    }

    // runnableImportValue returns jobImport.Runnable.Data[key] as a T, or an
    // error naming the job, the key and the actual type when the entry is
    // missing (including a nil Data map) or is not a T.
    func runnableImportValue[T any](jobImport JobImport, key string) (T, error) {
    	raw := jobImport.Runnable.Data[key]
    	value, ok := raw.(T)
    	if !ok {
    		var zero T
    		return zero, fmt.Errorf("job %q: %s runnable data %q must be %T, got %T",
    			jobImport.ID, jobImport.Runnable.Type, key, zero, raw)
    	}
    	return value, nil
    }
    
    // LoadJobsAndSchedule reads the job definitions stored in filePath and
    // schedules each of them on manager.
    //
    // The file format is chosen from the file extension: ".yaml" and ".yml" are
    // read with ReadYAMLFile, ".json" with ReadJsonFile. Any other extension is
    // reported as an error instead of being silently ignored.
    //
    // Jobs are scheduled in file order. The first failure, whether from reading
    // the file, from CreateJobAndSchedulerFromImport or from manager.ScheduleJob,
    // is returned unchanged and stops processing; this function does not
    // unschedule jobs it scheduled before the failure.
    func LoadJobsAndSchedule(filePath string, manager *Manager) error {
    	var (
    		imports []JobImport
    		err     error
    	)

    	// filepath.Ext copes with paths of any length, unlike slicing the last
    	// four bytes, which panics for paths shorter than that.
    	switch ext := filepath.Ext(filePath); ext {
    	case ".yaml", ".yml":
    		imports, err = ReadYAMLFile(filePath)
    	case ".json":
    		imports, err = ReadJsonFile(filePath)
    	default:
    		return fmt.Errorf("load jobs from %q: unsupported file extension %q", filePath, ext)
    	}
    	if err != nil {
    		return err
    	}

    	for _, jobImport := range imports {
    		job, scheduler, err := CreateJobAndSchedulerFromImport(jobImport)
    		if err != nil {
    			return err
    		}

    		if err := manager.ScheduleJob(job, scheduler); err != nil {
    			return err
    		}
    	}

    	return nil
    }