Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • oss/libraries/go/services/job-queues
1 result
Select Git revision
Show changes
Commits on Source (2)
......@@ -25,23 +25,7 @@ func TestWriteToDB1(t *testing.T) {
manager := &Manager{database: gormDB}
// Starte den DBSaver
promise := CreateAndStartJobSyncer(manager)
ready := make(chan struct{})
var saver *JobSyncer
Then[*JobSyncer, *JobSyncer](promise, func(value *JobSyncer) (*JobSyncer, error) {
close(ready)
saver = value
return value, nil
}, func(e error) error {
close(ready)
Error("Error while starting db saver", "error", err)
return nil
})
<-ready
saver := NewJobSyncer(manager)
runner := &CounterRunnable{}
job := NewJob[CounterResult]("job1", runner)
......@@ -53,13 +37,10 @@ func TestWriteToDB1(t *testing.T) {
job.scheduler = scheduler
saver.AddJob(job)
saver.AddJob(job)
time.Sleep(1 * time.Second)
saver.Sync(job)
saver.Sync(job)
err = saver.Stop()
assert.Nil(t, err)
manager.WaitSync()
// check if stats are in database
var stats JobPersistence
......
......@@ -24,28 +24,11 @@ func TestWriteToDB2(t *testing.T) {
manager := &Manager{database: gormDB}
// Starte den DBSaver
p := CreateAndStartJobSyncer(manager)
ready := make(chan struct{})
//var saver *JobSyncer
Then[*JobSyncer, *JobSyncer](p, func(value *JobSyncer) (*JobSyncer, error) {
close(ready)
// saver = value
return value, nil
}, func(e error) error {
close(ready)
Error("Error while starting db saver", "error", err)
return nil
})
NewJobSyncer(manager)
db := gormDB
<-ready
var wg sync.WaitGroup
wg.Add(1)
......@@ -99,7 +82,7 @@ func TestWriteToDB2(t *testing.T) {
time.Sleep(1 * time.Second)
mgr.jobSyncer.AddJob(job)
mgr.jobSyncer.Sync(job)
runtime.Gosched()
time.Sleep(1 * time.Second)
......@@ -116,7 +99,7 @@ func TestWriteToDB2(t *testing.T) {
runtime.Gosched()
time.Sleep(1 * time.Second)
mgr.jobSyncer.AddJob(job)
mgr.jobSyncer.Sync(job)
time.Sleep(2 * time.Second)
err = mgr.CancelJobSchedule("job1")
......@@ -143,6 +126,6 @@ func TestWriteToDB2(t *testing.T) {
time.Sleep(1 * time.Second)
mgr.jobSyncer.AddJob(job)
mgr.jobSyncer.Sync(job)
}
......@@ -28,18 +28,28 @@ func TestWriteToDB4(t *testing.T) {
runner := &CounterRunnable{}
job := NewJob[CounterResult]("job3", runner)
id := job.GetID()
job.mu.Lock()
job.stats = JobStats{
JobID: job.GetID(),
JobID: id,
RunCount: 20,
SuccessCount: 30,
ErrorCount: 40,
}
job.mu.Unlock()
db.Logger = db.Logger.LogMode(4)
scheduler := &InstantScheduler{}
err = manager.ScheduleJob(job, scheduler)
assert.Nil(t, err)
time.Sleep(500 * time.Millisecond)
time.Sleep(200 * time.Millisecond)
assert.Equal(t, 21, job.GetStats().RunCount)
manager.WaitSync()
// check is stats are the values above
var tmpJob JobPersistence
......@@ -48,16 +58,18 @@ func TestWriteToDB4(t *testing.T) {
assert.Nil(t, err)
// Validate the fields
stats := tmpJob.GetStats()
assert.Equal(t, JobID("job3"), tmpJob.ID)
assert.Equal(t, 21, tmpJob.Stats.RunCount)
assert.Equal(t, 31, tmpJob.Stats.SuccessCount)
assert.Equal(t, 40, tmpJob.Stats.ErrorCount)
assert.Equal(t, 21, stats.RunCount)
assert.Equal(t, 31, stats.SuccessCount)
assert.Equal(t, 40, stats.ErrorCount)
// reset stats
err = manager.ResetJobStats(job.GetID())
assert.Nil(t, err)
time.Sleep(500 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
var tmpJob2 JobPersistence
// check is stats are the values above
......@@ -66,10 +78,13 @@ func TestWriteToDB4(t *testing.T) {
assert.Nil(t, err)
// Validate the fields
stats2 := tmpJob2.GetStats()
assert.Equal(t, JobID("job3"), tmpJob2.ID)
assert.Equal(t, 0, tmpJob2.Stats.RunCount)
assert.Equal(t, 0, tmpJob2.Stats.SuccessCount)
assert.Equal(t, 0, tmpJob2.Stats.ErrorCount)
assert.Equal(t, 0, stats2.RunCount)
assert.Equal(t, 0, stats2.SuccessCount)
assert.Equal(t, 0, stats2.ErrorCount)
err = manager.DeleteJob(job.GetID())
assert.Nil(t, err)
......
......@@ -7,7 +7,6 @@ import (
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"testing"
"time"
)
func TestWriteToDB5(t *testing.T) {
......@@ -36,19 +35,17 @@ func TestWriteToDB5(t *testing.T) {
sameIDJob := NewJob[CounterResult]("jobSameID", runner)
time.Sleep(500 * time.Millisecond)
manager.WaitSync()
// Trying to save a job with the same ID should do nothing
mgr.mu.Lock()
mgr.jobSyncer.AddJob(sameIDJob)
mgr.jobSyncer.Sync(sameIDJob)
mgr.mu.Unlock()
err = mgr.CancelJobSchedule("jobSameID")
assert.Nil(t, err)
mgr.mu.Lock()
err = mgr.jobSyncer.Stop()
mgr.mu.Unlock()
manager.WaitSync()
assert.Nil(t, err)
......
......@@ -48,15 +48,13 @@ func TestWriteToDB6(t *testing.T) {
assert.Nil(t, err)
mgr.mu.Lock()
mgr.jobSyncer.AddJob(job)
mgr.jobSyncer.Sync(job)
mgr.mu.Unlock()
time.Sleep(10 * time.Millisecond)
}
mgr.mu.Lock()
_ = mgr.jobSyncer.Stop()
mgr.mu.Unlock()
manager.WaitSync()
time.Sleep(2 * time.Second)
......
// Copyright 2024 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
......@@ -17,7 +20,7 @@ func TestCreateOrUpdateJob(t *testing.T) {
runner := &CounterRunnable{}
job := NewJob[CounterResult]("job1", runner)
assert.NoError(t, createOrUpdateJob(job, db))
assert.NoError(t, saveJob(job, db))
var jobPersistence JobPersistence
assert.NoError(t, db.First(&jobPersistence, "id = ?", job.GetID()).Error)
......@@ -26,7 +29,7 @@ func TestCreateOrUpdateJob(t *testing.T) {
assert.Equal(t, "", jobPersistence.Description)
assert.Equal(t, Priority(1), jobPersistence.Priority)
job.description = "Updated description"
assert.NoError(t, createOrUpdateJob(job, db))
assert.NoError(t, saveJob(job, db))
assert.NoError(t, db.First(&jobPersistence, "id = ?", job.GetID()).Error)
assert.Equal(t, "Updated description", jobPersistence.Description)
......
// Copyright 2024 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
......
// Copyright 2024 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
......
// Copyright 2024 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
"errors"
"gorm.io/gorm"
"strings"
"time"
)
func (s *JobSyncer) DeleteJob(job GenericJob) error {
......@@ -15,19 +19,15 @@ func (s *JobSyncer) DeleteJob(job GenericJob) error {
return s.manager.database.Transaction(func(tx *gorm.DB) error {
permJob := job.GetPersistence()
if err := tx.Where("job_id = ?", permJob.GetID()).Delete(&JobLog{}).Error; err != nil {
return err
}
if err := tx.Where("job_id = ?", permJob.GetID()).Delete(&JobStats{}).Error; err != nil {
return err
}
if err := tx.Delete(&permJob).Error; err != nil {
return err
}
return nil
})
}
......@@ -59,105 +59,173 @@ func (s *JobSyncer) ResetStats(job GenericJob) error {
}
job.ResetStats()
stats := job.GetStats()
return s.manager.database.Transaction(func(tx *gorm.DB) error {
return tx.Model(&JobStats{}).Where("job_id = ?", job.GetID()).Select("*").Omit("deleted_at", "created_at", "job_id").Updates(stats).Error
})
if s.manager == nil || s.manager.database == nil {
return ErrNoDatabaseConnection
}
return saveJob(job, s.manager.database)
//stats := job.GetStats()
//return s.manager.database.Transaction(func(tx *gorm.DB) error {
// return tx.Model(&JobStats{}).Where("job_id = ?",
// job.GetID()).
// Select("*").Omit("deleted_at", "created_at", "job_id").Updates(stats).Error
//
//})
}
func (s *JobSyncer) CreateOrUpdateJob(job GenericJob) error {
func (s *JobSyncer) SaveJob(job GenericJob) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.manager == nil || s.manager.database == nil {
return ErrNoDatabaseConnection
}
return createOrUpdateJob(job, s.manager.database)
return saveJob(job, s.manager.database)
}
func createOrUpdateJob(job GenericJob, db *gorm.DB) error {
return db.Transaction(func(tx *gorm.DB) error {
func save(job *JobPersistence, db *gorm.DB) error {
permJob := job.GetPersistence()
if db == nil {
return ErrNoDatabaseConnection
}
memLogs := permJob.Logs
permJob.Logs = nil
var existingJob JobPersistence
result := tx.Unscoped().Where("id = ?", permJob.GetID()).First(&existingJob)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
err := tx.Create(&permJob).Error
if err != nil {
Trace("Error while creating job", "error", err)
return err
}
} else {
Trace("Error while creating job", "error", result.Error)
return result.Error
}
} else {
// remove deleted from
r := tx.Unscoped().Model(&existingJob).Where("id = ?", existingJob.ID).Select("*").
Omit("deleted_at", "created_at", "job_id").Update("deleted_at", nil)
if r.Error != nil {
Trace("Error while deleting job", "error", r.Error)
return r.Error
}
return db.Transaction(func(tx *gorm.DB) error {
// update scheduler
resultStats := tx.Model(&existingJob).Where("id = ?", existingJob.ID).Select("*").
Omit("deleted_at", "created_at", "job_id").UpdateColumns(SchedulerPersistence{
Type: "",
Interval: 0,
Spec: "",
Delay: 0,
Event: "",
Time: nil,
Executed: false,
})
if resultStats.Error != nil {
Trace("Error while updating job stats", "error", resultStats.Error)
return resultStats.Error
}
if err := tx.Save(job).Error; err != nil {
return err
}
r2 := tx.Model(&existingJob).Where("id = ?", existingJob.ID).Select("*").
Omit("deleted_at", "created_at", "job_id").Updates(permJob)
if r2.Error != nil {
return r2.Error
if job.Stats != (JobStats{}) {
job.Stats.JobID = job.ID
if err := tx.Save(&job.Stats).Error; err != nil {
return err
}
}
if tx.Error != nil {
Trace("Error while updating job", "error", tx.Error)
return tx.Error
for i, _ := range job.Logs {
job.Logs[i].LogID = 0
_ = tx.Create(&job.Logs[i])
// no error handling, if it fails, it fails
}
tx.Model(&permJob.Stats).
Select("*").Omit("deleted_at", "created_at", "job_id").
UpdateColumns(permJob.Stats)
return nil
})
}
if tx.Error != nil {
Error("Error while updating job stats", "error", tx.Error)
return result.Error
}
func saveJob(job GenericJob, db *gorm.DB) error {
for i, _ := range memLogs {
memLogs[i].LogID = 0
_ = tx.Create(&memLogs[i])
// no error handling, if it fails, it fails
if db == nil {
return ErrNoDatabaseConnection
}
permJob := job.GetPersistence()
maxRetries := 3
var attempt int
for attempt = 0; attempt < maxRetries; attempt++ {
err := save(&permJob, db)
if err != nil {
if strings.Contains(err.Error(), "lock") {
time.Sleep(time.Millisecond * 100)
continue
}
return err
}
break
}
return nil
if attempt == maxRetries {
return ErrMaxRetriesReached
}
})
return nil
//return db.Transaction(func(tx *gorm.DB) error {
//
// permJob := job.GetPersistence()
//
// memLogs := permJob.Logs
// permJob.Logs = nil
//
// var existingJob JobPersistence
// result := tx.Unscoped().Where("id = ?", permJob.GetID()).First(&existingJob)
//
// if result.Error != nil {
// err, done := createJob(tx, result, permJob)
// if done {
// return err
// }
// } else {
//
// err, done := updateJob(tx, existingJob, permJob)
// if done {
// return err
// }
// }
//
// if tx.Error != nil {
// Trace("Error while updating job", "error", tx.Error)
// return tx.Error
// }
//
// return nil
//
//})
}
//
//func updateJob(tx *gorm.DB, existingJob JobPersistence, permJob JobPersistence) (error, bool) {
//
// r := tx.Unscoped().Model(&existingJob).Select("*").
// Omit("deleted_at", "created_at", "job_id").
// Update("deleted_at", nil)
//
// if r.Error != nil {
// Trace("Error while deleting job", "error", r.Error)
// return r.Error, true
// }
//
// resultStats := tx.Model(&existingJob).Select("*").
// Omit("deleted_at", "created_at", "job_id").Update()
//
// if resultStats.Error != nil {
// Trace("Error while updating job stats", "error", resultStats.Error)
// return resultStats.Error, true
// }
//
// r2 := tx.Model(&existingJob).Select("*").
// Omit("deleted_at", "created_at", "job_id").Updates(permJob)
// if r2.Error != nil {
// return r2.Error, true
// }
//
// tx.Model(&permJob.Stats).
// Select("*").Omit("deleted_at", "created_at", "job_id").
// UpdateColumns(permJob.Stats)
//
// if tx.Error != nil {
// Error("Error while updating job stats", "error", tx.Error)
// return result.Error
// }
//
// for i, _ := range memLogs {
// memLogs[i].LogID = 0
// _ = tx.Create(&memLogs[i])
// // no error handling, if it fails, it fails
// }
//
// return nil, false
//}
//
//func createJob(tx *gorm.DB, result *gorm.DB, permJob JobPersistence) (error, bool) {
// if errors.Is(result.Error, gorm.ErrRecordNotFound) {
// err := tx.Create(&permJob).Error
// if err != nil {
// Trace("Error while creating job", "error", err)
// return err, true
// }
// } else {
// Trace("Error while creating job", "error", result.Error)
// return result.Error, true
// }
// return nil, false
//}
// Copyright 2024 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
......@@ -24,7 +27,7 @@ func TestDeleteJob(t *testing.T) {
// Erstelle einen Job zum Löschen
runner := &CounterRunnable{}
job := NewJob[CounterResult]("testJobID", runner)
err = createOrUpdateJob(job, db)
err = saveJob(job, db)
assert.Nil(t, err)
var count int64
......@@ -56,7 +59,7 @@ func TestResetLogs(t *testing.T) {
// Erstelle einen Job und füge einige Logs hinzu
runner := &CounterRunnable{}
job := NewJob[CounterResult]("testJobID", runner)
err = createOrUpdateJob(job, db)
err = saveJob(job, db)
assert.Nil(t, err)
// Füge Logs zum Job hinzu
......@@ -97,7 +100,7 @@ func TestResetStats(t *testing.T) {
// Erstelle einen Job und setze einige Statistiken
runner := &CounterRunnable{}
job := NewJob[CounterResult]("testJobID", runner)
err = createOrUpdateJob(job, db)
err = saveJob(job, db)
assert.Nil(t, err)
// Aktualisiere die Job-Statistiken
......
......@@ -52,4 +52,6 @@ var (
ErrInvalidDuration = fmt.Errorf("invalid duration")
ErrJobSyncerAlreadyRunning = fmt.Errorf("JobSyncer is already running")
ErrJobSyncerNotRunning = fmt.Errorf("JobSyncer is not running")
ErrMaxRetriesReached = fmt.Errorf("maximum number of retries reached")
ErrTimeoutReached = fmt.Errorf("timeout reached")
)
......@@ -5,165 +5,83 @@ package jobqueue
import (
"sync"
"time"
)
type JobSyncer struct {
jobQueue []GenericJob
queueLock sync.Mutex
notifyChannel chan struct{}
stopChan chan struct{}
jobSaveProgress sync.WaitGroup
status Status
manager *Manager
mu sync.Mutex
migrateFlag bool
}
type Status int
const (
JobSyncerStatusStopped Status = iota
JobSyncerStatusRunning
)
func CreateAndStartJobSyncer[P *Promise[*JobSyncer]](manager *Manager) *Promise[*JobSyncer] {
s := NewJobSyncer(manager)
s.mu.Lock()
defer s.mu.Unlock()
return NewPromise[*JobSyncer](func(resolve func(*JobSyncer), reject func(error)) {
if s.manager == nil || s.manager.database == nil {
reject(ErrNoDatabaseConnection)
return
}
if s.status == JobSyncerStatusRunning {
resolve(s)
return
}
db := s.manager.database
if !s.migrateFlag {
err := db.AutoMigrate(&JobPersistence{}, &JobLog{}, &JobStats{})
if err != nil {
reject(err)
return
}
s.migrateFlag = true
}
err := s.Start()
if err != nil {
reject(err)
return
}
resolve(s)
})
mu sync.Mutex
migrated bool
manager *Manager
running sync.WaitGroup
lastError error
}
func NewJobSyncer(manager *Manager) *JobSyncer {
return &JobSyncer{
jobQueue: make([]GenericJob, 0),
manager: manager,
js := &JobSyncer{
manager: manager,
mu: sync.Mutex{},
}
}
func (js *JobSyncer) Start() error {
js.mu.Lock()
defer js.mu.Unlock()
if js.status == JobSyncerStatusRunning {
return ErrJobSyncerAlreadyRunning
if manager == nil || manager.database == nil {
return js
}
js.notifyChannel = make(chan struct{}, 1) // Buffer to avoid blocking
js.stopChan = make(chan struct{})
js.status = JobSyncerStatusRunning
go js.runWorker()
return nil
}
func (js *JobSyncer) runWorker() {
for {
select {
case <-js.notifyChannel:
js.processJobs()
case <-js.stopChan:
js.cleanup()
return
db := js.manager.database
if !js.migrated {
err := db.AutoMigrate(&JobPersistence{}, &JobLog{}, &JobStats{})
if err != nil {
js.lastError = err
} else {
js.migrated = true
}
}
}
func (js *JobSyncer) processJobs() {
for {
js.queueLock.Lock()
if len(js.jobQueue) == 0 {
js.queueLock.Unlock()
return
}
job := js.jobQueue[0]
js.jobQueue = js.jobQueue[1:]
js.queueLock.Unlock()
js.jobSaveProgress.Add(1)
js.processJob(job)
js.jobSaveProgress.Done()
}
return js
}
func (js *JobSyncer) Wait(timeout time.Duration) error {
done := make(chan struct{})
func (js *JobSyncer) AddJob(job GenericJob) {
js.queueLock.Lock()
// check if job is already in queue
for _, j := range js.jobQueue {
if j.GetID() == job.GetID() {
js.queueLock.Unlock()
return
}
}
js.jobQueue = append(js.jobQueue, job)
js.queueLock.Unlock()
go func() {
js.running.Wait()
close(done)
}()
js.jobSaveProgress.Add(1)
// Non-blocking notify
select {
case js.notifyChannel <- struct{}{}:
default:
case <-done:
// Die WaitGroup ist fertig
return nil
case <-time.After(timeout):
return ErrTimeoutReached
}
}
func (js *JobSyncer) processJob(job GenericJob) {
defer js.jobSaveProgress.Done()
err := createOrUpdateJob(job, js.manager.database)
Error("Error while creating or updating job", "error", err)
}
func (js *JobSyncer) Stop() error {
func (js *JobSyncer) Sync(job GenericJob) {
js.mu.Lock()
if js.status != JobSyncerStatusRunning {
js.mu.Unlock()
return ErrJobSyncerNotRunning
}
js.status = JobSyncerStatusStopped
js.mu.Unlock()
defer js.mu.Unlock()
close(js.stopChan)
js.jobSaveProgress.Wait()
js.running.Add(1)
go func() {
defer js.running.Done()
err := saveJob(job, js.manager.GetDB())
if err != nil {
Error("Error while creating or updating job", err)
}
}()
return nil
}
func (js *JobSyncer) cleanup() {
func (js *JobSyncer) LastError() error {
js.mu.Lock()
defer js.mu.Unlock()
return js.lastError
}
js.jobSaveProgress.Wait()
js.status = JobSyncerStatusStopped
// Migrated returns if the database has been migrated.
//
// No parameters.
// Returns a boolean.
func (js *JobSyncer) Migrated() bool {
js.mu.Lock()
defer js.mu.Unlock()
return js.migrated
}
......@@ -24,31 +24,16 @@ func TestSaveJobWithSQLite(t *testing.T) {
//saver := NewJobSyncer(manager)
// Starte den DBSaver
p := CreateAndStartJobSyncer(manager)
ready := make(chan struct{})
var saver *JobSyncer
Then[*JobSyncer, *JobSyncer](p, func(value *JobSyncer) (*JobSyncer, error) {
saver = value
close(ready)
return value, nil
}, func(e error) error {
close(ready)
Error("Error while starting db saver", "error", err)
return nil
})
<-ready
saver := NewJobSyncer(manager)
jobID := JobID("testJob")
job := NewJob[CounterResult](jobID, &CounterRunnable{})
saver.AddJob(job)
saver.Sync(job)
time.Sleep(100 * time.Millisecond)
saver.Stop()
manager.WaitSync()
var count int64
gormDB.Model(&JobPersistence{}).Count(&count)
......
// Copyright 2024 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
......@@ -21,19 +24,19 @@ type DefaultLogger struct {
}
func (l *DefaultLogger) Info(msg string, keysAndValues ...interface{}) {
_ = l.Output(2, "INFO: "+msg)
l.Print("INFO: "+msg, keysAndValues)
}
func (l *DefaultLogger) Error(msg string, keysAndValues ...interface{}) {
_ = l.Output(2, "ERROR: "+msg)
l.Print("ERROR: "+msg, keysAndValues)
}
func (l *DefaultLogger) Warn(msg string, keysAndValues ...interface{}) {
_ = l.Output(2, "WARN: "+msg)
l.Print("WARN: "+msg, keysAndValues)
}
func (l *DefaultLogger) Trace(msg string, keysAndValues ...interface{}) {
_ = l.Output(2, "DEBUG: "+msg)
l.Print("DEBUG: "+msg, keysAndValues)
}
// Ensure DefaultLogger satisfies the Logger interface.
......
// Copyright 2024 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
......
......@@ -153,7 +153,6 @@ func (m *Manager) DeleteJob(id JobID) error {
}
if m.jobSyncer != nil {
err := m.jobSyncer.DeleteJob(job)
if err != nil {
return err
......@@ -164,6 +163,23 @@ func (m *Manager) DeleteJob(id JobID) error {
}
func (m *Manager) WaitSync() {
m.mu.Lock()
defer m.mu.Unlock()
if m.jobSyncer != nil {
_ = m.jobSyncer.Wait(2 * time.Second)
}
}
func (m *Manager) Sync(job GenericJob) {
m.mu.Lock()
defer m.mu.Unlock()
if m.jobSyncer != nil {
m.jobSyncer.Sync(job)
}
}
// RemoveJob removes a job from the active jobs
// If you want to remove a job from the active jobs and the database, use DeleteJob instead
func (m *Manager) RemoveJob(id JobID) error {
......@@ -289,12 +305,6 @@ func (m *Manager) SetDB(db *gorm.DB) *Manager {
m.mu.Lock()
defer m.mu.Unlock()
m.database = db
if m.jobSyncer != nil {
return m
}
m.jobSyncer = NewJobSyncer(m)
return m
}
......@@ -397,31 +407,8 @@ func (m *Manager) Start() error {
return ErrManagerAlreadyRunning
}
if m.jobSyncer != nil {
p := CreateAndStartJobSyncer(m)
ready := make(chan struct{})
var jobSyncerErr error
Then[*JobSyncer, *JobSyncer](p, func(value *JobSyncer) (*JobSyncer, error) {
close(ready)
m.mu.Lock()
m.jobSyncer = value
m.mu.Unlock()
return value, nil
}, func(e error) error {
close(ready)
Error("Error while starting db saver", "error", e)
jobSyncerErr = e
return nil
})
<-ready
if jobSyncerErr != nil {
return jobSyncerErr
}
if m.jobSyncer == nil {
m.jobSyncer = NewJobSyncer(m)
}
if len(m.workerMap) == 0 {
......@@ -470,6 +457,18 @@ func (m *Manager) Start() error {
}
func safeClose(ch chan interface{}) (err error) {
defer func() {
if recover() != nil {
err = ErrChannelAlreadyClosed
}
}()
close(ch)
return
}
// Stop stops the manager
func (m *Manager) Stop() error {
m.mu.Lock()
......@@ -482,6 +481,7 @@ func (m *Manager) Stop() error {
m.eventBus.Unsubscribe(QueueJob, m.jobEventCh)
m.eventBus.Unsubscribe(JobReady, m.jobEventCh)
m.eventBus.Shutdown()
_ = safeClose(m.jobEventCh)
var wrappedErr error
......@@ -508,15 +508,7 @@ func (m *Manager) Stop() error {
}
if m.jobSyncer != nil {
err = m.jobSyncer.Stop()
if err != nil {
if wrappedErr == nil {
wrappedErr = fmt.Errorf("Error: ")
}
wrappedErr = fmt.Errorf("%w\n%s", wrappedErr, err.Error())
}
_ = m.jobSyncer.Wait(10 * time.Second)
}
return wrappedErr
......@@ -530,6 +522,8 @@ func (m *Manager) SetLogger(logger Logger) *Manager {
m.database = m.database.Session(&gorm.Session{Logger: newGormAdapter()})
}
SetLogger(logger)
return m
}
......@@ -580,7 +574,7 @@ func (m *Manager) ScheduleJob(job GenericJob, scheduler Scheduler) error {
m.activeJobs[job.GetID()] = job
if m.jobSyncer != nil {
m.jobSyncer.AddJob(job)
m.jobSyncer.Sync(job)
}
return nil
......
......@@ -253,11 +253,14 @@ func TestManagerEventHandling(t *testing.T) {
startTime := time.Now()
for {
if job.runner.(*CounterRunnable).GetCount() > 10 {
currentCount := job.runner.(*CounterRunnable).GetCount()
if currentCount > 10 {
break
}
if time.Since(startTime) > 10*time.Second {
time.Sleep(2 * time.Millisecond)
if time.Since(startTime) > 1*time.Second {
t.Fatalf("Job did not finish in time")
}
}
......
// Copyright 2024 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
......
// Copyright 2024 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
......
......@@ -2,17 +2,3 @@
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
// safeClose closes the given channel and returns an error if the channel is already closed
func safeClose(ch chan interface{}) (err error) {
defer func() {
if recover() != nil {
err = ErrChannelAlreadyClosed
}
}()
err = nil
close(ch)
return
}