Skip to content
Snippets Groups Projects
Verified Commit 0d2bc9c9 authored by Volker Schukai's avatar Volker Schukai :alien:
Browse files

feat: worker stats

parent 1a93bb54
No related branches found
No related tags found
No related merge requests found
...@@ -20,6 +20,8 @@ tasks: ...@@ -20,6 +20,8 @@ tasks:
env: env:
TEST_BY_TASK: true TEST_BY_TASK: true
cmds: cmds:
- docker pull atmoz/sftp:alpine
- docker pull axllent/mailpit
- echo "Execute unit tests in Go." - echo "Execute unit tests in Go."
- gosec . - gosec .
- go test -tags=runOnTask -cover -v ./... - go test -tags=runOnTask -cover -v ./...
......
...@@ -336,6 +336,8 @@ tasks: ...@@ -336,6 +336,8 @@ tasks:
env: env:
TEST_BY_TASK: true TEST_BY_TASK: true
cmds: cmds:
- docker pull atmoz/sftp:alpine
- docker pull axllent/mailpit
- echo "Execute unit tests in Go." - echo "Execute unit tests in Go."
- gosec . - gosec .
- go test -tags=runOnTask -cover -v ./... - go test -tags=runOnTask -cover -v ./...
......
...@@ -145,8 +145,8 @@ func TestProcessedJobs(t *testing.T) { ...@@ -145,8 +145,8 @@ func TestProcessedJobs(t *testing.T) {
t.Fatalf("Dequeue failed: %v", err) t.Fatalf("Dequeue failed: %v", err)
} }
for _, jobInfo := range q.processedJobs { for _, jobInfo := range q.readyQueue {
if jobInfo.ID == job1.GetID() { if jobInfo.GetID() == job1.GetID() {
t.Fatalf("Job 1 should not be in processedJobs") t.Fatalf("Job 1 should not be in processedJobs")
} }
} }
......
...@@ -2,6 +2,7 @@ package jobqueue ...@@ -2,6 +2,7 @@ package jobqueue
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"github.com/google/uuid" "github.com/google/uuid"
"sync" "sync"
...@@ -33,6 +34,29 @@ type Worker interface { ...@@ -33,6 +34,29 @@ type Worker interface {
SetManager(manager *Manager) SetManager(manager *Manager)
} }
type Statistic struct {
TotalThreads int
ActiveThreads int
JobsAssigned int
JobsCompleted int
FailedJobs int
TotalExecutionTime time.Duration
}
func (s *Statistic) AverageExecutionTime() time.Duration {
if s.JobsCompleted == 0 {
return 0
}
return s.TotalExecutionTime / time.Duration(s.JobsCompleted)
}
func (s *Statistic) UtilizationRate() float64 {
if s.TotalThreads == 0 {
return 0
}
return float64(s.ActiveThreads) / float64(s.TotalThreads) * 100
}
// GenericWorker is a generic worker // GenericWorker is a generic worker
type GenericWorker struct { type GenericWorker struct {
ID WorkerID ID WorkerID
...@@ -47,8 +71,10 @@ type LocalWorker struct { ...@@ -47,8 +71,10 @@ type LocalWorker struct {
cancelChans []chan bool cancelChans []chan bool
maxJobs int maxJobs int
mu sync.Mutex mu sync.Mutex
statisticMu sync.Mutex
wg sync.WaitGroup wg sync.WaitGroup
manager *Manager manager *Manager
statistic Statistic
} }
// GetID returns the ID of the worker // GetID returns the ID of the worker
...@@ -58,7 +84,7 @@ func (w *GenericWorker) GetID() WorkerID { ...@@ -58,7 +84,7 @@ func (w *GenericWorker) GetID() WorkerID {
// NewLocalWorker creates a new local worker // NewLocalWorker creates a new local worker
func NewLocalWorker(maxJobs int) *LocalWorker { func NewLocalWorker(maxJobs int) *LocalWorker {
w := &LocalWorker{maxJobs: maxJobs} w := &LocalWorker{maxJobs: maxJobs, statistic: Statistic{TotalThreads: maxJobs}}
w.jobChannels = make([]chan GenericJob, maxJobs) w.jobChannels = make([]chan GenericJob, maxJobs)
w.stopChans = make([]chan bool, maxJobs) w.stopChans = make([]chan bool, maxJobs)
w.cancelChans = make([]chan bool, maxJobs) w.cancelChans = make([]chan bool, maxJobs)
...@@ -94,6 +120,27 @@ func (w *LocalWorker) Start() error { ...@@ -94,6 +120,27 @@ func (w *LocalWorker) Start() error {
return nil return nil
} }
// UpdateStatisticExtended updates the worker's statistics with job execution details
func (w *LocalWorker) UpdateStatisticExtended(jobDuration time.Duration, jobFailed bool) {
w.statisticMu.Lock()
defer w.statisticMu.Unlock()
if jobFailed {
w.statistic.FailedJobs++
} else {
w.statistic.TotalExecutionTime += jobDuration
w.statistic.JobsCompleted++
}
}
// GetStatistic returns the current statistics of the worker
func (w *LocalWorker) GetStatistic() Statistic {
w.mu.Lock()
defer w.mu.Unlock()
return w.statistic
}
func (w *LocalWorker) SetManager(manager *Manager) { func (w *LocalWorker) SetManager(manager *Manager) {
w.mu.Lock() w.mu.Lock()
defer w.mu.Unlock() defer w.mu.Unlock()
...@@ -136,10 +183,18 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel ...@@ -136,10 +183,18 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel
select { select {
case job := <-jobChannel: case job := <-jobChannel:
w.statisticMu.Lock()
w.statistic.JobsAssigned++
w.statistic.ActiveThreads++
w.statisticMu.Unlock()
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
retries := job.GetMaxRetries() retries := job.GetMaxRetries()
retryDelay := job.GetRetryDelay() retryDelay := job.GetRetryDelay()
startTime := time.Now()
if retries == 0 { if retries == 0 {
retries = 1 retries = 1
} }
...@@ -159,12 +214,18 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel ...@@ -159,12 +214,18 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel
} }
_, err = job.Execute(ctx) _, err = job.Execute(ctx)
jobFailed := false
if err != nil {
jobFailed = true
}
w.UpdateStatisticExtended(time.Since(startTime), jobFailed)
if cancel != nil { if cancel != nil {
cancel() cancel()
} }
if err == nil || ctx.Err() == context.Canceled { if err == nil || errors.Is(ctx.Err(), context.Canceled) {
break break
} }
...@@ -186,6 +247,10 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel ...@@ -186,6 +247,10 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel
} }
} }
w.statisticMu.Lock()
w.statistic.ActiveThreads--
w.statisticMu.Unlock()
case <-stopChan: case <-stopChan:
stopFlag = true stopFlag = true
break break
......
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
type DummyJob struct { type DummyJob struct {
id JobID id JobID
sleepTime time.Duration
} }
func (j DummyJob) GetID() JobID { func (j DummyJob) GetID() JobID {
...@@ -48,7 +49,9 @@ func (j DummyJob) GetTimeout() time.Duration { ...@@ -48,7 +49,9 @@ func (j DummyJob) GetTimeout() time.Duration {
} }
func (j DummyJob) Execute(_ context.Context) (RunGenericResult, error) { func (j DummyJob) Execute(_ context.Context) (RunGenericResult, error) {
time.Sleep(100 * time.Millisecond) if j.sleepTime > 0 {
time.Sleep(j.sleepTime)
}
return nil, nil return nil, nil
} }
...@@ -231,3 +234,46 @@ func TestCancelJob(t *testing.T) { ...@@ -231,3 +234,46 @@ func TestCancelJob(t *testing.T) {
t.Errorf("Stop() returned error: %v", err) t.Errorf("Stop() returned error: %v", err)
} }
} }
func TestLocalWorker_Statistics(t *testing.T) {
maxJobs := 3
worker := NewLocalWorker(maxJobs)
// Start the worker
if err := worker.Start(); err != nil {
t.Fatalf("failed to start worker: %v", err)
}
// Assign a job with a longer execution time
mockJob1 := DummyJob{id: JobID("1"), sleepTime: 30 * time.Millisecond}
if err := worker.AssignJob(mockJob1); err != nil {
t.Fatalf("failed to assign job: %v", err)
}
// Assign a job with a longer execution time
mockJob2 := DummyJob{id: JobID("2"), sleepTime: 10 * time.Millisecond}
if err := worker.AssignJob(mockJob2); err != nil {
t.Fatalf("failed to assign job: %v", err)
}
// Loop to check statistics every 10ms
timeout := time.After(100 * time.Millisecond)
tick := time.Tick(1 * time.Nanosecond)
for {
select {
case <-timeout:
t.Fatal("Test timed out")
case <-tick:
stats := worker.GetStatistic()
if stats.JobsCompleted >= 2 {
goto TEST_SUCCESS
}
}
}
TEST_SUCCESS:
// Stop the worker
if err := worker.Stop(); err != nil {
t.Fatalf("failed to stop worker: %v", err)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment