Skip to content
Snippets Groups Projects
Select Git revision
  • 61a1232e9dc34bf2fecf8b0db0151c829a9a3b28
  • master default protected
  • v1.23.2
  • v1.23.1
  • v1.23.0
  • v1.22.0
  • v1.21.1
  • v1.21.0
  • v1.20.3
  • v1.20.2
  • v1.20.1
  • v1.20.0
  • v1.19.4
  • v1.19.3
  • v1.19.2
  • v1.19.1
  • v1.19.0
  • v1.18.2
  • v1.18.1
  • v1.18.0
  • v1.17.0
  • v1.16.1
22 results

schedule-time.go

Blame
  • schedule-time.go 2.01 KiB
    // Copyright 2023 schukai GmbH
    // SPDX-License-Identifier: AGPL-3.0
    
    package jobqueue
    
    import (
    	"errors"
    	"fmt"
    	"time"
    )
    
    // TimeScheduler is a scheduler that schedules at a specific time
    type TimeScheduler struct {
    	Time     time.Time
    	jobs     map[JobID]StopChan
    	executed bool
    }
    
    func (s *TimeScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
    
    	if job == nil {
    		return ErrParameterIsNil
    	}
    
    	if s.executed {
    		return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, job.GetID())
    	}
    
    	if s.Time.Before(time.Now()) {
    		return errors.Join(ErrScheduleTimeIsInThePast, ErrInvalidTime, fmt.Errorf("time: %s", s.Time))
    	}
    
    	if s.jobs == nil {
    		s.jobs = make(map[JobID]StopChan)
    	}
    
    	id := job.GetID()
    	if id == "" {
    		return ErrJobIDEmpty
    	}
    
    	if _, ok := s.jobs[id]; ok {
    		return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
    	}
    
    	stopChan := make(StopChan)
    	s.jobs[id] = stopChan
    
    	timer := time.NewTimer(time.Until(s.Time))
    
    	go func() {
    		select {
    		case <-timer.C:
    			if !job.IsPaused() {
    				eventBus.Publish(QueueJob, job)
    				s.executed = true
    			} else {
    				timer.Stop()
    				stopChan <- true
    			}
    		case <-stopChan:
    			timer.Stop()
    			return
    		}
    	}()
    
    	return nil
    }
    
    func (s *TimeScheduler) GetType() string {
    	return "Time"
    }
    
    func (s *TimeScheduler) IsAdHoc() bool {
    	return false
    }
    
    func (s *TimeScheduler) Cancel(id JobID) error {
    	if s.jobs == nil {
    		return nil
    	}
    
    	if stopChan, ok := s.jobs[id]; ok {
    		select {
    		case stopChan <- true:
    		default:
    		}
    		delete(s.jobs, id)
    	}
    
    	return nil
    }
    
    func (s *TimeScheduler) CancelAll() error {
    	if s.jobs == nil {
    		return nil
    	}
    
    	for _, stopChan := range s.jobs {
    		select {
    		case stopChan <- true:
    		default:
    		}
    	}
    
    	s.jobs = nil
    	return nil
    }
    
    func (s *TimeScheduler) JobExists(id JobID) bool {
    	if s.jobs == nil {
    		return false
    	}
    	_, ok := s.jobs[id]
    	return ok
    }
    
    func (s *TimeScheduler) GetPersistence() SchedulerPersistence {
    	return SchedulerPersistence{
    		Type:     s.GetType(),
    		Time:     &s.Time,
    		Executed: s.executed,
    	}
    }