Skip to content
Snippets Groups Projects
manager_test.go 5.14 KiB
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0

package jobqueue

import (
	"context"
	"github.com/stretchr/testify/assert"
	"testing"
	"time"
)

// MockWorker is a minimal in-memory worker for the manager tests. It only
// tracks an ID and a status and never executes the jobs handed to it.
type MockWorker struct {
	id     WorkerID
	status WorkerStatus
}

// Start sets the status to WorkerStatusRunning and always succeeds.
func (m *MockWorker) Start() error {
	m.status = WorkerStatusRunning
	return nil
}

// Stop sets the status to WorkerStatusStopped and always succeeds.
func (m *MockWorker) Stop() error {
	m.status = WorkerStatusStopped
	return nil
}

// GetID returns the ID the mock was constructed with.
func (m *MockWorker) GetID() WorkerID {
	return m.id
}

// Status returns the status last set by Start, Stop or the struct literal.
func (m *MockWorker) Status() WorkerStatus {
	return m.status
}

// SetManager is a no-op; the mock does not keep a reference to the manager.
func (m *MockWorker) SetManager(manager *Manager) {}

// AssignJob accepts every job and discards it without running it.
func (m *MockWorker) AssignJob(job GenericJob) error {
	return nil
}

// MockScheduler is a stateless scheduler stub: every operation succeeds and
// nothing is actually scheduled.
type MockScheduler struct{}

// Schedule ignores its arguments and reports success.
func (*MockScheduler) Schedule(job *GenericJob, eventBus *EventBus, stopChan StopChan) error {
	return nil
}

// Cancel ignores the job ID and reports success.
func (*MockScheduler) Cancel(jobID JobID) error {
	return nil
}

// GetNextRunTime returns the current time regardless of the job ID.
func (*MockScheduler) GetNextRunTime(jobID JobID) time.Time {
	now := time.Now()
	return now
}

// MockGenericJob is a do-nothing job for the manager tests. Apart from the ID
// and the scheduler, which are stored and returned, every method yields a
// fixed zero-like value.
type MockGenericJob struct {
	ID        JobID
	Scheduler Scheduler
}

// ResetStats is a no-op; the mock keeps no statistics.
func (m *MockGenericJob) ResetStats() {}

// GetStats returns an empty JobStats value.
func (m *MockGenericJob) GetStats() JobStats {
	return JobStats{}
}

// GetMaxRetries reports that the mock is never retried.
func (m *MockGenericJob) GetMaxRetries() uint {
	return 0
}

// GetRetryDelay returns a pointer to a fresh zero duration.
func (m *MockGenericJob) GetRetryDelay() *time.Duration {
	return new(time.Duration)
}

// GetTimeout returns a pointer to a fresh zero duration.
func (m *MockGenericJob) GetTimeout() *time.Duration {
	return new(time.Duration)
}

// GetID returns the ID the mock was constructed with.
func (m *MockGenericJob) GetID() JobID {
	return m.ID
}

// GetDependencies returns nil; the mock has no dependencies.
func (m *MockGenericJob) GetDependencies() []JobID {
	return nil
}

// GetDependentJobs returns nil; no job depends on the mock.
func (m *MockGenericJob) GetDependentJobs() []JobID {
	return nil
}

// GetPriority always returns PriorityDefault.
func (m *MockGenericJob) GetPriority() Priority {
	return PriorityDefault
}

// Execute does no work and returns a nil result and a nil error.
func (m *MockGenericJob) Execute(ctx context.Context) (RunGenericResult, error) {
	return nil, nil
}

// Cancel is a no-op that reports success.
func (m *MockGenericJob) Cancel() error {
	return nil
}

// Pause is a no-op.
func (m *MockGenericJob) Pause() {}

// PauseUntil is a no-op; the deadline is ignored.
func (m *MockGenericJob) PauseUntil(until time.Time) {}

// Resume is a no-op.
func (m *MockGenericJob) Resume() {}

// IsPaused always returns false.
func (m *MockGenericJob) IsPaused() bool {
	return false
}

// GetPersistence returns an empty JobPersistence value.
func (m *MockGenericJob) GetPersistence() JobPersistence {
	return JobPersistence{}
}

// SetScheduler stores the scheduler so GetScheduler can return it.
func (m *MockGenericJob) SetScheduler(scheduler Scheduler) {
	m.Scheduler = scheduler
}

// GetScheduler returns the scheduler last stored via SetScheduler or the
// struct literal.
func (m *MockGenericJob) GetScheduler() Scheduler {
	return m.Scheduler
}

// TestNewManager verifies that NewManager yields a manager in the stopped
// state with its queue, worker map and event bus initialised.
func TestNewManager(t *testing.T) {
	manager := NewManager()

	// Check the manager itself before touching any of its fields: reading a
	// field through a nil manager would panic and hide the real failure.
	if !assert.NotNil(t, manager) {
		return
	}

	assert.Equal(t, ManagerState(ManagerStateStopped), manager.state)
	assert.NotNil(t, manager.queue)
	assert.NotNil(t, manager.workerMap)
	assert.NotNil(t, manager.eventBus)
}

