package jobqueue

import (
	"fmt"
	"time"

	"github.com/robfig/cron/v3"
)

// StopChan is the channel a scheduler uses to tell the goroutine that drives
// a job to stop. The schedulers in this file signal a stop by closing the
// channel, so the signal cannot be lost when the goroutine is busy.
type StopChan chan bool

// Scheduler decides when a job is published on the event bus as a QueueJob
// event.
type Scheduler interface {
	// Schedule registers job with the scheduler; when the scheduler's
	// trigger fires and the job is not paused, the job is published on
	// eventBus.
	Schedule(job GenericJob, eventBus *EventBus) error
	// Cancel stops scheduling the job with the given id. Cancelling an
	// unknown id is not an error.
	Cancel(id JobID) error
	// CancelAll stops scheduling every job known to the scheduler.
	CancelAll() error
	// JobExists reports whether a job with the given id is registered.
	JobExists(id JobID) bool
	// GetType returns the scheduler's type name (for example "Interval").
	GetType() string
	// IsAdHoc returns the scheduler's ad-hoc flag. In this file it is true
	// for the Delay and Instant schedulers and false for the others.
	IsAdHoc() bool
	// GetPersistence returns the scheduler's configuration in serializable
	// form.
	GetPersistence() SchedulerPersistence
}

// SchedulerPersistence is the serializable (YAML/JSON/GORM) description of a
// scheduler. Type holds the value of GetType; each scheduler fills in only
// the fields that apply to it.
type SchedulerPersistence struct {
	Type     string        `yaml:"type" json:"type" gorm:"column:type"`
	Interval time.Duration `yaml:"interval,omitempty" json:"interval,omitempty" gorm:"column:interval"`
	Spec     string        `yaml:"spec,omitempty" json:"spec,omitempty" gorm:"column:spec"`
	Delay    time.Duration `yaml:"delay,omitempty" json:"delay,omitempty" gorm:"column:delay"`
	Event    EventName     `yaml:"event,omitempty" json:"event,omitempty" gorm:"column:event"`
	Time     *time.Time    `yaml:"time,omitempty" json:"time,omitempty" gorm:"column:time"`
	Executed bool          `yaml:"executed,omitempty" json:"executed,omitempty" gorm:"column:executed"`
}

// IntervalScheduler is a scheduler that schedules a job at a fixed interval.
type IntervalScheduler struct {
	Interval time.Duration
	jobs     map[JobID]StopChan
}

// Schedule starts a goroutine that publishes job on eventBus every Interval
// while the job is not paused. It returns an error when Interval is not
// positive or when a job with the same id is already scheduled
// (ErrJobAlreadyScheduled).
func (s *IntervalScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
	if s.Interval <= 0 {
		return fmt.Errorf("invalid interval: %v", s.Interval)
	}

	if s.jobs == nil {
		s.jobs = make(map[JobID]StopChan)
	}

	id := job.GetID()
	if _, ok := s.jobs[id]; ok {
		return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
	}

	stopChan := make(StopChan)
	s.jobs[id] = stopChan

	ticker := time.NewTicker(s.Interval)
	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				if !job.IsPaused() {
					eventBus.Publish(QueueJob, job)
				}
			case <-stopChan:
				return
			}
		}
	}()

	return nil
}

// GetType returns "Interval".
func (s *IntervalScheduler) GetType() string {
	return "Interval"
}

// IsAdHoc returns false.
func (s *IntervalScheduler) IsAdHoc() bool {
	return false
}

// Cancel stops the goroutine driving the job with the given id and forgets
// the job. Unknown ids are ignored.
func (s *IntervalScheduler) Cancel(id JobID) error {
	if s.jobs == nil {
		return nil
	}

	if stopChan, ok := s.jobs[id]; ok {
		// Closing (rather than a non-blocking send) guarantees the goroutine
		// sees the signal even if it is busy publishing right now.
		close(stopChan)
		delete(s.jobs, id)
	}

	return nil
}

// CancelAll stops every scheduled job and clears the job table.
func (s *IntervalScheduler) CancelAll() error {
	if s.jobs == nil {
		return nil
	}

	for _, stopChan := range s.jobs {
		close(stopChan)
	}
	s.jobs = nil

	return nil
}

// JobExists reports whether a job with the given id is scheduled.
func (s *IntervalScheduler) JobExists(id JobID) bool {
	if s.jobs == nil {
		return false
	}
	_, ok := s.jobs[id]
	return ok
}

// GetPersistence returns the scheduler type and its interval.
func (s *IntervalScheduler) GetPersistence() SchedulerPersistence {
	return SchedulerPersistence{
		Type:     s.GetType(),
		Interval: s.Interval,
	}
}

// CronScheduler is a scheduler that uses the cron library to schedule jobs.
type CronScheduler struct {
	cron *cron.Cron
	Spec string
	jobs map[JobID]cron.EntryID
}

// Schedule adds a cron entry for Spec that publishes job on eventBus whenever
// it fires and the job is not paused. It returns ErrCronNotInitialized when no
// cron instance is set, ErrJobAlreadyScheduled (wrapped) for a duplicate id,
// or the error from the cron library when the entry cannot be added.
func (s *CronScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
	if s.cron == nil {
		return ErrCronNotInitialized
	}

	if s.jobs == nil {
		s.jobs = make(map[JobID]cron.EntryID)
	}

	id := job.GetID()
	if _, ok := s.jobs[id]; ok {
		return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
	}

	entryID, err := s.cron.AddFunc(s.Spec, func() {
		if !job.IsPaused() {
			eventBus.Publish(QueueJob, job)
		}
	})
	if err != nil {
		// Do not record the job: nothing was added to cron, and a stale
		// entry would block a later retry with a corrected spec.
		return err
	}

	s.jobs[id] = entryID
	return nil
}

// GetType returns "Cron".
func (s *CronScheduler) GetType() string {
	return "Cron"
}

// IsAdHoc returns false.
func (s *CronScheduler) IsAdHoc() bool {
	return false
}

// Cancel removes the cron entry of the job with the given id and forgets the
// job. Unknown ids are ignored.
func (s *CronScheduler) Cancel(id JobID) error {
	if s.jobs == nil {
		return nil
	}

	if entryID, ok := s.jobs[id]; ok {
		s.cron.Remove(entryID)
		// Forget the job so JobExists is accurate and it can be rescheduled.
		delete(s.jobs, id)
	}

	return nil
}

// CancelAll removes the cron entries of all scheduled jobs and clears the job
// table.
func (s *CronScheduler) CancelAll() error {
	if s.jobs == nil {
		return nil
	}

	for _, entryID := range s.jobs {
		s.cron.Remove(entryID)
	}
	s.jobs = nil

	return nil
}

// JobExists reports whether a job with the given id is scheduled.
func (s *CronScheduler) JobExists(id JobID) bool {
	if s.jobs == nil {
		return false
	}
	_, ok := s.jobs[id]
	return ok
}

// GetPersistence returns the scheduler type and its cron spec.
func (s *CronScheduler) GetPersistence() SchedulerPersistence {
	return SchedulerPersistence{
		Type: s.GetType(),
		Spec: s.Spec,
	}
}

// DelayScheduler is a scheduler that schedules a job after a delay.
type DelayScheduler struct {
	Delay time.Duration
	jobs  map[JobID]StopChan
}

// Schedule starts a goroutine that publishes job on eventBus once, after
// Delay has elapsed, unless the job is paused at that moment or was cancelled
// before. It returns ErrJobAlreadyScheduled (wrapped) for a duplicate id.
// The job stays registered after the timer has fired until it is cancelled.
func (s *DelayScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
	if s.jobs == nil {
		s.jobs = make(map[JobID]StopChan)
	}

	id := job.GetID()
	if _, ok := s.jobs[id]; ok {
		return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
	}

	stopChan := make(StopChan)
	s.jobs[id] = stopChan

	timer := time.NewTimer(s.Delay)
	go func() {
		defer timer.Stop()
		select {
		case <-timer.C:
			if !job.IsPaused() {
				eventBus.Publish(QueueJob, job)
			}
		case <-stopChan:
		}
	}()

	return nil
}

// GetType returns "Delay".
func (s *DelayScheduler) GetType() string {
	return "Delay"
}

// IsAdHoc returns true.
func (s *DelayScheduler) IsAdHoc() bool {
	return true
}

// Cancel stops the pending timer of the job with the given id, if any, and
// forgets the job. Unknown ids are ignored.
func (s *DelayScheduler) Cancel(id JobID) error {
	if s.jobs == nil {
		return nil
	}

	if stopChan, ok := s.jobs[id]; ok {
		// Closing is safe whether or not the goroutine is still waiting.
		close(stopChan)
		delete(s.jobs, id)
	}

	return nil
}

// CancelAll stops every pending job and clears the job table.
func (s *DelayScheduler) CancelAll() error {
	if s.jobs == nil {
		return nil
	}

	for _, stopChan := range s.jobs {
		close(stopChan)
	}
	s.jobs = nil

	return nil
}

// JobExists reports whether a job with the given id is registered.
func (s *DelayScheduler) JobExists(id JobID) bool {
	if s.jobs == nil {
		return false
	}
	_, ok := s.jobs[id]
	return ok
}

// GetPersistence returns the scheduler type and its delay.
func (s *DelayScheduler) GetPersistence() SchedulerPersistence {
	return SchedulerPersistence{
		Type:  s.GetType(),
		Delay: s.Delay,
	}
}

// TimeScheduler is a scheduler that schedules at a specific time.
type TimeScheduler struct {
	Time     time.Time
	jobs     map[JobID]StopChan
	executed bool
}

// Schedule starts a goroutine that publishes job on eventBus once, when Time
// is reached, unless the job is paused at that moment or was cancelled
// before. It returns ErrJobAlreadyScheduled (wrapped) when the scheduler has
// already executed or the id is a duplicate, and ErrInvalidTime (wrapped)
// when Time lies in the past.
//
// NOTE(review): executed is set by the timer goroutine without
// synchronization while Schedule and GetPersistence read it — confirm
// whether callers can invoke these concurrently.
func (s *TimeScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
	if s.executed {
		return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, job.GetID())
	}

	if s.Time.Before(time.Now()) {
		return fmt.Errorf("%w: scheduled time is in the past", ErrInvalidTime)
	}

	if s.jobs == nil {
		s.jobs = make(map[JobID]StopChan)
	}

	id := job.GetID()
	if _, ok := s.jobs[id]; ok {
		return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
	}

	stopChan := make(StopChan)
	s.jobs[id] = stopChan

	timer := time.NewTimer(time.Until(s.Time))
	go func() {
		defer timer.Stop()
		select {
		case <-timer.C:
			// A job that is paused when its time arrives is skipped; the
			// goroutine simply ends. (Sending on stopChan here would block
			// forever, since this goroutine is its only receiver.)
			if !job.IsPaused() {
				eventBus.Publish(QueueJob, job)
				s.executed = true
			}
		case <-stopChan:
		}
	}()

	return nil
}

// GetType returns "Time".
func (s *TimeScheduler) GetType() string {
	return "Time"
}

// IsAdHoc returns false.
func (s *TimeScheduler) IsAdHoc() bool {
	return false
}

// Cancel stops the pending timer of the job with the given id, if any, and
// forgets the job. Unknown ids are ignored.
func (s *TimeScheduler) Cancel(id JobID) error {
	if s.jobs == nil {
		return nil
	}

	if stopChan, ok := s.jobs[id]; ok {
		// Closing is safe whether or not the goroutine is still waiting.
		close(stopChan)
		delete(s.jobs, id)
	}

	return nil
}

// CancelAll stops every pending job and clears the job table.
func (s *TimeScheduler) CancelAll() error {
	if s.jobs == nil {
		return nil
	}

	for _, stopChan := range s.jobs {
		close(stopChan)
	}
	s.jobs = nil

	return nil
}

// JobExists reports whether a job with the given id is registered.
func (s *TimeScheduler) JobExists(id JobID) bool {
	if s.jobs == nil {
		return false
	}
	_, ok := s.jobs[id]
	return ok
}

