Something went wrong on our end
Select Git revision
monster.mjs
-
Volker Schukai authoredVolker Schukai authored
scheduler.go 5.51 KiB
package jobqueue
import (
"fmt"
"github.com/robfig/cron/v3"
"time"
)
type StopChan chan bool
type Scheduler interface {
Schedule(job GenericJob, eventBus *EventBus) error
Cancel(id JobID) error
CancelAll() error
JobExists(id JobID) bool
GetType() string
}
// IntervalScheduler is a scheduler that schedules a job at a fixed interval
type IntervalScheduler struct {
Interval time.Duration
jobs map[JobID]StopChan
}
func (s *IntervalScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
if s.Interval <= 0 {
return fmt.Errorf("invalid interval: %v", s.Interval)
}
if s.jobs == nil {
s.jobs = make(map[JobID]StopChan)
}
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("job %s already scheduled", id)
}
stopChan := make(StopChan)
s.jobs[id] = stopChan
ticker := time.NewTicker(s.Interval)
go func() {
for {
select {
case <-ticker.C:
eventBus.Publish(QueueJob, job)
case <-stopChan:
ticker.Stop()
return
}
}
}()
return nil
}
func (s *IntervalScheduler) GetType() string {
return "Interval"
}
func (s *IntervalScheduler) Cancel(id JobID) error {
if s.jobs == nil {
return nil
}
if stopChan, ok := s.jobs[id]; ok {
stopChan <- true
delete(s.jobs, id)
}
return nil
}
func (s *IntervalScheduler) CancelAll() error {
if s.jobs == nil {
return nil
}
for _, stopChan := range s.jobs {
stopChan <- true
}
s.jobs = nil
return nil
}
func (s *IntervalScheduler) JobExists(id JobID) bool {
if s.jobs == nil {
return false
}
_, ok := s.jobs[id]
return ok
}
// CronScheduler is a scheduler that uses the cron library to schedule jobs
type CronScheduler struct {
cron *cron.Cron
Spec string
jobs map[JobID]cron.EntryID
}
func (s *CronScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
var err error
if s.cron == nil {
return ErrCronNotInitialized
}
if s.jobs == nil {
s.jobs = make(map[JobID]cron.EntryID)
}
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("job %s already scheduled", id)
}
entryId, err := s.cron.AddFunc(s.Spec, func() {
eventBus.Publish(QueueJob, job)
})
s.jobs[id] = entryId
if err != nil {
return err
}
s.cron.Start()
return nil
}
func (s *CronScheduler) GetType() string {
return "Cron"
}
func (s *CronScheduler) Cancel(id JobID) error {
if s.jobs == nil {
return nil
}
if entryId, ok := s.jobs[id]; ok {
s.cron.Remove(entryId)
}
return nil
}
func (s *CronScheduler) CancelAll() error {
if s.jobs == nil {
return nil
}
for _, entryId := range s.jobs {
s.cron.Remove(entryId)
}
s.jobs = nil
return nil
}
func (s *CronScheduler) JobExists(id JobID) bool {
if s.jobs == nil {
return false
}
_, ok := s.jobs[id]
return ok
}
// DelayScheduler is a scheduler that schedules a job after a delay
type DelayScheduler struct {
Delay time.Duration
jobs map[JobID]StopChan
}
func (s *DelayScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
timer := time.NewTimer(s.Delay)
if s.jobs == nil {
s.jobs = make(map[JobID]StopChan)
}
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("job %s already scheduled", id)
}
stopChan := make(StopChan)
s.jobs[id] = stopChan
go func() {
select {
case <-timer.C:
eventBus.Publish(QueueJob, job)
case <-stopChan:
timer.Stop()
}
}()
return nil
}
func (s *DelayScheduler) GetType() string {
return "Delay"
}
func (s *DelayScheduler) Cancel(id JobID) error {
if s.jobs == nil {
return nil
}
if stopChan, ok := s.jobs[id]; ok {
stopChan <- true
delete(s.jobs, id)
}
return nil
}
func (s *DelayScheduler) CancelAll() error {
if s.jobs == nil {
return nil
}
for _, stopChan := range s.jobs {
stopChan <- true
}
s.jobs = nil
return nil
}
func (s *DelayScheduler) JobExists(id JobID) bool {
if s.jobs == nil {
return false
}
_, ok := s.jobs[id]
return ok
}
// EventScheduler is a scheduler that schedules a job when an event is received
type EventScheduler struct {
Event EventName
jobs map[JobID]StopChan
}
func (s *EventScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
ch := make(chan interface{})
eventBus.Subscribe(s.Event, ch)
if s.jobs == nil {
s.jobs = make(map[JobID]StopChan)
}
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("job %s already scheduled", id)
}
stopChan := make(StopChan)
s.jobs[id] = stopChan
go func() {
for {
select {
case <-ch:
eventBus.Publish(QueueJob, job)
case <-stopChan:
eventBus.Unsubscribe(s.Event, ch)
return
}
}
}()
return nil
}
func (s *EventScheduler) GetType() string {
return "Event"
}
func (s *EventScheduler) Cancel(id JobID) error {
if s.jobs == nil {
return nil
}
if stopChan, ok := s.jobs[id]; ok {
stopChan <- true
delete(s.jobs, id)
}
return nil
}
func (s *EventScheduler) CancelAll() error {
if s.jobs == nil {
return nil
}
for _, stopChan := range s.jobs {
stopChan <- true
}
s.jobs = nil
return nil
}
func (s *EventScheduler) JobExists(id JobID) bool {
if s.jobs == nil {
return false
}
_, ok := s.jobs[id]
return ok
}
// InstantScheduler is a scheduler that schedules a job instantly
type InstantScheduler struct{}
func (s *InstantScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
eventBus.Publish(QueueJob, job)
return nil
}
func (s *InstantScheduler) GetType() string {
return "Instant"
}
func (s *InstantScheduler) Cancel(id JobID) error {
return nil
}
func (s *InstantScheduler) CancelAll() error {
return nil
}
func (s *InstantScheduler) JobExists(id JobID) bool {
return false
}