Select Git revision
manager.go 5.07 KiB
package jobqueue
import (
"fmt"
"sync"
)
// ManagerState is the lifecycle state stored in Manager.state.
type ManagerState int
// Lifecycle states of a Manager. The values are untyped integer constants
// produced by iota (ManagerStateStopped is 0, ManagerStateRunning is 1) and
// are assigned to and compared against Manager.state.
const (
// ManagerStateStopped is the state NewManager gives a fresh Manager and the
// state checkAndSetRunningState falls back to when no worker is running.
ManagerStateStopped = iota
// ManagerStateRunning is set when at least one registered worker reports
// WorkerStatusRunning.
ManagerStateRunning
)
// Manager owns a set of workers, a job queue and the schedulers of scheduled
// jobs, and moves jobs from the queue to workers in response to events
// received from its event bus.
type Manager struct {
// state is recomputed from the workers' statuses by checkAndSetRunningState.
state ManagerState
// queue is created by NewManager with the manager's event bus; jobs are
// enqueued on QueueJob events and dequeued on JobReady events.
queue *Queue
// workerMap holds the registered workers keyed by their ID.
workerMap map[WorkerID]Worker
// eventBus is handed to the queue and to schedulers, and is where the manager
// subscribes for QueueJob and JobReady events.
eventBus *EventBus
// scheduled maps a job ID to the scheduler that was asked to schedule it.
scheduled map[JobID]Scheduler
// jobEventCh receives the subscribed events; it is nil until Start creates it
// and is closed by Stop.
jobEventCh chan interface{}
// mu is held by the exported methods while they access the fields above.
// NOTE(review): handleJobEvents reads workerMap without holding mu.
mu sync.Mutex
}
// NewManager returns a Manager in ManagerStateStopped with no workers and no
// scheduled jobs. A fresh event bus is created and shared between the manager
// and its queue. The event channel is left nil; Start creates it.
func NewManager() *Manager {
	bus := NewEventBus()

	m := &Manager{state: ManagerStateStopped, eventBus: bus}
	m.queue = NewQueue(bus)
	m.workerMap = make(map[WorkerID]Worker)
	m.scheduled = make(map[JobID]Scheduler)
	return m
}
// GetEventBus returns the event bus the manager was created with. The field
// is read while holding m.mu.
func (m *Manager) GetEventBus() *EventBus {
	m.mu.Lock()
	bus := m.eventBus
	m.mu.Unlock()
	return bus
}
// checkAndSetRunningState recomputes m.state from the registered workers.
//
// The state is first reset to ManagerStateStopped. With no registered workers
// ErrNoWorkers is returned; otherwise the state becomes ManagerStateRunning as
// soon as one worker reports WorkerStatusRunning, and nil is returned whether
// or not such a worker was found.
//
// The method does not lock m.mu itself; its callers in this file already hold it.
func (m *Manager) checkAndSetRunningState() error {
	m.state = ManagerStateStopped

	// len of a nil map is 0, so this also covers an uninitialized workerMap.
	if len(m.workerMap) == 0 {
		return ErrNoWorkers
	}

	for _, w := range m.workerMap {
		if w.Status() != WorkerStatusRunning {
			continue
		}
		m.state = ManagerStateRunning
		break
	}
	return nil
}
// AddWorker registers worker with the manager.
//
// It returns ErrWorkerAlreadyAdded when a worker with the same ID is already
// registered and ErrWorkerAlreadyRunning when the worker's status is anything
// other than WorkerStatusStopped. If the manager is currently running the
// worker is started before it is registered, and a start error is returned
// without registering it. On success the manager state is recomputed from the
// workers and the result of that recomputation is returned.
func (m *Manager) AddWorker(worker Worker) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	id := worker.GetID()
	if _, exists := m.workerMap[id]; exists {
		return ErrWorkerAlreadyAdded
	}

	// Only workers that have not been started yet may be handed over.
	if worker.Status() != WorkerStatusStopped {
		return ErrWorkerAlreadyRunning
	}

	// A running manager brings new workers up immediately.
	if m.state == ManagerStateRunning {
		if err := worker.Start(); err != nil {
			return err
		}
	}

	m.workerMap[id] = worker
	return m.checkAndSetRunningState()
}
// RemoveWorker stops and unregisters the worker that is registered under
// worker's ID.
//
// It returns ErrWorkerNotAdded when no worker with that ID is registered. If
// the registered worker is not in WorkerStatusStopped it is stopped first; a
// stop error is returned and the worker stays registered. After removal the
// manager state is recomputed from the remaining workers; ending up without
// any workers is not treated as an error.
//
// The status check and the Stop call are made on the instance stored in the
// manager rather than on the argument, so that a caller passing a different
// value with the same ID cannot leave the managed worker running while it is
// dropped from the map.
//
// NOTE(review): when the recomputed state is ManagerStateStopped, the event
// subscription and goroutine set up by Start are not released here, and Stop
// returns ErrManagerAlreadyStopped for a stopped manager.
func (m *Manager) RemoveWorker(worker Worker) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	id := worker.GetID()
	registered, ok := m.workerMap[id]
	if !ok {
		return ErrWorkerNotAdded
	}

	if registered.Status() != WorkerStatusStopped {
		if err := registered.Stop(); err != nil {
			return err
		}
	}

	delete(m.workerMap, id)

	// ErrNoWorkers only says that the last worker was just removed.
	if err := m.checkAndSetRunningState(); err != nil && err != ErrNoWorkers {
		return err
	}
	return nil
}
// Start starts every registered worker and, if at least one of them is
// running afterwards, the manager's event loop.
//
// It returns ErrManagerAlreadyRunning when the manager is already running and
// ErrNoWorkers when no worker is registered. Errors returned by workers other
// than ErrWorkerAlreadyRunning are collected into a single error whose message
// lists them one per line; that error is returned even when some workers did
// start.
//
// The event channel, the QueueJob/JobReady subscriptions and the event-loop
// goroutine are only created when the manager actually reaches
// ManagerStateRunning and no event loop exists yet. Otherwise a manager that
// failed to start would own a goroutine that Stop refuses to shut down, and
// every further Start would add another one.
func (m *Manager) Start() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.state == ManagerStateRunning {
		return ErrManagerAlreadyRunning
	}
	if len(m.workerMap) == 0 {
		return ErrNoWorkers
	}

	var wrappedErr error
	for _, worker := range m.workerMap {
		if err := worker.Start(); err != nil && err != ErrWorkerAlreadyRunning {
			wrappedErr = appendManagerErr(wrappedErr, err)
		}
	}

	// Derive the state from what the workers report now.
	if err := m.checkAndSetRunningState(); err != nil {
		if wrappedErr == nil {
			return err
		}
		return appendManagerErr(wrappedErr, err)
	}
	if m.state != ManagerStateRunning {
		// No worker came up: do not start an event loop nobody can stop.
		return wrappedErr
	}

	if m.jobEventCh == nil {
		events := make(chan interface{}, 100)
		m.jobEventCh = events
		m.eventBus.Subscribe(QueueJob, events)
		m.eventBus.Subscribe(JobReady, events)
		// The goroutine gets its own reference to the channel so that it never
		// reads m.jobEventCh, which Stop resets under m.mu.
		go m.runJobEventLoop(events)
	}
	return wrappedErr
}

