Something went wrong on our end
Select Git revision
.test1.md.swp
-
Volker Schukai authored
manager.go 8.57 KiB
package jobqueue
import (
"fmt"
"github.com/robfig/cron/v3"
"gorm.io/gorm"
"sync"
"time"
)
// ManagerState describes whether a Manager is stopped or running.
type ManagerState int
// Possible values for Manager.state.
// NOTE(review): the iota line carries no explicit type, so these are untyped
// integer constants rather than ManagerState constants — confirm this is
// intended before relying on them in typed comparisons outside this file.
const (
// ManagerStateStopped is the state a Manager created by NewManager starts in.
ManagerStateStopped = iota
// ManagerStateRunning is set by checkAndSetRunningState when at least one
// registered worker reports WorkerStatusRunning.
ManagerStateRunning
)
// Manager ties together the job queue, the registered workers, the event bus
// and the optional cron, logging and database facilities. Instances are
// created with NewManager.
type Manager struct {
// state is recomputed from the workers' status by checkAndSetRunningState.
state ManagerState
// queue receives jobs from QueueJob events and is drained on JobReady events.
queue *Queue
// workerMap holds the registered workers keyed by their ID.
workerMap map[WorkerID]Worker
// eventBus is shared with the queue and handed to schedulers by ScheduleJob.
eventBus *EventBus
// activeJobs holds the jobs registered by ScheduleJob, keyed by job ID.
activeJobs map[JobID]GenericJob
// jobEventCh is created by Start, subscribed to the event bus and closed by Stop.
jobEventCh chan interface{}
// cronInstance is optional; when set it is started and stopped with the manager.
cronInstance *cron.Cron
// logger is optional; every use in this file is guarded by a nil check.
logger Logger
// database is the connection stored by SetDB.
database *gorm.DB
// dbSaver is created on the first SetDB call and started/stopped with the manager.
dbSaver *DBSaver
// mu is locked by the exported methods of Manager in this file.
mu sync.Mutex
}
// NewManager returns a stopped Manager with a fresh event bus, a queue bound
// to that bus and empty worker and active-job maps. Cron instance, logger and
// database are left unset.
func NewManager() *Manager {
	bus := NewEventBus()

	return &Manager{
		state:      ManagerStateStopped,
		queue:      NewQueue(bus),
		workerMap:  make(map[WorkerID]Worker),
		eventBus:   bus,
		activeJobs: make(map[JobID]GenericJob),
	}
}
// GetEventBus returns the manager's event bus. The field is read while
// holding the manager's mutex.
func (m *Manager) GetEventBus() *EventBus {
	m.mu.Lock()
	bus := m.eventBus
	m.mu.Unlock()

	return bus
}
// SetCronInstance stores cronInstance on the manager and returns the manager
// so that calls can be chained.
func (m *Manager) SetCronInstance(cronInstance *cron.Cron) *Manager {
	m.mu.Lock()
	m.cronInstance = cronInstance
	m.mu.Unlock()

	return m
}
// GetCronInstance returns the cron instance stored on the manager, or nil if
// none has been set.
func (m *Manager) GetCronInstance() *cron.Cron {
	m.mu.Lock()
	instance := m.cronInstance
	m.mu.Unlock()

	return instance
}
// NewCronScheduler returns a CronScheduler for spec that uses the manager's
// current cron instance (which may be nil if none was set).
func (m *Manager) NewCronScheduler(spec string) *CronScheduler {
	scheduler := &CronScheduler{Spec: spec}
	scheduler.cron = m.GetCronInstance()

	return scheduler
}
// NewInstantScheduler returns a new, zero-valued InstantScheduler.
func (m *Manager) NewInstantScheduler() *InstantScheduler {
	return new(InstantScheduler)
}
// NewIntervalScheduler returns an IntervalScheduler whose Interval is set to
// interval.
func (m *Manager) NewIntervalScheduler(interval time.Duration) *IntervalScheduler {
	scheduler := new(IntervalScheduler)
	scheduler.Interval = interval

	return scheduler
}
// NewDelayScheduler returns a DelayScheduler whose Delay is set to delay.
func (m *Manager) NewDelayScheduler(delay time.Duration) *DelayScheduler {
	scheduler := new(DelayScheduler)
	scheduler.Delay = delay

	return scheduler
}
// NewEventScheduler returns an EventScheduler whose Event is set to event.
func (m *Manager) NewEventScheduler(event EventName) *EventScheduler {
	scheduler := new(EventScheduler)
	scheduler.Event = event

	return scheduler
}
// SetDB stores db as the manager's database connection and returns the
// manager so that calls can be chained. The first call also creates the
// DBSaver and binds it to this manager; later calls only replace the
// connection and keep the existing saver.
func (m *Manager) SetDB(db *gorm.DB) *Manager {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.database = db

	if m.dbSaver == nil {
		m.dbSaver = NewDBSaver()
		m.dbSaver.SetManager(m)
	}

	return m
}
// GetDB returns the database connection stored by SetDB, or nil if none has
// been set.
func (m *Manager) GetDB() *gorm.DB {
	m.mu.Lock()
	db := m.database
	m.mu.Unlock()

	return db
}
// checkAndSetRunningState recomputes m.state from the registered workers.
// The state is first reset to ManagerStateStopped and then switched to
// ManagerStateRunning as soon as one worker reports WorkerStatusRunning.
// It returns ErrNoWorkers when no worker is registered and nil otherwise.
// The method takes no lock itself; its callers in this file hold m.mu.
func (m *Manager) checkAndSetRunningState() error {
	m.state = ManagerStateStopped

	// len of a nil map is 0, so a missing workerMap is covered as well.
	if len(m.workerMap) == 0 {
		return ErrNoWorkers
	}

	for _, w := range m.workerMap {
		if w.Status() != WorkerStatusRunning {
			continue
		}
		m.state = ManagerStateRunning
		break
	}

	return nil
}
// AddWorker registers worker with the manager.
//
// It returns ErrWorkerAlreadyAdded if a worker with the same ID is already
// registered and ErrWorkerAlreadyRunning if the worker's status is anything
// other than WorkerStatusStopped. When the manager is running, the worker is
// started before it is registered; a start error is returned and the worker
// is not added. On success the manager state is recomputed.
func (m *Manager) AddWorker(worker Worker) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	_, known := m.workerMap[worker.GetID()]
	switch {
	case known:
		return ErrWorkerAlreadyAdded
	case worker.Status() != WorkerStatusStopped:
		return ErrWorkerAlreadyRunning
	}

	if m.state == ManagerStateRunning {
		if startErr := worker.Start(); startErr != nil {
			return startErr
		}
	}

	m.workerMap[worker.GetID()] = worker
	worker.SetManager(m)

	return m.checkAndSetRunningState()
}
// RemoveWorker unregisters worker from the manager.
//
// It returns ErrWorkerNotAdded if no worker with that ID is registered. A
// worker whose status is not WorkerStatusStopped is stopped first; a stop
// error is returned and the worker stays registered. After removal the
// manager state is recomputed; ending up without any worker is not treated
// as an error here.
func (m *Manager) RemoveWorker(worker Worker) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if _, registered := m.workerMap[worker.GetID()]; !registered {
		return ErrWorkerNotAdded
	}

	if worker.Status() != WorkerStatusStopped {
		if stopErr := worker.Stop(); stopErr != nil {
			return stopErr
		}
	}

	delete(m.workerMap, worker.GetID())

	switch stateErr := m.checkAndSetRunningState(); stateErr {
	case nil, ErrNoWorkers:
		return nil
	default:
		return stateErr
	}
}
// Start starts the manager.
//
// It returns ErrManagerAlreadyRunning if the manager is already running and
// ErrNoWorkers if no worker is registered; in both cases nothing is started.
// Otherwise it starts the DB saver (if one is configured; a failure is only
// logged), starts every registered worker, creates the job event channel,
// subscribes it to QueueJob and JobReady events, launches the event loop,
// recomputes the manager state and starts the cron instance if one is set.
//
// Worker start errors other than ErrWorkerAlreadyRunning do not abort the
// start; their messages are collected into the returned error. The result is
// nil when nothing went wrong.
func (m *Manager) Start() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.state == ManagerStateRunning {
		return ErrManagerAlreadyRunning
	}

	// Validate before starting anything: Stop refuses to run on a stopped
	// manager, so a DB saver started ahead of this check could never be
	// stopped again.
	if len(m.workerMap) == 0 {
		return ErrNoWorkers
	}

	if m.dbSaver != nil {
		if err := m.dbSaver.Start(); err != nil && m.logger != nil {
			m.logger.Error("Error while starting db saver", "error", err)
		}
	}

	var wrappedErr error
	// collect appends err to wrappedErr and makes sure the chain has a non-nil
	// head, because formatting a nil error with %w yields "%!w(<nil>)".
	collect := func(err error) {
		if wrappedErr == nil {
			wrappedErr = fmt.Errorf("Error: ")
		}
		wrappedErr = fmt.Errorf("%w\n%s", wrappedErr, err.Error())
	}

	for _, worker := range m.workerMap {
		if err := worker.Start(); err != nil && err != ErrWorkerAlreadyRunning {
			collect(err)
		}
	}

	m.jobEventCh = make(chan interface{}, 100)
	m.eventBus.Subscribe(QueueJob, m.jobEventCh)
	m.eventBus.Subscribe(JobReady, m.jobEventCh)
	go m.handleJobEvents()

	// The manager counts as running once at least one worker is running.
	if err := m.checkAndSetRunningState(); err != nil {
		collect(err)
	}

	if m.cronInstance != nil {
		m.cronInstance.Start()
	}

	return wrappedErr
}
// Stop stops the manager.
//
// It returns ErrManagerAlreadyStopped if the manager is not running.
// Otherwise it unsubscribes and closes the job event channel (which ends the
// event loop), stops every registered worker, recomputes the manager state
// and stops the cron instance and the DB saver if they are set.
//
// Worker stop errors other than ErrWorkerAlreadyStopped do not abort the
// shutdown; their messages are collected into the returned error. The result
// is nil when nothing went wrong.
//
// NOTE(review): if a worker fails to stop and still reports
// WorkerStatusRunning, the recomputed state stays ManagerStateRunning while
// the channel is already closed; a second Stop call would then close it
// again. Guarding this needs extra state on Manager — confirm and address.
func (m *Manager) Stop() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.state == ManagerStateStopped {
		return ErrManagerAlreadyStopped
	}

	m.eventBus.Unsubscribe(QueueJob, m.jobEventCh)
	m.eventBus.Unsubscribe(JobReady, m.jobEventCh)
	close(m.jobEventCh)

	var wrappedErr error
	// collect appends err to wrappedErr and makes sure the chain has a non-nil
	// head, because formatting a nil error with %w yields "%!w(<nil>)".
	collect := func(err error) {
		if wrappedErr == nil {
			wrappedErr = fmt.Errorf("Error: ")
		}
		wrappedErr = fmt.Errorf("%w\n%s", wrappedErr, err.Error())
	}

	for _, worker := range m.workerMap {
		if err := worker.Stop(); err != nil && err != ErrWorkerAlreadyStopped {
			collect(err)
		}
	}

	if err := m.checkAndSetRunningState(); err != nil {
		collect(err)
	}

	if m.cronInstance != nil {
		m.cronInstance.Stop()
	}

	if m.dbSaver != nil {
		m.dbSaver.Stop()
	}

	return wrappedErr
}
// SetLogger stores logger on the manager and returns the manager so that
// calls can be chained. A nil logger disables the manager's log output.
func (m *Manager) SetLogger(logger Logger) *Manager {
	m.mu.Lock()
	m.logger = logger
	m.mu.Unlock()

	return m
}
// GetLogger returns the logger stored on the manager, or nil if none has
// been set.
func (m *Manager) GetLogger() Logger {
	m.mu.Lock()
	logger := m.logger
	m.mu.Unlock()

	return logger
}
// handleJobEvents is the manager's event loop. Start launches it in its own
// goroutine; it returns once the job event channel is closed, which Stop
// does. Values on the channel that are not of type Event are ignored.
//
//   - QueueJob: the job carried by the event is put into the queue.
//   - JobReady: the queue is drained and every job is offered to the workers.
//   - JobFinished: the schedule of the job carried by the event is canceled.
//
// An event whose payload is not a GenericJob is logged and skipped instead
// of panicking the loop.
func (m *Manager) handleJobEvents() {
	for raw := range m.jobEventCh {
		event, isEvent := raw.(Event)
		if !isEvent {
			continue
		}

		if m.logger != nil {
			m.logger.Info("Event received", "event", event.Name, "data", event.Data, "message_id", event.MessageID)
		}

		switch event.Name {
		case QueueJob:
			job, ok := event.Data.(GenericJob)
			if !ok {
				if m.logger != nil {
					m.logger.Error("Unexpected event payload", "event", event.Name)
				}
				continue
			}

			if m.logger != nil {
				m.logger.Info("Job queued", "job_id", job.GetID())
			}

			// A job that is already queued is not worth reporting.
			if err := m.queue.Enqueue(job); err != nil && err != ErrJobAlreadyExists {
				if m.logger != nil {
					m.logger.Error("Error while queueing job", "error", err)
				}
			}

		case JobReady:
			m.dispatchReadyJobs()

		case JobFinished:
			// NOTE(review): Start subscribes the channel only to QueueJob and
			// JobReady, so this branch looks unreachable unless the channel is
			// subscribed to JobFinished elsewhere — confirm.
			job, ok := event.Data.(GenericJob)
			if !ok {
				if m.logger != nil {
					m.logger.Error("Unexpected event payload", "event", event.Name)
				}
				continue
			}

			if err := m.CancelJobSchedule(job.GetID()); err != nil {
				if m.logger != nil {
					m.logger.Error("Error while canceling job schedule", "error", err)
				}
			}
		}
	}
}

