Select Git revision
schedule-cron.go
scheduler_test.go 5.69 KiB
package jobqueue
import (
"github.com/robfig/cron/v3"
"github.com/stretchr/testify/assert"
"sync/atomic"
"testing"
"time"
)
// TestIntervalScheduler_BasicFunctionality schedules a job on an
// IntervalScheduler with a 100ms interval, counts the messages delivered to a
// QueueJob subscriber, and expects at least 4 of them within 500ms.
func TestIntervalScheduler_BasicFunctionality(t *testing.T) {
	var count int32
	eventBus := NewEventBus()
	scheduler := IntervalScheduler{Interval: time.Millisecond * 100}
	job := NewJob[DummyResult]("test-job", &DummyRunnable{})
	genericJob := GenericJob(job)

	// Fail fast: a rejected Schedule call would otherwise only surface as a
	// confusing run-count mismatch further down.
	if err := scheduler.Schedule(genericJob, eventBus); err != nil {
		t.Fatalf("Schedule returned an unexpected error: %v", err)
	}

	jobChannel := make(chan any)
	// NOTE(review): jobChannel is never closed in this test, so this counting
	// goroutine stays blocked on the channel after the test returns.
	go func() {
		for range jobChannel {
			atomic.AddInt32(&count, 1)
		}
	}()
	eventBus.Subscribe(QueueJob, jobChannel)

	time.Sleep(time.Millisecond * 500)

	// Load the counter exactly once and report that same value; reading count
	// directly in the message would be a non-atomic read racing with the
	// goroutine above.
	if got := atomic.LoadInt32(&count); got < 4 {
		t.Errorf("Expected to run at least 4 times, ran %d times", got)
	}
}
// TestIntervalScheduler_StopTicker schedules a job on an IntervalScheduler
// with a 100ms interval, cancels it by job ID after 150ms, waits another
// 100ms, and expects exactly one message to have reached the QueueJob
// subscriber.
func TestIntervalScheduler_StopTicker(t *testing.T) {
	var count int32
	eventBus := NewEventBus()
	scheduler := IntervalScheduler{Interval: time.Millisecond * 100}
	job := NewJob[DummyResult]("test-job", &DummyRunnable{})
	genericJob := GenericJob(job)

	err := scheduler.Schedule(genericJob, eventBus)
	// Stop here when scheduling failed; continuing would only add a
	// misleading run-count failure on top of the real cause.
	if !assert.NoError(t, err) {
		return
	}

	jobChannel := make(chan any)
	// NOTE(review): jobChannel is never closed in this test, so this counting
	// goroutine stays blocked on the channel after the test returns.
	go func() {
		for range jobChannel {
			atomic.AddInt32(&count, 1)
		}
	}()
	eventBus.Subscribe(QueueJob, jobChannel)

	time.Sleep(time.Millisecond * 150)
	scheduler.Cancel(job.GetID())
	time.Sleep(time.Millisecond * 100)

	// Load the counter exactly once and report that same value; reading count
	// directly in the message would be a non-atomic read racing with the
	// goroutine above.
	if got := atomic.LoadInt32(&count); got != 1 {
		t.Errorf("Expected to run 1 time, ran %d times", got)
	}
}
// TestIntervalScheduler_InvalidInterval checks that Schedule returns a
// non-nil error when the IntervalScheduler is configured with a zero Interval.
func TestIntervalScheduler_InvalidInterval(t *testing.T) {
	bus := NewEventBus()
	zeroInterval := IntervalScheduler{Interval: time.Millisecond * 0}
	testJob := GenericJob(NewJob[DummyResult]("test-job", &DummyRunnable{}))

	// The only expectation is that scheduling is rejected; nothing is run.
	if scheduleErr := zeroInterval.Schedule(testJob, bus); scheduleErr == nil {
		t.Errorf("Expected an error due to invalid interval")
	}
}
// TestCronScheduler_BasicFunctionality schedules a job on a CronScheduler
// built with cron.WithSeconds() and the spec "*/1 * * * * *", counts the
// messages delivered to a QueueJob subscriber, and expects at least 2 of them
// within 3 seconds.
func TestCronScheduler_BasicFunctionality(t *testing.T) {
	var count int32
	eventBus := NewEventBus()
	job := NewJob[DummyResult]("test-job", &DummyRunnable{})
	genericJob := GenericJob(job)
	cronScheduler := CronScheduler{cron: cron.New(cron.WithSeconds()), Spec: "*/1 * * * * *"}

	// Fail fast: a rejected Schedule call would otherwise cost the full
	// 3 second wait and surface only as a run-count mismatch.
	if err := cronScheduler.Schedule(genericJob, eventBus); err != nil {
		t.Fatalf("Schedule returned an unexpected error: %v", err)
	}

	jobChannel := make(chan any)
	// NOTE(review): jobChannel is never closed in this test, so this counting
	// goroutine stays blocked on the channel after the test returns.
	go func() {
		for range jobChannel {
			atomic.AddInt32(&count, 1)
		}
	}()
	eventBus.Subscribe(QueueJob, jobChannel)

	time.Sleep(time.Second * 3)

	// Load the counter exactly once and report that same value; reading count
	// directly in the message would be a non-atomic read racing with the
	// goroutine above.
	if got := atomic.LoadInt32(&count); got < 2 {
		t.Errorf("Expected to run at least 2 times, ran %d times", got)
	}
}
// TestCronScheduler_StopScheduler schedules a job on a CronScheduler with the
// spec "*/1 * * * * *", lets it run for 2 seconds, cancels it by job ID, and
// then checks two things: the job ran at least once before the cancel, and no
// further messages reach the QueueJob subscriber afterwards.
func TestCronScheduler_StopScheduler(t *testing.T) {
	var count int32
	eventBus := NewEventBus()
	c := cron.New(cron.WithSeconds())
	cronScheduler := CronScheduler{cron: c, Spec: "*/1 * * * * *"}
	job := NewJob[DummyResult]("test-job", &DummyRunnable{})
	genericJob := GenericJob(job)

	// Fail fast: a rejected Schedule call would otherwise surface only as a
	// run-count mismatch after several seconds of waiting.
	if err := cronScheduler.Schedule(genericJob, eventBus); err != nil {
		t.Fatalf("Schedule returned an unexpected error: %v", err)
	}

	jobChannel := make(chan any)
	// NOTE(review): jobChannel is never closed in this test, so this counting
	// goroutine stays blocked on the channel after the test returns.
	go func() {
		for range jobChannel {
			atomic.AddInt32(&count, 1)
		}
	}()
	eventBus.Subscribe(QueueJob, jobChannel)

	time.Sleep(time.Second * 2)
	cronScheduler.Cancel(job.GetID())

	// Give a message that was already in flight when Cancel was called a
	// moment to be counted before taking the baseline.
	time.Sleep(time.Millisecond * 100)
	ranBeforeStop := atomic.LoadInt32(&count)
	if ranBeforeStop < 1 {
		t.Errorf("Expected to run at least 1 time, ran %d times", ranBeforeStop)
	}

	// Wait longer than one full second so that at least one trigger time of
	// the every-second spec passes; a job that is still scheduled would be
	// counted again here.
	time.Sleep(time.Millisecond * 1100)
	if ranAfterStop := atomic.LoadInt32(&count); ranAfterStop != ranBeforeStop {
		t.Errorf("Expected no runs after Cancel, count went from %d to %d", ranBeforeStop, ranAfterStop)
	}
}
// TestDelayScheduler_BasicFunctionality schedules a job on a DelayScheduler
// with a 100ms delay, counts the messages delivered to a QueueJob subscriber,
// and expects exactly one of them after 200ms.
func TestDelayScheduler_BasicFunctionality(t *testing.T) {
	var count int32
	eventBus := NewEventBus()
	delayScheduler := DelayScheduler{Delay: time.Millisecond * 100}
	job := NewJob[DummyResult]("test-job", &DummyRunnable{})
	genericJob := GenericJob(job)

	// Fail fast: a rejected Schedule call would otherwise only surface as a
	// confusing run-count mismatch further down.
	if err := delayScheduler.Schedule(genericJob, eventBus); err != nil {
		t.Fatalf("Schedule returned an unexpected error: %v", err)
	}

	jobChannel := make(chan any)
	// NOTE(review): jobChannel is never closed in this test, so this counting
	// goroutine stays blocked on the channel after the test returns.
	go func() {
		for range jobChannel {
			atomic.AddInt32(&count, 1)
		}
	}()
	eventBus.Subscribe(QueueJob, jobChannel)

	time.Sleep(time.Millisecond * 200)

	// Load the counter exactly once and report that same value; reading count
	// directly in the message would be a non-atomic read racing with the
	// goroutine above.
	if got := atomic.LoadInt32(&count); got != 1 {
		t.Errorf("Expected to run 1 time, ran %d times", got)
	}
}
// TestDelayScheduler_StopBeforeExecute schedules a job on a DelayScheduler
// with a 100ms delay, cancels it by job ID after 50ms, waits another 100ms,
// and expects that no message reached the QueueJob subscriber.
func TestDelayScheduler_StopBeforeExecute(t *testing.T) {
	var count int32
	eventBus := NewEventBus()
	delayScheduler := DelayScheduler{Delay: time.Millisecond * 100}
	job := NewJob[DummyResult]("test-job", &DummyRunnable{})
	genericJob := GenericJob(job)

	// The Schedule error must be checked here: this test expects zero runs,
	// so a failed Schedule would make it pass without exercising Cancel.
	if err := delayScheduler.Schedule(genericJob, eventBus); err != nil {
		t.Fatalf("Schedule returned an unexpected error: %v", err)
	}

	jobChannel := make(chan any)
	// NOTE(review): jobChannel is never closed in this test, so this counting
	// goroutine stays blocked on the channel after the test returns.
	go func() {
		for range jobChannel {
			atomic.AddInt32(&count, 1)
		}
	}()
	eventBus.Subscribe(QueueJob, jobChannel)

	time.Sleep(time.Millisecond * 50)
	delayScheduler.Cancel(job.GetID())
	time.Sleep(time.Millisecond * 100)

	// Load the counter exactly once and report that same value; reading count
	// directly in the message would be a non-atomic read racing with the
	// goroutine above.
	if got := atomic.LoadInt32(&count); got != 0 {
		t.Errorf("Expected to not run, ran %d times", got)
	}
}
// TestInstantScheduler_BasicFunctionality subscribes a counting consumer to
// QueueJob, waits 100ms, schedules a job on an InstantScheduler, and expects
// exactly one message to have been counted 100ms after scheduling.
func TestInstantScheduler_BasicFunctionality(t *testing.T) {
	var count int32
	eventBus := NewEventBus()
	instantScheduler := InstantScheduler{}
	job := NewJob[DummyResult]("test-job", &DummyRunnable{})

	jobChannel := make(chan any)
	// NOTE(review): jobChannel is never closed in this test, so this counting
	// goroutine stays blocked on the channel after the test returns.
	go func() {
		for range jobChannel {
			atomic.AddInt32(&count, 1)
		}
	}()
	// Unlike the other scheduler tests in this file, the subscriber is
	// registered before Schedule is called.
	eventBus.Subscribe(QueueJob, jobChannel)
	time.Sleep(time.Millisecond * 100)

	genericJob := GenericJob(job)
	// Fail fast: a rejected Schedule call would otherwise only surface as a
	// confusing run-count mismatch further down.
	if err := instantScheduler.Schedule(genericJob, eventBus); err != nil {
		t.Fatalf("Schedule returned an unexpected error: %v", err)
	}

	time.Sleep(time.Millisecond * 100)

	// Load the counter exactly once and report that same value; reading count
	// directly in the message would be a non-atomic read racing with the
	// goroutine above.
	if got := atomic.LoadInt32(&count); got != 1 {
		t.Errorf("Expected to run 1 time, ran %d times", got)
	}
}
// TestEventScheduler_BasicFunctionality schedules a job on an EventScheduler
// bound to the "trigger-event" event, publishes that event once on the bus,
// and expects exactly one message to reach the QueueJob subscriber within
// 50ms.
func TestEventScheduler_BasicFunctionality(t *testing.T) {
	var count int32
	eventBus := NewEventBus()
	eventScheduler := EventScheduler{Event: "trigger-event"}
	job := NewJob[DummyResult]("test-job", &DummyRunnable{})
	genericJob := GenericJob(job)

	// Fail fast: a rejected Schedule call would otherwise only surface as a
	// confusing run-count mismatch further down.
	if err := eventScheduler.Schedule(genericJob, eventBus); err != nil {
		t.Fatalf("Schedule returned an unexpected error: %v", err)
	}

	jobChannel := make(chan any)
	eventBus.Subscribe(QueueJob, jobChannel)
	// NOTE(review): jobChannel is never closed in this test, so this counting
	// goroutine stays blocked on the channel after the test returns.
	go func() {
		for range jobChannel {
			atomic.AddInt32(&count, 1)
		}
	}()

	// Trigger the event
	eventBus.Publish("trigger-event", Event{Data: nil, Name: "trigger-event"})
	time.Sleep(time.Millisecond * 50) // Allow some time for the event to propagate

	// Load the counter exactly once and report that same value; reading count
	// directly in the message would be a non-atomic read racing with the
	// goroutine above.
	if got := atomic.LoadInt32(&count); got != 1 {
		t.Errorf("Expected to run 1 time, ran %d times", got)
	}
}