Something went wrong on our end
Select Git revision
-
Volker Schukai authoredVolker Schukai authored
manager.go 12.71 KiB
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
"errors"
"fmt"
"github.com/fsnotify/fsnotify"
"github.com/robfig/cron/v3"
"gorm.io/gorm"
"sync"
"time"
)
// ManagerState describes whether a Manager is currently stopped or running.
type ManagerState int
// Possible manager states.
// NOTE(review): the constants are declared with a bare iota, so they are
// untyped integer constants rather than values of type ManagerState.
const (
// ManagerStateStopped is the state a Manager created by NewManager starts in.
ManagerStateStopped = iota
// ManagerStateRunning is set while at least one worker reports WorkerStatusRunning.
ManagerStateRunning
)
// Manager ties together the job queue, the workers, the event bus and the
// optional database synchronisation of a job queue instance.
type Manager struct {
// state is ManagerStateStopped or ManagerStateRunning.
state ManagerState
// queue receives jobs from QueueJob events and hands them out on JobReady events.
queue *Queue
// workerMap holds the registered workers keyed by their ID.
workerMap map[WorkerID]Worker
// eventBus is shared with the queue and passed to schedulers.
eventBus *EventBus
// activeJobs holds the jobs that have been scheduled via ScheduleJob, keyed by ID.
activeJobs map[JobID]GenericJob
// jobEventCh is created in Start and subscribed to QueueJob and JobReady events.
jobEventCh chan interface{}
// cronInstance is optional; it is handed to cron schedulers and started/stopped with the manager.
cronInstance *cron.Cron
//logger Logger
// database is the optional gorm connection set via SetDB.
database *gorm.DB
// jobSyncer is created by SetDB; nil means no database synchronisation.
jobSyncer *JobSyncer
// mu is taken by the exported methods before they touch the fields above.
mu sync.Mutex
}
// NewManager returns a stopped Manager with no workers and no active jobs.
// The queue and the manager share one freshly created event bus.
func NewManager() *Manager {
	bus := NewEventBus()

	return &Manager{
		state:      ManagerStateStopped,
		queue:      NewQueue(bus),
		workerMap:  map[WorkerID]Worker{},
		eventBus:   bus,
		activeJobs: map[JobID]GenericJob{},
	}
}
// GetEventBus returns the event bus used by the manager.
// The field is read while holding the manager mutex.
func (m *Manager) GetEventBus() *EventBus {
	m.mu.Lock()
	bus := m.eventBus
	m.mu.Unlock()

	return bus
}
// SetCronInstance stores the cron instance that cron schedulers created by
// this manager will use. It returns the manager to allow call chaining.
func (m *Manager) SetCronInstance(cronInstance *cron.Cron) *Manager {
	m.mu.Lock()
	m.cronInstance = cronInstance
	m.mu.Unlock()

	return m
}
// GetCronInstance returns the cron instance previously stored with
// SetCronInstance, or nil if none was set.
func (m *Manager) GetCronInstance() *cron.Cron {
	m.mu.Lock()
	instance := m.cronInstance
	m.mu.Unlock()

	return instance
}
// NewCronScheduler builds a CronScheduler for the given spec that is bound to
// the manager's current cron instance.
func (m *Manager) NewCronScheduler(spec string) *CronScheduler {
	scheduler := new(CronScheduler)
	scheduler.Spec = spec
	scheduler.cron = m.GetCronInstance()

	return scheduler
}
// NewInstantScheduler builds a zero-valued InstantScheduler.
func (m *Manager) NewInstantScheduler() *InstantScheduler {
	return new(InstantScheduler)
}
// NewIntervalScheduler builds an IntervalScheduler with the given interval.
func (m *Manager) NewIntervalScheduler(interval time.Duration) *IntervalScheduler {
	scheduler := new(IntervalScheduler)
	scheduler.Interval = interval

	return scheduler
}
// NewDelayScheduler builds a DelayScheduler with the given delay.
func (m *Manager) NewDelayScheduler(delay time.Duration) *DelayScheduler {
	scheduler := new(DelayScheduler)
	scheduler.Delay = delay

	return scheduler
}
// NewEventScheduler builds an EventScheduler bound to the given event name.
func (m *Manager) NewEventScheduler(event EventName) *EventScheduler {
	scheduler := new(EventScheduler)
	scheduler.Event = event

	return scheduler
}
// NewTimeScheduler builds a TimeScheduler for the given point in time.
func (m *Manager) NewTimeScheduler(t time.Time) *TimeScheduler {
	scheduler := new(TimeScheduler)
	scheduler.Time = t

	return scheduler
}
// NewInotifyScheduler builds an InotifyScheduler for the given path and
// fsnotify event flags.
func (m *Manager) NewInotifyScheduler(path string, eventFlags fsnotify.Op) *InotifyScheduler {
	scheduler := new(InotifyScheduler)
	scheduler.Path = path
	scheduler.EventFlags = eventFlags

	return scheduler
}
// GetActiveJobs returns a snapshot of the active jobs keyed by job ID.
//
// The returned map is a copy taken while holding the manager mutex. Handing
// out the internal map would let callers read or modify it after the lock is
// released, racing with ScheduleJob, RemoveJob and the other methods that
// mutate it. The jobs themselves are not copied.
func (m *Manager) GetActiveJobs() map[JobID]GenericJob {
	m.mu.Lock()
	defer m.mu.Unlock()

	jobs := make(map[JobID]GenericJob, len(m.activeJobs))
	for id, job := range m.activeJobs {
		jobs[id] = job
	}

	return jobs
}
// DeleteJob cancels the schedule of an active job, drops it from the active
// jobs and, when a job syncer is configured, deletes it through the syncer.
// It returns ErrJobNotActive if the job is not active.
func (m *Manager) DeleteJob(id JobID) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	job, active := m.activeJobs[id]
	if !active {
		return ErrJobNotActive
	}

	if err := m.removeJobInternal(id); err != nil {
		return err
	}

	if m.jobSyncer == nil {
		return nil
	}
	if err := m.jobSyncer.DeleteJob(job); err != nil {
		return err
	}

	return nil
}
// RemoveJob cancels the schedule of an active job and drops it from the
// active jobs. The job syncer is not touched; use DeleteJob to remove the job
// from the database as well. It returns ErrJobNotActive for unknown jobs.
func (m *Manager) RemoveJob(id JobID) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	_, active := m.activeJobs[id]
	if !active {
		return ErrJobNotActive
	}

	return m.removeJobInternal(id)
}
// ResetJobLogs asks the job syncer to reset the logs of an active job.
// It returns ErrJobNotActive for unknown jobs and does nothing when no job
// syncer is configured.
func (m *Manager) ResetJobLogs(id JobID) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	job, active := m.activeJobs[id]
	if !active {
		return ErrJobNotActive
	}

	if m.jobSyncer == nil {
		return nil
	}
	if err := m.jobSyncer.ResetLogs(job); err != nil {
		return err
	}

	return nil
}
// ResetJobStats asks the job syncer to reset the stats of an active job.
// It returns ErrJobNotActive for unknown jobs and does nothing when no job
// syncer is configured.
func (m *Manager) ResetJobStats(id JobID) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	job, active := m.activeJobs[id]
	if !active {
		return ErrJobNotActive
	}

	if m.jobSyncer == nil {
		return nil
	}
	if err := m.jobSyncer.ResetStats(job); err != nil {
		return err
	}

	return nil
}
// CancelScheduler is implemented by schedulers that can cancel the schedule
// of a single job. removeJobInternal requires a job's scheduler to satisfy it.
type CancelScheduler interface {
// Cancel cancels the schedule of the job with the given ID.
Cancel(id JobID) error
}
// cancelSchedulerByManager cancels the schedule of the given job on the
// scheduler and passes the scheduler's error, if any, through unchanged.
func cancelSchedulerByManager(scheduler CancelScheduler, id JobID) error {
	return scheduler.Cancel(id)
}
// removeJobInternal cancels the schedule of the job with the given ID and
// removes it from the active jobs. It does not lock; the callers in this file
// hold m.mu and have already verified that the job is active.
//
// It returns ErrJobNotScheduled when the job has no scheduler and
// ErrUnknownScheduleType when the scheduler cannot be cancelled.
func (m *Manager) removeJobInternal(id JobID) error {
	scheduler := m.activeJobs[id].GetScheduler()
	if scheduler == nil {
		return ErrJobNotScheduled
	}

	canceller, cancellable := scheduler.(CancelScheduler)
	if !cancellable {
		return ErrUnknownScheduleType
	}

	if err := cancelSchedulerByManager(canceller, id); err != nil {
		return err
	}

	delete(m.activeJobs, id)
	return nil
}
// UpdateJob replaces an active job: the currently active job with the same ID
// is removed and the given job is scheduled again with the scheduler the
// active job was using.
//
// It returns ErrJobNotActive if no job with that ID is active, and otherwise
// any error reported by RemoveJob or ScheduleJob.
//
// RemoveJob and ScheduleJob lock m.mu themselves and sync.Mutex is not
// reentrant, so the mutex is only held while the current scheduler is looked
// up and is released before they are called. Calling them with the lock held
// would deadlock. As a consequence the remove/schedule pair is not atomic
// with respect to other callers.
//
// NOTE(review): ScheduleJob rejects jobs whose GetScheduler() is non-nil, so
// passing the very same job object that is currently active will presumably
// fail with ErrJobAlreadyScheduled - confirm the intended calling convention.
func (m *Manager) UpdateJob(job GenericJob) error {
	id := job.GetID()

	m.mu.Lock()
	current, active := m.activeJobs[id]
	if !active {
		m.mu.Unlock()
		return ErrJobNotActive
	}
	scheduler := current.GetScheduler()
	m.mu.Unlock()

	if err := m.RemoveJob(id); err != nil {
		return err
	}

	return m.ScheduleJob(job, scheduler)
}
// ContainsActiveJob reports whether a job with the given ID is currently in
// the set of active jobs.
func (m *Manager) ContainsActiveJob(id JobID) bool {
	m.mu.Lock()
	_, active := m.activeJobs[id]
	m.mu.Unlock()

	return active
}
// SetDB stores the database connection and, on the first call, creates the
// job syncer for this manager. An existing job syncer is kept as is.
// It returns the manager to allow call chaining.
func (m *Manager) SetDB(db *gorm.DB) *Manager {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.database = db
	if m.jobSyncer == nil {
		m.jobSyncer = NewJobSyncer(m)
	}

	return m
}
// GetDB returns the database connection stored with SetDB, or nil if none
// was set.
func (m *Manager) GetDB() *gorm.DB {
	m.mu.Lock()
	db := m.database
	m.mu.Unlock()

	return db
}
// checkAndSetRunningState recomputes the manager state from its workers: the
// state becomes ManagerStateRunning if at least one worker reports
// WorkerStatusRunning and ManagerStateStopped otherwise.
// It returns ErrNoWorkers when no worker is registered.
// It does not lock; the callers in this file hold m.mu.
func (m *Manager) checkAndSetRunningState() error {
	m.state = ManagerStateStopped

	// The length of a nil map is 0, so this also covers a missing worker map.
	if len(m.workerMap) == 0 {
		return ErrNoWorkers
	}

	for _, w := range m.workerMap {
		if w.Status() != WorkerStatusRunning {
			continue
		}
		m.state = ManagerStateRunning
		break
	}

	return nil
}
// AddWorker registers a stopped worker with the manager. When the manager is
// already running the worker is started before it is registered.
//
// It returns ErrWorkerAlreadyAdded for a worker ID that is already
// registered, ErrWorkerAlreadyRunning when the worker is not stopped, the
// worker's own error when starting it fails, and otherwise the result of
// recomputing the manager state.
func (m *Manager) AddWorker(worker Worker) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	_, known := m.workerMap[worker.GetID()]
	switch {
	case known:
		return ErrWorkerAlreadyAdded
	case worker.Status() != WorkerStatusStopped:
		return ErrWorkerAlreadyRunning
	}

	// A running manager only holds started workers.
	if m.state == ManagerStateRunning {
		if err := worker.Start(); err != nil {
			return err
		}
	}

	m.workerMap[worker.GetID()] = worker
	worker.SetManager(m)

	return m.checkAndSetRunningState()
}
// RemoveWorker stops the given worker if it is not already stopped and
// unregisters it from the manager. It returns ErrWorkerNotAdded for an
// unknown worker and the worker's own error when stopping it fails.
// Ending up without any worker is not treated as an error here.
func (m *Manager) RemoveWorker(worker Worker) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if _, known := m.workerMap[worker.GetID()]; !known {
		return ErrWorkerNotAdded
	}

	if worker.Status() != WorkerStatusStopped {
		if stopErr := worker.Stop(); stopErr != nil {
			return stopErr
		}
	}

	delete(m.workerMap, worker.GetID())

	stateErr := m.checkAndSetRunningState()
	if stateErr == nil || stateErr == ErrNoWorkers {
		return nil
	}

	return stateErr
}
// Start starts the manager.
//
// If a job syncer is configured it is (re)created and started first, and its
// start error, if any, is returned. Then every registered worker is started,
// the manager subscribes to QueueJob and JobReady events, the event handling
// goroutine is launched, the manager state is recomputed from the workers and
// the cron instance, if any, is started.
//
// It returns ErrManagerAlreadyRunning when the manager is already running,
// ErrNoWorkers when no worker is registered, and otherwise an error that
// collects every worker start failure other than ErrWorkerAlreadyRunning
// (nil when there was none).
//
// NOTE(review): the event subscription and the goroutine are set up even when
// no worker ends up running; in that case the state stays stopped and Stop
// will refuse to clean them up - confirm whether that is intended.
func (m *Manager) Start() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.state == ManagerStateRunning {
		return ErrManagerAlreadyRunning
	}

	if m.jobSyncer != nil {
		p := CreateAndStartJobSyncer(m)

		ready := make(chan struct{})
		var jobSyncerErr error

		Then[*JobSyncer, *JobSyncer](p, func(value *JobSyncer) (*JobSyncer, error) {
			// Signal first: Start holds m.mu while it waits on ready, so taking
			// the lock before closing the channel would block forever.
			close(ready)

			m.mu.Lock()
			m.jobSyncer = value
			m.mu.Unlock()

			return value, nil
		}, func(e error) error {
			Error("Error while starting db saver", "error", e)
			// Store the error before closing ready. Closing first would let
			// Start read jobSyncerErr before it is written and miss the error.
			jobSyncerErr = e
			close(ready)
			return nil
		})

		<-ready

		if jobSyncerErr != nil {
			return jobSyncerErr
		}
	}

	if len(m.workerMap) == 0 {
		return ErrNoWorkers
	}

	var wrappedErr error
	// collect appends err to wrappedErr, creating the "Error: " head on first
	// use so that %w never receives a nil error.
	collect := func(err error) {
		if wrappedErr == nil {
			wrappedErr = fmt.Errorf("Error: ")
		}
		wrappedErr = fmt.Errorf("%w\n%s", wrappedErr, err.Error())
	}

	for _, worker := range m.workerMap {
		if err := worker.Start(); err != nil && !errors.Is(err, ErrWorkerAlreadyRunning) {
			collect(err)
		}
	}

	m.jobEventCh = make(chan interface{}, 100)
	m.eventBus.Subscribe(QueueJob, m.jobEventCh)
	m.eventBus.Subscribe(JobReady, m.jobEventCh)

	go m.handleJobEvents()

	// Sets the state to running if at least one worker is running.
	if err := m.checkAndSetRunningState(); err != nil {
		collect(err)
	}

	if m.cronInstance != nil {
		m.cronInstance.Start()
	}

	return wrappedErr
}
// Stop stops the manager.
//
// It unsubscribes from the QueueJob and JobReady events, closes the job event
// channel (which ends the event handling loop), stops every registered
// worker, recomputes the manager state, and stops the cron instance and the
// job syncer when they are configured.
//
// It returns ErrManagerAlreadyStopped when the manager is not running, and
// otherwise an error collecting every failure that occurred while stopping
// (nil when there was none). ErrWorkerAlreadyStopped from a worker is ignored.
func (m *Manager) Stop() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.state == ManagerStateStopped {
		return ErrManagerAlreadyStopped
	}

	m.eventBus.Unsubscribe(QueueJob, m.jobEventCh)
	m.eventBus.Unsubscribe(JobReady, m.jobEventCh)

	// Best effort: the result of closing the channel is deliberately ignored.
	_ = safeClose(m.jobEventCh)

	var wrappedErr error
	// collect appends err to wrappedErr, creating the "Error: " head on first
	// use so that %w never receives a nil error.
	collect := func(err error) {
		if wrappedErr == nil {
			wrappedErr = fmt.Errorf("Error: ")
		}
		wrappedErr = fmt.Errorf("%w\n%s", wrappedErr, err.Error())
	}

	for _, worker := range m.workerMap {
		if err := worker.Stop(); err != nil && !errors.Is(err, ErrWorkerAlreadyStopped) {
			collect(err)
		}
	}

	if err := m.checkAndSetRunningState(); err != nil {
		collect(err)
	}

	if m.cronInstance != nil {
		m.cronInstance.Stop()
	}

	if m.jobSyncer != nil {
		if err := m.jobSyncer.Stop(); err != nil {
			collect(err)
		}
	}

	return wrappedErr
}
// SetLogger installs the gorm logging adapter on the database session when a
// database connection is configured, and returns the manager for chaining.
//
// NOTE(review): the logger argument is not used by this method; the adapter
// is created by newGormAdapter() without it - confirm whether that is intended.
func (m *Manager) SetLogger(logger Logger) *Manager {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.database == nil {
		return m
	}

	session := &gorm.Session{Logger: newGormAdapter()}
	m.database = m.database.Session(session)

	return m
}
// RunJob publishes a QueueJob event for the job right away, without
// scheduling it. Paused jobs are silently skipped.
// It returns ErrManagerNotRunning when the manager is not running.
func (m *Manager) RunJob(job GenericJob) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.state != ManagerStateRunning {
		return ErrManagerNotRunning
	}

	if job.IsPaused() {
		return nil
	}

	m.eventBus.Publish(QueueJob, job)
	return nil
}
// ScheduleJob attaches the scheduler to the job, schedules the job on the
// manager's event bus, registers it as active and, when a job syncer is
// configured, hands it to the syncer.
//
// It returns ErrSchedulerNotSet for a nil scheduler, ErrManagerNotRunning
// when the manager is not running, ErrJobAlreadyScheduled when the job
// already carries a scheduler, ErrJobAlreadyActive when a job with the same
// ID is active, and otherwise the scheduler's own error.
func (m *Manager) ScheduleJob(job GenericJob, scheduler Scheduler) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if scheduler == nil {
		return ErrSchedulerNotSet
	}

	if m.state != ManagerStateRunning {
		return ErrManagerNotRunning
	}

	if job.GetScheduler() != nil {
		return ErrJobAlreadyScheduled
	}

	if _, ok := m.activeJobs[job.GetID()]; ok {
		return ErrJobAlreadyActive
	}

	job.SetScheduler(scheduler)

	if err := scheduler.Schedule(job, m.eventBus); err != nil {
		// Roll back: leaving the scheduler attached would make every later
		// attempt to schedule this job fail with ErrJobAlreadyScheduled even
		// though it never became active.
		job.SetScheduler(nil)
		return err
	}

	m.activeJobs[job.GetID()] = job

	if m.jobSyncer != nil {
		m.jobSyncer.AddJob(job)
	}

	return nil
}
// CancelJobSchedule cancels the schedule of an active job, detaches the
// scheduler from the job and drops the job from the active jobs.
//
// It returns ErrManagerNotRunning when the manager is not running,
// ErrJobNotActive for an unknown job, ErrJobNotScheduled when the job has no
// scheduler, and otherwise the scheduler's own error.
func (m *Manager) CancelJobSchedule(id JobID) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.state != ManagerStateRunning {
		return ErrManagerNotRunning
	}

	job, active := m.activeJobs[id]
	if !active {
		return ErrJobNotActive
	}

	scheduler := job.GetScheduler()
	if scheduler == nil {
		return ErrJobNotScheduled
	}

	if err := scheduler.Cancel(job.GetID()); err != nil {
		return err
	}

	job.SetScheduler(nil)
	delete(m.activeJobs, job.GetID())

	return nil
}
// handleJobEvents consumes the job event channel until it is closed.
//
//   - QueueJob: the job carried by the event is put into the queue.
//   - JobReady: jobs are taken from the queue until Dequeue reports an error;
//     each one is offered to the workers, with up to ten rounds one second
//     apart. Ad-hoc jobs that were assigned trigger a JobFinished event.
//   - JobFinished: the schedule of the finished job is cancelled.
//
// Values on the channel that are not an Event are ignored. Events whose data
// is not a GenericJob are logged and skipped instead of panicking on the type
// assertion.
//
// NOTE(review): m.workerMap is read here without holding m.mu, unlike the
// other methods of Manager - confirm whether that is safe for the callers.
// NOTE(review): a dequeued job that no worker accepts is only logged and is
// not put back into the queue.
func (m *Manager) handleJobEvents() {
	const (
		maxAssignTries   = 10
		assignRetryDelay = 1 * time.Second
	)

	for raw := range m.jobEventCh {
		event, isEvent := raw.(Event)
		if !isEvent {
			continue
		}

		Info("Event received", "event", event.Name, "data", event.Data, "message_id", event.MessageID)

		switch event.Name {
		case QueueJob:
			job, ok := event.Data.(GenericJob)
			if !ok {
				Error("Unexpected event data", "event", event.Name, "data", event.Data)
				continue
			}

			Info("Job queued", "job_id", job.GetID())

			if err := m.queue.Enqueue(job); err != nil && !errors.Is(err, ErrJobAlreadyExists) {
				Error("Error while queueing job", "error", err)
			}

		case JobReady:
			for {
				nextJob, err := m.queue.Dequeue()
				if err != nil {
					// Leaves the dequeue loop; the queue has nothing to hand out.
					break
				}

				Info("Job ready", "job_id", nextJob.GetID())

				assigned := false
				for attempt := 1; attempt <= maxAssignTries && !assigned; attempt++ {
					for _, worker := range m.workerMap {
						if err := worker.AssignJob(nextJob); err == nil {
							assigned = true
							break
						}
					}

					// Only wait when another attempt follows; sleeping after
					// the last one would just delay the next job.
					if !assigned && attempt < maxAssignTries {
						time.Sleep(assignRetryDelay)
					}
				}

				if !assigned {
					Info("No worker available for job", "job_id", nextJob.GetID())
					continue
				}

				if scheduler := nextJob.GetScheduler(); scheduler != nil && scheduler.IsAdHoc() {
					m.GetEventBus().Publish(JobFinished, nextJob)
				}
			}

		case JobFinished:
			// job is finished and should be archived
			job, ok := event.Data.(GenericJob)
			if !ok {
				Error("Unexpected event data", "event", event.Name, "data", event.Data)
				continue
			}

			if err := m.CancelJobSchedule(job.GetID()); err != nil {
				Error("Error while canceling job schedule", "error", err)
			}
		}
	}
}