Select Git revision
job-queues.iml
jobs.go 4.32 KiB
package jobqueue
import (
"sync"
"time"
)
type JobsInterface interface {
GetJobs() map[JobIDType]ReadOnlyJob
GetExecutableJobs() map[JobIDType]ReadOnlyJob
AddJob(jobSpec JobSpecification, runnable Runnable) error
RemoveJob(id JobIDType) (bool, error)
GetJobStatus(id JobIDType) (JobStatus, error)
Cleanup()
GetFinishedJobs() map[JobIDType]ReadOnlyJob
GetFinishedJob(id JobIDType) ReadOnlyJob
RemoveFinishedJob(id JobIDType) (bool, error)
JobExists(id JobIDType) bool
GetJob(id JobIDType) ReadOnlyJob
GetJobsCount() int
}
// jobs is the JobsInterface implementation defined in this file. It keeps
// active and finished jobs in two separate maps.
type jobs struct {
	// jobs holds the active jobs, keyed by job ID.
	jobs map[JobIDType]*job
	// finishedJobs holds the jobs that Cleanup moved out of jobs, keyed by job ID.
	finishedJobs map[JobIDType]*job
	// mutex is locked by every method in this file before it touches either map.
	mutex sync.Mutex
}
// Compile-time check that *jobs implements JobsInterface.
var _ JobsInterface = (*jobs)(nil)
// GetJobsCount returns the number of entries in the active job map. Finished
// jobs are not counted.
func (jq *jobs) GetJobsCount() int {
	jq.mutex.Lock()
	count := len(jq.jobs)
	jq.mutex.Unlock()

	return count
}
// Cleanup moves every active job whose Status is JobFinished into the
// finished-jobs map and removes it from the active map. Jobs in any other
// state are left where they are.
func (jq *jobs) Cleanup() {
	jq.mutex.Lock()
	defer jq.mutex.Unlock()

	for jobID, entry := range jq.jobs {
		if entry.Status != JobFinished {
			continue
		}
		jq.finishedJobs[jobID] = entry
		// Deleting the current key while ranging over a map is permitted in Go.
		delete(jq.jobs, jobID)
	}
}
// GetFinishedJobs returns a newly allocated map containing every finished job
// keyed by ID. Adding or deleting keys in the result does not affect the
// store; the values still refer to the stored jobs.
func (jq *jobs) GetFinishedJobs() map[JobIDType]ReadOnlyJob {
	jq.mutex.Lock()
	defer jq.mutex.Unlock()

	snapshot := make(map[JobIDType]ReadOnlyJob, len(jq.finishedJobs))
	for jobID, entry := range jq.finishedJobs {
		snapshot[jobID] = entry // implicit conversion of *job to ReadOnlyJob
	}
	return snapshot
}
// GetFinishedJob looks up id in the finished-jobs map. It returns the stored
// job, or nil when no finished job has that ID.
func (jq *jobs) GetFinishedJob(id JobIDType) ReadOnlyJob {
	jq.mutex.Lock()
	defer jq.mutex.Unlock()

	if entry, found := jq.finishedJobs[id]; found {
		return entry
	}
	// Return an untyped nil so callers can compare the result against nil.
	return nil
}
// RemoveFinishedJob deletes the finished job with the given ID.
//
// It returns (true, nil) when the job was removed and (false, ErrJobNotFound)
// when no finished job has that ID.
func (jq *jobs) RemoveFinishedJob(id JobIDType) (bool, error) {
	jq.mutex.Lock()
	defer jq.mutex.Unlock()

	_, found := jq.finishedJobs[id]
	if !found {
		return false, ErrJobNotFound
	}

	delete(jq.finishedJobs, id)
	return true, nil
}
// GetJobs returns a newly allocated map containing every active job keyed by
// ID. Adding or deleting keys in the result does not affect the store; the
// values still refer to the stored jobs.
func (jq *jobs) GetJobs() map[JobIDType]ReadOnlyJob {
	jq.mutex.Lock()
	defer jq.mutex.Unlock()

	snapshot := make(map[JobIDType]ReadOnlyJob, len(jq.jobs))
	for jobID, entry := range jq.jobs {
		snapshot[jobID] = entry // implicit conversion of *job to ReadOnlyJob
	}
	return snapshot
}
// GetExecutableJobs returns the active jobs that are ready to run and marks
// them as scheduled.
//
// A job is eligible when its Status is JobPending and its NextRun is not after
// the current time. The eligible jobs are handed to topologicalSortJobs; every
// job named in the resulting order has its Status set to JobScheduled (a side
// effect on the stored job) and is placed in the returned map.
//
// It returns nil when topologicalSortJobs reports an error; in that case this
// method has not modified any job status.
func (jq *jobs) GetExecutableJobs() map[JobIDType]ReadOnlyJob {
	jq.mutex.Lock()
	defer jq.mutex.Unlock()

	// Take a single timestamp so every job is compared against the same instant.
	now := time.Now()

	candidates := make(map[JobIDType]*job)
	for _, j := range jq.jobs {
		if j.Status != JobPending || j.NextRun.After(now) {
			continue
		}
		candidates[j.Id] = j
	}

	sortedJobIDs, err := topologicalSortJobs(candidates)
	if err != nil {
		// The method has no error return, so a failed sort is reported as nil.
		return nil
	}

	executable := make(map[JobIDType]ReadOnlyJob)
	for _, id := range sortedJobIDs {
		j, found := jq.jobs[id]
		if !found || j == nil {
			// Defensive: if the sort yields an ID that is not an active job,
			// skip it instead of dereferencing a nil pointer.
			continue
		}
		j.Status = JobScheduled
		executable[id] = j
	}
	return executable
}
// JobExists reports whether the active job map contains an entry for id.
// Finished jobs are not considered.
func (jq *jobs) JobExists(id JobIDType) bool {
	jq.mutex.Lock()
	defer jq.mutex.Unlock()

	_, found := jq.jobs[id]
	return found
}
// GetJob looks up id in the active job map. It returns the stored job, or nil
// when no active job has that ID.
func (jq *jobs) GetJob(id JobIDType) ReadOnlyJob {
	jq.mutex.Lock()
	defer jq.mutex.Unlock()

	if entry, found := jq.jobs[id]; found {
		return entry
	}
	// Return an untyped nil so callers can compare the result against nil.
	return nil
}
// NewJobs returns an empty job store with both the active and the finished
// job maps allocated. The mutex is left at its zero value, which is an
// unlocked, ready-to-use sync.Mutex.
func NewJobs() *jobs {
	return &jobs{
		jobs:         map[JobIDType]*job{},
		finishedJobs: map[JobIDType]*job{},
	}
}
// AddJob builds a job from jobSpec via newJob, attaches runnable to it, and
// stores it in the active job map under the job's Id.
//
// It returns ErrJobAlreadyExists when an active job with the same Id is
// already stored, and ErrUnknownDependency when any entry of the new job's
// Dependencies is not an active job. In both cases nothing is stored.
func (jq *jobs) AddJob(jobSpec JobSpecification, runnable Runnable) error {
	jq.mutex.Lock()
	defer jq.mutex.Unlock()

	candidate := newJob(jobSpec)
	candidate.runnable = runnable

	if _, duplicate := jq.jobs[candidate.Id]; duplicate {
		return ErrJobAlreadyExists
	}

	// Every dependency must already be present as an active job.
	for _, required := range candidate.Dependencies {
		_, known := jq.jobs[required]
		if !known {
			return ErrUnknownDependency
		}
	}

	jq.jobs[candidate.Id] = candidate
	return nil
}
// RemoveJob deletes the active job with the given ID.
//
// It returns (false, ErrJobNotFound) when no active job has that ID, and
// (false, ErrJobIsDependency) when any active job lists id among its
// Dependencies. On success it returns (true, nil).
func (jq *jobs) RemoveJob(id JobIDType) (bool, error) {
	jq.mutex.Lock()
	defer jq.mutex.Unlock()

	_, found := jq.jobs[id]
	if !found {
		return false, ErrJobNotFound
	}

	// Refuse the removal while another active job still depends on this one.
	for _, other := range jq.jobs {
		for _, required := range other.Dependencies {
			if required == id {
				return false, ErrJobIsDependency
			}
		}
	}

	delete(jq.jobs, id)
	return true, nil
}
// GetJobStatus returns the Status of the active job with the given ID.
//
// When no active job has that ID it returns JobStatus(0) together with
// ErrJobNotFound.
func (jq *jobs) GetJobStatus(id JobIDType) (JobStatus, error) {
	jq.mutex.Lock()
	defer jq.mutex.Unlock()

	entry, found := jq.jobs[id]
	if !found {
		return JobStatus(0), ErrJobNotFound
	}
	return entry.Status, nil
}