diff --git a/errors.go b/errors.go index 22efbaa18eb3b8f058bc6783191a786be59f89a0..6936c041a1e901d90439611dc88e05f77fb4b102 100644 --- a/errors.go +++ b/errors.go @@ -40,4 +40,5 @@ var ( ErrSchedulerNotSet = fmt.Errorf("scheduler is not set") ErrJobNotActive = fmt.Errorf("job is not active") ErrJobAlreadyActive = fmt.Errorf("job is already active") + ErrChannelAlreadyClosed = fmt.Errorf("channel is already closed") ) diff --git a/event-bus.go b/event-bus.go index b62c7badd18f31c240842d9d6b9bc3c5a6adc182..9892af596aa92ee8a4e1a23cd8b73420c3fd6713 100644 --- a/event-bus.go +++ b/event-bus.go @@ -1,12 +1,18 @@ package jobqueue import ( + "fmt" "github.com/google/uuid" "sync" + "time" ) type EventName string +func (e EventName) String() string { + return string(e) +} + const ( JobAdded EventName = "JobAdded" JobReady EventName = "JobReady" @@ -14,39 +20,62 @@ const ( JobFinished EventName = "JobFinished" ) +type MessageID string + +func (m MessageID) String() string { + return string(m) +} + +func NewMessageID() MessageID { + return MessageID(uuid.New().String()) +} + type Event struct { Name EventName Data any - MessageID string + MessageID MessageID } // EventBus is a simple event bus type EventBus struct { - subscribers map[EventName][]chan interface{} - mu sync.RWMutex + subscribers map[EventName][]chan interface{} + publishErr map[MessageID]error + mu sync.RWMutex + shutdownChan chan struct{} + wg sync.WaitGroup } // NewEventBus creates a new event bus func NewEventBus() *EventBus { return &EventBus{ - subscribers: make(map[EventName][]chan interface{}), + subscribers: make(map[EventName][]chan interface{}), + publishErr: make(map[MessageID]error), + shutdownChan: make(chan struct{}), } } +func (eb *EventBus) Shutdown() { + close(eb.shutdownChan) + eb.wg.Wait() +} + // Subscribe adds a channel to the subscribers list func (eb *EventBus) Subscribe(name EventName, ch chan interface{}) { eb.mu.Lock() defer eb.mu.Unlock() + if _, found := eb.subscribers[name]; !found { eb.subscribers[name] = []chan interface{}{} } eb.subscribers[name] = append(eb.subscribers[name], ch) + } // Unsubscribe removes a channel from the subscribers list func (eb *EventBus) Unsubscribe(name EventName, ch chan interface{}) { eb.mu.Lock() defer eb.mu.Unlock() + if channels, found := eb.subscribers[name]; found { for i := range channels { if channels[i] == ch { @@ -57,18 +86,55 @@ func (eb *EventBus) Unsubscribe(name EventName, ch chan interface{}) { } } +func (eb *EventBus) GetPublishError(msgID MessageID) error { + eb.mu.RLock() + defer eb.mu.RUnlock() + return eb.publishErr[msgID] +} + +func (eb *EventBus) SetPublishError(msgID MessageID, err error) { + eb.mu.Lock() + defer eb.mu.Unlock() + eb.publishErr[msgID] = err +} + // Publish publishes an event to all subscribers -func (eb *EventBus) Publish(name EventName, data interface{}) { +func (eb *EventBus) Publish(name EventName, data any) { eb.mu.RLock() defer eb.mu.RUnlock() + + select { + case <-eb.shutdownChan: + return + default: + + } + if channels, found := eb.subscribers[name]; found { for _, ch := range channels { + eb.wg.Add(1) go func(ch chan interface{}) { - ch <- Event{ + defer eb.wg.Done() + + msgID := NewMessageID() + + defer func() { + if r := recover(); r != nil { + eb.SetPublishError(msgID, fmt.Errorf("publish panic: %v", r)) + } + }() + + select { + case ch <- Event{ Name: name, Data: data, - MessageID: uuid.New().String(), + MessageID: msgID, + }: + + case <-time.After(time.Second * 1): + eb.SetPublishError(msgID, fmt.Errorf("publish timeout")) } + }(ch) } } diff --git a/event-bus_test.go b/event-bus_test.go index 66671b6dfa36aa9e4605b54d09c35f07d7711733..67c78ed47c0c8bb41204a48d869db09c66168b35 100644 --- a/event-bus_test.go +++ b/event-bus_test.go @@ -10,6 +10,7 @@ func TestSubscribeAndPublish(t *testing.T) { eb := NewEventBus() jobAddedCh := make(chan interface{}, 1) + defer close(jobAddedCh) eb.Subscribe(JobAdded, jobAddedCh) jobData := "New Job Data" @@ -32,6 +33,8 @@ func TestUnsubscribe(t *testing.T) { eb := NewEventBus() jobAddedCh := make(chan interface{}, 1) + defer close(jobAddedCh) + eb.Subscribe(JobAdded, jobAddedCh) eb.Unsubscribe(JobAdded, jobAddedCh) @@ -53,6 +56,8 @@ func TestMultipleSubscribers(t *testing.T) { jobAddedCh2 := make(chan interface{}, 1) eb.Subscribe(JobAdded, jobAddedCh1) eb.Subscribe(JobAdded, jobAddedCh2) + defer close(jobAddedCh1) + defer close(jobAddedCh2) jobData := "New Job Data" eb.Publish(JobAdded, jobData) diff --git a/job-generic.go b/job-generic.go index 07d2d43f44b87e2d80010b2b1c6672be75d019b1..aaa37b50546e3a503e1da3f1cddcdd742b38568c 100644 --- a/job-generic.go +++ b/job-generic.go @@ -26,4 +26,11 @@ type GenericJob interface { SetScheduler(scheduler Scheduler) GetScheduler() Scheduler + + Pause() + PauseUntil(until time.Time) + + Resume() + + IsPaused() bool } diff --git a/job.go b/job.go index f922cb3af4b90956c70074fca247d14777d6d279..59777b7923cfe0ab6141bd65de344ebf36586ca2 100644 --- a/job.go +++ b/job.go @@ -35,6 +35,9 @@ type Job[T any] struct { scheduler Scheduler + pause bool + pauseUntil time.Time + dependencies []JobID mu sync.Mutex @@ -116,15 +119,52 @@ func (j *Job[T]) GetPersistence() JobPersistence { func (j *Job[T]) SetScheduler(scheduler Scheduler) { j.mu.Lock() defer j.mu.Unlock() - j.scheduler = scheduler } +// Pause pauses the job +func (j *Job[T]) Pause() { + j.mu.Lock() + defer j.mu.Unlock() + j.pause = true + +} + +// PauseUntil pauses the job until the given time +func (j *Job[T]) PauseUntil(until time.Time) { + j.mu.Lock() + defer j.mu.Unlock() + j.pause = true + j.pauseUntil = until + +} + +func (j *Job[T]) Resume() { + j.mu.Lock() + defer j.mu.Unlock() + j.pause = false + j.pauseUntil = time.Time{} + +} + +// IsPaused returns true if the job is paused +func (j *Job[T]) IsPaused() bool { + j.mu.Lock() + defer j.mu.Unlock() + if j.pause { + if j.pauseUntil.IsZero() { + return true + } else { + return j.pauseUntil.After(time.Now()) + } + } + return false +} + // GetScheduler returns the scheduler of the job func (j *Job[T]) GetScheduler() Scheduler { j.mu.Lock() defer j.mu.Unlock() - return j.scheduler } @@ -142,14 +182,27 @@ func (j *Job[T]) Execute(ctx context.Context) (RunGenericResult, error) { j.stats.RunCount++ // Update TimeMetrics - j.stats.TimeMetrics.TotalRunTime += elapsedTime + newTotalRunTime := j.stats.TimeMetrics.TotalRunTime + elapsedTime + if newTotalRunTime > j.stats.TimeMetrics.TotalRunTime { // no overflow happened + j.stats.TimeMetrics.TotalRunTime = newTotalRunTime + } else { + // set to max + j.stats.TimeMetrics.TotalRunTime = time.Duration(^uint64(0) >> 1) + } + if j.stats.TimeMetrics.MinRunTime == 0 || elapsedTime < j.stats.TimeMetrics.MinRunTime { j.stats.TimeMetrics.MinRunTime = elapsedTime } + if elapsedTime > j.stats.TimeMetrics.MaxRunTime { j.stats.TimeMetrics.MaxRunTime = elapsedTime } - j.stats.TimeMetrics.AvgRunTime = j.stats.TimeMetrics.TotalRunTime / time.Duration(j.stats.RunCount) + + if j.stats.RunCount == 0 { + j.stats.TimeMetrics.AvgRunTime = 0 + } else { + j.stats.TimeMetrics.AvgRunTime = j.stats.TimeMetrics.TotalRunTime / time.Duration(j.stats.RunCount) + } // Update SuccessCount or ErrorCount and codes if runnerError == nil { @@ -186,7 +239,7 @@ func (j *Job[T]) Execute(ctx context.Context) (RunGenericResult, error) { return genericResult, runnerError } -// Cancel cancels the job +// Cancel cancels the job, currently a no-op func (j *Job[T]) Cancel() error { return nil } diff --git a/job_test.go b/job_test.go index 79b54df160f438fa9154a21e86d805c19cac418f..e09377d5bd7e76c3713f75de4ae4489f2eaae192 100644 --- a/job_test.go +++ b/job_test.go @@ -9,6 +9,17 @@ import ( "time" ) +func TestPauseJob(t *testing.T) { + + runner := &ShellRunnable{ScriptPath: "path"} + job := NewJob[ShellResult]("id1", runner) + + job.Pause() + assert.True(t, job.IsPaused()) + job.Resume() + assert.False(t, job.IsPaused()) +} + func TestNewJob(t *testing.T) { runner := &ShellRunnable{ScriptPath: "path"} job := NewJob[ShellResult]("id1", runner) diff --git a/manager.go b/manager.go index dad53ee93264564d7a5a2bf0f448c64fce6dd16a..1bf57d3fd5b7e759221ce4df6304a2eb22c20418 100644 --- a/manager.go +++ b/manager.go @@ -107,6 +107,70 @@ func (m *Manager) GetActiveJobs() map[JobID]GenericJob { return m.activeJobs } +func (m *Manager) RemoveJob(id JobID) error { + m.mu.Lock() + defer m.mu.Unlock() + + if _, ok := m.activeJobs[id]; !ok { + return ErrJobNotActive + } + + scheduler := m.activeJobs[id].GetScheduler() + + switch scheduler.(type) { + case *CronScheduler: + if err := scheduler.(*CronScheduler).Cancel(id); err != nil { + return err + } + case *DelayScheduler: + if err := scheduler.(*DelayScheduler).Cancel(id); err != nil { + return err + } + case *EventScheduler: + if err := scheduler.(*EventScheduler).Cancel(id); err != nil { + return err + } + case *InstantScheduler: + if err := scheduler.(*InstantScheduler).Cancel(id); err != nil { + return err + } + case *IntervalScheduler: + if err := scheduler.(*IntervalScheduler).Cancel(id); err != nil { + return err + } + default: + return fmt.Errorf("Unknown scheduler type") + + } + + delete(m.activeJobs, id) + return nil + +} + +func (m *Manager) UpdateJob(job GenericJob) error { + m.mu.Lock() + defer m.mu.Unlock() + + if _, ok := m.activeJobs[job.GetID()]; !ok { + return ErrJobNotActive + } + + scheduler := m.activeJobs[job.GetID()].GetScheduler() + + err := m.RemoveJob(job.GetID()) + if err != nil { + return err + } + + err = m.ScheduleJob(job, scheduler) + if err != nil { + return err + } + + return nil +} + // ContainsActiveJob checks if a job is active func (m *Manager) ContainsActiveJob(id JobID) bool { m.mu.Lock() @@ -115,6 +179,7 @@ func (m *Manager) ContainsActiveJob(id JobID) bool { return ok } +// SetDB sets the database connection func (m *Manager) SetDB(db *gorm.DB) *Manager { m.mu.Lock() defer m.mu.Unlock() @@ -297,7 +362,7 @@ func (m *Manager) Stop() error { m.eventBus.Unsubscribe(QueueJob, m.jobEventCh) m.eventBus.Unsubscribe(JobReady, m.jobEventCh) - close(m.jobEventCh) + _ = safeClose(m.jobEventCh) var wrappedErr error diff --git a/manager_test.go b/manager_test.go index b5abdd614500a74cfdf971a6edf7b4c654f10f80..6406c726bd9668e20bbd4d6cd54408a32e56024a 100644 --- a/manager_test.go +++ b/manager_test.go @@ -93,6 +93,22 @@ func (m *MockGenericJob) Cancel() error { return nil } +func (m *MockGenericJob) Pause() { + +} + +func (m *MockGenericJob) PauseUntil(until time.Time) { + +} + +func (m *MockGenericJob) Resume() { + +} + +func (m *MockGenericJob) IsPaused() bool { + return false +} + func (m *MockGenericJob) GetPersistence() JobPersistence { return JobPersistence{} } @@ -107,9 +123,10 @@ func (m *MockGenericJob) GetScheduler() Scheduler { } func TestNewManager(t *testing.T) { - eventBus := NewEventBus() - manager := NewManager() + manager := NewManager() + eventBus := manager.eventBus + assert.NotNil(t, manager) assert.Equal(t, ManagerState(ManagerStateStopped), manager.state) assert.NotNil(t, manager.queue) diff --git a/scheduler.go b/scheduler.go index 75328a17d00f2f2fdab7baa875f8d28683e2ffdb..5fce6f1b0e846ab1da102e9347b0cedabc7ce22e 100644 --- a/scheduler.go +++ b/scheduler.go @@ -58,7 +58,11 @@ func (s *IntervalScheduler) Schedule(job GenericJob, eventBus *EventBus) error { for { select { case <-ticker.C: - eventBus.Publish(QueueJob, job) + + if !job.IsPaused() { + eventBus.Publish(QueueJob, job) + } + case <-stopChan: ticker.Stop() return @@ -143,7 +147,9 @@ func (s *CronScheduler) Schedule(job GenericJob, eventBus *EventBus) error { } entryId, err := s.cron.AddFunc(s.Spec, func() { - eventBus.Publish(QueueJob, job) + if !job.IsPaused() { + eventBus.Publish(QueueJob, job) + } }) s.jobs[id] = entryId @@ -229,7 +235,9 @@ func (s *DelayScheduler) Schedule(job GenericJob, eventBus *EventBus) error { go func() { select { case <-timer.C: - eventBus.Publish(QueueJob, job) + if !job.IsPaused() { + eventBus.Publish(QueueJob, job) + } case <-stopChan: timer.Stop() } @@ -289,13 +297,19 @@ func (s *DelayScheduler) GetPersistence() SchedulerPersistence { // EventScheduler is a scheduler that schedules a job when an event is received type EventScheduler struct { - Event EventName - jobs map[JobID]StopChan + Event EventName + EventBus *EventBus + jobs map[JobID]StopChan } func (s *EventScheduler) Schedule(job GenericJob, eventBus *EventBus) error { ch := make(chan interface{}) - eventBus.Subscribe(s.Event, ch) + + if s.EventBus != nil { + s.EventBus.Subscribe(s.Event, ch) + } else { + eventBus.Subscribe(s.Event, ch) + } if s.jobs == nil { s.jobs = make(map[JobID]StopChan) @@ -313,7 +327,9 @@ func (s *EventScheduler) Schedule(job GenericJob, eventBus *EventBus) error { for { select { case <-ch: - eventBus.Publish(QueueJob, job) + if !job.IsPaused() { + eventBus.Publish(QueueJob, job) + } case <-stopChan: eventBus.Unsubscribe(s.Event, ch) return @@ -378,7 +394,9 @@ func (s *EventScheduler) GetPersistence() SchedulerPersistence { type InstantScheduler struct{} func (s *InstantScheduler) Schedule(job GenericJob, eventBus *EventBus) error { - eventBus.Publish(QueueJob, job) + if !job.IsPaused() { + eventBus.Publish(QueueJob, job) + } return nil } diff --git a/util.go b/util.go new file mode 100644 index 0000000000000000000000000000000000000000..0fd0e2cd45858dca3bf6d6870a51c60ccabcd591 --- /dev/null +++ b/util.go @@ -0,0 +1,15 @@ +package jobqueue + +// safeClose closes the given channel and returns an error if the channel is already closed +func safeClose(ch chan interface{}) (err error) { + defer func() { + if recover() != nil { + err = ErrChannelAlreadyClosed + } + }() + + err = nil + + close(ch) + return +} diff --git a/worker.go b/worker.go index 893a33e479785acab7b02eb1f61a46d5a9ee0cd4..63253ab163790e562f1eee7e41407ff94ca16e59 100644 --- a/worker.go +++ b/worker.go @@ -83,6 +83,7 @@ func (w *LocalWorker) Start() error { go w.run(w.jobChannels[i], w.stopChans[i], w.cancelChans[i]) } + time.Sleep(200 * time.Millisecond) // wait go routine until select w.wg.Wait() w.status = WorkerStatusRunning @@ -121,7 +122,6 @@ func (w *LocalWorker) Stop() error { } func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancelChan chan bool) { - w.wg.Done() workerThreadID := w.ID.String() + "-" + fmt.Sprintf("%p", &w) @@ -130,8 +130,10 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel } stopFlag := false + w.wg.Done() for { + select { case job := <-jobChannel: ctx, cancel := context.WithCancel(context.Background()) @@ -210,7 +212,6 @@ func (w *LocalWorker) AssignJob(job GenericJob) error { } for _, ch := range w.jobChannels { - select { case ch <- job: return nil diff --git a/worker_test.go b/worker_test.go index a940f5f3c1534131d88cc091490f1ab566b3850b..d2a7fc8da9473cf9fd23fce2188fd82857cb1c64 100644 --- a/worker_test.go +++ b/worker_test.go @@ -15,6 +15,22 @@ func (j DummyJob) GetID() JobID { return j.id } +func (j DummyJob) Pause() { + +} + +func (j DummyJob) PauseUntil(until time.Time) { + +} + +func (j DummyJob) Resume() { + +} + +func (j DummyJob) IsPaused() bool { + return false +} + func (j DummyJob) GetMaxRetries() uint { return 0 } @@ -206,9 +222,6 @@ func TestCancelJob(t *testing.T) { t.Errorf("AssignJob() returned error: %v", err) } - // Test job cancellation - //worker.CancelJob(JobID("1")) - err = worker.Stop() if err != nil { t.Errorf("Stop() returned error: %v", err)