Skip to content
Snippets Groups Projects
Select Git revision
  • a49793d94847adc67105d17e7dbee70ed38ffbed
  • master default protected
  • v1.23.2
  • v1.23.1
  • v1.23.0
  • v1.22.0
  • v1.21.1
  • v1.21.0
  • v1.20.3
  • v1.20.2
  • v1.20.1
  • v1.20.0
  • v1.19.4
  • v1.19.3
  • v1.19.2
  • v1.19.1
  • v1.19.0
  • v1.18.2
  • v1.18.1
  • v1.18.0
  • v1.17.0
  • v1.16.1
22 results

clean-up.nix

Blame
  • queue.go 3.46 KiB
    package jobqueue
    
    import (
    	"sync"
    )
    
    // Queue is a dependency-aware job queue.
    //
    // Jobs handed to Enqueue are either placed on the ready queue or parked
    // until every dependency they name has itself been enqueued. Dequeue hands
    // out jobs from the front of the ready queue.
    type Queue struct {
    	// jobMap holds the jobs passed to Enqueue, keyed by ID. Dequeue leaves
    	// the entries in place, so the map also answers "has this dependency
    	// been enqueued?".
    	jobMap              map[JobID]GenericJob
    	// pendingDependencies maps a parked job's ID to the dependency IDs that
    	// were not yet present in jobMap. An entry is deleted once its list is empty.
    	pendingDependencies map[JobID][]JobID
    	// readyQueue lists the jobs Dequeue may hand out, front first. Enqueue
    	// rebuilds it in topological order whenever the enqueued job is ready.
    	readyQueue          []GenericJob
    	// processedJobs is the set of job IDs that Dequeue has already returned.
    	processedJobs       map[JobID]struct{}
    	// eventBus receives a JobReady event from Enqueue; it may be nil, in
    	// which case nothing is published.
    	eventBus            *EventBus
    	// mu is held by Enqueue and Dequeue while they read or modify the fields
    	// of the queue.
    	mu                  sync.Mutex
    	// manger is the optional Manager set through SetManager; Enqueue uses its
    	// logger when both are non-nil.
    	// NOTE(review): the name looks like a typo of "manager"; renaming it
    	// means touching every use of the field.
    	manger              *Manager
    }
    
    // NewQueue returns an empty Queue with all of its maps initialized.
    //
    // eventBus is optional: when it is nil, Enqueue does not publish JobReady
    // events. The queue starts without a Manager; attach one with SetManager.
    func NewQueue(eventBus *EventBus) *Queue {
    	return &Queue{
    		jobMap:              make(map[JobID]GenericJob),
    		pendingDependencies: make(map[JobID][]JobID),
    		readyQueue:          []GenericJob{},
    		processedJobs:       make(map[JobID]struct{}),
    		eventBus:            eventBus,
    	}
    }
    
    // SetManager attaches the Manager whose logger Enqueue uses for its log
    // output. Passing nil detaches the current manager.
    //
    // The field is written under the queue mutex because Enqueue reads it while
    // holding that same mutex.
    func (q *Queue) SetManager(m *Manager) {
    	q.mu.Lock()
    	defer q.mu.Unlock()
    
    	q.manger = m
    }
    
    // Enqueue adds job to the queue.
    //
    // The job is stored in the job map unless an entry with the same ID already
    // exists. Every pending job that was waiting for this ID has that dependency
    // cleared and moves to the ready queue once it has no outstanding
    // dependencies left. The job itself becomes ready when all IDs returned by
    // GetDependencies are already present in the job map; otherwise it is parked
    // together with the list of missing IDs.
    //
    // When the job is ready, the ready queue is reordered with
    // topologicalSortJobs, run over the ready jobs plus the jobs already handed
    // out by Dequeue, and JobReady is published if an event bus is set. JobReady
    // is also published when the job itself is parked but its arrival released
    // other jobs, so that those do not sit in the ready queue unannounced.
    //
    // It returns ErrJobAlreadyExists if a job with the same ID is currently in
    // the ready queue, and the error from topologicalSortJobs if sorting fails.
    //
    // NOTE(review): on a sort failure the job has already been added to the job
    // map and the ready queue and is not rolled back - confirm whether callers
    // rely on that.
    // NOTE(review): Publish runs while q.mu is held; presumably subscribers do
    // not call back into the queue synchronously - confirm.
    func (q *Queue) Enqueue(job GenericJob) error {
    	q.mu.Lock()
    	defer q.mu.Unlock()
    
    	id := job.GetID()
    
    	if _, known := q.jobMap[id]; !known {
    		q.jobMap[id] = job
    	}
    
    	for _, ready := range q.readyQueue {
    		if ready.GetID() == id {
    			return ErrJobAlreadyExists
    		}
    	}
    
    	// Clear this job from the dependency lists of parked jobs and release
    	// the ones that have nothing left to wait for. Deleting map entries
    	// while ranging over the map is permitted in Go.
    	released := false
    	for pendingID, deps := range q.pendingDependencies {
    		deps = removeJobID(deps, id)
    		if len(deps) > 0 {
    			q.pendingDependencies[pendingID] = deps
    			continue
    		}
    
    		q.readyQueue = append(q.readyQueue, q.jobMap[pendingID])
    		delete(q.pendingDependencies, pendingID)
    		released = true
    	}
    
    	// A dependency counts as met as soon as it is present in the job map.
    	var unmet []JobID
    	for _, depID := range job.GetDependencies() {
    		if _, ok := q.jobMap[depID]; !ok {
    			unmet = append(unmet, depID)
    		}
    	}
    
    	if len(unmet) > 0 {
    		q.pendingDependencies[id] = unmet
    
    		// The job is parked, but jobs released above can be dequeued right
    		// away, so subscribers still need to be told.
    		if released && q.eventBus != nil {
    			q.eventBus.Publish(JobReady, nil)
    		}
    
    		return nil
    	}
    
    	q.readyQueue = append(q.readyQueue, job)
    
    	// The sort input is a fresh slice: ready jobs first, then the jobs that
    	// were already dequeued. readyIDs is used afterwards to keep only the
    	// ready jobs from the sorted result.
    	readyIDs := make(map[JobID]struct{}, len(q.readyQueue))
    	sortInput := make([]GenericJob, 0, len(q.readyQueue)+len(q.processedJobs))
    	for _, ready := range q.readyQueue {
    		readyIDs[ready.GetID()] = struct{}{}
    		sortInput = append(sortInput, ready)
    	}
    	for processedID := range q.processedJobs {
    		sortInput = append(sortInput, q.jobMap[processedID])
    	}
    
    	sortedIDs, err := topologicalSortJobs(sortInput)
    	if err != nil {
    		return err
    	}
    
    	sorted := make([]GenericJob, 0, len(readyIDs))
    	for _, sortedID := range sortedIDs {
    		if _, isReady := readyIDs[sortedID]; isReady {
    			sorted = append(sorted, q.jobMap[sortedID])
    		}
    	}
    	q.readyQueue = sorted
    
    	if q.eventBus != nil && len(q.readyQueue) > 0 {
    		if q.manger != nil && q.manger.logger != nil {
    			q.manger.logger.Info("Job ready", "job_id", id)
    		}
    
    		q.eventBus.Publish(JobReady, nil)
    	}
    
    	return nil
    }
    
    // Dequeue pops the job at the front of the ready queue and returns it.
    //
    // It returns ErrQueueEmpty when no job is ready. The ID of the returned job
    // is recorded in processedJobs; its jobMap entry is left in place so that
    // jobs enqueued later still find it when their dependencies are checked.
    func (q *Queue) Dequeue() (GenericJob, error) {
    	q.mu.Lock()
    	defer q.mu.Unlock()
    
    	if len(q.readyQueue) < 1 {
    		return nil, ErrQueueEmpty
    	}
    
    	head, rest := q.readyQueue[0], q.readyQueue[1:]
    	q.readyQueue = rest
    
    	// Only the processed marker is set; the job map is not touched here.
    	q.processedJobs[head.GetID()] = struct{}{}
    
    	return head, nil
    }
    
    // removeJobID returns deps with every occurrence of id removed.
    //
    // The filtering is done in place: the result reuses deps' backing array, so
    // callers must continue with the returned slice instead of deps. The
    // relative order of the remaining IDs is preserved. When id does not occur,
    // the result has the same contents as deps; a nil deps yields nil.
    func removeJobID(deps []JobID, id JobID) []JobID {
    	// kept shares storage with deps; the write index never overtakes the
    	// read index, so no unread element is overwritten.
    	kept := deps[:0]
    	for _, dep := range deps {
    		if dep != id {
    			kept = append(kept, dep)
    		}
    	}
    
    	return kept
    }