Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • oss/libraries/go/services/job-queues
1 result
Select Git revision
Show changes
Commits on Source (5)
Showing with 570 additions and 493 deletions
......@@ -20,16 +20,20 @@ func TestWriteToDB1(t *testing.T) {
t.Fatalf("a error occurred while opening the database: %v", err)
}
gormDB.Logger = gormDB.Logger.LogMode(4)
manager := &Manager{database: gormDB}
saver := NewDBSaver().SetManager(manager)
// Starte den DBSaver
p := StartDBSaver(saver)
promise := CreateAndStartJobSyncer(manager)
ready := make(chan struct{})
Then[bool, bool](p, func(value bool) (bool, error) {
var saver *JobSyncer
Then[*JobSyncer, *JobSyncer](promise, func(value *JobSyncer) (*JobSyncer, error) {
close(ready)
saver = value
return value, nil
}, func(e error) error {
close(ready)
......@@ -49,15 +53,20 @@ func TestWriteToDB1(t *testing.T) {
job.scheduler = scheduler
err = saver.SaveJob(job)
assert.Nil(t, err)
err = saver.SaveJob(job)
assert.Nil(t, err)
saver.AddJob(job)
saver.AddJob(job)
time.Sleep(1 * time.Second)
err = saver.Stop()
assert.Nil(t, err)
// check if stats are in database
var stats JobPersistence
err = gormDB.First(&stats, "id = ?", job.GetID()).Error
assert.Nil(t, err)
assert.Equal(t, job.GetID(), stats.ID)
assert.Equal(t, job.GetID(), stats.ID)
}
......@@ -20,16 +20,21 @@ func TestWriteToDB2(t *testing.T) {
t.Fatalf("a error occurred while opening the database: %v", err)
}
gormDB.Logger = gormDB.Logger.LogMode(4)
manager := &Manager{database: gormDB}
saver := NewDBSaver().SetManager(manager)
// Starte den DBSaver
p := StartDBSaver(saver)
p := CreateAndStartJobSyncer(manager)
ready := make(chan struct{})
Then[bool, bool](p, func(value bool) (bool, error) {
//var saver *JobSyncer
Then[*JobSyncer, *JobSyncer](p, func(value *JobSyncer) (*JobSyncer, error) {
close(ready)
// saver = value
return value, nil
}, func(e error) error {
close(ready)
......@@ -87,15 +92,14 @@ func TestWriteToDB2(t *testing.T) {
time.Sleep(1 * time.Second)
if mgr.dbSaver == nil {
t.Error("mgr.dbSaver == nil")
if mgr.jobSyncer == nil {
t.Error("mgr.JobSyncer == nil")
return
}
time.Sleep(1 * time.Second)
err = mgr.dbSaver.SaveJob(job)
assert.Nil(t, err)
mgr.jobSyncer.AddJob(job)
runtime.Gosched()
time.Sleep(1 * time.Second)
......@@ -112,8 +116,7 @@ func TestWriteToDB2(t *testing.T) {
runtime.Gosched()
time.Sleep(1 * time.Second)
err = mgr.dbSaver.SaveJob(job)
assert.Nil(t, err)
mgr.jobSyncer.AddJob(job)
time.Sleep(2 * time.Second)
err = mgr.CancelJobSchedule("job1")
......@@ -140,6 +143,6 @@ func TestWriteToDB2(t *testing.T) {
time.Sleep(1 * time.Second)
err = mgr.dbSaver.SaveJob(job)
mgr.jobSyncer.AddJob(job)
}
......@@ -19,6 +19,8 @@ func TestWriteToDB3(t *testing.T) {
t.Fatalf("a error occurred while opening the database: %v", err)
}
db.Logger = db.Logger.LogMode(4)
manager := NewManager()
manager.SetDB(db)
err = manager.AddWorker(NewLocalWorker(1))
......@@ -35,6 +37,15 @@ func TestWriteToDB3(t *testing.T) {
time.Sleep(1 * time.Second)
// test is job in database
var tmpJob1 JobPersistence
r := db.First(&tmpJob1, "id = ?", "job2")
assert.Nil(t, r.Error)
assert.Equal(t, JobID("job2"), tmpJob1.ID)
time.Sleep(1 * time.Second)
err = manager.DeleteJob(job.GetID())
assert.Nil(t, err)
......
......@@ -39,7 +39,7 @@ func TestWriteToDB4(t *testing.T) {
err = manager.ScheduleJob(job, scheduler)
assert.Nil(t, err)
time.Sleep(200 * time.Millisecond)
time.Sleep(500 * time.Millisecond)
// check is stats are the values above
var tmpJob JobPersistence
......@@ -49,15 +49,15 @@ func TestWriteToDB4(t *testing.T) {
// Validate the fields
assert.Equal(t, JobID("job3"), tmpJob.ID)
assert.Equal(t, 21, tmpJob.Stats.RunCount) // +1 because of the first run
assert.Equal(t, 31, tmpJob.Stats.SuccessCount) // +1 because of the first run
assert.Equal(t, 21, tmpJob.Stats.RunCount)
assert.Equal(t, 31, tmpJob.Stats.SuccessCount)
assert.Equal(t, 40, tmpJob.Stats.ErrorCount)
// reset stats
err = manager.ResetJobStats(job.GetID())
assert.Nil(t, err)
time.Sleep(2 * time.Second)
time.Sleep(500 * time.Millisecond)
var tmpJob2 JobPersistence
// check is stats are the values above
......
......@@ -7,6 +7,7 @@ import (
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"testing"
"time"
)
func TestWriteToDB5(t *testing.T) {
......@@ -35,14 +36,19 @@ func TestWriteToDB5(t *testing.T) {
sameIDJob := NewJob[CounterResult]("jobSameID", runner)
// Trying to save a job with the same ID should return an error
err = mgr.dbSaver.SaveJob(sameIDJob)
assert.Nil(t, err)
time.Sleep(500 * time.Millisecond)
// Trying to save a job with the same ID should do nothing
mgr.mu.Lock()
mgr.jobSyncer.AddJob(sameIDJob)
mgr.mu.Unlock()
err = mgr.CancelJobSchedule("jobSameID")
assert.Nil(t, err)
err = mgr.dbSaver.Stop()
mgr.mu.Lock()
err = mgr.jobSyncer.Stop()
mgr.mu.Unlock()
assert.Nil(t, err)
......
......@@ -7,7 +7,7 @@ package jobqueue
import (
"fmt"
"github.com/stretchr/testify/assert"
"gorm.io/driver/sqlite"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"testing"
"time"
......@@ -15,10 +15,13 @@ import (
func TestWriteToDB6(t *testing.T) {
db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{})
if err != nil {
t.Fatalf("a error occurred while opening the database: %v", err)
}
// it is necessary to have a running mysql server
// docker rm -f mysql-test && \
// docker run --name mysql-test -e MYSQL_ROOT_PASSWORD=my-secret-pw -e MYSQL_DATABASE=testdb -p 3306:3306 -d mysql:latest && \
// docker logs -f mysql-test
dsn := "root:my-secret-pw@tcp(localhost:3306)/testdb?charset=utf8mb4&parseTime=True&loc=Local"
db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
manager := NewManager()
manager.SetDB(db)
......@@ -34,7 +37,7 @@ func TestWriteToDB6(t *testing.T) {
jobIDs := make([]JobID, numJobs)
for i := 0; i < numJobs; i++ {
jobID := JobID(fmt.Sprintf("burstJob%d", i))
jobID := JobID(fmt.Sprintf("burstJobA%d", i))
jobIDs[i] = jobID
runner := &CounterRunnable{}
......@@ -44,11 +47,18 @@ func TestWriteToDB6(t *testing.T) {
err = mgr.ScheduleJob(job, scheduler)
assert.Nil(t, err)
err = mgr.dbSaver.SaveJob(job)
assert.Nil(t, err)
mgr.mu.Lock()
mgr.jobSyncer.AddJob(job)
mgr.mu.Unlock()
time.Sleep(10 * time.Millisecond)
}
time.Sleep(10 * time.Second)
mgr.mu.Lock()
_ = mgr.jobSyncer.Stop()
mgr.mu.Unlock()
time.Sleep(2 * time.Second)
for _, jobID := range jobIDs {
var tmpJob JobPersistence
......
package jobqueue
import (
"github.com/stretchr/testify/assert"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"testing"
)
func TestCreateOrUpdateJob(t *testing.T) {
db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
assert.NoError(t, err)
// Migrate the schema
assert.NoError(t, db.AutoMigrate(&JobPersistence{}, &JobLog{}, &JobStats{}))
runner := &CounterRunnable{}
job := NewJob[CounterResult]("job1", runner)
assert.NoError(t, createOrUpdateJob(job, db))
var jobPersistence JobPersistence
assert.NoError(t, db.First(&jobPersistence, "id = ?", job.GetID()).Error)
assert.Equal(t, job.GetID(), jobPersistence.ID)
assert.Equal(t, "", jobPersistence.Description)
assert.Equal(t, Priority(1), jobPersistence.Priority)
job.description = "Updated description"
assert.NoError(t, createOrUpdateJob(job, db))
assert.NoError(t, db.First(&jobPersistence, "id = ?", job.GetID()).Error)
assert.Equal(t, "Updated description", jobPersistence.Description)
}
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
"context"
"errors"
"gorm.io/gorm"
"math/rand"
"sync"
"time"
)
type DBSaverStatus int
func (s *JobSyncer) DeleteJob(job GenericJob) error {
s.mu.Lock()
defer s.mu.Unlock()
const (
DBSaverStatusStopped = iota
DBSaverStatusRunning
)
if s.manager == nil || s.manager.database == nil {
return ErrNoDatabaseConnection
}
type DBSaver struct {
saveChannel chan GenericJob
stopChan chan struct{}
migrateFlag bool
manager *Manager
status DBSaverStatus
mu sync.Mutex
jobSaveProgress sync.WaitGroup
}
return s.manager.database.Transaction(func(tx *gorm.DB) error {
permJob := job.GetPersistence()
type RunnerData string
type SchedulerData string
if err := tx.Where("job_id = ?", permJob.GetID()).Delete(&JobLog{}).Error; err != nil {
return err
}
// NewDBSaver creates a new DBSaver
func NewDBSaver() *DBSaver {
return &DBSaver{
saveChannel: make(chan GenericJob, 1000),
stopChan: make(chan struct{}),
}
}
if err := tx.Where("job_id = ?", permJob.GetID()).Delete(&JobStats{}).Error; err != nil {
return err
}
// SetManager sets the manager of the DBSaver
func (s *DBSaver) SetManager(manager *Manager) *DBSaver {
s.mu.Lock()
defer s.mu.Unlock()
if err := tx.Delete(&permJob).Error; err != nil {
return err
}
s.manager = manager
return s
return nil
})
}
func (s *DBSaver) setStatus(status DBSaverStatus) *DBSaver {
func (s *JobSyncer) ResetLogs(job GenericJob) error {
s.mu.Lock()
defer s.mu.Unlock()
s.status = status
return s
}
// isStatus returns true if the DBSaver has the given status
// the lock is not needed here, because it is only used in the Start() method
func (s *DBSaver) isStatus(status DBSaverStatus, lock bool) bool {
if lock {
s.mu.Lock()
defer s.mu.Unlock()
if s.manager == nil || s.manager.database == nil {
return ErrNoDatabaseConnection
}
return s.status == status
return s.manager.database.Transaction(func(tx *gorm.DB) error {
permJob := job.GetPersistence()
if err := tx.Unscoped().Where("job_id = ?", permJob.GetID()).Delete(&JobLog{}).Error; err != nil {
return err
}
return nil
})
}
func StartDBSaver[P *Promise[bool]](s *DBSaver) *Promise[bool] {
func (s *JobSyncer) ResetStats(job GenericJob) error {
s.mu.Lock()
defer s.mu.Unlock()
return NewPromise[bool](func(resolve func(bool), reject func(error)) {
if s.manager == nil || s.manager.database == nil {
reject(ErrNoDatabaseConnection)
return
}
if s.isStatus(DBSaverStatusRunning, false) {
resolve(true)
return
}
db := s.manager.database
if !s.migrateFlag {
err := db.AutoMigrate(&JobPersistence{}, &JobLog{}, &JobStats{})
if err != nil {
reject(err)
return
}
s.migrateFlag = true
}
if s.manager == nil || s.manager.database == nil {
return ErrNoDatabaseConnection
}
ready := make(chan struct{})
go runSaver(s, db, ready)
job.ResetStats()
stats := job.GetStats()
return s.manager.database.Transaction(func(tx *gorm.DB) error {
return tx.Model(&JobStats{}).Where("job_id = ?", job.GetID()).Select("*").Omit("deleted_at", "created_at", "job_id").Updates(stats).Error
<-ready
resolve(true)
})
}
//
//// Start starts the DBSaver
//func (s *DBSaver) Start() error {
// s.mu.Lock()
// defer s.mu.Unlock()
//
// if s.manager == nil || s.manager.database == nil {
// return ErrNoDatabaseConnection
// }
//
// if s.isStatus(DBSaverStatusRunning, false) {
// return nil
// }
//
// db := s.manager.database
//
// if !s.migrateFlag {
// err := db.AutoMigrate(&JobPersistence{}, &JobLog{}, &JobStats{})
// if err != nil {
// return err
// }
// s.migrateFlag = true
// }
//
// go runSaver(s, db)
// return nil
//}
func runSaver(s *DBSaver, db *gorm.DB, ready chan struct{}) {
s.setStatus(DBSaverStatusRunning)
defer func() {
// this runs after the function returns
// and needs to be protected by the lock
// of the setStatus method
//s.status = DBSaverStatusStopped
s.setStatus(DBSaverStatusStopped)
}()
close(ready)
for {
select {
case job := <-s.saveChannel:
s.jobSaveProgress.Add(1)
err := CreateOrUpdateJob(job, db)
func (s *JobSyncer) CreateOrUpdateJob(job GenericJob) error {
if err != nil {
Error("Error while saving job", "error", err)
}
s.jobSaveProgress.Done()
s.mu.Lock()
defer s.mu.Unlock()
case <-s.stopChan:
return
}
if s.manager == nil || s.manager.database == nil {
return ErrNoDatabaseConnection
}
return createOrUpdateJob(job, s.manager.database)
}
func CreateOrUpdateJob(job GenericJob, db *gorm.DB) error {
func createOrUpdateJob(job GenericJob, db *gorm.DB) error {
return db.Transaction(func(tx *gorm.DB) error {
......@@ -188,9 +104,9 @@ func CreateOrUpdateJob(job GenericJob, db *gorm.DB) error {
}
} else {
db.Unscoped().Model(&existingJob).Update("deleted_at", nil)
tx.Unscoped().Model(&existingJob).Update("deleted_at", nil)
tx.Model(&existingJob.Scheduler).Select(
tx.Model(&existingJob).Where("id = ?", existingJob.ID).Select(
[]string{
"type",
"interval",
......@@ -215,18 +131,13 @@ func CreateOrUpdateJob(job GenericJob, db *gorm.DB) error {
}
}
if tx.Error != nil {
Trace("Error while updating job", "error", tx.Error)
return tx.Error
}
tx.Model(&permJob.Stats).
Select(
[]string{
"run_count",
"success_count",
"error_count",
"time_metrics_avg_run_time",
"time_metrics_max_run_time",
"time_metrics_min_run_time",
"time_metrics_total_run_time",
},
).
Select("*").Omit("deleted_at", "created_at", "job_id").
UpdateColumns(permJob.Stats)
if tx.Error != nil {
......@@ -245,179 +156,3 @@ func CreateOrUpdateJob(job GenericJob, db *gorm.DB) error {
})
}
// Stop stops the DBSaver
func (s *DBSaver) Stop() error {
s.mu.Lock()
defer s.mu.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
go func() {
s.stopChan <- struct{}{}
s.jobSaveProgress.Wait()
cancel()
}()
<-ctx.Done()
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
Error("DBSaver did not stop in time")
return ctx.Err()
}
s.status = DBSaverStatusStopped
return nil
}
func exponentialBackoff(retry int) time.Duration {
waitTime := 100 * time.Millisecond
for i := 0; i < retry; i++ {
waitTime *= 2
waitTime += time.Duration(rand.Int63n(int64(waitTime))) // #nosec G404
}
return waitTime
}
// SaveJob saves a job to the database
func (s *DBSaver) SaveJob(job GenericJob) error {
s.mu.Lock()
defer func() {
if r := recover(); r != nil {
Error("Error while saving job", "error", r)
}
s.mu.Unlock()
}()
if s.saveChannel == nil {
return ErrDBSaverNotInitialized
}
if s.status != DBSaverStatusRunning {
return ErrDBSaverNotRunning
}
maxRetries := 5
for retries := maxRetries; retries > 0; retries-- {
select {
case s.saveChannel <- job:
return nil
default:
Error("DBSaver channel is full, dropping safe for job with ID", "job_id", job.GetID())
backoff := exponentialBackoff(maxRetries - retries)
Trace("DBSaver channel is full, retrying in", "backoff", backoff)
time.Sleep(backoff)
}
}
return errors.New("failed to save job after multiple attempts")
}
func checkRunningSaver(s *DBSaver) (*gorm.DB, error) {
if s.manager == nil {
return nil, ErrNoManager
}
if s.manager.database == nil {
return nil, ErrNoDatabaseConnection
}
if !s.isStatus(DBSaverStatusRunning, false) {
return nil, ErrDBSaverNotRunning
}
return s.manager.database, nil
}
// DeleteJob deletes a job from the database
func (s *DBSaver) DeleteJob(job GenericJob) error {
s.mu.Lock()
defer s.mu.Unlock()
var db *gorm.DB
var err error
if db, err = checkRunningSaver(s); err != nil {
return err
}
s.jobSaveProgress.Add(1)
return db.Transaction(func(tx *gorm.DB) error {
defer s.jobSaveProgress.Done()
permJob := job.GetPersistence()
dbErr := tx.Where("job_id = ?", permJob.GetID()).Delete(&JobLog{}).Error
if dbErr != nil {
return dbErr
}
dbErr = tx.Where("job_id = ?", permJob.GetID()).Delete(&JobStats{}).Error
if dbErr != nil {
return dbErr
}
dbErr = tx.Delete(&permJob).Error
if dbErr != nil {
return dbErr
}
return nil
})
}
func (s *DBSaver) ResetLogs(job GenericJob) error {
s.mu.Lock()
defer s.mu.Unlock()
var db *gorm.DB
var err error
if db, err = checkRunningSaver(s); err != nil {
return err
}
s.jobSaveProgress.Add(1)
return db.Transaction(func(tx *gorm.DB) error {
defer s.jobSaveProgress.Done()
permJob := job.GetPersistence()
// unscoped because we want to delete the logs finally
dbErr := tx.Unscoped().Where("job_id = ?", permJob.GetID()).Delete(&JobLog{}).Error
if dbErr != nil {
return dbErr
}
return nil
})
}
func (s *DBSaver) ResetStats(job GenericJob) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.saveChannel == nil {
return ErrDBSaverNotInitialized
}
if s.status != DBSaverStatusRunning {
return ErrDBSaverNotRunning
}
defer func() {
if r := recover(); r != nil {
Error("Error while saving job", "error", r)
}
}()
job.ResetStats()
select {
case s.saveChannel <- job:
default:
Error("DBSaver channel is full, dropping job with ID", "job_id", job.GetID())
}
return nil
}
//go:build !runOnTask
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
"github.com/stretchr/testify/assert"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"testing"
"time"
"github.com/stretchr/testify/assert"
"gorm.io/driver/sqlite"
)
//
//func startTestMySQLDockerImageAndContainer(t *testing.T, port string, ctx context.Context) error {
// t.Helper()
//
// cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
// if err != nil {
// return err
// }
//
// imageName := "mysql:8"
//
// reader, err := cli.ImagePull(ctx, imageName, types.ImagePullOptions{})
// if err != nil {
// return err
// }
//
// // if debug image pull, comment out the following lines
// //_, _ = io.Copy(os.Stdout, reader)
// _ = reader
//
// hostConfig := &container.HostConfig{
// PortBindings: nat.PortMap{
// "3306/tcp": []nat.PortBinding{
// {
// HostIP: DOCKER_TEST_HOST_IP,
// HostPort: port,
// },
// },
// },
// }
//
// resp, err := cli.ContainerCreate(ctx, &container.Config{
// Image: imageName,
// Env: []string{
// "MYSQL_ROOT_PASSWORD=secret",
// "MYSQL_USER=user",
// "MYSQL_PASSWORD=secret",
// "MYSQL_DATABASE=test",
// },
// }, hostConfig, nil, nil, "")
//
// if err != nil {
// return err
// }
//
// if err := cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil {
// return err
// }
//
// go func() {
// <-ctx.Done()
//
// timeout := 0
// stopOptions := container.StopOptions{
// Timeout: &timeout,
// Signal: "SIGKILL",
// }
// newCtx, _ := context.WithTimeout(context.Background(), 60*time.Second)
// if err := cli.ContainerStop(newCtx, resp.ID, stopOptions); err != nil {
// t.Errorf("ContainerStop returned error: %v", err)
// }
// if err := cli.ContainerRemove(newCtx, resp.ID, types.ContainerRemoveOptions{
// Force: true,
// }); err != nil {
// t.Errorf("ContainerRemove returned error: %v", err)
// }
//
// }()
//
// statusCh, errCh := cli.ContainerWait(ctx, resp.ID, container.WaitConditionNotRunning)
// select {
// case err := <-errCh:
// if err != nil {
// // empty error means container exited normally (see container_wait.go)
// if err.Error() == "" {
// return nil
// }
//
// return err
// }
// case <-statusCh:
//
// }
//
// return nil
//}
func TestSaveJobWithSQLite(t *testing.T) {
gormDB, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{})
if err != nil {
t.Fatalf("a error occurred while opening the database: %v", err)
}
func TestDeleteJob(t *testing.T) {
db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{})
manager := &Manager{database: gormDB}
saver := NewDBSaver().SetManager(manager)
db.Logger = db.Logger.LogMode(4)
// Starte den DBSaver
p := StartDBSaver(saver)
assert.Nil(t, err)
err = db.AutoMigrate(&JobPersistence{}, &JobLog{}, &JobStats{})
assert.Nil(t, err)
ready := make(chan struct{})
manager := NewManager()
manager.SetDB(db)
jobSyncer := NewJobSyncer(manager)
Then[bool, bool](p, func(value bool) (bool, error) {
close(ready)
return value, nil
}, func(e error) error {
close(ready)
Error("Error while starting db saver", "error", err)
return nil
})
// Erstelle einen Job zum Löschen
runner := &CounterRunnable{}
job := NewJob[CounterResult]("testJobID", runner)
err = createOrUpdateJob(job, db)
assert.Nil(t, err)
<-ready
var count int64
db.Model(&JobPersistence{}).Where("id = ?", "testJobID").Count(&count)
assert.Equal(t, int64(1), count)
jobID := JobID("testJob")
job := NewJob[CounterResult](jobID, &CounterRunnable{})
// Lösche den Job
err = jobSyncer.DeleteJob(job)
assert.Nil(t, err)
err = saver.SaveJob(job)
assert.NoError(t, err)
// Überprüfe, ob der Job gelöscht wurde
db.Model(&JobPersistence{}).Where("id = ?", "testJobID").Count(&count)
assert.Equal(t, int64(0), count)
}
func TestResetLogs(t *testing.T) {
db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{})
assert.Nil(t, err)
db.Logger = db.Logger.LogMode(4)
// Automatische Migration für benötigte Strukturen
err = db.AutoMigrate(&JobPersistence{}, &JobLog{}, &JobStats{})
assert.Nil(t, err)
manager := NewManager()
manager.SetDB(db)
jobSyncer := NewJobSyncer(manager)
// Erstelle einen Job und füge einige Logs hinzu
runner := &CounterRunnable{}
job := NewJob[CounterResult]("testJobID", runner)
err = createOrUpdateJob(job, db)
assert.Nil(t, err)
// Füge Logs zum Job hinzu
for i := 0; i < 5; i++ {
log := JobLog{JobID: job.GetID(), Result: "Test Message"}
err = db.Create(&log).Error
assert.Nil(t, err)
}
time.Sleep(100 * time.Millisecond)
var logCount int64
db.Model(&JobLog{}).Where("job_id = ?", job.GetID()).Count(&logCount)
assert.Equal(t, int64(5), logCount)
saver.Stop()
// Setze die Logs zurück
err = jobSyncer.ResetLogs(job)
assert.Nil(t, err)
var count int64
gormDB.Model(&JobPersistence{}).Count(&count)
assert.Equal(t, int64(1), count, "It should be 1 job in the database")
// Überprüfe, ob die Logs gelöscht wurden
// get job from database
var jobFromDB JobPersistence
gormDB.First(&jobFromDB, "id = ?", jobID)
assert.Equal(t, jobID, jobFromDB.ID, "JobID should be the same")
db.Model(&JobLog{}).Where("job_id = ?", job.GetID()).Count(&logCount)
assert.Equal(t, int64(0), logCount)
}
func TestResetStats(t *testing.T) {
db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{})
assert.Nil(t, err)
db.Logger = db.Logger.LogMode(4)
// Automatische Migration für benötigte Strukturen
err = db.AutoMigrate(&JobPersistence{}, &JobLog{}, &JobStats{})
assert.Nil(t, err)
manager := NewManager()
manager.SetDB(db)
jobSyncer := NewJobSyncer(manager)
// Erstelle einen Job und setze einige Statistiken
runner := &CounterRunnable{}
job := NewJob[CounterResult]("testJobID", runner)
err = createOrUpdateJob(job, db)
assert.Nil(t, err)
// Aktualisiere die Job-Statistiken
jobStats := &JobStats{
JobID: job.GetID(),
RunCount: 5,
SuccessCount: 3,
ErrorCount: 2,
TimeMetrics: TimeMetrics{
AvgRunTime: 10 * time.Second,
MaxRunTime: 15 * time.Second,
MinRunTime: 5 * time.Second,
TotalRunTime: 50 * time.Second,
},
}
err = db.Save(jobStats).Error
assert.Nil(t, err)
// Setze die Statistiken zurück
err = jobSyncer.ResetStats(job)
assert.Nil(t, err)
// Überprüfe, ob die Statistiken zurückgesetzt wurden
var resetStats JobStats
err = db.First(&resetStats, "job_id = ?", job.GetID()).Error
assert.Nil(t, err)
assert.Equal(t, int(0), resetStats.RunCount)
assert.Equal(t, int(0), resetStats.SuccessCount)
assert.Equal(t, int(0), resetStats.ErrorCount)
assert.Equal(t, time.Duration(0), resetStats.TimeMetrics.AvgRunTime)
assert.Equal(t, time.Duration(0), resetStats.TimeMetrics.MaxRunTime)
assert.Equal(t, time.Duration(0), resetStats.TimeMetrics.MinRunTime)
assert.Equal(t, time.Duration(0), resetStats.TimeMetrics.TotalRunTime)
}
......@@ -74,11 +74,11 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1710162809,
"narHash": "sha256-i2R2bcnQp+85de67yjgZVvJhd6rRnJbSYNpGmB6Leb8=",
"lastModified": 1710695816,
"narHash": "sha256-3Eh7fhEID17pv9ZxrPwCLfqXnYP006RKzSs0JptsN84=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "ddcd7598b2184008c97e6c9c6a21c5f37590b8d2",
"rev": "614b4613980a522ba49f0d194531beddbb7220d3",
"type": "github"
},
"original": {
......@@ -106,11 +106,11 @@
},
"nixpkgs_2": {
"locked": {
"lastModified": 1710162809,
"narHash": "sha256-i2R2bcnQp+85de67yjgZVvJhd6rRnJbSYNpGmB6Leb8=",
"lastModified": 1710695816,
"narHash": "sha256-3Eh7fhEID17pv9ZxrPwCLfqXnYP006RKzSs0JptsN84=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "ddcd7598b2184008c97e6c9c6a21c5f37590b8d2",
"rev": "614b4613980a522ba49f0d194531beddbb7220d3",
"type": "github"
},
"original": {
......
......@@ -50,4 +50,6 @@ var (
ErrInvalidTime = fmt.Errorf("invalid time")
ErrSchedulerMisconfiguration = fmt.Errorf("scheduler misconfiguration")
ErrInvalidDuration = fmt.Errorf("invalid duration")
ErrJobSyncerAlreadyRunning = fmt.Errorf("JobSyncer is already running")
ErrJobSyncerNotRunning = fmt.Errorf("JobSyncer is not running")
)
......@@ -83,6 +83,11 @@ func (eb *EventBus) Unsubscribe(name EventName, ch chan interface{}) {
for i := range channels {
if channels[i] == ch {
eb.subscribers[name] = append(channels[:i], channels[i+1:]...)
if len(eb.subscribers[name]) == 0 {
delete(eb.subscribers, name)
}
break
}
}
......
......@@ -16,8 +16,8 @@ require (
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.21.0
gopkg.in/yaml.v3 v3.0.1
gorm.io/driver/mysql v1.5.4
gorm.io/gorm v1.25.7
gorm.io/driver/mysql v1.5.5
gorm.io/gorm v1.25.8
)
require (
......
......@@ -38,4 +38,6 @@ type GenericJob interface {
IsPaused() bool
ResetStats()
GetStats() JobStats
}
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/.devenv/state/go/pkg/mod/github.com/google/addlicense@v1.1.1/testdata/expected" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/.devenv/state/go/pkg/mod/github.com/google/addlicense@v1.1.1/testdata/initial" isTestSource="false" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
"sync"
)
type JobSyncer struct {
jobQueue []GenericJob
queueLock sync.Mutex
notifyChannel chan struct{}
stopChan chan struct{}
jobSaveProgress sync.WaitGroup
status Status
manager *Manager
mu sync.Mutex
migrateFlag bool
}
type Status int
const (
JobSyncerStatusStopped Status = iota
JobSyncerStatusRunning
)
func CreateAndStartJobSyncer[P *Promise[*JobSyncer]](manager *Manager) *Promise[*JobSyncer] {
s := NewJobSyncer(manager)
s.mu.Lock()
defer s.mu.Unlock()
return NewPromise[*JobSyncer](func(resolve func(*JobSyncer), reject func(error)) {
if s.manager == nil || s.manager.database == nil {
reject(ErrNoDatabaseConnection)
return
}
if s.status == JobSyncerStatusRunning {
resolve(s)
return
}
db := s.manager.database
if !s.migrateFlag {
err := db.AutoMigrate(&JobPersistence{}, &JobLog{}, &JobStats{})
if err != nil {
reject(err)
return
}
s.migrateFlag = true
}
err := s.Start()
if err != nil {
reject(err)
return
}
resolve(s)
})
}
func NewJobSyncer(manager *Manager) *JobSyncer {
return &JobSyncer{
jobQueue: make([]GenericJob, 0),
manager: manager,
}
}
func (js *JobSyncer) Start() error {
js.mu.Lock()
defer js.mu.Unlock()
if js.status == JobSyncerStatusRunning {
return ErrJobSyncerAlreadyRunning
}
js.notifyChannel = make(chan struct{}, 1) // Buffer to avoid blocking
js.stopChan = make(chan struct{})
js.status = JobSyncerStatusRunning
go js.runWorker()
return nil
}
func (js *JobSyncer) runWorker() {
for {
select {
case <-js.notifyChannel:
js.processJobs()
case <-js.stopChan:
js.cleanup()
return
}
}
}
func (js *JobSyncer) processJobs() {
for {
js.queueLock.Lock()
if len(js.jobQueue) == 0 {
js.queueLock.Unlock()
return
}
job := js.jobQueue[0]
js.jobQueue = js.jobQueue[1:]
js.queueLock.Unlock()
js.jobSaveProgress.Add(1)
js.processJob(job)
js.jobSaveProgress.Done()
}
}
func (js *JobSyncer) AddJob(job GenericJob) {
js.queueLock.Lock()
// check if job is already in queue
for _, j := range js.jobQueue {
if j.GetID() == job.GetID() {
js.queueLock.Unlock()
return
}
}
js.jobQueue = append(js.jobQueue, job)
js.queueLock.Unlock()
js.jobSaveProgress.Add(1)
// Non-blocking notify
select {
case js.notifyChannel <- struct{}{}:
default:
}
}
func (js *JobSyncer) processJob(job GenericJob) {
defer js.jobSaveProgress.Done()
err := createOrUpdateJob(job, js.manager.database)
Error("Error while creating or updating job", "error", err)
}
func (js *JobSyncer) Stop() error {
js.mu.Lock()
if js.status != JobSyncerStatusRunning {
js.mu.Unlock()
return ErrJobSyncerNotRunning
}
js.status = JobSyncerStatusStopped
js.mu.Unlock()
close(js.stopChan)
js.jobSaveProgress.Wait()
return nil
}
func (js *JobSyncer) cleanup() {
js.mu.Lock()
defer js.mu.Unlock()
js.jobSaveProgress.Wait()
js.status = JobSyncerStatusStopped
}
//go:build !runOnTask
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
"gorm.io/gorm"
"testing"
"time"
"github.com/stretchr/testify/assert"
"gorm.io/driver/sqlite"
)
func TestSaveJobWithSQLite(t *testing.T) {
gormDB, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{})
if err != nil {
t.Fatalf("a error occurred while opening the database: %v", err)
}
manager := &Manager{database: gormDB}
//saver := NewJobSyncer(manager)
// Starte den DBSaver
p := CreateAndStartJobSyncer(manager)
ready := make(chan struct{})
var saver *JobSyncer
Then[*JobSyncer, *JobSyncer](p, func(value *JobSyncer) (*JobSyncer, error) {
saver = value
close(ready)
return value, nil
}, func(e error) error {
close(ready)
Error("Error while starting db saver", "error", err)
return nil
})
<-ready
jobID := JobID("testJob")
job := NewJob[CounterResult](jobID, &CounterRunnable{})
saver.AddJob(job)
time.Sleep(100 * time.Millisecond)
saver.Stop()
var count int64
gormDB.Model(&JobPersistence{}).Count(&count)
assert.Equal(t, int64(1), count, "It should be 1 job in the database")
// get job from database
var jobFromDB JobPersistence
gormDB.First(&jobFromDB, "id = ?", jobID)
assert.Equal(t, jobID, jobFromDB.ID, "JobID should be the same")
}
Copyright (c) 2009 The Go Authors. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
......@@ -24,18 +24,18 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Initialize CodeQL
uses: github/codeql-action/init@v2
uses: github/codeql-action/init@v3
with:
languages: ${{ matrix.language }}
queries: +security-and-quality
- name: Autobuild
uses: github/codeql-action/autobuild@v2
uses: github/codeql-action/autobuild@v3
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v2
uses: github/codeql-action/analyze@v3
with:
category: "/language:${{ matrix.language }}"