Something went wrong on our end
Select Git revision
pnpm-lock.yaml
-
Volker Schukai authored
schedule-delay.go 1.56 KiB
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
"fmt"
"time"
)
// DelayScheduler is a scheduler that schedules a job after a delay.
//
// The zero value is usable: Schedule allocates the internal job map on first
// use. The type performs no locking of its own around that map.
type DelayScheduler struct {
// Delay is the duration handed to time.NewTimer when a job is scheduled.
Delay time.Duration
// jobs maps each scheduled job's ID to the channel used to stop its
// waiting goroutine.
jobs map[JobID]StopChan
}
// Schedule registers job and starts a goroutine that waits for s.Delay to
// elapse. When the timer fires and the job is not paused, the job is published
// on eventBus under the QueueJob event. If a stop signal arrives first, the
// timer is stopped and nothing is published.
//
// It returns an error wrapping ErrJobAlreadyScheduled when a job with the same
// ID is already registered. Note that the registration is not removed when the
// timer fires; it stays until Cancel or CancelAll is called.
func (s *DelayScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
	// Lazily create the registry so the zero value of DelayScheduler works.
	if s.jobs == nil {
		s.jobs = make(map[JobID]StopChan)
	}

	jobID := job.GetID()
	if _, scheduled := s.jobs[jobID]; scheduled {
		return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, jobID)
	}

	stop := make(StopChan)
	s.jobs[jobID] = stop
	delay := time.NewTimer(s.Delay)

	// waitAndPublish blocks until either the delay elapses or a stop signal
	// is received, whichever happens first.
	waitAndPublish := func() {
		select {
		case <-stop:
			delay.Stop()
		case <-delay.C:
			if job.IsPaused() {
				return
			}
			eventBus.Publish(QueueJob, job)
		}
	}
	go waitAndPublish()

	return nil
}
// GetType returns the type name of this scheduler, the string "Delay".
func (s *DelayScheduler) GetType() string {
	const schedulerType = "Delay"
	return schedulerType
}
// IsAdHoc reports whether this scheduler is ad hoc; for DelayScheduler the
// answer is unconditionally true.
func (s *DelayScheduler) IsAdHoc() bool {
	const adHoc = true
	return adHoc
}
// Cancel stops the pending delay for the job with the given id and removes the
// job from the scheduler. An id that is not registered (including when no job
// was ever scheduled) is ignored. Cancel always returns nil.
//
// The stop channel is closed instead of being sent on. A non-blocking send on
// the unbuffered channel is silently dropped whenever the waiting goroutine
// has not yet reached its receive, which would let a cancelled job fire
// anyway. A close is observed by the receiver no matter when it starts
// listening, and is harmless if the goroutine has already finished.
func (s *DelayScheduler) Cancel(id JobID) error {
	// Looking up a key in a nil map is safe and reports "not found".
	stopChan, ok := s.jobs[id]
	if !ok {
		return nil
	}

	// The entry is deleted right after closing, so each channel is closed at
	// most once through this path.
	close(stopChan)
	delete(s.jobs, id)
	return nil
}
// CancelAll stops the pending delay of every registered job and discards the
// whole registry. It always returns nil and is a no-op when nothing has been
// scheduled.
//
// Each stop channel is closed instead of being sent on. A non-blocking send on
// an unbuffered channel is dropped when the waiting goroutine is not yet
// receiving, which would let a cancelled job fire anyway; a close cannot be
// missed and is harmless for goroutines that have already finished.
func (s *DelayScheduler) CancelAll() error {
	// Ranging over a nil map executes zero iterations, so no guard is needed.
	for _, stopChan := range s.jobs {
		close(stopChan)
	}

	// Drop the registry; Schedule re-creates it on demand.
	s.jobs = nil
	return nil
}
// JobExists reports whether a job with the given id is currently registered
// with this scheduler. It returns false when nothing has been scheduled yet.
func (s *DelayScheduler) JobExists(id JobID) bool {
	// A lookup in a nil map yields "not found", matching the empty case.
	_, found := s.jobs[id]
	return found
}
// GetPersistence builds the SchedulerPersistence record for this scheduler:
// Type is the value of GetType and Delay points at the scheduler's own Delay
// field (it is not a copy, so later changes to s.Delay are visible through it).
func (s *DelayScheduler) GetPersistence() SchedulerPersistence {
	var persistence SchedulerPersistence
	persistence.Type = s.GetType()
	persistence.Delay = &s.Delay
	return persistence
}