Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • master
  • v1.0.0
  • v1.0.1
  • v1.1.0
  • v1.10.0
  • v1.10.1
  • v1.10.2
  • v1.11.0
  • v1.12.0
  • v1.12.1
  • v1.12.2
  • v1.12.3
  • v1.12.4
  • v1.12.5
  • v1.12.6
  • v1.12.7
  • v1.12.8
  • v1.13.0
  • v1.13.1
  • v1.13.2
  • v1.14.0
  • v1.15.0
  • v1.15.1
  • v1.15.10
  • v1.15.11
  • v1.15.12
  • v1.15.13
  • v1.15.14
  • v1.15.15
  • v1.15.16
  • v1.15.17
  • v1.15.2
  • v1.15.3
  • v1.15.4
  • v1.15.5
  • v1.15.6
  • v1.15.7
  • v1.15.8
  • v1.15.9
  • v1.16.0
  • v1.16.1
  • v1.17.0
  • v1.18.0
  • v1.18.1
  • v1.18.2
  • v1.19.0
  • v1.19.1
  • v1.19.2
  • v1.19.3
  • v1.19.4
  • v1.2.0
  • v1.20.0
  • v1.20.1
  • v1.20.2
  • v1.20.3
  • v1.21.0
  • v1.21.1
  • v1.22.0
  • v1.23.0
  • v1.23.1
  • v1.23.2
  • v1.3.0
  • v1.3.1
  • v1.3.2
  • v1.4.0
  • v1.5.0
  • v1.5.1
  • v1.6.0
  • v1.6.1
  • v1.7.0
  • v1.7.1
  • v1.7.2
  • v1.7.3
  • v1.8.0
  • v1.8.1
  • v1.9.0
76 results

Target

Select target project
  • oss/libraries/go/services/job-queues
1 result
Select Git revision
  • master
  • v1.0.0
  • v1.0.1
  • v1.1.0
  • v1.10.0
  • v1.10.1
  • v1.10.2
  • v1.11.0
  • v1.12.0
  • v1.12.1
  • v1.12.2
  • v1.12.3
  • v1.12.4
  • v1.12.5
  • v1.12.6
  • v1.12.7
  • v1.12.8
  • v1.13.0
  • v1.13.1
  • v1.13.2
  • v1.14.0
  • v1.15.0
  • v1.15.1
  • v1.15.10
  • v1.15.11
  • v1.15.12
  • v1.15.13
  • v1.15.14
  • v1.15.15
  • v1.15.16
  • v1.15.17
  • v1.15.2
  • v1.15.3
  • v1.15.4
  • v1.15.5
  • v1.15.6
  • v1.15.7
  • v1.15.8
  • v1.15.9
  • v1.16.0
  • v1.16.1
  • v1.17.0
  • v1.18.0
  • v1.18.1
  • v1.18.2
  • v1.19.0
  • v1.19.1
  • v1.19.2
  • v1.19.3
  • v1.19.4
  • v1.2.0
  • v1.20.0
  • v1.20.1
  • v1.20.2
  • v1.20.3
  • v1.21.0
  • v1.21.1
  • v1.22.0
  • v1.23.0
  • v1.23.1
  • v1.23.2
  • v1.3.0
  • v1.3.1
  • v1.3.2
  • v1.4.0
  • v1.5.0
  • v1.5.1
  • v1.6.0
  • v1.6.1
  • v1.7.0
  • v1.7.1
  • v1.7.2
  • v1.7.3
  • v1.8.0
  • v1.8.1
  • v1.9.0
76 results
Show changes
{
pkgs,
lib,
...
}: let
common = pkgs.callPackage ./common.nix {};
in
pkgs.writeShellScriptBin "run-extended-tests" ''
${common}
echo_header "Running extended tests"
download_test_images
setup_go_env
cd_working_dir
echo_step "Running tests"
if ! ${pkgs.go}/bin/go test -tags "runOnTask" -cover ./...
then
echo_fail "Failed to run tests"
exit 1
fi
echo_ok "All tests passed"
echo_step "Running benchmarks"
if ! ${pkgs.go}/bin/go test -tags "runOnTask,bench" -bench ./...
then
echo_fail "Failed to run benchmarks"
exit 1
fi
echo_ok "Benchmarks passed"
echo_step "Running race tests"
if ! ${pkgs.go}/bin/go test -tags "runOnTask,race" -race ./...
then
echo_fail "Failed to run race tests"
exit 1
fi
echo_ok "race tests passed"
echo_step "Running fuzz tests"
if ! ${pkgs.go}/bin/go test -tags "runOnTask,fuzz" -fuzz ./...
then
echo_fail "Failed to run fuzz tests"
exit 1
fi
echo_ok "Fuzz tests passed"
echo_ok "All tests passed"
''
{
pkgs,
lib,
...
}: let
common = pkgs.callPackage ./common.nix {};
in
pkgs.writeShellScriptBin "run-tests" ''
${common}
echo_header "Running tests"
selection=$(${pkgs.gum}/bin/gum choose "run all tests" "run specific test" "Cancel")
if [[ "$selection" == "Cancel" ]]; then
echo_ok "Exiting."
exit 0
fi
download_test_images
setup_go_env
cd_working_dir
if [[ "$selection" == "run all tests" ]]; then
echo_ok "Running all tests"
if ! ${pkgs.go}/bin/go test -tags runOnTask -v -failfast ./...
then
echo_fail "ERROR: Tests failed, check your Go!"
exit 1
fi
echo_ok "All tests passed!"
exit 0
fi
test_files=$(${pkgs.findutils}/bin/find . -name "*_test.go")
test_names=""
for file in $test_files; do
names=$(${pkgs.gnugrep}/bin/grep -oP 'func (Test\w+)' $file | ${pkgs.gawk}/bin/gawk '{print $2}')
test_names+="$names "
done
if [[ -z "$test_names" ]]; then
echo_fail "No tests found!"
exit 1
fi
selected_tests=$(echo "$test_names" | ${pkgs.coreutils}/bin/tr ' ' '\n' | ${pkgs.gum}/bin/gum filter --no-limit )
if [[ -z "$selected_tests" ]]; then
echo_ok "No tests selected, exiting."
exit 0
fi
if ! ${pkgs.go}/bin/go test -tags runOnTask -run "$(echo $selected_tests)"
then
echo_fail "ERROR: Tests failed, check your Go!"
exit 1
fi
echo_ok "All tests passed!"
''
......@@ -86,19 +86,15 @@ func (q *Queue) Enqueue(job GenericJob) error {
// Run topological sort on jobs in the ready queue
readyJobList := []GenericJob{}
for _, readyJob := range q.readyQueue {
readyJobList = append(readyJobList, readyJob)
}
readyJobList = append(readyJobList, q.readyQueue...)
currentReadyJobIDs := make(map[JobID]struct{})
for _, job := range readyJobList {
currentReadyJobIDs[job.GetID()] = struct{}{}
}
fullJobList := []GenericJob{}
for _, job := range readyJobList {
fullJobList = append(fullJobList, job)
}
var fullJobList []GenericJob
fullJobList = append(fullJobList, readyJobList...)
for i := range q.processedJobs {
id := q.processedJobs[i].ID
......@@ -154,7 +150,7 @@ func (q *Queue) ClearProcessedJobs() {
}
}
func (q *Queue) isDependency(id JobID) bool {
func (q *Queue) IsDependency(id JobID) bool {
for _, deps := range q.pendingDependencies {
for _, depID := range deps {
if depID == id {
......@@ -182,7 +178,6 @@ func (q *Queue) Dequeue() (GenericJob, error) {
ProcessedTime: time.Now(),
ID: job.GetID(),
})
return job, nil
}
......@@ -194,6 +189,5 @@ func removeJobID(deps []JobID, id JobID) []JobID {
return deps[:len(deps)-1]
}
}
return deps
}
......@@ -43,7 +43,7 @@ func (f *FileOperationResult) GetResult() string {
}
func (f *FileOperationResult) GetError() (string, int) {
if f.Success == false {
if !f.Success {
return "FileOperationResult failed", 1
}
return "", 0
......
......@@ -78,7 +78,8 @@ func startTestSMTPDockerImageAndContainer(t *testing.T, port string, ctx context
Timeout: &timeout,
Signal: "SIGKILL",
}
newCtx, _ := context.WithTimeout(context.Background(), 60*time.Second)
newCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
if err := cli.ContainerStop(newCtx, resp.ID, stopOptions); err != nil {
t.Errorf("ContainerStop returned error: %v", err)
}
......
......@@ -78,7 +78,10 @@ func startSFTPTestDockerImageAndContainer(t *testing.T, host string, port string
Timeout: &timeout,
Signal: "SIGKILL",
}
newCtx, _ := context.WithTimeout(context.Background(), 60*time.Second)
newCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
if err := cli.ContainerStop(newCtx, resp.ID, stopOptions); err != nil {
t.Errorf("ContainerStop returned error: %v", err)
}
......
......@@ -30,6 +30,5 @@ func (r RunResult[T]) GetStatus() ResultStatus {
type Runnable[T any] interface {
Run(ctx context.Context) (RunResult[T], error)
GetType() string
GetPersistence() RunnableImport
}
......@@ -4,6 +4,7 @@
package jobqueue
import (
"errors"
"fmt"
"time"
)
......@@ -16,12 +17,17 @@ type TimeScheduler struct {
}
func (s *TimeScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
if job == nil {
return ErrParameterIsNil
}
if s.executed {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, job.GetID())
}
if s.Time.Before(time.Now()) {
return fmt.Errorf("%w: scheduled time is in the past", ErrInvalidTime)
return errors.Join(ErrScheduleTimeIsInThePast, ErrInvalidTime, fmt.Errorf("time: %s", s.Time))
}
if s.jobs == nil {
......@@ -29,6 +35,10 @@ func (s *TimeScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
}
id := job.GetID()
if id == "" {
return ErrJobIDEmpty
}
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
}
......@@ -36,7 +46,7 @@ func (s *TimeScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
stopChan := make(StopChan)
s.jobs[id] = stopChan
timer := time.NewTimer(s.Time.Sub(time.Now()))
timer := time.NewTimer(time.Until(s.Time))
go func() {
select {
......@@ -101,7 +111,6 @@ func (s *TimeScheduler) JobExists(id JobID) bool {
if s.jobs == nil {
return false
}
_, ok := s.jobs[id]
return ok
}
......
// Copyright 2024 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
"gotest.tools/v3/assert"
"testing"
"time"
)
func TestStructure(t *testing.T) {
now := time.Now()
oneHourBefore := now.Add(-1 * time.Hour)
onHourAfter := now.Add(1 * time.Hour)
ts := TimeScheduler{
Time: oneHourBefore,
}
err := ts.Schedule(nil, nil)
assert.ErrorIs(t, err, ErrParameterIsNil)
job := &MockGenericJob{}
err = ts.Schedule(job, nil)
assert.ErrorIs(t, err, ErrScheduleTimeIsInThePast)
ts.Time = onHourAfter
err = ts.Schedule(job, nil)
assert.ErrorIs(t, err, ErrJobIDEmpty)
job.ID = "job-id"
err = ts.Schedule(job, nil)
assert.NilError(t, err)
time.Sleep(2 * time.Second)
err = ts.Cancel("job-id")
assert.NilError(t, err)
//time.Sleep(200 * time.Second)
}
......@@ -62,7 +62,7 @@ func (s *InotifyScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
}
}
case _, _ = <-watcher.Errors:
case <-watcher.Errors:
case <-stopChan:
_ = watcher.Close()
......
......@@ -17,24 +17,22 @@ type Scheduler interface {
Cancel(id JobID) error
CancelAll() error
JobExists(id JobID) bool
GetType() string
IsAdHoc() bool
GetPersistence() SchedulerPersistence
}
type SchedulerPersistence struct {
Type string `yaml:"type" json:"type" gorm:"column:type"`
Time *time.Time `yaml:"time,omitempty" json:"time,omitempty" gorm:"column:time"`
Interval time.Duration `yaml:"interval,omitempty" json:"interval,omitempty" gorm:"column:interval"`
Spec string `yaml:"spec,omitempty" json:"spec,omitempty" gorm:"column:spec"`
Delay time.Duration `yaml:"delay,omitempty" json:"delay,omitempty" gorm:"column:delay"`
EventFlags fsnotify.Op `yaml:"eventFlags,omitempty" json:"eventFlags,omitempty" gorm:"column:eventFlags"`
Type string `yaml:"type" json:"type" gorm:"column:type"`
Spec string `yaml:"spec,omitempty" json:"spec,omitempty" gorm:"column:spec"`
Event EventName `yaml:"event,omitempty" json:"event,omitempty" gorm:"column:event"`
Time *time.Time `yaml:"time,omitempty" json:"time,omitempty" gorm:"column:time"`
Executed bool `yaml:"executed,omitempty" json:"executed,omitempty" gorm:"column:executed"`
Path string `yaml:"path,omitempty" json:"path,omitempty" gorm:"column:path"`
EventFlags fsnotify.Op `yaml:"eventFlags,omitempty" json:"eventFlags,omitempty" gorm:"column:eventFlags"`
}
type scheduleImportStruct struct {
......@@ -112,15 +110,11 @@ func (sp *SchedulerPersistence) parseAndAssignFields(aux scheduleImportStruct) e
// UnmarshalJSON implements the json.Unmarshaler interface
func (sp *SchedulerPersistence) UnmarshalJSON(data []byte) error {
var aux scheduleImportStruct
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
return sp.parseAndAssignFields(aux)
}
func (sp *SchedulerPersistence) UnmarshalYAML(unmarshal func(interface{}) error) error {
......@@ -128,7 +122,5 @@ func (sp *SchedulerPersistence) UnmarshalYAML(unmarshal func(interface{}) error)
if err := unmarshal(&aux); err != nil {
return err
}
return sp.parseAndAssignFields(aux)
}
......@@ -28,7 +28,7 @@ func TestIntervalScheduler_BasicFunctionality(t *testing.T) {
jobChannel := make(chan interface{})
go func() {
for _ = range jobChannel {
for range jobChannel {
atomic.AddInt32(&count, 1)
}
}()
......@@ -37,7 +37,7 @@ func TestIntervalScheduler_BasicFunctionality(t *testing.T) {
time.Sleep(time.Millisecond * 500)
if atomic.LoadInt32(&count) < 4 {
t.Errorf("Expected to run at least 4 times, ran %d times", count)
t.Errorf("Expected to run at least four times, ran %d times", count)
}
}
......@@ -55,7 +55,7 @@ func TestIntervalScheduler_StopTicker(t *testing.T) {
jobChannel := make(chan interface{})
go func() {
for _ = range jobChannel {
for range jobChannel {
atomic.AddInt32(&count, 1)
}
}()
......@@ -103,7 +103,7 @@ func TestCronScheduler_BasicFunctionality(t *testing.T) {
jobChannel := make(chan interface{})
go func() {
for _ = range jobChannel {
for range jobChannel {
atomic.AddInt32(&count, 1)
}
}()
......@@ -134,7 +134,7 @@ func TestCronScheduler_StopScheduler(t *testing.T) {
jobChannel := make(chan interface{})
go func() {
for _ = range jobChannel {
for range jobChannel {
atomic.AddInt32(&count, 1)
}
}()
......@@ -163,7 +163,7 @@ func TestDelayScheduler_BasicFunctionality(t *testing.T) {
jobChannel := make(chan interface{})
go func() {
for _ = range jobChannel {
for range jobChannel {
atomic.AddInt32(&count, 1)
......@@ -175,7 +175,7 @@ func TestDelayScheduler_BasicFunctionality(t *testing.T) {
time.Sleep(time.Millisecond * 200)
if atomic.LoadInt32(&count) != 1 {
t.Errorf("Expected to run 1 time, ran %d times", count)
t.Errorf("Expected to run one time, ran %d times", count)
}
}
......@@ -192,7 +192,7 @@ func TestDelayScheduler_StopBeforeExecute(t *testing.T) {
jobChannel := make(chan interface{})
go func() {
for _ = range jobChannel {
for range jobChannel {
atomic.AddInt32(&count, 1)
......@@ -220,7 +220,7 @@ func TestInstantScheduler_BasicFunctionality(t *testing.T) {
jobChannel := make(chan interface{})
go func() {
for _ = range jobChannel {
for range jobChannel {
atomic.AddInt32(&count, 1)
}
}()
......@@ -254,7 +254,7 @@ func TestEventScheduler_BasicFunctionality(t *testing.T) {
eventBus.Subscribe(QueueJob, jobChannel)
go func() {
for _ = range jobChannel {
for range jobChannel {
atomic.AddInt32(&count, 1)
}
}()
......@@ -283,7 +283,7 @@ func TestTimeScheduler_BasicFunctionality(t *testing.T) {
jobChannel := make(chan interface{})
go func() {
for _ = range jobChannel {
for range jobChannel {
atomic.AddInt32(&count, 1)
}
}()
......@@ -389,7 +389,7 @@ func TestInotifyScheduler_BasicFunctionality(t *testing.T) {
jobChannel := make(chan interface{})
go func() {
for _ = range jobChannel {
for range jobChannel {
atomic.AddInt32(&count, 1)
}
}()
......@@ -432,3 +432,19 @@ time: "2023-12-15T12:00:00Z"
assert.Equal(t, expectedInterval, sp.Interval, "Interval should be unmarshalled correctly")
assert.Equal(t, &expectedTime, sp.Time, "Time should be unmarshalled correctly")
}
func TestUnmarshalSchedulerPersistenceIntervalYAML(t *testing.T) {
yamlData := `
type: Interval
interval: "1m1s"
`
var sp SchedulerPersistence
err := yaml.Unmarshal([]byte(yamlData), &sp)
assert.Nil(t, err, "Unmarshalling should not produce an error")
expectedInterval, _ := time.ParseDuration("1m1s")
assert.Equal(t, "Interval", sp.Type, "Type should be unmarshalled correctly")
assert.Equal(t, expectedInterval, sp.Interval, "Interval should be unmarshalled correctly")
}
......@@ -26,11 +26,11 @@ func StopResourceMonitoring() {
}
}
func resetResourceStatsForTesting() {
if mainResourceStats != nil {
StopResourceMonitoring()
}
}
//func resetResourceStatsForTesting() {
// if mainResourceStats != nil {
// StopResourceMonitoring()
// }
//}
func GetCpuUsage() float64 {
......
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
// #nosec
package jobqueue
import (
......@@ -241,7 +243,7 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel
w.mu.Lock()
defer w.mu.Unlock()
if w.manager != nil {
w.manager.Sync(job)
_ = w.manager.Sync(job)
}
}()
......