Select Git revision
readme-standard.mk
import.go 4.16 KiB
package jobqueue
import (
"gopkg.in/yaml.v3"
"os"
"time"
)
type JobImport struct {
ID string `yaml:"id" json:"id"`
Priority int `yaml:"priority" json:"priority"`
Timeout time.Duration `yaml:"timeout" json:"timeout"`
MaxRetries uint `yaml:"maxRetries" json:"maxRetries"`
RetryDelay time.Duration `yaml:"retryDelay" json:"retryDelay"`
Dependencies []string `yaml:"dependencies" json:"dependencies,omitempty"`
Runnable RunnableImport `yaml:"runnable" json:"runnable"`
Scheduler SchedulerImport `yaml:"scheduler" json:"scheduler,omitempty"`
}
type RunnableImport struct {
Type string `yaml:"type" json:"type"`
Data map[string]any `yaml:"data,omitempty" json:"data,omitempty"`
}
type SchedulerImport struct {
Type string `yaml:"type" json:"type"`
Interval time.Duration `yaml:"interval,omitempty" json:"interval,omitempty"`
Spec string `yaml:"spec,omitempty" json:"spec,omitempty"`
Delay time.Duration `yaml:"delay,omitempty" json:"delay,omitempty"`
Event string `yaml:"event,omitempty" json:"event,omitempty"`
}
func ReadYAMLFile(filePath string) ([]JobImport, error) {
data, err := os.ReadFile(filePath)
if err != nil {
return nil, err
}
var jobs []JobImport
err = yaml.Unmarshal(data, &jobs)
if err != nil {
return nil, err
}
return jobs, nil
}
func ReadJsonFile(filePath string) ([]JobImport, error) {
data, err := os.ReadFile(filePath)
if err != nil {
return nil, err
}
var jobs []JobImport
err = yaml.Unmarshal(data, &jobs)
if err != nil {
return nil, err
}
return jobs, nil
}
func CreateJobAndSchedulerFromImport(jobImport JobImport) (GenericJob, Scheduler, error) {
var job GenericJob
switch jobImport.Runnable.Type {
case "Shell":
runner := &ShellRunnable{
ScriptPath: jobImport.Runnable.Data["ScriptPath"].(string),
}
job = GenericJob(&Job[ShellResult]{
id: JobID(jobImport.ID),
priority: Priority(jobImport.Priority),
timout: jobImport.Timeout,
maxRetries: jobImport.MaxRetries,
RetryDelay: jobImport.RetryDelay,
runner: runner,
})
case "Counter":
runner := &CounterRunnable{
Count: jobImport.Runnable.Data["Count"].(int),
}
job = GenericJob(&Job[CounterResult]{
id: JobID(jobImport.ID),
priority: Priority(jobImport.Priority),
timout: jobImport.Timeout,
maxRetries: jobImport.MaxRetries,
RetryDelay: jobImport.RetryDelay,
runner: runner,
})
case "HTTP":
runner := &HTTPRunnable{
URL: jobImport.Runnable.Data["URL"].(string),
}
job = GenericJob(&Job[HTTPResult]{id: JobID(jobImport.ID),
priority: Priority(jobImport.Priority),
timout: jobImport.Timeout,
maxRetries: jobImport.MaxRetries,
RetryDelay: jobImport.RetryDelay,
runner: runner,
})
case "DB":
runner := &DBRunnable{
Query: jobImport.Runnable.Data["Query"].(string),
}
job = GenericJob(&Job[DBResult]{id: JobID(jobImport.ID),
priority: Priority(jobImport.Priority),
timout: jobImport.Timeout,
maxRetries: jobImport.MaxRetries,
RetryDelay: jobImport.RetryDelay,
runner: runner,
})
default:
return nil, nil, ErrUnknownRunnableType
}
var scheduler Scheduler
switch jobImport.Scheduler.Type {
case "Interval":
scheduler = &IntervalScheduler{Interval: jobImport.Scheduler.Interval}
case "Cron":
scheduler = &CronScheduler{Spec: jobImport.Scheduler.Spec}
case "Delay":
scheduler = &DelayScheduler{Delay: jobImport.Scheduler.Delay}
case "Event":
scheduler = &EventScheduler{Event: EventName(jobImport.Scheduler.Event)}
default:
return nil, nil, ErrUnknownSchedulerType
}
return job, scheduler, nil
}
func LoadJobsAndSchedule(filePath string, manager *Manager) error {
var err error
var imp []JobImport
switch filePath[len(filePath)-4:] {
case "yaml":
imp, err = ReadJsonFile(filePath)
break
case "json":
imp, err = ReadYAMLFile(filePath)
break
}
if err != nil {
return err
}
for _, imp := range imp {
job, scheduler, err := CreateJobAndSchedulerFromImport(imp)
if err != nil {
return err
}
err = manager.ScheduleJob(job, scheduler)
if err != nil {
return err
}
}
return nil
}