Skip to content
Snippets Groups Projects
Select Git revision
  • 81721920b0a94262200a02aa550bf845d5478ee1
  • master default protected
  • 1.31
  • 4.38.7
  • 4.38.6
  • 4.38.5
  • 4.38.4
  • 4.38.3
  • 4.38.2
  • 4.38.1
  • 4.38.0
  • 4.37.2
  • 4.37.1
  • 4.37.0
  • 4.36.0
  • 4.35.0
  • 4.34.1
  • 4.34.0
  • 4.33.1
  • 4.33.0
  • 4.32.2
  • 4.32.1
  • 4.32.0
23 results

transformer.mjs

Blame
  • scheduler_test.go 5.69 KiB
    package jobqueue
    
    import (
    	"github.com/robfig/cron/v3"
    	"github.com/stretchr/testify/assert"
    	"sync/atomic"
    	"testing"
    	"time"
    )
    
    // TestIntervalScheduler_BasicFunctionality schedules a job on an
    // IntervalScheduler configured with a 100ms interval and checks that at least
    // four messages arrive on a QueueJob subscription within 500ms.
    func TestIntervalScheduler_BasicFunctionality(t *testing.T) {
    	var count int32
    	eventBus := NewEventBus()
    	scheduler := IntervalScheduler{Interval: time.Millisecond * 100}
    
    	job := NewJob[DummyResult]("test-job", &DummyRunnable{})
    
    	genericJob := GenericJob(job)
    	// A scheduling failure would make the count assertion below meaningless,
    	// so abort immediately instead of discarding the error.
    	if err := scheduler.Schedule(genericJob, eventBus); err != nil {
    		t.Fatalf("Schedule returned an unexpected error: %v", err)
    	}
    
    	jobChannel := make(chan interface{})
    
    	// Count every message delivered on the subscription channel.
    	go func() {
    		for range jobChannel {
    			atomic.AddInt32(&count, 1)
    		}
    	}()
    
    	eventBus.Subscribe(QueueJob, jobChannel)
    
    	time.Sleep(time.Millisecond * 500)
    	// Load the counter once, atomically, and reuse that value in the message:
    	// reading count directly would race with the counting goroutine.
    	if got := atomic.LoadInt32(&count); got < 4 {
    		t.Errorf("Expected to run at least 4 times, ran %d times", got)
    	}
    }
    
    // TestIntervalScheduler_StopTicker schedules a job on a 100ms
    // IntervalScheduler, cancels it after 150ms, waits another 100ms, and expects
    // exactly one message to have arrived on the QueueJob subscription.
    func TestIntervalScheduler_StopTicker(t *testing.T) {
    	var count int32
    	eventBus := NewEventBus()
    	scheduler := IntervalScheduler{Interval: time.Millisecond * 100}
    
    	job := NewJob[DummyResult]("test-job", &DummyRunnable{})
    
    	genericJob := GenericJob(job)
    	err := scheduler.Schedule(genericJob, eventBus)
    	assert.Nil(t, err)
    
    	jobChannel := make(chan interface{})
    
    	// Count every message delivered on the subscription channel.
    	go func() {
    		for range jobChannel {
    			atomic.AddInt32(&count, 1)
    		}
    	}()
    
    	eventBus.Subscribe(QueueJob, jobChannel)
    
    	time.Sleep(time.Millisecond * 150)
    	scheduler.Cancel(job.GetID())
    	time.Sleep(time.Millisecond * 100)
    
    	// Load the counter once, atomically, and reuse that value in the message:
    	// reading count directly would race with the counting goroutine.
    	if got := atomic.LoadInt32(&count); got != 1 {
    		t.Errorf("Expected to run 1 time, ran %d times", got)
    	}
    }
    
    // TestIntervalScheduler_InvalidInterval checks that scheduling a job on an
    // IntervalScheduler whose Interval is zero produces a non-nil error.
    func TestIntervalScheduler_InvalidInterval(t *testing.T) {
    	bus := NewEventBus()
    	zeroInterval := IntervalScheduler{Interval: time.Millisecond * 0}
    
    	testJob := NewJob[DummyResult]("test-job", &DummyRunnable{})
    
    	// A nil result here means the zero interval was accepted.
    	if err := zeroInterval.Schedule(GenericJob(testJob), bus); err == nil {
    		t.Errorf("Expected an error due to invalid interval")
    	}
    }
    
    // TestCronScheduler_BasicFunctionality schedules a job on a CronScheduler
    // built with a seconds-enabled cron instance and the spec "*/1 * * * * *",
    // then checks that at least two messages arrive on a QueueJob subscription
    // within three seconds.
    func TestCronScheduler_BasicFunctionality(t *testing.T) {
    	var count int32
    	eventBus := NewEventBus()
    
    	job := NewJob[DummyResult]("test-job", &DummyRunnable{})
    
    	genericJob := GenericJob(job)
    	cronScheduler := CronScheduler{cron: cron.New(cron.WithSeconds()), Spec: "*/1 * * * * *"}
    	// A scheduling failure would make the count assertion below meaningless,
    	// so abort immediately instead of discarding the error.
    	if err := cronScheduler.Schedule(genericJob, eventBus); err != nil {
    		t.Fatalf("Schedule returned an unexpected error: %v", err)
    	}
    
    	jobChannel := make(chan interface{})
    
    	// Count every message delivered on the subscription channel.
    	go func() {
    		for range jobChannel {
    			atomic.AddInt32(&count, 1)
    		}
    	}()
    
    	eventBus.Subscribe(QueueJob, jobChannel)
    
    	time.Sleep(time.Second * 3)
    	// Load the counter once, atomically, and reuse that value in the message:
    	// reading count directly would race with the counting goroutine.
    	if got := atomic.LoadInt32(&count); got < 2 {
    		t.Errorf("Expected to run at least 2 times, ran %d times", got)
    	}
    }
    
    // TestCronScheduler_StopScheduler schedules a job on a CronScheduler with the
    // spec "*/1 * * * * *", cancels it after two seconds, waits one more second,
    // and expects at least one message to have arrived on the QueueJob
    // subscription.
    //
    // NOTE(review): the final assertion only proves the job ran before Cancel; it
    // does not verify that no further runs happen afterwards — confirm whether a
    // stricter check is wanted.
    func TestCronScheduler_StopScheduler(t *testing.T) {
    	var count int32
    	eventBus := NewEventBus()
    
    	c := cron.New(cron.WithSeconds())
    
    	cronScheduler := CronScheduler{cron: c, Spec: "*/1 * * * * *"}
    
    	job := NewJob[DummyResult]("test-job", &DummyRunnable{})
    
    	genericJob := GenericJob(job)
    	// A scheduling failure would make the count assertion below meaningless,
    	// so abort immediately instead of discarding the error.
    	if err := cronScheduler.Schedule(genericJob, eventBus); err != nil {
    		t.Fatalf("Schedule returned an unexpected error: %v", err)
    	}
    
    	jobChannel := make(chan interface{})
    
    	// Count every message delivered on the subscription channel.
    	go func() {
    		for range jobChannel {
    			atomic.AddInt32(&count, 1)
    		}
    	}()
    
    	eventBus.Subscribe(QueueJob, jobChannel)
    
    	time.Sleep(time.Second * 2)
    	cronScheduler.Cancel(job.GetID())
    	time.Sleep(time.Second)
    
    	// Load the counter once, atomically, and reuse that value in the message:
    	// reading count directly would race with the counting goroutine.
    	if got := atomic.LoadInt32(&count); got < 1 {
    		t.Errorf("Expected to run at least 1 time, ran %d times", got)
    	}
    }
    
    // TestDelayScheduler_BasicFunctionality schedules a job on a DelayScheduler
    // with a 100ms delay and expects exactly one message on the QueueJob
    // subscription after waiting 200ms.
    func TestDelayScheduler_BasicFunctionality(t *testing.T) {
    	var count int32
    	eventBus := NewEventBus()
    	delayScheduler := DelayScheduler{Delay: time.Millisecond * 100}
    
    	job := NewJob[DummyResult]("test-job", &DummyRunnable{})
    
    	genericJob := GenericJob(job)
    	// A scheduling failure would make the count assertion below meaningless,
    	// so abort immediately instead of discarding the error.
    	if err := delayScheduler.Schedule(genericJob, eventBus); err != nil {
    		t.Fatalf("Schedule returned an unexpected error: %v", err)
    	}
    
    	jobChannel := make(chan interface{})
    
    	// Count every message delivered on the subscription channel.
    	go func() {
    		for range jobChannel {
    			atomic.AddInt32(&count, 1)
    		}
    	}()
    
    	eventBus.Subscribe(QueueJob, jobChannel)
    
    	time.Sleep(time.Millisecond * 200)
    
    	// Load the counter once, atomically, and reuse that value in the message:
    	// reading count directly would race with the counting goroutine.
    	if got := atomic.LoadInt32(&count); got != 1 {
    		t.Errorf("Expected to run 1 time, ran %d times", got)
    	}
    }
    
    // TestDelayScheduler_StopBeforeExecute schedules a job on a DelayScheduler
    // with a 100ms delay, cancels it after 50ms, waits another 100ms, and expects
    // no message to have arrived on the QueueJob subscription.
    func TestDelayScheduler_StopBeforeExecute(t *testing.T) {
    	var count int32
    	eventBus := NewEventBus()
    	delayScheduler := DelayScheduler{Delay: time.Millisecond * 100}
    
    	job := NewJob[DummyResult]("test-job", &DummyRunnable{})
    
    	genericJob := GenericJob(job)
    	// If scheduling failed, the "ran 0 times" assertion below would pass
    	// vacuously, so the error must not be discarded.
    	if err := delayScheduler.Schedule(genericJob, eventBus); err != nil {
    		t.Fatalf("Schedule returned an unexpected error: %v", err)
    	}
    
    	jobChannel := make(chan interface{})
    
    	// Count every message delivered on the subscription channel.
    	go func() {
    		for range jobChannel {
    			atomic.AddInt32(&count, 1)
    		}
    	}()
    
    	eventBus.Subscribe(QueueJob, jobChannel)
    
    	time.Sleep(time.Millisecond * 50)
    	delayScheduler.Cancel(job.GetID())
    	time.Sleep(time.Millisecond * 100)
    
    	// Load the counter once, atomically, and reuse that value in the message:
    	// reading count directly would race with the counting goroutine.
    	if got := atomic.LoadInt32(&count); got != 0 {
    		t.Errorf("Expected to not run, ran %d times", got)
    	}
    }
    
    // TestInstantScheduler_BasicFunctionality subscribes to QueueJob first, then
    // schedules a job on an InstantScheduler and expects exactly one message on
    // the subscription within 100ms of scheduling.
    func TestInstantScheduler_BasicFunctionality(t *testing.T) {
    	var count int32
    	eventBus := NewEventBus()
    	instantScheduler := InstantScheduler{}
    
    	job := NewJob[DummyResult]("test-job", &DummyRunnable{})
    
    	jobChannel := make(chan interface{})
    
    	// Count every message delivered on the subscription channel.
    	go func() {
    		for range jobChannel {
    			atomic.AddInt32(&count, 1)
    		}
    	}()
    
    	eventBus.Subscribe(QueueJob, jobChannel)
    
    	time.Sleep(time.Millisecond * 100)
    
    	genericJob := GenericJob(job)
    	// A scheduling failure would make the count assertion below meaningless,
    	// so abort immediately instead of discarding the error.
    	if err := instantScheduler.Schedule(genericJob, eventBus); err != nil {
    		t.Fatalf("Schedule returned an unexpected error: %v", err)
    	}
    
    	time.Sleep(time.Millisecond * 100)
    
    	// Load the counter once, atomically, and reuse that value in the message:
    	// reading count directly would race with the counting goroutine.
    	if got := atomic.LoadInt32(&count); got != 1 {
    		t.Errorf("Expected to run 1 time, ran %d times", got)
    	}
    }
    
    // TestEventScheduler_BasicFunctionality schedules a job on an EventScheduler
    // bound to "trigger-event", publishes that event once, and expects exactly
    // one message on the QueueJob subscription 50ms later.
    func TestEventScheduler_BasicFunctionality(t *testing.T) {
    	var count int32
    	eventBus := NewEventBus()
    	eventScheduler := EventScheduler{Event: "trigger-event"}
    
    	job := NewJob[DummyResult]("test-job", &DummyRunnable{})
    
    	genericJob := GenericJob(job)
    	// A scheduling failure would make the count assertion below meaningless,
    	// so abort immediately instead of discarding the error.
    	if err := eventScheduler.Schedule(genericJob, eventBus); err != nil {
    		t.Fatalf("Schedule returned an unexpected error: %v", err)
    	}
    
    	jobChannel := make(chan interface{})
    	eventBus.Subscribe(QueueJob, jobChannel)
    
    	// Count every message delivered on the subscription channel.
    	go func() {
    		for range jobChannel {
    			atomic.AddInt32(&count, 1)
    		}
    	}()
    
    	// Trigger the event
    	eventBus.Publish("trigger-event", Event{Data: nil, Name: "trigger-event"})
    
    	time.Sleep(time.Millisecond * 50) // Allow some time for the event to propagate
    
    	// Load the counter once, atomically, and reuse that value in the message:
    	// reading count directly would race with the counting goroutine.
    	if got := atomic.LoadInt32(&count); got != 1 {
    		t.Errorf("Expected to run 1 time, ran %d times", got)
    	}
    }