Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • master
  • v1.0.0
  • v1.0.1
  • v1.1.0
  • v1.10.0
  • v1.10.1
  • v1.10.2
  • v1.11.0
  • v1.12.0
  • v1.12.1
  • v1.12.2
  • v1.12.3
  • v1.12.4
  • v1.12.5
  • v1.12.6
  • v1.12.7
  • v1.12.8
  • v1.13.0
  • v1.13.1
  • v1.13.2
  • v1.14.0
  • v1.15.0
  • v1.15.1
  • v1.15.10
  • v1.15.11
  • v1.15.12
  • v1.15.13
  • v1.15.14
  • v1.15.15
  • v1.15.16
  • v1.15.17
  • v1.15.2
  • v1.15.3
  • v1.15.4
  • v1.15.5
  • v1.15.6
  • v1.15.7
  • v1.15.8
  • v1.15.9
  • v1.16.0
  • v1.16.1
  • v1.17.0
  • v1.18.0
  • v1.18.1
  • v1.18.2
  • v1.19.0
  • v1.19.1
  • v1.19.2
  • v1.19.3
  • v1.19.4
  • v1.2.0
  • v1.20.0
  • v1.20.1
  • v1.20.2
  • v1.20.3
  • v1.21.0
  • v1.21.1
  • v1.22.0
  • v1.23.0
  • v1.23.1
  • v1.23.2
  • v1.3.0
  • v1.3.1
  • v1.3.2
  • v1.4.0
  • v1.5.0
  • v1.5.1
  • v1.6.0
  • v1.6.1
  • v1.7.0
  • v1.7.1
  • v1.7.2
  • v1.7.3
  • v1.8.0
  • v1.8.1
  • v1.9.0
76 results

Target

Select target project
  • oss/libraries/go/services/job-queues
1 result
Select Git revision
  • master
  • v1.0.0
  • v1.0.1
  • v1.1.0
  • v1.10.0
  • v1.10.1
  • v1.10.2
  • v1.11.0
  • v1.12.0
  • v1.12.1
  • v1.12.2
  • v1.12.3
  • v1.12.4
  • v1.12.5
  • v1.12.6
  • v1.12.7
  • v1.12.8
  • v1.13.0
  • v1.13.1
  • v1.13.2
  • v1.14.0
  • v1.15.0
  • v1.15.1
  • v1.15.10
  • v1.15.11
  • v1.15.12
  • v1.15.13
  • v1.15.14
  • v1.15.15
  • v1.15.16
  • v1.15.17
  • v1.15.2
  • v1.15.3
  • v1.15.4
  • v1.15.5
  • v1.15.6
  • v1.15.7
  • v1.15.8
  • v1.15.9
  • v1.16.0
  • v1.16.1
  • v1.17.0
  • v1.18.0
  • v1.18.1
  • v1.18.2
  • v1.19.0
  • v1.19.1
  • v1.19.2
  • v1.19.3
  • v1.19.4
  • v1.2.0
  • v1.20.0
  • v1.20.1
  • v1.20.2
  • v1.20.3
  • v1.21.0
  • v1.21.1
  • v1.22.0
  • v1.23.0
  • v1.23.1
  • v1.23.2
  • v1.3.0
  • v1.3.1
  • v1.3.2
  • v1.4.0
  • v1.5.0
  • v1.5.1
  • v1.6.0
  • v1.6.1
  • v1.7.0
  • v1.7.1
  • v1.7.2
  • v1.7.3
  • v1.8.0
  • v1.8.1
  • v1.9.0
76 results
Show changes
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
// InstantScheduler is a scheduler that schedules a job instantly
type InstantScheduler struct{}
func (s *InstantScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
}
return nil
}
func (s *InstantScheduler) GetType() string {
return "Instant"
}
func (s *InstantScheduler) IsAdHoc() bool {
return true
}
func (s *InstantScheduler) Cancel(id JobID) error {
return nil
}
func (s *InstantScheduler) CancelAll() error {
return nil
}
func (s *InstantScheduler) JobExists(id JobID) bool {
return false
}
func (s *InstantScheduler) GetPersistence() SchedulerPersistence {
return SchedulerPersistence{
Type: s.GetType(),
}
}
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
"encoding/json"
"fmt"
"github.com/robfig/cron/v3"
"github.com/fsnotify/fsnotify"
"time"
)
......@@ -30,6 +32,8 @@ type SchedulerPersistence struct {
Event EventName `yaml:"event,omitempty" json:"event,omitempty" gorm:"column:event"`
Time *time.Time `yaml:"time,omitempty" json:"time,omitempty" gorm:"column:time"`
Executed bool `yaml:"executed,omitempty" json:"executed,omitempty" gorm:"column:executed"`
Path string `yaml:"path,omitempty" json:"path,omitempty" gorm:"column:path"`
EventFlags fsnotify.Op `yaml:"eventFlags,omitempty" json:"eventFlags,omitempty" gorm:"column:eventFlags"`
}
// UnmarshalJSON implements the json.Unmarshaler interface
......@@ -64,529 +68,3 @@ func (sp *SchedulerPersistence) UnmarshalJSON(data []byte) error {
return nil
}
// IntervalScheduler is a scheduler that schedules a job at a fixed interval
type IntervalScheduler struct {
Interval time.Duration
jobs map[JobID]StopChan
}
func (s *IntervalScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
if s.Interval <= 0 {
return fmt.Errorf("invalid interval: %v", s.Interval)
}
if s.jobs == nil {
s.jobs = make(map[JobID]StopChan)
}
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
}
stopChan := make(StopChan)
s.jobs[id] = stopChan
ticker := time.NewTicker(s.Interval)
go func() {
for {
select {
case <-ticker.C:
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
}
case <-stopChan:
ticker.Stop()
return
}
}
}()
return nil
}
func (s *IntervalScheduler) GetType() string {
return "Interval"
}
func (s *IntervalScheduler) IsAdHoc() bool {
return false
}
func (s *IntervalScheduler) Cancel(id JobID) error {
if s.jobs == nil {
return nil
}
if stopChan, ok := s.jobs[id]; ok {
select {
case stopChan <- true:
default:
}
delete(s.jobs, id)
}
return nil
}
func (s *IntervalScheduler) CancelAll() error {
if s.jobs == nil {
return nil
}
for _, stopChan := range s.jobs {
select {
case stopChan <- true:
default:
}
}
s.jobs = nil
return nil
}
func (s *IntervalScheduler) JobExists(id JobID) bool {
if s.jobs == nil {
return false
}
_, ok := s.jobs[id]
return ok
}
func (s *IntervalScheduler) GetPersistence() SchedulerPersistence {
return SchedulerPersistence{
Type: s.GetType(),
Interval: s.Interval,
}
}
// CronScheduler is a scheduler that uses the cron library to schedule jobs
type CronScheduler struct {
cron *cron.Cron
Spec string
jobs map[JobID]cron.EntryID
}
func (s *CronScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
var err error
if s.cron == nil {
return ErrCronNotInitialized
}
if s.jobs == nil {
s.jobs = make(map[JobID]cron.EntryID)
}
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
}
entryId, err := s.cron.AddFunc(s.Spec, func() {
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
}
})
s.jobs[id] = entryId
if err != nil {
return err
}
return nil
}
func (s *CronScheduler) GetType() string {
return "Cron"
}
func (s *CronScheduler) IsAdHoc() bool {
return false
}
func (s *CronScheduler) Cancel(id JobID) error {
if s.jobs == nil {
return nil
}
if entryId, ok := s.jobs[id]; ok {
s.cron.Remove(entryId)
}
return nil
}
func (s *CronScheduler) CancelAll() error {
if s.jobs == nil {
return nil
}
for _, entryId := range s.jobs {
s.cron.Remove(entryId)
}
s.jobs = nil
return nil
}
func (s *CronScheduler) JobExists(id JobID) bool {
if s.jobs == nil {
return false
}
_, ok := s.jobs[id]
return ok
}
func (s *CronScheduler) GetPersistence() SchedulerPersistence {
return SchedulerPersistence{
Type: s.GetType(),
Spec: s.Spec,
}
}
// DelayScheduler is a scheduler that schedules a job after a delay
type DelayScheduler struct {
Delay time.Duration
jobs map[JobID]StopChan
}
func (s *DelayScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
if s.jobs == nil {
s.jobs = make(map[JobID]StopChan)
}
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
}
stopChan := make(StopChan)
s.jobs[id] = stopChan
timer := time.NewTimer(s.Delay)
go func() {
select {
case <-timer.C:
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
}
case <-stopChan:
timer.Stop()
return
}
}()
return nil
}
func (s *DelayScheduler) GetType() string {
return "Delay"
}
func (s *DelayScheduler) IsAdHoc() bool {
return true
}
func (s *DelayScheduler) Cancel(id JobID) error {
if s.jobs == nil {
return nil
}
if stopChan, ok := s.jobs[id]; ok {
select {
case stopChan <- true:
default:
}
delete(s.jobs, id)
}
return nil
}
func (s *DelayScheduler) CancelAll() error {
if s.jobs == nil {
return nil
}
for _, stopChan := range s.jobs {
select {
case stopChan <- true:
default:
}
}
s.jobs = nil
return nil
}
func (s *DelayScheduler) JobExists(id JobID) bool {
if s.jobs == nil {
return false
}
_, ok := s.jobs[id]
return ok
}
func (s *DelayScheduler) GetPersistence() SchedulerPersistence {
return SchedulerPersistence{
Type: s.GetType(),
Delay: s.Delay,
}
}
// TimeScheduler is a scheduler that schedules at a specific time
type TimeScheduler struct {
Time time.Time
jobs map[JobID]StopChan
executed bool
}
func (s *TimeScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
if s.executed {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, job.GetID())
}
if s.Time.Before(time.Now()) {
return fmt.Errorf("%w: scheduled time is in the past", ErrInvalidTime)
}
if s.jobs == nil {
s.jobs = make(map[JobID]StopChan)
}
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
}
stopChan := make(StopChan)
s.jobs[id] = stopChan
timer := time.NewTimer(s.Time.Sub(time.Now()))
go func() {
select {
case <-timer.C:
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
s.executed = true
} else {
timer.Stop()
stopChan <- true
}
case <-stopChan:
timer.Stop()
return
}
}()
return nil
}
func (s *TimeScheduler) GetType() string {
return "Time"
}
func (s *TimeScheduler) IsAdHoc() bool {
return false
}
func (s *TimeScheduler) Cancel(id JobID) error {
if s.jobs == nil {
return nil
}
if stopChan, ok := s.jobs[id]; ok {
select {
case stopChan <- true:
default:
}
delete(s.jobs, id)
}
return nil
}
func (s *TimeScheduler) CancelAll() error {
if s.jobs == nil {
return nil
}
for _, stopChan := range s.jobs {
select {
case stopChan <- true:
default:
}
}
s.jobs = nil
return nil
}
func (s *TimeScheduler) JobExists(id JobID) bool {
if s.jobs == nil {
return false
}
_, ok := s.jobs[id]
return ok
}
func (s *TimeScheduler) GetPersistence() SchedulerPersistence {
return SchedulerPersistence{
Type: s.GetType(),
Time: &s.Time,
Executed: s.executed,
}
}
// EventScheduler is a scheduler that schedules a job when an event is received
type EventScheduler struct {
Event EventName
EventBus *EventBus
jobs map[JobID]StopChan
}
func (s *EventScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
ch := make(chan interface{})
if s.EventBus != nil {
s.EventBus.Subscribe(s.Event, ch)
} else {
eventBus.Subscribe(s.Event, ch)
}
if s.jobs == nil {
s.jobs = make(map[JobID]StopChan)
}
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
}
stopChan := make(StopChan)
s.jobs[id] = stopChan
go func() {
for {
select {
case <-ch:
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
}
case <-stopChan:
eventBus.Unsubscribe(s.Event, ch)
return
}
}
}()
return nil
}
func (s *EventScheduler) GetType() string {
return "Event"
}
func (s *EventScheduler) IsAdHoc() bool {
return false
}
func (s *EventScheduler) Cancel(id JobID) error {
if s.jobs == nil {
return nil
}
if stopChan, ok := s.jobs[id]; ok {
select {
case stopChan <- true:
default:
}
delete(s.jobs, id)
}
return nil
}
func (s *EventScheduler) CancelAll() error {
if s.jobs == nil {
return nil
}
for _, stopChan := range s.jobs {
select {
case stopChan <- true:
default:
}
}
s.jobs = nil
return nil
}
func (s *EventScheduler) JobExists(id JobID) bool {
if s.jobs == nil {
return false
}
_, ok := s.jobs[id]
return ok
}
func (s *EventScheduler) GetPersistence() SchedulerPersistence {
return SchedulerPersistence{
Type: s.GetType(),
Event: s.Event,
}
}
// InstantScheduler is a scheduler that schedules a job instantly
type InstantScheduler struct{}
func (s *InstantScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
}
return nil
}
func (s *InstantScheduler) GetType() string {
return "Instant"
}
func (s *InstantScheduler) IsAdHoc() bool {
return true
}
func (s *InstantScheduler) Cancel(id JobID) error {
return nil
}
func (s *InstantScheduler) CancelAll() error {
return nil
}
func (s *InstantScheduler) JobExists(id JobID) bool {
return false
}
func (s *InstantScheduler) GetPersistence() SchedulerPersistence {
return SchedulerPersistence{
Type: s.GetType(),
}
}
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
"encoding/json"
"github.com/fsnotify/fsnotify"
"github.com/robfig/cron/v3"
"github.com/stretchr/testify/assert"
"os"
"sync/atomic"
"testing"
"time"
......@@ -366,3 +371,43 @@ func parseTimeForTesting(t *testing.T, value string) *time.Time {
t.Fatalf("Failed to parse time '%s' in any known format", value)
return nil
}
func TestInotifyScheduler_BasicFunctionality(t *testing.T) {
var count int32
eventBus := NewEventBus()
tmpPath := t.TempDir()
inotifyScheduler := InotifyScheduler{Path: tmpPath, EventFlags: fsnotify.Create | fsnotify.Write | fsnotify.Remove}
job := NewJob[DummyResult]("test-job", &DummyRunnable{})
genericJob := GenericJob(job)
_ = inotifyScheduler.Schedule(genericJob, eventBus)
jobChannel := make(chan interface{})
go func() {
for _ = range jobChannel {
atomic.AddInt32(&count, 1)
}
}()
eventBus.Subscribe(QueueJob, jobChannel)
time.Sleep(time.Millisecond * 100)
tmpFile := tmpPath + "/test.txt"
_, err := os.Create(tmpFile)
assert.NoError(t, err)
time.Sleep(time.Millisecond * 100)
err = os.Remove(tmpFile)
assert.NoError(t, err)
time.Sleep(time.Millisecond * 100)
if atomic.LoadInt32(&count) != 2 {
t.Errorf("Expected to run 2 times, ran %d times", count)
}
}
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
......
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
......
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import "time"
......
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import "container/heap"
......
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
......
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
// safeClose closes the given channel and returns an error if the channel is already closed
......
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
"context"
"errors"
"fmt"
"github.com/google/uuid"
"sync"
......@@ -33,6 +37,29 @@ type Worker interface {
SetManager(manager *Manager)
}
type Statistic struct {
TotalThreads int
ActiveThreads int
JobsAssigned int
JobsCompleted int
FailedJobs int
TotalExecutionTime time.Duration
}
func (s *Statistic) AverageExecutionTime() time.Duration {
if s.JobsCompleted == 0 {
return 0
}
return s.TotalExecutionTime / time.Duration(s.JobsCompleted)
}
func (s *Statistic) UtilizationRate() float64 {
if s.TotalThreads == 0 {
return 0
}
return float64(s.ActiveThreads) / float64(s.TotalThreads) * 100
}
// GenericWorker is a generic worker
type GenericWorker struct {
ID WorkerID
......@@ -47,8 +74,10 @@ type LocalWorker struct {
cancelChans []chan bool
maxJobs int
mu sync.Mutex
statisticMu sync.Mutex
wg sync.WaitGroup
manager *Manager
statistic Statistic
}
// GetID returns the ID of the worker
......@@ -58,7 +87,7 @@ func (w *GenericWorker) GetID() WorkerID {
// NewLocalWorker creates a new local worker
func NewLocalWorker(maxJobs int) *LocalWorker {
w := &LocalWorker{maxJobs: maxJobs}
w := &LocalWorker{maxJobs: maxJobs, statistic: Statistic{TotalThreads: maxJobs}}
w.jobChannels = make([]chan GenericJob, maxJobs)
w.stopChans = make([]chan bool, maxJobs)
w.cancelChans = make([]chan bool, maxJobs)
......@@ -94,6 +123,27 @@ func (w *LocalWorker) Start() error {
return nil
}
// UpdateStatisticExtended updates the worker's statistics with job execution details
func (w *LocalWorker) UpdateStatisticExtended(jobDuration time.Duration, jobFailed bool) {
w.statisticMu.Lock()
defer w.statisticMu.Unlock()
if jobFailed {
w.statistic.FailedJobs++
} else {
w.statistic.TotalExecutionTime += jobDuration
w.statistic.JobsCompleted++
}
}
// GetStatistic returns the current statistics of the worker
func (w *LocalWorker) GetStatistic() Statistic {
w.statisticMu.Lock()
defer w.statisticMu.Unlock()
return w.statistic
}
func (w *LocalWorker) SetManager(manager *Manager) {
w.mu.Lock()
defer w.mu.Unlock()
......@@ -136,10 +186,18 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel
select {
case job := <-jobChannel:
w.statisticMu.Lock()
w.statistic.JobsAssigned++
w.statistic.ActiveThreads++
w.statisticMu.Unlock()
ctx, cancel := context.WithCancel(context.Background())
retries := job.GetMaxRetries()
retryDelay := job.GetRetryDelay()
startTime := time.Now()
if retries == 0 {
retries = 1
}
......@@ -159,12 +217,18 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel
}
_, err = job.Execute(ctx)
jobFailed := false
if err != nil {
jobFailed = true
}
w.UpdateStatisticExtended(time.Since(startTime), jobFailed)
if cancel != nil {
cancel()
}
if err == nil || ctx.Err() == context.Canceled {
if err == nil || errors.Is(ctx.Err(), context.Canceled) {
break
}
......@@ -186,6 +250,10 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel
}
}
w.statisticMu.Lock()
w.statistic.ActiveThreads--
w.statisticMu.Unlock()
case <-stopChan:
stopFlag = true
break
......
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
......@@ -9,6 +12,7 @@ import (
type DummyJob struct {
id JobID
sleepTime time.Duration
}
func (j DummyJob) GetID() JobID {
......@@ -48,7 +52,9 @@ func (j DummyJob) GetTimeout() time.Duration {
}
func (j DummyJob) Execute(_ context.Context) (RunGenericResult, error) {
time.Sleep(100 * time.Millisecond)
if j.sleepTime > 0 {
time.Sleep(j.sleepTime)
}
return nil, nil
}
......@@ -231,3 +237,46 @@ func TestCancelJob(t *testing.T) {
t.Errorf("Stop() returned error: %v", err)
}
}
func TestLocalWorker_Statistics(t *testing.T) {
maxJobs := 3
worker := NewLocalWorker(maxJobs)
// Start the worker
if err := worker.Start(); err != nil {
t.Fatalf("failed to start worker: %v", err)
}
// Assign a job with a longer execution time
mockJob1 := DummyJob{id: JobID("1"), sleepTime: 30 * time.Millisecond}
if err := worker.AssignJob(mockJob1); err != nil {
t.Fatalf("failed to assign job: %v", err)
}
// Assign a job with a longer execution time
mockJob2 := DummyJob{id: JobID("2"), sleepTime: 10 * time.Millisecond}
if err := worker.AssignJob(mockJob2); err != nil {
t.Fatalf("failed to assign job: %v", err)
}
// Loop to check statistics every 10ms
timeout := time.After(100 * time.Millisecond)
tick := time.Tick(1 * time.Nanosecond)
for {
select {
case <-timeout:
t.Fatal("Test timed out")
case <-tick:
stats := worker.GetStatistic()
if stats.JobsCompleted >= 2 {
goto TEST_SUCCESS
}
}
}
TEST_SUCCESS:
// Stop the worker
if err := worker.Stop(); err != nil {
t.Fatalf("failed to stop worker: %v", err)
}
}