Skip to content
Snippets Groups Projects
Verified Commit 7fe47833 authored by Volker Schukai's avatar Volker Schukai :alien:
Browse files

feat: pause and resume jobs #14

parent 736fa2dd
No related branches found
No related tags found
No related merge requests found
......@@ -40,4 +40,5 @@ var (
ErrSchedulerNotSet = fmt.Errorf("scheduler is not set")
ErrJobNotActive = fmt.Errorf("job is not active")
ErrJobAlreadyActive = fmt.Errorf("job is already active")
ErrChannelAlreadyClosed = fmt.Errorf("channel is already closed")
)
package jobqueue
import (
"fmt"
"github.com/google/uuid"
"sync"
"time"
)
type EventName string
func (e EventName) String() string {
return string(e)
}
const (
JobAdded EventName = "JobAdded"
JobReady EventName = "JobReady"
......@@ -14,39 +20,62 @@ const (
JobFinished EventName = "JobFinished"
)
type MessageID string
func (m MessageID) String() string {
return string(m)
}
func NewMessageID() MessageID {
return MessageID(uuid.New().String())
}
type Event struct {
Name EventName
Data any
MessageID string
MessageID MessageID
}
// EventBus is a simple event bus
type EventBus struct {
subscribers map[EventName][]chan interface{}
publishErr map[MessageID]error
mu sync.RWMutex
shutdownChan chan struct{}
wg sync.WaitGroup
}
// NewEventBus creates a new event bus
func NewEventBus() *EventBus {
return &EventBus{
subscribers: make(map[EventName][]chan interface{}),
publishErr: make(map[MessageID]error),
shutdownChan: make(chan struct{}),
}
}
func (eb *EventBus) Shutdown() {
close(eb.shutdownChan)
eb.wg.Wait()
}
// Subscribe adds a channel to the subscribers list
func (eb *EventBus) Subscribe(name EventName, ch chan interface{}) {
eb.mu.Lock()
defer eb.mu.Unlock()
if _, found := eb.subscribers[name]; !found {
eb.subscribers[name] = []chan interface{}{}
}
eb.subscribers[name] = append(eb.subscribers[name], ch)
}
// Unsubscribe removes a channel from the subscribers list
func (eb *EventBus) Unsubscribe(name EventName, ch chan interface{}) {
eb.mu.Lock()
defer eb.mu.Unlock()
if channels, found := eb.subscribers[name]; found {
for i := range channels {
if channels[i] == ch {
......@@ -57,18 +86,55 @@ func (eb *EventBus) Unsubscribe(name EventName, ch chan interface{}) {
}
}
func (eb *EventBus) GetPublishError(msgID MessageID) error {
eb.mu.RLock()
defer eb.mu.RUnlock()
return eb.publishErr[msgID]
}
func (eb *EventBus) SetPublishError(msgID MessageID, err error) {
eb.mu.Lock()
defer eb.mu.Unlock()
eb.publishErr[msgID] = err
}
// Publish publishes an event to all subscribers
func (eb *EventBus) Publish(name EventName, data interface{}) {
func (eb *EventBus) Publish(name EventName, data any) {
eb.mu.RLock()
defer eb.mu.RUnlock()
select {
case <-eb.shutdownChan:
return
default:
}
if channels, found := eb.subscribers[name]; found {
for _, ch := range channels {
eb.wg.Add(1)
go func(ch chan interface{}) {
ch <- Event{
defer eb.wg.Done()
msgID := NewMessageID()
defer func() {
if r := recover(); r != nil {
eb.SetPublishError(msgID, fmt.Errorf("publish panic: %v", r))
}
}()
select {
case ch <- Event{
Name: name,
Data: data,
MessageID: uuid.New().String(),
MessageID: msgID,
}:
case <-time.After(time.Second * 1):
eb.SetPublishError(msgID, fmt.Errorf("publish timeout"))
}
}(ch)
}
}
......
......@@ -10,6 +10,7 @@ func TestSubscribeAndPublish(t *testing.T) {
eb := NewEventBus()
jobAddedCh := make(chan interface{}, 1)
defer close(jobAddedCh)
eb.Subscribe(JobAdded, jobAddedCh)
jobData := "New Job Data"
......@@ -32,6 +33,8 @@ func TestUnsubscribe(t *testing.T) {
eb := NewEventBus()
jobAddedCh := make(chan interface{}, 1)
defer close(jobAddedCh)
eb.Subscribe(JobAdded, jobAddedCh)
eb.Unsubscribe(JobAdded, jobAddedCh)
......@@ -53,6 +56,8 @@ func TestMultipleSubscribers(t *testing.T) {
jobAddedCh2 := make(chan interface{}, 1)
eb.Subscribe(JobAdded, jobAddedCh1)
eb.Subscribe(JobAdded, jobAddedCh2)
defer close(jobAddedCh1)
defer close(jobAddedCh2)
jobData := "New Job Data"
eb.Publish(JobAdded, jobData)
......
......@@ -26,4 +26,11 @@ type GenericJob interface {
SetScheduler(scheduler Scheduler)
GetScheduler() Scheduler
Pause()
PauseUntil(until time.Time)
Resume()
IsPaused() bool
}
......@@ -35,6 +35,9 @@ type Job[T any] struct {
scheduler Scheduler
pause bool
pauseUntil time.Time
dependencies []JobID
mu sync.Mutex
......@@ -116,15 +119,52 @@ func (j *Job[T]) GetPersistence() JobPersistence {
func (j *Job[T]) SetScheduler(scheduler Scheduler) {
j.mu.Lock()
defer j.mu.Unlock()
j.scheduler = scheduler
}
// Pause pauses the job
func (j *Job[T]) Pause() {
j.mu.Lock()
defer j.mu.Unlock()
j.pause = true
}
// PauseUntil pauses the job until the given time
func (j *Job[T]) PauseUntil(until time.Time) {
j.mu.Lock()
defer j.mu.Unlock()
j.pause = true
j.pauseUntil = until
}
func (j *Job[T]) Resume() {
j.mu.Lock()
defer j.mu.Unlock()
j.pause = false
j.pauseUntil = time.Time{}
}
// IsPaused returns true if the job is paused
func (j *Job[T]) IsPaused() bool {
j.mu.Lock()
defer j.mu.Unlock()
if j.pause {
if j.pauseUntil.IsZero() {
return true
} else {
return j.pauseUntil.After(time.Now())
}
}
return false
}
// GetScheduler returns the scheduler of the job
func (j *Job[T]) GetScheduler() Scheduler {
j.mu.Lock()
defer j.mu.Unlock()
return j.scheduler
}
......@@ -142,14 +182,27 @@ func (j *Job[T]) Execute(ctx context.Context) (RunGenericResult, error) {
j.stats.RunCount++
// Update TimeMetrics
j.stats.TimeMetrics.TotalRunTime += elapsedTime
newTotalRunTime := j.stats.TimeMetrics.TotalRunTime + elapsedTime
if newTotalRunTime > j.stats.TimeMetrics.TotalRunTime { // no overflow happened
j.stats.TimeMetrics.TotalRunTime = newTotalRunTime
} else {
// set to max
j.stats.TimeMetrics.TotalRunTime = time.Duration(^uint64(0) >> 1)
}
if j.stats.TimeMetrics.MinRunTime == 0 || elapsedTime < j.stats.TimeMetrics.MinRunTime {
j.stats.TimeMetrics.MinRunTime = elapsedTime
}
if elapsedTime > j.stats.TimeMetrics.MaxRunTime {
j.stats.TimeMetrics.MaxRunTime = elapsedTime
}
if j.stats.RunCount == 0 {
j.stats.TimeMetrics.AvgRunTime = 0
} else {
j.stats.TimeMetrics.AvgRunTime = j.stats.TimeMetrics.TotalRunTime / time.Duration(j.stats.RunCount)
}
// Update SuccessCount or ErrorCount and codes
if runnerError == nil {
......@@ -186,7 +239,7 @@ func (j *Job[T]) Execute(ctx context.Context) (RunGenericResult, error) {
return genericResult, runnerError
}
// Cancel cancels the job
// Cancel cancels the job, currently a no-op
func (j *Job[T]) Cancel() error {
return nil
}
......
......@@ -9,6 +9,17 @@ import (
"time"
)
func TestPauseJob(t *testing.T) {
runner := &ShellRunnable{ScriptPath: "path"}
job := NewJob[ShellResult]("id1", runner)
job.Pause()
assert.True(t, job.IsPaused())
job.Resume()
assert.False(t, job.IsPaused())
}
func TestNewJob(t *testing.T) {
runner := &ShellRunnable{ScriptPath: "path"}
job := NewJob[ShellResult]("id1", runner)
......
......@@ -107,6 +107,70 @@ func (m *Manager) GetActiveJobs() map[JobID]GenericJob {
return m.activeJobs
}
func (m *Manager) RemoveJob(id JobID) error {
m.mu.Lock()
defer m.mu.Unlock()
if _, ok := m.activeJobs[id]; !ok {
return ErrJobNotActive
}
scheduler := m.activeJobs[id].GetScheduler()
switch scheduler.(type) {
case *CronScheduler:
if err := scheduler.(*CronScheduler).Cancel(id); err != nil {
return err
}
case *DelayScheduler:
if err := scheduler.(*DelayScheduler).Cancel(id); err != nil {
return err
}
case *EventScheduler:
if err := scheduler.(*EventScheduler).Cancel(id); err != nil {
return err
}
case *InstantScheduler:
if err := scheduler.(*InstantScheduler).Cancel(id); err != nil {
return err
}
case *IntervalScheduler:
if err := scheduler.(*IntervalScheduler).Cancel(id); err != nil {
return err
}
default:
return fmt.Errorf("Unknown scheduler type")
}
delete(m.activeJobs, id)
return nil
}
func (m *Manager) UpdateJob(job GenericJob) error {
m.mu.Lock()
defer m.mu.Unlock()
if _, ok := m.activeJobs[job.GetID()]; !ok {
return ErrJobNotActive
}
scheduler := m.activeJobs[job.GetID()].GetScheduler()
err := m.RemoveJob(job.GetID())
if err != nil {
return err
}
err = m.ScheduleJob(job, scheduler)
if err != nil {
return err
}
return nil
}
// ContainsActiveJob checks if a job is active
func (m *Manager) ContainsActiveJob(id JobID) bool {
m.mu.Lock()
......@@ -115,6 +179,7 @@ func (m *Manager) ContainsActiveJob(id JobID) bool {
return ok
}
// SetDB sets the database connection
func (m *Manager) SetDB(db *gorm.DB) *Manager {
m.mu.Lock()
defer m.mu.Unlock()
......@@ -297,7 +362,7 @@ func (m *Manager) Stop() error {
m.eventBus.Unsubscribe(QueueJob, m.jobEventCh)
m.eventBus.Unsubscribe(JobReady, m.jobEventCh)
close(m.jobEventCh)
_ = safeClose(m.jobEventCh)
var wrappedErr error
......
......@@ -93,6 +93,22 @@ func (m *MockGenericJob) Cancel() error {
return nil
}
func (m *MockGenericJob) Pause() {
}
func (m *MockGenericJob) PauseUntil(until time.Time) {
}
func (m *MockGenericJob) Resume() {
}
func (m *MockGenericJob) IsPaused() bool {
return false
}
func (m *MockGenericJob) GetPersistence() JobPersistence {
return JobPersistence{}
}
......@@ -107,8 +123,9 @@ func (m *MockGenericJob) GetScheduler() Scheduler {
}
func TestNewManager(t *testing.T) {
eventBus := NewEventBus()
manager := NewManager()
eventBus := manager.eventBus
assert.NotNil(t, manager)
assert.Equal(t, ManagerState(ManagerStateStopped), manager.state)
......
......@@ -58,7 +58,11 @@ func (s *IntervalScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
for {
select {
case <-ticker.C:
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
}
case <-stopChan:
ticker.Stop()
return
......@@ -143,7 +147,9 @@ func (s *CronScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
}
entryId, err := s.cron.AddFunc(s.Spec, func() {
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
}
})
s.jobs[id] = entryId
......@@ -229,7 +235,9 @@ func (s *DelayScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
go func() {
select {
case <-timer.C:
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
}
case <-stopChan:
timer.Stop()
}
......@@ -290,12 +298,18 @@ func (s *DelayScheduler) GetPersistence() SchedulerPersistence {
// EventScheduler is a scheduler that schedules a job when an event is received
type EventScheduler struct {
Event EventName
EventBus *EventBus
jobs map[JobID]StopChan
}
func (s *EventScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
ch := make(chan interface{})
if s.EventBus != nil {
s.EventBus.Subscribe(s.Event, ch)
} else {
eventBus.Subscribe(s.Event, ch)
}
if s.jobs == nil {
s.jobs = make(map[JobID]StopChan)
......@@ -313,7 +327,9 @@ func (s *EventScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
for {
select {
case <-ch:
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
}
case <-stopChan:
eventBus.Unsubscribe(s.Event, ch)
return
......@@ -378,7 +394,9 @@ func (s *EventScheduler) GetPersistence() SchedulerPersistence {
type InstantScheduler struct{}
func (s *InstantScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
}
return nil
}
......
util.go 0 → 100644
package jobqueue
// safeClose closes the given channel and returns an error if the channel is already closed
func safeClose(ch chan interface{}) (err error) {
defer func() {
if recover() != nil {
err = ErrChannelAlreadyClosed
}
}()
err = nil
close(ch)
return
}
......@@ -83,6 +83,7 @@ func (w *LocalWorker) Start() error {
go w.run(w.jobChannels[i], w.stopChans[i], w.cancelChans[i])
}
time.Sleep(200 * time.Millisecond) // wait go routine until select
w.wg.Wait()
w.status = WorkerStatusRunning
......@@ -121,7 +122,6 @@ func (w *LocalWorker) Stop() error {
}
func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancelChan chan bool) {
w.wg.Done()
workerThreadID := w.ID.String() + "-" + fmt.Sprintf("%p", &w)
......@@ -130,8 +130,10 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel
}
stopFlag := false
w.wg.Done()
for {
select {
case job := <-jobChannel:
ctx, cancel := context.WithCancel(context.Background())
......@@ -210,7 +212,6 @@ func (w *LocalWorker) AssignJob(job GenericJob) error {
}
for _, ch := range w.jobChannels {
select {
case ch <- job:
return nil
......
......@@ -15,6 +15,22 @@ func (j DummyJob) GetID() JobID {
return j.id
}
func (j DummyJob) Pause() {
}
func (j DummyJob) PauseUntil(until time.Time) {
}
func (j DummyJob) Resume() {
}
func (j DummyJob) IsPaused() bool {
return false
}
func (j DummyJob) GetMaxRetries() uint {
return 0
}
......@@ -206,9 +222,6 @@ func TestCancelJob(t *testing.T) {
t.Errorf("AssignJob() returned error: %v", err)
}
// Test job cancellation
//worker.CancelJob(JobID("1"))
err = worker.Stop()
if err != nil {
t.Errorf("Stop() returned error: %v", err)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment