Skip to content
Snippets Groups Projects
scheduler_test.go 10.47 KiB
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0

package jobqueue

import (
	"encoding/json"
	"github.com/fsnotify/fsnotify"
	"github.com/robfig/cron/v3"
	"github.com/stretchr/testify/assert"
	"gopkg.in/yaml.v3"
	"os"
	"sync/atomic"
	"testing"
	"time"
)

// TestIntervalScheduler_BasicFunctionality schedules a job on an
// IntervalScheduler with a 100ms interval and expects at least four messages
// on the QueueJob subscription within 500ms.
func TestIntervalScheduler_BasicFunctionality(t *testing.T) {
	var count int32
	eventBus := NewEventBus()
	scheduler := IntervalScheduler{Interval: time.Millisecond * 100}

	job := NewJob[DummyResult]("test-job", &DummyRunnable{})

	genericJob := GenericJob(job)
	// Fail fast: a scheduling error would otherwise only show up as a
	// misleading "ran 0 times" failure further down.
	if err := scheduler.Schedule(genericJob, eventBus); err != nil {
		t.Fatalf("Schedule returned an unexpected error: %v", err)
	}

	jobChannel := make(chan interface{})

	// Count every message delivered to the subscription channel.
	go func() {
		for range jobChannel {
			atomic.AddInt32(&count, 1)
		}
	}()

	eventBus.Subscribe(QueueJob, jobChannel)

	time.Sleep(time.Millisecond * 500)

	// Load the counter once, atomically, and report that same value; a plain
	// read of count would race with the counting goroutine.
	if got := atomic.LoadInt32(&count); got < 4 {
		t.Errorf("Expected to run at least four times, ran %d times", got)
	}
}

// TestIntervalScheduler_StopTicker schedules a job on a 100ms
// IntervalScheduler, cancels it after 150ms and expects exactly one message
// on the QueueJob subscription once a further 100ms have passed.
func TestIntervalScheduler_StopTicker(t *testing.T) {
	var count int32
	eventBus := NewEventBus()
	scheduler := IntervalScheduler{Interval: time.Millisecond * 100}

	job := NewJob[DummyResult]("test-job", &DummyRunnable{})

	genericJob := GenericJob(job)
	err := scheduler.Schedule(genericJob, eventBus)
	assert.Nil(t, err)

	jobChannel := make(chan interface{})

	// Count every message delivered to the subscription channel.
	go func() {
		for range jobChannel {
			atomic.AddInt32(&count, 1)
		}
	}()

	eventBus.Subscribe(QueueJob, jobChannel)

	// 150ms leaves room for exactly one 100ms tick before cancelling.
	time.Sleep(time.Millisecond * 150)
	scheduler.Cancel(job.GetID())
	// Wait past the point where a second tick would have fired.
	time.Sleep(time.Millisecond * 100)

	// Load the counter once, atomically, and report that same value; a plain
	// read of count would race with the counting goroutine.
	if got := atomic.LoadInt32(&count); got != 1 {
		t.Errorf("Expected to run 1 time, ran %d times", got)
	}
}

// TestIntervalScheduler_InvalidInterval checks that Schedule returns an error
// when the IntervalScheduler is configured with a zero interval.
func TestIntervalScheduler_InvalidInterval(t *testing.T) {
	bus := NewEventBus()
	zeroInterval := IntervalScheduler{Interval: time.Duration(0)}

	testJob := GenericJob(NewJob[DummyResult]("test-job", &DummyRunnable{}))

	if err := zeroInterval.Schedule(testJob, bus); err == nil {
		t.Errorf("Expected an error due to invalid interval")
	}
}

