// Copyright 2023 schukai GmbH // SPDX-License-Identifier: AGPL-3.0 package jobqueue import ( "fmt" "time" ) // DelayScheduler is a scheduler that schedules a job after a delay type DelayScheduler struct { Delay time.Duration jobs map[JobID]StopChan } func (s *DelayScheduler) Schedule(job GenericJob, eventBus *EventBus) error { if s.jobs == nil { s.jobs = make(map[JobID]StopChan) } id := job.GetID() if _, ok := s.jobs[id]; ok { return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id) } stopChan := make(StopChan) s.jobs[id] = stopChan timer := time.NewTimer(s.Delay) go func() { select { case <-timer.C: if !job.IsPaused() { eventBus.Publish(QueueJob, job) } case <-stopChan: timer.Stop() return } }() return nil } func (s *DelayScheduler) GetType() string { return "Delay" } func (s *DelayScheduler) IsAdHoc() bool { return true } func (s *DelayScheduler) Cancel(id JobID) error { if s.jobs == nil { return nil } if stopChan, ok := s.jobs[id]; ok { select { case stopChan <- true: default: } delete(s.jobs, id) } return nil } func (s *DelayScheduler) CancelAll() error { if s.jobs == nil { return nil } for _, stopChan := range s.jobs { select { case stopChan <- true: default: } } s.jobs = nil return nil } func (s *DelayScheduler) JobExists(id JobID) bool { if s.jobs == nil { return false } _, ok := s.jobs[id] return ok } func (s *DelayScheduler) GetPersistence() SchedulerPersistence { return SchedulerPersistence{ Type: s.GetType(), Delay: &s.Delay, } }