// dispatchReadyJobs dequeues jobs until Dequeue reports an error and offers
// each job to the registered workers, retrying for a limited number of
// rounds. For an assigned job whose scheduler is ad hoc, JobFinished is
// published right away.
func (m *Manager) dispatchReadyJobs() {
	const (
		maxAssignAttempts = 10
		retryDelay        = 1 * time.Second
	)

	for {
		nextJob, err := m.queue.Dequeue()
		if err != nil {
			return
		}

		if m.logger != nil {
			m.logger.Info("Job ready", "job_id", nextJob.GetID())
		}

		assigned := false
		for attempt := 1; attempt <= maxAssignAttempts; attempt++ {
			for _, worker := range m.registeredWorkers() {
				if assignErr := worker.AssignJob(nextJob); assignErr == nil {
					assigned = true
					break
				}
			}

			// No point in waiting once the job is placed or the last round is over.
			if assigned || attempt == maxAssignAttempts {
				break
			}
			time.Sleep(retryDelay)
		}

		if !assigned {
			if m.logger != nil {
				m.logger.Info("No worker available for job", "job_id", nextJob.GetID())
			}
			continue
		}

		if scheduler := nextJob.GetScheduler(); scheduler != nil && scheduler.IsAdHoc() {
			m.GetEventBus().Publish(JobFinished, nextJob)
		}
	}
}