// TestCronScheduler_BasicFunctionality schedules a job on a CronScheduler
// with the spec "* * * * * *" on a seconds-enabled cron instance and expects
// at least two messages on the QueueJob subscription within three seconds.
func TestCronScheduler_BasicFunctionality(t *testing.T) {
	var count int32
	eventBus := NewEventBus()

	job := NewJob[DummyResult]("test-job", &DummyRunnable{})
	c := cron.New(cron.WithSeconds())

	genericJob := GenericJob(job)
	cronScheduler := CronScheduler{
		cron: c,
		Spec: "* * * * * *",
	}
	// Fail fast: a scheduling error would otherwise only show up as a
	// misleading count mismatch three seconds later.
	if err := cronScheduler.Schedule(genericJob, eventBus); err != nil {
		t.Fatalf("Schedule returned an unexpected error: %v", err)
	}
	c.Start()
	defer c.Stop()

	jobChannel := make(chan interface{})

	// Count every message delivered to the subscription channel.
	go func() {
		for range jobChannel {
			atomic.AddInt32(&count, 1)
		}
	}()

	eventBus.Subscribe(QueueJob, jobChannel)

	time.Sleep(time.Second * 3)

	// Load the counter once, atomically, and report that same value; a plain
	// read of count would race with the counting goroutine.
	if got := atomic.LoadInt32(&count); got < 2 {
		t.Errorf("Expected to run at least 2 times, ran %d times", got)
	}
}

// TestCronScheduler_StopScheduler schedules a job on a CronScheduler with the
// spec "* * * * * *", cancels it after two seconds and expects at least one
// message on the QueueJob subscription.
//
// NOTE(review): the assertion only proves the job ran before Cancel was
// called; it does not check that no further runs happen afterwards.
func TestCronScheduler_StopScheduler(t *testing.T) {
	var count int32
	eventBus := NewEventBus()

	c := cron.New(cron.WithSeconds())

	cronScheduler := CronScheduler{cron: c, Spec: "* * * * * *"}

	job := NewJob[DummyResult]("test-job", &DummyRunnable{})

	genericJob := GenericJob(job)
	// Fail fast: a scheduling error would otherwise only show up as a
	// misleading count mismatch three seconds later.
	if err := cronScheduler.Schedule(genericJob, eventBus); err != nil {
		t.Fatalf("Schedule returned an unexpected error: %v", err)
	}
	c.Start()
	defer c.Stop()

	jobChannel := make(chan interface{})

	// Count every message delivered to the subscription channel.
	go func() {
		for range jobChannel {
			atomic.AddInt32(&count, 1)
		}
	}()

	eventBus.Subscribe(QueueJob, jobChannel)

	time.Sleep(time.Second * 2)
	cronScheduler.Cancel(job.GetID())
	time.Sleep(time.Second)

	// Load the counter once, atomically, and report that same value; a plain
	// read of count would race with the counting goroutine.
	if got := atomic.LoadInt32(&count); got < 1 {
		t.Errorf("Expected to run at least 1 time, ran %d times", got)
	}
}

// TestDelayScheduler_BasicFunctionality schedules a job on a DelayScheduler
// with a 100ms delay and expects exactly one message on the QueueJob
// subscription after 200ms.
func TestDelayScheduler_BasicFunctionality(t *testing.T) {
	var count int32
	eventBus := NewEventBus()
	delayScheduler := DelayScheduler{Delay: time.Millisecond * 100}

	job := NewJob[DummyResult]("test-job", &DummyRunnable{})

	genericJob := GenericJob(job)
	// Fail fast: a scheduling error would otherwise only show up as a
	// misleading "ran 0 times" failure further down.
	if err := delayScheduler.Schedule(genericJob, eventBus); err != nil {
		t.Fatalf("Schedule returned an unexpected error: %v", err)
	}

	jobChannel := make(chan interface{})

	// Count every message delivered to the subscription channel.
	go func() {
		for range jobChannel {
			atomic.AddInt32(&count, 1)
		}
	}()

	eventBus.Subscribe(QueueJob, jobChannel)

	time.Sleep(time.Millisecond * 200)

	// Load the counter once, atomically, and report that same value; a plain
	// read of count would race with the counting goroutine.
	if got := atomic.LoadInt32(&count); got != 1 {
		t.Errorf("Expected to run one time, ran %d times", got)
	}
}

