Select Git revision
event-bus_test.go
scheduler-inotify.go 2.38 KiB
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
"fmt"
"github.com/fsnotify/fsnotify"
"os"
)
// InotifyScheduler is a scheduler that schedules a job when a file changes
type InotifyScheduler struct {
Path string
EventFlags fsnotify.Op
jobs map[JobID]StopChan
}
func (s *InotifyScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
if s.EventFlags == 0 {
return fmt.Errorf("event flags are empty, at least one flag must be set. Valid flags are: 1 (Create), 2 (Write), 4 (Remove), 8 (Rename), 16 (Chmod)")
}
if s.Path == "" {
return fmt.Errorf("path is empty")
}
// exists and a directory
if fi, err := os.Stat(s.Path); err != nil || !fi.IsDir() {
return fmt.Errorf("path %s does not exist or is not a directory", s.Path)
}
if s.jobs == nil {
s.jobs = make(map[JobID]StopChan)
}
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
}
stopChan := make(StopChan)
s.jobs[id] = stopChan
watcher, err := fsnotify.NewWatcher()
if err != nil {
return err
}
go func() {
for {
select {
case event, ok := <-watcher.Events:
if !ok {
continue
}
if event.Op&s.EventFlags != 0 {
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
}
}
case _, _ = <-watcher.Errors:
case <-stopChan:
_ = watcher.Close()
return
}
}
}()
err = watcher.Add(s.Path)
if err != nil {
select {
case stopChan <- true:
default:
}
return err
}
return nil
}
func (s *InotifyScheduler) GetType() string {
return "Inotify"
}
func (s *InotifyScheduler) IsAdHoc() bool {
return false
}
func (s *InotifyScheduler) Cancel(id JobID) error {
if s.jobs == nil {
return nil
}
if stopChan, ok := s.jobs[id]; ok {
select {
case stopChan <- true:
default:
}
delete(s.jobs, id)
}
return nil
}
func (s *InotifyScheduler) CancelAll() error {
if s.jobs == nil {
return nil
}
for _, stopChan := range s.jobs {
select {
case stopChan <- true:
default:
}
}
s.jobs = nil
return nil
}
func (s *InotifyScheduler) JobExists(id JobID) bool {
if s.jobs == nil {
return false
}
_, ok := s.jobs[id]
return ok
}
func (s *InotifyScheduler) GetPersistence() SchedulerPersistence {
return SchedulerPersistence{
Type: s.GetType(),
Path: s.Path,
EventFlags: s.EventFlags,
}
}