diff --git a/database-1_test.go b/database-1_test.go index db3401655e7a7474b5016afcadf432badae0ba29..7e04839fc1e74ef00f2ccd4a23e3fe12162a277f 100644 --- a/database-1_test.go +++ b/database-1_test.go @@ -25,23 +25,7 @@ func TestWriteToDB1(t *testing.T) { manager := &Manager{database: gormDB} // Starte den DBSaver - promise := CreateAndStartJobSyncer(manager) - - ready := make(chan struct{}) - - var saver *JobSyncer - - Then[*JobSyncer, *JobSyncer](promise, func(value *JobSyncer) (*JobSyncer, error) { - close(ready) - saver = value - return value, nil - }, func(e error) error { - close(ready) - Error("Error while starting db saver", "error", err) - return nil - }) - - <-ready + saver := NewJobSyncer(manager) runner := &CounterRunnable{} job := NewJob[CounterResult]("job1", runner) @@ -53,13 +37,10 @@ func TestWriteToDB1(t *testing.T) { job.scheduler = scheduler - saver.AddJob(job) - saver.AddJob(job) - - time.Sleep(1 * time.Second) + saver.Sync(job) + saver.Sync(job) - err = saver.Stop() - assert.Nil(t, err) + manager.WaitSync() // check if stats are in database var stats JobPersistence diff --git a/database-2_test.go b/database-2_test.go index 8673f8a114824cfb3262a5fa01d42036ebea0123..266ca7390385d33cac6b90f9b2984284c4e8d895 100644 --- a/database-2_test.go +++ b/database-2_test.go @@ -24,28 +24,11 @@ func TestWriteToDB2(t *testing.T) { manager := &Manager{database: gormDB} - // Starte den DBSaver - p := CreateAndStartJobSyncer(manager) - - ready := make(chan struct{}) - - //var saver *JobSyncer - - Then[*JobSyncer, *JobSyncer](p, func(value *JobSyncer) (*JobSyncer, error) { - close(ready) - // saver = value - return value, nil - }, func(e error) error { - close(ready) - Error("Error while starting db saver", "error", err) - return nil - }) + NewJobSyncer(manager) db := gormDB - <-ready - var wg sync.WaitGroup wg.Add(1) @@ -99,7 +82,7 @@ func TestWriteToDB2(t *testing.T) { time.Sleep(1 * time.Second) - mgr.jobSyncer.AddJob(job) + mgr.jobSyncer.Sync(job) runtime.Gosched() time.Sleep(1 * time.Second) @@ -116,7 +99,7 @@ func TestWriteToDB2(t *testing.T) { runtime.Gosched() time.Sleep(1 * time.Second) - mgr.jobSyncer.AddJob(job) + mgr.jobSyncer.Sync(job) time.Sleep(2 * time.Second) err = mgr.CancelJobSchedule("job1") @@ -143,6 +126,6 @@ func TestWriteToDB2(t *testing.T) { time.Sleep(1 * time.Second) - mgr.jobSyncer.AddJob(job) + mgr.jobSyncer.Sync(job) } diff --git a/database-4_test.go b/database-4_test.go index 2cc20f2b515904459595ad78819027ffe78eefc3..127311e1ca0a4958370053f1d285783b5779aee0 100644 --- a/database-4_test.go +++ b/database-4_test.go @@ -6,13 +6,22 @@ import ( "github.com/stretchr/testify/assert" "gorm.io/driver/sqlite" "gorm.io/gorm" + "os" "testing" "time" ) func TestWriteToDB4(t *testing.T) { - db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{}) + testFile := "/home/vs/workspaces/alvine/cloud/framework/dummy.sqlite" + //remove if exists + if _, err := os.Stat(testFile); err == nil { + err = os.Remove(testFile) + assert.Nil(t, err) + } + + // db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{}) + db, err := gorm.Open(sqlite.Open("file:"+testFile+"?cache=shared"), &gorm.Config{}) if err != nil { t.Fatalf("a error occurred while opening the database: %v", err) } @@ -28,18 +37,28 @@ func TestWriteToDB4(t *testing.T) { runner := &CounterRunnable{} job := NewJob[CounterResult]("job3", runner) + id := job.GetID() + + job.mu.Lock() job.stats = JobStats{ - JobID: job.GetID(), + JobID: id, RunCount: 20, SuccessCount: 30, ErrorCount: 40, } + job.mu.Unlock() + + db.Logger = db.Logger.LogMode(4) scheduler := &InstantScheduler{} err = manager.ScheduleJob(job, scheduler) assert.Nil(t, err) - time.Sleep(500 * time.Millisecond) + time.Sleep(200 * time.Millisecond) + + assert.Equal(t, 21, job.GetStats().RunCount) + + manager.WaitSync() // check is stats are the values above var tmpJob JobPersistence @@ -48,16 +67,18 @@ func TestWriteToDB4(t *testing.T) { assert.Nil(t, err) // Validate the fields + + stats := tmpJob.GetStats() + assert.Equal(t, JobID("job3"), tmpJob.ID) - assert.Equal(t, 21, tmpJob.Stats.RunCount) - assert.Equal(t, 31, tmpJob.Stats.SuccessCount) - assert.Equal(t, 40, tmpJob.Stats.ErrorCount) + assert.Equal(t, 21, stats.RunCount) + assert.Equal(t, 31, stats.SuccessCount) + assert.Equal(t, 40, stats.ErrorCount) // reset stats err = manager.ResetJobStats(job.GetID()) assert.Nil(t, err) - - time.Sleep(500 * time.Millisecond) + time.Sleep(100 * time.Millisecond) var tmpJob2 JobPersistence // check is stats are the values above @@ -66,10 +87,13 @@ func TestWriteToDB4(t *testing.T) { assert.Nil(t, err) // Validate the fields + + stats2 := tmpJob2.GetStats() + assert.Equal(t, JobID("job3"), tmpJob2.ID) - assert.Equal(t, 0, tmpJob2.Stats.RunCount) - assert.Equal(t, 0, tmpJob2.Stats.SuccessCount) - assert.Equal(t, 0, tmpJob2.Stats.ErrorCount) + assert.Equal(t, 0, stats2.RunCount) + assert.Equal(t, 0, stats2.SuccessCount) + assert.Equal(t, 0, stats2.ErrorCount) err = manager.DeleteJob(job.GetID()) assert.Nil(t, err) diff --git a/database-5_test.go b/database-5_test.go index bdd667a8417dcec7c747701d292b372cbf22d98f..adf20410224f18ee1959196bb3f2e8646fec93be 100644 --- a/database-5_test.go +++ b/database-5_test.go @@ -7,7 +7,6 @@ import ( "gorm.io/driver/sqlite" "gorm.io/gorm" "testing" - "time" ) func TestWriteToDB5(t *testing.T) { @@ -36,19 +35,17 @@ func TestWriteToDB5(t *testing.T) { sameIDJob := NewJob[CounterResult]("jobSameID", runner) - time.Sleep(500 * time.Millisecond) + manager.WaitSync() // Trying to save a job with the same ID should do nothing mgr.mu.Lock() - mgr.jobSyncer.AddJob(sameIDJob) + mgr.jobSyncer.Sync(sameIDJob) mgr.mu.Unlock() err = mgr.CancelJobSchedule("jobSameID") assert.Nil(t, err) - mgr.mu.Lock() - err = mgr.jobSyncer.Stop() - mgr.mu.Unlock() + manager.WaitSync() assert.Nil(t, err) diff --git a/database-6_test.go b/database-6_test.go index 272a05a73f55b1a6c485994d492a7801f434cf9e..0e727613a9c77968613367006f5acbdc7dd23c23 100644 --- a/database-6_test.go +++ b/database-6_test.go @@ -48,15 +48,13 @@ func TestWriteToDB6(t *testing.T) { assert.Nil(t, err) mgr.mu.Lock() - mgr.jobSyncer.AddJob(job) + mgr.jobSyncer.Sync(job) mgr.mu.Unlock() time.Sleep(10 * time.Millisecond) } - mgr.mu.Lock() - _ = mgr.jobSyncer.Stop() - mgr.mu.Unlock() + manager.WaitSync() time.Sleep(2 * time.Second) diff --git a/database-7_test.go b/database-7_test.go index 2bb2b1352b101b740f902aa1069d8a30d6ca90b7..963cdab9cfb022f7b6c391e293a2328d97978ccf 100644 --- a/database-7_test.go +++ b/database-7_test.go @@ -1,3 +1,6 @@ +// Copyright 2024 schukai GmbH +// SPDX-License-Identifier: AGPL-3.0 + package jobqueue import ( @@ -17,7 +20,7 @@ func TestCreateOrUpdateJob(t *testing.T) { runner := &CounterRunnable{} job := NewJob[CounterResult]("job1", runner) - assert.NoError(t, createOrUpdateJob(job, db)) + assert.NoError(t, saveJob(job, db)) var jobPersistence JobPersistence assert.NoError(t, db.First(&jobPersistence, "id = ?", job.GetID()).Error) @@ -26,7 +29,7 @@ func TestCreateOrUpdateJob(t *testing.T) { assert.Equal(t, "", jobPersistence.Description) assert.Equal(t, Priority(1), jobPersistence.Priority) job.description = "Updated description" - assert.NoError(t, createOrUpdateJob(job, db)) + assert.NoError(t, saveJob(job, db)) assert.NoError(t, db.First(&jobPersistence, "id = ?", job.GetID()).Error) assert.Equal(t, "Updated description", jobPersistence.Description) diff --git a/database-logging.go b/database-logging.go index aa2c26de7fab98ecccf268c7a9bfc2577760507f..7354a018ee6d88f55c9ce6bd5f2426e06d130c32 100644 --- a/database-logging.go +++ b/database-logging.go @@ -1,3 +1,6 @@ +// Copyright 2024 schukai GmbH +// SPDX-License-Identifier: AGPL-3.0 + package jobqueue import ( diff --git a/database-logging_test.go b/database-logging_test.go index f625c1fc6af4f5672e403adb36aa13388250f39e..ace6f4d47b17b4723422cfe83d0e559cd73bde49 100644 --- a/database-logging_test.go +++ b/database-logging_test.go @@ -1,3 +1,6 @@ +// Copyright 2024 schukai GmbH +// SPDX-License-Identifier: AGPL-3.0 + package jobqueue import ( diff --git a/database.go b/database.go index a34c63395eb691c2cb3389680891636c94f5701c..5cb3ed1434a4ec42d7e30ecd1317edd74e674e3e 100644 --- a/database.go +++ b/database.go @@ -1,8 +1,12 @@ +// Copyright 2024 schukai GmbH +// SPDX-License-Identifier: AGPL-3.0 + package jobqueue import ( - "errors" "gorm.io/gorm" + "strings" + "time" ) func (s *JobSyncer) DeleteJob(job GenericJob) error { @@ -15,19 +19,15 @@ func (s *JobSyncer) DeleteJob(job GenericJob) error { return s.manager.database.Transaction(func(tx *gorm.DB) error { permJob := job.GetPersistence() - if err := tx.Where("job_id = ?", permJob.GetID()).Delete(&JobLog{}).Error; err != nil { return err } - if err := tx.Where("job_id = ?", permJob.GetID()).Delete(&JobStats{}).Error; err != nil { return err } - if err := tx.Delete(&permJob).Error; err != nil { return err } - return nil }) } @@ -59,105 +59,173 @@ func (s *JobSyncer) ResetStats(job GenericJob) error { } job.ResetStats() - stats := job.GetStats() - return s.manager.database.Transaction(func(tx *gorm.DB) error { - return tx.Model(&JobStats{}).Where("job_id = ?", job.GetID()).Select("*").Omit("deleted_at", "created_at", "job_id").Updates(stats).Error - - }) + if s.manager == nil || s.manager.database == nil { + return ErrNoDatabaseConnection + } + return saveJob(job, s.manager.database) + //stats := job.GetStats() + //return s.manager.database.Transaction(func(tx *gorm.DB) error { + // return tx.Model(&JobStats{}).Where("job_id = ?", + // job.GetID()). + // Select("*").Omit("deleted_at", "created_at", "job_id").Updates(stats).Error + // + //}) } -func (s *JobSyncer) CreateOrUpdateJob(job GenericJob) error { - +func (s *JobSyncer) SaveJob(job GenericJob) error { s.mu.Lock() defer s.mu.Unlock() - if s.manager == nil || s.manager.database == nil { return ErrNoDatabaseConnection } - - return createOrUpdateJob(job, s.manager.database) - + return saveJob(job, s.manager.database) } -func createOrUpdateJob(job GenericJob, db *gorm.DB) error { - - return db.Transaction(func(tx *gorm.DB) error { +func save(job *JobPersistence, db *gorm.DB) error { - permJob := job.GetPersistence() + if db == nil { + return ErrNoDatabaseConnection + } - memLogs := permJob.Logs - permJob.Logs = nil - - var existingJob JobPersistence - result := tx.Unscoped().Where("id = ?", permJob.GetID()).First(&existingJob) - - if result.Error != nil { - if errors.Is(result.Error, gorm.ErrRecordNotFound) { - err := tx.Create(&permJob).Error - if err != nil { - Trace("Error while creating job", "error", err) - return err - } - } else { - Trace("Error while creating job", "error", result.Error) - return result.Error - } - } else { - - // remove deleted from - r := tx.Unscoped().Model(&existingJob).Where("id = ?", existingJob.ID).Select("*"). - Omit("deleted_at", "created_at", "job_id").Update("deleted_at", nil) - if r.Error != nil { - Trace("Error while deleting job", "error", r.Error) - return r.Error - } + return db.Transaction(func(tx *gorm.DB) error { - // update scheduler - resultStats := tx.Model(&existingJob).Where("id = ?", existingJob.ID).Select("*"). - Omit("deleted_at", "created_at", "job_id").UpdateColumns(SchedulerPersistence{ - Type: "", - Interval: 0, - Spec: "", - Delay: 0, - Event: "", - Time: nil, - Executed: false, - }) - - if resultStats.Error != nil { - Trace("Error while updating job stats", "error", resultStats.Error) - return resultStats.Error - } + if err := tx.Save(job).Error; err != nil { + return err + } - r2 := tx.Model(&existingJob).Where("id = ?", existingJob.ID).Select("*"). - Omit("deleted_at", "created_at", "job_id").Updates(permJob) - if r2.Error != nil { - return r2.Error + if job.Stats != (JobStats{}) { + job.Stats.JobID = job.ID + if err := tx.Save(&job.Stats).Error; err != nil { + return err } } - if tx.Error != nil { - Trace("Error while updating job", "error", tx.Error) - return tx.Error + for i, _ := range job.Logs { + job.Logs[i].LogID = 0 + _ = tx.Create(&job.Logs[i]) + // no error handling, if it fails, it fails } - tx.Model(&permJob.Stats). - Select("*").Omit("deleted_at", "created_at", "job_id"). - UpdateColumns(permJob.Stats) + return nil + }) +} - if tx.Error != nil { - Error("Error while updating job stats", "error", tx.Error) - return result.Error - } +func saveJob(job GenericJob, db *gorm.DB) error { - for i, _ := range memLogs { - memLogs[i].LogID = 0 - _ = tx.Create(&memLogs[i]) - // no error handling, if it fails, it fails + if db == nil { + return ErrNoDatabaseConnection + } + + permJob := job.GetPersistence() + + maxRetries := 3 + var attempt int + for attempt = 0; attempt < maxRetries; attempt++ { + err := save(&permJob, db) + if err != nil { + if strings.Contains(err.Error(), "lock") { + time.Sleep(time.Millisecond * 100) + continue + } + return err } + break + } - return nil + if attempt == maxRetries { + return ErrMaxRetriesReached + } - }) + return nil + + //return db.Transaction(func(tx *gorm.DB) error { + // + // permJob := job.GetPersistence() + // + // memLogs := permJob.Logs + // permJob.Logs = nil + // + // var existingJob JobPersistence + // result := tx.Unscoped().Where("id = ?", permJob.GetID()).First(&existingJob) + // + // if result.Error != nil { + // err, done := createJob(tx, result, permJob) + // if done { + // return err + // } + // } else { + // + // err, done := updateJob(tx, existingJob, permJob) + // if done { + // return err + // } + // } + // + // if tx.Error != nil { + // Trace("Error while updating job", "error", tx.Error) + // return tx.Error + // } + // + // return nil + // + //}) } + +// +//func updateJob(tx *gorm.DB, existingJob JobPersistence, permJob JobPersistence) (error, bool) { +// +// r := tx.Unscoped().Model(&existingJob).Select("*"). +// Omit("deleted_at", "created_at", "job_id"). +// Update("deleted_at", nil) +// +// if r.Error != nil { +// Trace("Error while deleting job", "error", r.Error) +// return r.Error, true +// } +// +// resultStats := tx.Model(&existingJob).Select("*"). +// Omit("deleted_at", "created_at", "job_id").Update() +// +// if resultStats.Error != nil { +// Trace("Error while updating job stats", "error", resultStats.Error) +// return resultStats.Error, true +// } +// +// r2 := tx.Model(&existingJob).Select("*"). +// Omit("deleted_at", "created_at", "job_id").Updates(permJob) +// if r2.Error != nil { +// return r2.Error, true +// } +// +// tx.Model(&permJob.Stats). +// Select("*").Omit("deleted_at", "created_at", "job_id"). +// UpdateColumns(permJob.Stats) +// +// if tx.Error != nil { +// Error("Error while updating job stats", "error", tx.Error) +// return result.Error +// } +// +// for i, _ := range memLogs { +// memLogs[i].LogID = 0 +// _ = tx.Create(&memLogs[i]) +// // no error handling, if it fails, it fails +// } +// +// return nil, false +//} +// +//func createJob(tx *gorm.DB, result *gorm.DB, permJob JobPersistence) (error, bool) { +// if errors.Is(result.Error, gorm.ErrRecordNotFound) { +// err := tx.Create(&permJob).Error +// if err != nil { +// Trace("Error while creating job", "error", err) +// return err, true +// } +// } else { +// Trace("Error while creating job", "error", result.Error) +// return result.Error, true +// } +// return nil, false +//} diff --git a/database_test.go b/database_test.go index f2a8269ccf523045c8c7114dc41c3044b9470139..1300fa31d68364b6ee45522000c93d4ac657e22d 100644 --- a/database_test.go +++ b/database_test.go @@ -1,3 +1,6 @@ +// Copyright 2024 schukai GmbH +// SPDX-License-Identifier: AGPL-3.0 + package jobqueue import ( @@ -24,7 +27,7 @@ func TestDeleteJob(t *testing.T) { // Erstelle einen Job zum Löschen runner := &CounterRunnable{} job := NewJob[CounterResult]("testJobID", runner) - err = createOrUpdateJob(job, db) + err = saveJob(job, db) assert.Nil(t, err) var count int64 @@ -56,7 +59,7 @@ func TestResetLogs(t *testing.T) { // Erstelle einen Job und füge einige Logs hinzu runner := &CounterRunnable{} job := NewJob[CounterResult]("testJobID", runner) - err = createOrUpdateJob(job, db) + err = saveJob(job, db) assert.Nil(t, err) // Füge Logs zum Job hinzu @@ -97,7 +100,7 @@ func TestResetStats(t *testing.T) { // Erstelle einen Job und setze einige Statistiken runner := &CounterRunnable{} job := NewJob[CounterResult]("testJobID", runner) - err = createOrUpdateJob(job, db) + err = saveJob(job, db) assert.Nil(t, err) // Aktualisiere die Job-Statistiken diff --git a/errors.go b/errors.go index eafc6bbbb96dedc934bc0a6fd9bfa203d596dd4c..97a20f12bf51a2beb495d89ad329bb0231d5cbbd 100644 --- a/errors.go +++ b/errors.go @@ -52,4 +52,6 @@ var ( ErrInvalidDuration = fmt.Errorf("invalid duration") ErrJobSyncerAlreadyRunning = fmt.Errorf("JobSyncer is already running") ErrJobSyncerNotRunning = fmt.Errorf("JobSyncer is not running") + ErrMaxRetriesReached = fmt.Errorf("maximum number of retries reached") + ErrTimeoutReached = fmt.Errorf("timeout reached") ) diff --git a/job-syncer.go b/job-syncer.go index 53b4a1a22e80384b07a394b6afee609671e46bec..488569dd80e7ac42beb982807ceafd90831251cb 100644 --- a/job-syncer.go +++ b/job-syncer.go @@ -5,165 +5,83 @@ package jobqueue import ( "sync" + "time" ) type JobSyncer struct { - jobQueue []GenericJob - queueLock sync.Mutex - notifyChannel chan struct{} - stopChan chan struct{} - jobSaveProgress sync.WaitGroup - status Status - manager *Manager - mu sync.Mutex - migrateFlag bool -} - -type Status int - -const ( - JobSyncerStatusStopped Status = iota - JobSyncerStatusRunning -) - -func CreateAndStartJobSyncer[P *Promise[*JobSyncer]](manager *Manager) *Promise[*JobSyncer] { - - s := NewJobSyncer(manager) - - s.mu.Lock() - defer s.mu.Unlock() - - return NewPromise[*JobSyncer](func(resolve func(*JobSyncer), reject func(error)) { - - if s.manager == nil || s.manager.database == nil { - reject(ErrNoDatabaseConnection) - return - } - - if s.status == JobSyncerStatusRunning { - resolve(s) - return - } - - db := s.manager.database - if !s.migrateFlag { - err := db.AutoMigrate(&JobPersistence{}, &JobLog{}, &JobStats{}) - if err != nil { - reject(err) - return - } - s.migrateFlag = true - } - - err := s.Start() - if err != nil { - reject(err) - return - } - resolve(s) - }) + mu sync.Mutex + migrated bool + manager *Manager + running sync.WaitGroup + lastError error } func NewJobSyncer(manager *Manager) *JobSyncer { - return &JobSyncer{ - jobQueue: make([]GenericJob, 0), - manager: manager, + js := &JobSyncer{ + manager: manager, + mu: sync.Mutex{}, } -} -func (js *JobSyncer) Start() error { - js.mu.Lock() - defer js.mu.Unlock() - - if js.status == JobSyncerStatusRunning { - return ErrJobSyncerAlreadyRunning + if manager == nil || manager.database == nil { + return js } - js.notifyChannel = make(chan struct{}, 1) // Buffer to avoid blocking - js.stopChan = make(chan struct{}) - js.status = JobSyncerStatusRunning - - go js.runWorker() - return nil -} - -func (js *JobSyncer) runWorker() { - for { - select { - case <-js.notifyChannel: - js.processJobs() - case <-js.stopChan: - js.cleanup() - return + db := js.manager.database + if !js.migrated { + err := db.AutoMigrate(&JobPersistence{}, &JobLog{}, &JobStats{}) + if err != nil { + js.lastError = err + } else { + js.migrated = true } } -} - -func (js *JobSyncer) processJobs() { - for { - js.queueLock.Lock() - if len(js.jobQueue) == 0 { - js.queueLock.Unlock() - return - } - job := js.jobQueue[0] - js.jobQueue = js.jobQueue[1:] - js.queueLock.Unlock() - js.jobSaveProgress.Add(1) - js.processJob(job) - js.jobSaveProgress.Done() - } + return js } +func (js *JobSyncer) Wait(timeout time.Duration) error { + done := make(chan struct{}) -func (js *JobSyncer) AddJob(job GenericJob) { - js.queueLock.Lock() - - // check if job is already in queue - for _, j := range js.jobQueue { - if j.GetID() == job.GetID() { - js.queueLock.Unlock() - return - } - } - - js.jobQueue = append(js.jobQueue, job) - js.queueLock.Unlock() + go func() { + js.running.Wait() + close(done) + }() - js.jobSaveProgress.Add(1) - - // Non-blocking notify select { - case js.notifyChannel <- struct{}{}: - default: + case <-done: + // Die WaitGroup ist fertig + return nil + case <-time.After(timeout): + return ErrTimeoutReached } } -func (js *JobSyncer) processJob(job GenericJob) { - defer js.jobSaveProgress.Done() - err := createOrUpdateJob(job, js.manager.database) - Error("Error while creating or updating job", "error", err) -} - -func (js *JobSyncer) Stop() error { +func (js *JobSyncer) Sync(job GenericJob) { js.mu.Lock() - if js.status != JobSyncerStatusRunning { - js.mu.Unlock() - return ErrJobSyncerNotRunning - } - js.status = JobSyncerStatusStopped - js.mu.Unlock() + defer js.mu.Unlock() - close(js.stopChan) - js.jobSaveProgress.Wait() + js.running.Add(1) + go func() { + defer js.running.Done() + err := saveJob(job, js.manager.GetDB()) + if err != nil { + Error("Error while creating or updating job", err) + } + }() - return nil } -func (js *JobSyncer) cleanup() { +func (js *JobSyncer) LastError() error { js.mu.Lock() defer js.mu.Unlock() + return js.lastError +} - js.jobSaveProgress.Wait() - js.status = JobSyncerStatusStopped +// Migrated returns if the database has been migrated. +// +// No parameters. +// Returns a boolean. +func (js *JobSyncer) Migrated() bool { + js.mu.Lock() + defer js.mu.Unlock() + return js.migrated } diff --git a/job-syncer_test.go b/job-syncer_test.go index c0453f3886592959c62228e50b4eb901a9bbe4d7..b67929698d15146d34a0e2b41c4cf2b7bcf35999 100644 --- a/job-syncer_test.go +++ b/job-syncer_test.go @@ -24,31 +24,16 @@ func TestSaveJobWithSQLite(t *testing.T) { //saver := NewJobSyncer(manager) // Starte den DBSaver - p := CreateAndStartJobSyncer(manager) - - ready := make(chan struct{}) - var saver *JobSyncer - - Then[*JobSyncer, *JobSyncer](p, func(value *JobSyncer) (*JobSyncer, error) { - saver = value - close(ready) - return value, nil - }, func(e error) error { - close(ready) - Error("Error while starting db saver", "error", err) - return nil - }) - - <-ready + saver := NewJobSyncer(manager) jobID := JobID("testJob") job := NewJob[CounterResult](jobID, &CounterRunnable{}) - saver.AddJob(job) + saver.Sync(job) time.Sleep(100 * time.Millisecond) - saver.Stop() + manager.WaitSync() var count int64 gormDB.Model(&JobPersistence{}).Count(&count) diff --git a/logger.go b/logger.go index 241b6b1f1a0f86c498cb61b7b963e0bdaaa03e39..9fe6a9a1a2af958db3a2acdcc6a7d3da860c788c 100644 --- a/logger.go +++ b/logger.go @@ -1,3 +1,6 @@ +// Copyright 2024 schukai GmbH +// SPDX-License-Identifier: AGPL-3.0 + package jobqueue import ( @@ -21,19 +24,19 @@ type DefaultLogger struct { } func (l *DefaultLogger) Info(msg string, keysAndValues ...interface{}) { - _ = l.Output(2, "INFO: "+msg) + l.Print("INFO: "+msg, keysAndValues) } func (l *DefaultLogger) Error(msg string, keysAndValues ...interface{}) { - _ = l.Output(2, "ERROR: "+msg) + l.Print("ERROR: "+msg, keysAndValues) } func (l *DefaultLogger) Warn(msg string, keysAndValues ...interface{}) { - _ = l.Output(2, "WARN: "+msg) + l.Print("WARN: "+msg, keysAndValues) } func (l *DefaultLogger) Trace(msg string, keysAndValues ...interface{}) { - _ = l.Output(2, "DEBUG: "+msg) + l.Print("DEBUG: "+msg, keysAndValues) } // Ensure DefaultLogger satisfies the Logger interface. diff --git a/logger_test.go b/logger_test.go index 901ba8cc8d7c74a280c43365da45104848ddfdb4..e014afb170ef4a4b8ac27079bd3d26c24df59299 100644 --- a/logger_test.go +++ b/logger_test.go @@ -1,3 +1,6 @@ +// Copyright 2024 schukai GmbH +// SPDX-License-Identifier: AGPL-3.0 + package jobqueue import ( diff --git a/manager.go b/manager.go index 848458bf51fc22c954b4a6464e1e0c6fa6733897..6270407e8eb6a1cd64fa80304da0b46cc1505142 100644 --- a/manager.go +++ b/manager.go @@ -153,7 +153,6 @@ func (m *Manager) DeleteJob(id JobID) error { } if m.jobSyncer != nil { - err := m.jobSyncer.DeleteJob(job) if err != nil { return err @@ -164,6 +163,23 @@ func (m *Manager) DeleteJob(id JobID) error { } +func (m *Manager) WaitSync() { + m.mu.Lock() + defer m.mu.Unlock() + if m.jobSyncer != nil { + _ = m.jobSyncer.Wait(2 * time.Second) + } +} + +func (m *Manager) Sync(job GenericJob) { + m.mu.Lock() + defer m.mu.Unlock() + + if m.jobSyncer != nil { + m.jobSyncer.Sync(job) + } +} + // RemoveJob removes a job from the active jobs // If you want to remove a job from the active jobs and the database, use DeleteJob instead func (m *Manager) RemoveJob(id JobID) error { @@ -289,12 +305,6 @@ func (m *Manager) SetDB(db *gorm.DB) *Manager { m.mu.Lock() defer m.mu.Unlock() m.database = db - - if m.jobSyncer != nil { - return m - } - m.jobSyncer = NewJobSyncer(m) - return m } @@ -397,31 +407,8 @@ func (m *Manager) Start() error { return ErrManagerAlreadyRunning } - if m.jobSyncer != nil { - p := CreateAndStartJobSyncer(m) - - ready := make(chan struct{}) - - var jobSyncerErr error - - Then[*JobSyncer, *JobSyncer](p, func(value *JobSyncer) (*JobSyncer, error) { - close(ready) - m.mu.Lock() - m.jobSyncer = value - m.mu.Unlock() - return value, nil - }, func(e error) error { - close(ready) - Error("Error while starting db saver", "error", e) - jobSyncerErr = e - return nil - }) - - <-ready - - if jobSyncerErr != nil { - return jobSyncerErr - } + if m.jobSyncer == nil { + m.jobSyncer = NewJobSyncer(m) } if len(m.workerMap) == 0 { @@ -470,6 +457,18 @@ func (m *Manager) Start() error { } +func safeClose(ch chan interface{}) (err error) { + + defer func() { + if recover() != nil { + err = ErrChannelAlreadyClosed + } + }() + + close(ch) + return +} + // Stop stops the manager func (m *Manager) Stop() error { m.mu.Lock() @@ -482,6 +481,7 @@ func (m *Manager) Stop() error { m.eventBus.Unsubscribe(QueueJob, m.jobEventCh) m.eventBus.Unsubscribe(JobReady, m.jobEventCh) + m.eventBus.Shutdown() _ = safeClose(m.jobEventCh) var wrappedErr error @@ -508,15 +508,7 @@ func (m *Manager) Stop() error { } if m.jobSyncer != nil { - err = m.jobSyncer.Stop() - if err != nil { - if wrappedErr == nil { - wrappedErr = fmt.Errorf("Error: ") - } - - wrappedErr = fmt.Errorf("%w\n%s", wrappedErr, err.Error()) - - } + _ = m.jobSyncer.Wait(10 * time.Second) } return wrappedErr @@ -530,6 +522,8 @@ func (m *Manager) SetLogger(logger Logger) *Manager { m.database = m.database.Session(&gorm.Session{Logger: newGormAdapter()}) } + SetLogger(logger) + return m } @@ -580,7 +574,7 @@ func (m *Manager) ScheduleJob(job GenericJob, scheduler Scheduler) error { m.activeJobs[job.GetID()] = job if m.jobSyncer != nil { - m.jobSyncer.AddJob(job) + m.jobSyncer.Sync(job) } return nil diff --git a/manager_test.go b/manager_test.go index 58356733ac0e8e6ee75cb757b1fa43584c92d7c3..525891c28fec90a818026299bfc08b752c1ce62c 100644 --- a/manager_test.go +++ b/manager_test.go @@ -253,11 +253,14 @@ func TestManagerEventHandling(t *testing.T) { startTime := time.Now() for { - if job.runner.(*CounterRunnable).GetCount() > 10 { + currentCount := job.runner.(*CounterRunnable).GetCount() + if currentCount > 10 { break } - if time.Since(startTime) > 10*time.Second { + time.Sleep(2 * time.Millisecond) + + if time.Since(startTime) > 1*time.Second { t.Fatalf("Job did not finish in time") } } diff --git a/promise.go b/promise.go index 704f5c4eb866fb7e68459a60dc7a458c897cc80a..954c9de1da4bef6e0d221e910de553ceb2e6828a 100644 --- a/promise.go +++ b/promise.go @@ -1,3 +1,6 @@ +// Copyright 2024 schukai GmbH +// SPDX-License-Identifier: AGPL-3.0 + package jobqueue import ( diff --git a/promise_test.go b/promise_test.go index ba3499f1113aa94f1015efc76416870df07e3e38..513adbdf7f62b9ed619194c7a25e92523f8bf556 100644 --- a/promise_test.go +++ b/promise_test.go @@ -1,3 +1,6 @@ +// Copyright 2024 schukai GmbH +// SPDX-License-Identifier: AGPL-3.0 + package jobqueue import ( diff --git a/util.go b/util.go index 28567b5aa53d5cbee4b3c9f8497cb573df913c49..05803bdb6c2323c13112327228db65ba3ae4510c 100644 --- a/util.go +++ b/util.go @@ -2,17 +2,3 @@ // SPDX-License-Identifier: AGPL-3.0 package jobqueue - -// safeClose closes the given channel and returns an error if the channel is already closed -func safeClose(ch chan interface{}) (err error) { - defer func() { - if recover() != nil { - err = ErrChannelAlreadyClosed - } - }() - - err = nil - - close(ch) - return -} diff --git a/worker.go b/worker.go index 836a474f2a68bf24d4868317409cabbd0946570d..c0a70de71cc2abaa8b79acf20da347fb1b66a6af 100644 --- a/worker.go +++ b/worker.go @@ -237,16 +237,15 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel cancel() - if w.manager != nil { - go func() { - w.manager.mu.Lock() - if w.manager.jobSyncer != nil { - w.manager.jobSyncer.AddJob(job) - } - w.manager.mu.Unlock() - }() - } - + go func() { + w.mu.Lock() + defer w.mu.Unlock() + if w.manager != nil { + w.manager.Sync(job) + } + + }() + w.statisticMu.Lock() w.statistic.ActiveThreads-- w.statisticMu.Unlock()