Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • master
  • v1.0.0
  • v1.0.1
  • v1.1.0
  • v1.10.0
  • v1.10.1
  • v1.10.2
  • v1.11.0
  • v1.12.0
  • v1.12.1
  • v1.12.2
  • v1.12.3
  • v1.12.4
  • v1.12.5
  • v1.12.6
  • v1.12.7
  • v1.12.8
  • v1.13.0
  • v1.13.1
  • v1.13.2
  • v1.14.0
  • v1.15.0
  • v1.15.1
  • v1.15.10
  • v1.15.11
  • v1.15.12
  • v1.15.13
  • v1.15.14
  • v1.15.15
  • v1.15.16
  • v1.15.17
  • v1.15.2
  • v1.15.3
  • v1.15.4
  • v1.15.5
  • v1.15.6
  • v1.15.7
  • v1.15.8
  • v1.15.9
  • v1.16.0
  • v1.16.1
  • v1.17.0
  • v1.18.0
  • v1.18.1
  • v1.18.2
  • v1.19.0
  • v1.19.1
  • v1.19.2
  • v1.19.3
  • v1.19.4
  • v1.2.0
  • v1.20.0
  • v1.20.1
  • v1.20.2
  • v1.20.3
  • v1.21.0
  • v1.21.1
  • v1.22.0
  • v1.23.0
  • v1.23.1
  • v1.23.2
  • v1.3.0
  • v1.3.1
  • v1.3.2
  • v1.4.0
  • v1.5.0
  • v1.5.1
  • v1.6.0
  • v1.6.1
  • v1.7.0
  • v1.7.1
  • v1.7.2
  • v1.7.3
  • v1.8.0
  • v1.8.1
  • v1.9.0
76 results

Target

Select target project
  • oss/libraries/go/services/job-queues
1 result
Select Git revision
  • master
  • v1.0.0
  • v1.0.1
  • v1.1.0
  • v1.10.0
  • v1.10.1
  • v1.10.2
  • v1.11.0
  • v1.12.0
  • v1.12.1
  • v1.12.2
  • v1.12.3
  • v1.12.4
  • v1.12.5
  • v1.12.6
  • v1.12.7
  • v1.12.8
  • v1.13.0
  • v1.13.1
  • v1.13.2
  • v1.14.0
  • v1.15.0
  • v1.15.1
  • v1.15.10
  • v1.15.11
  • v1.15.12
  • v1.15.13
  • v1.15.14
  • v1.15.15
  • v1.15.16
  • v1.15.17
  • v1.15.2
  • v1.15.3
  • v1.15.4
  • v1.15.5
  • v1.15.6
  • v1.15.7
  • v1.15.8
  • v1.15.9
  • v1.16.0
  • v1.16.1
  • v1.17.0
  • v1.18.0
  • v1.18.1
  • v1.18.2
  • v1.19.0
  • v1.19.1
  • v1.19.2
  • v1.19.3
  • v1.19.4
  • v1.2.0
  • v1.20.0
  • v1.20.1
  • v1.20.2
  • v1.20.3
  • v1.21.0
  • v1.21.1
  • v1.22.0
  • v1.23.0
  • v1.23.1
  • v1.23.2
  • v1.3.0
  • v1.3.1
  • v1.3.2
  • v1.4.0
  • v1.5.0
  • v1.5.1
  • v1.6.0
  • v1.6.1
  • v1.7.0
  • v1.7.1
  • v1.7.2
  • v1.7.3
  • v1.8.0
  • v1.8.1
  • v1.9.0