// TestManager_AddWorker checks that registering a worker succeeds and leaves
// the manager in the stopped state.
func TestManager_AddWorker(t *testing.T) {
	mgr := NewManager()
	worker := &MockWorker{id: "worker1", status: WorkerStatusStopped}

	assert.Nil(t, mgr.AddWorker(worker))

	assert.Equal(t, int(ManagerStateStopped), int(mgr.state))
}

// TestManager_RemoveWorker checks that a previously added worker can be
// removed again and that the manager stays in the stopped state.
func TestManager_RemoveWorker(t *testing.T) {
	mgr := NewManager()
	worker := &MockWorker{id: "worker1", status: WorkerStatusStopped}

	assert.Nil(t, mgr.AddWorker(worker))
	assert.Nil(t, mgr.RemoveWorker(worker))

	assert.Equal(t, int(ManagerStateStopped), int(mgr.state))
}

// TestManager_Start checks that a manager with one registered worker starts
// without error and reports the running state afterwards.
func TestManager_Start(t *testing.T) {
	m := NewManager()
	w := &MockWorker{id: "worker1", status: WorkerStatusStopped}

	err := m.AddWorker(w)
	assert.Nil(t, err)

	err = m.Start()
	assert.Nil(t, err)

	// Best-effort teardown so the started manager does not outlive this test.
	// The error is deliberately ignored here; the Stop contract itself is
	// asserted in TestManager_Stop.
	t.Cleanup(func() { _ = m.Stop() })

	assert.Equal(t, int(ManagerStateRunning), int(m.state))
}

// TestManager_Stop checks that a started manager can be stopped without error
// and reports the stopped state afterwards.
func TestManager_Stop(t *testing.T) {
	mgr := NewManager()
	worker := &MockWorker{id: "worker1", status: WorkerStatusStopped}

	assert.Nil(t, mgr.AddWorker(worker))
	assert.Nil(t, mgr.Start())
	assert.Nil(t, mgr.Stop())

	assert.Equal(t, int(ManagerStateStopped), int(mgr.state))
}

// TestManager_ScheduleJob checks that a running manager accepts a job
// scheduled through an InstantScheduler without returning an error.
func TestManager_ScheduleJob(t *testing.T) {
	mgr := NewManager()
	worker := &MockWorker{id: "worker1", status: WorkerStatusStopped}

	assert.Nil(t, mgr.AddWorker(worker))
	assert.Nil(t, mgr.Start())

	mockJob := &MockGenericJob{ID: "job1"}
	assert.Nil(t, mgr.ScheduleJob(mockJob, &InstantScheduler{}))
}

// TestManager_CancelJob schedules a job through an EventScheduler on a running
// manager and checks that its schedule can be cancelled without error.
func TestManager_CancelJob(t *testing.T) {
	mgr := NewManager()
	worker := &MockWorker{id: "worker1", status: WorkerStatusStopped}

	assert.Nil(t, mgr.AddWorker(worker))
	assert.Nil(t, mgr.Start())

	mockJob := &MockGenericJob{ID: "job1"}
	assert.Nil(t, mgr.ScheduleJob(mockJob, &EventScheduler{}))

	// NOTE(review): fixed one-second pause before cancelling; presumably it
	// gives the scheduler time to settle - confirm whether it can be shortened.
	time.Sleep(time.Second)

	assert.Nil(t, mgr.CancelJobSchedule(mockJob.GetID()))
}

// TestManagerEventHandling runs a counting job on a local worker with a 2 ms
// interval schedule and expects the counter to exceed 10 within 100 ms. The
// manager is stopped on both the success and the timeout path so a failing run
// does not leave it running in the background.
func TestManagerEventHandling(t *testing.T) {
	const (
		pollInterval = 2 * time.Millisecond
		deadline     = 100 * time.Millisecond
	)

	mgr := NewManager()
	worker := NewLocalWorker(10)
	err := mgr.AddWorker(worker)
	assert.Nil(t, err)

	err = mgr.Start()
	assert.Nil(t, err)

	runner := &CounterRunnable{}
	job := NewJob[CounterResult]("job1", runner)

	scheduler := &IntervalScheduler{Interval: pollInterval}
	err = mgr.ScheduleJob(job, scheduler)
	assert.Nil(t, err)

	startTime := time.Now()
	timedOut := false

	// Poll the runner handed to the job until it has counted past 10 or the
	// deadline has passed.
	for runner.GetCount() <= 10 {
		time.Sleep(pollInterval)

		if time.Since(startTime) > deadline {
			timedOut = true
			break
		}
	}

	// Shut the manager down before reporting a timeout; aborting first would
	// leave it and the interval job running.
	time.Sleep(pollInterval)
	err = mgr.Stop()
	assert.Nil(t, err)

	if timedOut {
		t.Fatal("Job did not finish in time")
	}
}