Skip to content
Snippets Groups Projects
Select Git revision
  • 11fa97747f3dfac0ae80283e9974c5c79472ead4
  • master default protected
  • 1.31
  • 4.28.0
  • 4.27.0
  • 4.26.0
  • 4.25.5
  • 4.25.4
  • 4.25.3
  • 4.25.2
  • 4.25.1
  • 4.25.0
  • 4.24.3
  • 4.24.2
  • 4.24.1
  • 4.24.0
  • 4.23.6
  • 4.23.5
  • 4.23.4
  • 4.23.3
  • 4.23.2
  • 4.23.1
  • 4.23.0
23 results

Monster.DOM.Template.html

Blame
  • queue.go 3.46 KiB
    package jobqueue
    
    import (
    	"sync"
    )
    
    // Queue is a job queue
    type Queue struct {
    	jobMap              map[JobID]GenericJob
    	pendingDependencies map[JobID][]JobID
    	readyQueue          []GenericJob
    	processedJobs       map[JobID]struct{}
    	eventBus            *EventBus
    	mu                  sync.Mutex
    	manger              *Manager
    }
    
    // NewQueue initializes a new Queue
    func NewQueue(EventBus *EventBus) *Queue {
    	return &Queue{
    		jobMap:              make(map[JobID]GenericJob),
    		pendingDependencies: make(map[JobID][]JobID),
    		readyQueue:          []GenericJob{},
    		processedJobs:       make(map[JobID]struct{}),
    		eventBus:            EventBus,
    	}
    }
    
    // SetManager sets the manager for the queue
    // The manager is mainly used for logging
    func (q *Queue) SetManager(m *Manager) {
    	q.manger = m
    }
    
    // Enqueue adds a job to the queue
    func (q *Queue) Enqueue(job GenericJob) error {
    	q.mu.Lock()
    	defer q.mu.Unlock()
    
    	if _, exists := q.jobMap[job.GetID()]; !exists {
    		q.jobMap[job.GetID()] = job
    	}
    
    	for _, readyJob := range q.readyQueue {
    		if readyJob.GetID() == job.GetID() {
    			return ErrJobAlreadyExists
    		}
    	}
    
    	// Check if this job is a dependency for any pending jobs
    	for pendingJobID, pendingDeps := range q.pendingDependencies {
    		q.pendingDependencies[pendingJobID] = removeJobID(pendingDeps, job.GetID())
    		if len(q.pendingDependencies[pendingJobID]) == 0 {
    			q.readyQueue = append(q.readyQueue, q.jobMap[pendingJobID])
    			delete(q.pendingDependencies, pendingJobID)
    		}
    	}
    
    	// Check this job's dependencies
    	var unmetDependencies []JobID
    	for _, depID := range job.GetDependencies() {
    		if _, ok := q.jobMap[depID]; !ok {
    			unmetDependencies = append(unmetDependencies, depID)
    		}
    	}
    
    	if len(unmetDependencies) > 0 {
    		q.pendingDependencies[job.GetID()] = unmetDependencies
    	} else {
    		q.readyQueue = append(q.readyQueue, job)
    
    		// Run topological sort on jobs in the ready queue
    		readyJobList := []GenericJob{}
    		for _, readyJob := range q.readyQueue {
    			readyJobList = append(readyJobList, readyJob)
    		}
    
    		currentReadyJobIDs := make(map[JobID]struct{})
    		for _, job := range readyJobList {
    			currentReadyJobIDs[job.GetID()] = struct{}{}
    		}
    
    		fullJobList := []GenericJob{}
    		for _, job := range readyJobList {
    			fullJobList = append(fullJobList, job)
    		}
    
    		for id := range q.processedJobs {
    			fullJobList = append(fullJobList, q.jobMap[id])
    		}
    
    		sortedIDs, err := topologicalSortJobs(fullJobList)
    
    		if err != nil {
    			return err
    		}
    
    		newReadyQueue := make([]GenericJob, 0)
    		for _, id := range sortedIDs {
    			if _, exists := currentReadyJobIDs[id]; exists {
    				newReadyQueue = append(newReadyQueue, q.jobMap[id])
    			}
    		}
    
    		q.readyQueue = newReadyQueue
    
    		if q.eventBus != nil && len(q.readyQueue) > 0 {
    			if q.manger != nil && q.manger.logger != nil {
    				q.manger.logger.Info("Job ready", "job_id", job.GetID())
    			}
    
    			q.eventBus.Publish(JobReady, nil)
    		}
    
    	}
    
    	return nil
    }
    
    // Dequeue removes a job from the queue
    func (q *Queue) Dequeue() (GenericJob, error) {
    	q.mu.Lock()
    	defer q.mu.Unlock()
    
    	if len(q.readyQueue) == 0 {
    		return nil, ErrQueueEmpty
    	}
    
    	job := q.readyQueue[0]
    	q.readyQueue = q.readyQueue[1:]
    
    	// Mark the job as processed but keep it in the jobMap for dependency resolution
    	q.processedJobs[job.GetID()] = struct{}{}
    
    	return job, nil
    }
    
    // removeJobID removes a jobID from a slice of jobIDs
    func removeJobID(deps []JobID, id JobID) []JobID {
    	for i, dep := range deps {
    		if dep == id {
    			deps[i] = deps[len(deps)-1]
    			return deps[:len(deps)-1]
    		}
    	}
    
    	return deps
    }