Skip to content
Snippets Groups Projects
Select Git revision
  • f1994b9a2676ffce64cfe66c31a80a4036387864
  • master default protected
  • 1.31
  • 4.38.7
  • 4.38.6
  • 4.38.5
  • 4.38.4
  • 4.38.3
  • 4.38.2
  • 4.38.1
  • 4.38.0
  • 4.37.2
  • 4.37.1
  • 4.37.0
  • 4.36.0
  • 4.35.0
  • 4.34.1
  • 4.34.0
  • 4.33.1
  • 4.33.0
  • 4.32.2
  • 4.32.1
  • 4.32.0
23 results

index.html

Blame
  • scheduler-inotify.go 2.38 KiB
    // Copyright 2023 schukai GmbH
    // SPDX-License-Identifier: AGPL-3.0
    
    package jobqueue
    
    import (
    	"fmt"
    	"github.com/fsnotify/fsnotify"
    	"os"
    )
    
    // InotifyScheduler is a scheduler that schedules a job when a file changes.
    // It watches a single directory through fsnotify and keeps one stop channel
    // per scheduled job so that individual watchers can be cancelled.
    type InotifyScheduler struct {
    	// Path is the directory to watch. Schedule rejects an empty path and any
    	// path that does not exist or is not a directory.
    	Path       string
    	// EventFlags is the bitmask of fsnotify operations that trigger the job;
    	// an event matches when it shares at least one bit with this mask.
    	// Schedule rejects a zero value.
    	EventFlags fsnotify.Op
    	// jobs maps the ID of every scheduled job to the channel used to stop its
    	// watcher goroutine. It is created lazily by Schedule.
    	jobs       map[JobID]StopChan
    }
    
    // Schedule starts watching s.Path and publishes job on eventBus under the
    // QueueJob topic whenever a file system event matching s.EventFlags arrives
    // and the job is not paused.
    //
    // It returns an error if EventFlags is zero, if Path is empty or is not an
    // existing directory, if a job with the same ID is already scheduled (the
    // error wraps ErrJobAlreadyScheduled), or if the watcher cannot be created
    // or attached to Path. When an error is returned the job is not registered
    // and the watcher goroutine, if it was started, is told to shut down.
    func (s *InotifyScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
    
    	if s.EventFlags == 0 {
    		return fmt.Errorf("event flags are empty, at least one flag must be set. Valid flags are: 1 (Create), 2 (Write), 4 (Remove), 8 (Rename), 16 (Chmod)")
    	}
    
    	if s.Path == "" {
    		return fmt.Errorf("path is empty")
    	}
    
    	// exists and a directory
    	if fi, err := os.Stat(s.Path); err != nil || !fi.IsDir() {
    		return fmt.Errorf("path %s does not exist or is not a directory", s.Path)
    	}
    
    	if s.jobs == nil {
    		s.jobs = make(map[JobID]StopChan)
    	}
    
    	id := job.GetID()
    	if _, ok := s.jobs[id]; ok {
    		return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
    	}
    
    	// Create the watcher before registering the job, so that a failure here
    	// does not leave a stale entry that blocks scheduling the job again.
    	watcher, err := fsnotify.NewWatcher()
    	if err != nil {
    		return err
    	}
    
    	stopChan := make(StopChan)
    	s.jobs[id] = stopChan
    
    	go func() {
    		// Every exit path of the loop releases the watcher.
    		defer func() { _ = watcher.Close() }()
    
    		for {
    			select {
    			case event, ok := <-watcher.Events:
    				if !ok {
    					// The channel was closed; looping on would spin forever.
    					return
    				}
    
    				if event.Op&s.EventFlags != 0 && !job.IsPaused() {
    					eventBus.Publish(QueueJob, job)
    				}
    
    			case _, ok := <-watcher.Errors:
    				if !ok {
    					return
    				}
    				// Watcher errors are deliberately dropped: this scheduler has
    				// no channel to report them on.
    
    			case <-stopChan:
    				return
    			}
    		}
    	}()
    
    	if err := watcher.Add(s.Path); err != nil {
    		// Unregister the job and close the channel. Unlike a non-blocking
    		// send, a close cannot be missed when the goroutine has not reached
    		// its select yet.
    		delete(s.jobs, id)
    		close(stopChan)
    		return err
    	}
    
    	return nil
    }
    
    // GetType returns the type name of this scheduler, "Inotify".
    func (s *InotifyScheduler) GetType() string {
    	const schedulerType = "Inotify"
    	return schedulerType
    }
    
    // IsAdHoc reports whether this scheduler is an ad-hoc scheduler; it never is.
    func (s *InotifyScheduler) IsAdHoc() bool {
    	const adHoc = false
    	return adHoc
    }
    
    // Cancel stops the watcher that belongs to the job with the given id and
    // removes the job from the scheduler. An id that is not scheduled is
    // ignored. The returned error is always nil.
    func (s *InotifyScheduler) Cancel(id JobID) error {
    	// Indexing a nil map is safe, so no separate nil check is needed.
    	stopChan, ok := s.jobs[id]
    	if !ok {
    		return nil
    	}
    
    	// Remove the entry first so the channel cannot be closed a second time.
    	delete(s.jobs, id)
    
    	// Close instead of sending: a non-blocking send on an unbuffered channel
    	// is dropped whenever the watcher goroutine is busy handling an event,
    	// which would leave the watcher running for a job that is already gone.
    	close(stopChan)
    
    	return nil
    }
    
    // CancelAll stops the watchers of all scheduled jobs and forgets every job.
    // The returned error is always nil.
    func (s *InotifyScheduler) CancelAll() error {
    	// Ranging over a nil map is a no-op, so no separate nil check is needed.
    	for _, stopChan := range s.jobs {
    		// Close instead of sending: a non-blocking send on an unbuffered
    		// channel is dropped whenever the watcher goroutine is busy handling
    		// an event, which would leak that goroutine and its watcher.
    		close(stopChan)
    	}
    
    	s.jobs = nil
    	return nil
    }
    
    // JobExists reports whether a job with the given id is currently scheduled.
    func (s *InotifyScheduler) JobExists(id JobID) bool {
    	// Indexing a nil map is safe and simply reports the key as absent.
    	_, found := s.jobs[id]
    	return found
    }
    
    // GetPersistence returns the persistable description of this scheduler:
    // its type name, the watched path and the event flags.
    func (s *InotifyScheduler) GetPersistence() SchedulerPersistence {
    	var persistence SchedulerPersistence
    	persistence.Type = s.GetType()
    	persistence.Path = s.Path
    	persistence.EventFlags = s.EventFlags
    	return persistence
    }