Skip to content
Snippets Groups Projects
Verified Commit d00fb6fa authored by Volker Schukai's avatar Volker Schukai :alien:
Browse files

fix: reset stats dont work #25

parent 6991d4e8
No related branches found
No related tags found
No related merge requests found
...@@ -85,11 +85,22 @@ func (s *DBSaver) Start() error { ...@@ -85,11 +85,22 @@ func (s *DBSaver) Start() error {
s.migrateFlag = true s.migrateFlag = true
} }
var wg sync.WaitGroup
wg.Add(1)
go func() { go func() {
s.setStatus(DBSaverStatusRunning) wg.Done()
// this is protected by the lock above
s.status = DBSaverStatusRunning
defer func() { defer func() {
// this runs after the function returns
// and needs to be protected by the lock
// of the setStatus method
s.setStatus(DBSaverStatusStopped) s.setStatus(DBSaverStatusStopped)
}() }()
for { for {
select { select {
case job := <-s.saveChannel: case job := <-s.saveChannel:
...@@ -120,7 +131,19 @@ func (s *DBSaver) Start() error { ...@@ -120,7 +131,19 @@ func (s *DBSaver) Start() error {
} }
} }
tx.Model(&permJob.Stats).Updates(permJob.Stats) tx.Model(&permJob.Stats).
Select(
[]string{
"run_count",
"success_count",
"error_count",
"time_metrics_avg_run_time",
"time_metrics_max_run_time",
"time_metrics_min_run_time",
"time_metrics_total_run_time",
},
).
UpdateColumns(permJob.Stats)
for i, _ := range memLogs { for i, _ := range memLogs {
memLogs[i].LogID = 0 memLogs[i].LogID = 0
...@@ -141,6 +164,8 @@ func (s *DBSaver) Start() error { ...@@ -141,6 +164,8 @@ func (s *DBSaver) Start() error {
} }
}() }()
wg.Wait()
return nil return nil
} }
...@@ -265,25 +290,23 @@ func (s *DBSaver) ResetLogs(job GenericJob) error { ...@@ -265,25 +290,23 @@ func (s *DBSaver) ResetLogs(job GenericJob) error {
func (s *DBSaver) ResetStats(job GenericJob) error { func (s *DBSaver) ResetStats(job GenericJob) error {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
var db *gorm.DB
var err error
if db, err = checkRunningSaver(s); err != nil { if s.saveChannel == nil {
return err return ErrDBSaverNotInitialized
} }
return db.Transaction(func(tx *gorm.DB) error { if s.status != DBSaverStatusRunning {
permJob := job.GetPersistence() return ErrDBSaverNotRunning
defaultStats := JobStats{
JobID: permJob.GetID(),
} }
txErr := tx.Model(&permJob).Update("Stats", defaultStats).Error defer func() {
if txErr != nil { if r := recover(); r != nil {
return txErr s.logError("Error while saving job", "error", r)
} }
}()
job.ResetStats()
s.saveChannel <- job
return nil return nil
})
} }
...@@ -119,7 +119,10 @@ func startTestMySQLDockerImageAndContainer(t *testing.T, port string, ctx contex ...@@ -119,7 +119,10 @@ func startTestMySQLDockerImageAndContainer(t *testing.T, port string, ctx contex
func TestWriteToDB(t *testing.T) { func TestWriteToDB(t *testing.T) {
// if true, logging and port 3306 is used // if true, logging and port 3306 is used
debugMode := false useMySQLPort := os.Getenv("MYSQL_PORT")
//useMySQLPort = "3306"
printLogging := os.Getenv("MYSQL_LOGGING")
//printLogging = "true"
var err error var err error
...@@ -139,11 +142,7 @@ func TestWriteToDB(t *testing.T) { ...@@ -139,11 +142,7 @@ func TestWriteToDB(t *testing.T) {
portAsString := fmt.Sprintf("%d", portAsInt) portAsString := fmt.Sprintf("%d", portAsInt)
_ = listener.Close() _ = listener.Close()
useMySQLPort := os.Getenv("MYSQL_PORT") if useMySQLPort != "" {
if debugMode || useMySQLPort != "" {
if useMySQLPort == "" {
useMySQLPort = "3306"
}
portAsString = useMySQLPort portAsString = useMySQLPort
i, _ := strconv.Atoi(portAsString) i, _ := strconv.Atoi(portAsString)
...@@ -188,7 +187,7 @@ func TestWriteToDB(t *testing.T) { ...@@ -188,7 +187,7 @@ func TestWriteToDB(t *testing.T) {
var dbLogger logger.Interface var dbLogger logger.Interface
if debugMode { if printLogging == "true" {
dbLogger = logger.New( dbLogger = logger.New(
log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer
logger.Config{ logger.Config{
...@@ -338,7 +337,7 @@ func TestWriteToDB(t *testing.T) { ...@@ -338,7 +337,7 @@ func TestWriteToDB(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
runner := &CounterRunnable{} runner := &CounterRunnable{}
job := NewJob[CounterResult]("job1", runner) job := NewJob[CounterResult]("job2", runner)
scheduler := &InstantScheduler{} scheduler := &InstantScheduler{}
err = mgr.ScheduleJob(job, scheduler) err = mgr.ScheduleJob(job, scheduler)
...@@ -350,44 +349,107 @@ func TestWriteToDB(t *testing.T) { ...@@ -350,44 +349,107 @@ func TestWriteToDB(t *testing.T) {
// test is job in database // test is job in database
var tmpJob JobPersistence var tmpJob JobPersistence
if err := db.First(&tmpJob, "id = ?", "job1").Error; err == nil { err = db.First(&tmpJob, "id = ?", "job2").Error
t.Error("job1 is still in database") assert.NotNil(t, err)
}
}) })
wg.Wait() wg.Add(1)
t.Run("ResetStats", func(t *testing.T) {
defer wg.Done()
var jobPersistence JobPersistence mgr := NewManager()
var jobStats JobStats mgr.SetDB(db)
var jobLogs []JobLog // Assuming JobLog is your log model worker := NewLocalWorker(1)
err := mgr.AddWorker(worker)
assert.Nil(t, err)
// Query JobPersistence err = mgr.Start()
if err := db.First(&jobPersistence, "id = ?", "job1").Error; err != nil { assert.Nil(t, err)
t.Errorf("Failed to query JobPersistence: %v", err)
} else { runner := &CounterRunnable{}
// Validate the fields job := NewJob[CounterResult]("job3", runner)
assert.Equal(t, JobID("job1"), jobPersistence.ID)
job.stats = JobStats{
JobID: job.GetID(),
RunCount: 20,
SuccessCount: 30,
ErrorCount: 40,
} }
// Query JobStats scheduler := &InstantScheduler{}
if err := db.First(&jobStats, "job_id = ?", jobPersistence.ID).Error; err != nil { err = mgr.ScheduleJob(job, scheduler)
t.Errorf("Failed to query JobStats: %v", err) assert.Nil(t, err)
} else {
time.Sleep(200 * time.Millisecond)
// check is stats are the values above
var tmpJob JobPersistence
err = db.Preload("Stats").First(&tmpJob, "id = ?", "job3").Error
assert.Nil(t, err)
// Validate the fields // Validate the fields
assert.Equal(t, jobPersistence.ID, jobStats.JobID) assert.Equal(t, JobID("job3"), tmpJob.ID)
} assert.Equal(t, 21, tmpJob.Stats.RunCount) // +1 because of the first run
assert.Equal(t, 31, tmpJob.Stats.SuccessCount) // +1 because of the first run
assert.Equal(t, 40, tmpJob.Stats.ErrorCount)
// Query JobLogs // reset stats
if err := db.Find(&jobLogs, "job_id = ?", jobPersistence.ID).Error; err != nil { err = mgr.ResetJobStats(job.GetID())
t.Errorf("Failed to query JobLogs: %v", err) assert.Nil(t, err)
} else {
assert.NotEmpty(t, jobLogs)
for _, l := range jobLogs { time.Sleep(2 * time.Second)
assert.Equal(t, jobPersistence.ID, l.JobID)
} var tmpJob2 JobPersistence
} // check is stats are the values above
err = db.First(&tmpJob2, "id = ?", "job3").Error
err = db.Preload("Stats").First(&tmpJob2, "id = ?", "job3").Error
assert.Nil(t, err)
// Validate the fields
assert.Equal(t, JobID("job3"), tmpJob2.ID)
assert.Equal(t, 0, tmpJob2.Stats.RunCount)
assert.Equal(t, 0, tmpJob2.Stats.SuccessCount)
assert.Equal(t, 0, tmpJob2.Stats.ErrorCount)
err = mgr.DeleteJob(job.GetID())
assert.Nil(t, err)
})
wg.Wait()
//var jobPersistence JobPersistence
//var jobStats JobStats
//var jobLogs []JobLog // Assuming JobLog is your log model
//
//// Query JobPersistence
//if err := db.First(&jobPersistence, "id = ?", "job1").Error; err != nil {
// t.Errorf("Failed to query JobPersistence: %v", err)
//} else {
// // Validate the fields
// assert.Equal(t, JobID("job1"), jobPersistence.ID)
//}
//
//// Query JobStats
//if err := db.First(&jobStats, "job_id = ?", jobPersistence.ID).Error; err != nil {
// t.Errorf("Failed to query JobStats: %v", err)
//} else {
// // Validate the fields
// assert.Equal(t, jobPersistence.ID, jobStats.JobID)
//}
//
//// Query JobLogs
//if err := db.Find(&jobLogs, "job_id = ?", jobPersistence.ID).Error; err != nil {
// t.Errorf("Failed to query JobLogs: %v", err)
//} else {
// assert.NotEmpty(t, jobLogs)
//
// for _, l := range jobLogs {
// assert.Equal(t, jobPersistence.ID, l.JobID)
// }
//}
cancel() cancel()
......
...@@ -33,4 +33,6 @@ type GenericJob interface { ...@@ -33,4 +33,6 @@ type GenericJob interface {
Resume() Resume()
IsPaused() bool IsPaused() bool
ResetStats()
} }
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"time" "time"
) )
// important: if you want to add fields to this struct, you have to add them to the ResetStats() method as well
type JobStats struct { type JobStats struct {
JobID JobID `json:"job_id" gorm:"primaryKey"` JobID JobID `json:"job_id" gorm:"primaryKey"`
RunCount int `json:"run_count"` RunCount int `json:"run_count"`
......
...@@ -38,7 +38,7 @@ type Job[T any] struct { ...@@ -38,7 +38,7 @@ type Job[T any] struct {
pause bool pause bool
pauseReason string pauseReason string
pauseUntil time.Time pauseUntil *time.Time
dependencies []JobID dependencies []JobID
...@@ -142,7 +142,7 @@ func (j *Job[T]) PauseUntil(until time.Time) { ...@@ -142,7 +142,7 @@ func (j *Job[T]) PauseUntil(until time.Time) {
j.mu.Lock() j.mu.Lock()
defer j.mu.Unlock() defer j.mu.Unlock()
j.pause = true j.pause = true
j.pauseUntil = until j.pauseUntil = &until
} }
...@@ -150,16 +150,33 @@ func (j *Job[T]) Resume() { ...@@ -150,16 +150,33 @@ func (j *Job[T]) Resume() {
j.mu.Lock() j.mu.Lock()
defer j.mu.Unlock() defer j.mu.Unlock()
j.pause = false j.pause = false
j.pauseUntil = time.Time{} j.pauseUntil = nil
} }
func (j *Job[T]) ResetStats() {
j.mu.Lock()
defer j.mu.Unlock()
j.stats = JobStats{
JobID: j.id,
RunCount: 0,
SuccessCount: 0,
ErrorCount: 0,
TimeMetrics: TimeMetrics{
TotalRunTime: 0,
MinRunTime: 0,
MaxRunTime: 0,
AvgRunTime: 0,
},
}
}
// IsPaused returns true if the job is paused // IsPaused returns true if the job is paused
func (j *Job[T]) IsPaused() bool { func (j *Job[T]) IsPaused() bool {
j.mu.Lock() j.mu.Lock()
defer j.mu.Unlock() defer j.mu.Unlock()
if j.pause { if j.pause {
if j.pauseUntil.IsZero() { if j.pauseUntil == nil || j.pauseUntil.IsZero() {
return true return true
} else { } else {
return j.pauseUntil.After(time.Now()) return j.pauseUntil.After(time.Now())
......
...@@ -57,6 +57,10 @@ type MockGenericJob struct { ...@@ -57,6 +57,10 @@ type MockGenericJob struct {
Scheduler Scheduler Scheduler Scheduler
} }
func (m *MockGenericJob) ResetStats() {
}
func (m *MockGenericJob) GetMaxRetries() uint { func (m *MockGenericJob) GetMaxRetries() uint {
return 0 return 0
} }
......
...@@ -24,7 +24,7 @@ type JobPersistence struct { ...@@ -24,7 +24,7 @@ type JobPersistence struct {
Pause bool `yaml:"pause" json:"pause" gorm:"column:pause"` Pause bool `yaml:"pause" json:"pause" gorm:"column:pause"`
PauseReason string `yaml:"pauseReason" json:"pauseReason" gorm:"column:pause_reason"` PauseReason string `yaml:"pauseReason" json:"pauseReason" gorm:"column:pause_reason"`
PauseUntil time.Time `yaml:"pauseUntil" json:"pauseUntil" gorm:"column:pause_until"` PauseUntil *time.Time `yaml:"pauseUntil" json:"pauseUntil" gorm:"column:pause_until"`
Logs []JobLog `gorm:"foreignKey:JobID;references:ID" json:"-" yaml:"-"` Logs []JobLog `gorm:"foreignKey:JobID;references:ID" json:"-" yaml:"-"`
Stats JobStats `gorm:"foreignKey:JobID" json:"stats" yaml:"stats"` Stats JobStats `gorm:"foreignKey:JobID" json:"stats" yaml:"stats"`
......
...@@ -31,6 +31,10 @@ func (j DummyJob) IsPaused() bool { ...@@ -31,6 +31,10 @@ func (j DummyJob) IsPaused() bool {
return false return false
} }
func (j DummyJob) ResetStats() {
}
func (j DummyJob) GetMaxRetries() uint { func (j DummyJob) GetMaxRetries() uint {
return 0 return 0
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment