package jobqueue

import (
	"errors"
	"fmt"
	"sync"
)

// ManagerState describes whether the manager is currently running.
type ManagerState int

const (
	ManagerStateStopped ManagerState = iota
	ManagerStateRunning
)

// Manager owns the job queue, the registered workers and the scheduled jobs.
type Manager struct {
	state      ManagerState
	queue      *Queue
	workerMap  map[WorkerID]Worker
	eventBus   *EventBus
	scheduled  map[JobID]Scheduler
	jobEventCh chan interface{}
	mu         sync.Mutex
}

// NewManager initializes a new Manager in the stopped state.
func NewManager() *Manager {
	eventBus := NewEventBus()

	return &Manager{
		state:     ManagerStateStopped,
		queue:     NewQueue(eventBus),
		workerMap: make(map[WorkerID]Worker),
		eventBus:  eventBus,
		scheduled: make(map[JobID]Scheduler),
	}
}

// GetEventBus returns the event bus used by the manager.
func (m *Manager) GetEventBus() *EventBus {
	m.mu.Lock()
	defer m.mu.Unlock()

	return m.eventBus
}

// checkAndSetRunningState recomputes the manager state from the worker
// states: the manager is running if at least one worker is running.
// The caller must hold m.mu.
func (m *Manager) checkAndSetRunningState() error {
	m.state = ManagerStateStopped

	// len of a nil map is 0, so this also covers an uninitialized map.
	if len(m.workerMap) == 0 {
		return ErrNoWorkers
	}

	for _, worker := range m.workerMap {
		if worker.Status() == WorkerStatusRunning {
			m.state = ManagerStateRunning
			return nil
		}
	}

	return nil
}

// AddWorker adds a worker to the manager. If the manager is already
// running, the worker is started before it is registered.
func (m *Manager) AddWorker(worker Worker) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	// Reject a worker that is already registered.
	if _, ok := m.workerMap[worker.GetID()]; ok {
		return ErrWorkerAlreadyAdded
	}

	// Only stopped workers may be added.
	if worker.Status() != WorkerStatusStopped {
		return ErrWorkerAlreadyRunning
	}

	if m.state == ManagerStateRunning {
		if err := worker.Start(); err != nil {
			return err
		}
	}

	m.workerMap[worker.GetID()] = worker

	return m.checkAndSetRunningState()
}

// RemoveWorker stops the worker if necessary and removes it from the manager.
func (m *Manager) RemoveWorker(worker Worker) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if _, ok := m.workerMap[worker.GetID()]; !ok {
		return ErrWorkerNotAdded
	}

	// Stop the worker first unless it is already stopped.
	if worker.Status() != WorkerStatusStopped {
		if err := worker.Stop(); err != nil {
			return err
		}
	}

	delete(m.workerMap, worker.GetID())

	// Removing the last worker is not an error for the caller.
	if err := m.checkAndSetRunningState(); err != nil && !errors.Is(err, ErrNoWorkers) {
		return err
	}

	return nil
}

// Start starts all workers and begins handling job events. Errors from
// individual workers are collected and returned together.
func (m *Manager) Start() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.state == ManagerStateRunning {
		return ErrManagerAlreadyRunning
	}

	if len(m.workerMap) == 0 {
		return ErrNoWorkers
	}

	var errs []error

	for _, worker := range m.workerMap {
		if err := worker.Start(); err != nil && !errors.Is(err, ErrWorkerAlreadyRunning) {
			errs = append(errs, err)
		}
	}

	// The manager counts as running once at least one worker is running.
	if err := m.checkAndSetRunningState(); err != nil {
		errs = append(errs, err)
	}

	// Subscribe only when the manager is actually running and no handler
	// exists yet; otherwise the goroutine and channel would leak, because
	// Stop refuses to run on a stopped manager.
	if m.state == ManagerStateRunning && m.jobEventCh == nil {
		m.jobEventCh = make(chan interface{}, 100)
		m.eventBus.Subscribe(QueueJob, m.jobEventCh)
		m.eventBus.Subscribe(JobReady, m.jobEventCh)
		go m.handleJobEvents(m.jobEventCh)
	}

	// errors.Join (Go 1.20+) returns nil when errs is empty.
	return errors.Join(errs...)
}