// registeredWorkers returns a copy of the currently registered workers. The
// copy is taken under m.mu because AddWorker and RemoveWorker modify
// workerMap while the event loop is running; iterating the map directly
// would be a concurrent map read and write.
func (m *Manager) registeredWorkers() []Worker {
	m.mu.Lock()
	defer m.mu.Unlock()

	workers := make([]Worker, 0, len(m.workerMap))
	for _, worker := range m.workerMap {
		workers = append(workers, worker)
	}

	return workers
}
// ScheduleJob attaches scheduler to job, hands the job to the scheduler and
// records it as active.
//
// It returns ErrSchedulerNotSet if scheduler is nil, ErrManagerNotRunning if
// the manager is not running, ErrJobAlreadyScheduled if the job already has
// a scheduler and ErrJobAlreadyActive if a job with the same ID is active.
// An error from the scheduler is returned unchanged; in that case the
// scheduler is detached from the job again so that the call can be retried.
func (m *Manager) ScheduleJob(job GenericJob, scheduler Scheduler) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if scheduler == nil {
		return ErrSchedulerNotSet
	}

	if m.state != ManagerStateRunning {
		return ErrManagerNotRunning
	}

	if job.GetScheduler() != nil {
		return ErrJobAlreadyScheduled
	}

	if _, ok := m.activeJobs[job.GetID()]; ok {
		return ErrJobAlreadyActive
	}

	job.SetScheduler(scheduler)

	if err := scheduler.Schedule(job, m.eventBus); err != nil {
		// Roll back: a job left with a scheduler attached would be rejected
		// with ErrJobAlreadyScheduled on every later attempt.
		job.SetScheduler(nil)
		return err
	}

	m.activeJobs[job.GetID()] = job

	return nil
}
// CancelJobSchedule cancels the schedule of the active job with the given id
// and removes the job from the active jobs.
//
// It returns ErrManagerNotRunning if the manager is not running,
// ErrJobNotActive if no active job has that id and ErrJobNotScheduled if the
// job has no scheduler. An error from the scheduler's Cancel is returned
// unchanged and leaves the job active with its scheduler attached.
func (m *Manager) CancelJobSchedule(id JobID) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.state != ManagerStateRunning {
		return ErrManagerNotRunning
	}

	job, active := m.activeJobs[id]
	switch {
	case !active:
		return ErrJobNotActive
	case job.GetScheduler() == nil:
		return ErrJobNotScheduled
	}

	if cancelErr := job.GetScheduler().Cancel(job.GetID()); cancelErr != nil {
		return cancelErr
	}

	job.SetScheduler(nil)
	delete(m.activeJobs, job.GetID())

	return nil
}