76 results
Show changes
Commits on Source (2)
...@@ -25,23 +25,7 @@ func TestWriteToDB1(t *testing.T) { ...@@ -25,23 +25,7 @@ func TestWriteToDB1(t *testing.T) {
manager := &Manager{database: gormDB} manager := &Manager{database: gormDB}
// Starte den DBSaver // Starte den DBSaver
promise := CreateAndStartJobSyncer(manager) saver := NewJobSyncer(manager)
ready := make(chan struct{})
var saver *JobSyncer
Then[*JobSyncer, *JobSyncer](promise, func(value *JobSyncer) (*JobSyncer, error) {
close(ready)
saver = value
return value, nil
}, func(e error) error {
close(ready)
Error("Error while starting db saver", "error", err)
return nil
})
<-ready
runner := &CounterRunnable{} runner := &CounterRunnable{}
job := NewJob[CounterResult]("job1", runner) job := NewJob[CounterResult]("job1", runner)
...@@ -53,13 +37,10 @@ func TestWriteToDB1(t *testing.T) { ...@@ -53,13 +37,10 @@ func TestWriteToDB1(t *testing.T) {
job.scheduler = scheduler job.scheduler = scheduler
saver.AddJob(job) saver.Sync(job)
saver.AddJob(job) saver.Sync(job)
time.Sleep(1 * time.Second)
err = saver.Stop() manager.WaitSync()
assert.Nil(t, err)
// check if stats are in database // check if stats are in database
var stats JobPersistence var stats JobPersistence
......
...@@ -24,28 +24,11 @@ func TestWriteToDB2(t *testing.T) { ...@@ -24,28 +24,11 @@ func TestWriteToDB2(t *testing.T) {
manager := &Manager{database: gormDB} manager := &Manager{database: gormDB}
// Starte den DBSaver // Starte den DBSaver
p := CreateAndStartJobSyncer(manager) NewJobSyncer(manager)
ready := make(chan struct{})
//var saver *JobSyncer
Then[*JobSyncer, *JobSyncer](p, func(value *JobSyncer) (*JobSyncer, error) {
close(ready)
// saver = value
return value, nil
}, func(e error) error {
close(ready)
Error("Error while starting db saver", "error", err)
return nil
})
db := gormDB db := gormDB
<-ready
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
...@@ -99,7 +82,7 @@ func TestWriteToDB2(t *testing.T) { ...@@ -99,7 +82,7 @@ func TestWriteToDB2(t *testing.T) {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
mgr.jobSyncer.AddJob(job) mgr.jobSyncer.Sync(job)
runtime.Gosched() runtime.Gosched()
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
...@@ -116,7 +99,7 @@ func TestWriteToDB2(t *testing.T) { ...@@ -116,7 +99,7 @@ func TestWriteToDB2(t *testing.T) {
runtime.Gosched() runtime.Gosched()
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
mgr.jobSyncer.AddJob(job) mgr.jobSyncer.Sync(job)
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
err = mgr.CancelJobSchedule("job1") err = mgr.CancelJobSchedule("job1")
...@@ -143,6 +126,6 @@ func TestWriteToDB2(t *testing.T) { ...@@ -143,6 +126,6 @@ func TestWriteToDB2(t *testing.T) {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
mgr.jobSyncer.AddJob(job) mgr.jobSyncer.Sync(job)
} }
...@@ -28,18 +28,28 @@ func TestWriteToDB4(t *testing.T) { ...@@ -28,18 +28,28 @@ func TestWriteToDB4(t *testing.T) {
runner := &CounterRunnable{} runner := &CounterRunnable{}
job := NewJob[CounterResult]("job3", runner) job := NewJob[CounterResult]("job3", runner)
id := job.GetID()
job.mu.Lock()
job.stats = JobStats{ job.stats = JobStats{
JobID: job.GetID(), JobID: id,
RunCount: 20, RunCount: 20,
SuccessCount: 30, SuccessCount: 30,
ErrorCount: 40, ErrorCount: 40,
} }
job.mu.Unlock()
db.Logger = db.Logger.LogMode(4)
scheduler := &InstantScheduler{} scheduler := &InstantScheduler{}
err = manager.ScheduleJob(job, scheduler) err = manager.ScheduleJob(job, scheduler)
assert.Nil(t, err) assert.Nil(t, err)
time.Sleep(500 * time.Millisecond) time.Sleep(200 * time.Millisecond)
assert.Equal(t, 21, job.GetStats().RunCount)
manager.WaitSync()
// check is stats are the values above // check is stats are the values above
var tmpJob JobPersistence var tmpJob JobPersistence
...@@ -48,16 +58,18 @@ func TestWriteToDB4(t *testing.T) { ...@@ -48,16 +58,18 @@ func TestWriteToDB4(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
// Validate the fields // Validate the fields
stats := tmpJob.GetStats()
assert.Equal(t, JobID("job3"), tmpJob.ID) assert.Equal(t, JobID("job3"), tmpJob.ID)
assert.Equal(t, 21, tmpJob.Stats.RunCount) assert.Equal(t, 21, stats.RunCount)
assert.Equal(t, 31, tmpJob.Stats.SuccessCount) assert.Equal(t, 31, stats.SuccessCount)
assert.Equal(t, 40, tmpJob.Stats.ErrorCount) assert.Equal(t, 40, stats.ErrorCount)
// reset stats // reset stats
err = manager.ResetJobStats(job.GetID()) err = manager.ResetJobStats(job.GetID())
assert.Nil(t, err) assert.Nil(t, err)
time.Sleep(100 * time.Millisecond)
time.Sleep(500 * time.Millisecond)
var tmpJob2 JobPersistence var tmpJob2 JobPersistence
// check is stats are the values above // check is stats are the values above
...@@ -66,10 +78,13 @@ func TestWriteToDB4(t *testing.T) { ...@@ -66,10 +78,13 @@ func TestWriteToDB4(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
// Validate the fields // Validate the fields
stats2 := tmpJob2.GetStats()
assert.Equal(t, JobID("job3"), tmpJob2.ID) assert.Equal(t, JobID("job3"), tmpJob2.ID)
assert.Equal(t, 0, tmpJob2.Stats.RunCount) assert.Equal(t, 0, stats2.RunCount)
assert.Equal(t, 0, tmpJob2.Stats.SuccessCount) assert.Equal(t, 0, stats2.SuccessCount)
assert.Equal(t, 0, tmpJob2.Stats.ErrorCount) assert.Equal(t, 0, stats2.ErrorCount)
err = manager.DeleteJob(job.GetID()) err = manager.DeleteJob(job.GetID())
assert.Nil(t, err) assert.Nil(t, err)
......
...@@ -7,7 +7,6 @@ import ( ...@@ -7,7 +7,6 @@ import (
"gorm.io/driver/sqlite" "gorm.io/driver/sqlite"
"gorm.io/gorm" "gorm.io/gorm"
"testing" "testing"
"time"
) )
func TestWriteToDB5(t *testing.T) { func TestWriteToDB5(t *testing.T) {
...@@ -36,19 +35,17 @@ func TestWriteToDB5(t *testing.T) { ...@@ -36,19 +35,17 @@ func TestWriteToDB5(t *testing.T) {
sameIDJob := NewJob[CounterResult]("jobSameID", runner) sameIDJob := NewJob[CounterResult]("jobSameID", runner)
time.Sleep(500 * time.Millisecond) manager.WaitSync()
// Trying to save a job with the same ID should do nothing // Trying to save a job with the same ID should do nothing
mgr.mu.Lock() mgr.mu.Lock()
mgr.jobSyncer.AddJob(sameIDJob) mgr.jobSyncer.Sync(sameIDJob)
mgr.mu.Unlock() mgr.mu.Unlock()
err = mgr.CancelJobSchedule("jobSameID") err = mgr.CancelJobSchedule("jobSameID")
assert.Nil(t, err) assert.Nil(t, err)
mgr.mu.Lock() manager.WaitSync()
err = mgr.jobSyncer.Stop()
mgr.mu.Unlock()
assert.Nil(t, err) assert.Nil(t, err)
......
...@@ -48,15 +48,13 @@ func TestWriteToDB6(t *testing.T) { ...@@ -48,15 +48,13 @@ func TestWriteToDB6(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
mgr.mu.Lock() mgr.mu.Lock()
mgr.jobSyncer.AddJob(job) mgr.jobSyncer.Sync(job)
mgr.mu.Unlock() mgr.mu.Unlock()
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
} }
mgr.mu.Lock() manager.WaitSync()
_ = mgr.jobSyncer.Stop()
mgr.mu.Unlock()
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
......
// Copyright 2024 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue package jobqueue
import ( import (
...@@ -17,7 +20,7 @@ func TestCreateOrUpdateJob(t *testing.T) { ...@@ -17,7 +20,7 @@ func TestCreateOrUpdateJob(t *testing.T) {
runner := &CounterRunnable{} runner := &CounterRunnable{}
job := NewJob[CounterResult]("job1", runner) job := NewJob[CounterResult]("job1", runner)
assert.NoError(t, createOrUpdateJob(job, db)) assert.NoError(t, saveJob(job, db))
var jobPersistence JobPersistence var jobPersistence JobPersistence
assert.NoError(t, db.First(&jobPersistence, "id = ?", job.GetID()).Error) assert.NoError(t, db.First(&jobPersistence, "id = ?", job.GetID()).Error)
...@@ -26,7 +29,7 @@ func TestCreateOrUpdateJob(t *testing.T) { ...@@ -26,7 +29,7 @@ func TestCreateOrUpdateJob(t *testing.T) {
assert.Equal(t, "", jobPersistence.Description) assert.Equal(t, "", jobPersistence.Description)
assert.Equal(t, Priority(1), jobPersistence.Priority) assert.Equal(t, Priority(1), jobPersistence.Priority)
job.description = "Updated description" job.description = "Updated description"
assert.NoError(t, createOrUpdateJob(job, db)) assert.NoError(t, saveJob(job, db))
assert.NoError(t, db.First(&jobPersistence, "id = ?", job.GetID()).Error) assert.NoError(t, db.First(&jobPersistence, "id = ?", job.GetID()).Error)
assert.Equal(t, "Updated description", jobPersistence.Description) assert.Equal(t, "Updated description", jobPersistence.Description)
......
// Copyright 2024 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue package jobqueue
import ( import (
......
// Copyright 2024 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue package jobqueue
import ( import (
......
// Copyright 2024 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue package jobqueue
import ( import (
"errors"
"gorm.io/gorm" "gorm.io/gorm"
"strings"
"time"
) )
func (s *JobSyncer) DeleteJob(job GenericJob) error { func (s *JobSyncer) DeleteJob(job GenericJob) error {
...@@ -15,19 +19,15 @@ func (s *JobSyncer) DeleteJob(job GenericJob) error { ...@@ -15,19 +19,15 @@ func (s *JobSyncer) DeleteJob(job GenericJob) error {
return s.manager.database.Transaction(func(tx *gorm.DB) error { return s.manager.database.Transaction(func(tx *gorm.DB) error {
permJob := job.GetPersistence() permJob := job.GetPersistence()
if err := tx.Where("job_id = ?", permJob.GetID()).Delete(&JobLog{}).Error; err != nil { if err := tx.Where("job_id = ?", permJob.GetID()).Delete(&JobLog{}).Error; err != nil {
return err return err
} }
if err := tx.Where("job_id = ?", permJob.GetID()).Delete(&JobStats{}).Error; err != nil { if err := tx.Where("job_id = ?", permJob.GetID()).Delete(&JobStats{}).Error; err != nil {
return err return err
} }
if err := tx.Delete(&permJob).Error; err != nil { if err := tx.Delete(&permJob).Error; err != nil {
return err return err
} }
return nil return nil
}) })
} }
...@@ -59,105 +59,173 @@ func (s *JobSyncer) ResetStats(job GenericJob) error { ...@@ -59,105 +59,173 @@ func (s *JobSyncer) ResetStats(job GenericJob) error {
} }
job.ResetStats() job.ResetStats()
stats := job.GetStats() if s.manager == nil || s.manager.database == nil {
return s.manager.database.Transaction(func(tx *gorm.DB) error { return ErrNoDatabaseConnection
return tx.Model(&JobStats{}).Where("job_id = ?", job.GetID()).Select("*").Omit("deleted_at", "created_at", "job_id").Updates(stats).Error }
return saveJob(job, s.manager.database)
}) //stats := job.GetStats()
//return s.manager.database.Transaction(func(tx *gorm.DB) error {
// return tx.Model(&JobStats{}).Where("job_id = ?",
// job.GetID()).
// Select("*").Omit("deleted_at", "created_at", "job_id").Updates(stats).Error
//
//})
} }
func (s *JobSyncer) CreateOrUpdateJob(job GenericJob) error { func (s *JobSyncer) SaveJob(job GenericJob) error {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
if s.manager == nil || s.manager.database == nil { if s.manager == nil || s.manager.database == nil {
return ErrNoDatabaseConnection return ErrNoDatabaseConnection
} }
return saveJob(job, s.manager.database)
return createOrUpdateJob(job, s.manager.database)
} }
func createOrUpdateJob(job GenericJob, db *gorm.DB) error { func save(job *JobPersistence, db *gorm.DB) error {
return db.Transaction(func(tx *gorm.DB) error {
permJob := job.GetPersistence() if db == nil {
return ErrNoDatabaseConnection
}
memLogs := permJob.Logs return db.Transaction(func(tx *gorm.DB) error {
permJob.Logs = nil
var existingJob JobPersistence
result := tx.Unscoped().Where("id = ?", permJob.GetID()).First(&existingJob)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
err := tx.Create(&permJob).Error
if err != nil {
Trace("Error while creating job", "error", err)
return err
}
} else {
Trace("Error while creating job", "error", result.Error)
return result.Error
}
} else {
// remove deleted from
r := tx.Unscoped().Model(&existingJob).Where("id = ?", existingJob.ID).Select("*").
Omit("deleted_at", "created_at", "job_id").Update("deleted_at", nil)
if r.Error != nil {
Trace("Error while deleting job", "error", r.Error)
return r.Error
}
// update scheduler if err := tx.Save(job).Error; err != nil {
resultStats := tx.Model(&existingJob).Where("id = ?", existingJob.ID).Select("*"). return err
Omit("deleted_at", "created_at", "job_id").UpdateColumns(SchedulerPersistence{ }
Type: "",
Interval: 0,
Spec: "",
Delay: 0,
Event: "",
Time: nil,
Executed: false,
})
if resultStats.Error != nil {
Trace("Error while updating job stats", "error", resultStats.Error)
return resultStats.Error
}
r2 := tx.Model(&existingJob).Where("id = ?", existingJob.ID).Select("*"). if job.Stats != (JobStats{}) {
Omit("deleted_at", "created_at", "job_id").Updates(permJob) job.Stats.JobID = job.ID
if r2.Error != nil { if err := tx.Save(&job.Stats).Error; err != nil {
return r2.Error return err
} }
} }
if tx.Error != nil { for i, _ := range job.Logs {
Trace("Error while updating job", "error", tx.Error) job.Logs[i].LogID = 0
return tx.Error _ = tx.Create(&job.Logs[i])
// no error handling, if it fails, it fails
} }
tx.Model(&permJob.Stats). return nil
Select("*").Omit("deleted_at", "created_at", "job_id"). })
UpdateColumns(permJob.Stats) }
if tx.Error != nil { func saveJob(job GenericJob, db *gorm.DB) error {
Error("Error while updating job stats", "error", tx.Error)
return result.Error
}
for i, _ := range memLogs { if db == nil {
memLogs[i].LogID = 0 return ErrNoDatabaseConnection
_ = tx.Create(&memLogs[i]) }
// no error handling, if it fails, it fails
permJob := job.GetPersistence()
maxRetries := 3
var attempt int
for attempt = 0; attempt < maxRetries; attempt++ {
err := save(&permJob, db)
if err != nil {
if strings.Contains(err.Error(), "lock") {
time.Sleep(time.Millisecond * 100)
continue
}
return err
} }
break
}
return nil if attempt == maxRetries {
return ErrMaxRetriesReached
}
}) return nil
//return db.Transaction(func(tx *gorm.DB) error {
//
// permJob := job.GetPersistence()
//
// memLogs := permJob.Logs
// permJob.Logs = nil
//
// var existingJob JobPersistence
// result := tx.Unscoped().Where("id = ?", permJob.GetID()).First(&existingJob)
//
// if result.Error != nil {
// err, done := createJob(tx, result, permJob)
// if done {
// return err
// }
// } else {
//
// err, done := updateJob(tx, existingJob, permJob)
// if done {
// return err
// }
// }
//
// if tx.Error != nil {
// Trace("Error while updating job", "error", tx.Error)
// return tx.Error
// }
//
// return nil
//
//})
} }
//
//func updateJob(tx *gorm.DB, existingJob JobPersistence, permJob JobPersistence) (error, bool) {
//
// r := tx.Unscoped().Model(&existingJob).Select("*").
// Omit("deleted_at", "created_at", "job_id").
// Update("deleted_at", nil)
//
// if r.Error != nil {
// Trace("Error while deleting job", "error", r.Error)
// return r.Error, true
// }
//
// resultStats := tx.Model(&existingJob).Select("*").
// Omit("deleted_at", "created_at", "job_id").Update()
//
// if resultStats.Error != nil {
// Trace("Error while updating job stats", "error", resultStats.Error)
// return resultStats.Error, true
// }
//
// r2 := tx.Model(&existingJob).Select("*").
// Omit("deleted_at", "created_at", "job_id").Updates(permJob)
// if r2.Error != nil {
// return r2.Error, true
// }
//
// tx.Model(&permJob.Stats).
// Select("*").Omit("deleted_at", "created_at", "job_id").
// UpdateColumns(permJob.Stats)
//
// if tx.Error != nil {
// Error("Error while updating job stats", "error", tx.Error)
// return result.Error
// }
//
// for i, _ := range memLogs {
// memLogs[i].LogID = 0
// _ = tx.Create(&memLogs[i])
// // no error handling, if it fails, it fails
// }
//
// return nil, false
//}
//
//func createJob(tx *gorm.DB, result *gorm.DB, permJob JobPersistence) (error, bool) {
// if errors.Is(result.Error, gorm.ErrRecordNotFound) {
// err := tx.Create(&permJob).Error
// if err != nil {
// Trace("Error while creating job", "error", err)
// return err, true
// }
// } else {
// Trace("Error while creating job", "error", result.Error)
// return result.Error, true
// }
// return nil, false
//}
// Copyright 2024 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue package jobqueue
import ( import (
...@@ -24,7 +27,7 @@ func TestDeleteJob(t *testing.T) { ...@@ -24,7 +27,7 @@ func TestDeleteJob(t *testing.T) {
// Erstelle einen Job zum Löschen // Erstelle einen Job zum Löschen
runner := &CounterRunnable{} runner := &CounterRunnable{}
job := NewJob[CounterResult]("testJobID", runner) job := NewJob[CounterResult]("testJobID", runner)
err = createOrUpdateJob(job, db) err = saveJob(job, db)
assert.Nil(t, err) assert.Nil(t, err)
var count int64 var count int64
...@@ -56,7 +59,7 @@ func TestResetLogs(t *testing.T) { ...@@ -56,7 +59,7 @@ func TestResetLogs(t *testing.T) {
// Erstelle einen Job und füge einige Logs hinzu // Erstelle einen Job und füge einige Logs hinzu
runner := &CounterRunnable{} runner := &CounterRunnable{}
job := NewJob[CounterResult]("testJobID", runner) job := NewJob[CounterResult]("testJobID", runner)
err = createOrUpdateJob(job, db) err = saveJob(job, db)
assert.Nil(t, err) assert.Nil(t, err)
// Füge Logs zum Job hinzu // Füge Logs zum Job hinzu
...@@ -97,7 +100,7 @@ func TestResetStats(t *testing.T) { ...@@ -97,7 +100,7 @@ func TestResetStats(t *testing.T) {
// Erstelle einen Job und setze einige Statistiken // Erstelle einen Job und setze einige Statistiken
runner := &CounterRunnable{} runner := &CounterRunnable{}
job := NewJob[CounterResult]("testJobID", runner) job := NewJob[CounterResult]("testJobID", runner)
err = createOrUpdateJob(job, db) err = saveJob(job, db)
assert.Nil(t, err) assert.Nil(t, err)
// Aktualisiere die Job-Statistiken // Aktualisiere die Job-Statistiken
......
...@@ -52,4 +52,6 @@ var ( ...@@ -52,4 +52,6 @@ var (
ErrInvalidDuration = fmt.Errorf("invalid duration") ErrInvalidDuration = fmt.Errorf("invalid duration")
ErrJobSyncerAlreadyRunning = fmt.Errorf("JobSyncer is already running") ErrJobSyncerAlreadyRunning = fmt.Errorf("JobSyncer is already running")
ErrJobSyncerNotRunning = fmt.Errorf("JobSyncer is not running") ErrJobSyncerNotRunning = fmt.Errorf("JobSyncer is not running")
ErrMaxRetriesReached = fmt.Errorf("maximum number of retries reached")
ErrTimeoutReached = fmt.Errorf("timeout reached")
) )
...@@ -5,165 +5,83 @@ package jobqueue ...@@ -5,165 +5,83 @@ package jobqueue
import ( import (
"sync" "sync"
"time"
) )
type JobSyncer struct { type JobSyncer struct {
jobQueue []GenericJob mu sync.Mutex
queueLock sync.Mutex migrated bool
notifyChannel chan struct{} manager *Manager
stopChan chan struct{} running sync.WaitGroup
jobSaveProgress sync.WaitGroup lastError error
status Status
manager *Manager
mu sync.Mutex
migrateFlag bool
}
type Status int
const (
JobSyncerStatusStopped Status = iota
JobSyncerStatusRunning
)
func CreateAndStartJobSyncer[P *Promise[*JobSyncer]](manager *Manager) *Promise[*JobSyncer] {
s := NewJobSyncer(manager)
s.mu.Lock()
defer s.mu.Unlock()
return NewPromise[*JobSyncer](func(resolve func(*JobSyncer), reject func(error)) {
if s.manager == nil || s.manager.database == nil {
reject(ErrNoDatabaseConnection)
return
}
if s.status == JobSyncerStatusRunning {
resolve(s)
return
}
db := s.manager.database
if !s.migrateFlag {
err := db.AutoMigrate(&JobPersistence{}, &JobLog{}, &JobStats{})
if err != nil {
reject(err)
return
}
s.migrateFlag = true
}
err := s.Start()
if err != nil {
reject(err)
return
}
resolve(s)
})
} }
func NewJobSyncer(manager *Manager) *JobSyncer { func NewJobSyncer(manager *Manager) *JobSyncer {
return &JobSyncer{ js := &JobSyncer{
jobQueue: make([]GenericJob, 0), manager: manager,
manager: manager, mu: sync.Mutex{},
} }
}
func (js *JobSyncer) Start() error { if manager == nil || manager.database == nil {
js.mu.Lock() return js
defer js.mu.Unlock()
if js.status == JobSyncerStatusRunning {
return ErrJobSyncerAlreadyRunning
} }
js.notifyChannel = make(chan struct{}, 1) // Buffer to avoid blocking db := js.manager.database
js.stopChan = make(chan struct{}) if !js.migrated {
js.status = JobSyncerStatusRunning err := db.AutoMigrate(&JobPersistence{}, &JobLog{}, &JobStats{})
if err != nil {
go js.runWorker() js.lastError = err
return nil } else {
} js.migrated = true
func (js *JobSyncer) runWorker() {
for {
select {
case <-js.notifyChannel:
js.processJobs()
case <-js.stopChan:
js.cleanup()
return
} }
} }
}
func (js *JobSyncer) processJobs() {
for {
js.queueLock.Lock()
if len(js.jobQueue) == 0 {
js.queueLock.Unlock()
return
}
job := js.jobQueue[0]
js.jobQueue = js.jobQueue[1:]
js.queueLock.Unlock()
js.jobSaveProgress.Add(1) return js
js.processJob(job)
js.jobSaveProgress.Done()
}
} }
func (js *JobSyncer) Wait(timeout time.Duration) error {
done := make(chan struct{})
func (js *JobSyncer) AddJob(job GenericJob) { go func() {
js.queueLock.Lock() js.running.Wait()
close(done)
// check if job is already in queue }()
for _, j := range js.jobQueue {
if j.GetID() == job.GetID() {
js.queueLock.Unlock()
return
}
}
js.jobQueue = append(js.jobQueue, job)
js.queueLock.Unlock()
js.jobSaveProgress.Add(1)
// Non-blocking notify
select { select {
case js.notifyChannel <- struct{}{}: case <-done:
default: // Die WaitGroup ist fertig
return nil
case <-time.After(timeout):
return ErrTimeoutReached
} }
} }
func (js *JobSyncer) processJob(job GenericJob) { func (js *JobSyncer) Sync(job GenericJob) {
defer js.jobSaveProgress.Done()
err := createOrUpdateJob(job, js.manager.database)
Error("Error while creating or updating job", "error", err)
}
func (js *JobSyncer) Stop() error {
js.mu.Lock() js.mu.Lock()
if js.status != JobSyncerStatusRunning { defer js.mu.Unlock()
js.mu.Unlock()
return ErrJobSyncerNotRunning
}
js.status = JobSyncerStatusStopped
js.mu.Unlock()
close(js.stopChan) js.running.Add(1)
js.jobSaveProgress.Wait() go func() {
defer js.running.Done()
err := saveJob(job, js.manager.GetDB())
if err != nil {
Error("Error while creating or updating job", err)
}
}()
return nil
} }
func (js *JobSyncer) cleanup() { func (js *JobSyncer) LastError() error {
js.mu.Lock() js.mu.Lock()
defer js.mu.Unlock() defer js.mu.Unlock()
return js.lastError
}
js.jobSaveProgress.Wait() // Migrated returns if the database has been migrated.
js.status = JobSyncerStatusStopped //
// No parameters.
// Returns a boolean.
func (js *JobSyncer) Migrated() bool {
js.mu.Lock()
defer js.mu.Unlock()
return js.migrated
} }
...@@ -24,31 +24,16 @@ func TestSaveJobWithSQLite(t *testing.T) { ...@@ -24,31 +24,16 @@ func TestSaveJobWithSQLite(t *testing.T) {
//saver := NewJobSyncer(manager) //saver := NewJobSyncer(manager)
// Starte den DBSaver // Starte den DBSaver
p := CreateAndStartJobSyncer(manager) saver := NewJobSyncer(manager)
ready := make(chan struct{})
var saver *JobSyncer
Then[*JobSyncer, *JobSyncer](p, func(value *JobSyncer) (*JobSyncer, error) {
saver = value
close(ready)
return value, nil
}, func(e error) error {
close(ready)
Error("Error while starting db saver", "error", err)
return nil
})
<-ready
jobID := JobID("testJob") jobID := JobID("testJob")
job := NewJob[CounterResult](jobID, &CounterRunnable{}) job := NewJob[CounterResult](jobID, &CounterRunnable{})
saver.AddJob(job) saver.Sync(job)
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
saver.Stop() manager.WaitSync()
var count int64 var count int64
gormDB.Model(&JobPersistence{}).Count(&count) gormDB.Model(&JobPersistence{}).Count(&count)
......
// Copyright 2024 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue package jobqueue
import ( import (
...@@ -21,19 +24,19 @@ type DefaultLogger struct { ...@@ -21,19 +24,19 @@ type DefaultLogger struct {
} }
func (l *DefaultLogger) Info(msg string, keysAndValues ...interface{}) { func (l *DefaultLogger) Info(msg string, keysAndValues ...interface{}) {
_ = l.Output(2, "INFO: "+msg) l.Print("INFO: "+msg, keysAndValues)
} }
func (l *DefaultLogger) Error(msg string, keysAndValues ...interface{}) { func (l *DefaultLogger) Error(msg string, keysAndValues ...interface{}) {
_ = l.Output(2, "ERROR: "+msg) l.Print("ERROR: "+msg, keysAndValues)
} }
func (l *DefaultLogger) Warn(msg string, keysAndValues ...interface{}) { func (l *DefaultLogger) Warn(msg string, keysAndValues ...interface{}) {
_ = l.Output(2, "WARN: "+msg) l.Print("WARN: "+msg, keysAndValues)
} }
func (l *DefaultLogger) Trace(msg string, keysAndValues ...interface{}) { func (l *DefaultLogger) Trace(msg string, keysAndValues ...interface{}) {
_ = l.Output(2, "DEBUG: "+msg) l.Print("DEBUG: "+msg, keysAndValues)
} }
// Ensure DefaultLogger satisfies the Logger interface. // Ensure DefaultLogger satisfies the Logger interface.
......
// Copyright 2024 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue package jobqueue
import ( import (
......
...@@ -153,7 +153,6 @@ func (m *Manager) DeleteJob(id JobID) error { ...@@ -153,7 +153,6 @@ func (m *Manager) DeleteJob(id JobID) error {
} }
if m.jobSyncer != nil { if m.jobSyncer != nil {
err := m.jobSyncer.DeleteJob(job) err := m.jobSyncer.DeleteJob(job)
if err != nil { if err != nil {
return err return err
...@@ -164,6 +163,23 @@ func (m *Manager) DeleteJob(id JobID) error { ...@@ -164,6 +163,23 @@ func (m *Manager) DeleteJob(id JobID) error {
} }
func (m *Manager) WaitSync() {
m.mu.Lock()
defer m.mu.Unlock()
if m.jobSyncer != nil {
_ = m.jobSyncer.Wait(2 * time.Second)
}
}
func (m *Manager) Sync(job GenericJob) {
m.mu.Lock()
defer m.mu.Unlock()
if m.jobSyncer != nil {
m.jobSyncer.Sync(job)
}
}
// RemoveJob removes a job from the active jobs // RemoveJob removes a job from the active jobs
// If you want to remove a job from the active jobs and the database, use DeleteJob instead // If you want to remove a job from the active jobs and the database, use DeleteJob instead
func (m *Manager) RemoveJob(id JobID) error { func (m *Manager) RemoveJob(id JobID) error {
...@@ -289,12 +305,6 @@ func (m *Manager) SetDB(db *gorm.DB) *Manager { ...@@ -289,12 +305,6 @@ func (m *Manager) SetDB(db *gorm.DB) *Manager {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
m.database = db m.database = db
if m.jobSyncer != nil {
return m
}
m.jobSyncer = NewJobSyncer(m)
return m return m
} }
...@@ -397,31 +407,8 @@ func (m *Manager) Start() error { ...@@ -397,31 +407,8 @@ func (m *Manager) Start() error {
return ErrManagerAlreadyRunning return ErrManagerAlreadyRunning
} }
if m.jobSyncer != nil { if m.jobSyncer == nil {
p := CreateAndStartJobSyncer(m) m.jobSyncer = NewJobSyncer(m)
ready := make(chan struct{})
var jobSyncerErr error
Then[*JobSyncer, *JobSyncer](p, func(value *JobSyncer) (*JobSyncer, error) {
close(ready)
m.mu.Lock()
m.jobSyncer = value
m.mu.Unlock()
return value, nil
}, func(e error) error {
close(ready)
Error("Error while starting db saver", "error", e)
jobSyncerErr = e
return nil
})
<-ready
if jobSyncerErr != nil {
return jobSyncerErr
}
} }
if len(m.workerMap) == 0 { if len(m.workerMap) == 0 {
...@@ -470,6 +457,18 @@ func (m *Manager) Start() error { ...@@ -470,6 +457,18 @@ func (m *Manager) Start() error {
} }
func safeClose(ch chan interface{}) (err error) {
defer func() {
if recover() != nil {
err = ErrChannelAlreadyClosed
}
}()
close(ch)
return
}
// Stop stops the manager // Stop stops the manager
func (m *Manager) Stop() error { func (m *Manager) Stop() error {
m.mu.Lock() m.mu.Lock()
...@@ -482,6 +481,7 @@ func (m *Manager) Stop() error { ...@@ -482,6 +481,7 @@ func (m *Manager) Stop() error {
m.eventBus.Unsubscribe(QueueJob, m.jobEventCh) m.eventBus.Unsubscribe(QueueJob, m.jobEventCh)
m.eventBus.Unsubscribe(JobReady, m.jobEventCh) m.eventBus.Unsubscribe(JobReady, m.jobEventCh)
m.eventBus.Shutdown()
_ = safeClose(m.jobEventCh) _ = safeClose(m.jobEventCh)
var wrappedErr error var wrappedErr error
...@@ -508,15 +508,7 @@ func (m *Manager) Stop() error { ...@@ -508,15 +508,7 @@ func (m *Manager) Stop() error {
} }
if m.jobSyncer != nil { if m.jobSyncer != nil {
err = m.jobSyncer.Stop() _ = m.jobSyncer.Wait(10 * time.Second)
if err != nil {
if wrappedErr == nil {
wrappedErr = fmt.Errorf("Error: ")
}
wrappedErr = fmt.Errorf("%w\n%s", wrappedErr, err.Error())
}
} }
return wrappedErr return wrappedErr
...@@ -530,6 +522,8 @@ func (m *Manager) SetLogger(logger Logger) *Manager { ...@@ -530,6 +522,8 @@ func (m *Manager) SetLogger(logger Logger) *Manager {
m.database = m.database.Session(&gorm.Session{Logger: newGormAdapter()}) m.database = m.database.Session(&gorm.Session{Logger: newGormAdapter()})
} }
SetLogger(logger)
return m return m
} }
...@@ -580,7 +574,7 @@ func (m *Manager) ScheduleJob(job GenericJob, scheduler Scheduler) error { ...@@ -580,7 +574,7 @@ func (m *Manager) ScheduleJob(job GenericJob, scheduler Scheduler) error {
m.activeJobs[job.GetID()] = job m.activeJobs[job.GetID()] = job
if m.jobSyncer != nil { if m.jobSyncer != nil {
m.jobSyncer.AddJob(job) m.jobSyncer.Sync(job)
} }
return nil return nil
......
...@@ -253,11 +253,14 @@ func TestManagerEventHandling(t *testing.T) { ...@@ -253,11 +253,14 @@ func TestManagerEventHandling(t *testing.T) {
startTime := time.Now() startTime := time.Now()
for { for {
if job.runner.(*CounterRunnable).GetCount() > 10 { currentCount := job.runner.(*CounterRunnable).GetCount()
if currentCount > 10 {
break break
} }
if time.Since(startTime) > 10*time.Second { time.Sleep(2 * time.Millisecond)
if time.Since(startTime) > 1*time.Second {
t.Fatalf("Job did not finish in time") t.Fatalf("Job did not finish in time")
} }
} }
......
// Copyright 2024 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue package jobqueue
import ( import (
......
// Copyright 2024 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue package jobqueue
import ( import (
......
...@@ -2,17 +2,3 @@ ...@@ -2,17 +2,3 @@
// SPDX-License-Identifier: AGPL-3.0 // SPDX-License-Identifier: AGPL-3.0
package jobqueue package jobqueue
// safeClose closes the given channel and returns an error if the channel is already closed
func safeClose(ch chan interface{}) (err error) {
defer func() {
if recover() != nil {
err = ErrChannelAlreadyClosed
}
}()
err = nil
close(ch)
return
}