diff --git a/event-bus.go b/event-bus.go index 6f957d6c7c7c424168686b859d636a8822bacf4e..b62c7badd18f31c240842d9d6b9bc3c5a6adc182 100644 --- a/event-bus.go +++ b/event-bus.go @@ -9,12 +9,9 @@ type EventName string const ( JobAdded EventName = "JobAdded" - JobRemoved EventName = "JobRemoved" - ExecuteJob EventName = "ExecuteJob" JobReady EventName = "JobReady" QueueJob EventName = "QueueJob" JobFinished EventName = "JobFinished" - // add more events as needed ) type Event struct { @@ -66,11 +63,13 @@ func (eb *EventBus) Publish(name EventName, data interface{}) { defer eb.mu.RUnlock() if channels, found := eb.subscribers[name]; found { for _, ch := range channels { - ch <- Event{ - Name: name, - Data: data, - MessageID: uuid.New().String(), - } + go func(ch chan interface{}) { + ch <- Event{ + Name: name, + Data: data, + MessageID: uuid.New().String(), + } + }(ch) } } } diff --git a/issue-1_test.go b/issue-1_test.go index bd90bb876af9f82c2a8dfbaf669f1af62001879c..e0153b2274cce411346ffe29d95c9730a5cc89ff 100644 --- a/issue-1_test.go +++ b/issue-1_test.go @@ -86,6 +86,7 @@ func TestRoundTrip(t *testing.T) { cronInstance.Start() zapLogger, _ := zap.NewDevelopment() + _ = zapLogger manager := NewManager() manager.SetLogger(&ZapAdapter{logger: zapLogger}) diff --git a/queue.go b/queue.go index c850f61917090b72476ac122439a7370026e8a26..49f9ba3160d0cc0344eccd4183e88fb976cf6fcf 100644 --- a/queue.go +++ b/queue.go @@ -1,10 +1,10 @@ package jobqueue import ( - "fmt" "sync" ) +// Queue is a job queue type Queue struct { jobMap map[JobID]GenericJob pendingDependencies map[JobID][]JobID @@ -15,6 +15,7 @@ type Queue struct { manger *Manager } +// NewQueue initializes a new Queue func NewQueue(EventBus *EventBus) *Queue { return &Queue{ jobMap: make(map[JobID]GenericJob), @@ -25,10 +26,13 @@ func NewQueue(EventBus *EventBus) *Queue { } } +// SetManager sets the manager for the queue +// The manager is mainly used for logging func (q *Queue) SetManager(m *Manager) { q.manger = m } +// Enqueue adds a job to the queue func (q *Queue) Enqueue(job GenericJob) error { q.mu.Lock() defer q.mu.Unlock() @@ -71,20 +75,33 @@ func (q *Queue) Enqueue(job GenericJob) error { readyJobList = append(readyJobList, readyJob) } + currentReadyJobIDs := make(map[JobID]struct{}) + for _, job := range readyJobList { + currentReadyJobIDs[job.GetID()] = struct{}{} + } + + fullJobList := []GenericJob{} + for _, job := range readyJobList { + fullJobList = append(fullJobList, job) + } + for id := range q.processedJobs { - readyJobList = append(readyJobList, q.jobMap[id]) + fullJobList = append(fullJobList, q.jobMap[id]) } - sortedIDs, err := topologicalSortJobs(readyJobList) + sortedIDs, err := topologicalSortJobs(fullJobList) + if err != nil { return err } - // Reorder q.readyQueue based on sorted job IDs - newReadyQueue := make([]GenericJob, len(sortedIDs)) - for i, id := range sortedIDs { - newReadyQueue[i] = q.jobMap[id] + newReadyQueue := make([]GenericJob, 0) + for _, id := range sortedIDs { + if _, exists := currentReadyJobIDs[id]; exists { + newReadyQueue = append(newReadyQueue, q.jobMap[id]) + } } + q.readyQueue = newReadyQueue if q.eventBus != nil && len(q.readyQueue) > 0 { @@ -92,7 +109,7 @@ func (q *Queue) Enqueue(job GenericJob) error { q.manger.logger.Info("Job ready", "job_id", job.GetID()) } - q.eventBus.Publish(JobReady, job.GetID()) + q.eventBus.Publish(JobReady, nil) } } @@ -100,6 +117,7 @@ func (q *Queue) Enqueue(job GenericJob) error { return nil } +// Dequeue removes a job from the queue func (q *Queue) Dequeue() (GenericJob, error) { q.mu.Lock() defer q.mu.Unlock() @@ -114,11 +132,10 @@ func (q *Queue) Dequeue() (GenericJob, error) { // Mark the job as processed but keep it in the jobMap for dependency resolution q.processedJobs[job.GetID()] = struct{}{} - fmt.Printf("Dequeue: jobs in ready queue: %v, jobs in processed: %v, current job: %v\n", q.readyQueue, q.processedJobs, job.GetID()) - return job, nil } +// removeJobID removes a jobID from a slice of jobIDs func removeJobID(deps []JobID, id JobID) []JobID { for i, dep := range deps { if dep == id { diff --git a/queue_test.go b/queue_test.go index 301de56733f86e309ffde2760d8c54393e5b7935..1040d95ac57901611bcb27595417b2701cb250fc 100644 --- a/queue_test.go +++ b/queue_test.go @@ -1,7 +1,11 @@ package jobqueue import ( + "errors" + "fmt" + "math/rand" "testing" + "time" ) func TestEnqueueJobAlreadyExists(t *testing.T) { @@ -217,4 +221,174 @@ func TestJobWithSelfAsDependency(t *testing.T) { } } -// Continue with other test cases... +func TestEnqueueDequeue(t *testing.T) { + q := NewQueue(nil) + job1 := NewJob[DummyResult]("job1", nil) + job2 := NewJob[DummyResult]("job2", nil) + job3 := NewJob[DummyResult]("job3", nil) + + job2.dependencies = []JobID{"job1"} + job3.dependencies = []JobID{"job2"} + + // Enqueue a job with dependencies that haven't been enqueued yet + if err := q.Enqueue(job3); err != nil { + t.Fatalf("Failed to enqueue job3: %v", err) + } + + // Try to dequeue from an empty readyQueue + if _, err := q.Dequeue(); !errors.Is(err, ErrQueueEmpty) { + t.Fatalf("Expected ErrQueueEmpty, got: %v", err) + } + + // Enqueue a job with no dependencies + if err := q.Enqueue(job1); err != nil { + t.Fatalf("Failed to enqueue job1: %v", err) + } + + // Enqueue a job that has its dependencies met + if err := q.Enqueue(job2); err != nil { + t.Fatalf("Failed to enqueue job2: %v", err) + } + + // Try to dequeue jobs in the expected order + if job, err := q.Dequeue(); err != nil || job.GetID() != "job1" { + t.Fatalf("Expected job1, got: %v, %v", job, err) + } + + if job, err := q.Dequeue(); err != nil || job.GetID() != "job2" { + t.Fatalf("Expected job2, got: %v, %v", job, err) + } + + if job, err := q.Dequeue(); err != nil || job.GetID() != "job3" { + t.Fatalf("Expected job3, got: %v, %v", job, err) + } +} + +func TestDuplicateEnqueue(t *testing.T) { + q := NewQueue(nil) + job1 := NewJob[DummyResult]("job1", nil) + + // Enqueue the same job twice + if err := q.Enqueue(job1); err != nil { + t.Fatalf("Failed to enqueue job1: %v", err) + } + if err := q.Enqueue(job1); !errors.Is(err, ErrJobAlreadyExists) { + t.Fatalf("Expected ErrJobAlreadyExists, got: %v", err) + } +} + +func TestCircularDependency(t *testing.T) { + q := NewQueue(nil) + + job1 := NewJob[DummyResult]("job1", nil) + job2 := NewJob[DummyResult]("job2", nil) + + job1.dependencies = []JobID{"job2"} + job2.dependencies = []JobID{"job1"} + + if err := q.Enqueue(job1); err != nil { + t.Fatalf("Failed to enqueue job1: %v", err) + } + // This enqueue should fail if you have logic to detect circular dependencies + if err := q.Enqueue(job2); err == nil { + t.Fatalf("Expected an error due to circular dependency, got: %v", err) + } +} + +// TestFuzzyEnqueueDequeue tests the queue by generating random jobs and +// you can run it with: +// go test -v -fuzz TestFuzzyEnqueueDequeue +// or with a fuzztime of 10 seconds: +// go test -v -run -fuzz TestFuzzyEnqueueDequeue -fuzztime=10s +func TestFuzzyEnqueueDequeue(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + q := NewQueue(nil) + + // Generate random jobs + jobCount := 1000 + jobs := make([]GenericJob, jobCount) + for i := 0; i < jobCount; i++ { + jobID := JobID(fmt.Sprintf("job%d", i)) + jobs[i] = NewJob[DummyResult](jobID, nil) + } + + // Randomly set dependencies for each job + for _, job := range jobs { + depCount := rand.Intn(5) // up to 5 dependencies + deps := make([]JobID, depCount) + for i := 0; i < depCount; i++ { + depIndex := rand.Intn(jobCount) + deps[i] = jobs[depIndex].GetID() + } + job.(*Job[DummyResult]).dependencies = deps + } + + // Randomly enqueue jobs + for i := 0; i < jobCount; i++ { + index := rand.Intn(len(jobs)) + job := jobs[index] + q.Enqueue(job) + jobs = append(jobs[:index], jobs[index+1:]...) + } + + // Randomly dequeue jobs, ignoring errors for this fuzzy test + for i := 0; i < jobCount; i++ { + _, _ = q.Dequeue() + } + + // Verify that the queue is empty + if _, err := q.Dequeue(); !errors.Is(err, ErrQueueEmpty) { + t.Fatalf("Expected ErrQueueEmpty, got: %v", err) + } +} + +func TestEnqueueDequeueOrder(t *testing.T) { + eventBus := NewEventBus() // Ihre EventBus-Initialisierung + q := NewQueue(eventBus) + + job1 := NewJob[DummyResult]("job1", nil) + job2 := NewJob[DummyResult]("job2", nil) + job3 := NewJob[DummyResult]("job3", nil) + + job2.AddDependency(JobID("job1")) + job3.AddDependency(JobID("job2")) + + if err := q.Enqueue(job1); err != nil { + t.Fatalf("Failed to enqueue job1: %s", err) + } + + if err := q.Enqueue(job3); err != nil { + t.Fatalf("Failed to enqueue job3: %s", err) + } + + if err := q.Enqueue(job2); err != nil { + t.Fatalf("Failed to enqueue job2: %s", err) + } + + dequeuedJob, err := q.Dequeue() + if err != nil { + t.Fatalf("Failed to dequeue: %s", err) + } + + if dequeuedJob.GetID() != "job1" { + t.Errorf("Expected job1, got %s", dequeuedJob.GetID()) + } + + dequeuedJob, err = q.Dequeue() + if err != nil { + t.Fatalf("Failed to dequeue: %s", err) + } + + if dequeuedJob.GetID() != "job2" { + t.Errorf("Expected job2, got %s", dequeuedJob.GetID()) + } + + dequeuedJob, err = q.Dequeue() + if err != nil { + t.Fatalf("Failed to dequeue: %s", err) + } + + if dequeuedJob.GetID() != "job3" { + t.Errorf("Expected job3, got %s", dequeuedJob.GetID()) + } +} diff --git a/worker.go b/worker.go index f09dd8c1068dc2d532d1a9c23fd41bf176069022..d6a8254262392ebba50ad15c51ee6169d70827c6 100644 --- a/worker.go +++ b/worker.go @@ -86,7 +86,7 @@ func (w *LocalWorker) Start() error { w.wg.Wait() w.status = WorkerStatusRunning - if w.manager != nil { + if w.manager != nil && w.manager.logger != nil { w.manager.logger.Info("Worker started", "worker", w.ID) } @@ -168,16 +168,8 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel retries-- } - //select { - //case <-cancelChan: - // cancel() - - //case <-ctx.Done(): - // cancel() - - //default: cancel() - //} + case <-stopChan: stopFlag = true break