// Copyright 2023 schukai GmbH // SPDX-License-Identifier: AGPL-3.0 package jobqueue import ( "errors" "fmt" "github.com/fsnotify/fsnotify" "github.com/robfig/cron/v3" "gorm.io/gorm" "sync" "time" ) type ManagerState int const ( ManagerStateStopped = iota ManagerStateRunning ) type Manager struct { state ManagerState queue *Queue workerMap map[WorkerID]Worker eventBus *EventBus activeJobs map[JobID]GenericJob jobEventCh chan interface{} cronInstance *cron.Cron //logger Logger database *gorm.DB dbSaver *DBSaver mu sync.Mutex } // NewManager initializes a new Manager func NewManager() *Manager { eventBus := NewEventBus() mng := &Manager{ state: ManagerStateStopped, queue: NewQueue(eventBus), workerMap: make(map[WorkerID]Worker), eventBus: eventBus, activeJobs: make(map[JobID]GenericJob), } return mng } // GetEventBus returns the event bus func (m *Manager) GetEventBus() *EventBus { m.mu.Lock() defer m.mu.Unlock() return m.eventBus } // SetCronInstance sets the cron instance func (m *Manager) SetCronInstance(cronInstance *cron.Cron) *Manager { m.mu.Lock() defer m.mu.Unlock() m.cronInstance = cronInstance return m } // GetCronInstance returns the cron instance func (m *Manager) GetCronInstance() *cron.Cron { m.mu.Lock() defer m.mu.Unlock() return m.cronInstance } // NewCronScheduler creates a new cron scheduler func (m *Manager) NewCronScheduler(spec string) *CronScheduler { return &CronScheduler{ Spec: spec, cron: m.GetCronInstance(), } } // NewInstantScheduler creates a new instant scheduler func (m *Manager) NewInstantScheduler() *InstantScheduler { return &InstantScheduler{} } // NewIntervalScheduler creates a new interval scheduler func (m *Manager) NewIntervalScheduler(interval time.Duration) *IntervalScheduler { return &IntervalScheduler{ Interval: interval, } } // NewDelayScheduler creates a new delay scheduler func (m *Manager) NewDelayScheduler(delay time.Duration) *DelayScheduler { return &DelayScheduler{ Delay: delay, } } // NewEventScheduler creates a new event scheduler func (m *Manager) NewEventScheduler(event EventName) *EventScheduler { return &EventScheduler{ Event: event, } } // NewTimeScheduler creates a new time scheduler func (m *Manager) NewTimeScheduler(t time.Time) *TimeScheduler { return &TimeScheduler{ Time: t, } } // NewInotifyScheduler creates a new inotify scheduler func (m *Manager) NewInotifyScheduler(path string, eventFlags fsnotify.Op) *InotifyScheduler { return &InotifyScheduler{ Path: path, EventFlags: eventFlags, } } // GetActiveJobs returns the active jobs func (m *Manager) GetActiveJobs() map[JobID]GenericJob { m.mu.Lock() defer m.mu.Unlock() return m.activeJobs } // DeleteJob removes a job from the active jobs and the database func (m *Manager) DeleteJob(id JobID) error { m.mu.Lock() defer m.mu.Unlock() var job GenericJob var ok bool if job, ok = m.activeJobs[id]; !ok { return ErrJobNotActive } err := m.removeJobInternal(id) if err != nil { return err } if m.dbSaver != nil { err := m.dbSaver.DeleteJob(job) if err != nil { return err } } return nil } // RemoveJob removes a job from the active jobs // If you want to remove a job from the active jobs and the database, use DeleteJob instead func (m *Manager) RemoveJob(id JobID) error { m.mu.Lock() defer m.mu.Unlock() if _, ok := m.activeJobs[id]; !ok { return ErrJobNotActive } return m.removeJobInternal(id) } // ResetJobLogs deletes the logs of a job func (m *Manager) ResetJobLogs(id JobID) error { m.mu.Lock() defer m.mu.Unlock() if _, ok := m.activeJobs[id]; !ok { return ErrJobNotActive } if m.dbSaver != nil { err := m.dbSaver.ResetLogs(m.activeJobs[id]) if err != nil { return err } } return nil } // ResetJobStats deletes the stats of a job func (m *Manager) ResetJobStats(id JobID) error { m.mu.Lock() defer m.mu.Unlock() if _, ok := m.activeJobs[id]; !ok { return ErrJobNotActive } if m.dbSaver != nil { err := m.dbSaver.ResetStats(m.activeJobs[id]) if err != nil { return err } } return nil } type CancelScheduler interface { Cancel(id JobID) error } func cancelSchedulerByManager(scheduler CancelScheduler, id JobID) error { err := scheduler.Cancel(id) if err != nil { return err } return nil } func (m *Manager) removeJobInternal(id JobID) error { scheduler := m.activeJobs[id].GetScheduler() if scheduler == nil { return ErrJobNotScheduled } if cancelScheduler, ok := scheduler.(CancelScheduler); ok { err := cancelSchedulerByManager(cancelScheduler, id) if err != nil { return err } } else { return ErrUnknownScheduleType } delete(m.activeJobs, id) return nil } func (m *Manager) UpdateJob(job GenericJob) error { m.mu.Lock() defer m.mu.Unlock() if _, ok := m.activeJobs[job.GetID()]; !ok { return ErrJobNotActive } scheduler := m.activeJobs[job.GetID()].GetScheduler() err := m.RemoveJob(job.GetID()) if err != nil { return err } err = m.ScheduleJob(job, scheduler) if err != nil { return err } return nil } // ContainsActiveJob checks if a job is active func (m *Manager) ContainsActiveJob(id JobID) bool { m.mu.Lock() defer m.mu.Unlock() _, ok := m.activeJobs[id] return ok } // SetDB sets the database connection func (m *Manager) SetDB(db *gorm.DB) *Manager { m.mu.Lock() defer m.mu.Unlock() m.database = db if m.dbSaver != nil { return m } m.dbSaver = NewDBSaver() m.dbSaver.SetManager(m) return m } // GetDB returns the database connection func (m *Manager) GetDB() *gorm.DB { m.mu.Lock() defer m.mu.Unlock() return m.database } // GetQueue returns the queue func (m *Manager) checkAndSetRunningState() error { m.state = ManagerStateStopped if m.workerMap == nil { return ErrNoWorkers } if len(m.workerMap) == 0 { return ErrNoWorkers } for _, worker := range m.workerMap { if worker.Status() == WorkerStatusRunning { m.state = ManagerStateRunning return nil } } return nil } // AddWorker adds a worker to the manager func (m *Manager) AddWorker(worker Worker) error { m.mu.Lock() defer m.mu.Unlock() // check if worker is already added if _, ok := m.workerMap[worker.GetID()]; ok { return ErrWorkerAlreadyAdded } // check if state of worker is not running if worker.Status() != WorkerStatusStopped { return ErrWorkerAlreadyRunning } if m.state == ManagerStateRunning { err := worker.Start() if err != nil { return err } } // add worker to workerMap m.workerMap[worker.GetID()] = worker worker.SetManager(m) return m.checkAndSetRunningState() } // RemoveWorker removes a worker from the manager func (m *Manager) RemoveWorker(worker Worker) error { m.mu.Lock() defer m.mu.Unlock() // check if worker is added if _, ok := m.workerMap[worker.GetID()]; !ok { return ErrWorkerNotAdded } // check if state of worker is not running if worker.Status() != WorkerStatusStopped { err := worker.Stop() if err != nil { return err } } // remove worker from workerMap delete(m.workerMap, worker.GetID()) err := m.checkAndSetRunningState() if err != nil && err != ErrNoWorkers { return err } return nil } // Start starts the manager func (m *Manager) Start() error { var err error m.mu.Lock() defer m.mu.Unlock() if m.state == ManagerStateRunning { return ErrManagerAlreadyRunning } if m.dbSaver != nil { p := StartDBSaver(m.dbSaver) ready := make(chan struct{}) Then[bool, bool](p, func(value bool) (bool, error) { close(ready) return value, nil }, func(e error) error { close(ready) Error("Error while starting db saver", "error", err) return nil }) <-ready } if len(m.workerMap) == 0 { return ErrNoWorkers } var wrappedErr error for _, worker := range m.workerMap { err := worker.Start() if err != nil && !errors.Is(err, ErrWorkerAlreadyRunning) { if wrappedErr == nil { wrappedErr = fmt.Errorf("Error: ") } wrappedErr = fmt.Errorf("%w\n%s", wrappedErr, err.Error()) } } // check if we have one worker for _, worker := range m.workerMap { if worker.Status() == WorkerStatusRunning { m.state = ManagerStateRunning break } } m.jobEventCh = make(chan interface{}, 100) m.eventBus.Subscribe(QueueJob, m.jobEventCh) m.eventBus.Subscribe(JobReady, m.jobEventCh) go m.handleJobEvents() err = m.checkAndSetRunningState() if err != nil { wrappedErr = fmt.Errorf("%w\n%s", wrappedErr, err.Error()) } if m.cronInstance != nil { m.cronInstance.Start() } return wrappedErr } // Stop stops the manager func (m *Manager) Stop() error { m.mu.Lock() defer m.mu.Unlock() if m.state == ManagerStateStopped { return ErrManagerAlreadyStopped } m.eventBus.Unsubscribe(QueueJob, m.jobEventCh) m.eventBus.Unsubscribe(JobReady, m.jobEventCh) _ = safeClose(m.jobEventCh) var wrappedErr error for _, worker := range m.workerMap { err := worker.Stop() if err != nil && err != ErrWorkerAlreadyStopped { if wrappedErr == nil { wrappedErr = fmt.Errorf("Error: ") } wrappedErr = fmt.Errorf("%w\n%s", wrappedErr, err.Error()) } } err := m.checkAndSetRunningState() if err != nil { wrappedErr = fmt.Errorf("%w\n%s", wrappedErr, err.Error()) } if m.cronInstance != nil { m.cronInstance.Stop() } if m.dbSaver != nil { m.dbSaver.Stop() } return wrappedErr } func (m *Manager) SetLogger(logger Logger) *Manager { m.mu.Lock() defer m.mu.Unlock() if m.database != nil { m.database = m.database.Session(&gorm.Session{Logger: newGormAdapter()}) } return m } // RunJob runs a job immediately and does not schedule it func (m *Manager) RunJob(job GenericJob) error { m.mu.Lock() defer m.mu.Unlock() if m.state != ManagerStateRunning { return ErrManagerNotRunning } if !job.IsPaused() { m.eventBus.Publish(QueueJob, job) } return nil } // ScheduleJob schedules a job func (m *Manager) ScheduleJob(job GenericJob, scheduler Scheduler) error { m.mu.Lock() defer m.mu.Unlock() if scheduler == nil { return ErrSchedulerNotSet } if m.state != ManagerStateRunning { return ErrManagerNotRunning } if job.GetScheduler() != nil { return ErrJobAlreadyScheduled } if _, ok := m.activeJobs[job.GetID()]; ok { return ErrJobAlreadyActive } job.SetScheduler(scheduler) err := scheduler.Schedule(job, m.eventBus) if err != nil { return err } m.activeJobs[job.GetID()] = job if m.dbSaver != nil { err := m.dbSaver.SaveJob(job) if err != nil { return err } } return nil } func (m *Manager) CancelJobSchedule(id JobID) error { m.mu.Lock() defer m.mu.Unlock() if m.state != ManagerStateRunning { return ErrManagerNotRunning } job, ok := m.activeJobs[id] if !ok { return ErrJobNotActive } if job.GetScheduler() == nil { return ErrJobNotScheduled } scheduler := job.GetScheduler() err := scheduler.Cancel(job.GetID()) if err != nil { return err } job.SetScheduler(nil) delete(m.activeJobs, job.GetID()) return nil } // handleJobEvents handles job events func (m *Manager) handleJobEvents() { for event := range m.jobEventCh { switch event := event.(type) { case Event: Info("Event received", "event", event.Name, "data", event.Data, "message_id", event.MessageID) switch event.Name { case QueueJob: Info("Job queued", "job_id", event.Data.(GenericJob).GetID()) job := event.Data.(GenericJob) err := m.queue.Enqueue(job) if err != nil && err != ErrJobAlreadyExists { Error("Error while queueing job", "error", err) } case JobReady: for { nextJob, err := m.queue.Dequeue() if err != nil { break } Info("Job ready", "job_id", nextJob.GetID()) assigned := false maxTries := 10 for maxTries > 0 { maxTries-- for _, worker := range m.workerMap { if err := worker.AssignJob(nextJob); err == nil { assigned = true break } } if assigned == true { break } time.Sleep(1 * time.Second) } if !assigned { Info("No worker available for job", "job_id", nextJob.GetID()) } else { if nextJob.GetScheduler() != nil { if nextJob.GetScheduler().IsAdHoc() { eventBus := m.GetEventBus() eventBus.Publish(JobFinished, nextJob) } } } } case JobFinished: // job is finished and should be archived job := event.Data.(GenericJob) _ = job err := m.CancelJobSchedule(job.GetID()) if err != nil { Error("Error while canceling job schedule", "error", err) } } } } }