Skip to content
Snippets Groups Projects
Select Git revision
  • 2f940e680de80aaa1972bf939f449777a6336f51
  • master default protected
  • 1.31
  • 4.38.8
  • 4.38.7
  • 4.38.6
  • 4.38.5
  • 4.38.4
  • 4.38.3
  • 4.38.2
  • 4.38.1
  • 4.38.0
  • 4.37.2
  • 4.37.1
  • 4.37.0
  • 4.36.0
  • 4.35.0
  • 4.34.1
  • 4.34.0
  • 4.33.1
  • 4.33.0
  • 4.32.2
  • 4.32.1
23 results

Monster.I18n.Locale.html

Blame
  • manager.go 5.07 KiB
    package jobqueue
    
    import (
    	"fmt"
    	"sync"
    )
    
    type ManagerState int
    
    const (
    	ManagerStateStopped = iota
    	ManagerStateRunning
    )
    
    type Manager struct {
    	state ManagerState
    
    	queue     *Queue
    	workerMap map[WorkerID]Worker
    	eventBus  *EventBus
    	scheduled map[JobID]Scheduler
    
    	jobEventCh chan interface{}
    
    	mu sync.Mutex
    }
    
    // NewManager initializes a new Manager
    func NewManager() *Manager {
    
    	eventBus := NewEventBus()
    
    	return &Manager{
    		state:     ManagerStateStopped,
    		queue:     NewQueue(eventBus),
    		workerMap: make(map[WorkerID]Worker),
    		eventBus:  eventBus,
    		scheduled: make(map[JobID]Scheduler),
    	}
    }
    
    func (m *Manager) GetEventBus() *EventBus {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    	return m.eventBus
    }
    
    func (m *Manager) checkAndSetRunningState() error {
    
    	m.state = ManagerStateStopped
    
    	if m.workerMap == nil {
    		return ErrNoWorkers
    	}
    
    	if len(m.workerMap) == 0 {
    		return ErrNoWorkers
    	}
    
    	for _, worker := range m.workerMap {
    		if worker.Status() == WorkerStatusRunning {
    			m.state = ManagerStateRunning
    			return nil
    		}
    	}
    
    	return nil
    }
    
    // AddWorker adds a worker to the manager
    func (m *Manager) AddWorker(worker Worker) error {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    
    	// check if worker is already added
    	if _, ok := m.workerMap[worker.GetID()]; ok {
    		return ErrWorkerAlreadyAdded
    	}
    
    	// check if state of worker is not running
    	if worker.Status() != WorkerStatusStopped {
    		return ErrWorkerAlreadyRunning
    	}
    
    	if m.state == ManagerStateRunning {
    		err := worker.Start()
    		if err != nil {
    			return err
    		}
    	}
    
    	// add worker to workerMap
    	m.workerMap[worker.GetID()] = worker
    
    	return m.checkAndSetRunningState()
    }
    
    // RemoveWorker removes a worker from the manager
    func (m *Manager) RemoveWorker(worker Worker) error {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    
    	// check if worker is added
    	if _, ok := m.workerMap[worker.GetID()]; !ok {
    		return ErrWorkerNotAdded
    	}
    
    	// check if state of worker is not running
    	if worker.Status() != WorkerStatusStopped {
    		err := worker.Stop()
    		if err != nil {
    			return err
    		}
    	}
    
    	// remove worker from workerMap
    	delete(m.workerMap, worker.GetID())
    	err := m.checkAndSetRunningState()
    
    	if err != nil && err != ErrNoWorkers {
    		return err
    	}
    
    	return nil
    
    }
    
    // Start starts the manager
    func (m *Manager) Start() error {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    
    	if m.state == ManagerStateRunning {
    		return ErrManagerAlreadyRunning
    	}
    
    	if len(m.workerMap) == 0 {
    		return ErrNoWorkers
    	}
    
    	var wrappedErr error
    
    	for _, worker := range m.workerMap {
    		err := worker.Start()
    		if err != nil && err != ErrWorkerAlreadyRunning {
    			if wrappedErr == nil {
    				wrappedErr = fmt.Errorf("Error: ")
    			}
    
    			wrappedErr = fmt.Errorf("%w\n%s", wrappedErr, err.Error())
    		}
    	}
    
    	// check if we have one worker
    	for _, worker := range m.workerMap {
    		if worker.Status() == WorkerStatusRunning {
    			m.state = ManagerStateRunning
    			break
    		}
    	}
    
    	m.jobEventCh = make(chan interface{}, 100)
    	m.eventBus.Subscribe(QueueJob, m.jobEventCh)
    	m.eventBus.Subscribe(JobReady, m.jobEventCh)
    	go m.handleJobEvents()
    
    	err := m.checkAndSetRunningState()
    
    	if err != nil {
    		wrappedErr = fmt.Errorf("%w\n%s", wrappedErr, err.Error())
    	}
    
    	return wrappedErr
    
    }
    
    // Stop stops the manager
    func (m *Manager) Stop() error {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    
    	if m.state == ManagerStateStopped {
    		return ErrManagerAlreadyStopped
    	}
    
    	m.eventBus.Unsubscribe(QueueJob, m.jobEventCh)
    	m.eventBus.Unsubscribe(JobReady, m.jobEventCh)
    	close(m.jobEventCh)
    
    	var wrappedErr error
    
    	for _, worker := range m.workerMap {
    		err := worker.Stop()
    		if err != nil && err != ErrWorkerAlreadyStopped {
    			if wrappedErr == nil {
    				wrappedErr = fmt.Errorf("Error: ")
    			}
    
    			wrappedErr = fmt.Errorf("%w\n%s", wrappedErr, err.Error())
    		}
    	}
    
    	err := m.checkAndSetRunningState()
    
    	if err != nil {
    		wrappedErr = fmt.Errorf("%w\n%s", wrappedErr, err.Error())
    	}
    
    	return wrappedErr
    }
    
    func (m *Manager) handleJobEvents() {
    
    	for event := range m.jobEventCh {
    		switch event := event.(type) {
    		case Event:
    
    			switch event.Name {
    			case QueueJob:
    				job := event.Data.(GenericJob)
    				err := m.queue.Enqueue(job)
    				if err != nil && err != ErrJobAlreadyExists {
    					fmt.Println(err)
    
    				}
    			case JobReady:
    				for {
    					nextJob, err := m.queue.Dequeue()
    					if err != nil {
    						break
    					}
    
    					for _, worker := range m.workerMap {
    						if err := worker.AssignJob(nextJob); err == nil {
    							break
    						}
    					}
    				}
    			}
    		}
    	}
    }
    
    // ScheduleJob schedules a job
    func (m *Manager) ScheduleJob(job GenericJob, scheduler Scheduler) error {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    
    	if m.state != ManagerStateRunning {
    		return ErrManagerNotRunning
    	}
    
    	m.scheduled[job.GetID()] = scheduler
    
    	return scheduler.Schedule(job, m.eventBus)
    }
    
    // CancelJob cancels a scheduled job
    func (m *Manager) CancelJob(id JobID) error {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    
    	if m.state != ManagerStateRunning {
    		return ErrManagerNotRunning
    	}
    
    	if _, ok := m.scheduled[id]; !ok {
    		return ErrJobNotScheduled
    	}
    
    	scheduler, ok := m.scheduled[id]
    	if !ok {
    		return ErrJobNotScheduled
    	}
    
    	err := scheduler.Cancel(id)
    	if err != nil {
    		return err
    	}
    
    	delete(m.scheduled, id)
    
    	return nil
    
    }