package jobqueue import ( "sync" ) // Queue is a job queue type Queue struct { jobMap map[JobID]GenericJob pendingDependencies map[JobID][]JobID readyQueue []GenericJob processedJobs map[JobID]struct{} eventBus *EventBus mu sync.Mutex manger *Manager } // NewQueue initializes a new Queue func NewQueue(EventBus *EventBus) *Queue { return &Queue{ jobMap: make(map[JobID]GenericJob), pendingDependencies: make(map[JobID][]JobID), readyQueue: []GenericJob{}, processedJobs: make(map[JobID]struct{}), eventBus: EventBus, } } // SetManager sets the manager for the queue // The manager is mainly used for logging func (q *Queue) SetManager(m *Manager) { q.manger = m } // Enqueue adds a job to the queue func (q *Queue) Enqueue(job GenericJob) error { q.mu.Lock() defer q.mu.Unlock() if _, exists := q.jobMap[job.GetID()]; !exists { q.jobMap[job.GetID()] = job } for _, readyJob := range q.readyQueue { if readyJob.GetID() == job.GetID() { return ErrJobAlreadyExists } } // Check if this job is a dependency for any pending jobs for pendingJobID, pendingDeps := range q.pendingDependencies { q.pendingDependencies[pendingJobID] = removeJobID(pendingDeps, job.GetID()) if len(q.pendingDependencies[pendingJobID]) == 0 { q.readyQueue = append(q.readyQueue, q.jobMap[pendingJobID]) delete(q.pendingDependencies, pendingJobID) } } // Check this job's dependencies var unmetDependencies []JobID for _, depID := range job.GetDependencies() { if _, ok := q.jobMap[depID]; !ok { unmetDependencies = append(unmetDependencies, depID) } } if len(unmetDependencies) > 0 { q.pendingDependencies[job.GetID()] = unmetDependencies } else { q.readyQueue = append(q.readyQueue, job) // Run topological sort on jobs in the ready queue readyJobList := []GenericJob{} for _, readyJob := range q.readyQueue { readyJobList = append(readyJobList, readyJob) } currentReadyJobIDs := make(map[JobID]struct{}) for _, job := range readyJobList { currentReadyJobIDs[job.GetID()] = struct{}{} } fullJobList := []GenericJob{} for _, job := range readyJobList { fullJobList = append(fullJobList, job) } for id := range q.processedJobs { fullJobList = append(fullJobList, q.jobMap[id]) } sortedIDs, err := topologicalSortJobs(fullJobList) if err != nil { return err } newReadyQueue := make([]GenericJob, 0) for _, id := range sortedIDs { if _, exists := currentReadyJobIDs[id]; exists { newReadyQueue = append(newReadyQueue, q.jobMap[id]) } } q.readyQueue = newReadyQueue if q.eventBus != nil && len(q.readyQueue) > 0 { if q.manger != nil && q.manger.logger != nil { q.manger.logger.Info("Job ready", "job_id", job.GetID()) } q.eventBus.Publish(JobReady, nil) } } return nil } // Dequeue removes a job from the queue func (q *Queue) Dequeue() (GenericJob, error) { q.mu.Lock() defer q.mu.Unlock() if len(q.readyQueue) == 0 { return nil, ErrQueueEmpty } job := q.readyQueue[0] q.readyQueue = q.readyQueue[1:] // Mark the job as processed but keep it in the jobMap for dependency resolution q.processedJobs[job.GetID()] = struct{}{} return job, nil } // removeJobID removes a jobID from a slice of jobIDs func removeJobID(deps []JobID, id JobID) []JobID { for i, dep := range deps { if dep == id { deps[i] = deps[len(deps)-1] return deps[:len(deps)-1] } } return deps }