diff --git a/Taskfile.yml b/Taskfile.yml index b48a18a1f7c7a047e8f4dde1a7d02a53d8b6fcd4..69daba1165ede72b0fd2768dc8d1225d7bf89248 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -20,6 +20,8 @@ tasks: env: TEST_BY_TASK: true cmds: + - docker pull atmoz/sftp:alpine + - docker pull axllent/mailpit - echo "Execute unit tests in Go." - gosec . - go test -tags=runOnTask -cover -v ./... diff --git a/devenv.nix b/devenv.nix index e55f2ceea75c6f150c6edf3be02261cf1a2657a4..db9d80861c80262030452e4e0424ac2504476aad 100644 --- a/devenv.nix +++ b/devenv.nix @@ -336,6 +336,8 @@ tasks: env: TEST_BY_TASK: true cmds: + - docker pull atmoz/sftp:alpine + - docker pull axllent/mailpit - echo "Execute unit tests in Go." - gosec . - go test -tags=runOnTask -cover -v ./... diff --git a/manager_test.go b/manager_test.go index 55c390ff9add5da9be75a88c84cf34edebbb5113..90cf408db5d33582792807b2a06740d3b245809e 100644 --- a/manager_test.go +++ b/manager_test.go @@ -58,7 +58,7 @@ type MockGenericJob struct { } func (m *MockGenericJob) ResetStats() { - + } func (m *MockGenericJob) GetMaxRetries() uint { diff --git a/queue_test.go b/queue_test.go index c263ac7c34f0422797996f064c82d6059e1594dd..7ce5f40bff2af8ef789a01298ffa268301d1df21 100644 --- a/queue_test.go +++ b/queue_test.go @@ -145,8 +145,8 @@ func TestProcessedJobs(t *testing.T) { t.Fatalf("Dequeue failed: %v", err) } - for _, jobInfo := range q.processedJobs { - if jobInfo.ID == job1.GetID() { + for _, jobInfo := range q.readyQueue { + if jobInfo.GetID() == job1.GetID() { t.Fatalf("Job 1 should not be in processedJobs") } } diff --git a/worker.go b/worker.go index 63253ab163790e562f1eee7e41407ff94ca16e59..3b9e95f93175a9cd5f1c38e3f9ac3cc3de619eea 100644 --- a/worker.go +++ b/worker.go @@ -2,6 +2,7 @@ package jobqueue import ( "context" + "errors" "fmt" "github.com/google/uuid" "sync" @@ -33,6 +34,29 @@ type Worker interface { SetManager(manager *Manager) } +type Statistic struct { + TotalThreads int + ActiveThreads int + JobsAssigned int + JobsCompleted int + FailedJobs int + TotalExecutionTime time.Duration +} + +func (s *Statistic) AverageExecutionTime() time.Duration { + if s.JobsCompleted == 0 { + return 0 + } + return s.TotalExecutionTime / time.Duration(s.JobsCompleted) +} + +func (s *Statistic) UtilizationRate() float64 { + if s.TotalThreads == 0 { + return 0 + } + return float64(s.ActiveThreads) / float64(s.TotalThreads) * 100 +} + // GenericWorker is a generic worker type GenericWorker struct { ID WorkerID @@ -47,8 +71,10 @@ type LocalWorker struct { cancelChans []chan bool maxJobs int mu sync.Mutex + statisticMu sync.Mutex wg sync.WaitGroup manager *Manager + statistic Statistic } // GetID returns the ID of the worker @@ -58,7 +84,7 @@ func (w *GenericWorker) GetID() WorkerID { // NewLocalWorker creates a new local worker func NewLocalWorker(maxJobs int) *LocalWorker { - w := &LocalWorker{maxJobs: maxJobs} + w := &LocalWorker{maxJobs: maxJobs, statistic: Statistic{TotalThreads: maxJobs}} w.jobChannels = make([]chan GenericJob, maxJobs) w.stopChans = make([]chan bool, maxJobs) w.cancelChans = make([]chan bool, maxJobs) @@ -94,6 +120,27 @@ func (w *LocalWorker) Start() error { return nil } +// UpdateStatisticExtended updates the worker's statistics with job execution details +func (w *LocalWorker) UpdateStatisticExtended(jobDuration time.Duration, jobFailed bool) { + w.statisticMu.Lock() + defer w.statisticMu.Unlock() + + if jobFailed { + w.statistic.FailedJobs++ + } else { + w.statistic.TotalExecutionTime += jobDuration + w.statistic.JobsCompleted++ + } + +} + +// GetStatistic returns the current statistics of the worker +func (w *LocalWorker) GetStatistic() Statistic { + w.mu.Lock() + defer w.mu.Unlock() + return w.statistic +} + func (w *LocalWorker) SetManager(manager *Manager) { w.mu.Lock() defer w.mu.Unlock() @@ -136,10 +183,18 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel select { case job := <-jobChannel: + + w.statisticMu.Lock() + w.statistic.JobsAssigned++ + w.statistic.ActiveThreads++ + w.statisticMu.Unlock() + ctx, cancel := context.WithCancel(context.Background()) retries := job.GetMaxRetries() retryDelay := job.GetRetryDelay() + startTime := time.Now() + if retries == 0 { retries = 1 } @@ -159,12 +214,18 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel } _, err = job.Execute(ctx) + jobFailed := false + if err != nil { + jobFailed = true + } + + w.UpdateStatisticExtended(time.Since(startTime), jobFailed) if cancel != nil { cancel() } - if err == nil || ctx.Err() == context.Canceled { + if err == nil || errors.Is(ctx.Err(), context.Canceled) { break } @@ -186,6 +247,10 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel } } + w.statisticMu.Lock() + w.statistic.ActiveThreads-- + w.statisticMu.Unlock() + case <-stopChan: stopFlag = true break diff --git a/worker_test.go b/worker_test.go index 2cc3994f51c06083bfc872af9aea49010331bb72..464bb6cdec690df4d63d1f62d08df17c131777c4 100644 --- a/worker_test.go +++ b/worker_test.go @@ -8,7 +8,8 @@ import ( ) type DummyJob struct { - id JobID + id JobID + sleepTime time.Duration } func (j DummyJob) GetID() JobID { @@ -32,7 +33,7 @@ func (j DummyJob) IsPaused() bool { } func (j DummyJob) ResetStats() { - + } func (j DummyJob) GetMaxRetries() uint { @@ -48,7 +49,9 @@ func (j DummyJob) GetTimeout() time.Duration { } func (j DummyJob) Execute(_ context.Context) (RunGenericResult, error) { - time.Sleep(100 * time.Millisecond) + if j.sleepTime > 0 { + time.Sleep(j.sleepTime) + } return nil, nil } @@ -231,3 +234,46 @@ func TestCancelJob(t *testing.T) { t.Errorf("Stop() returned error: %v", err) } } + +func TestLocalWorker_Statistics(t *testing.T) { + maxJobs := 3 + worker := NewLocalWorker(maxJobs) + + // Start the worker + if err := worker.Start(); err != nil { + t.Fatalf("failed to start worker: %v", err) + } + + // Assign a job with a longer execution time + mockJob1 := DummyJob{id: JobID("1"), sleepTime: 30 * time.Millisecond} + if err := worker.AssignJob(mockJob1); err != nil { + t.Fatalf("failed to assign job: %v", err) + } + // Assign a job with a longer execution time + mockJob2 := DummyJob{id: JobID("2"), sleepTime: 10 * time.Millisecond} + if err := worker.AssignJob(mockJob2); err != nil { + t.Fatalf("failed to assign job: %v", err) + } + + // Loop to check statistics every 10ms + timeout := time.After(100 * time.Millisecond) + tick := time.Tick(1 * time.Nanosecond) + + for { + select { + case <-timeout: + t.Fatal("Test timed out") + case <-tick: + stats := worker.GetStatistic() + if stats.JobsCompleted >= 2 { + goto TEST_SUCCESS + } + } + } + +TEST_SUCCESS: + // Stop the worker + if err := worker.Stop(); err != nil { + t.Fatalf("failed to stop worker: %v", err) + } +}