Skip to content
Snippets Groups Projects
Verified Commit 6cd7bef3 authored by Volker Schukai's avatar Volker Schukai :alien:
Browse files

fix: Sorting and dependency does not work #2

parent b8bcd593
Branches
Tags v1.2.0
No related merge requests found
......@@ -9,12 +9,9 @@ type EventName string
const (
JobAdded EventName = "JobAdded"
JobRemoved EventName = "JobRemoved"
ExecuteJob EventName = "ExecuteJob"
JobReady EventName = "JobReady"
QueueJob EventName = "QueueJob"
JobFinished EventName = "JobFinished"
// add more events as needed
)
type Event struct {
......@@ -66,11 +63,13 @@ func (eb *EventBus) Publish(name EventName, data interface{}) {
defer eb.mu.RUnlock()
if channels, found := eb.subscribers[name]; found {
for _, ch := range channels {
go func(ch chan interface{}) {
ch <- Event{
Name: name,
Data: data,
MessageID: uuid.New().String(),
}
}(ch)
}
}
}
......@@ -86,6 +86,7 @@ func TestRoundTrip(t *testing.T) {
cronInstance.Start()
zapLogger, _ := zap.NewDevelopment()
_ = zapLogger
manager := NewManager()
manager.SetLogger(&ZapAdapter{logger: zapLogger})
......
package jobqueue
import (
"fmt"
"sync"
)
// Queue is a job queue
type Queue struct {
jobMap map[JobID]GenericJob
pendingDependencies map[JobID][]JobID
......@@ -15,6 +15,7 @@ type Queue struct {
manger *Manager
}
// NewQueue initializes a new Queue
func NewQueue(EventBus *EventBus) *Queue {
return &Queue{
jobMap: make(map[JobID]GenericJob),
......@@ -25,10 +26,13 @@ func NewQueue(EventBus *EventBus) *Queue {
}
}
// SetManager sets the manager for the queue
// The manager is mainly used for logging
func (q *Queue) SetManager(m *Manager) {
q.manger = m
}
// Enqueue adds a job to the queue
func (q *Queue) Enqueue(job GenericJob) error {
q.mu.Lock()
defer q.mu.Unlock()
......@@ -71,20 +75,33 @@ func (q *Queue) Enqueue(job GenericJob) error {
readyJobList = append(readyJobList, readyJob)
}
currentReadyJobIDs := make(map[JobID]struct{})
for _, job := range readyJobList {
currentReadyJobIDs[job.GetID()] = struct{}{}
}
fullJobList := []GenericJob{}
for _, job := range readyJobList {
fullJobList = append(fullJobList, job)
}
for id := range q.processedJobs {
readyJobList = append(readyJobList, q.jobMap[id])
fullJobList = append(fullJobList, q.jobMap[id])
}
sortedIDs, err := topologicalSortJobs(readyJobList)
sortedIDs, err := topologicalSortJobs(fullJobList)
if err != nil {
return err
}
// Reorder q.readyQueue based on sorted job IDs
newReadyQueue := make([]GenericJob, len(sortedIDs))
for i, id := range sortedIDs {
newReadyQueue[i] = q.jobMap[id]
newReadyQueue := make([]GenericJob, 0)
for _, id := range sortedIDs {
if _, exists := currentReadyJobIDs[id]; exists {
newReadyQueue = append(newReadyQueue, q.jobMap[id])
}
}
q.readyQueue = newReadyQueue
if q.eventBus != nil && len(q.readyQueue) > 0 {
......@@ -92,7 +109,7 @@ func (q *Queue) Enqueue(job GenericJob) error {
q.manger.logger.Info("Job ready", "job_id", job.GetID())
}
q.eventBus.Publish(JobReady, job.GetID())
q.eventBus.Publish(JobReady, nil)
}
}
......@@ -100,6 +117,7 @@ func (q *Queue) Enqueue(job GenericJob) error {
return nil
}
// Dequeue removes a job from the queue
func (q *Queue) Dequeue() (GenericJob, error) {
q.mu.Lock()
defer q.mu.Unlock()
......@@ -114,11 +132,10 @@ func (q *Queue) Dequeue() (GenericJob, error) {
// Mark the job as processed but keep it in the jobMap for dependency resolution
q.processedJobs[job.GetID()] = struct{}{}
fmt.Printf("Dequeue: jobs in ready queue: %v, jobs in processed: %v, current job: %v\n", q.readyQueue, q.processedJobs, job.GetID())
return job, nil
}
// removeJobID removes a jobID from a slice of jobIDs
func removeJobID(deps []JobID, id JobID) []JobID {
for i, dep := range deps {
if dep == id {
......
package jobqueue
import (
"errors"
"fmt"
"math/rand"
"testing"
"time"
)
func TestEnqueueJobAlreadyExists(t *testing.T) {
......@@ -217,4 +221,174 @@ func TestJobWithSelfAsDependency(t *testing.T) {
}
}
// Continue with other test cases...
func TestEnqueueDequeue(t *testing.T) {
q := NewQueue(nil)
job1 := NewJob[DummyResult]("job1", nil)
job2 := NewJob[DummyResult]("job2", nil)
job3 := NewJob[DummyResult]("job3", nil)
job2.dependencies = []JobID{"job1"}
job3.dependencies = []JobID{"job2"}
// Enqueue a job with dependencies that haven't been enqueued yet
if err := q.Enqueue(job3); err != nil {
t.Fatalf("Failed to enqueue job3: %v", err)
}
// Try to dequeue from an empty readyQueue
if _, err := q.Dequeue(); !errors.Is(err, ErrQueueEmpty) {
t.Fatalf("Expected ErrQueueEmpty, got: %v", err)
}
// Enqueue a job with no dependencies
if err := q.Enqueue(job1); err != nil {
t.Fatalf("Failed to enqueue job1: %v", err)
}
// Enqueue a job that has its dependencies met
if err := q.Enqueue(job2); err != nil {
t.Fatalf("Failed to enqueue job2: %v", err)
}
// Try to dequeue jobs in the expected order
if job, err := q.Dequeue(); err != nil || job.GetID() != "job1" {
t.Fatalf("Expected job1, got: %v, %v", job, err)
}
if job, err := q.Dequeue(); err != nil || job.GetID() != "job2" {
t.Fatalf("Expected job2, got: %v, %v", job, err)
}
if job, err := q.Dequeue(); err != nil || job.GetID() != "job3" {
t.Fatalf("Expected job3, got: %v, %v", job, err)
}
}
func TestDuplicateEnqueue(t *testing.T) {
q := NewQueue(nil)
job1 := NewJob[DummyResult]("job1", nil)
// Enqueue the same job twice
if err := q.Enqueue(job1); err != nil {
t.Fatalf("Failed to enqueue job1: %v", err)
}
if err := q.Enqueue(job1); !errors.Is(err, ErrJobAlreadyExists) {
t.Fatalf("Expected ErrJobAlreadyExists, got: %v", err)
}
}
func TestCircularDependency(t *testing.T) {
q := NewQueue(nil)
job1 := NewJob[DummyResult]("job1", nil)
job2 := NewJob[DummyResult]("job2", nil)
job1.dependencies = []JobID{"job2"}
job2.dependencies = []JobID{"job1"}
if err := q.Enqueue(job1); err != nil {
t.Fatalf("Failed to enqueue job1: %v", err)
}
// This enqueue should fail if you have logic to detect circular dependencies
if err := q.Enqueue(job2); err == nil {
t.Fatalf("Expected an error due to circular dependency, got: %v", err)
}
}
// TestFuzzyEnqueueDequeue tests the queue by generating random jobs and
// you can run it with:
// go test -v -fuzz TestFuzzyEnqueueDequeue
// or with a fuzztime of 10 seconds:
// go test -v -run -fuzz TestFuzzyEnqueueDequeue -fuzztime=10s
func TestFuzzyEnqueueDequeue(t *testing.T) {
rand.Seed(time.Now().UnixNano())
q := NewQueue(nil)
// Generate random jobs
jobCount := 1000
jobs := make([]GenericJob, jobCount)
for i := 0; i < jobCount; i++ {
jobID := JobID(fmt.Sprintf("job%d", i))
jobs[i] = NewJob[DummyResult](jobID, nil)
}
// Randomly set dependencies for each job
for _, job := range jobs {
depCount := rand.Intn(5) // up to 5 dependencies
deps := make([]JobID, depCount)
for i := 0; i < depCount; i++ {
depIndex := rand.Intn(jobCount)
deps[i] = jobs[depIndex].GetID()
}
job.(*Job[DummyResult]).dependencies = deps
}
// Randomly enqueue jobs
for i := 0; i < jobCount; i++ {
index := rand.Intn(len(jobs))
job := jobs[index]
q.Enqueue(job)
jobs = append(jobs[:index], jobs[index+1:]...)
}
// Randomly dequeue jobs, ignoring errors for this fuzzy test
for i := 0; i < jobCount; i++ {
_, _ = q.Dequeue()
}
// Verify that the queue is empty
if _, err := q.Dequeue(); !errors.Is(err, ErrQueueEmpty) {
t.Fatalf("Expected ErrQueueEmpty, got: %v", err)
}
}
func TestEnqueueDequeueOrder(t *testing.T) {
eventBus := NewEventBus() // Ihre EventBus-Initialisierung
q := NewQueue(eventBus)
job1 := NewJob[DummyResult]("job1", nil)
job2 := NewJob[DummyResult]("job2", nil)
job3 := NewJob[DummyResult]("job3", nil)
job2.AddDependency(JobID("job1"))
job3.AddDependency(JobID("job2"))
if err := q.Enqueue(job1); err != nil {
t.Fatalf("Failed to enqueue job1: %s", err)
}
if err := q.Enqueue(job3); err != nil {
t.Fatalf("Failed to enqueue job3: %s", err)
}
if err := q.Enqueue(job2); err != nil {
t.Fatalf("Failed to enqueue job2: %s", err)
}
dequeuedJob, err := q.Dequeue()
if err != nil {
t.Fatalf("Failed to dequeue: %s", err)
}
if dequeuedJob.GetID() != "job1" {
t.Errorf("Expected job1, got %s", dequeuedJob.GetID())
}
dequeuedJob, err = q.Dequeue()
if err != nil {
t.Fatalf("Failed to dequeue: %s", err)
}
if dequeuedJob.GetID() != "job2" {
t.Errorf("Expected job2, got %s", dequeuedJob.GetID())
}
dequeuedJob, err = q.Dequeue()
if err != nil {
t.Fatalf("Failed to dequeue: %s", err)
}
if dequeuedJob.GetID() != "job3" {
t.Errorf("Expected job3, got %s", dequeuedJob.GetID())
}
}
......@@ -86,7 +86,7 @@ func (w *LocalWorker) Start() error {
w.wg.Wait()
w.status = WorkerStatusRunning
if w.manager != nil {
if w.manager != nil && w.manager.logger != nil {
w.manager.logger.Info("Worker started", "worker", w.ID)
}
......@@ -168,16 +168,8 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel
retries--
}
//select {
//case <-cancelChan:
// cancel()
//case <-ctx.Done():
// cancel()
//default:
cancel()
//}
case <-stopChan:
stopFlag = true
break
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment