Something went wrong on our end
-
Volker Schukai authored
manager_test.go 5.14 KiB
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
"context"
"github.com/stretchr/testify/assert"
"testing"
"time"
)
// MockWorker is a minimal worker stand-in for the manager tests. It only
// stores an ID and a status and never runs any job.
type MockWorker struct {
	id     WorkerID
	status WorkerStatus
}

// Start sets the status to WorkerStatusRunning and always returns nil.
func (m *MockWorker) Start() error {
	m.status = WorkerStatusRunning
	return nil
}

// Stop sets the status to WorkerStatusStopped and always returns nil.
func (m *MockWorker) Stop() error {
	m.status = WorkerStatusStopped
	return nil
}

// GetID returns the ID stored in the mock.
func (m *MockWorker) GetID() WorkerID {
	return m.id
}

// Status returns the currently stored status, i.e. the value set at
// construction or by the most recent Start/Stop call.
func (m *MockWorker) Status() WorkerStatus {
	return m.status
}

// SetManager is a no-op; the mock does not keep a reference to the manager.
func (m *MockWorker) SetManager(manager *Manager) {}

// AssignJob ignores the given job and always returns nil.
func (m *MockWorker) AssignJob(job GenericJob) error {
	return nil
}
// MockScheduler is a stateless scheduler stub whose methods do nothing.
type MockScheduler struct{}

// Schedule ignores all of its arguments and always returns nil.
func (*MockScheduler) Schedule(job *GenericJob, eventBus *EventBus, stopChan StopChan) error {
	return nil
}

// Cancel ignores the job ID and always returns nil.
func (*MockScheduler) Cancel(jobID JobID) error {
	return nil
}

// GetNextRunTime ignores the job ID and returns the current time.
func (*MockScheduler) GetNextRunTime(jobID JobID) time.Time {
	now := time.Now()
	return now
}
// MockGenericJob is a job stub for the manager tests. Apart from its ID and
// the scheduler attached through SetScheduler, every method is a no-op or
// returns a fixed zero-like value.
type MockGenericJob struct {
	ID        JobID
	Scheduler Scheduler
}

// ResetStats does nothing; the mock keeps no statistics.
func (m *MockGenericJob) ResetStats() {
}

// GetStats always returns an empty JobStats value.
func (m *MockGenericJob) GetStats() JobStats {
	return JobStats{}
}

// GetMaxRetries always returns 0.
func (m *MockGenericJob) GetMaxRetries() uint {
	return 0
}

// GetRetryDelay returns a pointer to a fresh zero duration on every call.
func (m *MockGenericJob) GetRetryDelay() *time.Duration {
	return new(time.Duration)
}

// GetTimeout returns a pointer to a fresh zero duration on every call.
func (m *MockGenericJob) GetTimeout() *time.Duration {
	return new(time.Duration)
}

// GetID returns the ID field of the mock.
func (m *MockGenericJob) GetID() JobID {
	return m.ID
}

// GetDependencies always returns nil.
func (m *MockGenericJob) GetDependencies() []JobID {
	return nil
}

// GetDependentJobs always returns nil.
func (m *MockGenericJob) GetDependentJobs() []JobID {
	return nil
}

// GetPriority always returns PriorityDefault.
func (m *MockGenericJob) GetPriority() Priority {
	return PriorityDefault
}

// Execute performs no work and returns a nil result with a nil error.
func (m *MockGenericJob) Execute(ctx context.Context) (RunGenericResult, error) {
	return nil, nil
}

// Cancel does nothing and always returns nil.
func (m *MockGenericJob) Cancel() error {
	return nil
}

// Pause does nothing; the mock has no paused state.
func (m *MockGenericJob) Pause() {
}

// PauseUntil does nothing; the given time is ignored.
func (m *MockGenericJob) PauseUntil(until time.Time) {
}

// Resume does nothing; the mock has no paused state.
func (m *MockGenericJob) Resume() {
}

// IsPaused always returns false.
func (m *MockGenericJob) IsPaused() bool {
	return false
}

// GetPersistence always returns an empty JobPersistence value.
func (m *MockGenericJob) GetPersistence() JobPersistence {
	return JobPersistence{}
}

// SetScheduler stores the given scheduler in the Scheduler field.
func (m *MockGenericJob) SetScheduler(scheduler Scheduler) {
	m.Scheduler = scheduler
}

// GetScheduler returns the scheduler stored in the Scheduler field, which is
// the zero value until SetScheduler has been called or the field was set
// directly.
func (m *MockGenericJob) GetScheduler() Scheduler {
	return m.Scheduler
}
// TestNewManager checks that NewManager returns a manager in the stopped
// state with its queue, worker map and event bus populated.
func TestNewManager(t *testing.T) {
	manager := NewManager()

	// Check for nil before reading any field: dereferencing a nil manager
	// would panic and hide the real failure behind a stack trace.
	if !assert.NotNil(t, manager) {
		return
	}

	eventBus := manager.eventBus

	assert.Equal(t, ManagerState(ManagerStateStopped), manager.state)
	assert.NotNil(t, manager.queue)
	assert.NotNil(t, manager.workerMap)
	assert.NotNil(t, manager.eventBus)
	// Sanity check: the captured reference and the field are the same bus.
	assert.Equal(t, eventBus, manager.eventBus)
}
// TestManager_AddWorker registers a single mock worker and expects no error;
// the manager must still be in the stopped state afterwards.
func TestManager_AddWorker(t *testing.T) {
	mgr := NewManager()
	worker := &MockWorker{id: "worker1", status: WorkerStatusStopped}

	assert.Nil(t, mgr.AddWorker(worker))

	wantState, gotState := int(ManagerStateStopped), int(mgr.state)
	assert.Equal(t, wantState, gotState)
}
// TestManager_RemoveWorker adds a mock worker and removes it again, expecting
// both calls to succeed and the manager to remain in the stopped state.
func TestManager_RemoveWorker(t *testing.T) {
	mgr := NewManager()
	worker := &MockWorker{id: "worker1", status: WorkerStatusStopped}

	assert.Nil(t, mgr.AddWorker(worker))
	assert.Nil(t, mgr.RemoveWorker(worker))

	wantState, gotState := int(ManagerStateStopped), int(mgr.state)
	assert.Equal(t, wantState, gotState)
}
// TestManager_Start adds one mock worker, starts the manager and expects it
// to report the running state without an error.
func TestManager_Start(t *testing.T) {
	var err error

	m := NewManager()
	w := &MockWorker{id: "worker1", status: WorkerStatusStopped}

	err = m.AddWorker(w)
	assert.Nil(t, err)

	// The error from Start is checked exactly once.
	err = m.Start()
	assert.Nil(t, err)

	assert.Equal(t, int(ManagerStateRunning), int(m.state))
}
// TestManager_Stop starts a manager with one mock worker, stops it again and
// expects every step to succeed with the manager ending in the stopped state.
func TestManager_Stop(t *testing.T) {
	mgr := NewManager()
	worker := &MockWorker{id: "worker1", status: WorkerStatusStopped}

	assert.Nil(t, mgr.AddWorker(worker))
	assert.Nil(t, mgr.Start())
	assert.Nil(t, mgr.Stop())

	wantState, gotState := int(ManagerStateStopped), int(mgr.state)
	assert.Equal(t, wantState, gotState)
}
// TestManager_ScheduleJob starts a manager with one mock worker and expects
// scheduling a mock job with an InstantScheduler to return no error.
func TestManager_ScheduleJob(t *testing.T) {
	mgr := NewManager()
	worker := &MockWorker{id: "worker1", status: WorkerStatusStopped}

	assert.Nil(t, mgr.AddWorker(worker))
	assert.Nil(t, mgr.Start())

	mockJob := &MockGenericJob{ID: "job1"}
	instant := InstantScheduler{}
	assert.Nil(t, mgr.ScheduleJob(mockJob, &instant))
}
// TestManager_CancelJob schedules a mock job with an EventScheduler on a
// running manager and then cancels its schedule via CancelJobSchedule,
// expecting no error from any step.
func TestManager_CancelJob(t *testing.T) {
	mgr := NewManager()
	worker := &MockWorker{id: "worker1", status: WorkerStatusStopped}

	assert.Nil(t, mgr.AddWorker(worker))
	assert.Nil(t, mgr.Start())

	mockJob := &MockGenericJob{ID: "job1"}
	onEvent := EventScheduler{}
	assert.Nil(t, mgr.ScheduleJob(mockJob, &onEvent))

	// NOTE(review): presumably gives the manager time to register the
	// schedule before it is cancelled — confirm whether this wait is needed.
	time.Sleep(1 * time.Second)

	assert.Nil(t, mgr.CancelJobSchedule(mockJob.GetID()))
}
// TestManagerEventHandling schedules a counter job with a 2ms interval
// scheduler on a manager backed by a local worker, then polls the job's
// counter until it exceeds 10. The test fails if that takes more than 100ms.
func TestManagerEventHandling(t *testing.T) {
	const (
		pollEvery = 2 * time.Millisecond
		giveUp    = 100 * time.Millisecond
	)

	mgr := NewManager()

	worker := NewLocalWorker(10)
	assert.Nil(t, mgr.AddWorker(worker))
	assert.Nil(t, mgr.Start())

	runner := &CounterRunnable{}
	job := NewJob[CounterResult]("job1", runner)

	scheduler := &IntervalScheduler{Interval: 2 * time.Millisecond}
	assert.Nil(t, mgr.ScheduleJob(job, scheduler))

	// Poll until the counter has passed 10, sleeping between reads and
	// aborting once the overall deadline is exceeded.
	begin := time.Now()
	for job.runner.(*CounterRunnable).GetCount() <= 10 {
		time.Sleep(pollEvery)
		if time.Since(begin) > giveUp {
			t.Fatalf("Job did not finish in time")
		}
	}

	time.Sleep(pollEvery)
	assert.Nil(t, mgr.Stop())
}