Something went wrong on our end
-
Volker Schukai authored
scheduler_test.go 10.47 KiB
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
"encoding/json"
"github.com/fsnotify/fsnotify"
"github.com/robfig/cron/v3"
"github.com/stretchr/testify/assert"
"gopkg.in/yaml.v3"
"os"
"sync/atomic"
"testing"
"time"
)
// TestIntervalScheduler_BasicFunctionality schedules a job on an
// IntervalScheduler with a 100ms interval and expects at least four messages
// on a channel subscribed to QueueJob after waiting 500ms.
func TestIntervalScheduler_BasicFunctionality(t *testing.T) {
	var count int32
	eventBus := NewEventBus()
	scheduler := IntervalScheduler{Interval: time.Millisecond * 100}

	job := NewJob[DummyResult]("test-job", &DummyRunnable{})
	genericJob := GenericJob(job)
	// A scheduling failure would otherwise only surface as a confusing count mismatch.
	assert.NoError(t, scheduler.Schedule(genericJob, eventBus))

	jobChannel := make(chan any)
	go func() {
		// Count every message delivered on the subscribed channel.
		for range jobChannel {
			atomic.AddInt32(&count, 1)
		}
	}()
	eventBus.Subscribe(QueueJob, jobChannel)

	time.Sleep(time.Millisecond * 500)

	// Load once and report that same value: reading count directly would be
	// a non-atomic read racing with the counting goroutine.
	if got := atomic.LoadInt32(&count); got < 4 {
		t.Errorf("Expected to run at least four times, ran %d times", got)
	}
}
// TestIntervalScheduler_StopTicker schedules a job with a 100ms interval,
// cancels it after 150ms, waits another 100ms and expects exactly one message
// on the channel subscribed to QueueJob.
func TestIntervalScheduler_StopTicker(t *testing.T) {
	var count int32
	eventBus := NewEventBus()
	scheduler := IntervalScheduler{Interval: time.Millisecond * 100}

	job := NewJob[DummyResult]("test-job", &DummyRunnable{})
	genericJob := GenericJob(job)
	err := scheduler.Schedule(genericJob, eventBus)
	assert.NoError(t, err)

	jobChannel := make(chan any)
	go func() {
		// Count every message delivered on the subscribed channel.
		for range jobChannel {
			atomic.AddInt32(&count, 1)
		}
	}()
	eventBus.Subscribe(QueueJob, jobChannel)

	time.Sleep(time.Millisecond * 150)
	scheduler.Cancel(job.GetID())
	time.Sleep(time.Millisecond * 100)

	// Load once and report that same value: reading count directly would be
	// a non-atomic read racing with the counting goroutine.
	if got := atomic.LoadInt32(&count); got != 1 {
		t.Errorf("Expected to run 1 time, ran %d times", got)
	}
}
// TestIntervalScheduler_InvalidInterval checks that Schedule reports an error
// when the IntervalScheduler is configured with a zero interval.
func TestIntervalScheduler_InvalidInterval(t *testing.T) {
	bus := NewEventBus()
	zeroInterval := IntervalScheduler{Interval: 0 * time.Millisecond}
	target := GenericJob(NewJob[DummyResult]("test-job", &DummyRunnable{}))

	if err := zeroInterval.Schedule(target, bus); err == nil {
		t.Errorf("Expected an error due to invalid interval")
	}
}
// TestCronScheduler_BasicFunctionality schedules a job on a CronScheduler with
// the six-field spec "* * * * * *" (cron created with cron.WithSeconds) and
// expects at least two messages on a channel subscribed to QueueJob after
// waiting three seconds.
func TestCronScheduler_BasicFunctionality(t *testing.T) {
	var count int32
	eventBus := NewEventBus()

	job := NewJob[DummyResult]("test-job", &DummyRunnable{})
	c := cron.New(cron.WithSeconds())
	genericJob := GenericJob(job)

	cronScheduler := CronScheduler{
		cron: c,
		Spec: "* * * * * *",
	}
	// A scheduling failure would otherwise only surface as a confusing count mismatch.
	assert.NoError(t, cronScheduler.Schedule(genericJob, eventBus))
	c.Start()
	defer c.Stop()

	jobChannel := make(chan any)
	go func() {
		// Count every message delivered on the subscribed channel.
		for range jobChannel {
			atomic.AddInt32(&count, 1)
		}
	}()
	eventBus.Subscribe(QueueJob, jobChannel)

	time.Sleep(time.Second * 3)

	// Load once and report that same value: reading count directly would be
	// a non-atomic read racing with the counting goroutine.
	if got := atomic.LoadInt32(&count); got < 2 {
		t.Errorf("Expected to run at least 2 times, ran %d times", got)
	}
}
// TestCronScheduler_StopScheduler schedules a job on a CronScheduler with the
// spec "* * * * * *", cancels it after two seconds, waits one more second and
// expects at least one message on the channel subscribed to QueueJob.
//
// NOTE(review): the assertion only checks that the job ran at least once; it
// does not verify that no further runs happen after Cancel.
func TestCronScheduler_StopScheduler(t *testing.T) {
	var count int32
	eventBus := NewEventBus()

	c := cron.New(cron.WithSeconds())
	cronScheduler := CronScheduler{cron: c, Spec: "* * * * * *"}

	job := NewJob[DummyResult]("test-job", &DummyRunnable{})
	genericJob := GenericJob(job)
	// A scheduling failure would otherwise only surface as a confusing count mismatch.
	assert.NoError(t, cronScheduler.Schedule(genericJob, eventBus))
	c.Start()
	defer c.Stop()

	jobChannel := make(chan any)
	go func() {
		// Count every message delivered on the subscribed channel.
		for range jobChannel {
			atomic.AddInt32(&count, 1)
		}
	}()
	eventBus.Subscribe(QueueJob, jobChannel)

	time.Sleep(time.Second * 2)
	cronScheduler.Cancel(job.GetID())
	time.Sleep(time.Second)

	// Load once and report that same value: reading count directly would be
	// a non-atomic read racing with the counting goroutine.
	if got := atomic.LoadInt32(&count); got < 1 {
		t.Errorf("Expected to run at least 1 time, ran %d times", got)
	}
}
// TestDelayScheduler_BasicFunctionality schedules a job on a DelayScheduler
// with a 100ms delay and expects exactly one message on a channel subscribed
// to QueueJob after waiting 200ms.
func TestDelayScheduler_BasicFunctionality(t *testing.T) {
	var count int32
	eventBus := NewEventBus()
	delayScheduler := DelayScheduler{Delay: time.Millisecond * 100}

	job := NewJob[DummyResult]("test-job", &DummyRunnable{})
	genericJob := GenericJob(job)
	// A scheduling failure would otherwise only surface as a confusing count mismatch.
	assert.NoError(t, delayScheduler.Schedule(genericJob, eventBus))

	jobChannel := make(chan any)
	go func() {
		// Count every message delivered on the subscribed channel.
		for range jobChannel {
			atomic.AddInt32(&count, 1)
		}
	}()
	eventBus.Subscribe(QueueJob, jobChannel)

	time.Sleep(time.Millisecond * 200)

	// Load once and report that same value: reading count directly would be
	// a non-atomic read racing with the counting goroutine.
	if got := atomic.LoadInt32(&count); got != 1 {
		t.Errorf("Expected to run one time, ran %d times", got)
	}
}
// TestDelayScheduler_StopBeforeExecute schedules a job on a DelayScheduler
// with a 100ms delay, cancels it after 50ms, waits another 100ms and expects
// no message on the channel subscribed to QueueJob.
func TestDelayScheduler_StopBeforeExecute(t *testing.T) {
	var count int32
	eventBus := NewEventBus()
	delayScheduler := DelayScheduler{Delay: time.Millisecond * 100}

	job := NewJob[DummyResult]("test-job", &DummyRunnable{})
	genericJob := GenericJob(job)
	// Without this check a failed Schedule would make the "did not run"
	// expectation below pass vacuously.
	assert.NoError(t, delayScheduler.Schedule(genericJob, eventBus))

	jobChannel := make(chan any)
	go func() {
		// Count every message delivered on the subscribed channel.
		for range jobChannel {
			atomic.AddInt32(&count, 1)
		}
	}()
	eventBus.Subscribe(QueueJob, jobChannel)

	time.Sleep(time.Millisecond * 50)
	delayScheduler.Cancel(job.GetID())
	time.Sleep(time.Millisecond * 100)

	// Load once and report that same value: reading count directly would be
	// a non-atomic read racing with the counting goroutine.
	if got := atomic.LoadInt32(&count); got != 0 {
		t.Errorf("Expected to not run, ran %d times", got)
	}
}
// TestInstantScheduler_BasicFunctionality subscribes a channel to QueueJob,
// then schedules a job on an InstantScheduler and expects exactly one message
// on that channel within 100ms.
func TestInstantScheduler_BasicFunctionality(t *testing.T) {
	var count int32
	eventBus := NewEventBus()
	instantScheduler := InstantScheduler{}

	job := NewJob[DummyResult]("test-job", &DummyRunnable{})

	jobChannel := make(chan any)
	go func() {
		// Count every message delivered on the subscribed channel.
		for range jobChannel {
			atomic.AddInt32(&count, 1)
		}
	}()
	eventBus.Subscribe(QueueJob, jobChannel)

	// The subscription is set up (and given 100ms) before scheduling.
	time.Sleep(time.Millisecond * 100)

	genericJob := GenericJob(job)
	// A scheduling failure would otherwise only surface as a confusing count mismatch.
	assert.NoError(t, instantScheduler.Schedule(genericJob, eventBus))

	time.Sleep(time.Millisecond * 100)

	// Load once and report that same value: reading count directly would be
	// a non-atomic read racing with the counting goroutine.
	if got := atomic.LoadInt32(&count); got != 1 {
		t.Errorf("Expected to run 1 time, ran %d times", got)
	}
}
// TestEventScheduler_BasicFunctionality schedules a job on an EventScheduler
// bound to "trigger-event", publishes that event once and expects exactly one
// message on a channel subscribed to QueueJob within 50ms.
func TestEventScheduler_BasicFunctionality(t *testing.T) {
	var count int32
	eventBus := NewEventBus()
	eventScheduler := EventScheduler{Event: "trigger-event"}

	job := NewJob[DummyResult]("test-job", &DummyRunnable{})
	genericJob := GenericJob(job)
	// A scheduling failure would otherwise only surface as a confusing count mismatch.
	assert.NoError(t, eventScheduler.Schedule(genericJob, eventBus))

	jobChannel := make(chan any)
	eventBus.Subscribe(QueueJob, jobChannel)
	go func() {
		// Count every message delivered on the subscribed channel.
		for range jobChannel {
			atomic.AddInt32(&count, 1)
		}
	}()

	// Trigger the event
	eventBus.Publish("trigger-event", Event{Data: nil, Name: "trigger-event"})

	time.Sleep(time.Millisecond * 50) // Allow some time for the event to propagate

	// Load once and report that same value: reading count directly would be
	// a non-atomic read racing with the counting goroutine.
	if got := atomic.LoadInt32(&count); got != 1 {
		t.Errorf("Expected to run one time, ran %d times", got)
	}
}
// TestTimeScheduler_BasicFunctionality schedules a job on a TimeScheduler set
// to one second from now, waits two seconds, cancels the job and expects
// exactly one message on a channel subscribed to QueueJob.
func TestTimeScheduler_BasicFunctionality(t *testing.T) {
	var count int32
	eventBus := NewEventBus()
	timeScheduler := TimeScheduler{Time: time.Now().Add(time.Second * 1)}

	job := NewJob[DummyResult]("test-job", &DummyRunnable{})
	genericJob := GenericJob(job)
	// A scheduling failure would otherwise only surface as a confusing count mismatch.
	assert.NoError(t, timeScheduler.Schedule(genericJob, eventBus))

	jobChannel := make(chan any)
	go func() {
		// Count every message delivered on the subscribed channel.
		for range jobChannel {
			atomic.AddInt32(&count, 1)
		}
	}()
	eventBus.Subscribe(QueueJob, jobChannel)

	time.Sleep(time.Second * 2)
	timeScheduler.Cancel(job.GetID())
	time.Sleep(time.Millisecond * 100)

	// Load once and report that same value: reading count directly would be
	// a non-atomic read racing with the counting goroutine. The message now
	// matches the condition (exactly one run is expected).
	if got := atomic.LoadInt32(&count); got != 1 {
		t.Errorf("Expected to run one time, ran %d times", got)
	}
}
// TestSchedulerPersistenceUnmarshalJSON exercises JSON unmarshalling of
// SchedulerPersistence with several time formats, plus one input that must
// be rejected.
func TestSchedulerPersistenceUnmarshalJSON(t *testing.T) {
	type scenario struct {
		name      string
		jsonInput string
		expected  *time.Time
		wantErr   bool
	}

	// Fields left out of a literal keep their zero value (nil / false).
	scenarios := []scenario{
		{
			name:      "RFC3339 Format",
			jsonInput: `{"type":"Time","time":"2023-11-15T09:01:00Z"}`,
			expected:  parseTimeForTesting(t, "2023-11-15T09:01:00Z"),
		},
		{
			name:      "Date and Time",
			jsonInput: `{"type":"Time","time":"2023-11-15T09:01"}`,
			expected:  parseTimeForTesting(t, "2023-11-15T09:01"),
		},
		{
			name:      "Date and Time with Space",
			jsonInput: `{"type":"Time","time":"2023-11-15 09:01"}`,
			expected:  parseTimeForTesting(t, "2023-11-15T09:01"),
		},
		{
			name:      "Date and Time with Space and Seconds",
			jsonInput: `{"type":"Time","time":"2023-11-15 09:01:02"}`,
			expected:  parseTimeForTesting(t, "2023-11-15T09:01:02"),
		},
		{
			name:      "Only Date",
			jsonInput: `{"type":"Time","time":"2023-11-15"}`,
			expected:  parseTimeForTesting(t, "2023-11-15"),
		},
		{
			name:      "Invalid Format",
			jsonInput: `{"type":"Time","time":"15. November 2023"}`,
			wantErr:   true,
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			var decoded SchedulerPersistence
			err := json.Unmarshal([]byte(sc.jsonInput), &decoded)

			// Error cases stop here; the decoded time is only compared on success paths.
			if sc.wantErr {
				assert.Error(t, err)
				return
			}

			assert.NoError(t, err)
			assert.Equal(t, sc.expected, decoded.Time)
		})
	}
}
// parseTimeForTesting parses value with each layout in SupportedTimeFormats,
// in order, and returns a pointer to the first successful result. If no
// layout matches, it aborts the calling test via t.Fatalf.
func parseTimeForTesting(t *testing.T, value string) *time.Time {
	// Mark as helper so a failure is reported at the caller's line.
	t.Helper()

	for _, format := range SupportedTimeFormats {
		if parsedTime, err := time.Parse(format, value); err == nil {
			return &parsedTime
		}
	}

	t.Fatalf("Failed to parse time '%s' in any known format", value)
	return nil // unreachable after Fatalf; required by the compiler
}
// TestInotifyScheduler_BasicFunctionality schedules a job on an
// InotifyScheduler watching a temporary directory for Create, Write and
// Remove events. It creates and then removes a file in that directory and
// expects exactly two messages on a channel subscribed to QueueJob.
func TestInotifyScheduler_BasicFunctionality(t *testing.T) {
	var count int32
	eventBus := NewEventBus()

	tmpPath := t.TempDir()
	inotifyScheduler := InotifyScheduler{Path: tmpPath, EventFlags: fsnotify.Create | fsnotify.Write | fsnotify.Remove}

	job := NewJob[DummyResult]("test-job", &DummyRunnable{})
	genericJob := GenericJob(job)
	// A scheduling failure would otherwise only surface as a confusing count mismatch.
	assert.NoError(t, inotifyScheduler.Schedule(genericJob, eventBus))

	jobChannel := make(chan any)
	go func() {
		// Count every message delivered on the subscribed channel.
		for range jobChannel {
			atomic.AddInt32(&count, 1)
		}
	}()
	eventBus.Subscribe(QueueJob, jobChannel)

	time.Sleep(time.Millisecond * 100)

	tmpFile := tmpPath + "/test.txt"
	f, err := os.Create(tmpFile)
	assert.NoError(t, err)
	if err == nil {
		// Close the handle right away so it is not leaked for the rest of
		// the test run; nothing is written to the file.
		assert.NoError(t, f.Close())
	}

	time.Sleep(time.Millisecond * 100)

	err = os.Remove(tmpFile)
	assert.NoError(t, err)

	time.Sleep(time.Millisecond * 100)

	// Load once and report that same value: reading count directly would be
	// a non-atomic read racing with the counting goroutine.
	if got := atomic.LoadInt32(&count); got != 2 {
		t.Errorf("Expected to run 2 times, ran %d times", got)
	}
}
// TestUnmarshalSchedulerPersistenceYAML decodes a YAML document containing
// type, interval and time keys into a SchedulerPersistence and compares the
// Type, Interval and Time fields against the expected values.
func TestUnmarshalSchedulerPersistenceYAML(t *testing.T) {
	const document = `
type: interval
interval: 1m
time: "2023-12-15T12:00:00Z"
`

	var decoded SchedulerPersistence
	unmarshalErr := yaml.Unmarshal([]byte(document), &decoded)
	assert.Nil(t, unmarshalErr, "Unmarshalling should not produce an error")

	// "1m" is exactly one minute.
	wantInterval := time.Minute
	wantTime, _ := time.Parse(time.RFC3339, "2023-12-15T12:00:00Z")

	assert.Equal(t, "interval", decoded.Type, "Type should be unmarshalled correctly")
	assert.Equal(t, &wantInterval, decoded.Interval, "Interval should be unmarshalled correctly")
	assert.Equal(t, &wantTime, decoded.Time, "Time should be unmarshalled correctly")
}
// TestUnmarshalSchedulerPersistenceIntervalYAML decodes a YAML document with
// a quoted compound duration ("1m1s") into a SchedulerPersistence and
// compares the Type and Interval fields against the expected values.
func TestUnmarshalSchedulerPersistenceIntervalYAML(t *testing.T) {
	const document = `
type: Interval
interval: "1m1s"
`

	var decoded SchedulerPersistence
	unmarshalErr := yaml.Unmarshal([]byte(document), &decoded)
	assert.Nil(t, unmarshalErr, "Unmarshalling should not produce an error")

	// "1m1s" is one minute plus one second.
	wantInterval := time.Minute + time.Second

	assert.Equal(t, "Interval", decoded.Type, "Type should be unmarshalled correctly")
	assert.Equal(t, &wantInterval, decoded.Interval, "Interval should be unmarshalled correctly")
}