From 7fe47833df16a7189dc161378920f64a4a01b8c2 Mon Sep 17 00:00:00 2001 From: Volker Schukai <volker.schukai@schukai.com> Date: Sun, 12 Nov 2023 01:15:06 +0100 Subject: [PATCH] feat: pause and resume jobs #14 --- errors.go | 1 + event-bus.go | 80 ++++++++++++++++++++++++++++++++++++++++++----- event-bus_test.go | 5 +++ job-generic.go | 7 +++++ job.go | 63 ++++++++++++++++++++++++++++++++++--- job_test.go | 11 +++++++ manager.go | 67 ++++++++++++++++++++++++++++++++++++++- manager_test.go | 21 +++++++++++-- scheduler.go | 34 +++++++++++++++----- util.go | 15 +++++++++ worker.go | 5 +-- worker_test.go | 19 +++++++++-- 12 files changed, 300 insertions(+), 28 deletions(-) create mode 100644 util.go diff --git a/errors.go b/errors.go index 22efbaa..6936c04 100644 --- a/errors.go +++ b/errors.go @@ -40,4 +40,5 @@ var ( ErrSchedulerNotSet = fmt.Errorf("scheduler is not set") ErrJobNotActive = fmt.Errorf("job is not active") ErrJobAlreadyActive = fmt.Errorf("job is already active") + ErrChannelAlreadyClosed = fmt.Errorf("channel is already closed") ) diff --git a/event-bus.go b/event-bus.go index b62c7ba..9892af5 100644 --- a/event-bus.go +++ b/event-bus.go @@ -1,12 +1,18 @@ package jobqueue import ( + "fmt" "github.com/google/uuid" "sync" + "time" ) type EventName string +func (e EventName) String() string { + return string(e) +} + const ( JobAdded EventName = "JobAdded" JobReady EventName = "JobReady" @@ -14,39 +20,62 @@ const ( JobFinished EventName = "JobFinished" ) +type MessageID string + +func (m MessageID) String() string { + return string(m) +} + +func NewMessageID() MessageID { + return MessageID(uuid.New().String()) +} + type Event struct { Name EventName Data any - MessageID string + MessageID MessageID } // EventBus is a simple event bus type EventBus struct { - subscribers map[EventName][]chan interface{} - mu sync.RWMutex + subscribers map[EventName][]chan interface{} + publishErr map[MessageID]error + mu sync.RWMutex + shutdownChan chan struct{} + wg sync.WaitGroup } // NewEventBus creates a new event bus func NewEventBus() *EventBus { return &EventBus{ - subscribers: make(map[EventName][]chan interface{}), + subscribers: make(map[EventName][]chan interface{}), + publishErr: make(map[MessageID]error), + shutdownChan: make(chan struct{}), } } +func (eb *EventBus) Shutdown() { + close(eb.shutdownChan) + eb.wg.Wait() +} + // Subscribe adds a channel to the subscribers list func (eb *EventBus) Subscribe(name EventName, ch chan interface{}) { eb.mu.Lock() defer eb.mu.Unlock() + if _, found := eb.subscribers[name]; !found { eb.subscribers[name] = []chan interface{}{} } eb.subscribers[name] = append(eb.subscribers[name], ch) + } // Unsubscribe removes a channel from the subscribers list func (eb *EventBus) Unsubscribe(name EventName, ch chan interface{}) { eb.mu.Lock() defer eb.mu.Unlock() + if channels, found := eb.subscribers[name]; found { for i := range channels { if channels[i] == ch { @@ -57,18 +86,55 @@ func (eb *EventBus) Unsubscribe(name EventName, ch chan interface{}) { } } +func (eb *EventBus) GetPublishError(msgID MessageID) error { + eb.mu.RLock() + defer eb.mu.RUnlock() + return eb.publishErr[msgID] +} + +func (eb *EventBus) SetPublishError(msgID MessageID, err error) { + eb.mu.Lock() + defer eb.mu.Unlock() + eb.publishErr[msgID] = err +} + // Publish publishes an event to all subscribers -func (eb *EventBus) Publish(name EventName, data interface{}) { +func (eb *EventBus) Publish(name EventName, data any) { eb.mu.RLock() defer eb.mu.RUnlock() + + select { + case <-eb.shutdownChan: + return + default: + + } + if channels, found := eb.subscribers[name]; found { for _, ch := range channels { + eb.wg.Add(1) go func(ch chan interface{}) { - ch <- Event{ + defer eb.wg.Done() + + msgID := NewMessageID() + + defer func() { + if r := recover(); r != nil { + eb.SetPublishError(msgID, fmt.Errorf("publish panic: %v", r)) + } + }() + + select { + case ch <- Event{ Name: name, Data: data, - MessageID: uuid.New().String(), + MessageID: msgID, + }: + + case <-time.After(time.Second * 1): + eb.SetPublishError(msgID, fmt.Errorf("publish timeout")) } + }(ch) } } diff --git a/event-bus_test.go b/event-bus_test.go index 66671b6..67c78ed 100644 --- a/event-bus_test.go +++ b/event-bus_test.go @@ -10,6 +10,7 @@ func TestSubscribeAndPublish(t *testing.T) { eb := NewEventBus() jobAddedCh := make(chan interface{}, 1) + defer close(jobAddedCh) eb.Subscribe(JobAdded, jobAddedCh) jobData := "New Job Data" @@ -32,6 +33,8 @@ func TestUnsubscribe(t *testing.T) { eb := NewEventBus() jobAddedCh := make(chan interface{}, 1) + defer close(jobAddedCh) + eb.Subscribe(JobAdded, jobAddedCh) eb.Unsubscribe(JobAdded, jobAddedCh) @@ -53,6 +56,8 @@ func TestMultipleSubscribers(t *testing.T) { jobAddedCh2 := make(chan interface{}, 1) eb.Subscribe(JobAdded, jobAddedCh1) eb.Subscribe(JobAdded, jobAddedCh2) + defer close(jobAddedCh1) + defer close(jobAddedCh2) jobData := "New Job Data" eb.Publish(JobAdded, jobData) diff --git a/job-generic.go b/job-generic.go index 07d2d43..aaa37b5 100644 --- a/job-generic.go +++ b/job-generic.go @@ -26,4 +26,11 @@ type GenericJob interface { SetScheduler(scheduler Scheduler) GetScheduler() Scheduler + + Pause() + PauseUntil(until time.Time) + + Resume() + + IsPaused() bool } diff --git a/job.go b/job.go index f922cb3..59777b7 100644 --- a/job.go +++ b/job.go @@ -35,6 +35,9 @@ type Job[T any] struct { scheduler Scheduler + pause bool + pauseUntil time.Time + dependencies []JobID mu sync.Mutex @@ -116,15 +119,52 @@ func (j *Job[T]) GetPersistence() JobPersistence { func (j *Job[T]) SetScheduler(scheduler Scheduler) { j.mu.Lock() defer j.mu.Unlock() - j.scheduler = scheduler } +// Pause pauses the job +func (j *Job[T]) Pause() { + j.mu.Lock() + defer j.mu.Unlock() + j.pause = true + +} + +// PauseUntil pauses the job until the given time +func (j *Job[T]) PauseUntil(until time.Time) { + j.mu.Lock() + defer j.mu.Unlock() + j.pause = true + j.pauseUntil = until + +} + +func (j *Job[T]) Resume() { + j.mu.Lock() + defer j.mu.Unlock() + j.pause = false + j.pauseUntil = time.Time{} + +} + +// IsPaused returns true if the job is paused +func (j *Job[T]) IsPaused() bool { + j.mu.Lock() + defer j.mu.Unlock() + if j.pause { + if j.pauseUntil.IsZero() { + return true + } else { + return j.pauseUntil.After(time.Now()) + } + } + return false +} + // GetScheduler returns the scheduler of the job func (j *Job[T]) GetScheduler() Scheduler { j.mu.Lock() defer j.mu.Unlock() - return j.scheduler } @@ -142,14 +182,27 @@ func (j *Job[T]) Execute(ctx context.Context) (RunGenericResult, error) { j.stats.RunCount++ // Update TimeMetrics - j.stats.TimeMetrics.TotalRunTime += elapsedTime + newTotalRunTime := j.stats.TimeMetrics.TotalRunTime + elapsedTime + if newTotalRunTime > j.stats.TimeMetrics.TotalRunTime { // no overflow happened + j.stats.TimeMetrics.TotalRunTime = newTotalRunTime + } else { + // set to max + j.stats.TimeMetrics.TotalRunTime = time.Duration(^uint64(0) >> 1) + } + if j.stats.TimeMetrics.MinRunTime == 0 || elapsedTime < j.stats.TimeMetrics.MinRunTime { j.stats.TimeMetrics.MinRunTime = elapsedTime } + if elapsedTime > j.stats.TimeMetrics.MaxRunTime { j.stats.TimeMetrics.MaxRunTime = elapsedTime } - j.stats.TimeMetrics.AvgRunTime = j.stats.TimeMetrics.TotalRunTime / time.Duration(j.stats.RunCount) + + if j.stats.RunCount == 0 { + j.stats.TimeMetrics.AvgRunTime = 0 + } else { + j.stats.TimeMetrics.AvgRunTime = j.stats.TimeMetrics.TotalRunTime / time.Duration(j.stats.RunCount) + } // Update SuccessCount or ErrorCount and codes if runnerError == nil { @@ -186,7 +239,7 @@ func (j *Job[T]) Execute(ctx context.Context) (RunGenericResult, error) { return genericResult, runnerError } -// Cancel cancels the job +// Cancel cancels the job, currently a no-op func (j *Job[T]) Cancel() error { return nil } diff --git a/job_test.go b/job_test.go index 79b54df..e09377d 100644 --- a/job_test.go +++ b/job_test.go @@ -9,6 +9,17 @@ import ( "time" ) +func TestPauseJob(t *testing.T) { + + runner := &ShellRunnable{ScriptPath: "path"} + job := NewJob[ShellResult]("id1", runner) + + job.Pause() + assert.True(t, job.IsPaused()) + job.Resume() + assert.False(t, job.IsPaused()) +} + func TestNewJob(t *testing.T) { runner := &ShellRunnable{ScriptPath: "path"} job := NewJob[ShellResult]("id1", runner) diff --git a/manager.go b/manager.go index dad53ee..1bf57d3 100644 --- a/manager.go +++ b/manager.go @@ -107,6 +107,70 @@ func (m *Manager) GetActiveJobs() map[JobID]GenericJob { return m.activeJobs } +func (m *Manager) RemoveJob(id JobID) error { + m.mu.Lock() + defer m.mu.Unlock() + + if _, ok := m.activeJobs[id]; !ok { + return ErrJobNotActive + } + + scheduler := m.activeJobs[id].GetScheduler() + + switch scheduler.(type) { + case *CronScheduler: + if err := scheduler.(*CronScheduler).Cancel(id); err != nil { + return err + } + case *DelayScheduler: + if err := scheduler.(*DelayScheduler).Cancel(id); err != nil { + return err + } + case *EventScheduler: + if err := scheduler.(*EventScheduler).Cancel(id); err != nil { + return err + } + case *InstantScheduler: + if err := scheduler.(*InstantScheduler).Cancel(id); err != nil { + return err + } + case *IntervalScheduler: + if err := scheduler.(*IntervalScheduler).Cancel(id); err != nil { + return err + } + default: + return fmt.Errorf("Unknown scheduler type") + + } + + delete(m.activeJobs, id) + return nil + +} + +func (m *Manager) UpdateJob(job GenericJob) error { + m.mu.Lock() + defer m.mu.Unlock() + + if _, ok := m.activeJobs[job.GetID()]; !ok { + return ErrJobNotActive + } + + scheduler := m.activeJobs[job.GetID()].GetScheduler() + + err := m.RemoveJob(job.GetID()) + if err != nil { + return err + } + + err = m.ScheduleJob(job, scheduler) + if err != nil { + return err + } + + return nil +} + // ContainsActiveJob checks if a job is active func (m *Manager) ContainsActiveJob(id JobID) bool { m.mu.Lock() @@ -115,6 +179,7 @@ func (m *Manager) ContainsActiveJob(id JobID) bool { return ok } +// SetDB sets the database connection func (m *Manager) SetDB(db *gorm.DB) *Manager { m.mu.Lock() defer m.mu.Unlock() @@ -297,7 +362,7 @@ func (m *Manager) Stop() error { m.eventBus.Unsubscribe(QueueJob, m.jobEventCh) m.eventBus.Unsubscribe(JobReady, m.jobEventCh) - close(m.jobEventCh) + _ = safeClose(m.jobEventCh) var wrappedErr error diff --git a/manager_test.go b/manager_test.go index b5abdd6..6406c72 100644 --- a/manager_test.go +++ b/manager_test.go @@ -93,6 +93,22 @@ func (m *MockGenericJob) Cancel() error { return nil } +func (m *MockGenericJob) Pause() { + +} + +func (m *MockGenericJob) PauseUntil(until time.Time) { + +} + +func (m *MockGenericJob) Resume() { + +} + +func (m *MockGenericJob) IsPaused() bool { + return false +} + func (m *MockGenericJob) GetPersistence() JobPersistence { return JobPersistence{} } @@ -107,9 +123,10 @@ func (m *MockGenericJob) GetScheduler() Scheduler { } func TestNewManager(t *testing.T) { - eventBus := NewEventBus() - manager := NewManager() + manager := NewManager() + eventBus := manager.eventBus + assert.NotNil(t, manager) assert.Equal(t, ManagerState(ManagerStateStopped), manager.state) assert.NotNil(t, manager.queue) diff --git a/scheduler.go b/scheduler.go index 75328a1..5fce6f1 100644 --- a/scheduler.go +++ b/scheduler.go @@ -58,7 +58,11 @@ func (s *IntervalScheduler) Schedule(job GenericJob, eventBus *EventBus) error { for { select { case <-ticker.C: - eventBus.Publish(QueueJob, job) + + if !job.IsPaused() { + eventBus.Publish(QueueJob, job) + } + case <-stopChan: ticker.Stop() return @@ -143,7 +147,9 @@ func (s *CronScheduler) Schedule(job GenericJob, eventBus *EventBus) error { } entryId, err := s.cron.AddFunc(s.Spec, func() { - eventBus.Publish(QueueJob, job) + if !job.IsPaused() { + eventBus.Publish(QueueJob, job) + } }) s.jobs[id] = entryId @@ -229,7 +235,9 @@ func (s *DelayScheduler) Schedule(job GenericJob, eventBus *EventBus) error { go func() { select { case <-timer.C: - eventBus.Publish(QueueJob, job) + if !job.IsPaused() { + eventBus.Publish(QueueJob, job) + } case <-stopChan: timer.Stop() } @@ -289,13 +297,19 @@ func (s *DelayScheduler) GetPersistence() SchedulerPersistence { // EventScheduler is a scheduler that schedules a job when an event is received type EventScheduler struct { - Event EventName - jobs map[JobID]StopChan + Event EventName + EventBus *EventBus + jobs map[JobID]StopChan } func (s *EventScheduler) Schedule(job GenericJob, eventBus *EventBus) error { ch := make(chan interface{}) - eventBus.Subscribe(s.Event, ch) + + if s.EventBus != nil { + s.EventBus.Subscribe(s.Event, ch) + } else { + eventBus.Subscribe(s.Event, ch) + } if s.jobs == nil { s.jobs = make(map[JobID]StopChan) @@ -313,7 +327,9 @@ func (s *EventScheduler) Schedule(job GenericJob, eventBus *EventBus) error { for { select { case <-ch: - eventBus.Publish(QueueJob, job) + if !job.IsPaused() { + eventBus.Publish(QueueJob, job) + } case <-stopChan: eventBus.Unsubscribe(s.Event, ch) return @@ -378,7 +394,9 @@ func (s *EventScheduler) GetPersistence() SchedulerPersistence { type InstantScheduler struct{} func (s *InstantScheduler) Schedule(job GenericJob, eventBus *EventBus) error { - eventBus.Publish(QueueJob, job) + if !job.IsPaused() { + eventBus.Publish(QueueJob, job) + } return nil } diff --git a/util.go b/util.go new file mode 100644 index 0000000..0fd0e2c --- /dev/null +++ b/util.go @@ -0,0 +1,15 @@ +package jobqueue + +// safeClose closes the given channel and returns an error if the channel is already closed +func safeClose(ch chan interface{}) (err error) { + defer func() { + if recover() != nil { + err = ErrChannelAlreadyClosed + } + }() + + err = nil + + close(ch) + return +} diff --git a/worker.go b/worker.go index 893a33e..63253ab 100644 --- a/worker.go +++ b/worker.go @@ -83,6 +83,7 @@ func (w *LocalWorker) Start() error { go w.run(w.jobChannels[i], w.stopChans[i], w.cancelChans[i]) } + time.Sleep(200 * time.Millisecond) // wait go routine until select w.wg.Wait() w.status = WorkerStatusRunning @@ -121,7 +122,6 @@ func (w *LocalWorker) Stop() error { } func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancelChan chan bool) { - w.wg.Done() workerThreadID := w.ID.String() + "-" + fmt.Sprintf("%p", &w) @@ -130,8 +130,10 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel } stopFlag := false + w.wg.Done() for { + select { case job := <-jobChannel: ctx, cancel := context.WithCancel(context.Background()) @@ -210,7 +212,6 @@ func (w *LocalWorker) AssignJob(job GenericJob) error { } for _, ch := range w.jobChannels { - select { case ch <- job: return nil diff --git a/worker_test.go b/worker_test.go index a940f5f..d2a7fc8 100644 --- a/worker_test.go +++ b/worker_test.go @@ -15,6 +15,22 @@ func (j DummyJob) GetID() JobID { return j.id } +func (j DummyJob) Pause() { + +} + +func (j DummyJob) PauseUntil(until time.Time) { + +} + +func (j DummyJob) Resume() { + +} + +func (j DummyJob) IsPaused() bool { + return false +} + func (j DummyJob) GetMaxRetries() uint { return 0 } @@ -206,9 +222,6 @@ func TestCancelJob(t *testing.T) { t.Errorf("AssignJob() returned error: %v", err) } - // Test job cancellation - //worker.CancelJob(JobID("1")) - err = worker.Stop() if err != nil { t.Errorf("Stop() returned error: %v", err) -- GitLab