// TestDelayScheduler_StopBeforeExecute schedules a job on a DelayScheduler
// with a 100ms delay, cancels it after 50ms and expects no message on the
// QueueJob subscription once a further 100ms have passed.
func TestDelayScheduler_StopBeforeExecute(t *testing.T) {
	var count int32
	eventBus := NewEventBus()
	delayScheduler := DelayScheduler{Delay: time.Millisecond * 100}

	job := NewJob[DummyResult]("test-job", &DummyRunnable{})

	genericJob := GenericJob(job)
	// Fail fast: if scheduling failed, "ran 0 times" would pass for the wrong
	// reason and the cancellation path would never be exercised.
	if err := delayScheduler.Schedule(genericJob, eventBus); err != nil {
		t.Fatalf("Schedule returned an unexpected error: %v", err)
	}

	jobChannel := make(chan interface{})

	// Count every message delivered to the subscription channel.
	go func() {
		for range jobChannel {
			atomic.AddInt32(&count, 1)
		}
	}()

	eventBus.Subscribe(QueueJob, jobChannel)

	// Cancel halfway through the delay, then wait past the original deadline.
	time.Sleep(time.Millisecond * 50)
	delayScheduler.Cancel(job.GetID())
	time.Sleep(time.Millisecond * 100)

	// Load the counter once, atomically, and report that same value; a plain
	// read of count would race with the counting goroutine.
	if got := atomic.LoadInt32(&count); got != 0 {
		t.Errorf("Expected to not run, ran %d times", got)
	}
}

// TestInstantScheduler_BasicFunctionality subscribes to QueueJob first, then
// schedules a job on an InstantScheduler and expects exactly one message on
// the subscription within 100ms.
func TestInstantScheduler_BasicFunctionality(t *testing.T) {
	var count int32
	eventBus := NewEventBus()
	instantScheduler := InstantScheduler{}

	job := NewJob[DummyResult]("test-job", &DummyRunnable{})

	jobChannel := make(chan interface{})

	// Count every message delivered to the subscription channel.
	go func() {
		for range jobChannel {
			atomic.AddInt32(&count, 1)
		}
	}()

	eventBus.Subscribe(QueueJob, jobChannel)

	// Give the subscription time to settle before scheduling.
	time.Sleep(time.Millisecond * 100)

	genericJob := GenericJob(job)
	// Fail fast: a scheduling error would otherwise only show up as a
	// misleading "ran 0 times" failure further down.
	if err := instantScheduler.Schedule(genericJob, eventBus); err != nil {
		t.Fatalf("Schedule returned an unexpected error: %v", err)
	}

	time.Sleep(time.Millisecond * 100)

	// Load the counter once, atomically, and report that same value; a plain
	// read of count would race with the counting goroutine.
	if got := atomic.LoadInt32(&count); got != 1 {
		t.Errorf("Expected to run 1 time, ran %d times", got)
	}
}

// TestEventScheduler_BasicFunctionality schedules a job on an EventScheduler
// bound to "trigger-event", publishes that event once and expects exactly one
// message on the QueueJob subscription within 50ms.
func TestEventScheduler_BasicFunctionality(t *testing.T) {
	var count int32
	eventBus := NewEventBus()
	eventScheduler := EventScheduler{Event: "trigger-event"}

	job := NewJob[DummyResult]("test-job", &DummyRunnable{})

	genericJob := GenericJob(job)
	// Fail fast: a scheduling error would otherwise only show up as a
	// misleading "ran 0 times" failure further down.
	if err := eventScheduler.Schedule(genericJob, eventBus); err != nil {
		t.Fatalf("Schedule returned an unexpected error: %v", err)
	}

	jobChannel := make(chan interface{})
	eventBus.Subscribe(QueueJob, jobChannel)

	// Count every message delivered to the subscription channel.
	go func() {
		for range jobChannel {
			atomic.AddInt32(&count, 1)
		}
	}()

	// Trigger the event
	eventBus.Publish("trigger-event", Event{Data: nil, Name: "trigger-event"})

	time.Sleep(time.Millisecond * 50) // Allow some time for the event to propagate

	// Load the counter once, atomically, and report that same value; a plain
	// read of count would race with the counting goroutine.
	if got := atomic.LoadInt32(&count); got != 1 {
		t.Errorf("Expected to run one time, ran %d times", got)
	}
}

// TestTimeScheduler_BasicFunctionality schedules a job on a TimeScheduler set
// to one second in the future, waits two seconds, cancels the job and expects
// exactly one message on the QueueJob subscription.
func TestTimeScheduler_BasicFunctionality(t *testing.T) {

	var count int32
	eventBus := NewEventBus()
	timeScheduler := TimeScheduler{Time: time.Now().Add(time.Second * 1)}

	job := NewJob[DummyResult]("test-job", &DummyRunnable{})

	genericJob := GenericJob(job)
	// Fail fast: a scheduling error would otherwise only show up as a
	// misleading "ran 0 times" failure two seconds later.
	if err := timeScheduler.Schedule(genericJob, eventBus); err != nil {
		t.Fatalf("Schedule returned an unexpected error: %v", err)
	}

	jobChannel := make(chan interface{})

	// Count every message delivered to the subscription channel.
	go func() {
		for range jobChannel {
			atomic.AddInt32(&count, 1)
		}
	}()

	eventBus.Subscribe(QueueJob, jobChannel)

	// Wait past the scheduled time before cancelling.
	time.Sleep(time.Second * 2)
	timeScheduler.Cancel(job.GetID())
	time.Sleep(time.Millisecond * 100)

	// Load the counter once, atomically, and report that same value; a plain
	// read of count would race with the counting goroutine. The message now
	// matches the assertion, which expects exactly one run.
	if got := atomic.LoadInt32(&count); got != 1 {
		t.Errorf("Expected to run one time, ran %d times", got)
	}

}

// TestSchedulerPersistenceUnmarshalJSON exercises JSON unmarshalling of
// SchedulerPersistence with several time formats and one invalid value.
func TestSchedulerPersistenceUnmarshalJSON(t *testing.T) {
	// want parses the expected timestamp for a table entry.
	want := func(value string) *time.Time {
		return parseTimeForTesting(t, value)
	}

	cases := []struct {
		name      string
		jsonInput string
		expected  *time.Time
		wantErr   bool
	}{
		{name: "RFC3339 Format", jsonInput: `{"type":"Time","time":"2023-11-15T09:01:00Z"}`, expected: want("2023-11-15T09:01:00Z")},
		{name: "Date and Time", jsonInput: `{"type":"Time","time":"2023-11-15T09:01"}`, expected: want("2023-11-15T09:01")},
		{name: "Date and Time with Space", jsonInput: `{"type":"Time","time":"2023-11-15 09:01"}`, expected: want("2023-11-15T09:01")},
		{name: "Date and Time with Space and Seconds", jsonInput: `{"type":"Time","time":"2023-11-15 09:01:02"}`, expected: want("2023-11-15T09:01:02")},
		{name: "Only Date", jsonInput: `{"type":"Time","time":"2023-11-15"}`, expected: want("2023-11-15")},
		{name: "Invalid Format", jsonInput: `{"type":"Time","time":"15. November 2023"}`, wantErr: true},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			var sp SchedulerPersistence
			err := json.Unmarshal([]byte(tc.jsonInput), &sp)

			// Error cases only assert that unmarshalling failed.
			if tc.wantErr {
				assert.Error(t, err)
				return
			}

			assert.NoError(t, err)
			assert.Equal(t, tc.expected, sp.Time)
		})
	}
}

// parseTimeForTesting parses value with each layout in SupportedTimeFormats,
// in order, and returns a pointer to the first successful result. If no
// layout matches, it fails the test immediately via t.Fatalf.
func parseTimeForTesting(t *testing.T, value string) *time.Time {
	// Mark as a helper so a failure is reported at the caller's line.
	t.Helper()

	for _, format := range SupportedTimeFormats {
		parsedTime, err := time.Parse(format, value)
		if err == nil {
			return &parsedTime
		}
	}
	t.Fatalf("Failed to parse time '%s' in any known format", value)
	// Not reached at run time, but required because the compiler does not
	// treat t.Fatalf as a terminating statement.
	return nil
}

