// Copyright 2023 schukai GmbH // SPDX-License-Identifier: AGPL-3.0 package jobqueue import ( "fmt" "time" ) // TimeScheduler is a scheduler that schedules at a specific time type TimeScheduler struct { Time time.Time jobs map[JobID]StopChan executed bool } func (s *TimeScheduler) Schedule(job GenericJob, eventBus *EventBus) error { if s.executed { return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, job.GetID()) } if s.Time.Before(time.Now()) { return fmt.Errorf("%w: scheduled time is in the past", ErrInvalidTime) } if s.jobs == nil { s.jobs = make(map[JobID]StopChan) } id := job.GetID() if _, ok := s.jobs[id]; ok { return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id) } stopChan := make(StopChan) s.jobs[id] = stopChan timer := time.NewTimer(s.Time.Sub(time.Now())) go func() { select { case <-timer.C: if !job.IsPaused() { eventBus.Publish(QueueJob, job) s.executed = true } else { timer.Stop() stopChan <- true } case <-stopChan: timer.Stop() return } }() return nil } func (s *TimeScheduler) GetType() string { return "Time" } func (s *TimeScheduler) IsAdHoc() bool { return false } func (s *TimeScheduler) Cancel(id JobID) error { if s.jobs == nil { return nil } if stopChan, ok := s.jobs[id]; ok { select { case stopChan <- true: default: } delete(s.jobs, id) } return nil } func (s *TimeScheduler) CancelAll() error { if s.jobs == nil { return nil } for _, stopChan := range s.jobs { select { case stopChan <- true: default: } } s.jobs = nil return nil } func (s *TimeScheduler) JobExists(id JobID) bool { if s.jobs == nil { return false } _, ok := s.jobs[id] return ok } func (s *TimeScheduler) GetPersistence() SchedulerPersistence { return SchedulerPersistence{ Type: s.GetType(), Time: &s.Time, Executed: s.executed, } }