Skip to content
Snippets Groups Projects
Select Git revision
  • 5c57209d0133b83fef0f4e0cce371b7e04eb783c
  • master default protected
  • 1.31
  • 4.28.0
  • 4.27.0
  • 4.26.0
  • 4.25.5
  • 4.25.4
  • 4.25.3
  • 4.25.2
  • 4.25.1
  • 4.25.0
  • 4.24.3
  • 4.24.2
  • 4.24.1
  • 4.24.0
  • 4.23.6
  • 4.23.5
  • 4.23.4
  • 4.23.3
  • 4.23.2
  • 4.23.1
  • 4.23.0
23 results

monster.mjs

Blame
  • scheduler.go 5.51 KiB
    package jobqueue
    
    import (
    	"fmt"
    	"github.com/robfig/cron/v3"
    	"time"
    )
    
    type StopChan chan bool
    
    type Scheduler interface {
    	Schedule(job GenericJob, eventBus *EventBus) error
    	Cancel(id JobID) error
    	CancelAll() error
    	JobExists(id JobID) bool
    
    	GetType() string
    }
    
    // IntervalScheduler is a scheduler that schedules a job at a fixed interval
    type IntervalScheduler struct {
    	Interval time.Duration
    	jobs     map[JobID]StopChan
    }
    
    func (s *IntervalScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
    
    	if s.Interval <= 0 {
    		return fmt.Errorf("invalid interval: %v", s.Interval)
    	}
    
    	if s.jobs == nil {
    		s.jobs = make(map[JobID]StopChan)
    	}
    
    	id := job.GetID()
    	if _, ok := s.jobs[id]; ok {
    		return fmt.Errorf("job %s already scheduled", id)
    	}
    
    	stopChan := make(StopChan)
    	s.jobs[id] = stopChan
    
    	ticker := time.NewTicker(s.Interval)
    	go func() {
    		for {
    			select {
    			case <-ticker.C:
    				eventBus.Publish(QueueJob, job)
    			case <-stopChan:
    				ticker.Stop()
    				return
    			}
    		}
    	}()
    
    	return nil
    }
    
    func (s *IntervalScheduler) GetType() string {
    	return "Interval"
    }
    
    func (s *IntervalScheduler) Cancel(id JobID) error {
    	if s.jobs == nil {
    		return nil
    	}
    
    	if stopChan, ok := s.jobs[id]; ok {
    		stopChan <- true
    		delete(s.jobs, id)
    	}
    
    	return nil
    }
    
    func (s *IntervalScheduler) CancelAll() error {
    	if s.jobs == nil {
    		return nil
    	}
    
    	for _, stopChan := range s.jobs {
    		stopChan <- true
    	}
    
    	s.jobs = nil
    	return nil
    }
    
    func (s *IntervalScheduler) JobExists(id JobID) bool {
    	if s.jobs == nil {
    		return false
    	}
    
    	_, ok := s.jobs[id]
    	return ok
    }
    
    // CronScheduler is a scheduler that uses the cron library to schedule jobs
    type CronScheduler struct {
    	cron *cron.Cron
    	Spec string
    	jobs map[JobID]cron.EntryID
    }
    
    func (s *CronScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
    	var err error
    
    	if s.cron == nil {
    		return ErrCronNotInitialized
    	}
    
    	if s.jobs == nil {
    		s.jobs = make(map[JobID]cron.EntryID)
    	}
    
    	id := job.GetID()
    	if _, ok := s.jobs[id]; ok {
    		return fmt.Errorf("job %s already scheduled", id)
    	}
    
    	entryId, err := s.cron.AddFunc(s.Spec, func() {
    		eventBus.Publish(QueueJob, job)
    	})
    
    	s.jobs[id] = entryId
    
    	if err != nil {
    		return err
    	}
    
    	s.cron.Start()
    	return nil
    }
    
    func (s *CronScheduler) GetType() string {
    	return "Cron"
    }
    
    func (s *CronScheduler) Cancel(id JobID) error {
    
    	if s.jobs == nil {
    		return nil
    	}
    
    	if entryId, ok := s.jobs[id]; ok {
    		s.cron.Remove(entryId)
    	}
    
    	return nil
    }
    
    func (s *CronScheduler) CancelAll() error {
    	if s.jobs == nil {
    		return nil
    	}
    
    	for _, entryId := range s.jobs {
    		s.cron.Remove(entryId)
    	}
    
    	s.jobs = nil
    	return nil
    }
    
    func (s *CronScheduler) JobExists(id JobID) bool {
    	if s.jobs == nil {
    		return false
    	}
    
    	_, ok := s.jobs[id]
    	return ok
    }
    
    // DelayScheduler is a scheduler that schedules a job after a delay
    type DelayScheduler struct {
    	Delay time.Duration
    	jobs  map[JobID]StopChan
    }
    
    func (s *DelayScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
    	timer := time.NewTimer(s.Delay)
    
    	if s.jobs == nil {
    		s.jobs = make(map[JobID]StopChan)
    	}
    
    	id := job.GetID()
    	if _, ok := s.jobs[id]; ok {
    		return fmt.Errorf("job %s already scheduled", id)
    	}
    
    	stopChan := make(StopChan)
    	s.jobs[id] = stopChan
    
    	go func() {
    		select {
    		case <-timer.C:
    			eventBus.Publish(QueueJob, job)
    		case <-stopChan:
    			timer.Stop()
    		}
    	}()
    	return nil
    }
    
    func (s *DelayScheduler) GetType() string {
    	return "Delay"
    }
    
    func (s *DelayScheduler) Cancel(id JobID) error {
    	if s.jobs == nil {
    		return nil
    	}
    
    	if stopChan, ok := s.jobs[id]; ok {
    		stopChan <- true
    		delete(s.jobs, id)
    	}
    
    	return nil
    }
    
    func (s *DelayScheduler) CancelAll() error {
    	if s.jobs == nil {
    		return nil
    	}
    
    	for _, stopChan := range s.jobs {
    		stopChan <- true
    	}
    
    	s.jobs = nil
    	return nil
    }
    
    func (s *DelayScheduler) JobExists(id JobID) bool {
    	if s.jobs == nil {
    		return false
    	}
    
    	_, ok := s.jobs[id]
    	return ok
    }
    
    // EventScheduler is a scheduler that schedules a job when an event is received
    type EventScheduler struct {
    	Event EventName
    	jobs  map[JobID]StopChan
    }
    
    func (s *EventScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
    	ch := make(chan interface{})
    	eventBus.Subscribe(s.Event, ch)
    
    	if s.jobs == nil {
    		s.jobs = make(map[JobID]StopChan)
    	}
    
    	id := job.GetID()
    	if _, ok := s.jobs[id]; ok {
    		return fmt.Errorf("job %s already scheduled", id)
    	}
    
    	stopChan := make(StopChan)
    	s.jobs[id] = stopChan
    
    	go func() {
    		for {
    			select {
    			case <-ch:
    				eventBus.Publish(QueueJob, job)
    			case <-stopChan:
    				eventBus.Unsubscribe(s.Event, ch)
    				return
    			}
    		}
    	}()
    	return nil
    }
    
    func (s *EventScheduler) GetType() string {
    	return "Event"
    }
    
    func (s *EventScheduler) Cancel(id JobID) error {
    	if s.jobs == nil {
    		return nil
    	}
    
    	if stopChan, ok := s.jobs[id]; ok {
    		stopChan <- true
    		delete(s.jobs, id)
    	}
    
    	return nil
    
    }
    
    func (s *EventScheduler) CancelAll() error {
    	if s.jobs == nil {
    		return nil
    	}
    
    	for _, stopChan := range s.jobs {
    		stopChan <- true
    	}
    
    	s.jobs = nil
    	return nil
    }
    
    func (s *EventScheduler) JobExists(id JobID) bool {
    	if s.jobs == nil {
    		return false
    	}
    
    	_, ok := s.jobs[id]
    	return ok
    }
    
    // InstantScheduler is a scheduler that schedules a job instantly
    type InstantScheduler struct{}
    
    func (s *InstantScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
    	eventBus.Publish(QueueJob, job)
    	return nil
    }
    
    func (s *InstantScheduler) GetType() string {
    	return "Instant"
    }
    
    func (s *InstantScheduler) Cancel(id JobID) error {
    	return nil
    }
    
    func (s *InstantScheduler) CancelAll() error {
    	return nil
    }
    
    func (s *InstantScheduler) JobExists(id JobID) bool {
    	return false
    }