Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Project: oss/libraries/go/services/job-queues

Commits on Source (1)
......@@ -6,3 +6,5 @@
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
# GitHub Copilot persisted chat sessions
/copilot/chatSessions
......@@ -15,7 +15,7 @@ The library also provides a `Cron` instance for scheduling jobs.
### Prerequisites
- Go 1.20+
- Go 1.22+
### Installation
......@@ -51,8 +51,6 @@ func main() {
```
### Job
......
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
//go:build !runOnTask
package jobqueue
import (
"github.com/stretchr/testify/assert"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"testing"
"time"
)
func TestWriteToDB1(t *testing.T) {
gormDB, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{})
if err != nil {
t.Fatalf("a error occurred while opening the database: %v", err)
}
manager := &Manager{database: gormDB}
saver := NewDBSaver().SetManager(manager)
// Start the DBSaver
p := StartDBSaver(saver)
ready := make(chan struct{})
Then[bool, bool](p, func(value bool) (bool, error) {
close(ready)
return value, nil
}, func(e error) error {
close(ready)
Error("Error while starting db saver", "error", err)
return nil
})
<-ready
runner := &CounterRunnable{}
job := NewJob[CounterResult]("job1", runner)
scheduler := &TimeScheduler{
Time: time.Now().Add(1 * time.Second),
jobs: nil,
}
job.scheduler = scheduler
err = saver.SaveJob(job)
assert.Nil(t, err)
err = saver.SaveJob(job)
assert.Nil(t, err)
time.Sleep(1 * time.Second)
err = saver.Stop()
assert.Nil(t, err)
}
//go:build !runOnTask
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
"github.com/stretchr/testify/assert"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"runtime"
"sync"
"testing"
"time"
)
func TestWriteToDB2(t *testing.T) {
gormDB, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{})
if err != nil {
t.Fatalf("a error occurred while opening the database: %v", err)
}
manager := &Manager{database: gormDB}
saver := NewDBSaver().SetManager(manager)
// Start the DBSaver
p := StartDBSaver(saver)
ready := make(chan struct{})
Then[bool, bool](p, func(value bool) (bool, error) {
close(ready)
return value, nil
}, func(e error) error {
close(ready)
Error("Error while starting db saver", "error", err)
return nil
})
db := gormDB
<-ready
var wg sync.WaitGroup
wg.Add(1)
defer wg.Done()
mgr := NewManager()
mgr.SetDB(gormDB)
// run sub tests
wg.Add(1)
defer wg.Done()
worker := NewLocalWorker(1)
err = mgr.AddWorker(worker)
assert.Nil(t, err)
err = mgr.Start()
assert.Nil(t, err)
runner := &CounterRunnable{}
job := NewJob[CounterResult]("job1", runner)
scheduler := &InstantScheduler{}
err = mgr.ScheduleJob(job, scheduler)
assert.Nil(t, err)
err = mgr.CancelJobSchedule("job1")
assert.Nil(t, err)
time.Sleep(1 * time.Second)
scheduler2 := &InstantScheduler{}
err = mgr.ScheduleJob(job, scheduler2)
assert.Nil(t, err)
err = mgr.CancelJobSchedule("job1")
assert.Nil(t, err)
time.Sleep(1 * time.Second)
scheduler3 := &InstantScheduler{}
err = mgr.ScheduleJob(job, scheduler3)
assert.Nil(t, err)
time.Sleep(1 * time.Second)
if mgr.dbSaver == nil {
t.Error("mgr.dbSaver == nil")
return
}
time.Sleep(1 * time.Second)
err = mgr.dbSaver.SaveJob(job)
assert.Nil(t, err)
runtime.Gosched()
time.Sleep(1 * time.Second)
err = mgr.CancelJobSchedule("job1")
assert.Nil(t, err)
runtime.Gosched()
time.Sleep(1 * time.Second)
scheduler4 := &InstantScheduler{}
err = mgr.ScheduleJob(job, scheduler4)
assert.Nil(t, err)
runtime.Gosched()
time.Sleep(1 * time.Second)
err = mgr.dbSaver.SaveJob(job)
assert.Nil(t, err)
time.Sleep(2 * time.Second)
err = mgr.CancelJobSchedule("job1")
assert.Nil(t, err)
tries := 10
for tries > 0 {
var tmpJob JobPersistence
if err := db.First(&tmpJob, "id = ?", "job1").Error; err == nil {
break
}
tries--
time.Sleep(1 * time.Second)
}
assert.True(t, tries > 0)
err = LoadJobsAndScheduleFromDatabase(db, mgr)
assert.Nil(t, err)
time.Sleep(1 * time.Second)
err = mgr.dbSaver.SaveJob(job)
assert.Nil(t, err)
}
//go:build !runOnTask
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
"github.com/stretchr/testify/assert"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"testing"
"time"
)
func TestWriteToDB3(t *testing.T) {
db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{})
if err != nil {
t.Fatalf("a error occurred while opening the database: %v", err)
}
manager := NewManager()
manager.SetDB(db)
err = manager.AddWorker(NewLocalWorker(1))
assert.Nil(t, err)
err = manager.Start()
assert.Nil(t, err)
runner := &CounterRunnable{}
job := NewJob[CounterResult]("job2", runner)
scheduler := &InstantScheduler{}
err = manager.ScheduleJob(job, scheduler)
assert.Nil(t, err)
time.Sleep(1 * time.Second)
err = manager.DeleteJob(job.GetID())
assert.Nil(t, err)
// verify the job was removed from the database
var tmpJob JobPersistence
err = db.First(&tmpJob, "id = ?", "job2").Error
assert.NotNil(t, err)
}
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
"github.com/stretchr/testify/assert"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"testing"
"time"
)
func TestWriteToDB4(t *testing.T) {
db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{})
if err != nil {
t.Fatalf("a error occurred while opening the database: %v", err)
}
manager := NewManager()
manager.SetDB(db)
err = manager.AddWorker(NewLocalWorker(1))
assert.Nil(t, err)
err = manager.Start()
assert.Nil(t, err)
runner := &CounterRunnable{}
job := NewJob[CounterResult]("job3", runner)
job.stats = JobStats{
JobID: job.GetID(),
RunCount: 20,
SuccessCount: 30,
ErrorCount: 40,
}
scheduler := &InstantScheduler{}
err = manager.ScheduleJob(job, scheduler)
assert.Nil(t, err)
time.Sleep(200 * time.Millisecond)
// check that the stats match the values set above
var tmpJob JobPersistence
err = db.Preload("Stats").First(&tmpJob, "id = ?", "job3").Error
assert.Nil(t, err)
// Validate the fields
assert.Equal(t, JobID("job3"), tmpJob.ID)
assert.Equal(t, 21, tmpJob.Stats.RunCount) // +1 because of the first run
assert.Equal(t, 31, tmpJob.Stats.SuccessCount) // +1 because of the first run
assert.Equal(t, 40, tmpJob.Stats.ErrorCount)
// reset stats
err = manager.ResetJobStats(job.GetID())
assert.Nil(t, err)
time.Sleep(2 * time.Second)
var tmpJob2 JobPersistence
// check that the stats were reset to zero
err = db.First(&tmpJob2, "id = ?", "job3").Error
err = db.Preload("Stats").First(&tmpJob2, "id = ?", "job3").Error
assert.Nil(t, err)
// Validate the fields
assert.Equal(t, JobID("job3"), tmpJob2.ID)
assert.Equal(t, 0, tmpJob2.Stats.RunCount)
assert.Equal(t, 0, tmpJob2.Stats.SuccessCount)
assert.Equal(t, 0, tmpJob2.Stats.ErrorCount)
err = manager.DeleteJob(job.GetID())
assert.Nil(t, err)
}
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
"github.com/stretchr/testify/assert"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"testing"
)
func TestWriteToDB5(t *testing.T) {
db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{})
if err != nil {
t.Fatalf("a error occurred while opening the database: %v", err)
}
manager := NewManager()
manager.SetDB(db)
err = manager.AddWorker(NewLocalWorker(1))
assert.Nil(t, err)
err = manager.Start()
assert.Nil(t, err)
mgr := manager
runner := &CounterRunnable{}
job := NewJob[CounterResult]("jobSameID", runner)
scheduler := &InstantScheduler{}
err = mgr.ScheduleJob(job, scheduler)
assert.Nil(t, err)
sameIDJob := NewJob[CounterResult]("jobSameID", runner)
// Saving a job with the same ID must not fail; the saver upserts by ID
err = mgr.dbSaver.SaveJob(sameIDJob)
assert.Nil(t, err)
err = mgr.CancelJobSchedule("jobSameID")
assert.Nil(t, err)
err = mgr.dbSaver.Stop()
assert.Nil(t, err)
}
//go:build !runOnTask
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
"fmt"
"github.com/stretchr/testify/assert"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"testing"
"time"
)
func TestWriteToDB6(t *testing.T) {
db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{})
if err != nil {
t.Fatalf("a error occurred while opening the database: %v", err)
}
manager := NewManager()
manager.SetDB(db)
err = manager.AddWorker(NewLocalWorker(1))
assert.Nil(t, err)
err = manager.Start()
assert.Nil(t, err)
mgr := manager
numJobs := 1000
jobIDs := make([]JobID, numJobs)
for i := 0; i < numJobs; i++ {
jobID := JobID(fmt.Sprintf("burstJob%d", i))
jobIDs[i] = jobID
runner := &CounterRunnable{}
job := NewJob[CounterResult](jobID, runner)
scheduler := &InstantScheduler{}
err = mgr.ScheduleJob(job, scheduler)
assert.Nil(t, err)
err = mgr.dbSaver.SaveJob(job)
assert.Nil(t, err)
}
time.Sleep(10 * time.Second)
for _, jobID := range jobIDs {
var tmpJob JobPersistence
err = db.First(&tmpJob, "id = ?", jobID).Error
if err != nil {
t.Errorf("Job id %s not found in database: %v", jobID, err)
}
}
// check if all jobs are in the database
var jobCount int64
err = db.Model(&JobPersistence{}).Count(&jobCount).Error
assert.Nil(t, err)
assert.Equal(t, int64(numJobs), jobCount)
}
package jobqueue
import (
"context"
"gorm.io/gorm/logger"
"time"
)
type GormAdapter struct{}
func (GormAdapter) LogMode(logger.LogLevel) logger.Interface {
return GormAdapter{} // Noop. Not needed in this case.
}
func (GormAdapter) Info(ctx context.Context, message string, data ...interface{}) {
Info(message, data...)
}
func (GormAdapter) Warn(ctx context.Context, message string, data ...interface{}) {
Warn(message, data...)
}
func (GormAdapter) Error(ctx context.Context, message string, data ...interface{}) {
Error(message, data...)
}
func (GormAdapter) Trace(ctx context.Context, begin time.Time, fc func() (string, int64), err error) {
elapsed := time.Since(begin)
sql, rowsAffected := fc()
if err != nil {
Error("gorm trace",
"elapsed", elapsed,
"sql", sql,
"rows", rowsAffected,
"err", err,
)
return
}
Trace("gorm trace",
"elapsed", elapsed,
"sql", sql,
"rows", rowsAffected,
)
}
var _ logger.Interface = (*GormAdapter)(nil)
func newGormAdapter() *GormAdapter {
return &GormAdapter{}
}
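// Example (sketch, not part of this change): gorm's internal log output can
// be routed through the package-level Info/Warn/Error/Trace functions by
// passing the adapter as the gorm logger. The helper name openWithAdapter is
// illustrative only.
package jobqueue
import (
	"gorm.io/driver/sqlite"
	"gorm.io/gorm"
)
// openWithAdapter opens an SQLite database with the GormAdapter installed,
// so every gorm log call lands in this package's logger.
func openWithAdapter(dsn string) (*gorm.DB, error) {
	return gorm.Open(sqlite.Open(dsn), &gorm.Config{
		Logger: newGormAdapter(),
	})
}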
package jobqueue
import (
"testing"
)
func TestDBLogger(t *testing.T) {
mockLogger := newGormAdapter()
if mockLogger == nil {
t.Error("mockLogger == nil")
return
}
}
......@@ -4,9 +4,12 @@
package jobqueue
import (
"context"
"errors"
"gorm.io/gorm"
"math/rand"
"sync"
"time"
)
type DBSaverStatus int
......@@ -17,12 +20,13 @@ const (
)
type DBSaver struct {
saveChannel chan GenericJob
stopChan chan struct{}
migrateFlag bool
manager *Manager
status DBSaverStatus
mu sync.Mutex
saveChannel chan GenericJob
stopChan chan struct{}
migrateFlag bool
manager *Manager
status DBSaverStatus
mu sync.Mutex
jobSaveProgress sync.WaitGroup
}
type RunnerData string
......@@ -31,7 +35,7 @@ type SchedulerData string
// NewDBSaver creates a new DBSaver
func NewDBSaver() *DBSaver {
return &DBSaver{
saveChannel: make(chan GenericJob, 100),
saveChannel: make(chan GenericJob, 1000),
stopChan: make(chan struct{}),
}
}
......@@ -65,191 +69,250 @@ func (s *DBSaver) isStatus(status DBSaverStatus, lock bool) bool {
}
// Start starts the DBSaver
func (s *DBSaver) Start() error {
func StartDBSaver(s *DBSaver) *Promise[bool] {
s.mu.Lock()
defer s.mu.Unlock()
if s.manager == nil || s.manager.database == nil {
return ErrNoDatabaseConnection
}
return NewPromise[bool](func(resolve func(bool), reject func(error)) {
if s.isStatus(DBSaverStatusRunning, false) {
return nil
}
if s.manager == nil || s.manager.database == nil {
reject(ErrNoDatabaseConnection)
return
}
db := s.manager.database
if s.isStatus(DBSaverStatusRunning, false) {
resolve(true)
return
}
if !s.migrateFlag {
err := db.AutoMigrate(&JobPersistence{}, &JobLog{}, &JobStats{})
if err != nil {
return err
db := s.manager.database
if !s.migrateFlag {
err := db.AutoMigrate(&JobPersistence{}, &JobLog{}, &JobStats{})
if err != nil {
reject(err)
return
}
s.migrateFlag = true
}
ready := make(chan struct{})
go runSaver(s, db, ready)
<-ready
resolve(true)
})
}
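// Example (sketch, not part of this change): the tests in this commit start
// the saver and block on the returned promise before queueing jobs. The
// helper below condenses that pattern; its name is illustrative only.
func startSaverAndWait(m *Manager) *DBSaver {
	saver := NewDBSaver().SetManager(m)
	p := StartDBSaver(saver)
	ready := make(chan struct{})
	Then[bool, bool](p, func(v bool) (bool, error) {
		close(ready) // saver is running
		return v, nil
	}, func(e error) error {
		close(ready) // start failed
		Error("Error while starting db saver", "error", e)
		return e
	})
	<-ready
	return saver
}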
//
//// Start starts the DBSaver
//func (s *DBSaver) Start() error {
// s.mu.Lock()
// defer s.mu.Unlock()
//
// if s.manager == nil || s.manager.database == nil {
// return ErrNoDatabaseConnection
// }
//
// if s.isStatus(DBSaverStatusRunning, false) {
// return nil
// }
//
// db := s.manager.database
//
// if !s.migrateFlag {
// err := db.AutoMigrate(&JobPersistence{}, &JobLog{}, &JobStats{})
// if err != nil {
// return err
// }
// s.migrateFlag = true
// }
//
// go runSaver(s, db)
// return nil
//}
func runSaver(s *DBSaver, db *gorm.DB, ready chan struct{}) {
s.setStatus(DBSaverStatusRunning)
defer func() {
// this runs after the function returns
// and needs to be protected by the lock
// of the setStatus method
//s.status = DBSaverStatusStopped
s.setStatus(DBSaverStatusStopped)
}()
close(ready)
for {
select {
case job := <-s.saveChannel:
s.jobSaveProgress.Add(1)
err := CreateOrUpdateJob(job, db)
if err != nil {
Error("Error while saving job", "error", err)
}
s.jobSaveProgress.Done()
case <-s.stopChan:
return
}
s.migrateFlag = true
}
}
var wg sync.WaitGroup
wg.Add(1)
func CreateOrUpdateJob(job GenericJob, db *gorm.DB) error {
go func() {
wg.Done()
// this is protected by the lock above
s.status = DBSaverStatusRunning
defer func() {
// this runs after the function returns
// and needs to be protected by the lock
// of the setStatus method
s.status = DBSaverStatusStopped
//s.setStatus(DBSaverStatusStopped)
}()
for {
select {
case job := <-s.saveChannel:
err := db.Transaction(func(tx *gorm.DB) error {
permJob := job.GetPersistence()
memLogs := permJob.Logs
permJob.Logs = nil
var existingJob JobPersistence
result := tx.Unscoped().Where("id = ?", permJob.GetID()).First(&existingJob)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
err := tx.Create(&permJob).Error
if err != nil {
return err
}
} else {
return result.Error
}
} else {
db.Unscoped().Model(&existingJob).Update("deleted_at", nil)
tx.Model(&existingJob.Scheduler).Select(
[]string{
"type",
"interval",
"spec",
"delay",
"event",
"time",
"executed",
}).UpdateColumns(SchedulerPersistence{
Type: "",
Interval: 0,
Spec: "",
Delay: 0,
Event: "",
Time: nil,
Executed: false,
})
err := tx.Model(&existingJob).Updates(permJob).Error
if err != nil {
return err
}
}
tx.Model(&permJob.Stats).
Select(
[]string{
"run_count",
"success_count",
"error_count",
"time_metrics_avg_run_time",
"time_metrics_max_run_time",
"time_metrics_min_run_time",
"time_metrics_total_run_time",
},
).
UpdateColumns(permJob.Stats)
for i := range memLogs {
memLogs[i].LogID = 0
_ = tx.Create(&memLogs[i])
// no error handling, if it fails, it fails
}
return nil // Commit the transaction
})
return db.Transaction(func(tx *gorm.DB) error {
permJob := job.GetPersistence()
memLogs := permJob.Logs
permJob.Logs = nil
var existingJob JobPersistence
result := tx.Unscoped().Where("id = ?", permJob.GetID()).First(&existingJob)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
err := tx.Create(&permJob).Error
if err != nil {
s.logError("Error while saving job", "error", err)
Trace("Error while creating job", "error", err)
return err
}
case <-s.stopChan:
return
} else {
Trace("Error while creating job", "error", result.Error)
return result.Error
}
} else {
db.Unscoped().Model(&existingJob).Update("deleted_at", nil)
tx.Model(&existingJob.Scheduler).Select(
[]string{
"type",
"interval",
"spec",
"delay",
"event",
"time",
"executed",
}).UpdateColumns(SchedulerPersistence{
Type: "",
Interval: 0,
Spec: "",
Delay: 0,
Event: "",
Time: nil,
Executed: false,
})
err := tx.Model(&existingJob).Updates(permJob).Error
if err != nil {
return err
}
}
}()
wg.Wait()
tx.Model(&permJob.Stats).
Select(
[]string{
"run_count",
"success_count",
"error_count",
"time_metrics_avg_run_time",
"time_metrics_max_run_time",
"time_metrics_min_run_time",
"time_metrics_total_run_time",
},
).
UpdateColumns(permJob.Stats)
if tx.Error != nil {
Error("Error while updating job stats", "error", tx.Error)
return tx.Error
}
return nil
}
for i := range memLogs {
memLogs[i].LogID = 0
_ = tx.Create(&memLogs[i])
// no error handling, if it fails, it fails
}
// logError logs an error
func (s *DBSaver) logError(msg string, keysAndValues ...interface{}) {
s.mu.Lock()
defer s.mu.Unlock()
return nil
if s.manager == nil || s.manager.logger == nil {
return
}
})
s.manager.logger.Error(msg, keysAndValues...)
}
// Stop stops the DBSaver
func (s *DBSaver) Stop() *DBSaver {
func (s *DBSaver) Stop() error {
s.mu.Lock()
defer s.mu.Unlock()
select {
case s.stopChan <- struct{}{}:
default:
s.logError("DBSaver stop channel is full")
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
go func() {
s.stopChan <- struct{}{}
s.jobSaveProgress.Wait()
cancel()
}()
<-ctx.Done()
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
Error("DBSaver did not stop in time")
return ctx.Err()
}
return s
s.status = DBSaverStatusStopped
return nil
}
func exponentialBackoff(retry int) time.Duration {
waitTime := 100 * time.Millisecond
for i := 0; i < retry; i++ {
waitTime *= 2
waitTime += time.Duration(rand.Int63n(int64(waitTime))) // #nosec G404
}
return waitTime
}
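// Sketch (not part of this change): each loop iteration doubles the wait and
// then adds up to that much again as jitter, so retry n waits roughly between
// 100ms*2^n and 100ms*4^n. The helper below pins those bounds; backoffInRange
// is a hypothetical illustration, not library API.
func backoffInRange(retry int) bool {
	got := exponentialBackoff(retry)
	lower := 100 * time.Millisecond << uint(retry)   // 100ms * 2^n
	upper := 100 * time.Millisecond << uint(2*retry) // 100ms * 4^n
	return got >= lower && got <= upper
}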
// SaveJob saves a job to the database
func (s *DBSaver) SaveJob(job GenericJob) error {
s.mu.Lock()
defer s.mu.Unlock()
defer func() {
if r := recover(); r != nil {
Error("Error while saving job", "error", r)
}
s.mu.Unlock()
}()
if s.saveChannel == nil {
return ErrDBSaverNotInitialized
}
if s.status != DBSaverStatusRunning {
return ErrDBSaverNotRunning
}
defer func() {
if r := recover(); r != nil {
s.logError("Error while saving job", "error", r)
maxRetries := 5
for retries := maxRetries; retries > 0; retries-- {
select {
case s.saveChannel <- job:
return nil
default:
Error("DBSaver channel is full, dropping safe for job with ID", "job_id", job.GetID())
backoff := exponentialBackoff(maxRetries - retries)
Trace("DBSaver channel is full, retrying in", "backoff", backoff)
time.Sleep(backoff)
}
}()
select {
case s.saveChannel <- job:
default:
// if the channel is full, we just drop the job
// this is not ideal, but better than blocking
// the job queue
s.logError("DBSaver channel is full, dropping job with ID", "job_id", job.GetID())
}
return nil
return errors.New("failed to save job after multiple attempts")
}
func checkRunningSaver(s *DBSaver) (*gorm.DB, error) {
......@@ -280,7 +343,9 @@ func (s *DBSaver) DeleteJob(job GenericJob) error {
return err
}
s.jobSaveProgress.Add(1)
return db.Transaction(func(tx *gorm.DB) error {
defer s.jobSaveProgress.Done()
permJob := job.GetPersistence()
dbErr := tx.Where("job_id = ?", permJob.GetID()).Delete(&JobLog{}).Error
......@@ -313,7 +378,9 @@ func (s *DBSaver) ResetLogs(job GenericJob) error {
return err
}
s.jobSaveProgress.Add(1)
return db.Transaction(func(tx *gorm.DB) error {
defer s.jobSaveProgress.Done()
permJob := job.GetPersistence()
// unscoped because we want to delete the logs finally
......@@ -340,7 +407,7 @@ func (s *DBSaver) ResetStats(job GenericJob) error {
defer func() {
if r := recover(); r != nil {
s.logError("Error while saving job", "error", r)
Error("Error while saving job", "error", r)
}
}()
......@@ -349,7 +416,7 @@ func (s *DBSaver) ResetStats(job GenericJob) error {
select {
case s.saveChannel <- job:
default:
s.logError("DBSaver channel is full, dropping job with ID", "job_id", job.GetID())
Error("DBSaver channel is full, dropping job with ID", "job_id", job.GetID())
}
return nil
......
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
//go:build !runOnTask
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
"context"
"fmt"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/docker/go-connections/nat"
"github.com/stretchr/testify/assert"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/logger"
"log"
"net"
"os"
"runtime"
"strconv"
"sync"
"testing"
"time"
)
func startTestMySQLDockerImageAndContainer(t *testing.T, port string, ctx context.Context) error {
t.Helper()
cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
if err != nil {
return err
}
imageName := "mysql:8"
reader, err := cli.ImagePull(ctx, imageName, types.ImagePullOptions{})
if err != nil {
return err
}
// to debug the image pull, uncomment the following line
//_, _ = io.Copy(os.Stdout, reader)
_ = reader
hostConfig := &container.HostConfig{
PortBindings: nat.PortMap{
"3306/tcp": []nat.PortBinding{
{
HostIP: DOCKER_TEST_HOST_IP,
HostPort: port,
},
},
},
}
resp, err := cli.ContainerCreate(ctx, &container.Config{
Image: imageName,
Env: []string{
"MYSQL_ROOT_PASSWORD=secret",
"MYSQL_USER=user",
"MYSQL_PASSWORD=secret",
"MYSQL_DATABASE=test",
},
}, hostConfig, nil, nil, "")
"github.com/stretchr/testify/assert"
"gorm.io/driver/sqlite"
)
//
//func startTestMySQLDockerImageAndContainer(t *testing.T, port string, ctx context.Context) error {
// t.Helper()
//
// cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
// if err != nil {
// return err
// }
//
// imageName := "mysql:8"
//
// reader, err := cli.ImagePull(ctx, imageName, types.ImagePullOptions{})
// if err != nil {
// return err
// }
//
// // to debug the image pull, uncomment the following line
// //_, _ = io.Copy(os.Stdout, reader)
// _ = reader
//
// hostConfig := &container.HostConfig{
// PortBindings: nat.PortMap{
// "3306/tcp": []nat.PortBinding{
// {
// HostIP: DOCKER_TEST_HOST_IP,
// HostPort: port,
// },
// },
// },
// }
//
// resp, err := cli.ContainerCreate(ctx, &container.Config{
// Image: imageName,
// Env: []string{
// "MYSQL_ROOT_PASSWORD=secret",
// "MYSQL_USER=user",
// "MYSQL_PASSWORD=secret",
// "MYSQL_DATABASE=test",
// },
// }, hostConfig, nil, nil, "")
//
// if err != nil {
// return err
// }
//
// if err := cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil {
// return err
// }
//
// go func() {
// <-ctx.Done()
//
// timeout := 0
// stopOptions := container.StopOptions{
// Timeout: &timeout,
// Signal: "SIGKILL",
// }
// newCtx, _ := context.WithTimeout(context.Background(), 60*time.Second)
// if err := cli.ContainerStop(newCtx, resp.ID, stopOptions); err != nil {
// t.Errorf("ContainerStop returned error: %v", err)
// }
// if err := cli.ContainerRemove(newCtx, resp.ID, types.ContainerRemoveOptions{
// Force: true,
// }); err != nil {
// t.Errorf("ContainerRemove returned error: %v", err)
// }
//
// }()
//
// statusCh, errCh := cli.ContainerWait(ctx, resp.ID, container.WaitConditionNotRunning)
// select {
// case err := <-errCh:
// if err != nil {
// // empty error means container exited normally (see container_wait.go)
// if err.Error() == "" {
// return nil
// }
//
// return err
// }
// case <-statusCh:
//
// }
//
// return nil
//}
func TestSaveJobWithSQLite(t *testing.T) {
gormDB, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{})
if err != nil {
return err
}
if err := cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil {
return err
}
go func() {
<-ctx.Done()
timeout := 0
stopOptions := container.StopOptions{
Timeout: &timeout,
Signal: "SIGKILL",
}
newCtx, _ := context.WithTimeout(context.Background(), 60*time.Second)
if err := cli.ContainerStop(newCtx, resp.ID, stopOptions); err != nil {
t.Errorf("ContainerStop returned error: %v", err)
}
if err := cli.ContainerRemove(newCtx, resp.ID, types.ContainerRemoveOptions{
Force: true,
}); err != nil {
t.Errorf("ContainerRemove returned error: %v", err)
}
}()
statusCh, errCh := cli.ContainerWait(ctx, resp.ID, container.WaitConditionNotRunning)
select {
case err := <-errCh:
if err != nil {
// empty error means container exited normally (see container_wait.go)
if err.Error() == "" {
return nil
}
return err
}
case <-statusCh:
t.Fatalf("a error occurred while opening the database: %v", err)
}
return nil
}
func TestWriteToDB(t *testing.T) {
manager := &Manager{database: gormDB}
saver := NewDBSaver().SetManager(manager)
// if set, these env vars pin the MySQL port and enable query logging
useMySQLPort := os.Getenv("MYSQL_PORT")
//useMySQLPort = "3306"
printLogging := os.Getenv("MYSQL_LOGGING")
printLogging = "true"
// Start the DBSaver
p := StartDBSaver(saver)
var err error
ready := make(chan struct{})
ctb := context.Background()
ctx, cancel := context.WithCancel(ctb)
t.Cleanup(func() {
cancel()
time.Sleep(1 * time.Second)
Then[bool, bool](p, func(value bool) (bool, error) {
close(ready)
return value, nil
}, func(e error) error {
close(ready)
Error("Error while starting db saver", "error", err)
return nil
})
listener, err := net.Listen("tcp", DOCKER_TEST_HOST_IP+":0")
if err != nil {
t.Errorf("Unexpected error: %v", err)
return
}
portAsInt := listener.Addr().(*net.TCPAddr).Port
portAsString := fmt.Sprintf("%d", portAsInt)
_ = listener.Close()
if useMySQLPort != "" {
portAsString = useMySQLPort
i, _ := strconv.Atoi(portAsString)
portAsInt = i
}
done := make(chan bool)
go func() {
err = startTestMySQLDockerImageAndContainer(t, portAsString, ctx)
if err != nil {
t.Errorf("Unexpected error: %v", err)
cancel()
}
done <- true
}()
waitCtx, waitCancel := context.WithTimeout(ctx, 60*time.Second)
defer waitCancel()
for {
conn, err := net.DialTimeout("tcp", net.JoinHostPort(DOCKER_TEST_HOST_IP, portAsString), 1*time.Second)
if err == nil {
err = conn.Close()
assert.Nil(t, err)
break
}
select {
case <-waitCtx.Done():
t.Error("Timeout waiting for container service")
cancel()
return
default:
time.Sleep(1 * time.Second)
}
}
dsn := "user:secret@tcp(" + net.JoinHostPort(DOCKER_TEST_HOST_IP, portAsString) + ")/test?charset=utf8mb4&parseTime=True&loc=Local"
<-ready
counter := 0
var db *gorm.DB
jobID := JobID("testJob")
job := NewJob[CounterResult](jobID, &CounterRunnable{})
time.Sleep(20 * time.Second)
err = saver.SaveJob(job)
assert.NoError(t, err)
var dbLogger logger.Interface
time.Sleep(100 * time.Millisecond)
if printLogging == "true" {
dbLogger = logger.New(
log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer
logger.Config{
SlowThreshold: time.Second, // Slow SQL threshold
LogLevel: logger.Info, // Log level
Colorful: false, // Disable color
},
)
} else {
saver.Stop()
dbLogger = logger.Default.LogMode(logger.Silent)
}
for counter < 20 {
db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
Logger: dbLogger,
})
if err == nil {
break
}
var count int64
gormDB.Model(&JobPersistence{}).Count(&count)
assert.Equal(t, int64(1), count, "It should be 1 job in the database")
counter++
time.Sleep(1 * time.Second)
}
if err != nil {
t.Errorf("Unexpected error: %v", err)
return
}
// get job from database
var jobFromDB JobPersistence
gormDB.First(&jobFromDB, "id = ?", jobID)
assert.Equal(t, jobID, jobFromDB.ID, "JobID should be the same")
var wg sync.WaitGroup
// run sub tests
t.Run("TestUpdateToDB", func(t *testing.T) {
wg.Add(1)
defer wg.Done()
mgr := NewManager()
mgr.SetDB(db)
//worker := NewLocalWorker(1)
//err := mgr.AddWorker(worker)
//assert.Nil(t, err)
//
//err = mgr.Start()
//assert.Nil(t, err)
dbSaver := NewDBSaver()
dbSaver.SetManager(mgr)
err = dbSaver.Start()
assert.Nil(t, err)
runner := &CounterRunnable{}
job := NewJob[CounterResult]("job1", runner)
scheduler := &TimeScheduler{
Time: time.Now().Add(1 * time.Second),
jobs: nil,
}
job.scheduler = scheduler
err = dbSaver.SaveJob(job)
assert.Nil(t, err)
//err = mgr.ScheduleJob(job, scheduler)
//assert.Nil(t, err)
err := dbSaver.SaveJob(job)
assert.Nil(t, err)
time.Sleep(1 * time.Second)
dbSaver.Stop()
})
// run sub tests
t.Run("TestAddToDB", func(t *testing.T) {
wg.Add(1)
defer wg.Done()
mgr := NewManager()
mgr.SetDB(db)
worker := NewLocalWorker(1)
err := mgr.AddWorker(worker)
assert.Nil(t, err)
err = mgr.Start()
assert.Nil(t, err)
runner := &CounterRunnable{}
job := NewJob[CounterResult]("job1", runner)
scheduler := &InstantScheduler{}
err = mgr.ScheduleJob(job, scheduler)
assert.Nil(t, err)
err = mgr.CancelJobSchedule("job1")
assert.Nil(t, err)
time.Sleep(1 * time.Second)
scheduler2 := &InstantScheduler{}
err = mgr.ScheduleJob(job, scheduler2)
assert.Nil(t, err)
err = mgr.CancelJobSchedule("job1")
assert.Nil(t, err)
time.Sleep(1 * time.Second)
scheduler3 := &InstantScheduler{}
err = mgr.ScheduleJob(job, scheduler3)
assert.Nil(t, err)
time.Sleep(1 * time.Second)
if mgr.dbSaver == nil {
t.Error("mgr.dbSaver == nil")
return
}
time.Sleep(1 * time.Second)
err = mgr.dbSaver.SaveJob(job)
assert.Nil(t, err)
runtime.Gosched()
time.Sleep(1 * time.Second)
err = mgr.CancelJobSchedule("job1")
assert.Nil(t, err)
runtime.Gosched()
time.Sleep(1 * time.Second)
scheduler4 := &InstantScheduler{}
err = mgr.ScheduleJob(job, scheduler4)
assert.Nil(t, err)
runtime.Gosched()
time.Sleep(1 * time.Second)
err = mgr.dbSaver.SaveJob(job)
assert.Nil(t, err)
time.Sleep(2 * time.Second)
err = mgr.CancelJobSchedule("job1")
assert.Nil(t, err)
tries := 10
for tries > 0 {
var tmpJob JobPersistence
if err := db.First(&tmpJob, "id = ?", "job1").Error; err == nil {
break
}
tries--
time.Sleep(1 * time.Second)
}
assert.True(t, tries > 0)
err = LoadJobsAndScheduleFromDatabase(db, mgr)
assert.Nil(t, err)
time.Sleep(1 * time.Second)
err = mgr.dbSaver.SaveJob(job)
assert.Nil(t, err)
time.Sleep(1 * time.Second)
})
t.Run("TestDeleteJob", func(t *testing.T) {
wg.Add(1)
defer wg.Done()
mgr := NewManager()
mgr.SetDB(db)
worker := NewLocalWorker(1)
err := mgr.AddWorker(worker)
assert.Nil(t, err)
err = mgr.Start()
assert.Nil(t, err)
runner := &CounterRunnable{}
job := NewJob[CounterResult]("job2", runner)
scheduler := &InstantScheduler{}
err = mgr.ScheduleJob(job, scheduler)
assert.Nil(t, err)
time.Sleep(1 * time.Second)
err = mgr.DeleteJob(job.GetID())
assert.Nil(t, err)
// verify the job was removed from the database
var tmpJob JobPersistence
err = db.First(&tmpJob, "id = ?", "job2").Error
assert.NotNil(t, err)
})
wg.Add(1)
t.Run("ResetStats", func(t *testing.T) {
defer wg.Done()
mgr := NewManager()
mgr.SetDB(db)
worker := NewLocalWorker(1)
err := mgr.AddWorker(worker)
assert.Nil(t, err)
err = mgr.Start()
assert.Nil(t, err)
runner := &CounterRunnable{}
job := NewJob[CounterResult]("job3", runner)
job.stats = JobStats{
JobID: job.GetID(),
RunCount: 20,
SuccessCount: 30,
ErrorCount: 40,
}
scheduler := &InstantScheduler{}
err = mgr.ScheduleJob(job, scheduler)
assert.Nil(t, err)
time.Sleep(200 * time.Millisecond)
// check that the stats match the values set above
var tmpJob JobPersistence
err = db.Preload("Stats").First(&tmpJob, "id = ?", "job3").Error
assert.Nil(t, err)
// Validate the fields
assert.Equal(t, JobID("job3"), tmpJob.ID)
assert.Equal(t, 21, tmpJob.Stats.RunCount) // +1 because of the first run
assert.Equal(t, 31, tmpJob.Stats.SuccessCount) // +1 because of the first run
assert.Equal(t, 40, tmpJob.Stats.ErrorCount)
// reset stats
err = mgr.ResetJobStats(job.GetID())
assert.Nil(t, err)
time.Sleep(2 * time.Second)
var tmpJob2 JobPersistence
// check that the stats were reset to zero
err = db.First(&tmpJob2, "id = ?", "job3").Error
err = db.Preload("Stats").First(&tmpJob2, "id = ?", "job3").Error
assert.Nil(t, err)
// Validate the fields
assert.Equal(t, JobID("job3"), tmpJob2.ID)
assert.Equal(t, 0, tmpJob2.Stats.RunCount)
assert.Equal(t, 0, tmpJob2.Stats.SuccessCount)
assert.Equal(t, 0, tmpJob2.Stats.ErrorCount)
err = mgr.DeleteJob(job.GetID())
assert.Nil(t, err)
})
wg.Wait()
cancel()
select {
case <-done:
time.Sleep(1 * time.Second)
case <-time.After(1 * time.Minute):
t.Error("test hangs, timeout reached")
}
}
......@@ -3,11 +3,11 @@
"devenv": {
"locked": {
"dir": "src/modules",
"lastModified": 1702549996,
"narHash": "sha256-mEN+8gjWUXRxBCcixeth+jlDNuzxbpFwZNOEc4K22vw=",
"lastModified": 1710144971,
"narHash": "sha256-CjTOdoBvT/4AQncTL20SDHyJNgsXZjtGbz62yDIUYnM=",
"owner": "cachix",
"repo": "devenv",
"rev": "e681a99ffe2d2882f413a5d771129223c838ddce",
"rev": "6c0bad0045f1e1802f769f7890f6a59504825f4d",
"type": "github"
},
"original": {
......@@ -20,11 +20,11 @@
"flake-compat": {
"flake": false,
"locked": {
"lastModified": 1673956053,
"narHash": "sha256-4gtG9iQuiKITOjNQQeQIpoIB6b16fm+504Ch3sNKLd8=",
"lastModified": 1696426674,
"narHash": "sha256-kvjfFW7WAETZlt09AgDn1MrtKzP7t90Vf7vypd3OL1U=",
"owner": "edolstra",
"repo": "flake-compat",
"rev": "35bb57c0c8d8b62bbfd284272c928ceb64ddbde9",
"rev": "0f9255e01c2351cc7d116c072cb317785dd33b33",
"type": "github"
},
"original": {
......@@ -38,11 +38,11 @@
"systems": "systems"
},
"locked": {
"lastModified": 1685518550,
"narHash": "sha256-o2d0KcvaXzTrPRIo0kOLV0/QXHhDQ5DTi+OxcjO8xqY=",
"lastModified": 1701680307,
"narHash": "sha256-kAuep2h5ajznlPMD9rnQyffWG8EM/C73lejGofXvdM8=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "a1720a10a6cfe8234c0e93907ffe81be440f4cef",
"rev": "4022d587cbbfd70fe950c1e2083a02621806a725",
"type": "github"
},
"original": {
......@@ -59,11 +59,11 @@
]
},
"locked": {
"lastModified": 1660459072,
"narHash": "sha256-8DFJjXG8zqoONA1vXtgeKXy68KdJL5UaXR8NtVMUbx8=",
"lastModified": 1703887061,
"narHash": "sha256-gGPa9qWNc6eCXT/+Z5/zMkyYOuRZqeFZBDbopNZQkuY=",
"owner": "hercules-ci",
"repo": "gitignore.nix",
"rev": "a20de23b925fd8264fd7fad6454652e142fd7f73",
"rev": "43e1aa1308018f37118e34d3a9cb4f5e75dc11d5",
"type": "github"
},
"original": {
......@@ -74,11 +74,11 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1702346276,
"narHash": "sha256-eAQgwIWApFQ40ipeOjVSoK4TEHVd6nbSd9fApiHIw5A=",
"lastModified": 1710162809,
"narHash": "sha256-i2R2bcnQp+85de67yjgZVvJhd6rRnJbSYNpGmB6Leb8=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "cf28ee258fd5f9a52de6b9865cdb93a1f96d09b7",
"rev": "ddcd7598b2184008c97e6c9c6a21c5f37590b8d2",
"type": "github"
},
"original": {
......@@ -90,32 +90,32 @@
},
"nixpkgs-stable": {
"locked": {
"lastModified": 1685801374,
"narHash": "sha256-otaSUoFEMM+LjBI1XL/xGB5ao6IwnZOXc47qhIgJe8U=",
"lastModified": 1704874635,
"narHash": "sha256-YWuCrtsty5vVZvu+7BchAxmcYzTMfolSPP5io8+WYCg=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "c37ca420157f4abc31e26f436c1145f8951ff373",
"rev": "3dc440faeee9e889fe2d1b4d25ad0f430d449356",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-23.05",
"ref": "nixos-23.11",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs_2": {
"locked": {
"lastModified": 1702350026,
"narHash": "sha256-A+GNZFZdfl4JdDphYKBJ5Ef1HOiFsP18vQe9mqjmUis=",
"lastModified": 1710162809,
"narHash": "sha256-i2R2bcnQp+85de67yjgZVvJhd6rRnJbSYNpGmB6Leb8=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "9463103069725474698139ab10f17a9d125da859",
"rev": "ddcd7598b2184008c97e6c9c6a21c5f37590b8d2",
"type": "github"
},
"original": {
"id": "nixpkgs",
"ref": "nixos-23.05",
"ref": "nixos-23.11",
"type": "indirect"
}
},
......@@ -130,11 +130,11 @@
"nixpkgs-stable": "nixpkgs-stable"
},
"locked": {
"lastModified": 1702456155,
"narHash": "sha256-I2XhXGAecdGlqi6hPWYT83AQtMgL+aa3ulA85RAEgOk=",
"lastModified": 1708018599,
"narHash": "sha256-M+Ng6+SePmA8g06CmUZWi1AjG2tFBX9WCXElBHEKnyM=",
"owner": "cachix",
"repo": "pre-commit-hooks.nix",
"rev": "007a45d064c1c32d04e1b8a0de5ef00984c419bc",
"rev": "5df5a70ad7575f6601d91f0efec95dd9bc619431",
"type": "github"
},
"original": {
......@@ -171,11 +171,11 @@
"nixpkgs": "nixpkgs_2"
},
"locked": {
"lastModified": 1700695799,
"narHash": "sha256-nXRhRE69kULaNxijX7ZF14pGSu6Ar/FIvfKCIut7OXc=",
"lastModified": 1704542622,
"narHash": "sha256-HnFuaOXHoxv8tpBvMsEjfhcl/hFNxEY7GbBqoyJ1U8U=",
"ref": "refs/heads/master",
"rev": "fdcc60bfd3642207e50e8e6c89c0a9a7b27a40a9",
"revCount": 41,
"rev": "6b4f85fe6d934429cf3055bbcd8cf15014730118",
"revCount": 114,
"type": "git",
"url": "https://gitlab.schukai.com/oss/utilities/version.git"
},
......
......@@ -8,47 +8,50 @@ require (
github.com/docker/go-connections v0.4.0
github.com/fsnotify/fsnotify v1.7.0
github.com/go-chi/chi/v5 v5.0.10
github.com/google/uuid v1.5.0
github.com/google/uuid v1.6.0
github.com/pkg/sftp v1.13.6
github.com/robfig/cron/v3 v3.0.1
github.com/shirou/gopsutil/v3 v3.23.11
github.com/shirou/gopsutil/v3 v3.24.2
github.com/stretchr/testify v1.8.4
go.uber.org/zap v1.26.0
golang.org/x/crypto v0.17.0
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.21.0
gopkg.in/yaml.v3 v3.0.1
gorm.io/driver/mysql v1.5.2
gorm.io/gorm v1.25.5
gorm.io/driver/mysql v1.5.4
gorm.io/gorm v1.25.7
)
require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/distribution/reference v0.5.0 // indirect
github.com/docker/distribution v2.8.3+incompatible // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/go-ole/go-ole v1.3.0 // indirect
github.com/go-sql-driver/mysql v1.7.1 // indirect
github.com/go-sql-driver/mysql v1.8.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/kr/fs v0.1.0 // indirect
github.com/lufia/plan9stats v0.0.0-20231016141302-07b5767bb0ed // indirect
github.com/lufia/plan9stats v0.0.0-20240226150601-1dcf7310316a // indirect
github.com/mattn/go-sqlite3 v1.14.17 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/tklauser/go-sysconf v0.3.13 // indirect
github.com/tklauser/numcpus v0.7.0 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.6.0 // indirect
gorm.io/driver/sqlite v1.5.5 // indirect
gotest.tools/v3 v3.5.1 // indirect
)
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
......@@ -26,6 +28,8 @@ github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW
github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/go-sql-driver/mysql v1.8.0 h1:UtktXaU2Nb64z/pLiGIxY4431SJ4/dR5cjMmlVHgnT4=
github.com/go-sql-driver/mysql v1.8.0/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
......@@ -36,6 +40,8 @@ github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
......@@ -47,6 +53,10 @@ github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/lufia/plan9stats v0.0.0-20231016141302-07b5767bb0ed h1:036IscGBfJsFIgJQzlui7nK1Ncm0tp2ktmPj8xO4N/0=
github.com/lufia/plan9stats v0.0.0-20231016141302-07b5767bb0ed/go.mod h1:ilwx/Dta8jXAgpFYFvSWEMwxmbWXyiUHkd5FwyKhb5k=
github.com/lufia/plan9stats v0.0.0-20240226150601-1dcf7310316a h1:3Bm7EwfUQUvhNeKIkUct/gl9eod1TcXuj8stxvi/GoI=
github.com/lufia/plan9stats v0.0.0-20240226150601-1dcf7310316a/go.mod h1:ilwx/Dta8jXAgpFYFvSWEMwxmbWXyiUHkd5FwyKhb5k=
github.com/mattn/go-sqlite3 v1.14.17 h1:mCRHCLDUBXgpKAqIKsaAaAsrAlbkeomtRFKXh2L6YIM=
github.com/mattn/go-sqlite3 v1.14.17/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y=
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
......@@ -64,6 +74,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b h1:0LFwY6Q3gMACTjAbMZBjXAqTOzOwFaj2Ld6cjeQ7Rig=
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 h1:o4JXh1EVt9k/+g42oCprj/FisM4qX9L3sZB3upGN2ZU=
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/shirou/gopsutil/v3 v3.23.9 h1:ZI5bWVeu2ep4/DIxB4U9okeYJ7zp/QLTO4auRb/ty/E=
......@@ -72,6 +84,8 @@ github.com/shirou/gopsutil/v3 v3.23.10 h1:/N42opWlYzegYaVkWejXWJpbzKv2JDy3mrgGzK
github.com/shirou/gopsutil/v3 v3.23.10/go.mod h1:JIE26kpucQi+innVlAUnIEOSBhBUkirr5b44yr55+WE=
github.com/shirou/gopsutil/v3 v3.23.11 h1:i3jP9NjCPUz7FiZKxlMnODZkdSIp2gnzfrvsu9CuWEQ=
github.com/shirou/gopsutil/v3 v3.23.11/go.mod h1:1FrWgea594Jp7qmjHUUPlJDTPgcsb9mGnXDxavtikzM=
github.com/shirou/gopsutil/v3 v3.24.2 h1:kcR0erMbLg5/3LcInpw0X/rrPSqq4CDPyI6A6ZRC18Y=
github.com/shirou/gopsutil/v3 v3.24.2/go.mod h1:tSg/594BcA+8UdQU2XcW803GWYgdtauFFPgJCJKZlVk=
github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM=
github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ=
github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU=
......@@ -96,11 +110,16 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw=
github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
......@@ -112,6 +131,8 @@ golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY=
golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
......@@ -126,6 +147,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug
golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
......@@ -150,11 +173,15 @@ golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek=
golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4=
golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
......@@ -179,8 +206,15 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/driver/mysql v1.5.2 h1:QC2HRskSE75wBuOxe0+iCkyJZ+RqpudsQtqkp+IMuXs=
gorm.io/driver/mysql v1.5.2/go.mod h1:pQLhh1Ut/WUAySdTHwBpBv6+JKcj+ua4ZFx1QQTBzb8=
gorm.io/driver/mysql v1.5.4 h1:igQmHfKcbaTVyAIHNhhB888vvxh8EdQ2uSUT0LPcBso=
gorm.io/driver/mysql v1.5.4/go.mod h1:9rYxJph/u9SWkWc9yY4XJ1F/+xO0S/ChOmbk3+Z5Tvs=
gorm.io/driver/sqlite v1.5.5 h1:7MDMtUZhV065SilG62E0MquljeArQZNfJnjd9i9gx3E=
gorm.io/driver/sqlite v1.5.5/go.mod h1:6NgQ7sQWAIFsPrJJl1lSNSu2TABh0ZZ/zm5fosATavE=
gorm.io/gorm v1.25.2-0.20230530020048-26663ab9bf55/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k=
gorm.io/gorm v1.25.5 h1:zR9lOiiYf09VNh5Q1gphfyia1JpiClIWG9hQaxB/mls=
gorm.io/gorm v1.25.5/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
gorm.io/gorm v1.25.7-0.20240204074919-46816ad31dde/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
gorm.io/gorm v1.25.7 h1:VsD6acwRjz2zFxGO50gPO6AkNs7KKnvfzUjHQhZDz/A=
gorm.io/gorm v1.25.7/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import "go.uber.org/zap"
import (
"go.uber.org/zap"
"log"
"os"
"sync"
)
// Logger is the interface a custom logger for this library must implement
type Logger interface {
Info(msg string, keysAndValues ...interface{})
Warn(msg string, keysAndValues ...interface{})
Error(msg string, keysAndValues ...interface{})
Trace(msg string, keysAndValues ...interface{})
}
type DefaultLogger struct {
*log.Logger
}
func (l *DefaultLogger) Info(msg string, keysAndValues ...interface{}) {
_ = l.Output(2, "INFO: "+msg)
}
func (l *DefaultLogger) Error(msg string, keysAndValues ...interface{}) {
_ = l.Output(2, "ERROR: "+msg)
}
func (l *DefaultLogger) Warn(msg string, keysAndValues ...interface{}) {
_ = l.Output(2, "WARN: "+msg)
}
func (l *DefaultLogger) Trace(msg string, keysAndValues ...interface{}) {
_ = l.Output(2, "DEBUG: "+msg)
}
// Ensure DefaultLogger satisfies the Logger interface.
var _ Logger = (*DefaultLogger)(nil)
var (
internalLogger Logger
mu sync.Mutex
)
func init() {
internalLogger = &DefaultLogger{
Logger: log.New(os.Stderr, "", log.LstdFlags),
}
}
// SetLogger sets a custom logger
func SetLogger(l Logger) {
mu.Lock()
defer mu.Unlock()
internalLogger = l
}
// Info logs an informational message to the current logger
func Info(msg string, keysAndValues ...interface{}) {
mu.Lock()
defer mu.Unlock()
if internalLogger == nil {
return
}
internalLogger.Info(msg, keysAndValues...)
}
// Error logs an error message to the current logger
func Error(msg string, keysAndValues ...interface{}) {
mu.Lock()
defer mu.Unlock()
if internalLogger == nil {
return
}
internalLogger.Error(msg, keysAndValues...)
}
// Warn logs a warning message to the current logger
func Warn(msg string, keysAndValues ...interface{}) {
mu.Lock()
defer mu.Unlock()
if internalLogger == nil {
return
}
internalLogger.Warn(msg, keysAndValues...)
}
func Trace(msg string, keysAndValues ...interface{}) {
mu.Lock()
defer mu.Unlock()
if internalLogger == nil {
return
}
internalLogger.Trace(msg, keysAndValues...)
}
type ZapAdapter struct {
logger *zap.Logger
mu sync.Mutex
}
func (z *ZapAdapter) Info(msg string, keysAndValues ...interface{}) {
z.logger.Info(msg, zap.Any("info", keysAndValues))
func NewZapAdapter(logger *zap.Logger) *ZapAdapter {
return &ZapAdapter{
logger: logger,
}
}
func (z *ZapAdapter) Error(msg string, keysAndValues ...interface{}) {
z.logger.Error(msg, zap.Any("error", keysAndValues))
func (l *ZapAdapter) Info(msg string, keysAndValues ...interface{}) {
if l.logger == nil {
return
}
l.logger.Info(msg, zap.Any("Info", keysAndValues))
}
func (l *ZapAdapter) Warn(msg string, keysAndValues ...interface{}) {
if l.logger == nil {
return
}
l.logger.Warn(msg, zap.Any("Warn", keysAndValues))
}
func (l *ZapAdapter) Error(msg string, keysAndValues ...interface{}) {
if l.logger == nil {
return
}
l.logger.Error(msg, zap.Any("Error", keysAndValues))
}
func (l *ZapAdapter) Trace(msg string, keysAndValues ...interface{}) {
if l.logger == nil {
return
}
l.logger.Debug(msg, zap.Any("Debug", keysAndValues))
}
// Ensure ZapAdapter satisfies the Logger interface.
var _ Logger = (*ZapAdapter)(nil)
// SetZapLogger sets a custom logger
func SetZapLogger(logger *zap.Logger) {
mu.Lock()
defer mu.Unlock()
internalLogger = NewZapAdapter(logger)
}
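// Example (sketch, not part of this change): wiring zap in takes two calls.
// useZap is an illustrative helper, not library API.
func useZap() error {
	zl, err := zap.NewProduction() // standard zap constructor
	if err != nil {
		return err
	}
	SetZapLogger(zl)
	Info("job queue logging now goes through zap")
	return nil
}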
package jobqueue
import (
"bytes"
"log"
"strings"
"testing"
)
type MockLogger struct {
Logs []string
}
func (l *MockLogger) Info(msg string, keysAndValues ...interface{}) {
l.Logs = append(l.Logs, "INFO: "+msg)
}
func (l *MockLogger) Warn(msg string, keysAndValues ...interface{}) {
l.Logs = append(l.Logs, "WARN: "+msg)
}
func (l *MockLogger) Error(msg string, keysAndValues ...interface{}) {
l.Logs = append(l.Logs, "ERROR: "+msg)
}
func (l *MockLogger) Trace(msg string, keysAndValues ...interface{}) {
l.Logs = append(l.Logs, "DEBUG: "+msg)
}
func TestSetLogger(t *testing.T) {
mockLogger := &MockLogger{}
SetLogger(mockLogger)
}
func TestInfo(t *testing.T) {
mockLogger := &MockLogger{}
SetLogger(mockLogger)
Info("test info")
if len(mockLogger.Logs) != 1 || !strings.HasPrefix(mockLogger.Logs[0], "INFO:") {
t.Errorf("Info log not as expected")
}
}
func TestError(t *testing.T) {
mockLogger := &MockLogger{}
SetLogger(mockLogger)
Error("test error")
if len(mockLogger.Logs) != 1 || !strings.HasPrefix(mockLogger.Logs[0], "ERROR:") {
t.Errorf("Error log not as expected")
}
}
func TestWarn(t *testing.T) {
mockLogger := &MockLogger{}
SetLogger(mockLogger)
Warn("test warning")
if len(mockLogger.Logs) != 1 || !strings.HasPrefix(mockLogger.Logs[0], "WARN:") {
t.Errorf("Warn log not as expected")
}
}
func TestDebug(t *testing.T) {
mockLogger := &MockLogger{}
SetLogger(mockLogger)
Trace("test debug")
if len(mockLogger.Logs) != 1 || !strings.HasPrefix(mockLogger.Logs[0], "DEBUG:") {
t.Errorf("Debug log not as expected")
}
}
func TestDefaultLogger(t *testing.T) {
buffer := bytes.NewBufferString("")
SetLogger(&DefaultLogger{Logger: log.New(buffer, "", log.LstdFlags)})
message := "Hello World"
Info(message)
if !strings.Contains(buffer.String(), message) {
t.Errorf("Expected %v in the output logs", message)
}
}
......@@ -4,6 +4,7 @@
package jobqueue
import (
"errors"
"fmt"
"github.com/fsnotify/fsnotify"
"github.com/robfig/cron/v3"
......@@ -31,7 +32,7 @@ type Manager struct {
jobEventCh chan interface{}
cronInstance *cron.Cron
logger Logger
//logger Logger
database *gorm.DB
dbSaver *DBSaver
......@@ -400,12 +401,20 @@ func (m *Manager) Start() error {
}
if m.dbSaver != nil {
err = m.dbSaver.Start()
if err != nil {
if m.logger != nil {
m.logger.Error("Error while starting db saver", "error", err)
}
}
p := StartDBSaver(m.dbSaver)
ready := make(chan struct{})
Then[bool, bool](p, func(value bool) (bool, error) {
close(ready)
return value, nil
}, func(e error) error {
close(ready)
Error("Error while starting db saver", "error", err)
return nil
})
<-ready
}
if len(m.workerMap) == 0 {
......@@ -416,7 +425,7 @@ func (m *Manager) Start() error {
for _, worker := range m.workerMap {
err := worker.Start()
if err != nil && err != ErrWorkerAlreadyRunning {
if err != nil && !errors.Is(err, ErrWorkerAlreadyRunning) {
if wrappedErr == nil {
wrappedErr = fmt.Errorf("Error: ")
}
......@@ -501,15 +510,12 @@ func (m *Manager) Stop() error {
func (m *Manager) SetLogger(logger Logger) *Manager {
m.mu.Lock()
defer m.mu.Unlock()
m.logger = logger
return m
}
// GetLogger returns the logger
func (m *Manager) GetLogger() Logger {
m.mu.Lock()
defer m.mu.Unlock()
return m.logger
if m.database != nil {
m.database = m.database.Session(&gorm.Session{Logger: newGormAdapter()})
}
return m
}
// RunJob runs a job immediately and does not schedule it
......@@ -606,23 +612,18 @@ func (m *Manager) handleJobEvents() {
switch event := event.(type) {
case Event:
if m.logger != nil {
m.logger.Info("Event received", "event", event.Name, "data", event.Data, "message_id", event.MessageID)
}
Info("Event received", "event", event.Name, "data", event.Data, "message_id", event.MessageID)
switch event.Name {
case QueueJob:
if m.logger != nil {
m.logger.Info("Job queued", "job_id", event.Data.(GenericJob).GetID())
}
Info("Job queued", "job_id", event.Data.(GenericJob).GetID())
job := event.Data.(GenericJob)
err := m.queue.Enqueue(job)
if err != nil && err != ErrJobAlreadyExists {
if m.logger != nil {
m.logger.Error("Error while queueing job", "error", err)
}
Error("Error while queueing job", "error", err)
}
case JobReady:
......@@ -633,9 +634,7 @@ func (m *Manager) handleJobEvents() {
break
}
if m.logger != nil {
m.logger.Info("Job ready", "job_id", nextJob.GetID())
}
Info("Job ready", "job_id", nextJob.GetID())
assigned := false
maxTries := 10
......@@ -658,9 +657,8 @@ func (m *Manager) handleJobEvents() {
}
if !assigned {
if m.logger != nil {
m.logger.Info("No worker available for job", "job_id", nextJob.GetID())
}
Info("No worker available for job", "job_id", nextJob.GetID())
} else {
if nextJob.GetScheduler() != nil {
......@@ -682,9 +680,8 @@ func (m *Manager) handleJobEvents() {
err := m.CancelJobSchedule(job.GetID())
if err != nil {
if m.logger != nil {
m.logger.Error("Error while canceling job schedule", "error", err)
}
Error("Error while canceling job schedule", "error", err)
}
}
}
......
package jobqueue
import (
"sync"
)
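// Promise is a minimal generic promise: it settles exactly once, either with
// a value (via resolve) or an error (via reject), and closes done to signal
// settlement.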
type Promise[T any] struct {
isDone bool
value T
err error
mux sync.Mutex
done chan struct{}
resolve func(T)
reject func(error)
}
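// NewPromise runs fn in its own goroutine and returns a promise that settles
// with whichever of resolve or reject is called first; later calls are no-ops.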
func NewPromise[T any](fn func(resolve func(T), reject func(error))) *Promise[T] {
p := &Promise[T]{
done: make(chan struct{}),
}
p.resolve = func(value T) {
p.mux.Lock()
defer p.mux.Unlock()
if p.isDone {
return
}
p.isDone = true
p.value = value
close(p.done)
}
p.reject = func(err error) {
p.mux.Lock()
defer p.mux.Unlock()
if p.isDone {
return
}
p.isDone = true
p.err = err
close(p.done)
}
go fn(p.resolve, p.reject)
return p
}
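// Then waits for p to settle, then runs onFulfilled on its value or
// onRejected on its error, producing a new promise of the mapped type. Note
// that onRejected's return value becomes the new promise's error, so
// returning nil yields a promise that appears fulfilled with the zero value.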
func Then[T, U any](p *Promise[T], onFulfilled func(T) (U, error), onRejected func(error) error) *Promise[U] {
return NewPromise[U](func(resolve func(U), reject func(error)) {
<-p.done
p.mux.Lock()
defer p.mux.Unlock()
if p.err != nil {
err := onRejected(p.err)
reject(err)
} else {
go func() {
res, err := onFulfilled(p.value)
if err != nil {
reject(err)
} else {
resolve(res)
}
}()
}
})
}
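// Catch installs only a rejection handler, passing fulfilled values through
// unchanged.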
func Catch[T any](p *Promise[T], onRejected func(error) error) *Promise[T] {
return Then[T, T](p, func(value T) (T, error) {
return value, nil
}, onRejected)
}
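// For orientation, a minimal usage sketch of the promise primitives above;
// ExamplePromise is illustrative only and not part of the library API.
func ExamplePromise() {
p := NewPromise[int](func(resolve func(int), reject func(error)) {
resolve(21) // settle asynchronously with a value
})
doubled := Then(p, func(v int) (int, error) {
return v * 2, nil // maps 21 -> 42
}, func(e error) error {
return e // propagate a rejection unchanged
})
<-doubled.done // block until the chained promise has settled
if doubled.err == nil && doubled.value == 42 {
// the chain resolved successfully
}
}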
......
package jobqueue
import (
"errors"
"strconv"
"sync"
"testing"
"time"
)
func TestResolve(t *testing.T) {
p := NewPromise[int](func(resolve func(int), reject func(error)) {
time.Sleep(time.Second)
resolve(42)
})
wg := sync.WaitGroup{}
wg.Add(1)
Then(p, func(value int) (string, error) {
wg.Done()
return "ok", nil
}, func(e error) error {
t.Fail()
return nil
})
wg.Wait()
}
func TestReject(t *testing.T) {
p := NewPromise[int](func(resolve func(int), reject func(error)) {
reject(errors.New("rejected"))
})
wg := sync.WaitGroup{}
wg.Add(1)
Then(p, func(value int) (string, error) {
t.Fail()
return "", errors.New("should not be called")
}, func(e error) error {
if e.Error() != "rejected" {
t.Fail()
}
wg.Done()
return nil
})
wg.Wait()
}
func TestCatch(t *testing.T) {
p := NewPromise[int](func(resolve func(int), reject func(error)) {
reject(errors.New("catch error"))
})
wg := sync.WaitGroup{}
wg.Add(1)
Catch(p, func(e error) error {
if e.Error() != "catch error" {
t.Fail()
}
wg.Done()
return nil
})
wg.Wait()
}
func TestChaining(t *testing.T) {
p := NewPromise[int](func(resolve func(int), reject func(error)) {
resolve(42)
})
wg := sync.WaitGroup{}
wg.Add(1)
p2 := Then(p, func(value int) (string, error) {
return "ok", nil
}, func(e error) error {
t.Fail()
return nil
})
Catch(p2, func(e error) error {
t.Fail()
return nil
})
Then(p2, func(value string) (int, error) {
wg.Done()
return 1, nil
}, func(e error) error {
t.Fail()
return nil
})
wg.Wait()
}
func TestChainingWithMultipleThen(t *testing.T) {
p := NewPromise[int](func(resolve func(int), reject func(error)) {
resolve(42)
})
wg := sync.WaitGroup{}
wg.Add(1)
p2 := Then(p, func(value int) (string, error) {
return "ok", nil
}, func(e error) error {
t.Fail()
return nil
})
p3 := Then(p2, func(value string) (int, error) {
wg.Done()
return 1, nil
}, func(e error) error {
t.Fail()
return nil
})
Catch(p3, func(e error) error {
t.Fail()
return nil
})
wg.Wait()
}
func TestThenWithManyInputs(t *testing.T) {
// Test cases with different input types and values
testCases := []struct {
name string
input interface{}
expected interface{}
}{
// Successful promises with different data types
{"test int", 10, 10},
{"test string", "Hello World", "Hello World"},
{"test bool", true, true},
{"test struct", struct{ Name string }{Name: "Bard"}, struct{ Name string }{Name: "Bard"}},
// Successful promises with complex values
{"test int array", []int{}, []int{}},
{"test string map", map[string]string{}, map[string]string{}},
{"test struct pointer", &struct{ Name string }{Name: "Bard"}, &struct{ Name string }{Name: "Bard"}},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
p := NewPromise[any](func(resolve func(interface{}), reject func(error)) {
resolve(tc.input)
})
wg := &sync.WaitGroup{}
wg.Add(1)
Then(p, func(value any) (interface{}, error) {
switch v := value.(type) {
case int:
if v != tc.expected.(int) {
t.Fail()
}
case string:
if v != tc.expected.(string) {
t.Fail()
}
case bool:
if v != tc.expected.(bool) {
t.Fail()
}
case struct{ Name string }:
if v != tc.expected.(struct{ Name string }) {
t.Fail()
}
case []int:
if len(v) != len(tc.expected.([]int)) {
t.Fail()
}
case map[string]string:
if len(v) != len(tc.expected.(map[string]string)) {
t.Fail()
}
case *struct{ Name string }:
if v.Name != tc.expected.(*struct{ Name string }).Name {
t.Fail()
}
default:
t.Fail()
}
wg.Done()
return nil, nil
}, func(e error) error {
t.Fail()
return nil
})
wg.Wait()
})
}
}
func TestPromiseSuccess(t *testing.T) {
promise := NewPromise[int](func(resolve func(int), reject func(error)) {
time.Sleep(100 * time.Millisecond)
resolve(42)
})
resultPromise := Then(promise, func(value int) (string, error) {
return "Value is: " + strconv.Itoa(value), nil
}, func(err error) error {
return err
})
<-resultPromise.done
if resultPromise.err != nil {
t.Errorf("Promise was rejected with error: %v", resultPromise.err)
}
if resultPromise.value != "Value is: 42" {
t.Errorf("Promise was resolved with incorrect value: %v", resultPromise.value)
}
}
func TestPromiseFailure(t *testing.T) {
expectedError := errors.New("something went wrong")
promise := NewPromise[int](func(resolve func(int), reject func(error)) {
time.Sleep(100 * time.Millisecond)
reject(expectedError)
})
resultPromise := Catch(promise, func(err error) error {
if err != expectedError {
t.Errorf("Expected error to be %v, but got %v", expectedError, err)
}
return err // Should return the error to keep the promise chain accurate.
})
<-resultPromise.done
if resultPromise.err != expectedError {
t.Errorf("Expected error to be %v, but got %v", expectedError, resultPromise.err)
}
}
func TestPromiseChain(t *testing.T) {
promise := NewPromise[int](func(resolve func(int), reject func(error)) {
time.Sleep(100 * time.Millisecond)
resolve(1)
})
firstResultPromise := Then(promise, func(value int) (int, error) {
return value + 1, nil
}, func(err error) error {
return err
})
resultPromise := Then(firstResultPromise, func(value int) (int, error) {
return value * 2, nil
}, func(err error) error {
return err
})
<-resultPromise.done
if resultPromise.err != nil {
t.Errorf("Promise was rejected with error: %v", resultPromise.err)
}
if resultPromise.value != 4 {
t.Errorf("Promise was resolved with incorrect value: %v, expected %v", resultPromise.value, 4)
}
}
func BenchmarkPromise(b *testing.B) {
for i := 0; i < b.N; i++ {
promise := NewPromise[int](func(resolve func(int), reject func(error)) {
// Simulate a task that takes some time
time.Sleep(10 * time.Millisecond)
resolve(42)
})
<-promise.done // Warten, bis das Promise-Objekt vollständig gelöst ist
if promise.err != nil {
b.Fatal("Promise was rejected with error:", promise.err)
}
}
}
func FuzzPromise(f *testing.F) {
testCases := []struct {
value int
shouldErr bool
}{
{value: 42, shouldErr: false},
{value: -1, shouldErr: true},
}
for _, tc := range testCases {
f.Add(tc.value, tc.shouldErr) // Add test cases to the Fuzz function
}
f.Fuzz(func(t *testing.T, value int, shouldErr bool) {
done := make(chan struct{})
promise := NewPromise[int](func(resolve func(int), reject func(error)) {
if shouldErr {
reject(errors.New("error"))
} else {
resolve(value)
}
})
Then(promise, func(data int) (int, error) {
if data != value {
t.Errorf("Expected result to be %v, got %v", value, data)
}
close(done)
return data, nil
}, func(err error) error {
if !shouldErr {
t.Errorf("Expected no error, but got: %v", err)
}
close(done)
return err // in a real case, we'd expect some error handling here
})
<-done // Wait until Then is resolved or rejected
if shouldErr && promise.err == nil {
t.Errorf("Expected an error but did not get one")
}
if promise.err != nil && !shouldErr {
t.Errorf("Did not expect an error but got one: %v", promise.err)
}
})
}