Skip to content
Snippets Groups Projects
Select Git revision
  • master default protected
  • v1.23.2
  • v1.23.1
  • v1.23.0
  • v1.22.0
  • v1.21.1
  • v1.21.0
  • v1.20.3
  • v1.20.2
  • v1.20.1
  • v1.20.0
  • v1.19.4
  • v1.19.3
  • v1.19.2
  • v1.19.1
  • v1.19.0
  • v1.18.2
  • v1.18.1
  • v1.18.0
  • v1.17.0
  • v1.16.1
21 results

manager.go

Blame
  • manager.go 12.71 KiB
    // Copyright 2023 schukai GmbH
    // SPDX-License-Identifier: AGPL-3.0
    
    package jobqueue
    
    import (
    	"errors"
    	"fmt"
    	"github.com/fsnotify/fsnotify"
    	"github.com/robfig/cron/v3"
    	"gorm.io/gorm"
    	"sync"
    	"time"
    )
    
    // ManagerState describes whether a Manager is currently stopped or running.
    type ManagerState int
    
    // Lifecycle states of a Manager.
    // NOTE(review): these constants are untyped integer constants (no explicit
    // ManagerState type on the iota expression); they convert implicitly when
    // assigned to or compared with a ManagerState value.
    const (
    	// ManagerStateStopped is the state a Manager is created with.
    	ManagerStateStopped = iota
    	// ManagerStateRunning is set while at least one worker reports WorkerStatusRunning.
    	ManagerStateRunning
    )
    
    // Manager ties together the job queue, the registered workers, the event bus
    // and the optional cron instance and database syncer.
    type Manager struct {
    	// state is recomputed from the workers by checkAndSetRunningState.
    	state ManagerState
    
    	// queue holds jobs between the QueueJob and JobReady events.
    	queue     *Queue
    	// workerMap contains every worker added through AddWorker, keyed by its ID.
    	workerMap map[WorkerID]Worker
    	// eventBus is shared with the queue and handed to schedulers.
    	eventBus  *EventBus
    
    	// activeJobs contains the jobs that were scheduled through ScheduleJob.
    	activeJobs map[JobID]GenericJob
    
    	// jobEventCh is created in Start, consumed by handleJobEvents and closed in Stop.
    	jobEventCh chan interface{}
    
    	// cronInstance is optional; it is started and stopped together with the manager.
    	cronInstance *cron.Cron
    	//logger       Logger
    
    	// database and jobSyncer are set by SetDB; both stay nil without a database.
    	database  *gorm.DB
    	jobSyncer *JobSyncer
    
    	// mu is locked by the exported methods while they access the fields above.
    	mu sync.Mutex
    }
    
    // NewManager returns a stopped Manager with an empty worker set, no active
    // jobs and a queue that shares the manager's newly created event bus.
    func NewManager() *Manager {
    	bus := NewEventBus()
    
    	return &Manager{
    		state:      ManagerStateStopped,
    		queue:      NewQueue(bus),
    		workerMap:  map[WorkerID]Worker{},
    		eventBus:   bus,
    		activeJobs: map[JobID]GenericJob{},
    	}
    }
    
    // GetEventBus returns the event bus used by the manager. The field is read
    // while holding the manager lock.
    func (m *Manager) GetEventBus() *EventBus {
    	m.mu.Lock()
    	bus := m.eventBus
    	m.mu.Unlock()
    
    	return bus
    }
    
    // SetCronInstance stores the cron instance the manager should use and
    // returns the manager so calls can be chained.
    func (m *Manager) SetCronInstance(cronInstance *cron.Cron) *Manager {
    	m.mu.Lock()
    	m.cronInstance = cronInstance
    	m.mu.Unlock()
    
    	return m
    }
    
    // GetCronInstance returns the configured cron instance, or nil when none
    // has been set.
    func (m *Manager) GetCronInstance() *cron.Cron {
    	m.mu.Lock()
    	instance := m.cronInstance
    	m.mu.Unlock()
    
    	return instance
    }
    
    // NewCronScheduler returns a CronScheduler for spec that is bound to the
    // manager's current cron instance.
    func (m *Manager) NewCronScheduler(spec string) *CronScheduler {
    	scheduler := &CronScheduler{Spec: spec}
    	scheduler.cron = m.GetCronInstance()
    
    	return scheduler
    }
    
    // NewInstantScheduler returns a zero-valued InstantScheduler.
    func (m *Manager) NewInstantScheduler() *InstantScheduler {
    	return new(InstantScheduler)
    }
    
    // NewIntervalScheduler returns an IntervalScheduler configured with the
    // given interval.
    func (m *Manager) NewIntervalScheduler(interval time.Duration) *IntervalScheduler {
    	scheduler := new(IntervalScheduler)
    	scheduler.Interval = interval
    
    	return scheduler
    }
    
    // NewDelayScheduler returns a DelayScheduler configured with the given delay.
    func (m *Manager) NewDelayScheduler(delay time.Duration) *DelayScheduler {
    	scheduler := new(DelayScheduler)
    	scheduler.Delay = delay
    
    	return scheduler
    }
    
    // NewEventScheduler returns an EventScheduler bound to the given event name.
    func (m *Manager) NewEventScheduler(event EventName) *EventScheduler {
    	scheduler := new(EventScheduler)
    	scheduler.Event = event
    
    	return scheduler
    }
    
    // NewTimeScheduler returns a TimeScheduler configured with the point in time t.
    func (m *Manager) NewTimeScheduler(t time.Time) *TimeScheduler {
    	scheduler := new(TimeScheduler)
    	scheduler.Time = t
    
    	return scheduler
    }
    
    // NewInotifyScheduler returns an InotifyScheduler for path carrying the
    // given fsnotify operation flags.
    func (m *Manager) NewInotifyScheduler(path string, eventFlags fsnotify.Op) *InotifyScheduler {
    	scheduler := new(InotifyScheduler)
    	scheduler.Path = path
    	scheduler.EventFlags = eventFlags
    
    	return scheduler
    }
    
    // GetActiveJobs returns a snapshot of the active jobs keyed by job ID.
    //
    // The result is a shallow copy taken while holding the manager lock. Handing
    // out the internal map would let callers read or modify it after the lock is
    // released, racing with ScheduleJob, RemoveJob and friends. The job values
    // themselves are shared with the manager.
    func (m *Manager) GetActiveJobs() map[JobID]GenericJob {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    
    	snapshot := make(map[JobID]GenericJob, len(m.activeJobs))
    	for id, job := range m.activeJobs {
    		snapshot[id] = job
    	}
    
    	return snapshot
    }
    
    // DeleteJob cancels the schedule of an active job, drops it from the active
    // jobs and, when a job syncer is configured, deletes it from the database.
    // It returns ErrJobNotActive when no active job has the given ID.
    func (m *Manager) DeleteJob(id JobID) error {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    
    	job, active := m.activeJobs[id]
    	if !active {
    		return ErrJobNotActive
    	}
    
    	if err := m.removeJobInternal(id); err != nil {
    		return err
    	}
    
    	// Without a syncer there is no persisted copy to delete.
    	if m.jobSyncer == nil {
    		return nil
    	}
    
    	if err := m.jobSyncer.DeleteJob(job); err != nil {
    		return err
    	}
    
    	return nil
    }
    
    // RemoveJob cancels the schedule of an active job and drops it from the
    // active jobs. The database is not touched; use DeleteJob to remove the job
    // from the database as well.
    // It returns ErrJobNotActive when no active job has the given ID.
    func (m *Manager) RemoveJob(id JobID) error {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    
    	_, active := m.activeJobs[id]
    	if !active {
    		return ErrJobNotActive
    	}
    
    	return m.removeJobInternal(id)
    }
    
    // ResetJobLogs asks the job syncer to reset the logs of an active job.
    // It returns ErrJobNotActive when the job is unknown and does nothing when
    // no job syncer is configured.
    func (m *Manager) ResetJobLogs(id JobID) error {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    
    	job, active := m.activeJobs[id]
    	if !active {
    		return ErrJobNotActive
    	}
    
    	if m.jobSyncer == nil {
    		return nil
    	}
    
    	if err := m.jobSyncer.ResetLogs(job); err != nil {
    		return err
    	}
    
    	return nil
    }
    
    // ResetJobStats asks the job syncer to reset the stats of an active job.
    // It returns ErrJobNotActive when the job is unknown and does nothing when
    // no job syncer is configured.
    func (m *Manager) ResetJobStats(id JobID) error {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    
    	job, active := m.activeJobs[id]
    	if !active {
    		return ErrJobNotActive
    	}
    
    	if m.jobSyncer == nil {
    		return nil
    	}
    
    	if err := m.jobSyncer.ResetStats(job); err != nil {
    		return err
    	}
    
    	return nil
    }
    
    // CancelScheduler is implemented by schedulers whose schedule for a single
    // job can be cancelled. removeJobInternal type-asserts a job's scheduler to
    // this interface before cancelling it.
    type CancelScheduler interface {
    	// Cancel cancels the schedule that belongs to the job with the given ID.
    	Cancel(id JobID) error
    }
    
    // cancelSchedulerByManager asks scheduler to cancel the schedule for id and
    // passes the scheduler's result through unchanged.
    func cancelSchedulerByManager(scheduler CancelScheduler, id JobID) error {
    	return scheduler.Cancel(id)
    }
    
    // removeJobInternal cancels the scheduler of the job stored under id and
    // deletes the job from activeJobs.
    //
    // It does not lock m.mu and does not check that id is present; the callers in
    // this file hold the lock and verify membership first.
    //
    // Returns ErrJobNotScheduled when the job has no scheduler and
    // ErrUnknownScheduleType when the scheduler does not implement CancelScheduler.
    func (m *Manager) removeJobInternal(id JobID) error {
    	scheduler := m.activeJobs[id].GetScheduler()
    	if scheduler == nil {
    		return ErrJobNotScheduled
    	}
    
    	canceller, cancellable := scheduler.(CancelScheduler)
    	if !cancellable {
    		return ErrUnknownScheduleType
    	}
    
    	if err := cancelSchedulerByManager(canceller, id); err != nil {
    		return err
    	}
    
    	delete(m.activeJobs, id)
    
    	return nil
    }
    
    // UpdateJob replaces the active job that has the same ID as job: the current
    // entry is cancelled and removed, then job is scheduled again with the
    // scheduler the previous entry used.
    //
    // It returns ErrJobNotActive when no active job has that ID, or any error
    // reported while removing the old entry or scheduling the new one.
    func (m *Manager) UpdateJob(job GenericJob) error {
    	id := job.GetID()
    
    	// The lock is released explicitly before ScheduleJob is called: RemoveJob
    	// and ScheduleJob lock m.mu themselves and sync.Mutex is not reentrant, so
    	// calling them while holding the lock would block forever.
    	m.mu.Lock()
    	current, active := m.activeJobs[id]
    	if !active {
    		m.mu.Unlock()
    		return ErrJobNotActive
    	}
    
    	scheduler := current.GetScheduler()
    	err := m.removeJobInternal(id)
    	m.mu.Unlock()
    
    	if err != nil {
    		return err
    	}
    
    	return m.ScheduleJob(job, scheduler)
    }
    
    // ContainsActiveJob reports whether a job with the given ID is currently in
    // the active jobs.
    func (m *Manager) ContainsActiveJob(id JobID) bool {
    	m.mu.Lock()
    	_, active := m.activeJobs[id]
    	m.mu.Unlock()
    
    	return active
    }
    
    // SetDB stores the database connection and creates the job syncer the first
    // time it is called; an existing job syncer is kept. It returns the manager
    // so calls can be chained.
    func (m *Manager) SetDB(db *gorm.DB) *Manager {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    
    	m.database = db
    
    	if m.jobSyncer == nil {
    		m.jobSyncer = NewJobSyncer(m)
    	}
    
    	return m
    }
    
    // GetDB returns the configured database connection, or nil when none has
    // been set.
    func (m *Manager) GetDB() *gorm.DB {
    	m.mu.Lock()
    	db := m.database
    	m.mu.Unlock()
    
    	return db
    }
    
    // checkAndSetRunningState recomputes m.state from the registered workers.
    //
    // The state is first reset to ManagerStateStopped and then set to
    // ManagerStateRunning if any worker reports WorkerStatusRunning. It returns
    // ErrNoWorkers when no worker is registered and nil otherwise. It does not
    // lock m.mu itself.
    func (m *Manager) checkAndSetRunningState() error {
    	m.state = ManagerStateStopped
    
    	// len of a nil map is 0, so this also covers an uninitialised workerMap.
    	if len(m.workerMap) == 0 {
    		return ErrNoWorkers
    	}
    
    	for _, w := range m.workerMap {
    		if w.Status() != WorkerStatusRunning {
    			continue
    		}
    
    		m.state = ManagerStateRunning
    		break
    	}
    
    	return nil
    }
    
    // AddWorker registers a stopped worker with the manager. If the manager is
    // already running the worker is started before it is registered.
    //
    // It returns ErrWorkerAlreadyAdded when a worker with the same ID is
    // registered, ErrWorkerAlreadyRunning when the worker is not in the stopped
    // state, or the error reported by the worker's Start method.
    func (m *Manager) AddWorker(worker Worker) error {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    
    	if _, exists := m.workerMap[worker.GetID()]; exists {
    		return ErrWorkerAlreadyAdded
    	}
    
    	// Only workers that are currently stopped may be added.
    	if status := worker.Status(); status != WorkerStatusStopped {
    		return ErrWorkerAlreadyRunning
    	}
    
    	if m.state == ManagerStateRunning {
    		if err := worker.Start(); err != nil {
    			return err
    		}
    	}
    
    	m.workerMap[worker.GetID()] = worker
    	worker.SetManager(m)
    
    	return m.checkAndSetRunningState()
    }
    
    // RemoveWorker stops the worker if it is not already stopped, removes it from
    // the manager and recomputes the manager state.
    //
    // It returns ErrWorkerNotAdded when the worker is not registered, or the
    // error reported by the worker's Stop method. Ending up with no workers is
    // not treated as an error here.
    func (m *Manager) RemoveWorker(worker Worker) error {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    
    	if _, ok := m.workerMap[worker.GetID()]; !ok {
    		return ErrWorkerNotAdded
    	}
    
    	// A worker that is not stopped has to be stopped before it is removed.
    	if worker.Status() != WorkerStatusStopped {
    		if err := worker.Stop(); err != nil {
    			return err
    		}
    	}
    
    	delete(m.workerMap, worker.GetID())
    
    	// errors.Is matches the sentinel even if it is wrapped, in line with the
    	// sentinel checks in Start and Stop.
    	if err := m.checkAndSetRunningState(); err != nil && !errors.Is(err, ErrNoWorkers) {
    		return err
    	}
    
    	return nil
    }
    
    // Start starts the manager.
    //
    // When a job syncer is configured, CreateAndStartJobSyncer is called and
    // Start blocks until the returned promise settles; a failure is returned
    // as-is. Afterwards every registered worker is started, the manager
    // subscribes to QueueJob and JobReady events, the event loop goroutine is
    // launched and the cron instance (if any) is started.
    //
    // It returns ErrManagerAlreadyRunning when the manager is already running,
    // ErrNoWorkers when no worker is registered, or an error that aggregates the
    // start failures of the workers. Workers reporting ErrWorkerAlreadyRunning
    // are not counted as failures.
    func (m *Manager) Start() error {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    
    	if m.state == ManagerStateRunning {
    		return ErrManagerAlreadyRunning
    	}
    
    	if m.jobSyncer != nil {
    		p := CreateAndStartJobSyncer(m)
    
    		ready := make(chan struct{})
    
    		var jobSyncerErr error
    
    		Then[*JobSyncer, *JobSyncer](p, func(value *JobSyncer) (*JobSyncer, error) {
    			// Signal first: Start keeps m.mu locked until it returns, so this
    			// callback can only take the lock after Start has been released.
    			close(ready)
    			m.mu.Lock()
    			m.jobSyncer = value
    			m.mu.Unlock()
    			return value, nil
    		}, func(e error) error {
    			Error("Error while starting db saver", "error", e)
    			// Record the error before signalling. Closing ready first would
    			// let Start read jobSyncerErr before it is written and miss the
    			// failure.
    			jobSyncerErr = e
    			close(ready)
    			return nil
    		})
    
    		<-ready
    
    		if jobSyncerErr != nil {
    			return jobSyncerErr
    		}
    	}
    
    	if len(m.workerMap) == 0 {
    		return ErrNoWorkers
    	}
    
    	var wrappedErr error
    
    	// addErr appends err to wrappedErr, seeding the chain on first use so that
    	// %w never receives a nil error.
    	addErr := func(err error) {
    		if wrappedErr == nil {
    			wrappedErr = fmt.Errorf("Error: ")
    		}
    
    		wrappedErr = fmt.Errorf("%w\n%s", wrappedErr, err.Error())
    	}
    
    	for _, worker := range m.workerMap {
    		err := worker.Start()
    		if err != nil && !errors.Is(err, ErrWorkerAlreadyRunning) {
    			addErr(err)
    		}
    	}
    
    	// The manager counts as running as soon as one worker is running.
    	for _, worker := range m.workerMap {
    		if worker.Status() == WorkerStatusRunning {
    			m.state = ManagerStateRunning
    			break
    		}
    	}
    
    	m.jobEventCh = make(chan interface{}, 100)
    
    	m.eventBus.Subscribe(QueueJob, m.jobEventCh)
    	m.eventBus.Subscribe(JobReady, m.jobEventCh)
    
    	go m.handleJobEvents()
    
    	if err := m.checkAndSetRunningState(); err != nil {
    		addErr(err)
    	}
    
    	if m.cronInstance != nil {
    		m.cronInstance.Start()
    	}
    
    	return wrappedErr
    }
    
    // Stop stops the manager: it unsubscribes from QueueJob and JobReady, closes
    // the job event channel (which ends the event loop), stops every worker, the
    // cron instance and the job syncer.
    //
    // It returns ErrManagerAlreadyStopped when the manager is not running, or an
    // error that aggregates every failure met while stopping. Workers reporting
    // ErrWorkerAlreadyStopped are not counted as failures.
    func (m *Manager) Stop() error {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    
    	if m.state == ManagerStateStopped {
    		return ErrManagerAlreadyStopped
    	}
    
    	m.eventBus.Unsubscribe(QueueJob, m.jobEventCh)
    	m.eventBus.Unsubscribe(JobReady, m.jobEventCh)
    
    	// Best effort: the result of safeClose is deliberately ignored.
    	_ = safeClose(m.jobEventCh)
    
    	var wrappedErr error
    
    	// addErr appends err to wrappedErr, seeding the chain on first use so that
    	// %w never receives a nil error.
    	addErr := func(err error) {
    		if wrappedErr == nil {
    			wrappedErr = fmt.Errorf("Error: ")
    		}
    
    		wrappedErr = fmt.Errorf("%w\n%s", wrappedErr, err.Error())
    	}
    
    	for _, worker := range m.workerMap {
    		err := worker.Stop()
    		if err != nil && !errors.Is(err, ErrWorkerAlreadyStopped) {
    			addErr(err)
    		}
    	}
    
    	if err := m.checkAndSetRunningState(); err != nil {
    		addErr(err)
    	}
    
    	if m.cronInstance != nil {
    		m.cronInstance.Stop()
    	}
    
    	if m.jobSyncer != nil {
    		if err := m.jobSyncer.Stop(); err != nil {
    			addErr(err)
    		}
    	}
    
    	return wrappedErr
    }
    
    // SetLogger replaces the database handle with a new gorm session whose
    // logger comes from newGormAdapter. Without a database nothing happens. It
    // returns the manager so calls can be chained.
    // NOTE(review): the logger argument is not used by this method — confirm
    // whether it should be passed on to the adapter.
    func (m *Manager) SetLogger(logger Logger) *Manager {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    
    	if m.database == nil {
    		return m
    	}
    
    	session := &gorm.Session{Logger: newGormAdapter()}
    	m.database = m.database.Session(session)
    
    	return m
    }
    
    // RunJob publishes a QueueJob event for job right away without scheduling
    // it. Paused jobs are skipped without an error.
    // It returns ErrManagerNotRunning when the manager is not running.
    func (m *Manager) RunJob(job GenericJob) error {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    
    	if m.state != ManagerStateRunning {
    		return ErrManagerNotRunning
    	}
    
    	if job.IsPaused() {
    		return nil
    	}
    
    	m.eventBus.Publish(QueueJob, job)
    
    	return nil
    }
    
    // ScheduleJob attaches scheduler to job, schedules it on the manager's event
    // bus and records the job as active. When a job syncer is configured the job
    // is handed to it as well.
    //
    // It returns ErrSchedulerNotSet for a nil scheduler, ErrManagerNotRunning
    // when the manager is not running, ErrJobAlreadyScheduled when the job
    // already has a scheduler, ErrJobAlreadyActive when a job with the same ID is
    // active, or the error reported by the scheduler.
    func (m *Manager) ScheduleJob(job GenericJob, scheduler Scheduler) error {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    
    	if scheduler == nil {
    		return ErrSchedulerNotSet
    	}
    
    	if m.state != ManagerStateRunning {
    		return ErrManagerNotRunning
    	}
    
    	if job.GetScheduler() != nil {
    		return ErrJobAlreadyScheduled
    	}
    
    	id := job.GetID()
    	if _, ok := m.activeJobs[id]; ok {
    		return ErrJobAlreadyActive
    	}
    
    	job.SetScheduler(scheduler)
    	if err := scheduler.Schedule(job, m.eventBus); err != nil {
    		// Roll back: a job left with a scheduler but never activated would
    		// be rejected with ErrJobAlreadyScheduled on every retry.
    		job.SetScheduler(nil)
    		return err
    	}
    
    	m.activeJobs[id] = job
    
    	if m.jobSyncer != nil {
    		m.jobSyncer.AddJob(job)
    	}
    
    	return nil
    }
    
    // CancelJobSchedule cancels the scheduler of an active job, clears the
    // scheduler on the job and removes the job from the active jobs.
    //
    // It returns ErrManagerNotRunning when the manager is not running,
    // ErrJobNotActive when the job is unknown, ErrJobNotScheduled when the job
    // has no scheduler, or the error reported by the scheduler's Cancel method.
    func (m *Manager) CancelJobSchedule(id JobID) error {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    
    	if m.state != ManagerStateRunning {
    		return ErrManagerNotRunning
    	}
    
    	job, active := m.activeJobs[id]
    	if !active {
    		return ErrJobNotActive
    	}
    
    	scheduler := job.GetScheduler()
    	if scheduler == nil {
    		return ErrJobNotScheduled
    	}
    
    	if err := scheduler.Cancel(job.GetID()); err != nil {
    		return err
    	}
    
    	job.SetScheduler(nil)
    	delete(m.activeJobs, job.GetID())
    
    	return nil
    }
    
    // handleJobEvents consumes events from m.jobEventCh until the channel is
    // closed. It is started as a goroutine by Start.
    //
    //   - QueueJob: the job carried by the event is put into the queue.
    //   - JobReady: the queue is drained and each job is offered to the workers.
    //   - JobFinished: the schedule of the finished job is cancelled.
    //
    // Events whose payload is not a GenericJob are logged and skipped instead of
    // panicking the goroutine with a failed type assertion.
    //
    // NOTE(review): m.workerMap is read here without holding m.mu — confirm
    // whether workers may be added or removed while the event loop is running.
    func (m *Manager) handleJobEvents() {
    
    	// tryAssign offers job to every worker and retries once per second, for at
    	// most ten rounds, until one of them accepts it.
    	tryAssign := func(job GenericJob) bool {
    		for tries := 10; tries > 0; tries-- {
    			for _, worker := range m.workerMap {
    				if err := worker.AssignJob(job); err == nil {
    					return true
    				}
    			}
    
    			time.Sleep(1 * time.Second)
    		}
    
    		return false
    	}
    
    	for raw := range m.jobEventCh {
    		event, ok := raw.(Event)
    		if !ok {
    			continue
    		}
    
    		Info("Event received", "event", event.Name, "data", event.Data, "message_id", event.MessageID)
    
    		switch event.Name {
    		case QueueJob:
    			job, ok := event.Data.(GenericJob)
    			if !ok {
    				Error("Unexpected event payload", "event", event.Name, "message_id", event.MessageID)
    				continue
    			}
    
    			Info("Job queued", "job_id", job.GetID())
    
    			err := m.queue.Enqueue(job)
    			if err != nil && !errors.Is(err, ErrJobAlreadyExists) {
    				Error("Error while queueing job", "error", err)
    			}
    
    		case JobReady:
    			for {
    				nextJob, err := m.queue.Dequeue()
    				if err != nil {
    					// Any dequeue error ends the drain loop.
    					break
    				}
    
    				Info("Job ready", "job_id", nextJob.GetID())
    
    				if !tryAssign(nextJob) {
    					// The job has already been dequeued and is not put back here.
    					Info("No worker available for job", "job_id", nextJob.GetID())
    					continue
    				}
    
    				// Ad-hoc schedules are announced as finished once a worker took the job.
    				if scheduler := nextJob.GetScheduler(); scheduler != nil && scheduler.IsAdHoc() {
    					m.GetEventBus().Publish(JobFinished, nextJob)
    				}
    			}
    
    		case JobFinished:
    			// NOTE(review): Start subscribes this channel only to QueueJob and
    			// JobReady, so this branch looks unreachable unless the channel is
    			// subscribed to JobFinished elsewhere.
    			job, ok := event.Data.(GenericJob)
    			if !ok {
    				Error("Unexpected event payload", "event", event.Name, "message_id", event.MessageID)
    				continue
    			}
    
    			if err := m.CancelJobSchedule(job.GetID()); err != nil {
    				Error("Error while canceling job schedule", "error", err)
    			}
    		}
    	}
    }