// Copyright 2023 schukai GmbH // SPDX-License-Identifier: AGPL-3.0 package jobqueue import ( "encoding/json" "github.com/fsnotify/fsnotify" "github.com/robfig/cron/v3" "github.com/stretchr/testify/assert" "gopkg.in/yaml.v3" "os" "sync/atomic" "testing" "time" ) func TestIntervalScheduler_BasicFunctionality(t *testing.T) { var count int32 eventBus := NewEventBus() scheduler := IntervalScheduler{Interval: time.Millisecond * 100} job := NewJob[DummyResult]("test-job", &DummyRunnable{}) genericJob := GenericJob(job) _ = scheduler.Schedule(genericJob, eventBus) jobChannel := make(chan interface{}) go func() { for range jobChannel { atomic.AddInt32(&count, 1) } }() eventBus.Subscribe(QueueJob, jobChannel) time.Sleep(time.Millisecond * 500) if atomic.LoadInt32(&count) < 4 { t.Errorf("Expected to run at least four times, ran %d times", count) } } func TestIntervalScheduler_StopTicker(t *testing.T) { var count int32 eventBus := NewEventBus() scheduler := IntervalScheduler{Interval: time.Millisecond * 100} job := NewJob[DummyResult]("test-job", &DummyRunnable{}) genericJob := GenericJob(job) err := scheduler.Schedule(genericJob, eventBus) assert.Nil(t, err) jobChannel := make(chan interface{}) go func() { for range jobChannel { atomic.AddInt32(&count, 1) } }() eventBus.Subscribe(QueueJob, jobChannel) time.Sleep(time.Millisecond * 150) scheduler.Cancel(job.GetID()) time.Sleep(time.Millisecond * 100) if atomic.LoadInt32(&count) != 1 { t.Errorf("Expected to run 1 time, ran %d times", count) } } func TestIntervalScheduler_InvalidInterval(t *testing.T) { eventBus := NewEventBus() scheduler := IntervalScheduler{Interval: time.Millisecond * 0} job := NewJob[DummyResult]("test-job", &DummyRunnable{}) genericJob := GenericJob(job) err := scheduler.Schedule(genericJob, eventBus) if err == nil { t.Errorf("Expected an error due to invalid interval") } } func TestCronScheduler_BasicFunctionality(t *testing.T) { var count int32 eventBus := NewEventBus() job := NewJob[DummyResult]("test-job", &DummyRunnable{}) c := cron.New(cron.WithSeconds()) genericJob := GenericJob(job) cronScheduler := CronScheduler{ cron: c, Spec: "* * * * * *", } _ = cronScheduler.Schedule(genericJob, eventBus) c.Start() defer c.Stop() jobChannel := make(chan interface{}) go func() { for range jobChannel { atomic.AddInt32(&count, 1) } }() eventBus.Subscribe(QueueJob, jobChannel) time.Sleep(time.Second * 3) if atomic.LoadInt32(&count) < 2 { t.Errorf("Expected to run at least 2 times, ran %d times", count) } } func TestCronScheduler_StopScheduler(t *testing.T) { var count int32 eventBus := NewEventBus() c := cron.New(cron.WithSeconds()) cronScheduler := CronScheduler{cron: c, Spec: "* * * * * *"} job := NewJob[DummyResult]("test-job", &DummyRunnable{}) genericJob := GenericJob(job) _ = cronScheduler.Schedule(genericJob, eventBus) c.Start() defer c.Stop() jobChannel := make(chan interface{}) go func() { for range jobChannel { atomic.AddInt32(&count, 1) } }() eventBus.Subscribe(QueueJob, jobChannel) time.Sleep(time.Second * 2) cronScheduler.Cancel(job.GetID()) time.Sleep(time.Second) if atomic.LoadInt32(&count) < 1 { t.Errorf("Expected to run at least 1 time, ran %d times", count) } } func TestDelayScheduler_BasicFunctionality(t *testing.T) { var count int32 eventBus := NewEventBus() delayScheduler := DelayScheduler{Delay: time.Millisecond * 100} job := NewJob[DummyResult]("test-job", &DummyRunnable{}) genericJob := GenericJob(job) _ = delayScheduler.Schedule(genericJob, eventBus) jobChannel := make(chan interface{}) go func() { for range jobChannel { atomic.AddInt32(&count, 1) } }() eventBus.Subscribe(QueueJob, jobChannel) time.Sleep(time.Millisecond * 200) if atomic.LoadInt32(&count) != 1 { t.Errorf("Expected to run one time, ran %d times", count) } } func TestDelayScheduler_StopBeforeExecute(t *testing.T) { var count int32 eventBus := NewEventBus() delayScheduler := DelayScheduler{Delay: time.Millisecond * 100} job := NewJob[DummyResult]("test-job", &DummyRunnable{}) genericJob := GenericJob(job) _ = delayScheduler.Schedule(genericJob, eventBus) jobChannel := make(chan interface{}) go func() { for range jobChannel { atomic.AddInt32(&count, 1) } }() eventBus.Subscribe(QueueJob, jobChannel) time.Sleep(time.Millisecond * 50) delayScheduler.Cancel(job.GetID()) time.Sleep(time.Millisecond * 100) if atomic.LoadInt32(&count) != 0 { t.Errorf("Expected to not run, ran %d times", count) } } func TestInstantScheduler_BasicFunctionality(t *testing.T) { var count int32 eventBus := NewEventBus() instantScheduler := InstantScheduler{} job := NewJob[DummyResult]("test-job", &DummyRunnable{}) jobChannel := make(chan interface{}) go func() { for range jobChannel { atomic.AddInt32(&count, 1) } }() eventBus.Subscribe(QueueJob, jobChannel) time.Sleep(time.Millisecond * 100) genericJob := GenericJob(job) _ = instantScheduler.Schedule(genericJob, eventBus) time.Sleep(time.Millisecond * 100) if atomic.LoadInt32(&count) != 1 { t.Errorf("Expected to run 1 time, ran %d times", count) } } func TestEventScheduler_BasicFunctionality(t *testing.T) { var count int32 eventBus := NewEventBus() eventScheduler := EventScheduler{Event: "trigger-event"} job := NewJob[DummyResult]("test-job", &DummyRunnable{}) genericJob := GenericJob(job) _ = eventScheduler.Schedule(genericJob, eventBus) jobChannel := make(chan interface{}) eventBus.Subscribe(QueueJob, jobChannel) go func() { for range jobChannel { atomic.AddInt32(&count, 1) } }() // Trigger the event eventBus.Publish("trigger-event", Event{Data: nil, Name: "trigger-event"}) time.Sleep(time.Millisecond * 50) // Allow some time for the event to propagate if atomic.LoadInt32(&count) != 1 { t.Errorf("Expected to run one time, ran %d times", count) } } func TestTimeScheduler_BasicFunctionality(t *testing.T) { var count int32 eventBus := NewEventBus() timeScheduler := TimeScheduler{Time: time.Now().Add(time.Second * 1)} job := NewJob[DummyResult]("test-job", &DummyRunnable{}) genericJob := GenericJob(job) _ = timeScheduler.Schedule(genericJob, eventBus) jobChannel := make(chan interface{}) go func() { for range jobChannel { atomic.AddInt32(&count, 1) } }() eventBus.Subscribe(QueueJob, jobChannel) time.Sleep(time.Second * 2) timeScheduler.Cancel(job.GetID()) time.Sleep(time.Millisecond * 100) if atomic.LoadInt32(&count) != 1 { t.Errorf("Expected to not run, ran %d times", count) } } // TestSchedulerPersistenceUnmarshalJSON testet die Unmarshalling-Funktion mit verschiedenen Zeitformaten. func TestSchedulerPersistenceUnmarshalJSON(t *testing.T) { testCases := []struct { name string jsonInput string expected *time.Time wantErr bool }{ { name: "RFC3339 Format", jsonInput: `{"type":"Time","time":"2023-11-15T09:01:00Z"}`, expected: parseTimeForTesting(t, "2023-11-15T09:01:00Z"), wantErr: false, }, { name: "Date and Time", jsonInput: `{"type":"Time","time":"2023-11-15T09:01"}`, expected: parseTimeForTesting(t, "2023-11-15T09:01"), wantErr: false, }, { name: "Date and Time with Space", jsonInput: `{"type":"Time","time":"2023-11-15 09:01"}`, expected: parseTimeForTesting(t, "2023-11-15T09:01"), wantErr: false, }, { name: "Date and Time with Space and Seconds", jsonInput: `{"type":"Time","time":"2023-11-15 09:01:02"}`, expected: parseTimeForTesting(t, "2023-11-15T09:01:02"), wantErr: false, }, { name: "Only Date", jsonInput: `{"type":"Time","time":"2023-11-15"}`, expected: parseTimeForTesting(t, "2023-11-15"), wantErr: false, }, { name: "Invalid Format", jsonInput: `{"type":"Time","time":"15. November 2023"}`, expected: nil, wantErr: true, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { var sp SchedulerPersistence err := json.Unmarshal([]byte(tc.jsonInput), &sp) if tc.wantErr { assert.Error(t, err) } else { assert.NoError(t, err) assert.Equal(t, tc.expected, sp.Time) } }) } } func parseTimeForTesting(t *testing.T, value string) *time.Time { for _, format := range SupportedTimeFormats { parsedTime, err := time.Parse(format, value) if err == nil { return &parsedTime } } t.Fatalf("Failed to parse time '%s' in any known format", value) return nil } func TestInotifyScheduler_BasicFunctionality(t *testing.T) { var count int32 eventBus := NewEventBus() tmpPath := t.TempDir() inotifyScheduler := InotifyScheduler{Path: tmpPath, EventFlags: fsnotify.Create | fsnotify.Write | fsnotify.Remove} job := NewJob[DummyResult]("test-job", &DummyRunnable{}) genericJob := GenericJob(job) _ = inotifyScheduler.Schedule(genericJob, eventBus) jobChannel := make(chan interface{}) go func() { for range jobChannel { atomic.AddInt32(&count, 1) } }() eventBus.Subscribe(QueueJob, jobChannel) time.Sleep(time.Millisecond * 100) tmpFile := tmpPath + "/test.txt" _, err := os.Create(tmpFile) assert.NoError(t, err) time.Sleep(time.Millisecond * 100) err = os.Remove(tmpFile) assert.NoError(t, err) time.Sleep(time.Millisecond * 100) if atomic.LoadInt32(&count) != 2 { t.Errorf("Expected to run 2 times, ran %d times", count) } } func TestUnmarshalSchedulerPersistenceYAML(t *testing.T) { // Test unmarshalling of a YAML string into a SchedulerPersistence struct yamlData := ` type: interval interval: 1m time: "2023-12-15T12:00:00Z" ` var sp SchedulerPersistence err := yaml.Unmarshal([]byte(yamlData), &sp) assert.Nil(t, err, "Unmarshalling should not produce an error") expectedInterval, _ := time.ParseDuration("1m") expectedTime, _ := time.Parse(time.RFC3339, "2023-12-15T12:00:00Z") assert.Equal(t, "interval", sp.Type, "Type should be unmarshalled correctly") assert.Equal(t, &expectedInterval, sp.Interval, "Interval should be unmarshalled correctly") assert.Equal(t, &expectedTime, sp.Time, "Time should be unmarshalled correctly") } func TestUnmarshalSchedulerPersistenceIntervalYAML(t *testing.T) { yamlData := ` type: Interval interval: "1m1s" ` var sp SchedulerPersistence err := yaml.Unmarshal([]byte(yamlData), &sp) assert.Nil(t, err, "Unmarshalling should not produce an error") expectedInterval, _ := time.ParseDuration("1m1s") assert.Equal(t, "Interval", sp.Type, "Type should be unmarshalled correctly") assert.Equal(t, &expectedInterval, sp.Interval, "Interval should be unmarshalled correctly") }