Skip to content
Snippets Groups Projects
Select Git revision
  • 46f8f1f4892ee8469a59dc0263f064dc429951e7
  • master default protected
  • 0.1.1
  • 0.1.0
4 results

.test1.md.swp

Blame
  • manager.go 8.57 KiB
    package jobqueue
    
    import (
    	"fmt"
    	"github.com/robfig/cron/v3"
    	"gorm.io/gorm"
    	"sync"
    	"time"
    )
    
    type ManagerState int
    
    const (
    	ManagerStateStopped = iota
    	ManagerStateRunning
    )
    
    type Manager struct {
    	state ManagerState
    
    	queue     *Queue
    	workerMap map[WorkerID]Worker
    	eventBus  *EventBus
    
    	activeJobs map[JobID]GenericJob
    
    	jobEventCh chan interface{}
    
    	cronInstance *cron.Cron
    	logger       Logger
    
    	database *gorm.DB
    	dbSaver  *DBSaver
    
    	mu sync.Mutex
    }
    
    // NewManager initializes a new Manager
    func NewManager() *Manager {
    
    	eventBus := NewEventBus()
    
    	mng := &Manager{
    		state:      ManagerStateStopped,
    		queue:      NewQueue(eventBus),
    		workerMap:  make(map[WorkerID]Worker),
    		eventBus:   eventBus,
    		activeJobs: make(map[JobID]GenericJob),
    	}
    
    	return mng
    
    }
    
    func (m *Manager) GetEventBus() *EventBus {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    	return m.eventBus
    }
    
    func (m *Manager) SetCronInstance(cronInstance *cron.Cron) *Manager {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    	m.cronInstance = cronInstance
    	return m
    }
    
    func (m *Manager) GetCronInstance() *cron.Cron {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    	return m.cronInstance
    }
    
    func (m *Manager) NewCronScheduler(spec string) *CronScheduler {
    	return &CronScheduler{
    		Spec: spec,
    		cron: m.GetCronInstance(),
    	}
    }
    
    func (m *Manager) NewInstantScheduler() *InstantScheduler {
    	return &InstantScheduler{}
    }
    
    func (m *Manager) NewIntervalScheduler(interval time.Duration) *IntervalScheduler {
    	return &IntervalScheduler{
    		Interval: interval,
    	}
    }
    
    func (m *Manager) NewDelayScheduler(delay time.Duration) *DelayScheduler {
    	return &DelayScheduler{
    		Delay: delay,
    	}
    }
    
    func (m *Manager) NewEventScheduler(event EventName) *EventScheduler {
    	return &EventScheduler{
    		Event: event,
    	}
    }
    
    func (m *Manager) SetDB(db *gorm.DB) *Manager {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    	m.database = db
    
    	if m.dbSaver != nil {
    		return m
    	}
    
    	m.dbSaver = NewDBSaver()
    	m.dbSaver.SetManager(m)
    
    	return m
    }
    
    // GetDB returns the database connection
    func (m *Manager) GetDB() *gorm.DB {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    	return m.database
    }
    
    // GetQueue returns the queue
    func (m *Manager) checkAndSetRunningState() error {
    
    	m.state = ManagerStateStopped
    
    	if m.workerMap == nil {
    		return ErrNoWorkers
    	}
    
    	if len(m.workerMap) == 0 {
    		return ErrNoWorkers
    	}
    
    	for _, worker := range m.workerMap {
    		if worker.Status() == WorkerStatusRunning {
    			m.state = ManagerStateRunning
    			return nil
    		}
    	}
    
    	return nil
    }
    
    // AddWorker adds a worker to the manager
    func (m *Manager) AddWorker(worker Worker) error {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    
    	// check if worker is already added
    	if _, ok := m.workerMap[worker.GetID()]; ok {
    		return ErrWorkerAlreadyAdded
    	}
    
    	// check if state of worker is not running
    	if worker.Status() != WorkerStatusStopped {
    		return ErrWorkerAlreadyRunning
    	}
    
    	if m.state == ManagerStateRunning {
    		err := worker.Start()
    		if err != nil {
    			return err
    		}
    	}
    
    	// add worker to workerMap
    	m.workerMap[worker.GetID()] = worker
    	worker.SetManager(m)
    
    	return m.checkAndSetRunningState()
    }
    
    // RemoveWorker removes a worker from the manager
    func (m *Manager) RemoveWorker(worker Worker) error {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    
    	// check if worker is added
    	if _, ok := m.workerMap[worker.GetID()]; !ok {
    		return ErrWorkerNotAdded
    	}
    
    	// check if state of worker is not running
    	if worker.Status() != WorkerStatusStopped {
    		err := worker.Stop()
    		if err != nil {
    			return err
    		}
    	}
    
    	// remove worker from workerMap
    	delete(m.workerMap, worker.GetID())
    	err := m.checkAndSetRunningState()
    
    	if err != nil && err != ErrNoWorkers {
    		return err
    	}
    
    	return nil
    
    }
    
    // Start starts the manager
    func (m *Manager) Start() error {
    	var err error
    
    	m.mu.Lock()
    	defer m.mu.Unlock()
    
    	if m.state == ManagerStateRunning {
    		return ErrManagerAlreadyRunning
    	}
    
    	if m.dbSaver != nil {
    		err = m.dbSaver.Start()
    		if err != nil {
    			if m.logger != nil {
    				m.logger.Error("Error while starting db saver", "error", err)
    			}
    		}
    	}
    
    	if len(m.workerMap) == 0 {
    		return ErrNoWorkers
    	}
    
    	var wrappedErr error
    
    	for _, worker := range m.workerMap {
    		err := worker.Start()
    		if err != nil && err != ErrWorkerAlreadyRunning {
    			if wrappedErr == nil {
    				wrappedErr = fmt.Errorf("Error: ")
    			}
    
    			wrappedErr = fmt.Errorf("%w\n%s", wrappedErr, err.Error())
    		}
    	}
    
    	// check if we have one worker
    	for _, worker := range m.workerMap {
    		if worker.Status() == WorkerStatusRunning {
    			m.state = ManagerStateRunning
    			break
    		}
    	}
    
    	m.jobEventCh = make(chan interface{}, 100)
    
    	m.eventBus.Subscribe(QueueJob, m.jobEventCh)
    	m.eventBus.Subscribe(JobReady, m.jobEventCh)
    
    	go m.handleJobEvents()
    
    	err = m.checkAndSetRunningState()
    
    	if err != nil {
    		wrappedErr = fmt.Errorf("%w\n%s", wrappedErr, err.Error())
    	}
    
    	if m.cronInstance != nil {
    		m.cronInstance.Start()
    	}
    
    	return wrappedErr
    
    }
    
    // Stop stops the manager
    func (m *Manager) Stop() error {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    
    	if m.state == ManagerStateStopped {
    		return ErrManagerAlreadyStopped
    	}
    
    	m.eventBus.Unsubscribe(QueueJob, m.jobEventCh)
    	m.eventBus.Unsubscribe(JobReady, m.jobEventCh)
    
    	close(m.jobEventCh)
    
    	var wrappedErr error
    
    	for _, worker := range m.workerMap {
    		err := worker.Stop()
    		if err != nil && err != ErrWorkerAlreadyStopped {
    			if wrappedErr == nil {
    				wrappedErr = fmt.Errorf("Error: ")
    			}
    
    			wrappedErr = fmt.Errorf("%w\n%s", wrappedErr, err.Error())
    		}
    	}
    
    	err := m.checkAndSetRunningState()
    
    	if err != nil {
    		wrappedErr = fmt.Errorf("%w\n%s", wrappedErr, err.Error())
    	}
    
    	if m.cronInstance != nil {
    		m.cronInstance.Stop()
    	}
    
    	if m.dbSaver != nil {
    		m.dbSaver.Stop()
    	}
    
    	return wrappedErr
    }
    
    func (m *Manager) SetLogger(logger Logger) *Manager {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    	m.logger = logger
    	return m
    }
    
    func (m *Manager) GetLogger() Logger {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    	return m.logger
    }
    
    // handleJobEvents handles job events
    func (m *Manager) handleJobEvents() {
    
    	for event := range m.jobEventCh {
    		switch event := event.(type) {
    		case Event:
    
    			if m.logger != nil {
    				m.logger.Info("Event received", "event", event.Name, "data", event.Data, "message_id", event.MessageID)
    			}
    
    			switch event.Name {
    			case QueueJob:
    
    				if m.logger != nil {
    					m.logger.Info("Job queued", "job_id", event.Data.(GenericJob).GetID())
    				}
    
    				job := event.Data.(GenericJob)
    				err := m.queue.Enqueue(job)
    				if err != nil && err != ErrJobAlreadyExists {
    					if m.logger != nil {
    						m.logger.Error("Error while queueing job", "error", err)
    					}
    				}
    
    			case JobReady:
    				for {
    
    					nextJob, err := m.queue.Dequeue()
    					if err != nil {
    						break
    					}
    
    					if m.logger != nil {
    						m.logger.Info("Job ready", "job_id", nextJob.GetID())
    					}
    
    					assigned := false
    					maxTries := 10
    
    					for maxTries > 0 {
    						maxTries--
    
    						for _, worker := range m.workerMap {
    							if err := worker.AssignJob(nextJob); err == nil {
    								assigned = true
    								break
    							}
    						}
    
    						if assigned == true {
    							break
    						}
    
    						time.Sleep(1 * time.Second)
    					}
    
    					if !assigned {
    						if m.logger != nil {
    							m.logger.Info("No worker available for job", "job_id", nextJob.GetID())
    						}
    					} else {
    
    						if nextJob.GetScheduler() != nil {
    							if nextJob.GetScheduler().IsAdHoc() {
    								eventBus := m.GetEventBus()
    								eventBus.Publish(JobFinished, nextJob)
    							}
    
    						}
    
    					}
    				}
    
    			case JobFinished:
    				// job is finished and should be archived
    
    				job := event.Data.(GenericJob)
    				_ = job
    
    				err := m.CancelJobSchedule(job.GetID())
    				if err != nil {
    					if m.logger != nil {
    						m.logger.Error("Error while canceling job schedule", "error", err)
    					}
    				}
    			}
    		}
    	}
    }
    
    // ScheduleJob schedules a job
    func (m *Manager) ScheduleJob(job GenericJob, scheduler Scheduler) error {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    
    	if scheduler == nil {
    		return ErrSchedulerNotSet
    	}
    
    	if m.state != ManagerStateRunning {
    		return ErrManagerNotRunning
    	}
    
    	if job.GetScheduler() != nil {
    		return ErrJobAlreadyScheduled
    	}
    
    	if _, ok := m.activeJobs[job.GetID()]; ok {
    		return ErrJobAlreadyActive
    	}
    
    	job.SetScheduler(scheduler)
    	err := scheduler.Schedule(job, m.eventBus)
    	if err != nil {
    		return err
    	}
    
    	m.activeJobs[job.GetID()] = job
    	return nil
    
    }
    
    func (m *Manager) CancelJobSchedule(id JobID) error {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    
    	if m.state != ManagerStateRunning {
    		return ErrManagerNotRunning
    	}
    
    	job, ok := m.activeJobs[id]
    	if !ok {
    		return ErrJobNotActive
    	}
    
    	if job.GetScheduler() == nil {
    		return ErrJobNotScheduled
    	}
    
    	scheduler := job.GetScheduler()
    
    	err := scheduler.Cancel(job.GetID())
    	if err != nil {
    		return err
    	}
    
    	job.SetScheduler(nil)
    	delete(m.activeJobs, job.GetID())
    
    	return nil
    }