// GetPersistence returns the scheduler type, a pointer to its Time field and
// whether the job has been executed.
func (s *TimeScheduler) GetPersistence() SchedulerPersistence {
	return SchedulerPersistence{
		Type:     s.GetType(),
		Time:     &s.Time,
		Executed: s.executed,
	}
}

// EventScheduler is a scheduler that schedules a job when an event is
// received.
type EventScheduler struct {
	Event    EventName
	EventBus *EventBus
	jobs     map[JobID]StopChan
}

// Schedule subscribes to Event and starts a goroutine that publishes job on
// eventBus each time the event is received while the job is not paused. The
// subscription is made on the scheduler's own EventBus when set, otherwise
// on eventBus. It returns ErrJobAlreadyScheduled (wrapped) for a duplicate id.
func (s *EventScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
	if s.jobs == nil {
		s.jobs = make(map[JobID]StopChan)
	}

	// Reject duplicates before subscribing so a failed call leaves no
	// dangling subscription behind.
	id := job.GetID()
	if _, ok := s.jobs[id]; ok {
		return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
	}

	// Remember which bus carries the subscription so that the goroutine
	// unsubscribes from that same bus when it stops.
	source := eventBus
	if s.EventBus != nil {
		source = s.EventBus
	}

	ch := make(chan interface{})
	source.Subscribe(s.Event, ch)

	stopChan := make(StopChan)
	s.jobs[id] = stopChan

	go func() {
		for {
			select {
			case <-ch:
				if !job.IsPaused() {
					eventBus.Publish(QueueJob, job)
				}
			case <-stopChan:
				source.Unsubscribe(s.Event, ch)
				return
			}
		}
	}()

	return nil
}

// GetType returns "Event".
func (s *EventScheduler) GetType() string {
	return "Event"
}

// IsAdHoc returns false.
func (s *EventScheduler) IsAdHoc() bool {
	return false
}

// Cancel stops the goroutine listening for the job with the given id and
// forgets the job. Unknown ids are ignored.
func (s *EventScheduler) Cancel(id JobID) error {
	if s.jobs == nil {
		return nil
	}

	if stopChan, ok := s.jobs[id]; ok {
		// Closing (rather than a non-blocking send) guarantees the goroutine
		// sees the signal even if it is busy publishing right now.
		close(stopChan)
		delete(s.jobs, id)
	}

	return nil
}

// CancelAll stops every scheduled job and clears the job table.
func (s *EventScheduler) CancelAll() error {
	if s.jobs == nil {
		return nil
	}

	for _, stopChan := range s.jobs {
		close(stopChan)
	}
	s.jobs = nil

	return nil
}

// JobExists reports whether a job with the given id is scheduled.
func (s *EventScheduler) JobExists(id JobID) bool {
	if s.jobs == nil {
		return false
	}
	_, ok := s.jobs[id]
	return ok
}

// GetPersistence returns the scheduler type and the event it listens for.
func (s *EventScheduler) GetPersistence() SchedulerPersistence {
	return SchedulerPersistence{
		Type:  s.GetType(),
		Event: s.Event,
	}
}

// InstantScheduler is a scheduler that schedules a job instantly.
type InstantScheduler struct{}

// Schedule publishes job on eventBus immediately unless the job is paused.
// It always returns nil.
func (s *InstantScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
	if !job.IsPaused() {
		eventBus.Publish(QueueJob, job)
	}
	return nil
}

// GetType returns "Instant".
func (s *InstantScheduler) GetType() string {
	return "Instant"
}

// IsAdHoc returns true.
func (s *InstantScheduler) IsAdHoc() bool {
	return true
}

// Cancel is a no-op: the instant scheduler keeps no jobs.
func (s *InstantScheduler) Cancel(id JobID) error {
	return nil
}

// CancelAll is a no-op: the instant scheduler keeps no jobs.
func (s *InstantScheduler) CancelAll() error {
	return nil
}

// JobExists always returns false: the instant scheduler keeps no jobs.
func (s *InstantScheduler) JobExists(id JobID) bool {
	return false
}

// GetPersistence returns the scheduler type only.
func (s *InstantScheduler) GetPersistence() SchedulerPersistence {
	return SchedulerPersistence{
		Type: s.GetType(),
	}
}