Something went wrong on our end
Select Git revision
scheduler-event.go
-
Volker Schukai authored
scheduler-event.go 1.80 KiB
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import "fmt"
// EventScheduler is a scheduler that schedules a job each time a given event
// is received.
type EventScheduler struct {
// Event is the name of the event that Schedule subscribes to.
Event EventName
// EventBus, when non-nil, is the bus Schedule subscribes on; when nil,
// Schedule subscribes on the bus passed to it as an argument.
EventBus *EventBus
// jobs maps the ID of each scheduled job to the channel used to stop its
// listener goroutine. It is created lazily by Schedule.
jobs map[JobID]StopChan
}
// Schedule registers job with the scheduler. Each time the scheduler's event
// is received and the job is not paused, the job is published on eventBus
// under the QueueJob event.
//
// Events are received from s.EventBus when it is set, otherwise from the
// eventBus argument. The same bus is used to unsubscribe once the job is
// cancelled.
//
// It returns an error wrapping ErrJobAlreadyScheduled if a job with the same
// ID is already scheduled on s; in that case nothing is subscribed.
func (s *EventScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
	// Reject duplicates before touching the bus so that the error path does
	// not leave behind a subscribed channel that nobody reads from.
	// (Indexing a nil map is valid and reports "not found".)
	id := job.GetID()
	if _, ok := s.jobs[id]; ok {
		return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
	}

	// Remember which bus and which event name the subscription was made with,
	// so the listener unsubscribes from exactly that pair later on.
	source := eventBus
	if s.EventBus != nil {
		source = s.EventBus
	}
	event := s.Event

	ch := make(chan interface{})
	source.Subscribe(event, ch)

	if s.jobs == nil {
		s.jobs = make(map[JobID]StopChan)
	}
	stopChan := make(StopChan)
	s.jobs[id] = stopChan

	// Listener goroutine: runs until stopChan is signalled or closed.
	go func() {
		for {
			select {
			case <-ch:
				if !job.IsPaused() {
					Info("EventScheduler: received event %s, scheduling job %s", event, id)
					eventBus.Publish(QueueJob, job)
				}
			case <-stopChan:
				source.Unsubscribe(event, ch)
				return
			}
		}
	}()

	return nil
}
// GetType returns the identifier of this scheduler kind, which is always
// "Event".
func (s *EventScheduler) GetType() string {
	const schedulerType = "Event"
	return schedulerType
}
// IsAdHoc reports whether this is an ad-hoc scheduler; for an EventScheduler
// the answer is always false.
func (s *EventScheduler) IsAdHoc() bool {
	const adHoc = false
	return adHoc
}
// Cancel stops the event-driven scheduling of the job with the given id and
// removes it from the scheduler. Unknown ids are ignored. It always returns
// nil.
func (s *EventScheduler) Cancel(id JobID) error {
	// Indexing a nil map is valid, so no separate nil check is needed.
	stopChan, ok := s.jobs[id]
	if !ok {
		return nil
	}

	// Close the channel instead of attempting a non-blocking send: a send on
	// the unbuffered channel is silently dropped whenever the listener is not
	// currently waiting in its select (e.g. it has just been started or is
	// busy handling an event), which would leave it running forever. A close
	// never blocks and is always observed by the listener's receive.
	close(stopChan)
	delete(s.jobs, id)
	return nil
}
// CancelAll stops the event-driven scheduling of every job registered with
// the scheduler and forgets all of them. It always returns nil.
func (s *EventScheduler) CancelAll() error {
	// Ranging over a nil map is a no-op, so no separate nil check is needed.
	for _, stopChan := range s.jobs {
		// Close instead of a non-blocking send: a send on the unbuffered
		// channel is dropped when the listener is not waiting in its select
		// at this instant, which would leave it running after its entry has
		// been discarded. A close never blocks and is always observed.
		close(stopChan)
	}
	s.jobs = nil
	return nil
}
// JobExists reports whether a job with the given id is currently registered
// with the scheduler.
func (s *EventScheduler) JobExists(id JobID) bool {
	// A lookup in a nil map is valid in Go and simply reports "not found",
	// which covers the case where nothing has been scheduled yet.
	_, found := s.jobs[id]
	return found
}
// GetPersistence returns the persistable description of the scheduler: its
// type identifier and the name of the event it reacts to.
func (s *EventScheduler) GetPersistence() SchedulerPersistence {
	var persisted SchedulerPersistence
	persisted.Type = s.GetType()
	persisted.Event = s.Event
	return persisted
}