// Stop shuts down the event loop and stops every registered worker.
//
// It returns ErrManagerAlreadyStopped when the manager is not running. Errors
// returned by workers other than ErrWorkerAlreadyStopped are collected into a
// single error whose message lists them one per line. The manager state is
// recomputed from the workers afterwards, so it remains ManagerStateRunning
// if a worker is still running.
//
// The event channel is unsubscribed and closed at most once: the field is
// reset to nil afterwards, so calling Stop again after a partial failure does
// not close an already closed channel, and a manager whose channel was never
// created does not close a nil channel. Both would panic.
func (m *Manager) Stop() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.state == ManagerStateStopped {
		return ErrManagerAlreadyStopped
	}

	if m.jobEventCh != nil {
		m.eventBus.Unsubscribe(QueueJob, m.jobEventCh)
		m.eventBus.Unsubscribe(JobReady, m.jobEventCh)
		// Closing ends the range loop in runJobEventLoop once it has drained
		// the events that are still buffered.
		close(m.jobEventCh)
		m.jobEventCh = nil
	}

	var wrappedErr error
	for _, worker := range m.workerMap {
		if err := worker.Stop(); err != nil && err != ErrWorkerAlreadyStopped {
			wrappedErr = appendManagerErr(wrappedErr, err)
		}
	}

	if err := m.checkAndSetRunningState(); err != nil {
		if wrappedErr == nil {
			return err
		}
		wrappedErr = appendManagerErr(wrappedErr, err)
	}
	return wrappedErr
}

