diff --git a/.idea/.gitignore b/.idea/.gitignore index 13566b81b018ad684f3a35fee301741b2734c8f4..a9d7db9c0a81b2db47ca92e4e180b30090b27632 100644 --- a/.idea/.gitignore +++ b/.idea/.gitignore @@ -6,3 +6,5 @@ # Datasource local storage ignored files /dataSources/ /dataSources.local.xml +# GitHub Copilot persisted chat sessions +/copilot/chatSessions diff --git a/README.md b/README.md index 6a60c3775d459fd79ea47fa81db862130aa8f734..6751e0163db375384ff8dd9d44ad0850a9a39ce5 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ The library also provides a `Cron` instance for scheduling jobs. ### Prerequisites -- Go 1.20+ +- Go 1.22+ ### Installation @@ -51,8 +51,6 @@ func main() { ``` -### Job - diff --git a/database-1_test.go b/database-1_test.go new file mode 100644 index 0000000000000000000000000000000000000000..f21d9938661937f879467d6b94d360020bc51a10 --- /dev/null +++ b/database-1_test.go @@ -0,0 +1,63 @@ +// Copyright 2023 schukai GmbH +// SPDX-License-Identifier: AGPL-3.0 + +//go:build !runOnTask + +package jobqueue + +import ( + "github.com/stretchr/testify/assert" + "gorm.io/driver/sqlite" + "gorm.io/gorm" + "testing" + "time" +) + +func TestWriteToDB1(t *testing.T) { + + gormDB, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{}) + if err != nil { + t.Fatalf("a error occurred while opening the database: %v", err) + } + + manager := &Manager{database: gormDB} + saver := NewDBSaver().SetManager(manager) + + // Starte den DBSaver + p := StartDBSaver(saver) + + ready := make(chan struct{}) + + Then[bool, bool](p, func(value bool) (bool, error) { + close(ready) + return value, nil + }, func(e error) error { + close(ready) + Error("Error while starting db saver", "error", err) + return nil + }) + + <-ready + + runner := &CounterRunnable{} + job := NewJob[CounterResult]("job1", runner) + + scheduler := &TimeScheduler{ + Time: time.Now().Add(1 * time.Second), + jobs: nil, + } + + job.scheduler = scheduler + + err = saver.SaveJob(job) + assert.Nil(t, err) + + err = saver.SaveJob(job) + assert.Nil(t, err) + + time.Sleep(1 * time.Second) + + err = saver.Stop() + assert.Nil(t, err) + +} diff --git a/database-2_test.go b/database-2_test.go new file mode 100644 index 0000000000000000000000000000000000000000..45d84d219d274ab8ac5b00ff3933026216f2e7c7 --- /dev/null +++ b/database-2_test.go @@ -0,0 +1,145 @@ +//go:build !runOnTask + +// Copyright 2023 schukai GmbH +// SPDX-License-Identifier: AGPL-3.0 +package jobqueue + +import ( + "github.com/stretchr/testify/assert" + "gorm.io/driver/sqlite" + "gorm.io/gorm" + "runtime" + "sync" + "testing" + "time" +) + +func TestWriteToDB2(t *testing.T) { + gormDB, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{}) + if err != nil { + t.Fatalf("a error occurred while opening the database: %v", err) + } + + manager := &Manager{database: gormDB} + saver := NewDBSaver().SetManager(manager) + + // Starte den DBSaver + p := StartDBSaver(saver) + + ready := make(chan struct{}) + + Then[bool, bool](p, func(value bool) (bool, error) { + close(ready) + return value, nil + }, func(e error) error { + close(ready) + Error("Error while starting db saver", "error", err) + return nil + }) + + db := gormDB + + <-ready + + var wg sync.WaitGroup + + wg.Add(1) + defer wg.Done() + + mgr := NewManager() + mgr.SetDB(gormDB) + + // run sub tests + wg.Add(1) + defer wg.Done() + + worker := NewLocalWorker(1) + err = mgr.AddWorker(worker) + assert.Nil(t, err) + + err = mgr.Start() + assert.Nil(t, err) + + runner := &CounterRunnable{} + job := NewJob[CounterResult]("job1", runner) + + scheduler := &InstantScheduler{} + err = mgr.ScheduleJob(job, scheduler) + assert.Nil(t, err) + + err = mgr.CancelJobSchedule("job1") + assert.Nil(t, err) + + time.Sleep(1 * time.Second) + + scheduler2 := &InstantScheduler{} + err = mgr.ScheduleJob(job, scheduler2) + assert.Nil(t, err) + + err = mgr.CancelJobSchedule("job1") + assert.Nil(t, err) + + time.Sleep(1 * time.Second) + + scheduler3 := &InstantScheduler{} + err = mgr.ScheduleJob(job, scheduler3) + assert.Nil(t, err) + + time.Sleep(1 * time.Second) + + if mgr.dbSaver == nil { + t.Error("mgr.dbSaver == nil") + return + } + + time.Sleep(1 * time.Second) + + err = mgr.dbSaver.SaveJob(job) + assert.Nil(t, err) + + runtime.Gosched() + time.Sleep(1 * time.Second) + err = mgr.CancelJobSchedule("job1") + assert.Nil(t, err) + + runtime.Gosched() + time.Sleep(1 * time.Second) + + scheduler4 := &InstantScheduler{} + err = mgr.ScheduleJob(job, scheduler4) + assert.Nil(t, err) + + runtime.Gosched() + time.Sleep(1 * time.Second) + + err = mgr.dbSaver.SaveJob(job) + assert.Nil(t, err) + + time.Sleep(2 * time.Second) + err = mgr.CancelJobSchedule("job1") + assert.Nil(t, err) + + tries := 10 + for tries > 0 { + + var tmpJob JobPersistence + + if err := db.First(&tmpJob, "id = ?", "job1").Error; err == nil { + break + } + + tries-- + time.Sleep(1 * time.Second) + + } + + assert.True(t, tries > 0) + + err = LoadJobsAndScheduleFromDatabase(db, mgr) + assert.Nil(t, err) + + time.Sleep(1 * time.Second) + + err = mgr.dbSaver.SaveJob(job) + +} diff --git a/database-3_test.go b/database-3_test.go new file mode 100644 index 0000000000000000000000000000000000000000..b46304d8794e7ecb890c6e8dc461dedf5b81b677 --- /dev/null +++ b/database-3_test.go @@ -0,0 +1,46 @@ +//go:build !runOnTask + +// Copyright 2023 schukai GmbH +// SPDX-License-Identifier: AGPL-3.0 +package jobqueue + +import ( + "github.com/stretchr/testify/assert" + "gorm.io/driver/sqlite" + "gorm.io/gorm" + "testing" + "time" +) + +func TestWriteToDB3(t *testing.T) { + + db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{}) + if err != nil { + t.Fatalf("a error occurred while opening the database: %v", err) + } + + manager := NewManager() + manager.SetDB(db) + err = manager.AddWorker(NewLocalWorker(1)) + assert.Nil(t, err) + + err = manager.Start() + assert.Nil(t, err) + runner := &CounterRunnable{} + job := NewJob[CounterResult]("job2", runner) + + scheduler := &InstantScheduler{} + err = manager.ScheduleJob(job, scheduler) + assert.Nil(t, err) + + time.Sleep(1 * time.Second) + + err = manager.DeleteJob(job.GetID()) + assert.Nil(t, err) + + // test is job in database + var tmpJob JobPersistence + err = db.First(&tmpJob, "id = ?", "job2").Error + assert.NotNil(t, err) + +} diff --git a/database-4_test.go b/database-4_test.go new file mode 100644 index 0000000000000000000000000000000000000000..18970b5ecc9b8269c7eecf22ab486354d9f66d96 --- /dev/null +++ b/database-4_test.go @@ -0,0 +1,77 @@ +// Copyright 2023 schukai GmbH +// SPDX-License-Identifier: AGPL-3.0 +package jobqueue + +import ( + "github.com/stretchr/testify/assert" + "gorm.io/driver/sqlite" + "gorm.io/gorm" + "testing" + "time" +) + +func TestWriteToDB4(t *testing.T) { + + db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{}) + if err != nil { + t.Fatalf("a error occurred while opening the database: %v", err) + } + + manager := NewManager() + manager.SetDB(db) + err = manager.AddWorker(NewLocalWorker(1)) + assert.Nil(t, err) + + err = manager.Start() + assert.Nil(t, err) + + runner := &CounterRunnable{} + job := NewJob[CounterResult]("job3", runner) + + job.stats = JobStats{ + JobID: job.GetID(), + RunCount: 20, + SuccessCount: 30, + ErrorCount: 40, + } + + scheduler := &InstantScheduler{} + err = manager.ScheduleJob(job, scheduler) + assert.Nil(t, err) + + time.Sleep(200 * time.Millisecond) + + // check is stats are the values above + var tmpJob JobPersistence + + err = db.Preload("Stats").First(&tmpJob, "id = ?", "job3").Error + assert.Nil(t, err) + + // Validate the fields + assert.Equal(t, JobID("job3"), tmpJob.ID) + assert.Equal(t, 21, tmpJob.Stats.RunCount) // +1 because of the first run + assert.Equal(t, 31, tmpJob.Stats.SuccessCount) // +1 because of the first run + assert.Equal(t, 40, tmpJob.Stats.ErrorCount) + + // reset stats + err = manager.ResetJobStats(job.GetID()) + assert.Nil(t, err) + + time.Sleep(2 * time.Second) + + var tmpJob2 JobPersistence + // check is stats are the values above + err = db.First(&tmpJob2, "id = ?", "job3").Error + err = db.Preload("Stats").First(&tmpJob2, "id = ?", "job3").Error + assert.Nil(t, err) + + // Validate the fields + assert.Equal(t, JobID("job3"), tmpJob2.ID) + assert.Equal(t, 0, tmpJob2.Stats.RunCount) + assert.Equal(t, 0, tmpJob2.Stats.SuccessCount) + assert.Equal(t, 0, tmpJob2.Stats.ErrorCount) + + err = manager.DeleteJob(job.GetID()) + assert.Nil(t, err) + +} diff --git a/database-5_test.go b/database-5_test.go new file mode 100644 index 0000000000000000000000000000000000000000..33bce67b1bf306ee665dbcb07b0b5780988d9e0c --- /dev/null +++ b/database-5_test.go @@ -0,0 +1,49 @@ +// Copyright 2023 schukai GmbH +// SPDX-License-Identifier: AGPL-3.0 +package jobqueue + +import ( + "github.com/stretchr/testify/assert" + "gorm.io/driver/sqlite" + "gorm.io/gorm" + "testing" +) + +func TestWriteToDB5(t *testing.T) { + + db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{}) + if err != nil { + t.Fatalf("a error occurred while opening the database: %v", err) + } + + manager := NewManager() + manager.SetDB(db) + err = manager.AddWorker(NewLocalWorker(1)) + assert.Nil(t, err) + + err = manager.Start() + assert.Nil(t, err) + + mgr := manager + + runner := &CounterRunnable{} + job := NewJob[CounterResult]("jobSameID", runner) + + scheduler := &InstantScheduler{} + err = mgr.ScheduleJob(job, scheduler) + assert.Nil(t, err) + + sameIDJob := NewJob[CounterResult]("jobSameID", runner) + + // Trying to save a job with the same ID should return an error + err = mgr.dbSaver.SaveJob(sameIDJob) + assert.Nil(t, err) + + err = mgr.CancelJobSchedule("jobSameID") + assert.Nil(t, err) + + err = mgr.dbSaver.Stop() + + assert.Nil(t, err) + +} diff --git a/database-6_test.go b/database-6_test.go new file mode 100644 index 0000000000000000000000000000000000000000..2dd30284a1e174fad63559ffce1f477b93a218e8 --- /dev/null +++ b/database-6_test.go @@ -0,0 +1,67 @@ +//go:build !runOnTask + +// Copyright 2023 schukai GmbH +// SPDX-License-Identifier: AGPL-3.0 +package jobqueue + +import ( + "fmt" + "github.com/stretchr/testify/assert" + "gorm.io/driver/sqlite" + "gorm.io/gorm" + "testing" + "time" +) + +func TestWriteToDB6(t *testing.T) { + + db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{}) + if err != nil { + t.Fatalf("a error occurred while opening the database: %v", err) + } + + manager := NewManager() + manager.SetDB(db) + err = manager.AddWorker(NewLocalWorker(1)) + assert.Nil(t, err) + + err = manager.Start() + assert.Nil(t, err) + + mgr := manager + + numJobs := 1000 + jobIDs := make([]JobID, numJobs) + + for i := 0; i < numJobs; i++ { + jobID := JobID(fmt.Sprintf("burstJob%d", i)) + jobIDs[i] = jobID + + runner := &CounterRunnable{} + job := NewJob[CounterResult](jobID, runner) + + scheduler := &InstantScheduler{} + err = mgr.ScheduleJob(job, scheduler) + assert.Nil(t, err) + + err = mgr.dbSaver.SaveJob(job) + assert.Nil(t, err) + } + + time.Sleep(10 * time.Second) + + for _, jobID := range jobIDs { + var tmpJob JobPersistence + err = db.First(&tmpJob, "id = ?", jobID).Error + if err != nil { + t.Errorf("Job id %s not found in database: %v", jobID, err) + } + } + + // check if all jobs are in the database + var jobCount int64 + err = db.Model(&JobPersistence{}).Count(&jobCount).Error + assert.Nil(t, err) + assert.Equal(t, int64(numJobs), jobCount) + +} diff --git a/database-logging.go b/database-logging.go new file mode 100644 index 0000000000000000000000000000000000000000..aa2c26de7fab98ecccf268c7a9bfc2577760507f --- /dev/null +++ b/database-logging.go @@ -0,0 +1,52 @@ +package jobqueue + +import ( + "context" + "gorm.io/gorm/logger" + "time" +) + +type GormAdapter struct{} + +func (GormAdapter) LogMode(logger.LogLevel) logger.Interface { + return GormAdapter{} // Noop. Not needed in this case. +} + +func (GormAdapter) Info(ctx context.Context, message string, data ...interface{}) { + Info(message, data...) +} + +func (GormAdapter) Warn(ctx context.Context, message string, data ...interface{}) { + Warn(message, data...) +} + +func (GormAdapter) Error(ctx context.Context, message string, data ...interface{}) { + Error(message, data...) +} + +func (GormAdapter) Trace(ctx context.Context, begin time.Time, fc func() (string, int64), err error) { + elapsed := time.Since(begin) + sql, rowsAffected := fc() + + if err != nil { + Error("gorm trace", + "elapsed", elapsed, + "sql", sql, + "rows", rowsAffected, + "err", err, + ) + return + } + + Trace("gorm trace", + "elapsed", elapsed, + "sql", sql, + "rows", rowsAffected, + ) +} + +var _ logger.Interface = (*GormAdapter)(nil) + +func newGormAdapter() *GormAdapter { + return &GormAdapter{} +} diff --git a/database-logging_test.go b/database-logging_test.go new file mode 100644 index 0000000000000000000000000000000000000000..f625c1fc6af4f5672e403adb36aa13388250f39e --- /dev/null +++ b/database-logging_test.go @@ -0,0 +1,13 @@ +package jobqueue + +import ( + "testing" +) + +func TestDBLogger(t *testing.T) { + mockLogger := newGormAdapter() + if mockLogger == nil { + t.Error("mockLogger == nil") + return + } +} diff --git a/database.go b/database.go index 17a247af08429852138beba586e18f053087ebc4..58781fb6c5d9f6abcf679ed03d63e3ff2e70d26a 100644 --- a/database.go +++ b/database.go @@ -4,9 +4,12 @@ package jobqueue import ( + "context" "errors" "gorm.io/gorm" + "math/rand" "sync" + "time" ) type DBSaverStatus int @@ -17,12 +20,13 @@ const ( ) type DBSaver struct { - saveChannel chan GenericJob - stopChan chan struct{} - migrateFlag bool - manager *Manager - status DBSaverStatus - mu sync.Mutex + saveChannel chan GenericJob + stopChan chan struct{} + migrateFlag bool + manager *Manager + status DBSaverStatus + mu sync.Mutex + jobSaveProgress sync.WaitGroup } type RunnerData string @@ -31,7 +35,7 @@ type SchedulerData string // NewDBSaver creates a new DBSaver func NewDBSaver() *DBSaver { return &DBSaver{ - saveChannel: make(chan GenericJob, 100), + saveChannel: make(chan GenericJob, 1000), stopChan: make(chan struct{}), } } @@ -65,191 +69,250 @@ func (s *DBSaver) isStatus(status DBSaverStatus, lock bool) bool { } -// Start starts the DBSaver -func (s *DBSaver) Start() error { +func StartDBSaver[P *Promise[bool]](s *DBSaver) *Promise[bool] { s.mu.Lock() defer s.mu.Unlock() - if s.manager == nil || s.manager.database == nil { - return ErrNoDatabaseConnection - } + return NewPromise[bool](func(resolve func(bool), reject func(error)) { - if s.isStatus(DBSaverStatusRunning, false) { - return nil - } + if s.manager == nil || s.manager.database == nil { + reject(ErrNoDatabaseConnection) + return + } - db := s.manager.database + if s.isStatus(DBSaverStatusRunning, false) { + resolve(true) + return + } - if !s.migrateFlag { - err := db.AutoMigrate(&JobPersistence{}, &JobLog{}, &JobStats{}) - if err != nil { - return err + db := s.manager.database + if !s.migrateFlag { + err := db.AutoMigrate(&JobPersistence{}, &JobLog{}, &JobStats{}) + if err != nil { + reject(err) + return + } + s.migrateFlag = true + } + + ready := make(chan struct{}) + go runSaver(s, db, ready) + + <-ready + resolve(true) + }) +} + +// +//// Start starts the DBSaver +//func (s *DBSaver) Start() error { +// s.mu.Lock() +// defer s.mu.Unlock() +// +// if s.manager == nil || s.manager.database == nil { +// return ErrNoDatabaseConnection +// } +// +// if s.isStatus(DBSaverStatusRunning, false) { +// return nil +// } +// +// db := s.manager.database +// +// if !s.migrateFlag { +// err := db.AutoMigrate(&JobPersistence{}, &JobLog{}, &JobStats{}) +// if err != nil { +// return err +// } +// s.migrateFlag = true +// } +// +// go runSaver(s, db) +// return nil +//} + +func runSaver(s *DBSaver, db *gorm.DB, ready chan struct{}) { + s.setStatus(DBSaverStatusRunning) + + defer func() { + // this runs after the function returns + // and needs to be protected by the lock + // of the setStatus method + //s.status = DBSaverStatusStopped + s.setStatus(DBSaverStatusStopped) + }() + + close(ready) + for { + + select { + case job := <-s.saveChannel: + s.jobSaveProgress.Add(1) + + err := CreateOrUpdateJob(job, db) + + if err != nil { + Error("Error while saving job", "error", err) + } + + s.jobSaveProgress.Done() + + case <-s.stopChan: + return } - s.migrateFlag = true } +} - var wg sync.WaitGroup - wg.Add(1) +func CreateOrUpdateJob(job GenericJob, db *gorm.DB) error { - go func() { - wg.Done() - - // this is protected by the lock above - s.status = DBSaverStatusRunning - - defer func() { - // this runs after the function returns - // and needs to be protected by the lock - // of the setStatus method - s.status = DBSaverStatusStopped - //s.setStatus(DBSaverStatusStopped) - }() - - for { - select { - case job := <-s.saveChannel: - - err := db.Transaction(func(tx *gorm.DB) error { - - permJob := job.GetPersistence() - - memLogs := permJob.Logs - permJob.Logs = nil - - var existingJob JobPersistence - result := tx.Unscoped().Where("id = ?", permJob.GetID()).First(&existingJob) - - if result.Error != nil { - if errors.Is(result.Error, gorm.ErrRecordNotFound) { - err := tx.Create(&permJob).Error - if err != nil { - return err - } - } else { - return result.Error - } - } else { - - db.Unscoped().Model(&existingJob).Update("deleted_at", nil) - - tx.Model(&existingJob.Scheduler).Select( - []string{ - "type", - "interval", - "spec", - "delay", - "event", - "time", - "executed", - }).UpdateColumns(SchedulerPersistence{ - Type: "", - Interval: 0, - Spec: "", - Delay: 0, - Event: "", - Time: nil, - Executed: false, - }) - - err := tx.Model(&existingJob).Updates(permJob).Error - if err != nil { - return err - } - } - - tx.Model(&permJob.Stats). - Select( - []string{ - "run_count", - "success_count", - "error_count", - "time_metrics_avg_run_time", - "time_metrics_max_run_time", - "time_metrics_min_run_time", - "time_metrics_total_run_time", - }, - ). - UpdateColumns(permJob.Stats) - - for i, _ := range memLogs { - memLogs[i].LogID = 0 - _ = tx.Create(&memLogs[i]) - // no error handling, if it fails, it fails - } - - return nil // Commit the transaction - }) + return db.Transaction(func(tx *gorm.DB) error { + permJob := job.GetPersistence() + + memLogs := permJob.Logs + permJob.Logs = nil + + var existingJob JobPersistence + result := tx.Unscoped().Where("id = ?", permJob.GetID()).First(&existingJob) + + if result.Error != nil { + if errors.Is(result.Error, gorm.ErrRecordNotFound) { + err := tx.Create(&permJob).Error if err != nil { - s.logError("Error while saving job", "error", err) + Trace("Error while creating job", "error", err) + return err } - - case <-s.stopChan: - return + } else { + Trace("Error while creating job", "error", result.Error) + return result.Error + } + } else { + + db.Unscoped().Model(&existingJob).Update("deleted_at", nil) + + tx.Model(&existingJob.Scheduler).Select( + []string{ + "type", + "interval", + "spec", + "delay", + "event", + "time", + "executed", + }).UpdateColumns(SchedulerPersistence{ + Type: "", + Interval: 0, + Spec: "", + Delay: 0, + Event: "", + Time: nil, + Executed: false, + }) + + err := tx.Model(&existingJob).Updates(permJob).Error + if err != nil { + return err } } - }() - wg.Wait() + tx.Model(&permJob.Stats). + Select( + []string{ + "run_count", + "success_count", + "error_count", + "time_metrics_avg_run_time", + "time_metrics_max_run_time", + "time_metrics_min_run_time", + "time_metrics_total_run_time", + }, + ). + UpdateColumns(permJob.Stats) + + if tx.Error != nil { + Error("Error while updating job stats", "error", tx.Error) + return result.Error + } - return nil -} + for i, _ := range memLogs { + memLogs[i].LogID = 0 + _ = tx.Create(&memLogs[i]) + // no error handling, if it fails, it fails + } -// logError logs an error -func (s *DBSaver) logError(msg string, keysAndValues ...interface{}) { - s.mu.Lock() - defer s.mu.Unlock() + return nil - if s.manager == nil || s.manager.logger == nil { - return - } + }) - s.manager.logger.Error(msg, keysAndValues...) } // Stop stops the DBSaver -func (s *DBSaver) Stop() *DBSaver { +func (s *DBSaver) Stop() error { s.mu.Lock() defer s.mu.Unlock() - select { - case s.stopChan <- struct{}{}: - default: - s.logError("DBSaver stop channel is full") + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + go func() { + s.stopChan <- struct{}{} + s.jobSaveProgress.Wait() + cancel() + }() + + <-ctx.Done() + + if errors.Is(ctx.Err(), context.DeadlineExceeded) { + Error("DBSaver did not stop in time") + return ctx.Err() } - return s + s.status = DBSaverStatusStopped + return nil +} + +func exponentialBackoff(retry int) time.Duration { + waitTime := 100 * time.Millisecond + for i := 0; i < retry; i++ { + waitTime *= 2 + waitTime += time.Duration(rand.Int63n(int64(waitTime))) // #nosec G404 + } + return waitTime } // SaveJob saves a job to the database func (s *DBSaver) SaveJob(job GenericJob) error { s.mu.Lock() - defer s.mu.Unlock() + defer func() { + if r := recover(); r != nil { + Error("Error while saving job", "error", r) + } + s.mu.Unlock() + }() if s.saveChannel == nil { return ErrDBSaverNotInitialized } - if s.status != DBSaverStatusRunning { return ErrDBSaverNotRunning } - defer func() { - if r := recover(); r != nil { - s.logError("Error while saving job", "error", r) + maxRetries := 5 + + for retries := maxRetries; retries > 0; retries-- { + select { + case s.saveChannel <- job: + return nil + default: + Error("DBSaver channel is full, dropping safe for job with ID", "job_id", job.GetID()) + backoff := exponentialBackoff(maxRetries - retries) + Trace("DBSaver channel is full, retrying in", "backoff", backoff) + time.Sleep(backoff) } - }() - - select { - case s.saveChannel <- job: - default: - // if the channel is full, we just drop the job - // this is not ideal, but better than blocking - // the job queue - s.logError("DBSaver channel is full, dropping job with ID", "job_id", job.GetID()) } - return nil + return errors.New("failed to save job after multiple attempts") + } func checkRunningSaver(s *DBSaver) (*gorm.DB, error) { @@ -280,7 +343,9 @@ func (s *DBSaver) DeleteJob(job GenericJob) error { return err } + s.jobSaveProgress.Add(1) return db.Transaction(func(tx *gorm.DB) error { + defer s.jobSaveProgress.Done() permJob := job.GetPersistence() dbErr := tx.Where("job_id = ?", permJob.GetID()).Delete(&JobLog{}).Error @@ -313,7 +378,9 @@ func (s *DBSaver) ResetLogs(job GenericJob) error { return err } + s.jobSaveProgress.Add(1) return db.Transaction(func(tx *gorm.DB) error { + defer s.jobSaveProgress.Done() permJob := job.GetPersistence() // unscoped because we want to delete the logs finally @@ -340,7 +407,7 @@ func (s *DBSaver) ResetStats(job GenericJob) error { defer func() { if r := recover(); r != nil { - s.logError("Error while saving job", "error", r) + Error("Error while saving job", "error", r) } }() @@ -349,7 +416,7 @@ func (s *DBSaver) ResetStats(job GenericJob) error { select { case s.saveChannel <- job: default: - s.logError("DBSaver channel is full, dropping job with ID", "job_id", job.GetID()) + Error("DBSaver channel is full, dropping job with ID", "job_id", job.GetID()) } return nil diff --git a/database_test.go b/database_test.go index 1c969a9989df15130c94e74bc9257c07e7086683..b9f3b73e2d0262a1b79dc3d696ca5cd60ad88d33 100644 --- a/database_test.go +++ b/database_test.go @@ -1,472 +1,148 @@ -// Copyright 2023 schukai GmbH -// SPDX-License-Identifier: AGPL-3.0 - //go:build !runOnTask +// Copyright 2023 schukai GmbH +// SPDX-License-Identifier: AGPL-3.0 package jobqueue import ( - "context" - "fmt" - "github.com/docker/docker/api/types" - "github.com/docker/docker/api/types/container" - "github.com/docker/docker/client" - "github.com/docker/go-connections/nat" - "github.com/stretchr/testify/assert" - "gorm.io/driver/mysql" "gorm.io/gorm" - "gorm.io/gorm/logger" - "log" - "net" - "os" - "runtime" - "strconv" - "sync" "testing" "time" -) - -func startTestMySQLDockerImageAndContainer(t *testing.T, port string, ctx context.Context) error { - t.Helper() - - cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) - if err != nil { - return err - } - - imageName := "mysql:8" - - reader, err := cli.ImagePull(ctx, imageName, types.ImagePullOptions{}) - if err != nil { - return err - } - - // if debug image pull, comment out the following lines - //_, _ = io.Copy(os.Stdout, reader) - _ = reader - hostConfig := &container.HostConfig{ - PortBindings: nat.PortMap{ - "3306/tcp": []nat.PortBinding{ - { - HostIP: DOCKER_TEST_HOST_IP, - HostPort: port, - }, - }, - }, - } - - resp, err := cli.ContainerCreate(ctx, &container.Config{ - Image: imageName, - Env: []string{ - "MYSQL_ROOT_PASSWORD=secret", - "MYSQL_USER=user", - "MYSQL_PASSWORD=secret", - "MYSQL_DATABASE=test", - }, - }, hostConfig, nil, nil, "") + "github.com/stretchr/testify/assert" + "gorm.io/driver/sqlite" +) +// +//func startTestMySQLDockerImageAndContainer(t *testing.T, port string, ctx context.Context) error { +// t.Helper() +// +// cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) +// if err != nil { +// return err +// } +// +// imageName := "mysql:8" +// +// reader, err := cli.ImagePull(ctx, imageName, types.ImagePullOptions{}) +// if err != nil { +// return err +// } +// +// // if debug image pull, comment out the following lines +// //_, _ = io.Copy(os.Stdout, reader) +// _ = reader +// +// hostConfig := &container.HostConfig{ +// PortBindings: nat.PortMap{ +// "3306/tcp": []nat.PortBinding{ +// { +// HostIP: DOCKER_TEST_HOST_IP, +// HostPort: port, +// }, +// }, +// }, +// } +// +// resp, err := cli.ContainerCreate(ctx, &container.Config{ +// Image: imageName, +// Env: []string{ +// "MYSQL_ROOT_PASSWORD=secret", +// "MYSQL_USER=user", +// "MYSQL_PASSWORD=secret", +// "MYSQL_DATABASE=test", +// }, +// }, hostConfig, nil, nil, "") +// +// if err != nil { +// return err +// } +// +// if err := cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil { +// return err +// } +// +// go func() { +// <-ctx.Done() +// +// timeout := 0 +// stopOptions := container.StopOptions{ +// Timeout: &timeout, +// Signal: "SIGKILL", +// } +// newCtx, _ := context.WithTimeout(context.Background(), 60*time.Second) +// if err := cli.ContainerStop(newCtx, resp.ID, stopOptions); err != nil { +// t.Errorf("ContainerStop returned error: %v", err) +// } +// if err := cli.ContainerRemove(newCtx, resp.ID, types.ContainerRemoveOptions{ +// Force: true, +// }); err != nil { +// t.Errorf("ContainerRemove returned error: %v", err) +// } +// +// }() +// +// statusCh, errCh := cli.ContainerWait(ctx, resp.ID, container.WaitConditionNotRunning) +// select { +// case err := <-errCh: +// if err != nil { +// // empty error means container exited normally (see container_wait.go) +// if err.Error() == "" { +// return nil +// } +// +// return err +// } +// case <-statusCh: +// +// } +// +// return nil +//} + +func TestSaveJobWithSQLite(t *testing.T) { + + gormDB, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{}) if err != nil { - return err - } - - if err := cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil { - return err - } - - go func() { - <-ctx.Done() - - timeout := 0 - stopOptions := container.StopOptions{ - Timeout: &timeout, - Signal: "SIGKILL", - } - newCtx, _ := context.WithTimeout(context.Background(), 60*time.Second) - if err := cli.ContainerStop(newCtx, resp.ID, stopOptions); err != nil { - t.Errorf("ContainerStop returned error: %v", err) - } - if err := cli.ContainerRemove(newCtx, resp.ID, types.ContainerRemoveOptions{ - Force: true, - }); err != nil { - t.Errorf("ContainerRemove returned error: %v", err) - } - - }() - - statusCh, errCh := cli.ContainerWait(ctx, resp.ID, container.WaitConditionNotRunning) - select { - case err := <-errCh: - if err != nil { - // empty error means container exited normally (see container_wait.go) - if err.Error() == "" { - return nil - } - - return err - } - case <-statusCh: - + t.Fatalf("a error occurred while opening the database: %v", err) } - return nil -} - -func TestWriteToDB(t *testing.T) { + manager := &Manager{database: gormDB} + saver := NewDBSaver().SetManager(manager) - // if true, logging and port 3306 is used - useMySQLPort := os.Getenv("MYSQL_PORT") - //useMySQLPort = "3306" - printLogging := os.Getenv("MYSQL_LOGGING") - printLogging = "true" + // Starte den DBSaver + p := StartDBSaver(saver) - var err error + ready := make(chan struct{}) - ctb := context.Background() - ctx, cancel := context.WithCancel(ctb) - t.Cleanup(func() { - cancel() - time.Sleep(1 * time.Second) + Then[bool, bool](p, func(value bool) (bool, error) { + close(ready) + return value, nil + }, func(e error) error { + close(ready) + Error("Error while starting db saver", "error", err) + return nil }) - listener, err := net.Listen("tcp", DOCKER_TEST_HOST_IP+":0") - if err != nil { - t.Errorf("Unexpected error: %v", err) - return - } - portAsInt := listener.Addr().(*net.TCPAddr).Port - portAsString := fmt.Sprintf("%d", portAsInt) - _ = listener.Close() - - if useMySQLPort != "" { - portAsString = useMySQLPort - i, _ := strconv.Atoi(portAsString) - - portAsInt = i - } - - done := make(chan bool) - go func() { - err = startTestMySQLDockerImageAndContainer(t, portAsString, ctx) - if err != nil { - t.Errorf("Unexpected error: %v", err) - cancel() - } - done <- true - }() - - waitCtx, waitCancel := context.WithTimeout(ctx, 60*time.Second) - defer waitCancel() - for { - conn, err := net.DialTimeout("tcp", net.JoinHostPort(DOCKER_TEST_HOST_IP, portAsString), 1*time.Second) - if err == nil { - err = conn.Close() - assert.Nil(t, err) - break - } - select { - case <-waitCtx.Done(): - t.Error("Timeout waiting for container service") - cancel() - return - default: - time.Sleep(1 * time.Second) - } - } - - dsn := "user:secret@tcp(" + net.JoinHostPort(DOCKER_TEST_HOST_IP, portAsString) + ")/test?charset=utf8mb4&parseTime=True&loc=Local" + <-ready - counter := 0 - var db *gorm.DB + jobID := JobID("testJob") + job := NewJob[CounterResult](jobID, &CounterRunnable{}) - time.Sleep(20 * time.Second) + err = saver.SaveJob(job) + assert.NoError(t, err) - var dbLogger logger.Interface + time.Sleep(100 * time.Millisecond) - if printLogging == "true" { - dbLogger = logger.New( - log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer - logger.Config{ - SlowThreshold: time.Second, // Slow SQL threshold - LogLevel: logger.Info, // Log level - Colorful: false, // Disable color - }, - ) - } else { + saver.Stop() - dbLogger = logger.Default.LogMode(logger.Silent) - } - - for counter < 20 { - - db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{ - Logger: dbLogger, - }) - - if err == nil { - break - } + var count int64 + gormDB.Model(&JobPersistence{}).Count(&count) + assert.Equal(t, int64(1), count, "It should be 1 job in the database") - counter++ - time.Sleep(1 * time.Second) - } - - if err != nil { - t.Errorf("Unexpected error: %v", err) - return - } + // get job from database + var jobFromDB JobPersistence + gormDB.First(&jobFromDB, "id = ?", jobID) + assert.Equal(t, jobID, jobFromDB.ID, "JobID should be the same") - var wg sync.WaitGroup - - // run sub tests - - t.Run("TestUpdateToDB", func(t *testing.T) { - wg.Add(1) - defer wg.Done() - - mgr := NewManager() - mgr.SetDB(db) - //worker := NewLocalWorker(1) - //err := mgr.AddWorker(worker) - //assert.Nil(t, err) - // - //err = mgr.Start() - //assert.Nil(t, err) - - dbSaver := NewDBSaver() - dbSaver.SetManager(mgr) - - err = dbSaver.Start() - assert.Nil(t, err) - - runner := &CounterRunnable{} - job := NewJob[CounterResult]("job1", runner) - - scheduler := &TimeScheduler{ - Time: time.Now().Add(1 * time.Second), - jobs: nil, - } - - job.scheduler = scheduler - - err = dbSaver.SaveJob(job) - assert.Nil(t, err) - - //err = mgr.ScheduleJob(job, scheduler) - //assert.Nil(t, err) - - err := dbSaver.SaveJob(job) - assert.Nil(t, err) - - time.Sleep(1 * time.Second) - - dbSaver.Stop() - - }) - - // run sub tests - - t.Run("TestAddToDB", func(t *testing.T) { - wg.Add(1) - defer wg.Done() - - mgr := NewManager() - mgr.SetDB(db) - worker := NewLocalWorker(1) - err := mgr.AddWorker(worker) - assert.Nil(t, err) - - err = mgr.Start() - assert.Nil(t, err) - - runner := &CounterRunnable{} - job := NewJob[CounterResult]("job1", runner) - - scheduler := &InstantScheduler{} - err = mgr.ScheduleJob(job, scheduler) - assert.Nil(t, err) - - err = mgr.CancelJobSchedule("job1") - assert.Nil(t, err) - - time.Sleep(1 * time.Second) - - scheduler2 := &InstantScheduler{} - err = mgr.ScheduleJob(job, scheduler2) - assert.Nil(t, err) - - err = mgr.CancelJobSchedule("job1") - assert.Nil(t, err) - - time.Sleep(1 * time.Second) - - scheduler3 := &InstantScheduler{} - err = mgr.ScheduleJob(job, scheduler3) - assert.Nil(t, err) - - time.Sleep(1 * time.Second) - - if mgr.dbSaver == nil { - t.Error("mgr.dbSaver == nil") - return - } - - time.Sleep(1 * time.Second) - - err = mgr.dbSaver.SaveJob(job) - assert.Nil(t, err) - - runtime.Gosched() - time.Sleep(1 * time.Second) - err = mgr.CancelJobSchedule("job1") - assert.Nil(t, err) - - runtime.Gosched() - time.Sleep(1 * time.Second) - - scheduler4 := &InstantScheduler{} - err = mgr.ScheduleJob(job, scheduler4) - assert.Nil(t, err) - - runtime.Gosched() - time.Sleep(1 * time.Second) - - err = mgr.dbSaver.SaveJob(job) - assert.Nil(t, err) - - time.Sleep(2 * time.Second) - err = mgr.CancelJobSchedule("job1") - assert.Nil(t, err) - - tries := 10 - for tries > 0 { - - var tmpJob JobPersistence - - if err := db.First(&tmpJob, "id = ?", "job1").Error; err == nil { - break - } - - tries-- - time.Sleep(1 * time.Second) - - } - - assert.True(t, tries > 0) - - err = LoadJobsAndScheduleFromDatabase(db, mgr) - assert.Nil(t, err) - - time.Sleep(1 * time.Second) - - err = mgr.dbSaver.SaveJob(job) - assert.Nil(t, err) - time.Sleep(1 * time.Second) - - }) - - t.Run("TestDeleteJob", func(t *testing.T) { - wg.Add(1) - defer wg.Done() - - mgr := NewManager() - mgr.SetDB(db) - worker := NewLocalWorker(1) - err := mgr.AddWorker(worker) - assert.Nil(t, err) - - err = mgr.Start() - assert.Nil(t, err) - - runner := &CounterRunnable{} - job := NewJob[CounterResult]("job2", runner) - - scheduler := &InstantScheduler{} - err = mgr.ScheduleJob(job, scheduler) - assert.Nil(t, err) - - time.Sleep(1 * time.Second) - err = mgr.DeleteJob(job.GetID()) - assert.Nil(t, err) - - // test is job in database - var tmpJob JobPersistence - err = db.First(&tmpJob, "id = ?", "job2").Error - assert.NotNil(t, err) - - }) - - wg.Add(1) - t.Run("ResetStats", func(t *testing.T) { - defer wg.Done() - - mgr := NewManager() - mgr.SetDB(db) - worker := NewLocalWorker(1) - err := mgr.AddWorker(worker) - assert.Nil(t, err) - - err = mgr.Start() - assert.Nil(t, err) - - runner := &CounterRunnable{} - job := NewJob[CounterResult]("job3", runner) - - job.stats = JobStats{ - JobID: job.GetID(), - RunCount: 20, - SuccessCount: 30, - ErrorCount: 40, - } - - scheduler := &InstantScheduler{} - err = mgr.ScheduleJob(job, scheduler) - assert.Nil(t, err) - - time.Sleep(200 * time.Millisecond) - - // check is stats are the values above - var tmpJob JobPersistence - - err = db.Preload("Stats").First(&tmpJob, "id = ?", "job3").Error - assert.Nil(t, err) - - // Validate the fields - assert.Equal(t, JobID("job3"), tmpJob.ID) - assert.Equal(t, 21, tmpJob.Stats.RunCount) // +1 because of the first run - assert.Equal(t, 31, tmpJob.Stats.SuccessCount) // +1 because of the first run - assert.Equal(t, 40, tmpJob.Stats.ErrorCount) - - // reset stats - err = mgr.ResetJobStats(job.GetID()) - assert.Nil(t, err) - - time.Sleep(2 * time.Second) - - var tmpJob2 JobPersistence - // check is stats are the values above - err = db.First(&tmpJob2, "id = ?", "job3").Error - err = db.Preload("Stats").First(&tmpJob2, "id = ?", "job3").Error - assert.Nil(t, err) - - // Validate the fields - assert.Equal(t, JobID("job3"), tmpJob2.ID) - assert.Equal(t, 0, tmpJob2.Stats.RunCount) - assert.Equal(t, 0, tmpJob2.Stats.SuccessCount) - assert.Equal(t, 0, tmpJob2.Stats.ErrorCount) - - err = mgr.DeleteJob(job.GetID()) - assert.Nil(t, err) - - }) - - wg.Wait() - cancel() - - select { - case <-done: - time.Sleep(1 * time.Second) - case <-time.After(1 * time.Minute): - t.Error("test hangs, timeout reached") - } } diff --git a/devenv.lock b/devenv.lock index 5a1545ec871fb7f226dee9843b7f93afcbef87a0..760ce636be6b188e97f061413cd3de47443e99f7 100644 --- a/devenv.lock +++ b/devenv.lock @@ -3,11 +3,11 @@ "devenv": { "locked": { "dir": "src/modules", - "lastModified": 1702549996, - "narHash": "sha256-mEN+8gjWUXRxBCcixeth+jlDNuzxbpFwZNOEc4K22vw=", + "lastModified": 1710144971, + "narHash": "sha256-CjTOdoBvT/4AQncTL20SDHyJNgsXZjtGbz62yDIUYnM=", "owner": "cachix", "repo": "devenv", - "rev": "e681a99ffe2d2882f413a5d771129223c838ddce", + "rev": "6c0bad0045f1e1802f769f7890f6a59504825f4d", "type": "github" }, "original": { @@ -20,11 +20,11 @@ "flake-compat": { "flake": false, "locked": { - "lastModified": 1673956053, - "narHash": "sha256-4gtG9iQuiKITOjNQQeQIpoIB6b16fm+504Ch3sNKLd8=", + "lastModified": 1696426674, + "narHash": "sha256-kvjfFW7WAETZlt09AgDn1MrtKzP7t90Vf7vypd3OL1U=", "owner": "edolstra", "repo": "flake-compat", - "rev": "35bb57c0c8d8b62bbfd284272c928ceb64ddbde9", + "rev": "0f9255e01c2351cc7d116c072cb317785dd33b33", "type": "github" }, "original": { @@ -38,11 +38,11 @@ "systems": "systems" }, "locked": { - "lastModified": 1685518550, - "narHash": "sha256-o2d0KcvaXzTrPRIo0kOLV0/QXHhDQ5DTi+OxcjO8xqY=", + "lastModified": 1701680307, + "narHash": "sha256-kAuep2h5ajznlPMD9rnQyffWG8EM/C73lejGofXvdM8=", "owner": "numtide", "repo": "flake-utils", - "rev": "a1720a10a6cfe8234c0e93907ffe81be440f4cef", + "rev": "4022d587cbbfd70fe950c1e2083a02621806a725", "type": "github" }, "original": { @@ -59,11 +59,11 @@ ] }, "locked": { - "lastModified": 1660459072, - "narHash": "sha256-8DFJjXG8zqoONA1vXtgeKXy68KdJL5UaXR8NtVMUbx8=", + "lastModified": 1703887061, + "narHash": "sha256-gGPa9qWNc6eCXT/+Z5/zMkyYOuRZqeFZBDbopNZQkuY=", "owner": "hercules-ci", "repo": "gitignore.nix", - "rev": "a20de23b925fd8264fd7fad6454652e142fd7f73", + "rev": "43e1aa1308018f37118e34d3a9cb4f5e75dc11d5", "type": "github" }, "original": { @@ -74,11 +74,11 @@ }, "nixpkgs": { "locked": { - "lastModified": 1702346276, - "narHash": "sha256-eAQgwIWApFQ40ipeOjVSoK4TEHVd6nbSd9fApiHIw5A=", + "lastModified": 1710162809, + "narHash": "sha256-i2R2bcnQp+85de67yjgZVvJhd6rRnJbSYNpGmB6Leb8=", "owner": "nixos", "repo": "nixpkgs", - "rev": "cf28ee258fd5f9a52de6b9865cdb93a1f96d09b7", + "rev": "ddcd7598b2184008c97e6c9c6a21c5f37590b8d2", "type": "github" }, "original": { @@ -90,32 +90,32 @@ }, "nixpkgs-stable": { "locked": { - "lastModified": 1685801374, - "narHash": "sha256-otaSUoFEMM+LjBI1XL/xGB5ao6IwnZOXc47qhIgJe8U=", + "lastModified": 1704874635, + "narHash": "sha256-YWuCrtsty5vVZvu+7BchAxmcYzTMfolSPP5io8+WYCg=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "c37ca420157f4abc31e26f436c1145f8951ff373", + "rev": "3dc440faeee9e889fe2d1b4d25ad0f430d449356", "type": "github" }, "original": { "owner": "NixOS", - "ref": "nixos-23.05", + "ref": "nixos-23.11", "repo": "nixpkgs", "type": "github" } }, "nixpkgs_2": { "locked": { - "lastModified": 1702350026, - "narHash": "sha256-A+GNZFZdfl4JdDphYKBJ5Ef1HOiFsP18vQe9mqjmUis=", + "lastModified": 1710162809, + "narHash": "sha256-i2R2bcnQp+85de67yjgZVvJhd6rRnJbSYNpGmB6Leb8=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "9463103069725474698139ab10f17a9d125da859", + "rev": "ddcd7598b2184008c97e6c9c6a21c5f37590b8d2", "type": "github" }, "original": { "id": "nixpkgs", - "ref": "nixos-23.05", + "ref": "nixos-23.11", "type": "indirect" } }, @@ -130,11 +130,11 @@ "nixpkgs-stable": "nixpkgs-stable" }, "locked": { - "lastModified": 1702456155, - "narHash": "sha256-I2XhXGAecdGlqi6hPWYT83AQtMgL+aa3ulA85RAEgOk=", + "lastModified": 1708018599, + "narHash": "sha256-M+Ng6+SePmA8g06CmUZWi1AjG2tFBX9WCXElBHEKnyM=", "owner": "cachix", "repo": "pre-commit-hooks.nix", - "rev": "007a45d064c1c32d04e1b8a0de5ef00984c419bc", + "rev": "5df5a70ad7575f6601d91f0efec95dd9bc619431", "type": "github" }, "original": { @@ -171,11 +171,11 @@ "nixpkgs": "nixpkgs_2" }, "locked": { - "lastModified": 1700695799, - "narHash": "sha256-nXRhRE69kULaNxijX7ZF14pGSu6Ar/FIvfKCIut7OXc=", + "lastModified": 1704542622, + "narHash": "sha256-HnFuaOXHoxv8tpBvMsEjfhcl/hFNxEY7GbBqoyJ1U8U=", "ref": "refs/heads/master", - "rev": "fdcc60bfd3642207e50e8e6c89c0a9a7b27a40a9", - "revCount": 41, + "rev": "6b4f85fe6d934429cf3055bbcd8cf15014730118", + "revCount": 114, "type": "git", "url": "https://gitlab.schukai.com/oss/utilities/version.git" }, diff --git a/go.mod b/go.mod index ef70f5fd818283c6eb47a2068ccf5b5cea1a1317..07b3053dd0bb0ad4413d1e11591ecaf1ab4ef727 100644 --- a/go.mod +++ b/go.mod @@ -8,47 +8,50 @@ require ( github.com/docker/go-connections v0.4.0 github.com/fsnotify/fsnotify v1.7.0 github.com/go-chi/chi/v5 v5.0.10 - github.com/google/uuid v1.5.0 + github.com/google/uuid v1.6.0 github.com/pkg/sftp v1.13.6 github.com/robfig/cron/v3 v3.0.1 - github.com/shirou/gopsutil/v3 v3.23.11 + github.com/shirou/gopsutil/v3 v3.24.2 github.com/stretchr/testify v1.8.4 - go.uber.org/zap v1.26.0 - golang.org/x/crypto v0.17.0 + go.uber.org/zap v1.27.0 + golang.org/x/crypto v0.21.0 gopkg.in/yaml.v3 v3.0.1 - gorm.io/driver/mysql v1.5.2 - gorm.io/gorm v1.25.5 + gorm.io/driver/mysql v1.5.4 + gorm.io/gorm v1.25.7 ) require ( + filippo.io/edwards25519 v1.1.0 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/distribution/reference v0.5.0 // indirect github.com/docker/distribution v2.8.3+incompatible // indirect github.com/docker/go-units v0.5.0 // indirect github.com/go-ole/go-ole v1.3.0 // indirect - github.com/go-sql-driver/mysql v1.7.1 // indirect + github.com/go-sql-driver/mysql v1.8.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/kr/fs v0.1.0 // indirect - github.com/lufia/plan9stats v0.0.0-20231016141302-07b5767bb0ed // indirect + github.com/lufia/plan9stats v0.0.0-20240226150601-1dcf7310316a // indirect + github.com/mattn/go-sqlite3 v1.14.17 // indirect github.com/moby/term v0.5.0 // indirect github.com/morikuni/aec v1.0.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.2 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect + github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect github.com/shoenig/go-m1cpu v0.1.6 // indirect github.com/tklauser/go-sysconf v0.3.13 // indirect github.com/tklauser/numcpus v0.7.0 // indirect - github.com/yusufpapurcu/wmi v1.2.3 // indirect + github.com/yusufpapurcu/wmi v1.2.4 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/mod v0.8.0 // indirect - golang.org/x/net v0.10.0 // indirect - golang.org/x/sys v0.15.0 // indirect + golang.org/x/net v0.21.0 // indirect + golang.org/x/sys v0.18.0 // indirect golang.org/x/time v0.3.0 // indirect golang.org/x/tools v0.6.0 // indirect + gorm.io/driver/sqlite v1.5.5 // indirect gotest.tools/v3 v3.5.1 // indirect ) diff --git a/go.sum b/go.sum index 397ae441be46df5bc27a35af6d280d6d62c8dd9c..0e4b77cef74ebf07b51e167498a695aef09417c7 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60= github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= @@ -26,6 +28,8 @@ github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= +github.com/go-sql-driver/mysql v1.8.0 h1:UtktXaU2Nb64z/pLiGIxY4431SJ4/dR5cjMmlVHgnT4= +github.com/go-sql-driver/mysql v1.8.0/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -36,6 +40,8 @@ github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= @@ -47,6 +53,10 @@ github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/lufia/plan9stats v0.0.0-20231016141302-07b5767bb0ed h1:036IscGBfJsFIgJQzlui7nK1Ncm0tp2ktmPj8xO4N/0= github.com/lufia/plan9stats v0.0.0-20231016141302-07b5767bb0ed/go.mod h1:ilwx/Dta8jXAgpFYFvSWEMwxmbWXyiUHkd5FwyKhb5k= +github.com/lufia/plan9stats v0.0.0-20240226150601-1dcf7310316a h1:3Bm7EwfUQUvhNeKIkUct/gl9eod1TcXuj8stxvi/GoI= +github.com/lufia/plan9stats v0.0.0-20240226150601-1dcf7310316a/go.mod h1:ilwx/Dta8jXAgpFYFvSWEMwxmbWXyiUHkd5FwyKhb5k= +github.com/mattn/go-sqlite3 v1.14.17 h1:mCRHCLDUBXgpKAqIKsaAaAsrAlbkeomtRFKXh2L6YIM= +github.com/mattn/go-sqlite3 v1.14.17/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= @@ -64,6 +74,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b h1:0LFwY6Q3gMACTjAbMZBjXAqTOzOwFaj2Ld6cjeQ7Rig= github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= +github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 h1:o4JXh1EVt9k/+g42oCprj/FisM4qX9L3sZB3upGN2ZU= +github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/shirou/gopsutil/v3 v3.23.9 h1:ZI5bWVeu2ep4/DIxB4U9okeYJ7zp/QLTO4auRb/ty/E= @@ -72,6 +84,8 @@ github.com/shirou/gopsutil/v3 v3.23.10 h1:/N42opWlYzegYaVkWejXWJpbzKv2JDy3mrgGzK github.com/shirou/gopsutil/v3 v3.23.10/go.mod h1:JIE26kpucQi+innVlAUnIEOSBhBUkirr5b44yr55+WE= github.com/shirou/gopsutil/v3 v3.23.11 h1:i3jP9NjCPUz7FiZKxlMnODZkdSIp2gnzfrvsu9CuWEQ= github.com/shirou/gopsutil/v3 v3.23.11/go.mod h1:1FrWgea594Jp7qmjHUUPlJDTPgcsb9mGnXDxavtikzM= +github.com/shirou/gopsutil/v3 v3.24.2 h1:kcR0erMbLg5/3LcInpw0X/rrPSqq4CDPyI6A6ZRC18Y= +github.com/shirou/gopsutil/v3 v3.24.2/go.mod h1:tSg/594BcA+8UdQU2XcW803GWYgdtauFFPgJCJKZlVk= github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= @@ -96,11 +110,16 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw= github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= +github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -112,6 +131,8 @@ golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY= golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= +golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= @@ -126,6 +147,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= +golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -150,11 +173,15 @@ golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4= +golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= @@ -179,8 +206,15 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gorm.io/driver/mysql v1.5.2 h1:QC2HRskSE75wBuOxe0+iCkyJZ+RqpudsQtqkp+IMuXs= gorm.io/driver/mysql v1.5.2/go.mod h1:pQLhh1Ut/WUAySdTHwBpBv6+JKcj+ua4ZFx1QQTBzb8= +gorm.io/driver/mysql v1.5.4 h1:igQmHfKcbaTVyAIHNhhB888vvxh8EdQ2uSUT0LPcBso= +gorm.io/driver/mysql v1.5.4/go.mod h1:9rYxJph/u9SWkWc9yY4XJ1F/+xO0S/ChOmbk3+Z5Tvs= +gorm.io/driver/sqlite v1.5.5 h1:7MDMtUZhV065SilG62E0MquljeArQZNfJnjd9i9gx3E= +gorm.io/driver/sqlite v1.5.5/go.mod h1:6NgQ7sQWAIFsPrJJl1lSNSu2TABh0ZZ/zm5fosATavE= gorm.io/gorm v1.25.2-0.20230530020048-26663ab9bf55/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k= gorm.io/gorm v1.25.5 h1:zR9lOiiYf09VNh5Q1gphfyia1JpiClIWG9hQaxB/mls= gorm.io/gorm v1.25.5/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8= +gorm.io/gorm v1.25.7-0.20240204074919-46816ad31dde/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8= +gorm.io/gorm v1.25.7 h1:VsD6acwRjz2zFxGO50gPO6AkNs7KKnvfzUjHQhZDz/A= +gorm.io/gorm v1.25.7/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8= gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU= gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU= diff --git a/logger.go b/logger.go index 7a74477f904337ccefc65ad0a7469ca9f64510b3..241b6b1f1a0f86c498cb61b7b963e0bdaaa03e39 100644 --- a/logger.go +++ b/logger.go @@ -1,23 +1,146 @@ -// Copyright 2023 schukai GmbH -// SPDX-License-Identifier: AGPL-3.0 - package jobqueue -import "go.uber.org/zap" +import ( + "go.uber.org/zap" + "log" + "os" + "sync" +) +// Logger interface, that your library's users have to implement type Logger interface { Info(msg string, keysAndValues ...interface{}) + Warn(msg string, keysAndValues ...interface{}) Error(msg string, keysAndValues ...interface{}) + + Trace(msg string, keysAndValues ...interface{}) +} + +type DefaultLogger struct { + *log.Logger +} + +func (l *DefaultLogger) Info(msg string, keysAndValues ...interface{}) { + _ = l.Output(2, "INFO: "+msg) +} + +func (l *DefaultLogger) Error(msg string, keysAndValues ...interface{}) { + _ = l.Output(2, "ERROR: "+msg) +} + +func (l *DefaultLogger) Warn(msg string, keysAndValues ...interface{}) { + _ = l.Output(2, "WARN: "+msg) +} + +func (l *DefaultLogger) Trace(msg string, keysAndValues ...interface{}) { + _ = l.Output(2, "DEBUG: "+msg) +} + +// Ensure DefaultLogger satisfies the Logger interface. +var _ Logger = (*DefaultLogger)(nil) + +var ( + internalLogger Logger + mu sync.Mutex +) + +func init() { + internalLogger = &DefaultLogger{ + Logger: log.New(os.Stderr, "", log.LstdFlags), + } +} + +// SetLogger sets a custom logger +func SetLogger(l Logger) { + mu.Lock() + defer mu.Unlock() + internalLogger = l +} + +// Info logs an informational message to the current logger +func Info(msg string, keysAndValues ...interface{}) { + mu.Lock() + defer mu.Unlock() + if internalLogger == nil { + return + } + internalLogger.Info(msg, keysAndValues...) +} + +// Error logs an error message to the current logger +func Error(msg string, keysAndValues ...interface{}) { + mu.Lock() + defer mu.Unlock() + if internalLogger == nil { + return + } + internalLogger.Error(msg, keysAndValues...) +} + +// Warn logs a warning message to the current logger +func Warn(msg string, keysAndValues ...interface{}) { + mu.Lock() + defer mu.Unlock() + if internalLogger == nil { + return + } + internalLogger.Warn(msg, keysAndValues...) +} + +func Trace(msg string, keysAndValues ...interface{}) { + mu.Lock() + defer mu.Unlock() + if internalLogger == nil { + return + } + internalLogger.Trace(msg, keysAndValues...) } type ZapAdapter struct { logger *zap.Logger + mu sync.Mutex } -func (z *ZapAdapter) Info(msg string, keysAndValues ...interface{}) { - z.logger.Info(msg, zap.Any("info", keysAndValues)) +func NewZapAdapter(logger *zap.Logger) *ZapAdapter { + return &ZapAdapter{ + logger: logger, + } } -func (z *ZapAdapter) Error(msg string, keysAndValues ...interface{}) { - z.logger.Error(msg, zap.Any("error", keysAndValues)) +func (l *ZapAdapter) Info(msg string, keysAndValues ...interface{}) { + if l.logger == nil { + return + } + l.logger.Info(msg, zap.Any("Info", keysAndValues)) +} + +func (l *ZapAdapter) Warn(msg string, keysAndValues ...interface{}) { + if l.logger == nil { + return + } + l.logger.Warn(msg, zap.Any("Warn", keysAndValues)) +} + +func (l *ZapAdapter) Error(msg string, keysAndValues ...interface{}) { + if l.logger == nil { + return + } + l.logger.Error(msg, zap.Any("Error", keysAndValues)) +} + +func (l *ZapAdapter) Trace(msg string, keysAndValues ...interface{}) { + if l.logger == nil { + return + } + l.logger.Debug(msg, zap.Any("Debug", keysAndValues)) +} + +// Ensure ZapAdapter satisfies the Logger interface. +var _ Logger = (*ZapAdapter)(nil) + +// SetZapLogger sets a custom logger +func SetZapLogger(logger *zap.Logger) { + mu.Lock() + defer mu.Unlock() + internalLogger = NewZapAdapter(logger) } diff --git a/logger_test.go b/logger_test.go new file mode 100644 index 0000000000000000000000000000000000000000..901ba8cc8d7c74a280c43365da45104848ddfdb4 --- /dev/null +++ b/logger_test.go @@ -0,0 +1,89 @@ +package jobqueue + +import ( + "bytes" + "log" + "strings" + "testing" +) + +type MockLogger struct { + Logs []string +} + +func (l *MockLogger) Info(msg string, keysAndValues ...interface{}) { + l.Logs = append(l.Logs, "INFO: "+msg) +} + +func (l *MockLogger) Warn(msg string, keysAndValues ...interface{}) { + l.Logs = append(l.Logs, "WARN: "+msg) +} + +func (l *MockLogger) Error(msg string, keysAndValues ...interface{}) { + l.Logs = append(l.Logs, "ERROR: "+msg) +} + +func (l *MockLogger) Trace(msg string, keysAndValues ...interface{}) { + l.Logs = append(l.Logs, "DEBUG: "+msg) +} + +func TestSetLogger(t *testing.T) { + mockLogger := &MockLogger{} + SetLogger(mockLogger) +} + +func TestInfo(t *testing.T) { + mockLogger := &MockLogger{} + SetLogger(mockLogger) + + Info("test info") + + if len(mockLogger.Logs) != 1 || !strings.HasPrefix(mockLogger.Logs[0], "INFO:") { + t.Errorf("Info log not as expected") + } +} + +func TestError(t *testing.T) { + mockLogger := &MockLogger{} + SetLogger(mockLogger) + + Error("test error") + + if len(mockLogger.Logs) != 1 || !strings.HasPrefix(mockLogger.Logs[0], "ERROR:") { + t.Errorf("Error log not as expected") + } +} + +func TestWarn(t *testing.T) { + mockLogger := &MockLogger{} + SetLogger(mockLogger) + + Warn("test warning") + + if len(mockLogger.Logs) != 1 || !strings.HasPrefix(mockLogger.Logs[0], "WARN:") { + t.Errorf("Warn log not as expected") + } +} + +func TestDebug(t *testing.T) { + mockLogger := &MockLogger{} + SetLogger(mockLogger) + + Trace("test debug") + + if len(mockLogger.Logs) != 1 || !strings.HasPrefix(mockLogger.Logs[0], "DEBUG:") { + t.Errorf("Debug log not as expected") + } +} + +func TestDefaultLogger(t *testing.T) { + buffer := bytes.NewBufferString("") + SetLogger(&DefaultLogger{Logger: log.New(buffer, "", log.LstdFlags)}) + + message := "Hello World" + Info(message) + + if !strings.Contains(buffer.String(), message) { + t.Errorf("Expected %v in the output logs", message) + } +} diff --git a/manager.go b/manager.go index 8a6fa56de40833f53b215248055dff4313ae6259..ea0d2c77c5f3e8c24d95ff67d449851c8072f3af 100644 --- a/manager.go +++ b/manager.go @@ -4,6 +4,7 @@ package jobqueue import ( + "errors" "fmt" "github.com/fsnotify/fsnotify" "github.com/robfig/cron/v3" @@ -31,7 +32,7 @@ type Manager struct { jobEventCh chan interface{} cronInstance *cron.Cron - logger Logger + //logger Logger database *gorm.DB dbSaver *DBSaver @@ -400,12 +401,20 @@ func (m *Manager) Start() error { } if m.dbSaver != nil { - err = m.dbSaver.Start() - if err != nil { - if m.logger != nil { - m.logger.Error("Error while starting db saver", "error", err) - } - } + p := StartDBSaver(m.dbSaver) + + ready := make(chan struct{}) + + Then[bool, bool](p, func(value bool) (bool, error) { + close(ready) + return value, nil + }, func(e error) error { + close(ready) + Error("Error while starting db saver", "error", err) + return nil + }) + + <-ready } if len(m.workerMap) == 0 { @@ -416,7 +425,7 @@ func (m *Manager) Start() error { for _, worker := range m.workerMap { err := worker.Start() - if err != nil && err != ErrWorkerAlreadyRunning { + if err != nil && !errors.Is(err, ErrWorkerAlreadyRunning) { if wrappedErr == nil { wrappedErr = fmt.Errorf("Error: ") } @@ -501,15 +510,12 @@ func (m *Manager) Stop() error { func (m *Manager) SetLogger(logger Logger) *Manager { m.mu.Lock() defer m.mu.Unlock() - m.logger = logger - return m -} -// GetLogger returns the logger -func (m *Manager) GetLogger() Logger { - m.mu.Lock() - defer m.mu.Unlock() - return m.logger + if m.database != nil { + m.database = m.database.Session(&gorm.Session{Logger: newGormAdapter()}) + } + + return m } // RunJob runs a job immediately and does not schedule it @@ -606,23 +612,18 @@ func (m *Manager) handleJobEvents() { switch event := event.(type) { case Event: - if m.logger != nil { - m.logger.Info("Event received", "event", event.Name, "data", event.Data, "message_id", event.MessageID) - } + Info("Event received", "event", event.Name, "data", event.Data, "message_id", event.MessageID) switch event.Name { case QueueJob: - if m.logger != nil { - m.logger.Info("Job queued", "job_id", event.Data.(GenericJob).GetID()) - } + Info("Job queued", "job_id", event.Data.(GenericJob).GetID()) job := event.Data.(GenericJob) err := m.queue.Enqueue(job) if err != nil && err != ErrJobAlreadyExists { - if m.logger != nil { - m.logger.Error("Error while queueing job", "error", err) - } + Error("Error while queueing job", "error", err) + } case JobReady: @@ -633,9 +634,7 @@ func (m *Manager) handleJobEvents() { break } - if m.logger != nil { - m.logger.Info("Job ready", "job_id", nextJob.GetID()) - } + Info("Job ready", "job_id", nextJob.GetID()) assigned := false maxTries := 10 @@ -658,9 +657,8 @@ func (m *Manager) handleJobEvents() { } if !assigned { - if m.logger != nil { - m.logger.Info("No worker available for job", "job_id", nextJob.GetID()) - } + Info("No worker available for job", "job_id", nextJob.GetID()) + } else { if nextJob.GetScheduler() != nil { @@ -682,9 +680,8 @@ func (m *Manager) handleJobEvents() { err := m.CancelJobSchedule(job.GetID()) if err != nil { - if m.logger != nil { - m.logger.Error("Error while canceling job schedule", "error", err) - } + Error("Error while canceling job schedule", "error", err) + } } } diff --git a/promise.go b/promise.go new file mode 100644 index 0000000000000000000000000000000000000000..704f5c4eb866fb7e68459a60dc7a458c897cc80a --- /dev/null +++ b/promise.go @@ -0,0 +1,72 @@ +package jobqueue + +import ( + "sync" +) + +type Promise[T any] struct { + isDone bool + value T + err error + mux sync.Mutex + done chan struct{} + resolve func(T) + reject func(error) +} + +func NewPromise[T any](fn func(resolve func(T), reject func(error))) *Promise[T] { + p := &Promise[T]{ + done: make(chan struct{}), + } + p.resolve = func(value T) { + p.mux.Lock() + defer p.mux.Unlock() + if p.isDone { + return + } + p.isDone = true + p.value = value + close(p.done) + } + p.reject = func(err error) { + p.mux.Lock() + defer p.mux.Unlock() + if p.isDone { + return + } + p.isDone = true + p.err = err + close(p.done) + } + go fn(p.resolve, p.reject) + return p +} +func Then[T, U any](p *Promise[T], onFulfilled func(T) (U, error), onRejected func(error) error) *Promise[U] { + + return NewPromise[U](func(resolve func(U), reject func(error)) { + + <-p.done + + p.mux.Lock() + defer p.mux.Unlock() + if p.err != nil { + err := onRejected(p.err) + reject(err) + } else { + go func() { + res, err := onFulfilled(p.value) + if err != nil { + reject(err) + } else { + resolve(res) + } + }() + } + }) +} + +func Catch[T any](p *Promise[T], onRejected func(error) error) *Promise[T] { + return Then[T, T](p, func(value T) (T, error) { + return value, nil + }, onRejected) +} diff --git a/promise_test.go b/promise_test.go new file mode 100644 index 0000000000000000000000000000000000000000..ba3499f1113aa94f1015efc76416870df07e3e38 --- /dev/null +++ b/promise_test.go @@ -0,0 +1,356 @@ +package jobqueue + +import ( + "errors" + "strconv" + "sync" + "testing" + "time" +) + +func TestResolve(t *testing.T) { + p := NewPromise[int](func(resolve func(int), reject func(error)) { + time.Sleep(time.Second) + resolve(42) + }) + + wg := sync.WaitGroup{} + wg.Add(1) + + Then(p, func(value int) (string, error) { + wg.Done() + return "ok", nil + }, func(e error) error { + t.Fail() + return nil + }) + + wg.Wait() +} + +func TestReject(t *testing.T) { + p := NewPromise[int](func(resolve func(int), reject func(error)) { + reject(errors.New("rejected")) + }) + + wg := sync.WaitGroup{} + wg.Add(1) + + Then(p, func(value int) (string, error) { + t.Fail() + return "", errors.New("should not be called") + }, func(e error) error { + if e.Error() != "rejected" { + t.Fail() + } + wg.Done() + return nil + }) + wg.Wait() +} + +func TestCatch(t *testing.T) { + p := NewPromise[int](func(resolve func(int), reject func(error)) { + reject(errors.New("catch error")) + }) + + wg := sync.WaitGroup{} + wg.Add(1) + + Catch(p, func(e error) error { + if e.Error() != "catch error" { + t.Fail() + } + wg.Done() + return nil + }) + wg.Wait() +} + +func TestChaining(t *testing.T) { + p := NewPromise[int](func(resolve func(int), reject func(error)) { + resolve(42) + }) + + wg := sync.WaitGroup{} + wg.Add(1) + + p2 := Then(p, func(value int) (string, error) { + return "ok", nil + }, func(e error) error { + t.Fail() + return nil + }) + Catch(p2, func(e error) error { + t.Fail() + return nil + }) + + Then(p2, func(value string) (int, error) { + wg.Done() + return 1, nil + }, func(e error) error { + t.Fail() + return nil + }) + + wg.Wait() +} + +func TestChainingWithMultipleThen(t *testing.T) { + p := NewPromise[int](func(resolve func(int), reject func(error)) { + resolve(42) + }) + + wg := sync.WaitGroup{} + wg.Add(1) + + p2 := Then(p, func(value int) (string, error) { + return "ok", nil + }, func(e error) error { + t.Fail() + return nil + }) + p3 := Then(p2, func(value string) (int, error) { + wg.Done() + return 1, nil + }, func(e error) error { + t.Fail() + return nil + }) + Catch(p3, func(e error) error { + t.Fail() + return nil + }) + wg.Wait() +} + +func TestThenWithManyInputs(t *testing.T) { + // Testfälle mit verschiedenen Eingabetypen und -werten + testCases := []struct { + name string + input interface{} + expected interface{} + }{ + // Erfolgreiche Versprechen mit verschiedenen Datentypen + {"test int", 10, 10}, + {"test string", "Hallo Welt", "Hallo Welt"}, + {"test bool", true, true}, + {"test struct", struct{ Name string }{Name: "Bard"}, struct{ Name string }{Name: "Bard"}}, + + // Erfolgreiche Versprechen mit komplexen Werten + {"test int array", []int{}, []int{}}, + {"test string map", map[string]string{}, map[string]string{}}, + {"test struct pointer", &struct{ Name string }{Name: "Bard"}, &struct{ Name string }{Name: "Bard"}}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + p := NewPromise[any](func(resolve func(interface{}), reject func(error)) { + resolve(tc.input) + }) + + wg := &sync.WaitGroup{} + wg.Add(1) + + Then(p, func(value any) (interface{}, error) { + + switch value.(type) { + case int: + exp := tc.expected.(int) + if value.(int) != exp { + t.Fail() + } + case string: + exp := tc.expected.(string) + if value != exp { + t.Fail() + } + + case bool: + exp := tc.expected.(bool) + if value != exp { + t.Fail() + } + + case struct{ Name string }: + exp := tc.expected.(struct{ Name string }) + if value != exp { + t.Fail() + } + + case []int: + exp := tc.expected.([]int) + if len(value.([]int)) != len(exp) { + t.Fail() + } + + case map[string]string: + exp := tc.expected.(map[string]string) + if len(value.(map[string]string)) != len(exp) { + t.Fail() + } + + case *struct{ Name string }: + exp := tc.expected.(*struct{ Name string }) + convertValue := value.(*struct{ Name string }) + if convertValue.Name != exp.Name { + t.Fail() + } + + default: + t.Fail() + } + + wg.Done() + return nil, nil + }, func(e error) error { + t.Fail() + return nil + }) + + wg.Wait() + + }) + } +} + +func TestPromiseSuccess(t *testing.T) { + promise := NewPromise[int](func(resolve func(int), reject func(error)) { + time.Sleep(100 * time.Millisecond) + resolve(42) + }) + + resultPromise := Then(promise, func(value int) (string, error) { + return "Value is: " + strconv.Itoa(value), nil + }, func(err error) error { + return err + }) + + <-resultPromise.done + + if resultPromise.err != nil { + t.Errorf("Promise was rejected with error: %v", resultPromise.err) + } + + if resultPromise.value != "Value is: 42" { + t.Errorf("Promise was resolved with incorrect value: %v", resultPromise.value) + } +} + +func TestPromiseFailure(t *testing.T) { + expectedError := errors.New("something went wrong") + + promise := NewPromise[int](func(resolve func(int), reject func(error)) { + time.Sleep(100 * time.Millisecond) + reject(expectedError) + }) + + resultPromise := Catch(promise, func(err error) error { + if err != expectedError { + t.Errorf("Expected error to be %v, but got %v", expectedError, err) + } + return err // Should return the error to keep the promise chain accurate. + }) + + <-resultPromise.done + + if resultPromise.err != expectedError { + t.Errorf("Expected error to be %v, but got %v", expectedError, resultPromise.err) + } +} + +// Der dritte Testfall kann unverändert bleiben + +func TestPromiseChain(t *testing.T) { + promise := NewPromise[int](func(resolve func(int), reject func(error)) { + time.Sleep(100 * time.Millisecond) + resolve(1) + }) + + firstResultPromise := Then(promise, func(value int) (int, error) { + return value + 1, nil + }, func(err error) error { + return err + }) + + resultPromise := Then(firstResultPromise, func(value int) (int, error) { + return value * 2, nil + }, func(err error) error { + return err + }) + + <-resultPromise.done + + if resultPromise.err != nil { + t.Errorf("Promise was rejected with error: %v", resultPromise.err) + } + + if resultPromise.value != 4 { + t.Errorf("Promise was resolved with incorrect value: %v, expected %v", resultPromise.value, 4) + } +} + +func BenchmarkPromise(b *testing.B) { + for i := 0; i < b.N; i++ { + promise := NewPromise[int](func(resolve func(int), reject func(error)) { + // Simulieren Sie eine Aufgabe, die einige Zeit in Anspruch nimmt + time.Sleep(10 * time.Millisecond) + resolve(42) + }) + + <-promise.done // Warten, bis das Promise-Objekt vollständig gelöst ist + if promise.err != nil { + b.Fatal("Promise was rejected with error:", promise.err) + } + } +} + +func FuzzPromise(f *testing.F) { + testCases := []struct { + value int + shouldErr bool + }{ + {value: 42, shouldErr: false}, + {value: -1, shouldErr: true}, + } + + for _, tc := range testCases { + f.Add(tc.value, tc.shouldErr) // Add test cases to the Fuzz function + } + + f.Fuzz(func(t *testing.T, value int, shouldErr bool) { + done := make(chan struct{}) + promise := NewPromise[int](func(resolve func(int), reject func(error)) { + if shouldErr { + reject(errors.New("error")) + } else { + resolve(value) + } + }) + + Then(promise, func(data int) (int, error) { + if data != value { + t.Errorf("Expected result to be %v, got %v", value, data) + } + close(done) + return data, nil + }, func(err error) error { + if !shouldErr { + t.Errorf("Expected no error, but got: %v", err) + } + close(done) + return err // In real case, we'd expect some error handling here. + }) + + <-done // Wait until Then is resolved or rejected + + if shouldErr && promise.err == nil { + t.Errorf("Expected an error but did not get one") + } + + if promise.err != nil && !shouldErr { + t.Errorf("Did not expect an error but got one: %v", promise.err) + } + }) +} diff --git a/queue.go b/queue.go index 1df0d3eeb15d83e89f35792d3ba5b8c188425bb1..a1651123193c2252b2113cc790655fb9ef4b0346 100644 --- a/queue.go +++ b/queue.go @@ -121,10 +121,8 @@ func (q *Queue) Enqueue(job GenericJob) error { q.readyQueue = newReadyQueue if q.eventBus != nil && len(q.readyQueue) > 0 { - if q.manger != nil && q.manger.logger != nil { - q.manger.logger.Info("Job ready", "job_id", job.GetID()) - } - + Info("Job ready", "job_id", job.GetID()) + q.eventBus.Publish(JobReady, nil) } diff --git a/worker.go b/worker.go index f43a131c3702554fd694cf961c1f28c33a3eba96..d2fbb0e8dfcb565e3a5ec4cf72e9f770dca5c801 100644 --- a/worker.go +++ b/worker.go @@ -116,9 +116,7 @@ func (w *LocalWorker) Start() error { w.wg.Wait() w.status = WorkerStatusRunning - if w.manager != nil && w.manager.logger != nil { - w.manager.logger.Info("Worker started", "worker", w.ID) - } + Info("Worker started", "worker", w.ID) return nil } @@ -164,9 +162,7 @@ func (w *LocalWorker) Stop() error { stopChan <- true } - if w.manager != nil && w.manager.logger != nil { - w.manager.logger.Info("Worker stopped", "worker", w.ID) - } + Info("Worker stopped", "worker", w.ID) return nil } @@ -175,10 +171,8 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel workerThreadID := w.ID.String() + "-" + fmt.Sprintf("%p", &w) - if w.manager != nil && w.manager.logger != nil { - w.manager.logger.Info("Worker thread with id started", "worker", w.ID, "thread_id", workerThreadID) - } - + Info("Worker thread with id started", "worker", w.ID, "thread_id", workerThreadID) + stopFlag := false w.wg.Done() @@ -212,9 +206,7 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel ctx, cancel = context.WithTimeout(ctx, timeout) } - if w.manager != nil && w.manager.logger != nil { - w.manager.logger.Info("Executing job on worker thread", "worker", w.ID, "thread_id", workerThreadID, "job_id", job.GetID()) - } + Info("Executing job on worker thread", "worker", w.ID, "thread_id", workerThreadID, "job_id", job.GetID()) _, err = job.Execute(ctx) jobFailed := false @@ -244,9 +236,8 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel if w.manager != nil && w.manager.dbSaver != nil { err = w.manager.dbSaver.SaveJob(job) if err != nil { - if w.manager.logger != nil { - w.manager.logger.Error("Error while saving job", "job_id", job.GetID(), "error", err) - } + Error("Error while saving job", "job_id", job.GetID(), "error", err) + } } @@ -264,9 +255,7 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel } } - if w.manager != nil && w.manager.logger != nil { - w.manager.logger.Info("Worker thread with id stopped", "worker", w.ID, "thread_id", workerThreadID) - } + Info("Worker thread with id stopped", "worker", w.ID, "thread_id", workerThreadID) }