Something went wrong on our end
Select Git revision
Monster.DOM.Template.html
-
Volker Schukai authored
queue.go 3.46 KiB
package jobqueue
import (
"sync"
)
// Queue is a dependency-aware job queue.
//
// Jobs handed to Enqueue are either parked until every job they depend on is
// known to the queue, or placed on a ready queue from which Dequeue hands them
// out one at a time.
type Queue struct {
	// mu is held by Enqueue and Dequeue for the duration of the call.
	mu sync.Mutex

	// jobMap indexes the jobs passed to Enqueue by their ID. Dequeue does not
	// remove entries, so dequeued jobs stay available for dependency checks.
	jobMap map[JobID]GenericJob

	// pendingDependencies maps the ID of a parked job to the IDs of the
	// dependencies Enqueue has not seen yet.
	pendingDependencies map[JobID][]JobID

	// readyQueue lists the jobs that can be dequeued; Dequeue takes the first.
	readyQueue []GenericJob

	// processedJobs is the set of IDs of jobs already returned by Dequeue.
	processedJobs map[JobID]struct{}

	// eventBus, when non-nil, receives a JobReady event from Enqueue.
	eventBus *EventBus

	// manger is the manager installed by SetManager; Enqueue uses its logger.
	// NOTE(review): the name looks like a typo for "manager". It is kept
	// because other methods of Queue refer to the field by this name.
	manger *Manager
}
// NewQueue returns an empty Queue whose maps and ready queue are initialized
// and ready for use.
//
// eventBus is the bus on which Enqueue publishes JobReady. It may be nil, in
// which case Enqueue publishes nothing. No manager is attached; use
// SetManager for that.
func NewQueue(eventBus *EventBus) *Queue {
	return &Queue{
		jobMap:              make(map[JobID]GenericJob),
		pendingDependencies: make(map[JobID][]JobID),
		readyQueue:          []GenericJob{},
		processedJobs:       make(map[JobID]struct{}),
		eventBus:            eventBus,
	}
}
// SetManager attaches m to the queue as its manager, replacing any manager
// set earlier.
//
// Within this file the manager is only consulted by Enqueue, which uses its
// logger (when both manager and logger are non-nil) to report ready jobs.
//
// The assignment is made without locking the queue mutex.
// NOTE(review): Enqueue reads the same field while holding the mutex, so
// SetManager is presumably meant to be called before the queue is shared
// between goroutines; confirm against callers.
func (q *Queue) SetManager(m *Manager) {
// NOTE(review): the field name "manger" looks like a typo for "manager"; it
// is declared on the Queue struct and therefore left untouched here.
q.manger = m
}
// Enqueue registers job with the queue and, when possible, makes it ready
// for Dequeue.
//
// The following steps run while the queue mutex is held:
//   - The job is stored in jobMap unless an entry with the same ID exists.
//   - ErrJobAlreadyExists is returned if a job with that ID is already on the
//     ready queue.
//   - The job's ID is removed from the outstanding dependencies of every
//     pending job; pending jobs left without outstanding dependencies are
//     appended to the ready queue.
//   - If any of the job's own dependencies is missing from jobMap, the job is
//     recorded as pending and Enqueue returns nil.
//   - Otherwise the job is appended to the ready queue, the ready queue is
//     rebuilt in the order produced by topologicalSortJobs (which is given the
//     ready jobs plus all processed jobs), and JobReady is published on the
//     event bus if one is set.
//
// An error from topologicalSortJobs is returned as is.
// NOTE(review): in that case the changes made by the earlier steps are not
// undone, so the job stays in jobMap and on the ready queue; confirm that
// callers expect this.
func (q *Queue) Enqueue(job GenericJob) error {
	q.mu.Lock()
	defer q.mu.Unlock()

	id := job.GetID()

	// An already registered job with this ID is kept; it is not replaced.
	if _, known := q.jobMap[id]; !known {
		q.jobMap[id] = job
	}

	for _, queued := range q.readyQueue {
		if queued.GetID() == id {
			return ErrJobAlreadyExists
		}
	}

	// The new job may be what pending jobs were waiting for. Deleting map
	// entries while ranging over the map is permitted in Go.
	for waitingID, waitingOn := range q.pendingDependencies {
		remaining := removeJobID(waitingOn, id)
		if len(remaining) > 0 {
			q.pendingDependencies[waitingID] = remaining
			continue
		}
		q.readyQueue = append(q.readyQueue, q.jobMap[waitingID])
		delete(q.pendingDependencies, waitingID)
	}

	// A dependency counts as met as soon as it is present in jobMap.
	var missing []JobID
	for _, depID := range job.GetDependencies() {
		if _, known := q.jobMap[depID]; known {
			continue
		}
		missing = append(missing, depID)
	}
	if len(missing) > 0 {
		q.pendingDependencies[id] = missing
		return nil
	}

	q.readyQueue = append(q.readyQueue, job)

	// Processed jobs take part in the sort, but only IDs that are currently
	// ready are carried over into the rebuilt ready queue.
	readyIDs := make(map[JobID]struct{}, len(q.readyQueue))
	sortInput := make([]GenericJob, 0, len(q.readyQueue)+len(q.processedJobs))
	for _, queued := range q.readyQueue {
		readyIDs[queued.GetID()] = struct{}{}
		sortInput = append(sortInput, queued)
	}
	for doneID := range q.processedJobs {
		sortInput = append(sortInput, q.jobMap[doneID])
	}

	order, err := topologicalSortJobs(sortInput)
	if err != nil {
		return err
	}

	sorted := make([]GenericJob, 0, len(readyIDs))
	for _, sortedID := range order {
		if _, ready := readyIDs[sortedID]; ready {
			sorted = append(sorted, q.jobMap[sortedID])
		}
	}
	q.readyQueue = sorted

	if q.eventBus == nil || len(q.readyQueue) == 0 {
		return nil
	}

	// Logging only happens on the path that also publishes the event.
	if m := q.manger; m != nil && m.logger != nil {
		m.logger.Info("Job ready", "job_id", id)
	}
	q.eventBus.Publish(JobReady, nil)

	return nil
}
// Dequeue takes the job at the head of the ready queue and returns it.
//
// The job's ID is added to processedJobs. Its jobMap entry is left in place,
// which is what Enqueue consults when it checks dependencies. When no job is
// ready, Dequeue returns a nil job together with ErrQueueEmpty.
func (q *Queue) Dequeue() (GenericJob, error) {
	q.mu.Lock()
	defer q.mu.Unlock()

	if len(q.readyQueue) < 1 {
		return nil, ErrQueueEmpty
	}

	head, rest := q.readyQueue[0], q.readyQueue[1:]
	q.readyQueue = rest

	// Only the processed set is updated; jobMap keeps the job.
	q.processedJobs[head.GetID()] = struct{}{}

	return head, nil
}
// removeJobID removes every occurrence of id from deps and returns the
// shortened slice.
//
// The removal happens in place: each match is overwritten with the current
// last element and the slice is shortened by one. The backing array of deps
// is therefore modified and the order of the remaining IDs is not preserved.
// If id does not occur, deps is returned unchanged.
func removeJobID(deps []JobID, id JobID) []JobID {
	for i := 0; i < len(deps); {
		if deps[i] != id {
			i++
			continue
		}
		// Do not advance i: the element swapped in from the end has not been
		// checked yet and may itself be a match.
		last := len(deps) - 1
		deps[i] = deps[last]
		deps = deps[:last]
	}
	return deps
}