// Copyright 2023 schukai GmbH // SPDX-License-Identifier: AGPL-3.0 package jobqueue import ( "sync" "time" ) // MaxAge is the maximum age of a job in the processed jobs map const MaxAge = 24 * time.Hour // MaxProcessedJobs is the maximum number of jobs in the processed jobs map const MaxProcessedJobs = 50000 type ProcessedJobInfo struct { ProcessedTime time.Time ID JobID } // Queue is a job queue type Queue struct { jobMap map[JobID]GenericJob pendingDependencies map[JobID][]JobID readyQueue []GenericJob processedJobs []ProcessedJobInfo eventBus *EventBus mu sync.Mutex manger *Manager } // NewQueue initializes a new Queue func NewQueue(EventBus *EventBus) *Queue { return &Queue{ jobMap: make(map[JobID]GenericJob), pendingDependencies: make(map[JobID][]JobID), readyQueue: []GenericJob{}, processedJobs: []ProcessedJobInfo{}, eventBus: EventBus, } } // SetManager sets the manager for the queue // The manager is mainly used for logging func (q *Queue) SetManager(m *Manager) { q.manger = m } // Enqueue adds a job to the queue func (q *Queue) Enqueue(job GenericJob) error { q.mu.Lock() defer q.mu.Unlock() if _, exists := q.jobMap[job.GetID()]; !exists { q.jobMap[job.GetID()] = job } for _, readyJob := range q.readyQueue { if readyJob.GetID() == job.GetID() { return ErrJobAlreadyExists } } // Check if this job is a dependency for any pending jobs for pendingJobID, pendingDeps := range q.pendingDependencies { q.pendingDependencies[pendingJobID] = removeJobID(pendingDeps, job.GetID()) if len(q.pendingDependencies[pendingJobID]) == 0 { q.readyQueue = append(q.readyQueue, q.jobMap[pendingJobID]) delete(q.pendingDependencies, pendingJobID) } } // Check this job's dependencies var unmetDependencies []JobID for _, depID := range job.GetDependencies() { if _, ok := q.jobMap[depID]; !ok { unmetDependencies = append(unmetDependencies, depID) } } if len(unmetDependencies) > 0 { q.pendingDependencies[job.GetID()] = unmetDependencies } else { q.readyQueue = append(q.readyQueue, job) // Run topological sort on jobs in the ready queue readyJobList := []GenericJob{} readyJobList = append(readyJobList, q.readyQueue...) currentReadyJobIDs := make(map[JobID]struct{}) for _, job := range readyJobList { currentReadyJobIDs[job.GetID()] = struct{}{} } var fullJobList []GenericJob fullJobList = append(fullJobList, readyJobList...) for i := range q.processedJobs { id := q.processedJobs[i].ID fullJobList = append(fullJobList, q.jobMap[id]) } sortedIDs, err := topologicalSortJobs(fullJobList) if err != nil { return err } newReadyQueue := make([]GenericJob, 0) for _, id := range sortedIDs { if _, exists := currentReadyJobIDs[id]; exists { newReadyQueue = append(newReadyQueue, q.jobMap[id]) } } q.readyQueue = newReadyQueue if q.eventBus != nil && len(q.readyQueue) > 0 { Info("Job ready", "job_id", job.GetID()) q.eventBus.Publish(JobReady, nil) } } return nil } // ClearProcessedJobs removes old jobs from the processed jobs map func (q *Queue) ClearProcessedJobs() { q.mu.Lock() defer q.mu.Unlock() // remove old jobs currentTime := time.Now() cutoffIndex := 0 for i, jobInfo := range q.processedJobs { if currentTime.Sub(jobInfo.ProcessedTime) <= MaxAge { cutoffIndex = i break } } q.processedJobs = q.processedJobs[cutoffIndex:] // if the processed jobs map is too large, remove the oldest jobs if len(q.processedJobs) > MaxProcessedJobs { startIdx := len(q.processedJobs) - MaxProcessedJobs q.processedJobs = q.processedJobs[startIdx:] } } func (q *Queue) IsDependency(id JobID) bool { for _, deps := range q.pendingDependencies { for _, depID := range deps { if depID == id { return true } } } return false } // Dequeue removes a job from the queue func (q *Queue) Dequeue() (GenericJob, error) { q.mu.Lock() defer q.mu.Unlock() if len(q.readyQueue) == 0 { return nil, ErrQueueEmpty } job := q.readyQueue[0] q.readyQueue = q.readyQueue[1:] // Mark the job as processed but keep it in the jobMap for dependency resolution q.processedJobs = append(q.processedJobs, ProcessedJobInfo{ ProcessedTime: time.Now(), ID: job.GetID(), }) return job, nil } // removeJobID removes a jobID from a slice of jobIDs func removeJobID(deps []JobID, id JobID) []JobID { for i, dep := range deps { if dep == id { deps[i] = deps[len(deps)-1] return deps[:len(deps)-1] } } return deps }