From 7631413f5b9b069164dd05966fc930d418b7ebd9 Mon Sep 17 00:00:00 2001 From: Volker Schukai <volker.schukai@schukai.com> Date: Sat, 23 Mar 2024 14:36:35 +0100 Subject: [PATCH] fix: simplify save #49 --- database-1_test.go | 27 +---- database-2_test.go | 25 +---- database-4_test.go | 46 ++++++-- database-5_test.go | 9 +- database-6_test.go | 6 +- database-7_test.go | 7 +- database-logging.go | 3 + database-logging_test.go | 3 + database.go | 232 +++++++++++++++++++++++++-------------- database_test.go | 9 +- errors.go | 2 + job-syncer.go | 184 +++++++++---------------------- job-syncer_test.go | 21 +--- logger.go | 11 +- logger_test.go | 3 + manager.go | 78 ++++++------- manager_test.go | 7 +- promise.go | 3 + promise_test.go | 3 + util.go | 14 --- worker.go | 19 ++-- 21 files changed, 337 insertions(+), 375 deletions(-) diff --git a/database-1_test.go b/database-1_test.go index db34016..7e04839 100644 --- a/database-1_test.go +++ b/database-1_test.go @@ -25,23 +25,7 @@ func TestWriteToDB1(t *testing.T) { manager := &Manager{database: gormDB} // Starte den DBSaver - promise := CreateAndStartJobSyncer(manager) - - ready := make(chan struct{}) - - var saver *JobSyncer - - Then[*JobSyncer, *JobSyncer](promise, func(value *JobSyncer) (*JobSyncer, error) { - close(ready) - saver = value - return value, nil - }, func(e error) error { - close(ready) - Error("Error while starting db saver", "error", err) - return nil - }) - - <-ready + saver := NewJobSyncer(manager) runner := &CounterRunnable{} job := NewJob[CounterResult]("job1", runner) @@ -53,13 +37,10 @@ func TestWriteToDB1(t *testing.T) { job.scheduler = scheduler - saver.AddJob(job) - saver.AddJob(job) - - time.Sleep(1 * time.Second) + saver.Sync(job) + saver.Sync(job) - err = saver.Stop() - assert.Nil(t, err) + manager.WaitSync() // check if stats are in database var stats JobPersistence diff --git a/database-2_test.go b/database-2_test.go index 8673f8a..266ca73 100644 --- a/database-2_test.go +++ b/database-2_test.go @@ -24,28 +24,11 @@ func TestWriteToDB2(t *testing.T) { manager := &Manager{database: gormDB} - // Starte den DBSaver - p := CreateAndStartJobSyncer(manager) - - ready := make(chan struct{}) - - //var saver *JobSyncer - - Then[*JobSyncer, *JobSyncer](p, func(value *JobSyncer) (*JobSyncer, error) { - close(ready) - // saver = value - return value, nil - }, func(e error) error { - close(ready) - Error("Error while starting db saver", "error", err) - return nil - }) + NewJobSyncer(manager) db := gormDB - <-ready - var wg sync.WaitGroup wg.Add(1) @@ -99,7 +82,7 @@ func TestWriteToDB2(t *testing.T) { time.Sleep(1 * time.Second) - mgr.jobSyncer.AddJob(job) + mgr.jobSyncer.Sync(job) runtime.Gosched() time.Sleep(1 * time.Second) @@ -116,7 +99,7 @@ func TestWriteToDB2(t *testing.T) { runtime.Gosched() time.Sleep(1 * time.Second) - mgr.jobSyncer.AddJob(job) + mgr.jobSyncer.Sync(job) time.Sleep(2 * time.Second) err = mgr.CancelJobSchedule("job1") @@ -143,6 +126,6 @@ func TestWriteToDB2(t *testing.T) { time.Sleep(1 * time.Second) - mgr.jobSyncer.AddJob(job) + mgr.jobSyncer.Sync(job) } diff --git a/database-4_test.go b/database-4_test.go index 2cc20f2..127311e 100644 --- a/database-4_test.go +++ b/database-4_test.go @@ -6,13 +6,22 @@ import ( "github.com/stretchr/testify/assert" "gorm.io/driver/sqlite" "gorm.io/gorm" + "os" "testing" "time" ) func TestWriteToDB4(t *testing.T) { - db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{}) + testFile := "/home/vs/workspaces/alvine/cloud/framework/dummy.sqlite" + //remove if exists + if _, err := os.Stat(testFile); err == nil { + err = os.Remove(testFile) + assert.Nil(t, err) + } + + // db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{}) + db, err := gorm.Open(sqlite.Open("file:"+testFile+"?cache=shared"), &gorm.Config{}) if err != nil { t.Fatalf("a error occurred while opening the database: %v", err) } @@ -28,18 +37,28 @@ func TestWriteToDB4(t *testing.T) { runner := &CounterRunnable{} job := NewJob[CounterResult]("job3", runner) + id := job.GetID() + + job.mu.Lock() job.stats = JobStats{ - JobID: job.GetID(), + JobID: id, RunCount: 20, SuccessCount: 30, ErrorCount: 40, } + job.mu.Unlock() + + db.Logger = db.Logger.LogMode(4) scheduler := &InstantScheduler{} err = manager.ScheduleJob(job, scheduler) assert.Nil(t, err) - time.Sleep(500 * time.Millisecond) + time.Sleep(200 * time.Millisecond) + + assert.Equal(t, 21, job.GetStats().RunCount) + + manager.WaitSync() // check is stats are the values above var tmpJob JobPersistence @@ -48,16 +67,18 @@ func TestWriteToDB4(t *testing.T) { assert.Nil(t, err) // Validate the fields + + stats := tmpJob.GetStats() + assert.Equal(t, JobID("job3"), tmpJob.ID) - assert.Equal(t, 21, tmpJob.Stats.RunCount) - assert.Equal(t, 31, tmpJob.Stats.SuccessCount) - assert.Equal(t, 40, tmpJob.Stats.ErrorCount) + assert.Equal(t, 21, stats.RunCount) + assert.Equal(t, 31, stats.SuccessCount) + assert.Equal(t, 40, stats.ErrorCount) // reset stats err = manager.ResetJobStats(job.GetID()) assert.Nil(t, err) - - time.Sleep(500 * time.Millisecond) + time.Sleep(100 * time.Millisecond) var tmpJob2 JobPersistence // check is stats are the values above @@ -66,10 +87,13 @@ func TestWriteToDB4(t *testing.T) { assert.Nil(t, err) // Validate the fields + + stats2 := tmpJob2.GetStats() + assert.Equal(t, JobID("job3"), tmpJob2.ID) - assert.Equal(t, 0, tmpJob2.Stats.RunCount) - assert.Equal(t, 0, tmpJob2.Stats.SuccessCount) - assert.Equal(t, 0, tmpJob2.Stats.ErrorCount) + assert.Equal(t, 0, stats2.RunCount) + assert.Equal(t, 0, stats2.SuccessCount) + assert.Equal(t, 0, stats2.ErrorCount) err = manager.DeleteJob(job.GetID()) assert.Nil(t, err) diff --git a/database-5_test.go b/database-5_test.go index bdd667a..adf2041 100644 --- a/database-5_test.go +++ b/database-5_test.go @@ -7,7 +7,6 @@ import ( "gorm.io/driver/sqlite" "gorm.io/gorm" "testing" - "time" ) func TestWriteToDB5(t *testing.T) { @@ -36,19 +35,17 @@ func TestWriteToDB5(t *testing.T) { sameIDJob := NewJob[CounterResult]("jobSameID", runner) - time.Sleep(500 * time.Millisecond) + manager.WaitSync() // Trying to save a job with the same ID should do nothing mgr.mu.Lock() - mgr.jobSyncer.AddJob(sameIDJob) + mgr.jobSyncer.Sync(sameIDJob) mgr.mu.Unlock() err = mgr.CancelJobSchedule("jobSameID") assert.Nil(t, err) - mgr.mu.Lock() - err = mgr.jobSyncer.Stop() - mgr.mu.Unlock() + manager.WaitSync() assert.Nil(t, err) diff --git a/database-6_test.go b/database-6_test.go index 272a05a..0e72761 100644 --- a/database-6_test.go +++ b/database-6_test.go @@ -48,15 +48,13 @@ func TestWriteToDB6(t *testing.T) { assert.Nil(t, err) mgr.mu.Lock() - mgr.jobSyncer.AddJob(job) + mgr.jobSyncer.Sync(job) mgr.mu.Unlock() time.Sleep(10 * time.Millisecond) } - mgr.mu.Lock() - _ = mgr.jobSyncer.Stop() - mgr.mu.Unlock() + manager.WaitSync() time.Sleep(2 * time.Second) diff --git a/database-7_test.go b/database-7_test.go index 2bb2b13..963cdab 100644 --- a/database-7_test.go +++ b/database-7_test.go @@ -1,3 +1,6 @@ +// Copyright 2024 schukai GmbH +// SPDX-License-Identifier: AGPL-3.0 + package jobqueue import ( @@ -17,7 +20,7 @@ func TestCreateOrUpdateJob(t *testing.T) { runner := &CounterRunnable{} job := NewJob[CounterResult]("job1", runner) - assert.NoError(t, createOrUpdateJob(job, db)) + assert.NoError(t, saveJob(job, db)) var jobPersistence JobPersistence assert.NoError(t, db.First(&jobPersistence, "id = ?", job.GetID()).Error) @@ -26,7 +29,7 @@ func TestCreateOrUpdateJob(t *testing.T) { assert.Equal(t, "", jobPersistence.Description) assert.Equal(t, Priority(1), jobPersistence.Priority) job.description = "Updated description" - assert.NoError(t, createOrUpdateJob(job, db)) + assert.NoError(t, saveJob(job, db)) assert.NoError(t, db.First(&jobPersistence, "id = ?", job.GetID()).Error) assert.Equal(t, "Updated description", jobPersistence.Description) diff --git a/database-logging.go b/database-logging.go index aa2c26d..7354a01 100644 --- a/database-logging.go +++ b/database-logging.go @@ -1,3 +1,6 @@ +// Copyright 2024 schukai GmbH +// SPDX-License-Identifier: AGPL-3.0 + package jobqueue import ( diff --git a/database-logging_test.go b/database-logging_test.go index f625c1f..ace6f4d 100644 --- a/database-logging_test.go +++ b/database-logging_test.go @@ -1,3 +1,6 @@ +// Copyright 2024 schukai GmbH +// SPDX-License-Identifier: AGPL-3.0 + package jobqueue import ( diff --git a/database.go b/database.go index a34c633..5cb3ed1 100644 --- a/database.go +++ b/database.go @@ -1,8 +1,12 @@ +// Copyright 2024 schukai GmbH +// SPDX-License-Identifier: AGPL-3.0 + package jobqueue import ( - "errors" "gorm.io/gorm" + "strings" + "time" ) func (s *JobSyncer) DeleteJob(job GenericJob) error { @@ -15,19 +19,15 @@ func (s *JobSyncer) DeleteJob(job GenericJob) error { return s.manager.database.Transaction(func(tx *gorm.DB) error { permJob := job.GetPersistence() - if err := tx.Where("job_id = ?", permJob.GetID()).Delete(&JobLog{}).Error; err != nil { return err } - if err := tx.Where("job_id = ?", permJob.GetID()).Delete(&JobStats{}).Error; err != nil { return err } - if err := tx.Delete(&permJob).Error; err != nil { return err } - return nil }) } @@ -59,105 +59,173 @@ func (s *JobSyncer) ResetStats(job GenericJob) error { } job.ResetStats() - stats := job.GetStats() - return s.manager.database.Transaction(func(tx *gorm.DB) error { - return tx.Model(&JobStats{}).Where("job_id = ?", job.GetID()).Select("*").Omit("deleted_at", "created_at", "job_id").Updates(stats).Error - - }) + if s.manager == nil || s.manager.database == nil { + return ErrNoDatabaseConnection + } + return saveJob(job, s.manager.database) + //stats := job.GetStats() + //return s.manager.database.Transaction(func(tx *gorm.DB) error { + // return tx.Model(&JobStats{}).Where("job_id = ?", + // job.GetID()). + // Select("*").Omit("deleted_at", "created_at", "job_id").Updates(stats).Error + // + //}) } -func (s *JobSyncer) CreateOrUpdateJob(job GenericJob) error { - +func (s *JobSyncer) SaveJob(job GenericJob) error { s.mu.Lock() defer s.mu.Unlock() - if s.manager == nil || s.manager.database == nil { return ErrNoDatabaseConnection } - - return createOrUpdateJob(job, s.manager.database) - + return saveJob(job, s.manager.database) } -func createOrUpdateJob(job GenericJob, db *gorm.DB) error { - - return db.Transaction(func(tx *gorm.DB) error { +func save(job *JobPersistence, db *gorm.DB) error { - permJob := job.GetPersistence() + if db == nil { + return ErrNoDatabaseConnection + } - memLogs := permJob.Logs - permJob.Logs = nil - - var existingJob JobPersistence - result := tx.Unscoped().Where("id = ?", permJob.GetID()).First(&existingJob) - - if result.Error != nil { - if errors.Is(result.Error, gorm.ErrRecordNotFound) { - err := tx.Create(&permJob).Error - if err != nil { - Trace("Error while creating job", "error", err) - return err - } - } else { - Trace("Error while creating job", "error", result.Error) - return result.Error - } - } else { - - // remove deleted from - r := tx.Unscoped().Model(&existingJob).Where("id = ?", existingJob.ID).Select("*"). - Omit("deleted_at", "created_at", "job_id").Update("deleted_at", nil) - if r.Error != nil { - Trace("Error while deleting job", "error", r.Error) - return r.Error - } + return db.Transaction(func(tx *gorm.DB) error { - // update scheduler - resultStats := tx.Model(&existingJob).Where("id = ?", existingJob.ID).Select("*"). - Omit("deleted_at", "created_at", "job_id").UpdateColumns(SchedulerPersistence{ - Type: "", - Interval: 0, - Spec: "", - Delay: 0, - Event: "", - Time: nil, - Executed: false, - }) - - if resultStats.Error != nil { - Trace("Error while updating job stats", "error", resultStats.Error) - return resultStats.Error - } + if err := tx.Save(job).Error; err != nil { + return err + } - r2 := tx.Model(&existingJob).Where("id = ?", existingJob.ID).Select("*"). - Omit("deleted_at", "created_at", "job_id").Updates(permJob) - if r2.Error != nil { - return r2.Error + if job.Stats != (JobStats{}) { + job.Stats.JobID = job.ID + if err := tx.Save(&job.Stats).Error; err != nil { + return err } } - if tx.Error != nil { - Trace("Error while updating job", "error", tx.Error) - return tx.Error + for i, _ := range job.Logs { + job.Logs[i].LogID = 0 + _ = tx.Create(&job.Logs[i]) + // no error handling, if it fails, it fails } - tx.Model(&permJob.Stats). - Select("*").Omit("deleted_at", "created_at", "job_id"). - UpdateColumns(permJob.Stats) + return nil + }) +} - if tx.Error != nil { - Error("Error while updating job stats", "error", tx.Error) - return result.Error - } +func saveJob(job GenericJob, db *gorm.DB) error { - for i, _ := range memLogs { - memLogs[i].LogID = 0 - _ = tx.Create(&memLogs[i]) - // no error handling, if it fails, it fails + if db == nil { + return ErrNoDatabaseConnection + } + + permJob := job.GetPersistence() + + maxRetries := 3 + var attempt int + for attempt = 0; attempt < maxRetries; attempt++ { + err := save(&permJob, db) + if err != nil { + if strings.Contains(err.Error(), "lock") { + time.Sleep(time.Millisecond * 100) + continue + } + return err } + break + } - return nil + if attempt == maxRetries { + return ErrMaxRetriesReached + } - }) + return nil + + //return db.Transaction(func(tx *gorm.DB) error { + // + // permJob := job.GetPersistence() + // + // memLogs := permJob.Logs + // permJob.Logs = nil + // + // var existingJob JobPersistence + // result := tx.Unscoped().Where("id = ?", permJob.GetID()).First(&existingJob) + // + // if result.Error != nil { + // err, done := createJob(tx, result, permJob) + // if done { + // return err + // } + // } else { + // + // err, done := updateJob(tx, existingJob, permJob) + // if done { + // return err + // } + // } + // + // if tx.Error != nil { + // Trace("Error while updating job", "error", tx.Error) + // return tx.Error + // } + // + // return nil + // + //}) } + +// +//func updateJob(tx *gorm.DB, existingJob JobPersistence, permJob JobPersistence) (error, bool) { +// +// r := tx.Unscoped().Model(&existingJob).Select("*"). +// Omit("deleted_at", "created_at", "job_id"). +// Update("deleted_at", nil) +// +// if r.Error != nil { +// Trace("Error while deleting job", "error", r.Error) +// return r.Error, true +// } +// +// resultStats := tx.Model(&existingJob).Select("*"). +// Omit("deleted_at", "created_at", "job_id").Update() +// +// if resultStats.Error != nil { +// Trace("Error while updating job stats", "error", resultStats.Error) +// return resultStats.Error, true +// } +// +// r2 := tx.Model(&existingJob).Select("*"). +// Omit("deleted_at", "created_at", "job_id").Updates(permJob) +// if r2.Error != nil { +// return r2.Error, true +// } +// +// tx.Model(&permJob.Stats). +// Select("*").Omit("deleted_at", "created_at", "job_id"). +// UpdateColumns(permJob.Stats) +// +// if tx.Error != nil { +// Error("Error while updating job stats", "error", tx.Error) +// return result.Error +// } +// +// for i, _ := range memLogs { +// memLogs[i].LogID = 0 +// _ = tx.Create(&memLogs[i]) +// // no error handling, if it fails, it fails +// } +// +// return nil, false +//} +// +//func createJob(tx *gorm.DB, result *gorm.DB, permJob JobPersistence) (error, bool) { +// if errors.Is(result.Error, gorm.ErrRecordNotFound) { +// err := tx.Create(&permJob).Error +// if err != nil { +// Trace("Error while creating job", "error", err) +// return err, true +// } +// } else { +// Trace("Error while creating job", "error", result.Error) +// return result.Error, true +// } +// return nil, false +//} diff --git a/database_test.go b/database_test.go index f2a8269..1300fa3 100644 --- a/database_test.go +++ b/database_test.go @@ -1,3 +1,6 @@ +// Copyright 2024 schukai GmbH +// SPDX-License-Identifier: AGPL-3.0 + package jobqueue import ( @@ -24,7 +27,7 @@ func TestDeleteJob(t *testing.T) { // Erstelle einen Job zum Löschen runner := &CounterRunnable{} job := NewJob[CounterResult]("testJobID", runner) - err = createOrUpdateJob(job, db) + err = saveJob(job, db) assert.Nil(t, err) var count int64 @@ -56,7 +59,7 @@ func TestResetLogs(t *testing.T) { // Erstelle einen Job und füge einige Logs hinzu runner := &CounterRunnable{} job := NewJob[CounterResult]("testJobID", runner) - err = createOrUpdateJob(job, db) + err = saveJob(job, db) assert.Nil(t, err) // Füge Logs zum Job hinzu @@ -97,7 +100,7 @@ func TestResetStats(t *testing.T) { // Erstelle einen Job und setze einige Statistiken runner := &CounterRunnable{} job := NewJob[CounterResult]("testJobID", runner) - err = createOrUpdateJob(job, db) + err = saveJob(job, db) assert.Nil(t, err) // Aktualisiere die Job-Statistiken diff --git a/errors.go b/errors.go index eafc6bb..97a20f1 100644 --- a/errors.go +++ b/errors.go @@ -52,4 +52,6 @@ var ( ErrInvalidDuration = fmt.Errorf("invalid duration") ErrJobSyncerAlreadyRunning = fmt.Errorf("JobSyncer is already running") ErrJobSyncerNotRunning = fmt.Errorf("JobSyncer is not running") + ErrMaxRetriesReached = fmt.Errorf("maximum number of retries reached") + ErrTimeoutReached = fmt.Errorf("timeout reached") ) diff --git a/job-syncer.go b/job-syncer.go index 53b4a1a..488569d 100644 --- a/job-syncer.go +++ b/job-syncer.go @@ -5,165 +5,83 @@ package jobqueue import ( "sync" + "time" ) type JobSyncer struct { - jobQueue []GenericJob - queueLock sync.Mutex - notifyChannel chan struct{} - stopChan chan struct{} - jobSaveProgress sync.WaitGroup - status Status - manager *Manager - mu sync.Mutex - migrateFlag bool -} - -type Status int - -const ( - JobSyncerStatusStopped Status = iota - JobSyncerStatusRunning -) - -func CreateAndStartJobSyncer[P *Promise[*JobSyncer]](manager *Manager) *Promise[*JobSyncer] { - - s := NewJobSyncer(manager) - - s.mu.Lock() - defer s.mu.Unlock() - - return NewPromise[*JobSyncer](func(resolve func(*JobSyncer), reject func(error)) { - - if s.manager == nil || s.manager.database == nil { - reject(ErrNoDatabaseConnection) - return - } - - if s.status == JobSyncerStatusRunning { - resolve(s) - return - } - - db := s.manager.database - if !s.migrateFlag { - err := db.AutoMigrate(&JobPersistence{}, &JobLog{}, &JobStats{}) - if err != nil { - reject(err) - return - } - s.migrateFlag = true - } - - err := s.Start() - if err != nil { - reject(err) - return - } - resolve(s) - }) + mu sync.Mutex + migrated bool + manager *Manager + running sync.WaitGroup + lastError error } func NewJobSyncer(manager *Manager) *JobSyncer { - return &JobSyncer{ - jobQueue: make([]GenericJob, 0), - manager: manager, + js := &JobSyncer{ + manager: manager, + mu: sync.Mutex{}, } -} -func (js *JobSyncer) Start() error { - js.mu.Lock() - defer js.mu.Unlock() - - if js.status == JobSyncerStatusRunning { - return ErrJobSyncerAlreadyRunning + if manager == nil || manager.database == nil { + return js } - js.notifyChannel = make(chan struct{}, 1) // Buffer to avoid blocking - js.stopChan = make(chan struct{}) - js.status = JobSyncerStatusRunning - - go js.runWorker() - return nil -} - -func (js *JobSyncer) runWorker() { - for { - select { - case <-js.notifyChannel: - js.processJobs() - case <-js.stopChan: - js.cleanup() - return + db := js.manager.database + if !js.migrated { + err := db.AutoMigrate(&JobPersistence{}, &JobLog{}, &JobStats{}) + if err != nil { + js.lastError = err + } else { + js.migrated = true } } -} - -func (js *JobSyncer) processJobs() { - for { - js.queueLock.Lock() - if len(js.jobQueue) == 0 { - js.queueLock.Unlock() - return - } - job := js.jobQueue[0] - js.jobQueue = js.jobQueue[1:] - js.queueLock.Unlock() - js.jobSaveProgress.Add(1) - js.processJob(job) - js.jobSaveProgress.Done() - } + return js } +func (js *JobSyncer) Wait(timeout time.Duration) error { + done := make(chan struct{}) -func (js *JobSyncer) AddJob(job GenericJob) { - js.queueLock.Lock() - - // check if job is already in queue - for _, j := range js.jobQueue { - if j.GetID() == job.GetID() { - js.queueLock.Unlock() - return - } - } - - js.jobQueue = append(js.jobQueue, job) - js.queueLock.Unlock() + go func() { + js.running.Wait() + close(done) + }() - js.jobSaveProgress.Add(1) - - // Non-blocking notify select { - case js.notifyChannel <- struct{}{}: - default: + case <-done: + // Die WaitGroup ist fertig + return nil + case <-time.After(timeout): + return ErrTimeoutReached } } -func (js *JobSyncer) processJob(job GenericJob) { - defer js.jobSaveProgress.Done() - err := createOrUpdateJob(job, js.manager.database) - Error("Error while creating or updating job", "error", err) -} - -func (js *JobSyncer) Stop() error { +func (js *JobSyncer) Sync(job GenericJob) { js.mu.Lock() - if js.status != JobSyncerStatusRunning { - js.mu.Unlock() - return ErrJobSyncerNotRunning - } - js.status = JobSyncerStatusStopped - js.mu.Unlock() + defer js.mu.Unlock() - close(js.stopChan) - js.jobSaveProgress.Wait() + js.running.Add(1) + go func() { + defer js.running.Done() + err := saveJob(job, js.manager.GetDB()) + if err != nil { + Error("Error while creating or updating job", err) + } + }() - return nil } -func (js *JobSyncer) cleanup() { +func (js *JobSyncer) LastError() error { js.mu.Lock() defer js.mu.Unlock() + return js.lastError +} - js.jobSaveProgress.Wait() - js.status = JobSyncerStatusStopped +// Migrated returns if the database has been migrated. +// +// No parameters. +// Returns a boolean. +func (js *JobSyncer) Migrated() bool { + js.mu.Lock() + defer js.mu.Unlock() + return js.migrated } diff --git a/job-syncer_test.go b/job-syncer_test.go index c0453f3..b679296 100644 --- a/job-syncer_test.go +++ b/job-syncer_test.go @@ -24,31 +24,16 @@ func TestSaveJobWithSQLite(t *testing.T) { //saver := NewJobSyncer(manager) // Starte den DBSaver - p := CreateAndStartJobSyncer(manager) - - ready := make(chan struct{}) - var saver *JobSyncer - - Then[*JobSyncer, *JobSyncer](p, func(value *JobSyncer) (*JobSyncer, error) { - saver = value - close(ready) - return value, nil - }, func(e error) error { - close(ready) - Error("Error while starting db saver", "error", err) - return nil - }) - - <-ready + saver := NewJobSyncer(manager) jobID := JobID("testJob") job := NewJob[CounterResult](jobID, &CounterRunnable{}) - saver.AddJob(job) + saver.Sync(job) time.Sleep(100 * time.Millisecond) - saver.Stop() + manager.WaitSync() var count int64 gormDB.Model(&JobPersistence{}).Count(&count) diff --git a/logger.go b/logger.go index 241b6b1..9fe6a9a 100644 --- a/logger.go +++ b/logger.go @@ -1,3 +1,6 @@ +// Copyright 2024 schukai GmbH +// SPDX-License-Identifier: AGPL-3.0 + package jobqueue import ( @@ -21,19 +24,19 @@ type DefaultLogger struct { } func (l *DefaultLogger) Info(msg string, keysAndValues ...interface{}) { - _ = l.Output(2, "INFO: "+msg) + l.Print("INFO: "+msg, keysAndValues) } func (l *DefaultLogger) Error(msg string, keysAndValues ...interface{}) { - _ = l.Output(2, "ERROR: "+msg) + l.Print("ERROR: "+msg, keysAndValues) } func (l *DefaultLogger) Warn(msg string, keysAndValues ...interface{}) { - _ = l.Output(2, "WARN: "+msg) + l.Print("WARN: "+msg, keysAndValues) } func (l *DefaultLogger) Trace(msg string, keysAndValues ...interface{}) { - _ = l.Output(2, "DEBUG: "+msg) + l.Print("DEBUG: "+msg, keysAndValues) } // Ensure DefaultLogger satisfies the Logger interface. diff --git a/logger_test.go b/logger_test.go index 901ba8c..e014afb 100644 --- a/logger_test.go +++ b/logger_test.go @@ -1,3 +1,6 @@ +// Copyright 2024 schukai GmbH +// SPDX-License-Identifier: AGPL-3.0 + package jobqueue import ( diff --git a/manager.go b/manager.go index 848458b..6270407 100644 --- a/manager.go +++ b/manager.go @@ -153,7 +153,6 @@ func (m *Manager) DeleteJob(id JobID) error { } if m.jobSyncer != nil { - err := m.jobSyncer.DeleteJob(job) if err != nil { return err @@ -164,6 +163,23 @@ func (m *Manager) DeleteJob(id JobID) error { } +func (m *Manager) WaitSync() { + m.mu.Lock() + defer m.mu.Unlock() + if m.jobSyncer != nil { + _ = m.jobSyncer.Wait(2 * time.Second) + } +} + +func (m *Manager) Sync(job GenericJob) { + m.mu.Lock() + defer m.mu.Unlock() + + if m.jobSyncer != nil { + m.jobSyncer.Sync(job) + } +} + // RemoveJob removes a job from the active jobs // If you want to remove a job from the active jobs and the database, use DeleteJob instead func (m *Manager) RemoveJob(id JobID) error { @@ -289,12 +305,6 @@ func (m *Manager) SetDB(db *gorm.DB) *Manager { m.mu.Lock() defer m.mu.Unlock() m.database = db - - if m.jobSyncer != nil { - return m - } - m.jobSyncer = NewJobSyncer(m) - return m } @@ -397,31 +407,8 @@ func (m *Manager) Start() error { return ErrManagerAlreadyRunning } - if m.jobSyncer != nil { - p := CreateAndStartJobSyncer(m) - - ready := make(chan struct{}) - - var jobSyncerErr error - - Then[*JobSyncer, *JobSyncer](p, func(value *JobSyncer) (*JobSyncer, error) { - close(ready) - m.mu.Lock() - m.jobSyncer = value - m.mu.Unlock() - return value, nil - }, func(e error) error { - close(ready) - Error("Error while starting db saver", "error", e) - jobSyncerErr = e - return nil - }) - - <-ready - - if jobSyncerErr != nil { - return jobSyncerErr - } + if m.jobSyncer == nil { + m.jobSyncer = NewJobSyncer(m) } if len(m.workerMap) == 0 { @@ -470,6 +457,18 @@ func (m *Manager) Start() error { } +func safeClose(ch chan interface{}) (err error) { + + defer func() { + if recover() != nil { + err = ErrChannelAlreadyClosed + } + }() + + close(ch) + return +} + // Stop stops the manager func (m *Manager) Stop() error { m.mu.Lock() @@ -482,6 +481,7 @@ func (m *Manager) Stop() error { m.eventBus.Unsubscribe(QueueJob, m.jobEventCh) m.eventBus.Unsubscribe(JobReady, m.jobEventCh) + m.eventBus.Shutdown() _ = safeClose(m.jobEventCh) var wrappedErr error @@ -508,15 +508,7 @@ func (m *Manager) Stop() error { } if m.jobSyncer != nil { - err = m.jobSyncer.Stop() - if err != nil { - if wrappedErr == nil { - wrappedErr = fmt.Errorf("Error: ") - } - - wrappedErr = fmt.Errorf("%w\n%s", wrappedErr, err.Error()) - - } + _ = m.jobSyncer.Wait(10 * time.Second) } return wrappedErr @@ -530,6 +522,8 @@ func (m *Manager) SetLogger(logger Logger) *Manager { m.database = m.database.Session(&gorm.Session{Logger: newGormAdapter()}) } + SetLogger(logger) + return m } @@ -580,7 +574,7 @@ func (m *Manager) ScheduleJob(job GenericJob, scheduler Scheduler) error { m.activeJobs[job.GetID()] = job if m.jobSyncer != nil { - m.jobSyncer.AddJob(job) + m.jobSyncer.Sync(job) } return nil diff --git a/manager_test.go b/manager_test.go index 5835673..525891c 100644 --- a/manager_test.go +++ b/manager_test.go @@ -253,11 +253,14 @@ func TestManagerEventHandling(t *testing.T) { startTime := time.Now() for { - if job.runner.(*CounterRunnable).GetCount() > 10 { + currentCount := job.runner.(*CounterRunnable).GetCount() + if currentCount > 10 { break } - if time.Since(startTime) > 10*time.Second { + time.Sleep(2 * time.Millisecond) + + if time.Since(startTime) > 1*time.Second { t.Fatalf("Job did not finish in time") } } diff --git a/promise.go b/promise.go index 704f5c4..954c9de 100644 --- a/promise.go +++ b/promise.go @@ -1,3 +1,6 @@ +// Copyright 2024 schukai GmbH +// SPDX-License-Identifier: AGPL-3.0 + package jobqueue import ( diff --git a/promise_test.go b/promise_test.go index ba3499f..513adbd 100644 --- a/promise_test.go +++ b/promise_test.go @@ -1,3 +1,6 @@ +// Copyright 2024 schukai GmbH +// SPDX-License-Identifier: AGPL-3.0 + package jobqueue import ( diff --git a/util.go b/util.go index 28567b5..05803bd 100644 --- a/util.go +++ b/util.go @@ -2,17 +2,3 @@ // SPDX-License-Identifier: AGPL-3.0 package jobqueue - -// safeClose closes the given channel and returns an error if the channel is already closed -func safeClose(ch chan interface{}) (err error) { - defer func() { - if recover() != nil { - err = ErrChannelAlreadyClosed - } - }() - - err = nil - - close(ch) - return -} diff --git a/worker.go b/worker.go index 836a474..c0a70de 100644 --- a/worker.go +++ b/worker.go @@ -237,16 +237,15 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel cancel() - if w.manager != nil { - go func() { - w.manager.mu.Lock() - if w.manager.jobSyncer != nil { - w.manager.jobSyncer.AddJob(job) - } - w.manager.mu.Unlock() - }() - } - + go func() { + w.mu.Lock() + defer w.mu.Unlock() + if w.manager != nil { + w.manager.Sync(job) + } + + }() + w.statisticMu.Lock() w.statistic.ActiveThreads-- w.statisticMu.Unlock() -- GitLab