Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
Loading items

Target

Select target project
  • oss/libraries/go/services/job-queues
1 result
Select Git revision
Loading items
Show changes
{
pkgs,
lib,
...
}: let
common = pkgs.callPackage ./common.nix {};
in
pkgs.writeShellScriptBin "run-extended-tests" ''
${common}
echo_header "Running extended tests"
download_test_images
setup_go_env
cd_working_dir
echo_step "Running tests"
if ! ${pkgs.go}/bin/go test -tags "runOnTask" -cover ./...
then
echo_fail "Failed to run tests"
exit 1
fi
echo_ok "All tests passed"
echo_step "Running benchmarks"
if ! ${pkgs.go}/bin/go test -tags "runOnTask,bench" -bench ./...
then
echo_fail "Failed to run benchmarks"
exit 1
fi
echo_ok "Benchmarks passed"
echo_step "Running race tests"
if ! ${pkgs.go}/bin/go test -tags "runOnTask,race" -race ./...
then
echo_fail "Failed to run race tests"
exit 1
fi
echo_ok "race tests passed"
echo_step "Running fuzz tests"
if ! ${pkgs.go}/bin/go test -tags "runOnTask,fuzz" -fuzz ./...
then
echo_fail "Failed to run fuzz tests"
exit 1
fi
echo_ok "Fuzz tests passed"
echo_ok "All tests passed"
''
{
pkgs,
lib,
...
}: let
common = pkgs.callPackage ./common.nix {};
in
pkgs.writeShellScriptBin "run-tests" ''
${common}
echo_header "Running tests"
selection=$(${pkgs.gum}/bin/gum choose "run all tests" "run specific test" "Cancel")
if [[ "$selection" == "Cancel" ]]; then
echo_ok "Exiting."
exit 0
fi
download_test_images
setup_go_env
cd_working_dir
if [[ "$selection" == "run all tests" ]]; then
echo_ok "Running all tests"
if ! ${pkgs.go}/bin/go test -tags runOnTask -v -failfast ./...
then
echo_fail "ERROR: Tests failed, check your Go!"
exit 1
fi
echo_ok "All tests passed!"
exit 0
fi
test_files=$(${pkgs.findutils}/bin/find . -name "*_test.go")
test_names=""
for file in $test_files; do
names=$(${pkgs.gnugrep}/bin/grep -oP 'func (Test\w+)' $file | ${pkgs.gawk}/bin/gawk '{print $2}')
test_names+="$names "
done
if [[ -z "$test_names" ]]; then
echo_fail "No tests found!"
exit 1
fi
selected_tests=$(echo "$test_names" | ${pkgs.coreutils}/bin/tr ' ' '\n' | ${pkgs.gum}/bin/gum filter --no-limit )
if [[ -z "$selected_tests" ]]; then
echo_ok "No tests selected, exiting."
exit 0
fi
if ! ${pkgs.go}/bin/go test -tags runOnTask -run "$(echo $selected_tests)"
then
echo_fail "ERROR: Tests failed, check your Go!"
exit 1
fi
echo_ok "All tests passed!"
''
...@@ -86,19 +86,15 @@ func (q *Queue) Enqueue(job GenericJob) error { ...@@ -86,19 +86,15 @@ func (q *Queue) Enqueue(job GenericJob) error {
// Run topological sort on jobs in the ready queue // Run topological sort on jobs in the ready queue
readyJobList := []GenericJob{} readyJobList := []GenericJob{}
for _, readyJob := range q.readyQueue { readyJobList = append(readyJobList, q.readyQueue...)
readyJobList = append(readyJobList, readyJob)
}
currentReadyJobIDs := make(map[JobID]struct{}) currentReadyJobIDs := make(map[JobID]struct{})
for _, job := range readyJobList { for _, job := range readyJobList {
currentReadyJobIDs[job.GetID()] = struct{}{} currentReadyJobIDs[job.GetID()] = struct{}{}
} }
fullJobList := []GenericJob{} var fullJobList []GenericJob
for _, job := range readyJobList { fullJobList = append(fullJobList, readyJobList...)
fullJobList = append(fullJobList, job)
}
for i := range q.processedJobs { for i := range q.processedJobs {
id := q.processedJobs[i].ID id := q.processedJobs[i].ID
...@@ -154,7 +150,7 @@ func (q *Queue) ClearProcessedJobs() { ...@@ -154,7 +150,7 @@ func (q *Queue) ClearProcessedJobs() {
} }
} }
func (q *Queue) isDependency(id JobID) bool { func (q *Queue) IsDependency(id JobID) bool {
for _, deps := range q.pendingDependencies { for _, deps := range q.pendingDependencies {
for _, depID := range deps { for _, depID := range deps {
if depID == id { if depID == id {
...@@ -182,7 +178,6 @@ func (q *Queue) Dequeue() (GenericJob, error) { ...@@ -182,7 +178,6 @@ func (q *Queue) Dequeue() (GenericJob, error) {
ProcessedTime: time.Now(), ProcessedTime: time.Now(),
ID: job.GetID(), ID: job.GetID(),
}) })
return job, nil return job, nil
} }
...@@ -194,6 +189,5 @@ func removeJobID(deps []JobID, id JobID) []JobID { ...@@ -194,6 +189,5 @@ func removeJobID(deps []JobID, id JobID) []JobID {
return deps[:len(deps)-1] return deps[:len(deps)-1]
} }
} }
return deps return deps
} }
...@@ -43,7 +43,7 @@ func (f *FileOperationResult) GetResult() string { ...@@ -43,7 +43,7 @@ func (f *FileOperationResult) GetResult() string {
} }
func (f *FileOperationResult) GetError() (string, int) { func (f *FileOperationResult) GetError() (string, int) {
if f.Success == false { if !f.Success {
return "FileOperationResult failed", 1 return "FileOperationResult failed", 1
} }
return "", 0 return "", 0
......
...@@ -78,7 +78,8 @@ func startTestSMTPDockerImageAndContainer(t *testing.T, port string, ctx context ...@@ -78,7 +78,8 @@ func startTestSMTPDockerImageAndContainer(t *testing.T, port string, ctx context
Timeout: &timeout, Timeout: &timeout,
Signal: "SIGKILL", Signal: "SIGKILL",
} }
newCtx, _ := context.WithTimeout(context.Background(), 60*time.Second) newCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
if err := cli.ContainerStop(newCtx, resp.ID, stopOptions); err != nil { if err := cli.ContainerStop(newCtx, resp.ID, stopOptions); err != nil {
t.Errorf("ContainerStop returned error: %v", err) t.Errorf("ContainerStop returned error: %v", err)
} }
......
...@@ -78,7 +78,10 @@ func startSFTPTestDockerImageAndContainer(t *testing.T, host string, port string ...@@ -78,7 +78,10 @@ func startSFTPTestDockerImageAndContainer(t *testing.T, host string, port string
Timeout: &timeout, Timeout: &timeout,
Signal: "SIGKILL", Signal: "SIGKILL",
} }
newCtx, _ := context.WithTimeout(context.Background(), 60*time.Second)
newCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
if err := cli.ContainerStop(newCtx, resp.ID, stopOptions); err != nil { if err := cli.ContainerStop(newCtx, resp.ID, stopOptions); err != nil {
t.Errorf("ContainerStop returned error: %v", err) t.Errorf("ContainerStop returned error: %v", err)
} }
......
...@@ -30,6 +30,5 @@ func (r RunResult[T]) GetStatus() ResultStatus { ...@@ -30,6 +30,5 @@ func (r RunResult[T]) GetStatus() ResultStatus {
type Runnable[T any] interface { type Runnable[T any] interface {
Run(ctx context.Context) (RunResult[T], error) Run(ctx context.Context) (RunResult[T], error)
GetType() string GetType() string
GetPersistence() RunnableImport GetPersistence() RunnableImport
} }
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
package jobqueue package jobqueue
import ( import (
"errors"
"fmt" "fmt"
"time" "time"
) )
...@@ -16,12 +17,17 @@ type TimeScheduler struct { ...@@ -16,12 +17,17 @@ type TimeScheduler struct {
} }
func (s *TimeScheduler) Schedule(job GenericJob, eventBus *EventBus) error { func (s *TimeScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
if job == nil {
return ErrParameterIsNil
}
if s.executed { if s.executed {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, job.GetID()) return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, job.GetID())
} }
if s.Time.Before(time.Now()) { if s.Time.Before(time.Now()) {
return fmt.Errorf("%w: scheduled time is in the past", ErrInvalidTime) return errors.Join(ErrScheduleTimeIsInThePast, ErrInvalidTime, fmt.Errorf("time: %s", s.Time))
} }
if s.jobs == nil { if s.jobs == nil {
...@@ -29,6 +35,10 @@ func (s *TimeScheduler) Schedule(job GenericJob, eventBus *EventBus) error { ...@@ -29,6 +35,10 @@ func (s *TimeScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
} }
id := job.GetID() id := job.GetID()
if id == "" {
return ErrJobIDEmpty
}
if _, ok := s.jobs[id]; ok { if _, ok := s.jobs[id]; ok {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id) return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
} }
...@@ -36,7 +46,7 @@ func (s *TimeScheduler) Schedule(job GenericJob, eventBus *EventBus) error { ...@@ -36,7 +46,7 @@ func (s *TimeScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
stopChan := make(StopChan) stopChan := make(StopChan)
s.jobs[id] = stopChan s.jobs[id] = stopChan
timer := time.NewTimer(s.Time.Sub(time.Now())) timer := time.NewTimer(time.Until(s.Time))
go func() { go func() {
select { select {
...@@ -101,7 +111,6 @@ func (s *TimeScheduler) JobExists(id JobID) bool { ...@@ -101,7 +111,6 @@ func (s *TimeScheduler) JobExists(id JobID) bool {
if s.jobs == nil { if s.jobs == nil {
return false return false
} }
_, ok := s.jobs[id] _, ok := s.jobs[id]
return ok return ok
} }
......
// Copyright 2024 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
"gotest.tools/v3/assert"
"testing"
"time"
)
func TestStructure(t *testing.T) {
now := time.Now()
oneHourBefore := now.Add(-1 * time.Hour)
onHourAfter := now.Add(1 * time.Hour)
ts := TimeScheduler{
Time: oneHourBefore,
}
err := ts.Schedule(nil, nil)
assert.ErrorIs(t, err, ErrParameterIsNil)
job := &MockGenericJob{}
err = ts.Schedule(job, nil)
assert.ErrorIs(t, err, ErrScheduleTimeIsInThePast)
ts.Time = onHourAfter
err = ts.Schedule(job, nil)
assert.ErrorIs(t, err, ErrJobIDEmpty)
job.ID = "job-id"
err = ts.Schedule(job, nil)
assert.NilError(t, err)
time.Sleep(2 * time.Second)
err = ts.Cancel("job-id")
assert.NilError(t, err)
//time.Sleep(200 * time.Second)
}
...@@ -62,7 +62,7 @@ func (s *InotifyScheduler) Schedule(job GenericJob, eventBus *EventBus) error { ...@@ -62,7 +62,7 @@ func (s *InotifyScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
} }
} }
case _, _ = <-watcher.Errors: case <-watcher.Errors:
case <-stopChan: case <-stopChan:
_ = watcher.Close() _ = watcher.Close()
......
...@@ -17,24 +17,22 @@ type Scheduler interface { ...@@ -17,24 +17,22 @@ type Scheduler interface {
Cancel(id JobID) error Cancel(id JobID) error
CancelAll() error CancelAll() error
JobExists(id JobID) bool JobExists(id JobID) bool
GetType() string GetType() string
IsAdHoc() bool IsAdHoc() bool
GetPersistence() SchedulerPersistence GetPersistence() SchedulerPersistence
} }
type SchedulerPersistence struct { type SchedulerPersistence struct {
Type string `yaml:"type" json:"type" gorm:"column:type"` Time *time.Time `yaml:"time,omitempty" json:"time,omitempty" gorm:"column:time"`
Interval time.Duration `yaml:"interval,omitempty" json:"interval,omitempty" gorm:"column:interval"` Interval time.Duration `yaml:"interval,omitempty" json:"interval,omitempty" gorm:"column:interval"`
Spec string `yaml:"spec,omitempty" json:"spec,omitempty" gorm:"column:spec"`
Delay time.Duration `yaml:"delay,omitempty" json:"delay,omitempty" gorm:"column:delay"` Delay time.Duration `yaml:"delay,omitempty" json:"delay,omitempty" gorm:"column:delay"`
EventFlags fsnotify.Op `yaml:"eventFlags,omitempty" json:"eventFlags,omitempty" gorm:"column:eventFlags"`
Type string `yaml:"type" json:"type" gorm:"column:type"`
Spec string `yaml:"spec,omitempty" json:"spec,omitempty" gorm:"column:spec"`
Event EventName `yaml:"event,omitempty" json:"event,omitempty" gorm:"column:event"` Event EventName `yaml:"event,omitempty" json:"event,omitempty" gorm:"column:event"`
Time *time.Time `yaml:"time,omitempty" json:"time,omitempty" gorm:"column:time"`
Executed bool `yaml:"executed,omitempty" json:"executed,omitempty" gorm:"column:executed"` Executed bool `yaml:"executed,omitempty" json:"executed,omitempty" gorm:"column:executed"`
Path string `yaml:"path,omitempty" json:"path,omitempty" gorm:"column:path"` Path string `yaml:"path,omitempty" json:"path,omitempty" gorm:"column:path"`
EventFlags fsnotify.Op `yaml:"eventFlags,omitempty" json:"eventFlags,omitempty" gorm:"column:eventFlags"`
} }
type scheduleImportStruct struct { type scheduleImportStruct struct {
...@@ -112,15 +110,11 @@ func (sp *SchedulerPersistence) parseAndAssignFields(aux scheduleImportStruct) e ...@@ -112,15 +110,11 @@ func (sp *SchedulerPersistence) parseAndAssignFields(aux scheduleImportStruct) e
// UnmarshalJSON implements the json.Unmarshaler interface // UnmarshalJSON implements the json.Unmarshaler interface
func (sp *SchedulerPersistence) UnmarshalJSON(data []byte) error { func (sp *SchedulerPersistence) UnmarshalJSON(data []byte) error {
var aux scheduleImportStruct var aux scheduleImportStruct
if err := json.Unmarshal(data, &aux); err != nil { if err := json.Unmarshal(data, &aux); err != nil {
return err return err
} }
return sp.parseAndAssignFields(aux) return sp.parseAndAssignFields(aux)
} }
func (sp *SchedulerPersistence) UnmarshalYAML(unmarshal func(interface{}) error) error { func (sp *SchedulerPersistence) UnmarshalYAML(unmarshal func(interface{}) error) error {
...@@ -128,7 +122,5 @@ func (sp *SchedulerPersistence) UnmarshalYAML(unmarshal func(interface{}) error) ...@@ -128,7 +122,5 @@ func (sp *SchedulerPersistence) UnmarshalYAML(unmarshal func(interface{}) error)
if err := unmarshal(&aux); err != nil { if err := unmarshal(&aux); err != nil {
return err return err
} }
return sp.parseAndAssignFields(aux) return sp.parseAndAssignFields(aux)
} }
...@@ -28,7 +28,7 @@ func TestIntervalScheduler_BasicFunctionality(t *testing.T) { ...@@ -28,7 +28,7 @@ func TestIntervalScheduler_BasicFunctionality(t *testing.T) {
jobChannel := make(chan interface{}) jobChannel := make(chan interface{})
go func() { go func() {
for _ = range jobChannel { for range jobChannel {
atomic.AddInt32(&count, 1) atomic.AddInt32(&count, 1)
} }
}() }()
...@@ -37,7 +37,7 @@ func TestIntervalScheduler_BasicFunctionality(t *testing.T) { ...@@ -37,7 +37,7 @@ func TestIntervalScheduler_BasicFunctionality(t *testing.T) {
time.Sleep(time.Millisecond * 500) time.Sleep(time.Millisecond * 500)
if atomic.LoadInt32(&count) < 4 { if atomic.LoadInt32(&count) < 4 {
t.Errorf("Expected to run at least 4 times, ran %d times", count) t.Errorf("Expected to run at least four times, ran %d times", count)
} }
} }
...@@ -55,7 +55,7 @@ func TestIntervalScheduler_StopTicker(t *testing.T) { ...@@ -55,7 +55,7 @@ func TestIntervalScheduler_StopTicker(t *testing.T) {
jobChannel := make(chan interface{}) jobChannel := make(chan interface{})
go func() { go func() {
for _ = range jobChannel { for range jobChannel {
atomic.AddInt32(&count, 1) atomic.AddInt32(&count, 1)
} }
}() }()
...@@ -103,7 +103,7 @@ func TestCronScheduler_BasicFunctionality(t *testing.T) { ...@@ -103,7 +103,7 @@ func TestCronScheduler_BasicFunctionality(t *testing.T) {
jobChannel := make(chan interface{}) jobChannel := make(chan interface{})
go func() { go func() {
for _ = range jobChannel { for range jobChannel {
atomic.AddInt32(&count, 1) atomic.AddInt32(&count, 1)
} }
}() }()
...@@ -134,7 +134,7 @@ func TestCronScheduler_StopScheduler(t *testing.T) { ...@@ -134,7 +134,7 @@ func TestCronScheduler_StopScheduler(t *testing.T) {
jobChannel := make(chan interface{}) jobChannel := make(chan interface{})
go func() { go func() {
for _ = range jobChannel { for range jobChannel {
atomic.AddInt32(&count, 1) atomic.AddInt32(&count, 1)
} }
}() }()
...@@ -163,7 +163,7 @@ func TestDelayScheduler_BasicFunctionality(t *testing.T) { ...@@ -163,7 +163,7 @@ func TestDelayScheduler_BasicFunctionality(t *testing.T) {
jobChannel := make(chan interface{}) jobChannel := make(chan interface{})
go func() { go func() {
for _ = range jobChannel { for range jobChannel {
atomic.AddInt32(&count, 1) atomic.AddInt32(&count, 1)
...@@ -175,7 +175,7 @@ func TestDelayScheduler_BasicFunctionality(t *testing.T) { ...@@ -175,7 +175,7 @@ func TestDelayScheduler_BasicFunctionality(t *testing.T) {
time.Sleep(time.Millisecond * 200) time.Sleep(time.Millisecond * 200)
if atomic.LoadInt32(&count) != 1 { if atomic.LoadInt32(&count) != 1 {
t.Errorf("Expected to run 1 time, ran %d times", count) t.Errorf("Expected to run one time, ran %d times", count)
} }
} }
...@@ -192,7 +192,7 @@ func TestDelayScheduler_StopBeforeExecute(t *testing.T) { ...@@ -192,7 +192,7 @@ func TestDelayScheduler_StopBeforeExecute(t *testing.T) {
jobChannel := make(chan interface{}) jobChannel := make(chan interface{})
go func() { go func() {
for _ = range jobChannel { for range jobChannel {
atomic.AddInt32(&count, 1) atomic.AddInt32(&count, 1)
...@@ -220,7 +220,7 @@ func TestInstantScheduler_BasicFunctionality(t *testing.T) { ...@@ -220,7 +220,7 @@ func TestInstantScheduler_BasicFunctionality(t *testing.T) {
jobChannel := make(chan interface{}) jobChannel := make(chan interface{})
go func() { go func() {
for _ = range jobChannel { for range jobChannel {
atomic.AddInt32(&count, 1) atomic.AddInt32(&count, 1)
} }
}() }()
...@@ -254,7 +254,7 @@ func TestEventScheduler_BasicFunctionality(t *testing.T) { ...@@ -254,7 +254,7 @@ func TestEventScheduler_BasicFunctionality(t *testing.T) {
eventBus.Subscribe(QueueJob, jobChannel) eventBus.Subscribe(QueueJob, jobChannel)
go func() { go func() {
for _ = range jobChannel { for range jobChannel {
atomic.AddInt32(&count, 1) atomic.AddInt32(&count, 1)
} }
}() }()
...@@ -283,7 +283,7 @@ func TestTimeScheduler_BasicFunctionality(t *testing.T) { ...@@ -283,7 +283,7 @@ func TestTimeScheduler_BasicFunctionality(t *testing.T) {
jobChannel := make(chan interface{}) jobChannel := make(chan interface{})
go func() { go func() {
for _ = range jobChannel { for range jobChannel {
atomic.AddInt32(&count, 1) atomic.AddInt32(&count, 1)
} }
}() }()
...@@ -389,7 +389,7 @@ func TestInotifyScheduler_BasicFunctionality(t *testing.T) { ...@@ -389,7 +389,7 @@ func TestInotifyScheduler_BasicFunctionality(t *testing.T) {
jobChannel := make(chan interface{}) jobChannel := make(chan interface{})
go func() { go func() {
for _ = range jobChannel { for range jobChannel {
atomic.AddInt32(&count, 1) atomic.AddInt32(&count, 1)
} }
}() }()
...@@ -432,3 +432,19 @@ time: "2023-12-15T12:00:00Z" ...@@ -432,3 +432,19 @@ time: "2023-12-15T12:00:00Z"
assert.Equal(t, expectedInterval, sp.Interval, "Interval should be unmarshalled correctly") assert.Equal(t, expectedInterval, sp.Interval, "Interval should be unmarshalled correctly")
assert.Equal(t, &expectedTime, sp.Time, "Time should be unmarshalled correctly") assert.Equal(t, &expectedTime, sp.Time, "Time should be unmarshalled correctly")
} }
func TestUnmarshalSchedulerPersistenceIntervalYAML(t *testing.T) {
yamlData := `
type: Interval
interval: "1m1s"
`
var sp SchedulerPersistence
err := yaml.Unmarshal([]byte(yamlData), &sp)
assert.Nil(t, err, "Unmarshalling should not produce an error")
expectedInterval, _ := time.ParseDuration("1m1s")
assert.Equal(t, "Interval", sp.Type, "Type should be unmarshalled correctly")
assert.Equal(t, expectedInterval, sp.Interval, "Interval should be unmarshalled correctly")
}
...@@ -26,11 +26,11 @@ func StopResourceMonitoring() { ...@@ -26,11 +26,11 @@ func StopResourceMonitoring() {
} }
} }
func resetResourceStatsForTesting() { //func resetResourceStatsForTesting() {
if mainResourceStats != nil { // if mainResourceStats != nil {
StopResourceMonitoring() // StopResourceMonitoring()
} // }
} //}
func GetCpuUsage() float64 { func GetCpuUsage() float64 {
......
// Copyright 2023 schukai GmbH // Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0 // SPDX-License-Identifier: AGPL-3.0
// #nosec
package jobqueue package jobqueue
import ( import (
...@@ -241,7 +243,7 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel ...@@ -241,7 +243,7 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel
w.mu.Lock() w.mu.Lock()
defer w.mu.Unlock() defer w.mu.Unlock()
if w.manager != nil { if w.manager != nil {
w.manager.Sync(job) _ = w.manager.Sync(job)
} }
}() }()
......