// Stop stops event handling and all workers. Errors from individual
// workers are collected and returned together.
func (m *Manager) Stop() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.state == ManagerStateStopped {
		return ErrManagerAlreadyStopped
	}

	// Guard against closing the channel twice if a previous Stop left
	// the manager running because a worker failed to stop.
	if m.jobEventCh != nil {
		m.eventBus.Unsubscribe(QueueJob, m.jobEventCh)
		m.eventBus.Unsubscribe(JobReady, m.jobEventCh)
		close(m.jobEventCh)
		m.jobEventCh = nil
	}

	var errs []error

	for _, worker := range m.workerMap {
		if err := worker.Stop(); err != nil && !errors.Is(err, ErrWorkerAlreadyStopped) {
			errs = append(errs, err)
		}
	}

	if err := m.checkAndSetRunningState(); err != nil {
		errs = append(errs, err)
	}

	return errors.Join(errs...)
}

// workers returns a snapshot of the registered workers so that callers
// can iterate without holding m.mu.
func (m *Manager) workers() []Worker {
	m.mu.Lock()
	defer m.mu.Unlock()

	workers := make([]Worker, 0, len(m.workerMap))
	for _, worker := range m.workerMap {
		workers = append(workers, worker)
	}

	return workers
}

// handleJobEvents consumes events until the channel is closed by Stop.
// The channel is passed in so the goroutine never reads m.jobEventCh
// concurrently with Start or Stop.
func (m *Manager) handleJobEvents(events <-chan interface{}) {
	for raw := range events {
		event, ok := raw.(Event)
		if !ok {
			continue
		}

		switch event.Name {
		case QueueJob:
			// Use the checked form of the assertion to avoid a panic
			// on an unexpected payload.
			job, ok := event.Data.(GenericJob)
			if !ok {
				fmt.Printf("jobqueue: unexpected payload type %T for QueueJob event\n", event.Data)
				continue
			}

			if err := m.queue.Enqueue(job); err != nil && !errors.Is(err, ErrJobAlreadyExists) {
				fmt.Println(err)
			}

		case JobReady:
			m.dispatchReadyJobs()
		}
	}
}

// dispatchReadyJobs drains the queue and offers each job to the workers
// in turn until one accepts it.
func (m *Manager) dispatchReadyJobs() {
	// Iterate over a snapshot: reading m.workerMap here without the lock
	// would race with AddWorker and RemoveWorker.
	workers := m.workers()

	for {
		nextJob, err := m.queue.Dequeue()
		if err != nil {
			return
		}

		assigned := false
		for _, worker := range workers {
			if err := worker.AssignJob(nextJob); err == nil {
				assigned = true
				break
			}
		}

		// As in the original logic, a job that no worker accepts is not
		// put back on the queue; report it instead of dropping it silently.
		if !assigned {
			fmt.Println("jobqueue: no worker accepted a dequeued job")
		}
	}
}

// ScheduleJob schedules a job with the given scheduler.
func (m *Manager) ScheduleJob(job GenericJob, scheduler Scheduler) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.state != ManagerStateRunning {
		return ErrManagerNotRunning
	}

	// Record the scheduler only after scheduling succeeded, so a failed
	// attempt leaves no stale entry behind.
	if err := scheduler.Schedule(job, m.eventBus); err != nil {
		return err
	}

	m.scheduled[job.GetID()] = scheduler

	return nil
}

// CancelJob cancels a scheduled job.
func (m *Manager) CancelJob(id JobID) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.state != ManagerStateRunning {
		return ErrManagerNotRunning
	}

	scheduler, ok := m.scheduled[id]
	if !ok {
		return ErrJobNotScheduled
	}

	if err := scheduler.Cancel(id); err != nil {
		return err
	}

	delete(m.scheduled, id)

	return nil
}