// Copyright 2023 schukai GmbH // SPDX-License-Identifier: AGPL-3.0 package jobqueue import ( "context" "github.com/stretchr/testify/assert" "testing" "time" ) type MockWorker struct { id WorkerID status WorkerStatus } func (m *MockWorker) Start() error { m.status = WorkerStatusRunning return nil } func (m *MockWorker) Stop() error { m.status = WorkerStatusStopped return nil } func (m *MockWorker) GetID() WorkerID { return m.id } func (m *MockWorker) Status() WorkerStatus { return m.status } func (m *MockWorker) SetManager(manager *Manager) { return } func (m *MockWorker) AssignJob(job GenericJob) error { return nil } type MockScheduler struct{} func (s *MockScheduler) Schedule(job *GenericJob, eventBus *EventBus, stopChan StopChan) error { return nil } func (s *MockScheduler) Cancel(jobID JobID) error { return nil } func (s *MockScheduler) GetNextRunTime(jobID JobID) time.Time { return time.Now() } type MockGenericJob struct { ID JobID Scheduler Scheduler } func (m *MockGenericJob) ResetStats() { } func (m *MockGenericJob) GetStats() JobStats { return JobStats{} } func (m *MockGenericJob) GetMaxRetries() uint { return 0 } func (m *MockGenericJob) GetRetryDelay() *time.Duration { dur := time.Duration(0) return &dur } func (m *MockGenericJob) GetTimeout() *time.Duration { dur := time.Duration(0) return &dur } func (m *MockGenericJob) GetID() JobID { return m.ID } func (m *MockGenericJob) GetDependencies() []JobID { return nil } func (m *MockGenericJob) GetDependentJobs() []JobID { return nil } func (m *MockGenericJob) GetPriority() Priority { return PriorityDefault } func (m *MockGenericJob) Execute(ctx context.Context) (RunGenericResult, error) { return nil, nil } func (m *MockGenericJob) Cancel() error { return nil } func (m *MockGenericJob) Pause() { } func (m *MockGenericJob) PauseUntil(until time.Time) { } func (m *MockGenericJob) Resume() { } func (m *MockGenericJob) IsPaused() bool { return false } func (m *MockGenericJob) GetPersistence() JobPersistence { return JobPersistence{} } func (m *MockGenericJob) SetScheduler(scheduler Scheduler) { m.Scheduler = scheduler return } func (m *MockGenericJob) GetScheduler() Scheduler { return m.Scheduler } func TestNewManager(t *testing.T) { manager := NewManager() eventBus := manager.eventBus assert.NotNil(t, manager) assert.Equal(t, ManagerState(ManagerStateStopped), manager.state) assert.NotNil(t, manager.queue) assert.NotNil(t, manager.workerMap) assert.NotNil(t, manager.eventBus) assert.Equal(t, eventBus, manager.eventBus) } func TestManager_AddWorker(t *testing.T) { m := NewManager() w := &MockWorker{id: "worker1", status: WorkerStatusStopped} err := m.AddWorker(w) assert.Nil(t, err) assert.Equal(t, int(ManagerStateStopped), int(m.state)) } func TestManager_RemoveWorker(t *testing.T) { var err error m := NewManager() w := &MockWorker{id: "worker1", status: WorkerStatusStopped} err = m.AddWorker(w) assert.Nil(t, err) err = m.RemoveWorker(w) assert.Nil(t, err) assert.Equal(t, int(ManagerStateStopped), int(m.state)) } func TestManager_Start(t *testing.T) { var err error m := NewManager() w := &MockWorker{id: "worker1", status: WorkerStatusStopped} err = m.AddWorker(w) assert.Nil(t, err) err = m.Start() assert.Nil(t, err) assert.Nil(t, err) assert.Equal(t, int(ManagerStateRunning), int(m.state)) } func TestManager_Stop(t *testing.T) { var err error m := NewManager() w := &MockWorker{id: "worker1", status: WorkerStatusStopped} err = m.AddWorker(w) assert.Nil(t, err) err = m.Start() assert.Nil(t, err) err = m.Stop() assert.Nil(t, err) assert.Equal(t, int(ManagerStateStopped), int(m.state)) } func TestManager_ScheduleJob(t *testing.T) { var err error m := NewManager() w := &MockWorker{id: "worker1", status: WorkerStatusStopped} err = m.AddWorker(w) assert.Nil(t, err) err = m.Start() assert.Nil(t, err) job := &MockGenericJob{ID: "job1"} scheduler := InstantScheduler{} err = m.ScheduleJob(job, &scheduler) assert.Nil(t, err) } func TestManager_CancelJob(t *testing.T) { var err error m := NewManager() w := &MockWorker{id: "worker1", status: WorkerStatusStopped} err = m.AddWorker(w) assert.Nil(t, err) err = m.Start() assert.Nil(t, err) job := &MockGenericJob{ID: "job1"} scheduler := EventScheduler{} err = m.ScheduleJob(job, &scheduler) assert.Nil(t, err) time.Sleep(1 * time.Second) err = m.CancelJobSchedule(job.GetID()) assert.Nil(t, err) } func TestManagerEventHandling(t *testing.T) { mgr := NewManager() worker := NewLocalWorker(10) err := mgr.AddWorker(worker) assert.Nil(t, err) err = mgr.Start() assert.Nil(t, err) runner := &CounterRunnable{} job := NewJob[CounterResult]("job1", runner) scheduler := &IntervalScheduler{Interval: 2 * time.Millisecond} err = mgr.ScheduleJob(job, scheduler) assert.Nil(t, err) startTime := time.Now() for { currentCount := job.runner.(*CounterRunnable).GetCount() if currentCount > 10 { break } time.Sleep(2 * time.Millisecond) if time.Since(startTime) > 100*time.Millisecond { t.Fatalf("Job did not finish in time") } } time.Sleep(2 * time.Millisecond) err = mgr.Stop() assert.Nil(t, err) }