// Copyright 2023 schukai GmbH // SPDX-License-Identifier: AGPL-3.0 package jobqueue import ( "fmt" "time" ) // IntervalScheduler is a scheduler that schedules a job at a fixed interval type IntervalScheduler struct { Interval time.Duration jobs map[JobID]StopChan } func (s *IntervalScheduler) Schedule(job GenericJob, eventBus *EventBus) error { if s.Interval <= 0 { return fmt.Errorf("invalid interval: %v", s.Interval) } if s.jobs == nil { s.jobs = make(map[JobID]StopChan) } id := job.GetID() if _, ok := s.jobs[id]; ok { return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id) } stopChan := make(StopChan) s.jobs[id] = stopChan ticker := time.NewTicker(s.Interval) go func() { for { select { case <-ticker.C: if !job.IsPaused() { eventBus.Publish(QueueJob, job) } case <-stopChan: ticker.Stop() return } } }() return nil } func (s *IntervalScheduler) GetType() string { return "Interval" } func (s *IntervalScheduler) IsAdHoc() bool { return false } func (s *IntervalScheduler) Cancel(id JobID) error { if s.jobs == nil { return nil } if stopChan, ok := s.jobs[id]; ok { select { case stopChan <- true: default: } delete(s.jobs, id) } return nil } func (s *IntervalScheduler) CancelAll() error { if s.jobs == nil { return nil } for _, stopChan := range s.jobs { select { case stopChan <- true: default: } } s.jobs = nil return nil } func (s *IntervalScheduler) JobExists(id JobID) bool { if s.jobs == nil { return false } _, ok := s.jobs[id] return ok } func (s *IntervalScheduler) GetPersistence() SchedulerPersistence { return SchedulerPersistence{ Type: s.GetType(), Interval: s.Interval, } }