Select Git revision
runnable-shell.go
worker_test.go 5.78 KiB
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
"context"
assert "github.com/stretchr/testify/require"
"testing"
"time"
)
type DummyJob struct {
id JobID
sleepTime time.Duration
}
func (j DummyJob) GetID() JobID {
return j.id
}
func (j DummyJob) Pause() {
}
func (j DummyJob) PauseUntil(until time.Time) {
}
func (j DummyJob) Resume() {
}
func (j DummyJob) IsPaused() bool {
return false
}
func (j DummyJob) ResetStats() {
}
func (j DummyJob) GetStats() JobStats {
return JobStats{}
}
func (j DummyJob) GetMaxRetries() uint {
return 0
}
func (j DummyJob) GetRetryDelay() time.Duration {
return 0
}
func (j DummyJob) GetTimeout() time.Duration {
return 0
}
func (j DummyJob) Execute(_ context.Context) (RunGenericResult, error) {
if j.sleepTime > 0 {
time.Sleep(j.sleepTime)
}
return nil, nil
}
func (j DummyJob) Cancel() error {
return nil
}
func (j DummyJob) GetDependencies() []JobID {
return []JobID{}
}
func (j DummyJob) GetPersistence() JobPersistence {
return JobPersistence{}
}
func (j DummyJob) GetPriority() Priority {
return PriorityDefault
}
func (j DummyJob) SetScheduler(scheduler Scheduler) {
return
}
func (j DummyJob) GetScheduler() Scheduler {
return nil
}
func TestAssignJob(t *testing.T) {
worker := NewLocalWorker(1)
err := worker.Start()
if err != nil {
t.Errorf("Start() returned error: %v", err)
}
job := DummyJob{id: JobID("1")}
// Test assigning a job
err = worker.AssignJob(job)
if err != nil {
t.Errorf("AssignJob() returned error: %v", err)
}
// Test maxJobs limit
job2 := DummyJob{id: JobID("2")}
err = worker.AssignJob(job2)
if err != ErrMaxJobsReached {
t.Errorf("AssignJob() should return ErrMaxJobsReached, got: %v", err)
}
err = worker.Stop()
if err != nil {
t.Errorf("Stop() returned error: %v", err)
}
}
func TestWorkerLifeCycle(t *testing.T) {
worker := NewLocalWorker(1)
// Test initial status
if worker.Status() != WorkerStatusStopped {
t.Errorf("Initial worker status should be WorkerStatusStopped")
}
// Test start
_ = worker.Start()
if worker.Status() != WorkerStatusRunning {
t.Errorf("Worker status should be WorkerStatusRunning after Start()")
}
// Test job assignment
job := DummyJob{id: JobID("1")}
err := worker.AssignJob(job)
if err != nil {
t.Errorf("AssignJob() returned error: %v", err)
}
// Test job cancellation
//worker.CancelJob(JobID("1"))
// Test stop
_ = worker.Stop()
if worker.Status() != WorkerStatusStopped {
t.Errorf("Worker status should be WorkerStatusStopped after Stop()")
}
}
func TestWorkerLifeCycle2(t *testing.T) {
worker := NewLocalWorker(2)
if worker.Status() != WorkerStatusStopped {
t.Errorf("Newly created worker should be in Stopped state")
}
// Start the worker
err := worker.Start()
assert.NoError(t, err)
if worker.Status() != WorkerStatusRunning {
t.Errorf("Worker should be in Running state after Start()")
}
// Assign jobs
job1 := DummyJob{id: "job1"}
err = worker.AssignJob(job1)
if err != nil {
t.Errorf("Failed to assign job1: %v", err)
}
job2 := DummyJob{id: "job2"}
err = worker.AssignJob(job2)
if err != nil {
t.Errorf("Failed to assign job2: %v", err)
}
// Check maxJobs limit
job3 := DummyJob{id: "job3"}
err = worker.AssignJob(job3)
if err != ErrMaxJobsReached {
t.Errorf("Expected ErrMaxJobsReached, got: %v", err)
}
// Stop the worker
err = worker.Stop()
assert.NoError(t, err)
if worker.Status() != WorkerStatusStopped {
t.Errorf("Worker should be in Stopped state after Stop()")
}
// Make sure we can't assign jobs when worker is stopped
err = worker.AssignJob(job1)
if err != ErrWorkerNotRunning {
t.Errorf("Expected ErrWorkerNotRunning, got: %v", err)
}
// Check if jobs are cancellable
err = worker.Start()
assert.NoError(t, err)
err = worker.AssignJob(job1)
if err != nil {
t.Errorf("Failed to assign job1: %v", err)
}
//worker.CancelJob("job1")
// Check if Stop() actually stops the jobs
err = worker.AssignJob(DummyJob{id: "longJob"})
assert.NoError(t, err)
err = worker.Stop()
assert.NoError(t, err)
if worker.Status() != WorkerStatusStopped {
t.Errorf("Worker should be in Stopped state after Stop()")
}
// Wait for some time to make sure jobs are actually stopped
time.Sleep(1 * time.Second)
if worker.Status() != WorkerStatusStopped {
t.Errorf("Worker should remain in Stopped state")
}
}
func TestCancelJob(t *testing.T) {
worker := NewLocalWorker(1)
err := worker.Start()
if err != nil {
t.Errorf("Start() returned error: %v", err)
}
job := DummyJob{id: JobID("1")}
// Zuweisung eines Jobs
err = worker.AssignJob(job)
if err != nil {
t.Errorf("AssignJob() returned error: %v", err)
}
err = worker.Stop()
if err != nil {
t.Errorf("Stop() returned error: %v", err)
}
}
func TestLocalWorker_Statistics(t *testing.T) {
maxJobs := 3
worker := NewLocalWorker(maxJobs)
// Start the worker
if err := worker.Start(); err != nil {
t.Fatalf("failed to start worker: %v", err)
}
// Assign a job with a longer execution time
mockJob1 := DummyJob{id: JobID("1"), sleepTime: 30 * time.Millisecond}
if err := worker.AssignJob(mockJob1); err != nil {
t.Fatalf("failed to assign job: %v", err)
}
// Assign a job with a longer execution time
mockJob2 := DummyJob{id: JobID("2"), sleepTime: 10 * time.Millisecond}
if err := worker.AssignJob(mockJob2); err != nil {
t.Fatalf("failed to assign job: %v", err)
}
// Loop to check statistics every 10ms
timeout := time.After(100 * time.Millisecond)
tick := time.Tick(1 * time.Nanosecond)
for {
select {
case <-timeout:
t.Fatal("Test timed out")
case <-tick:
stats := worker.GetStatistic()
if stats.JobsCompleted >= 2 {
goto TEST_SUCCESS
}
}
}
TEST_SUCCESS:
// Stop the worker
if err := worker.Stop(); err != nil {
t.Fatalf("failed to stop worker: %v", err)
}
}