Skip to content
Snippets Groups Projects
Select Git revision
  • 11fa97747f3dfac0ae80283e9974c5c79472ead4
  • master default protected
  • 1.31
  • 4.35.0
  • 4.34.1
  • 4.34.0
  • 4.33.1
  • 4.33.0
  • 4.32.2
  • 4.32.1
  • 4.32.0
  • 4.31.0
  • 4.30.1
  • 4.30.0
  • 4.29.1
  • 4.29.0
  • 4.28.0
  • 4.27.0
  • 4.26.0
  • 4.25.5
  • 4.25.4
  • 4.25.3
  • 4.25.2
23 results

Monster.Namespace.html

Blame
  • scheduler-event.go 1.80 KiB
    // Copyright 2023 schukai GmbH
    // SPDX-License-Identifier: AGPL-3.0
    
    package jobqueue
    
    import "fmt"
    
    // EventScheduler is a scheduler that schedules a job when an event is received
    type EventScheduler struct {
    	// Event is the name of the event that Schedule subscribes to.
    	Event    EventName
    	// EventBus, when non-nil, is the bus Schedule subscribes on instead of
    	// the bus passed to Schedule as an argument.
    	EventBus *EventBus
    	// jobs maps each scheduled job ID to the channel used to stop its
    	// listener goroutine. It is created lazily by Schedule.
    	jobs     map[JobID]StopChan
    }
    
    // Schedule registers job with the scheduler and starts a goroutine that
    // publishes the job as a QueueJob event on eventBus every time s.Event is
    // received while the job is not paused.
    //
    // The event subscription is made on s.EventBus when that field is set and
    // on eventBus otherwise; QueueJob is always published on eventBus.
    //
    // It returns an error wrapping ErrJobAlreadyScheduled when a job with the
    // same ID is already scheduled. The goroutine runs until the job's stop
    // channel is signalled, at which point it removes its subscription.
    func (s *EventScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
    	id := job.GetID()
    
    	// Reject duplicates before touching the bus so that the error path does
    	// not leave behind a subscribed channel that nobody reads from.
    	if _, ok := s.jobs[id]; ok {
    		return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
    	}
    
    	if s.jobs == nil {
    		s.jobs = make(map[JobID]StopChan)
    	}
    
    	// Remember which bus and event name were used to subscribe so that the
    	// goroutine unsubscribes from exactly the same place later.
    	sourceBus := eventBus
    	if s.EventBus != nil {
    		sourceBus = s.EventBus
    	}
    	event := s.Event
    
    	ch := make(chan interface{})
    	sourceBus.Subscribe(event, ch)
    
    	stopChan := make(StopChan)
    	s.jobs[id] = stopChan
    
    	go func() {
    		for {
    			select {
    			case <-ch:
    				if !job.IsPaused() {
    					Info("EventScheduler: received event %s, scheduling job %s", event, id)
    					eventBus.Publish(QueueJob, job)
    				}
    			case <-stopChan:
    				sourceBus.Unsubscribe(event, ch)
    				return
    			}
    		}
    	}()
    	return nil
    }
    
    // GetType returns the type identifier of this scheduler, "Event".
    func (s *EventScheduler) GetType() string {
    	const schedulerType = "Event"
    	return schedulerType
    }
    
    // IsAdHoc reports whether this is an ad-hoc scheduler; for the event
    // scheduler the answer is always false.
    func (s *EventScheduler) IsAdHoc() bool {
    	const adHoc = false
    	return adHoc
    }
    
    // Cancel stops the listener goroutine of the job with the given id and
    // removes the job from the scheduler. Cancelling an unknown id is a no-op.
    // The returned error is always nil.
    func (s *EventScheduler) Cancel(id JobID) error {
    	// Indexing a nil map is safe and simply reports "not found".
    	stopChan, ok := s.jobs[id]
    	if !ok {
    		return nil
    	}
    
    	// Drop the entry first so the channel can never be closed twice.
    	delete(s.jobs, id)
    
    	// Close instead of a non-blocking send: a send is silently dropped when
    	// the goroutine is busy handling an event, which would leave it running
    	// forever. A closed channel is observed on its next select.
    	close(stopChan)
    
    	return nil
    }
    
    // CancelAll stops the listener goroutines of all scheduled jobs and
    // forgets every job. The returned error is always nil.
    func (s *EventScheduler) CancelAll() error {
    	// Ranging over a nil map performs zero iterations, so no guard is needed.
    	for _, stopChan := range s.jobs {
    		// Close instead of a non-blocking send: a send is silently dropped
    		// when the goroutine is busy handling an event, which would leave it
    		// running forever. A closed channel is observed on its next select.
    		close(stopChan)
    	}
    
    	// Resetting the map guarantees no channel is closed a second time.
    	s.jobs = nil
    	return nil
    }
    
    // JobExists reports whether a job with the given id is currently
    // registered with this scheduler.
    func (s *EventScheduler) JobExists(id JobID) bool {
    	// A lookup in a nil map is safe and yields "not found".
    	if _, found := s.jobs[id]; found {
    		return true
    	}
    	return false
    }
    
    // GetPersistence returns a SchedulerPersistence carrying the scheduler's
    // type identifier and the event name it listens for.
    func (s *EventScheduler) GetPersistence() SchedulerPersistence {
    	var persistence SchedulerPersistence
    	persistence.Type = s.GetType()
    	persistence.Event = s.Event
    	return persistence
    }