// Copyright 2023 schukai GmbH // SPDX-License-Identifier: AGPL-3.0 package jobqueue import ( "fmt" "github.com/robfig/cron/v3" ) // CronScheduler is a scheduler that uses the cron library to schedule jobs type CronScheduler struct { cron *cron.Cron Spec string jobs map[JobID]cron.EntryID } func (s *CronScheduler) Schedule(job GenericJob, eventBus *EventBus) error { var err error if s.cron == nil { return ErrCronNotInitialized } if s.jobs == nil { s.jobs = make(map[JobID]cron.EntryID) } id := job.GetID() if _, ok := s.jobs[id]; ok { return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id) } entryId, err := s.cron.AddFunc(s.Spec, func() { if !job.IsPaused() { eventBus.Publish(QueueJob, job) } }) s.jobs[id] = entryId if err != nil { return err } return nil } func (s *CronScheduler) GetType() string { return "Cron" } func (s *CronScheduler) IsAdHoc() bool { return false } func (s *CronScheduler) Cancel(id JobID) error { if s.jobs == nil { return nil } if entryId, ok := s.jobs[id]; ok { s.cron.Remove(entryId) } return nil } func (s *CronScheduler) CancelAll() error { if s.jobs == nil { return nil } for _, entryId := range s.jobs { s.cron.Remove(entryId) } s.jobs = nil return nil } func (s *CronScheduler) JobExists(id JobID) bool { if s.jobs == nil { return false } _, ok := s.jobs[id] return ok } func (s *CronScheduler) GetPersistence() SchedulerPersistence { return SchedulerPersistence{ Type: s.GetType(), Spec: s.Spec, } }