// TestInotifyScheduler_BasicFunctionality schedules a job on an
// InotifyScheduler watching a temporary directory for Create, Write and
// Remove events, then creates and removes a file in that directory and
// expects exactly two messages on the QueueJob subscription.
func TestInotifyScheduler_BasicFunctionality(t *testing.T) {
	var count int32
	eventBus := NewEventBus()

	tmpPath := t.TempDir()

	inotifyScheduler := InotifyScheduler{Path: tmpPath, EventFlags: fsnotify.Create | fsnotify.Write | fsnotify.Remove}

	job := NewJob[DummyResult]("test-job", &DummyRunnable{})

	genericJob := GenericJob(job)
	// Fail fast: a scheduling error would otherwise only show up as a
	// misleading "ran 0 times" failure further down.
	if err := inotifyScheduler.Schedule(genericJob, eventBus); err != nil {
		t.Fatalf("Schedule returned an unexpected error: %v", err)
	}

	jobChannel := make(chan interface{})

	// Count every message delivered to the subscription channel.
	go func() {
		for range jobChannel {
			atomic.AddInt32(&count, 1)
		}
	}()

	eventBus.Subscribe(QueueJob, jobChannel)

	time.Sleep(time.Millisecond * 100)

	tmpFile := tmpPath + "/test.txt"
	file, err := os.Create(tmpFile)
	if err != nil {
		t.Fatalf("creating %q: %v", tmpFile, err)
	}
	// Close the handle right away so the descriptor is not leaked; nothing is
	// written to the file.
	assert.NoError(t, file.Close())

	time.Sleep(time.Millisecond * 100)
	err = os.Remove(tmpFile)
	assert.NoError(t, err)

	time.Sleep(time.Millisecond * 100)

	// Load the counter once, atomically, and report that same value; a plain
	// read of count would race with the counting goroutine.
	if got := atomic.LoadInt32(&count); got != 2 {
		t.Errorf("Expected to run 2 times, ran %d times", got)
	}
}

// TestUnmarshalSchedulerPersistenceYAML unmarshals a YAML document with type,
// interval and time keys into a SchedulerPersistence and compares the Type,
// Interval and Time fields with the expected values.
func TestUnmarshalSchedulerPersistenceYAML(t *testing.T) {
	const yamlData = `
type: interval
interval: 1m
time: "2023-12-15T12:00:00Z"
`

	// Expected values; "1m" is exactly one minute.
	wantInterval := time.Minute
	// The literal is valid RFC 3339, so the parse error is ignored.
	wantTime, _ := time.Parse(time.RFC3339, "2023-12-15T12:00:00Z")

	var sp SchedulerPersistence
	err := yaml.Unmarshal([]byte(yamlData), &sp)
	assert.Nil(t, err, "Unmarshalling should not produce an error")

	assert.Equal(t, "interval", sp.Type, "Type should be unmarshalled correctly")
	assert.Equal(t, &wantInterval, sp.Interval, "Interval should be unmarshalled correctly")
	assert.Equal(t, &wantTime, sp.Time, "Time should be unmarshalled correctly")
}

// TestUnmarshalSchedulerPersistenceIntervalYAML unmarshals a YAML document
// whose interval is the quoted string "1m1s" and compares the Type and
// Interval fields with the expected values.
func TestUnmarshalSchedulerPersistenceIntervalYAML(t *testing.T) {
	const yamlData = `
type: Interval
interval: "1m1s"
`

	// "1m1s" is one minute plus one second.
	wantInterval := time.Minute + time.Second

	var sp SchedulerPersistence
	err := yaml.Unmarshal([]byte(yamlData), &sp)
	assert.Nil(t, err, "Unmarshalling should not produce an error")

	assert.Equal(t, "Interval", sp.Type, "Type should be unmarshalled correctly")
	assert.Equal(t, &wantInterval, sp.Interval, "Interval should be unmarshalled correctly")
}