Something went wrong on our end
Select Git revision
schedule-time.go
-
Volker Schukai authored
schedule-time.go 2.01 KiB
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
"errors"
"fmt"
"time"
)
// TimeScheduler is a scheduler that schedules at a specific time.
//
// Schedule arms a one-shot timer for Time and publishes the job on the
// event bus once that timer fires.
type TimeScheduler struct {
// Time is the instant at which the scheduled job is queued. Schedule
// rejects a Time that already lies in the past.
Time time.Time
// jobs maps each scheduled job ID to the channel used to stop its
// pending timer. It is allocated lazily by Schedule.
jobs map[JobID]StopChan
// executed is set once the timer has fired and the job was published.
// Schedule refuses further jobs afterwards, and GetPersistence exports it.
executed bool
}
// Schedule arms a one-shot timer that publishes job as a QueueJob event on
// eventBus when s.Time is reached.
//
// It returns:
//   - ErrParameterIsNil if job is nil,
//   - ErrJobAlreadyScheduled (wrapped) if this scheduler has already fired or
//     a job with the same ID is already registered,
//   - ErrScheduleTimeIsInThePast joined with ErrInvalidTime if s.Time lies
//     before the current time,
//   - ErrJobIDEmpty if the job reports an empty ID.
//
// If the job is paused at the moment the timer fires, nothing is published
// and the scheduler is not marked as executed. The entry in s.jobs is left in
// place after the timer fires; only Cancel and CancelAll remove entries.
//
// NOTE(review): s.executed is written from the timer goroutine without
// synchronization while Schedule and GetPersistence read it; add locking if
// a TimeScheduler is shared between goroutines.
func (s *TimeScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
	if job == nil {
		return ErrParameterIsNil
	}

	if s.executed {
		return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, job.GetID())
	}

	if s.Time.Before(time.Now()) {
		return errors.Join(ErrScheduleTimeIsInThePast, ErrInvalidTime, fmt.Errorf("time: %s", s.Time))
	}

	if s.jobs == nil {
		s.jobs = make(map[JobID]StopChan)
	}

	id := job.GetID()
	if id == "" {
		return ErrJobIDEmpty
	}

	if _, ok := s.jobs[id]; ok {
		return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
	}

	// One slot of buffer: Cancel and CancelAll signal with a non-blocking
	// send, which would be silently dropped on an unbuffered channel whenever
	// the goroutine below has not reached its select yet.
	stopChan := make(StopChan, 1)
	s.jobs[id] = stopChan

	timer := time.NewTimer(time.Until(s.Time))

	go func() {
		select {
		case <-timer.C:
			// A stop request that is already pending wins over the timer, so
			// a job cancelled before its due time is never published.
			select {
			case <-stopChan:
				return
			default:
			}

			// A paused job is skipped. The goroutine simply ends here; it must
			// not send on stopChan itself, because nobody else receives from
			// that channel and the send would block forever.
			if job.IsPaused() {
				return
			}

			eventBus.Publish(QueueJob, job)
			s.executed = true
		case <-stopChan:
			timer.Stop()
		}
	}()

	return nil
}
// GetType returns the type name of this scheduler, which is always "Time".
func (s *TimeScheduler) GetType() string {
	const typeName = "Time"
	return typeName
}
// IsAdHoc reports whether this is an ad-hoc scheduler; for a TimeScheduler
// the answer is always false.
func (s *TimeScheduler) IsAdHoc() bool {
	const adHoc = false
	return adHoc
}
// Cancel signals the pending timer of the job with the given id to stop and
// removes the job from the scheduler. Unknown ids are ignored. The returned
// error is always nil.
func (s *TimeScheduler) Cancel(id JobID) error {
	// Indexing a nil map is legal and simply reports "not found", so no
	// separate nil guard is needed.
	stop, registered := s.jobs[id]
	if !registered {
		return nil
	}

	// Non-blocking send: never stall the caller when nothing is receiving.
	select {
	case stop <- true:
	default:
	}

	delete(s.jobs, id)
	return nil
}
// CancelAll signals every registered job's timer to stop and then drops the
// whole job table. The returned error is always nil.
func (s *TimeScheduler) CancelAll() error {
	// Ranging over a nil map performs zero iterations, so an empty scheduler
	// falls straight through to the reset below.
	for _, stop := range s.jobs {
		// Non-blocking send: skip jobs whose goroutine is not receiving.
		select {
		case stop <- true:
		default:
		}
	}

	s.jobs = nil
	return nil
}
// JobExists reports whether a job with the given id is currently registered
// with this scheduler.
func (s *TimeScheduler) JobExists(id JobID) bool {
	// A lookup in a nil map yields "not found", which covers the case of a
	// scheduler that never had a job registered.
	_, registered := s.jobs[id]
	return registered
}
// GetPersistence builds the persistable description of this scheduler: its
// type name, its target time and whether it has already executed.
//
// The Time field of the result points at the scheduler's own Time field, not
// at a copy.
func (s *TimeScheduler) GetPersistence() SchedulerPersistence {
	var state SchedulerPersistence

	state.Type = s.GetType()
	state.Time = &s.Time
	state.Executed = s.executed

	return state
}