// handleJobEvents processes events from the manager's current event channel
// until that channel is closed. Start no longer goes through this method (it
// passes the channel to runJobEventLoop directly); the method is kept so that
// any other caller in the package keeps working.
func (m *Manager) handleJobEvents() {
	m.runJobEventLoop(m.jobEventCh)
}

// runJobEventLoop consumes events until the channel is closed. A QueueJob
// event enqueues the job carried in its Data field; a JobReady event hands
// queued jobs to workers. Values that are not an Event are ignored.
func (m *Manager) runJobEventLoop(events <-chan interface{}) {
	for raw := range events {
		event, ok := raw.(Event)
		if !ok {
			continue
		}

		switch event.Name {
		case QueueJob:
			// A payload of the wrong type is reported instead of panicking,
			// which would take the whole process down from this goroutine.
			job, ok := event.Data.(GenericJob)
			if !ok {
				fmt.Printf("jobqueue: ignoring QueueJob event with payload of type %T\n", event.Data)
				continue
			}
			if err := m.queue.Enqueue(job); err != nil && err != ErrJobAlreadyExists {
				fmt.Println(err)
			}
		case JobReady:
			m.dispatchReadyJobs()
		}
	}
}

// dispatchReadyJobs dequeues jobs until Dequeue returns an error and offers
// each one to the workers in map iteration order until one accepts it.
//
// NOTE(review): m.workerMap is iterated here without holding m.mu while
// AddWorker and RemoveWorker modify it under m.mu. Taking the lock here was
// left out because it is not visible from this file whether the event bus can
// block a publisher that holds m.mu; this needs to be resolved.
func (m *Manager) dispatchReadyJobs() {
	for {
		nextJob, err := m.queue.Dequeue()
		if err != nil {
			return
		}

		assigned := false
		for _, worker := range m.workerMap {
			if err := worker.AssignJob(nextJob); err == nil {
				assigned = true
				break
			}
		}
		if !assigned {
			// The job has already left the queue and nothing in this loop puts
			// it back, so at least make the loss visible.
			fmt.Println("jobqueue: dequeued job was not accepted by any worker")
		}
	}
}

// appendManagerErr adds err's message as a new line to the error collected so
// far. The first collected error is preceded by an "Error: " header line. Only
// the accumulated error is wrapped; err contributes its text.
func appendManagerErr(acc, err error) error {
	if acc == nil {
		acc = fmt.Errorf("Error: ")
	}
	return fmt.Errorf("%w\n%s", acc, err.Error())
}
// ScheduleJob asks scheduler to schedule job on the manager's event bus and
// remembers the scheduler under the job's ID so that CancelJob can find it.
//
// It returns ErrManagerNotRunning when the manager is not running, and the
// scheduler's error when scheduling fails. The scheduler is only recorded
// after Schedule has succeeded, so a failed attempt neither leaves an entry
// behind for a job that was not scheduled nor replaces the entry of an earlier
// successful schedule for the same ID.
// NOTE(review): this assumes a Schedule error means nothing was scheduled —
// confirm against the Scheduler implementations.
func (m *Manager) ScheduleJob(job GenericJob, scheduler Scheduler) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.state != ManagerStateRunning {
		return ErrManagerNotRunning
	}

	if err := scheduler.Schedule(job, m.eventBus); err != nil {
		return err
	}

	m.scheduled[job.GetID()] = scheduler
	return nil
}
// CancelJob cancels the job with the given ID through the scheduler that was
// recorded for it by ScheduleJob.
//
// It returns ErrManagerNotRunning when the manager is not running and
// ErrJobNotScheduled when no scheduler is recorded for id. If the scheduler's
// Cancel fails, its error is returned and the record is kept; otherwise the
// record is removed.
func (m *Manager) CancelJob(id JobID) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.state != ManagerStateRunning {
		return ErrManagerNotRunning
	}

	sched, found := m.scheduled[id]
	if !found {
		return ErrJobNotScheduled
	}

	if cancelErr := sched.Cancel(id); cancelErr != nil {
		return cancelErr
	}

	delete(m.scheduled, id)
	return nil
}