Skip to content
Snippets Groups Projects
Select Git revision
  • 9e4f7dc34d470a34272ab3f7eb89a8b18d5f8d3e
  • master default protected
  • v1.23.2
  • v1.23.1
  • v1.23.0
  • v1.22.0
  • v1.21.1
  • v1.21.0
  • v1.20.3
  • v1.20.2
  • v1.20.1
  • v1.20.0
  • v1.19.4
  • v1.19.3
  • v1.19.2
  • v1.19.1
  • v1.19.0
  • v1.18.2
  • v1.18.1
  • v1.18.0
  • v1.17.0
  • v1.16.1
22 results

job-queues.iml

Blame
  • jobs.go 4.32 KiB
    package jobqueue
    
    import (
    	"sync"
    	"time"
    )
    
    // JobsInterface describes a store of jobs keyed by JobIDType. The jobs type in
    // this file implements it; the per-method notes below describe what that
    // implementation does.
    type JobsInterface interface {
    	// GetJobs returns a newly allocated map containing every active job.
    	GetJobs() map[JobIDType]ReadOnlyJob
    
    	// GetExecutableJobs returns the active jobs that are pending and whose
    	// NextRun is not in the future, and marks each returned job as
    	// JobScheduled. It returns nil when the dependency sort fails.
    	GetExecutableJobs() map[JobIDType]ReadOnlyJob
    
    	// AddJob builds a job from jobSpec, attaches runnable and stores it as an
    	// active job. It returns ErrJobAlreadyExists when the job ID is already
    	// in use and ErrUnknownDependency when a dependency is not an active job.
    	AddJob(jobSpec JobSpecification, runnable Runnable) error
    
    	// RemoveJob deletes an active job and reports whether it was removed. It
    	// returns ErrJobNotFound for an unknown ID and ErrJobIsDependency when
    	// another active job depends on it.
    	RemoveJob(id JobIDType) (bool, error)
    
    	// GetJobStatus returns the status of an active job, or ErrJobNotFound
    	// when no active job has the given ID.
    	GetJobStatus(id JobIDType) (JobStatus, error)
    
    	// Cleanup moves every active job whose status is JobFinished into the
    	// finished jobs.
    	Cleanup()
    
    	// GetFinishedJobs returns a newly allocated map containing every
    	// finished job.
    	GetFinishedJobs() map[JobIDType]ReadOnlyJob
    
    	// GetFinishedJob returns the finished job with the given ID, or nil when
    	// there is none.
    	GetFinishedJob(id JobIDType) ReadOnlyJob
    
    	// RemoveFinishedJob deletes a finished job and reports whether it was
    	// removed. It returns ErrJobNotFound for an unknown ID.
    	RemoveFinishedJob(id JobIDType) (bool, error)
    
    	// JobExists reports whether an active job with the given ID exists.
    	JobExists(id JobIDType) bool
    
    	// GetJob returns the active job with the given ID, or nil when there is
    	// none.
    	GetJob(id JobIDType) ReadOnlyJob
    
    	// GetJobsCount returns the number of active jobs.
    	GetJobsCount() int
    }
    
    // jobs is the JobsInterface implementation in this file. Active and finished
    // jobs are kept in two separate maps, and every method in this file locks
    // mutex before reading or writing either map.
    type jobs struct {
    	jobs         map[JobIDType]*job // active jobs; AddJob stores each job under its Id
    	finishedJobs map[JobIDType]*job // jobs moved here by Cleanup once their Status is JobFinished
    	mutex        sync.Mutex         // locked by every method that touches jobs or finishedJobs
    }
    
    // Compile-time assertion that *jobs implements JobsInterface; the build fails
    // here if a method is missing or has the wrong signature.
    var _ JobsInterface = (*jobs)(nil)
    
    // GetJobsCount returns the number of entries in the active jobs map. Jobs
    // that have been moved to finishedJobs are not counted.
    func (jq *jobs) GetJobsCount() int {
    	jq.mutex.Lock()
    	count := len(jq.jobs)
    	jq.mutex.Unlock()
    
    	return count
    }
    
    // Cleanup moves every job whose Status is JobFinished out of the active jobs
    // map and into finishedJobs. Jobs in any other state stay where they are.
    func (jq *jobs) Cleanup() {
    	jq.mutex.Lock()
    	defer jq.mutex.Unlock()
    
    	for jobID, candidate := range jq.jobs {
    		if candidate.Status != JobFinished {
    			continue
    		}
    
    		// Record the job as finished first, then drop it from the active
    		// map. Deleting map entries while ranging over the map is permitted.
    		jq.finishedJobs[jobID] = candidate
    		delete(jq.jobs, jobID)
    	}
    }
    
    // GetFinishedJobs returns a newly allocated map holding every entry of
    // finishedJobs. The map itself is a copy, but its values are the same job
    // pointers, exposed through the ReadOnlyJob interface.
    func (jq *jobs) GetFinishedJobs() map[JobIDType]ReadOnlyJob {
    	jq.mutex.Lock()
    	defer jq.mutex.Unlock()
    
    	snapshot := make(map[JobIDType]ReadOnlyJob)
    	for jobID, finished := range jq.finishedJobs {
    		// Explicit conversion of *job to the ReadOnlyJob interface.
    		snapshot[jobID] = ReadOnlyJob(finished)
    	}
    	return snapshot
    }
    
    // GetFinishedJob returns the finished job stored under id, or nil when
    // finishedJobs has no entry for that ID.
    func (jq *jobs) GetFinishedJob(id JobIDType) ReadOnlyJob {
    	jq.mutex.Lock()
    	defer jq.mutex.Unlock()
    
    	if finished, ok := jq.finishedJobs[id]; ok {
    		return finished
    	}
    
    	// Return a literal nil so the caller receives a nil interface value.
    	return nil
    }
    
    // RemoveFinishedJob deletes the entry for id from finishedJobs.
    // It returns (true, nil) when an entry was removed and
    // (false, ErrJobNotFound) when finishedJobs has no entry for id.
    func (jq *jobs) RemoveFinishedJob(id JobIDType) (bool, error) {
    	jq.mutex.Lock()
    	defer jq.mutex.Unlock()
    
    	_, found := jq.finishedJobs[id]
    	if found {
    		delete(jq.finishedJobs, id)
    		return true, nil
    	}
    
    	return false, ErrJobNotFound
    }
    
    // GetJobs returns a newly allocated map holding every entry of the active
    // jobs map. The map itself is a copy, but its values are the same job
    // pointers, exposed through the ReadOnlyJob interface.
    func (jq *jobs) GetJobs() map[JobIDType]ReadOnlyJob {
    	jq.mutex.Lock()
    	defer jq.mutex.Unlock()
    
    	snapshot := make(map[JobIDType]ReadOnlyJob)
    	for jobID, active := range jq.jobs {
    		// Explicit conversion of *job to the ReadOnlyJob interface.
    		snapshot[jobID] = ReadOnlyJob(active)
    	}
    	return snapshot
    }
    
    // GetExecutableJobs selects the active jobs that are ready to run, marks
    // them as scheduled and returns them keyed by job ID.
    //
    // A job is selected when its Status is JobPending and its NextRun is not
    // after the current time. The selected jobs are handed to
    // topologicalSortJobs; each job ID it returns has its Status set to
    // JobScheduled and is added to the result. Because the status changes, a
    // later call does not return the same job again while it remains scheduled.
    //
    // The result is a map, so the order produced by topologicalSortJobs is not
    // visible to the caller. If topologicalSortJobs returns an error, nil is
    // returned and no job status is modified.
    func (jq *jobs) GetExecutableJobs() map[JobIDType]ReadOnlyJob {
    	jq.mutex.Lock()
    	defer jq.mutex.Unlock()
    
    	// Read the clock once so every job is compared against the same instant.
    	now := time.Now()
    
    	due := make(map[JobIDType]*job)
    	for _, j := range jq.jobs {
    		if j.Status != JobPending || j.NextRun.After(now) {
    			continue
    		}
    		due[j.Id] = j
    	}
    
    	sortedJobIDs, err := topologicalSortJobs(due)
    	if err != nil {
    		return nil
    	}
    
    	executable := make(map[JobIDType]ReadOnlyJob, len(sortedJobIDs))
    	for _, id := range sortedJobIDs {
    		j := jq.jobs[id]
    		if j == nil {
    			// topologicalSortJobs is defined elsewhere; should it ever return
    			// an ID that is not in the queue, skip it rather than dereference
    			// a nil *job.
    			continue
    		}
    		j.Status = JobScheduled
    		executable[id] = j
    	}
    
    	return executable
    }
    
    // JobExists reports whether the active jobs map has an entry for id.
    // Finished jobs are not consulted.
    func (jq *jobs) JobExists(id JobIDType) bool {
    	jq.mutex.Lock()
    	defer jq.mutex.Unlock()
    
    	_, found := jq.jobs[id]
    	return found
    }
    
    // GetJob returns the active job stored under id, or nil when the active jobs
    // map has no entry for that ID.
    func (jq *jobs) GetJob(id JobIDType) ReadOnlyJob {
    	jq.mutex.Lock()
    	defer jq.mutex.Unlock()
    
    	if active, ok := jq.jobs[id]; ok {
    		return active
    	}
    
    	// Return a literal nil so the caller receives a nil interface value.
    	return nil
    }
    
    // NewJobs returns an empty job store with both the active and the finished
    // job maps allocated.
    func NewJobs() *jobs {
    	return &jobs{
    		jobs:         make(map[JobIDType]*job),
    		finishedJobs: make(map[JobIDType]*job),
    		// mutex is left at its zero value, which is an unlocked sync.Mutex.
    	}
    }
    
    // AddJob creates a job from jobSpec via newJob, attaches runnable to it and
    // stores it in the active jobs map under the job's Id.
    //
    // It returns ErrJobAlreadyExists when an active job already uses that Id, and
    // ErrUnknownDependency when any of the job's Dependencies is not itself an
    // active job. In both cases nothing is stored.
    func (jq *jobs) AddJob(jobSpec JobSpecification, runnable Runnable) error {
    	jq.mutex.Lock()
    	defer jq.mutex.Unlock()
    
    	candidate := newJob(jobSpec)
    	candidate.runnable = runnable
    
    	if _, taken := jq.jobs[candidate.Id]; taken {
    		return ErrJobAlreadyExists
    	}
    
    	// Every dependency has to be present in the active jobs map already.
    	for _, depID := range candidate.Dependencies {
    		_, known := jq.jobs[depID]
    		if !known {
    			return ErrUnknownDependency
    		}
    	}
    
    	jq.jobs[candidate.Id] = candidate
    	return nil
    }
    
    // RemoveJob deletes the active job stored under id.
    //
    // It returns (true, nil) on success, (false, ErrJobNotFound) when no active
    // job has that ID, and (false, ErrJobIsDependency) when any active job lists
    // id among its Dependencies.
    func (jq *jobs) RemoveJob(id JobIDType) (bool, error) {
    	jq.mutex.Lock()
    	defer jq.mutex.Unlock()
    
    	_, found := jq.jobs[id]
    	if !found {
    		return false, ErrJobNotFound
    	}
    
    	// A job that another active job depends on must stay in the queue.
    	for _, other := range jq.jobs {
    		for _, depID := range other.Dependencies {
    			if depID != id {
    				continue
    			}
    			return false, ErrJobIsDependency
    		}
    	}
    
    	delete(jq.jobs, id)
    	return true, nil
    }
    
    // GetJobStatus returns the Status of the active job stored under id. When the
    // active jobs map has no entry for id it returns JobStatus(0) together with
    // ErrJobNotFound. Finished jobs are not consulted.
    func (jq *jobs) GetJobStatus(id JobIDType) (JobStatus, error) {
    	jq.mutex.Lock()
    	defer jq.mutex.Unlock()
    
    	active, ok := jq.jobs[id]
    	if !ok {
    		return JobStatus(0), ErrJobNotFound
    	}
    
    	return active.Status, nil
    }