From 1a93bb5466ea1b6393f79b2f781ddf3f8363ce64 Mon Sep 17 00:00:00 2001 From: Volker Schukai <volker.schukai@schukai.com> Date: Thu, 30 Nov 2023 20:46:47 +0100 Subject: [PATCH] feat: new fsnotify scheduler #33 --- go.mod | 7 +- go.sum | 7 + manager.go | 71 ++++-- queue.go | 58 ++++- queue_test.go | 7 +- runnable-shell.go | 2 +- schedule-cron.go | 94 ++++++++ schedule-delay.go | 99 ++++++++ schedule-interval.go | 108 +++++++++ schedule-time.go | 112 +++++++++ scheduler-event.go | 104 +++++++++ scheduler-inotify.go | 139 +++++++++++ scheduler-instant.go | 37 +++ scheduler.go | 545 +------------------------------------------ scheduler_test.go | 42 ++++ 15 files changed, 862 insertions(+), 570 deletions(-) create mode 100644 schedule-cron.go create mode 100644 schedule-delay.go create mode 100644 schedule-interval.go create mode 100644 schedule-time.go create mode 100644 scheduler-event.go create mode 100644 scheduler-inotify.go create mode 100644 scheduler-instant.go diff --git a/go.mod b/go.mod index 9c1ee4b..55c99eb 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,12 @@ module gitlab.schukai.com/oss/libraries/go/services/job-queues -go 1.20 +go 1.21 require ( github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/docker/docker v24.0.6+incompatible github.com/docker/go-connections v0.4.0 + github.com/fsnotify/fsnotify v1.7.0 github.com/go-chi/chi/v5 v5.0.10 github.com/google/uuid v1.4.0 github.com/pkg/sftp v1.13.6 @@ -13,7 +14,7 @@ require ( github.com/shirou/gopsutil/v3 v3.23.10 github.com/stretchr/testify v1.8.4 go.uber.org/zap v1.26.0 - golang.org/x/crypto v0.14.0 + golang.org/x/crypto v0.16.0 gopkg.in/yaml.v3 v3.0.1 gorm.io/driver/mysql v1.5.2 gorm.io/gorm v1.25.5 @@ -46,7 +47,7 @@ require ( go.uber.org/multierr v1.11.0 // indirect golang.org/x/mod v0.8.0 // indirect golang.org/x/net v0.10.0 // indirect - golang.org/x/sys v0.14.0 // indirect + golang.org/x/sys v0.15.0 // indirect golang.org/x/time v0.3.0 // indirect golang.org/x/tools v0.6.0 // indirect gotest.tools/v3 v3.5.1 // indirect diff --git a/go.sum b/go.sum index 91a5103..451a031 100644 --- a/go.sum +++ b/go.sum @@ -16,6 +16,8 @@ github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKoh github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/go-chi/chi/v5 v5.0.10 h1:rLz5avzKpjqxrYwXNfmjkrYYXOyLJd37pz53UFHC6vk= github.com/go-chi/chi/v5 v5.0.10/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= @@ -98,6 +100,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY= +golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= @@ -134,10 +138,13 @@ golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= +golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= diff --git a/manager.go b/manager.go index 9056c3c..cc9d6bd 100644 --- a/manager.go +++ b/manager.go @@ -2,6 +2,7 @@ package jobqueue import ( "fmt" + "github.com/fsnotify/fsnotify" "github.com/robfig/cron/v3" "gorm.io/gorm" "sync" @@ -52,12 +53,14 @@ func NewManager() *Manager { } +// GetEventBus returns the event bus func (m *Manager) GetEventBus() *EventBus { m.mu.Lock() defer m.mu.Unlock() return m.eventBus } +// SetCronInstance sets the cron instance func (m *Manager) SetCronInstance(cronInstance *cron.Cron) *Manager { m.mu.Lock() defer m.mu.Unlock() @@ -65,12 +68,14 @@ func (m *Manager) SetCronInstance(cronInstance *cron.Cron) *Manager { return m } +// GetCronInstance returns the cron instance func (m *Manager) GetCronInstance() *cron.Cron { m.mu.Lock() defer m.mu.Unlock() return m.cronInstance } +// NewCronScheduler creates a new cron scheduler func (m *Manager) NewCronScheduler(spec string) *CronScheduler { return &CronScheduler{ Spec: spec, @@ -78,28 +83,47 @@ func (m *Manager) NewCronScheduler(spec string) *CronScheduler { } } +// NewInstantScheduler creates a new instant scheduler func (m *Manager) NewInstantScheduler() *InstantScheduler { return &InstantScheduler{} } +// NewIntervalScheduler creates a new interval scheduler func (m *Manager) NewIntervalScheduler(interval time.Duration) *IntervalScheduler { return &IntervalScheduler{ Interval: interval, } } +// NewDelayScheduler creates a new delay scheduler func (m *Manager) NewDelayScheduler(delay time.Duration) *DelayScheduler { return &DelayScheduler{ Delay: delay, } } +// NewEventScheduler creates a new event scheduler func (m *Manager) NewEventScheduler(event EventName) *EventScheduler { return &EventScheduler{ Event: event, } } +// NewTimeScheduler creates a new time scheduler +func (m *Manager) NewTimeScheduler(t time.Time) *TimeScheduler { + return &TimeScheduler{ + Time: t, + } +} + +// NewInotifyScheduler creates a new inotify scheduler +func (m *Manager) NewInotifyScheduler(path string, eventFlags fsnotify.Op) *InotifyScheduler { + return &InotifyScheduler{ + Path: path, + EventFlags: eventFlags, + } +} + // GetActiveJobs returns the active jobs func (m *Manager) GetActiveJobs() map[JobID]GenericJob { m.mu.Lock() @@ -189,38 +213,35 @@ func (m *Manager) ResetJobStats(id JobID) error { return nil } +type CancelScheduler interface { + Cancel(id JobID) error +} + +func cancelSchedulerByManager(scheduler CancelScheduler, id JobID) error { + + err := scheduler.Cancel(id) + if err != nil { + return err + } + + return nil + +} + func (m *Manager) removeJobInternal(id JobID) error { scheduler := m.activeJobs[id].GetScheduler() + if scheduler == nil { + return ErrJobNotScheduled + } - switch scheduler.(type) { - case *CronScheduler: - if err := scheduler.(*CronScheduler).Cancel(id); err != nil { - return err - } - case *DelayScheduler: - if err := scheduler.(*DelayScheduler).Cancel(id); err != nil { - return err - } - case *EventScheduler: - if err := scheduler.(*EventScheduler).Cancel(id); err != nil { - return err - } - case *InstantScheduler: - if err := scheduler.(*InstantScheduler).Cancel(id); err != nil { - return err - } - case *IntervalScheduler: - if err := scheduler.(*IntervalScheduler).Cancel(id); err != nil { - return err - } - case *TimeScheduler: - if err := scheduler.(*TimeScheduler).Cancel(id); err != nil { + if cancelScheduler, ok := scheduler.(CancelScheduler); ok { + err := cancelSchedulerByManager(cancelScheduler, id) + if err != nil { return err } - default: + } else { return ErrUnknownScheduleType - } delete(m.activeJobs, id) diff --git a/queue.go b/queue.go index 49f9ba3..069af8f 100644 --- a/queue.go +++ b/queue.go @@ -2,14 +2,26 @@ package jobqueue import ( "sync" + "time" ) +// MaxAge is the maximum age of a job in the processed jobs map +const MaxAge = 24 * time.Hour + +// MaxProcessedJobs is the maximum number of jobs in the processed jobs map +const MaxProcessedJobs = 50000 + +type ProcessedJobInfo struct { + ProcessedTime time.Time + ID JobID +} + // Queue is a job queue type Queue struct { jobMap map[JobID]GenericJob pendingDependencies map[JobID][]JobID readyQueue []GenericJob - processedJobs map[JobID]struct{} + processedJobs []ProcessedJobInfo eventBus *EventBus mu sync.Mutex manger *Manager @@ -21,7 +33,7 @@ func NewQueue(EventBus *EventBus) *Queue { jobMap: make(map[JobID]GenericJob), pendingDependencies: make(map[JobID][]JobID), readyQueue: []GenericJob{}, - processedJobs: make(map[JobID]struct{}), + processedJobs: []ProcessedJobInfo{}, eventBus: EventBus, } } @@ -85,7 +97,8 @@ func (q *Queue) Enqueue(job GenericJob) error { fullJobList = append(fullJobList, job) } - for id := range q.processedJobs { + for i := range q.processedJobs { + id := q.processedJobs[i].ID fullJobList = append(fullJobList, q.jobMap[id]) } @@ -117,6 +130,40 @@ func (q *Queue) Enqueue(job GenericJob) error { return nil } +// ClearProcessedJobs removes old jobs from the processed jobs map +func (q *Queue) ClearProcessedJobs() { + q.mu.Lock() + defer q.mu.Unlock() + + // remove old jobs + currentTime := time.Now() + cutoffIndex := 0 + for i, jobInfo := range q.processedJobs { + if currentTime.Sub(jobInfo.ProcessedTime) <= MaxAge { + cutoffIndex = i + break + } + } + q.processedJobs = q.processedJobs[cutoffIndex:] + + // if the processed jobs map is too large, remove the oldest jobs + if len(q.processedJobs) > MaxProcessedJobs { + startIdx := len(q.processedJobs) - MaxProcessedJobs + q.processedJobs = q.processedJobs[startIdx:] + } +} + +func (q *Queue) isDependency(id JobID) bool { + for _, deps := range q.pendingDependencies { + for _, depID := range deps { + if depID == id { + return true + } + } + } + return false +} + // Dequeue removes a job from the queue func (q *Queue) Dequeue() (GenericJob, error) { q.mu.Lock() @@ -130,7 +177,10 @@ func (q *Queue) Dequeue() (GenericJob, error) { q.readyQueue = q.readyQueue[1:] // Mark the job as processed but keep it in the jobMap for dependency resolution - q.processedJobs[job.GetID()] = struct{}{} + q.processedJobs = append(q.processedJobs, ProcessedJobInfo{ + ProcessedTime: time.Now(), + ID: job.GetID(), + }) return job, nil } diff --git a/queue_test.go b/queue_test.go index 1040d95..c263ac7 100644 --- a/queue_test.go +++ b/queue_test.go @@ -145,9 +145,12 @@ func TestProcessedJobs(t *testing.T) { t.Fatalf("Dequeue failed: %v", err) } - if _, exists := q.processedJobs[job1.GetID()]; !exists { - t.Fatalf("Job 1 not marked as processed") + for _, jobInfo := range q.processedJobs { + if jobInfo.ID == job1.GetID() { + t.Fatalf("Job 1 should not be in processedJobs") + } } + } func TestCyclicDependencies(t *testing.T) { diff --git a/runnable-shell.go b/runnable-shell.go index cb28e62..8d29e24 100644 --- a/runnable-shell.go +++ b/runnable-shell.go @@ -117,7 +117,7 @@ func (s *ShellRunnable) GetType() string { func (c *ShellRunnable) GetPersistence() RunnableImport { data := JSONMap{ - "script_path": c.ScriptPath, + "scriptPath": c.ScriptPath, "script": c.Script, } diff --git a/schedule-cron.go b/schedule-cron.go new file mode 100644 index 0000000..4a99d7e --- /dev/null +++ b/schedule-cron.go @@ -0,0 +1,94 @@ +package jobqueue + +import ( + "fmt" + "github.com/robfig/cron/v3" +) + +// CronScheduler is a scheduler that uses the cron library to schedule jobs +type CronScheduler struct { + cron *cron.Cron + Spec string + jobs map[JobID]cron.EntryID +} + +func (s *CronScheduler) Schedule(job GenericJob, eventBus *EventBus) error { + var err error + + if s.cron == nil { + return ErrCronNotInitialized + } + + if s.jobs == nil { + s.jobs = make(map[JobID]cron.EntryID) + } + + id := job.GetID() + if _, ok := s.jobs[id]; ok { + return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id) + } + + entryId, err := s.cron.AddFunc(s.Spec, func() { + if !job.IsPaused() { + eventBus.Publish(QueueJob, job) + } + }) + + s.jobs[id] = entryId + + if err != nil { + return err + } + + return nil +} + +func (s *CronScheduler) GetType() string { + return "Cron" +} + +func (s *CronScheduler) IsAdHoc() bool { + return false +} + +func (s *CronScheduler) Cancel(id JobID) error { + + if s.jobs == nil { + return nil + } + + if entryId, ok := s.jobs[id]; ok { + s.cron.Remove(entryId) + } + + return nil +} + +func (s *CronScheduler) CancelAll() error { + if s.jobs == nil { + return nil + } + + for _, entryId := range s.jobs { + s.cron.Remove(entryId) + } + + s.jobs = nil + return nil +} + +func (s *CronScheduler) JobExists(id JobID) bool { + if s.jobs == nil { + return false + } + + _, ok := s.jobs[id] + return ok +} + +func (s *CronScheduler) GetPersistence() SchedulerPersistence { + return SchedulerPersistence{ + Type: s.GetType(), + Spec: s.Spec, + } +} diff --git a/schedule-delay.go b/schedule-delay.go new file mode 100644 index 0000000..3c89425 --- /dev/null +++ b/schedule-delay.go @@ -0,0 +1,99 @@ +package jobqueue + +import ( + "fmt" + "time" +) + +// DelayScheduler is a scheduler that schedules a job after a delay +type DelayScheduler struct { + Delay time.Duration + jobs map[JobID]StopChan +} + +func (s *DelayScheduler) Schedule(job GenericJob, eventBus *EventBus) error { + + if s.jobs == nil { + s.jobs = make(map[JobID]StopChan) + } + + id := job.GetID() + if _, ok := s.jobs[id]; ok { + return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id) + } + + stopChan := make(StopChan) + s.jobs[id] = stopChan + + timer := time.NewTimer(s.Delay) + + go func() { + select { + case <-timer.C: + if !job.IsPaused() { + eventBus.Publish(QueueJob, job) + } + case <-stopChan: + timer.Stop() + return + } + }() + + return nil +} + +func (s *DelayScheduler) GetType() string { + return "Delay" +} + +func (s *DelayScheduler) IsAdHoc() bool { + return true +} + +func (s *DelayScheduler) Cancel(id JobID) error { + if s.jobs == nil { + return nil + } + + if stopChan, ok := s.jobs[id]; ok { + select { + case stopChan <- true: + default: + } + delete(s.jobs, id) + } + + return nil +} + +func (s *DelayScheduler) CancelAll() error { + if s.jobs == nil { + return nil + } + + for _, stopChan := range s.jobs { + select { + case stopChan <- true: + default: + } + } + + s.jobs = nil + return nil +} + +func (s *DelayScheduler) JobExists(id JobID) bool { + if s.jobs == nil { + return false + } + + _, ok := s.jobs[id] + return ok +} + +func (s *DelayScheduler) GetPersistence() SchedulerPersistence { + return SchedulerPersistence{ + Type: s.GetType(), + Delay: s.Delay, + } +} diff --git a/schedule-interval.go b/schedule-interval.go new file mode 100644 index 0000000..3029824 --- /dev/null +++ b/schedule-interval.go @@ -0,0 +1,108 @@ +package jobqueue + +import ( + "fmt" + "time" +) + +// IntervalScheduler is a scheduler that schedules a job at a fixed interval +type IntervalScheduler struct { + Interval time.Duration + jobs map[JobID]StopChan +} + +func (s *IntervalScheduler) Schedule(job GenericJob, eventBus *EventBus) error { + + if s.Interval <= 0 { + return fmt.Errorf("invalid interval: %v", s.Interval) + } + + if s.jobs == nil { + s.jobs = make(map[JobID]StopChan) + } + + id := job.GetID() + if _, ok := s.jobs[id]; ok { + return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id) + } + + stopChan := make(StopChan) + s.jobs[id] = stopChan + + ticker := time.NewTicker(s.Interval) + go func() { + for { + select { + case <-ticker.C: + + if !job.IsPaused() { + eventBus.Publish(QueueJob, job) + } + + case <-stopChan: + ticker.Stop() + return + } + } + }() + + return nil +} + +func (s *IntervalScheduler) GetType() string { + return "Interval" +} + +func (s *IntervalScheduler) IsAdHoc() bool { + return false +} + +func (s *IntervalScheduler) Cancel(id JobID) error { + if s.jobs == nil { + return nil + } + + if stopChan, ok := s.jobs[id]; ok { + select { + case stopChan <- true: + default: + } + + delete(s.jobs, id) + } + + return nil +} + +func (s *IntervalScheduler) CancelAll() error { + if s.jobs == nil { + return nil + } + + for _, stopChan := range s.jobs { + + select { + case stopChan <- true: + default: + } + } + + s.jobs = nil + return nil +} + +func (s *IntervalScheduler) JobExists(id JobID) bool { + if s.jobs == nil { + return false + } + + _, ok := s.jobs[id] + return ok +} + +func (s *IntervalScheduler) GetPersistence() SchedulerPersistence { + return SchedulerPersistence{ + Type: s.GetType(), + Interval: s.Interval, + } +} diff --git a/schedule-time.go b/schedule-time.go new file mode 100644 index 0000000..b49efa3 --- /dev/null +++ b/schedule-time.go @@ -0,0 +1,112 @@ +package jobqueue + +import ( + "fmt" + "time" +) + +// TimeScheduler is a scheduler that schedules at a specific time +type TimeScheduler struct { + Time time.Time + jobs map[JobID]StopChan + executed bool +} + +func (s *TimeScheduler) Schedule(job GenericJob, eventBus *EventBus) error { + if s.executed { + return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, job.GetID()) + } + + if s.Time.Before(time.Now()) { + return fmt.Errorf("%w: scheduled time is in the past", ErrInvalidTime) + } + + if s.jobs == nil { + s.jobs = make(map[JobID]StopChan) + } + + id := job.GetID() + if _, ok := s.jobs[id]; ok { + return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id) + } + + stopChan := make(StopChan) + s.jobs[id] = stopChan + + timer := time.NewTimer(s.Time.Sub(time.Now())) + + go func() { + select { + case <-timer.C: + if !job.IsPaused() { + eventBus.Publish(QueueJob, job) + s.executed = true + } else { + timer.Stop() + stopChan <- true + } + case <-stopChan: + timer.Stop() + return + } + }() + + return nil +} + +func (s *TimeScheduler) GetType() string { + return "Time" +} + +func (s *TimeScheduler) IsAdHoc() bool { + return false +} + +func (s *TimeScheduler) Cancel(id JobID) error { + if s.jobs == nil { + return nil + } + + if stopChan, ok := s.jobs[id]; ok { + select { + case stopChan <- true: + default: + } + delete(s.jobs, id) + } + + return nil +} + +func (s *TimeScheduler) CancelAll() error { + if s.jobs == nil { + return nil + } + + for _, stopChan := range s.jobs { + select { + case stopChan <- true: + default: + } + } + + s.jobs = nil + return nil +} + +func (s *TimeScheduler) JobExists(id JobID) bool { + if s.jobs == nil { + return false + } + + _, ok := s.jobs[id] + return ok +} + +func (s *TimeScheduler) GetPersistence() SchedulerPersistence { + return SchedulerPersistence{ + Type: s.GetType(), + Time: &s.Time, + Executed: s.executed, + } +} diff --git a/scheduler-event.go b/scheduler-event.go new file mode 100644 index 0000000..80ca022 --- /dev/null +++ b/scheduler-event.go @@ -0,0 +1,104 @@ +package jobqueue + +import "fmt" + +// EventScheduler is a scheduler that schedules a job when an event is received +type EventScheduler struct { + Event EventName + EventBus *EventBus + jobs map[JobID]StopChan +} + +func (s *EventScheduler) Schedule(job GenericJob, eventBus *EventBus) error { + ch := make(chan interface{}) + + if s.EventBus != nil { + s.EventBus.Subscribe(s.Event, ch) + } else { + eventBus.Subscribe(s.Event, ch) + } + + if s.jobs == nil { + s.jobs = make(map[JobID]StopChan) + } + + id := job.GetID() + if _, ok := s.jobs[id]; ok { + return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id) + } + + stopChan := make(StopChan) + s.jobs[id] = stopChan + + go func() { + for { + select { + case <-ch: + if !job.IsPaused() { + eventBus.Publish(QueueJob, job) + } + case <-stopChan: + eventBus.Unsubscribe(s.Event, ch) + return + } + } + }() + return nil +} + +func (s *EventScheduler) GetType() string { + return "Event" +} + +func (s *EventScheduler) IsAdHoc() bool { + return false +} + +func (s *EventScheduler) Cancel(id JobID) error { + if s.jobs == nil { + return nil + } + + if stopChan, ok := s.jobs[id]; ok { + select { + case stopChan <- true: + default: + } + delete(s.jobs, id) + } + + return nil + +} + +func (s *EventScheduler) CancelAll() error { + if s.jobs == nil { + return nil + } + + for _, stopChan := range s.jobs { + select { + case stopChan <- true: + default: + } + } + + s.jobs = nil + return nil +} + +func (s *EventScheduler) JobExists(id JobID) bool { + if s.jobs == nil { + return false + } + + _, ok := s.jobs[id] + return ok +} + +func (s *EventScheduler) GetPersistence() SchedulerPersistence { + return SchedulerPersistence{ + Type: s.GetType(), + Event: s.Event, + } +} diff --git a/scheduler-inotify.go b/scheduler-inotify.go new file mode 100644 index 0000000..417c99e --- /dev/null +++ b/scheduler-inotify.go @@ -0,0 +1,139 @@ +package jobqueue + +import ( + "fmt" + "github.com/fsnotify/fsnotify" + "os" +) + +// InotifyScheduler is a scheduler that schedules a job when a file changes +type InotifyScheduler struct { + Path string + EventFlags fsnotify.Op + jobs map[JobID]StopChan +} + +func (s *InotifyScheduler) Schedule(job GenericJob, eventBus *EventBus) error { + + if s.EventFlags == 0 { + return fmt.Errorf("event flags are empty, at least one flag must be set. Valid flags are: 1 (Create), 2 (Write), 4 (Remove), 8 (Rename), 16 (Chmod)") + } + + if s.Path == "" { + return fmt.Errorf("path is empty") + } + + // exists and a directory + if fi, err := os.Stat(s.Path); err != nil || !fi.IsDir() { + return fmt.Errorf("path %s does not exist or is not a directory", s.Path) + } + + if s.jobs == nil { + s.jobs = make(map[JobID]StopChan) + } + + id := job.GetID() + if _, ok := s.jobs[id]; ok { + return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id) + } + + stopChan := make(StopChan) + s.jobs[id] = stopChan + + watcher, err := fsnotify.NewWatcher() + if err != nil { + return err + } + + go func() { + for { + select { + case event, ok := <-watcher.Events: + if !ok { + continue + } + + if event.Op&s.EventFlags != 0 { + if !job.IsPaused() { + eventBus.Publish(QueueJob, job) + } + } + + case _, _ = <-watcher.Errors: + + case <-stopChan: + _ = watcher.Close() + return + } + + } + }() + + err = watcher.Add(s.Path) + if err != nil { + select { + case stopChan <- true: + default: + } + return err + } + + return nil +} + +func (s *InotifyScheduler) GetType() string { + return "Inotify" +} + +func (s *InotifyScheduler) IsAdHoc() bool { + return false +} + +func (s *InotifyScheduler) Cancel(id JobID) error { + if s.jobs == nil { + return nil + } + + if stopChan, ok := s.jobs[id]; ok { + select { + case stopChan <- true: + default: + } + delete(s.jobs, id) + } + + return nil +} + +func (s *InotifyScheduler) CancelAll() error { + if s.jobs == nil { + return nil + } + + for _, stopChan := range s.jobs { + select { + case stopChan <- true: + default: + } + } + + s.jobs = nil + return nil +} + +func (s *InotifyScheduler) JobExists(id JobID) bool { + if s.jobs == nil { + return false + } + + _, ok := s.jobs[id] + return ok +} + +func (s *InotifyScheduler) GetPersistence() SchedulerPersistence { + return SchedulerPersistence{ + Type: s.GetType(), + Path: s.Path, + EventFlags: s.EventFlags, + } +} diff --git a/scheduler-instant.go b/scheduler-instant.go new file mode 100644 index 0000000..b209767 --- /dev/null +++ b/scheduler-instant.go @@ -0,0 +1,37 @@ +package jobqueue + +// InstantScheduler is a scheduler that schedules a job instantly +type InstantScheduler struct{} + +func (s *InstantScheduler) Schedule(job GenericJob, eventBus *EventBus) error { + if !job.IsPaused() { + eventBus.Publish(QueueJob, job) + } + return nil +} + +func (s *InstantScheduler) GetType() string { + return "Instant" +} + +func (s *InstantScheduler) IsAdHoc() bool { + return true +} + +func (s *InstantScheduler) Cancel(id JobID) error { + return nil +} + +func (s *InstantScheduler) CancelAll() error { + return nil +} + +func (s *InstantScheduler) JobExists(id JobID) bool { + return false +} + +func (s *InstantScheduler) GetPersistence() SchedulerPersistence { + return SchedulerPersistence{ + Type: s.GetType(), + } +} diff --git a/scheduler.go b/scheduler.go index e40ee14..3215424 100644 --- a/scheduler.go +++ b/scheduler.go @@ -2,8 +2,7 @@ package jobqueue import ( "encoding/json" - "fmt" - "github.com/robfig/cron/v3" + "github.com/fsnotify/fsnotify" "time" ) @@ -23,13 +22,15 @@ type Scheduler interface { } type SchedulerPersistence struct { - Type string `yaml:"type" json:"type" gorm:"column:type"` - Interval time.Duration `yaml:"interval,omitempty" json:"interval,omitempty" gorm:"column:interval"` - Spec string `yaml:"spec,omitempty" json:"spec,omitempty" gorm:"column:spec"` - Delay time.Duration `yaml:"delay,omitempty" json:"delay,omitempty" gorm:"column:delay"` - Event EventName `yaml:"event,omitempty" json:"event,omitempty" gorm:"column:event"` - Time *time.Time `yaml:"time,omitempty" json:"time,omitempty" gorm:"column:time"` - Executed bool `yaml:"executed,omitempty" json:"executed,omitempty" gorm:"column:executed"` + Type string `yaml:"type" json:"type" gorm:"column:type"` + Interval time.Duration `yaml:"interval,omitempty" json:"interval,omitempty" gorm:"column:interval"` + Spec string `yaml:"spec,omitempty" json:"spec,omitempty" gorm:"column:spec"` + Delay time.Duration `yaml:"delay,omitempty" json:"delay,omitempty" gorm:"column:delay"` + Event EventName `yaml:"event,omitempty" json:"event,omitempty" gorm:"column:event"` + Time *time.Time `yaml:"time,omitempty" json:"time,omitempty" gorm:"column:time"` + Executed bool `yaml:"executed,omitempty" json:"executed,omitempty" gorm:"column:executed"` + Path string `yaml:"path,omitempty" json:"path,omitempty" gorm:"column:path"` + EventFlags fsnotify.Op `yaml:"eventFlags,omitempty" json:"eventFlags,omitempty" gorm:"column:eventFlags"` } // UnmarshalJSON implements the json.Unmarshaler interface @@ -64,529 +65,3 @@ func (sp *SchedulerPersistence) UnmarshalJSON(data []byte) error { return nil } - -// IntervalScheduler is a scheduler that schedules a job at a fixed interval -type IntervalScheduler struct { - Interval time.Duration - jobs map[JobID]StopChan -} - -func (s *IntervalScheduler) Schedule(job GenericJob, eventBus *EventBus) error { - - if s.Interval <= 0 { - return fmt.Errorf("invalid interval: %v", s.Interval) - } - - if s.jobs == nil { - s.jobs = make(map[JobID]StopChan) - } - - id := job.GetID() - if _, ok := s.jobs[id]; ok { - return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id) - } - - stopChan := make(StopChan) - s.jobs[id] = stopChan - - ticker := time.NewTicker(s.Interval) - go func() { - for { - select { - case <-ticker.C: - - if !job.IsPaused() { - eventBus.Publish(QueueJob, job) - } - - case <-stopChan: - ticker.Stop() - return - } - } - }() - - return nil -} - -func (s *IntervalScheduler) GetType() string { - return "Interval" -} - -func (s *IntervalScheduler) IsAdHoc() bool { - return false -} - -func (s *IntervalScheduler) Cancel(id JobID) error { - if s.jobs == nil { - return nil - } - - if stopChan, ok := s.jobs[id]; ok { - select { - case stopChan <- true: - default: - } - - delete(s.jobs, id) - } - - return nil -} - -func (s *IntervalScheduler) CancelAll() error { - if s.jobs == nil { - return nil - } - - for _, stopChan := range s.jobs { - - select { - case stopChan <- true: - default: - } - } - - s.jobs = nil - return nil -} - -func (s *IntervalScheduler) JobExists(id JobID) bool { - if s.jobs == nil { - return false - } - - _, ok := s.jobs[id] - return ok -} - -func (s *IntervalScheduler) GetPersistence() SchedulerPersistence { - return SchedulerPersistence{ - Type: s.GetType(), - Interval: s.Interval, - } -} - -// CronScheduler is a scheduler that uses the cron library to schedule jobs -type CronScheduler struct { - cron *cron.Cron - Spec string - jobs map[JobID]cron.EntryID -} - -func (s *CronScheduler) Schedule(job GenericJob, eventBus *EventBus) error { - var err error - - if s.cron == nil { - return ErrCronNotInitialized - } - - if s.jobs == nil { - s.jobs = make(map[JobID]cron.EntryID) - } - - id := job.GetID() - if _, ok := s.jobs[id]; ok { - return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id) - } - - entryId, err := s.cron.AddFunc(s.Spec, func() { - if !job.IsPaused() { - eventBus.Publish(QueueJob, job) - } - }) - - s.jobs[id] = entryId - - if err != nil { - return err - } - - return nil -} - -func (s *CronScheduler) GetType() string { - return "Cron" -} - -func (s *CronScheduler) IsAdHoc() bool { - return false -} - -func (s *CronScheduler) Cancel(id JobID) error { - - if s.jobs == nil { - return nil - } - - if entryId, ok := s.jobs[id]; ok { - s.cron.Remove(entryId) - } - - return nil -} - -func (s *CronScheduler) CancelAll() error { - if s.jobs == nil { - return nil - } - - for _, entryId := range s.jobs { - s.cron.Remove(entryId) - } - - s.jobs = nil - return nil -} - -func (s *CronScheduler) JobExists(id JobID) bool { - if s.jobs == nil { - return false - } - - _, ok := s.jobs[id] - return ok -} - -func (s *CronScheduler) GetPersistence() SchedulerPersistence { - return SchedulerPersistence{ - Type: s.GetType(), - Spec: s.Spec, - } -} - -// DelayScheduler is a scheduler that schedules a job after a delay -type DelayScheduler struct { - Delay time.Duration - jobs map[JobID]StopChan -} - -func (s *DelayScheduler) Schedule(job GenericJob, eventBus *EventBus) error { - - if s.jobs == nil { - s.jobs = make(map[JobID]StopChan) - } - - id := job.GetID() - if _, ok := s.jobs[id]; ok { - return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id) - } - - stopChan := make(StopChan) - s.jobs[id] = stopChan - - timer := time.NewTimer(s.Delay) - - go func() { - select { - case <-timer.C: - if !job.IsPaused() { - eventBus.Publish(QueueJob, job) - } - case <-stopChan: - timer.Stop() - return - } - }() - - return nil -} - -func (s *DelayScheduler) GetType() string { - return "Delay" -} - -func (s *DelayScheduler) IsAdHoc() bool { - return true -} - -func (s *DelayScheduler) Cancel(id JobID) error { - if s.jobs == nil { - return nil - } - - if stopChan, ok := s.jobs[id]; ok { - select { - case stopChan <- true: - default: - } - delete(s.jobs, id) - } - - return nil -} - -func (s *DelayScheduler) CancelAll() error { - if s.jobs == nil { - return nil - } - - for _, stopChan := range s.jobs { - select { - case stopChan <- true: - default: - } - } - - s.jobs = nil - return nil -} - -func (s *DelayScheduler) JobExists(id JobID) bool { - if s.jobs == nil { - return false - } - - _, ok := s.jobs[id] - return ok -} - -func (s *DelayScheduler) GetPersistence() SchedulerPersistence { - return SchedulerPersistence{ - Type: s.GetType(), - Delay: s.Delay, - } -} - -// TimeScheduler is a scheduler that schedules at a specific time -type TimeScheduler struct { - Time time.Time - jobs map[JobID]StopChan - executed bool -} - -func (s *TimeScheduler) Schedule(job GenericJob, eventBus *EventBus) error { - if s.executed { - return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, job.GetID()) - } - - if s.Time.Before(time.Now()) { - return fmt.Errorf("%w: scheduled time is in the past", ErrInvalidTime) - } - - if s.jobs == nil { - s.jobs = make(map[JobID]StopChan) - } - - id := job.GetID() - if _, ok := s.jobs[id]; ok { - return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id) - } - - stopChan := make(StopChan) - s.jobs[id] = stopChan - - timer := time.NewTimer(s.Time.Sub(time.Now())) - - go func() { - select { - case <-timer.C: - if !job.IsPaused() { - eventBus.Publish(QueueJob, job) - s.executed = true - } else { - timer.Stop() - stopChan <- true - } - case <-stopChan: - timer.Stop() - return - } - }() - - return nil -} - -func (s *TimeScheduler) GetType() string { - return "Time" -} - -func (s *TimeScheduler) IsAdHoc() bool { - return false -} - -func (s *TimeScheduler) Cancel(id JobID) error { - if s.jobs == nil { - return nil - } - - if stopChan, ok := s.jobs[id]; ok { - select { - case stopChan <- true: - default: - } - delete(s.jobs, id) - } - - return nil -} - -func (s *TimeScheduler) CancelAll() error { - if s.jobs == nil { - return nil - } - - for _, stopChan := range s.jobs { - select { - case stopChan <- true: - default: - } - } - - s.jobs = nil - return nil -} - -func (s *TimeScheduler) JobExists(id JobID) bool { - if s.jobs == nil { - return false - } - - _, ok := s.jobs[id] - return ok -} - -func (s *TimeScheduler) GetPersistence() SchedulerPersistence { - return SchedulerPersistence{ - Type: s.GetType(), - Time: &s.Time, - Executed: s.executed, - } -} - -// EventScheduler is a scheduler that schedules a job when an event is received -type EventScheduler struct { - Event EventName - EventBus *EventBus - jobs map[JobID]StopChan -} - -func (s *EventScheduler) Schedule(job GenericJob, eventBus *EventBus) error { - ch := make(chan interface{}) - - if s.EventBus != nil { - s.EventBus.Subscribe(s.Event, ch) - } else { - eventBus.Subscribe(s.Event, ch) - } - - if s.jobs == nil { - s.jobs = make(map[JobID]StopChan) - } - - id := job.GetID() - if _, ok := s.jobs[id]; ok { - return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id) - } - - stopChan := make(StopChan) - s.jobs[id] = stopChan - - go func() { - for { - select { - case <-ch: - if !job.IsPaused() { - eventBus.Publish(QueueJob, job) - } - case <-stopChan: - eventBus.Unsubscribe(s.Event, ch) - return - } - } - }() - return nil -} - -func (s *EventScheduler) GetType() string { - return "Event" -} - -func (s *EventScheduler) IsAdHoc() bool { - return false -} - -func (s *EventScheduler) Cancel(id JobID) error { - if s.jobs == nil { - return nil - } - - if stopChan, ok := s.jobs[id]; ok { - select { - case stopChan <- true: - default: - } - delete(s.jobs, id) - } - - return nil - -} - -func (s *EventScheduler) CancelAll() error { - if s.jobs == nil { - return nil - } - - for _, stopChan := range s.jobs { - select { - case stopChan <- true: - default: - } - } - - s.jobs = nil - return nil -} - -func (s *EventScheduler) JobExists(id JobID) bool { - if s.jobs == nil { - return false - } - - _, ok := s.jobs[id] - return ok -} - -func (s *EventScheduler) GetPersistence() SchedulerPersistence { - return SchedulerPersistence{ - Type: s.GetType(), - Event: s.Event, - } -} - -// InstantScheduler is a scheduler that schedules a job instantly -type InstantScheduler struct{} - -func (s *InstantScheduler) Schedule(job GenericJob, eventBus *EventBus) error { - if !job.IsPaused() { - eventBus.Publish(QueueJob, job) - } - return nil -} - -func (s *InstantScheduler) GetType() string { - return "Instant" -} - -func (s *InstantScheduler) IsAdHoc() bool { - return true -} - -func (s *InstantScheduler) Cancel(id JobID) error { - return nil -} - -func (s *InstantScheduler) CancelAll() error { - return nil -} - -func (s *InstantScheduler) JobExists(id JobID) bool { - return false -} - -func (s *InstantScheduler) GetPersistence() SchedulerPersistence { - return SchedulerPersistence{ - Type: s.GetType(), - } -} diff --git a/scheduler_test.go b/scheduler_test.go index 0ed3861..f5e3791 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -2,8 +2,10 @@ package jobqueue import ( "encoding/json" + "github.com/fsnotify/fsnotify" "github.com/robfig/cron/v3" "github.com/stretchr/testify/assert" + "os" "sync/atomic" "testing" "time" @@ -366,3 +368,43 @@ func parseTimeForTesting(t *testing.T, value string) *time.Time { t.Fatalf("Failed to parse time '%s' in any known format", value) return nil } + +func TestInotifyScheduler_BasicFunctionality(t *testing.T) { + var count int32 + eventBus := NewEventBus() + + tmpPath := t.TempDir() + + inotifyScheduler := InotifyScheduler{Path: tmpPath, EventFlags: fsnotify.Create | fsnotify.Write | fsnotify.Remove} + + job := NewJob[DummyResult]("test-job", &DummyRunnable{}) + + genericJob := GenericJob(job) + _ = inotifyScheduler.Schedule(genericJob, eventBus) + + jobChannel := make(chan interface{}) + + go func() { + for _ = range jobChannel { + atomic.AddInt32(&count, 1) + } + }() + + eventBus.Subscribe(QueueJob, jobChannel) + + time.Sleep(time.Millisecond * 100) + + tmpFile := tmpPath + "/test.txt" + _, err := os.Create(tmpFile) + assert.NoError(t, err) + + time.Sleep(time.Millisecond * 100) + err = os.Remove(tmpFile) + assert.NoError(t, err) + + time.Sleep(time.Millisecond * 100) + + if atomic.LoadInt32(&count) != 2 { + t.Errorf("Expected to run 2 times, ran %d times", count) + } +} -- GitLab