// Copyright 2023 schukai GmbH // SPDX-License-Identifier: AGPL-3.0 package jobqueue import "fmt" // EventScheduler is a scheduler that schedules a job when an event is received type EventScheduler struct { Event EventName EventBus *EventBus jobs map[JobID]StopChan } func (s *EventScheduler) Schedule(job GenericJob, eventBus *EventBus) error { ch := make(chan interface{}) if s.EventBus != nil { s.EventBus.Subscribe(s.Event, ch) } else { eventBus.Subscribe(s.Event, ch) } if s.jobs == nil { s.jobs = make(map[JobID]StopChan) } id := job.GetID() if _, ok := s.jobs[id]; ok { return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id) } stopChan := make(StopChan) s.jobs[id] = stopChan go func() { for { select { case <-ch: if !job.IsPaused() { Info("EventScheduler: received event %s, scheduling job %s", s.Event, id) eventBus.Publish(QueueJob, job) } case <-stopChan: eventBus.Unsubscribe(s.Event, ch) return } } }() return nil } func (s *EventScheduler) GetType() string { return "Event" } func (s *EventScheduler) IsAdHoc() bool { return false } func (s *EventScheduler) Cancel(id JobID) error { if s.jobs == nil { return nil } if stopChan, ok := s.jobs[id]; ok { select { case stopChan <- true: default: } delete(s.jobs, id) } return nil } func (s *EventScheduler) CancelAll() error { if s.jobs == nil { return nil } for _, stopChan := range s.jobs { select { case stopChan <- true: default: } } s.jobs = nil return nil } func (s *EventScheduler) JobExists(id JobID) bool { if s.jobs == nil { return false } _, ok := s.jobs[id] return ok } func (s *EventScheduler) GetPersistence() SchedulerPersistence { return SchedulerPersistence{ Type: s.GetType(), Event: s.Event, } }