diff --git a/database.go b/database.go index aa69b2918a1c390c156c41b6129aae18da5f0738..4de0bda7f1cfcdd0ab737d29dd3b7028390deefb 100644 --- a/database.go +++ b/database.go @@ -85,11 +85,22 @@ func (s *DBSaver) Start() error { s.migrateFlag = true } + var wg sync.WaitGroup + wg.Add(1) + go func() { - s.setStatus(DBSaverStatusRunning) + wg.Done() + + // this is protected by the lock above + s.status = DBSaverStatusRunning + defer func() { + // this runs after the function returns + // and needs to be protected by the lock + // of the setStatus method s.setStatus(DBSaverStatusStopped) }() + for { select { case job := <-s.saveChannel: @@ -120,7 +131,19 @@ func (s *DBSaver) Start() error { } } - tx.Model(&permJob.Stats).Updates(permJob.Stats) + tx.Model(&permJob.Stats). + Select( + []string{ + "run_count", + "success_count", + "error_count", + "time_metrics_avg_run_time", + "time_metrics_max_run_time", + "time_metrics_min_run_time", + "time_metrics_total_run_time", + }, + ). + UpdateColumns(permJob.Stats) for i, _ := range memLogs { memLogs[i].LogID = 0 @@ -141,6 +164,8 @@ func (s *DBSaver) Start() error { } }() + wg.Wait() + return nil } @@ -265,25 +290,23 @@ func (s *DBSaver) ResetLogs(job GenericJob) error { func (s *DBSaver) ResetStats(job GenericJob) error { s.mu.Lock() defer s.mu.Unlock() - var db *gorm.DB - var err error - if db, err = checkRunningSaver(s); err != nil { - return err + if s.saveChannel == nil { + return ErrDBSaverNotInitialized } - return db.Transaction(func(tx *gorm.DB) error { - permJob := job.GetPersistence() + if s.status != DBSaverStatusRunning { + return ErrDBSaverNotRunning + } - defaultStats := JobStats{ - JobID: permJob.GetID(), + defer func() { + if r := recover(); r != nil { + s.logError("Error while saving job", "error", r) } + }() - txErr := tx.Model(&permJob).Update("Stats", defaultStats).Error - if txErr != nil { - return txErr - } + job.ResetStats() - return nil - }) + s.saveChannel <- job + return nil } diff --git a/database_test.go b/database_test.go index c2b80e08558817a059c23d60b433d1153ca42014..2c3904dba9bb4d6cd5f4236f55d3598357f7ce72 100644 --- a/database_test.go +++ b/database_test.go @@ -119,7 +119,10 @@ func startTestMySQLDockerImageAndContainer(t *testing.T, port string, ctx contex func TestWriteToDB(t *testing.T) { // if true, logging and port 3306 is used - debugMode := false + useMySQLPort := os.Getenv("MYSQL_PORT") + //useMySQLPort = "3306" + printLogging := os.Getenv("MYSQL_LOGGING") + //printLogging = "true" var err error @@ -139,11 +142,7 @@ func TestWriteToDB(t *testing.T) { portAsString := fmt.Sprintf("%d", portAsInt) _ = listener.Close() - useMySQLPort := os.Getenv("MYSQL_PORT") - if debugMode || useMySQLPort != "" { - if useMySQLPort == "" { - useMySQLPort = "3306" - } + if useMySQLPort != "" { portAsString = useMySQLPort i, _ := strconv.Atoi(portAsString) @@ -188,7 +187,7 @@ func TestWriteToDB(t *testing.T) { var dbLogger logger.Interface - if debugMode { + if printLogging == "true" { dbLogger = logger.New( log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer logger.Config{ @@ -338,7 +337,7 @@ func TestWriteToDB(t *testing.T) { assert.Nil(t, err) runner := &CounterRunnable{} - job := NewJob[CounterResult]("job1", runner) + job := NewJob[CounterResult]("job2", runner) scheduler := &InstantScheduler{} err = mgr.ScheduleJob(job, scheduler) @@ -350,44 +349,107 @@ func TestWriteToDB(t *testing.T) { // test is job in database var tmpJob JobPersistence - if err := db.First(&tmpJob, "id = ?", "job1").Error; err == nil { - t.Error("job1 is still in database") - } + err = db.First(&tmpJob, "id = ?", "job2").Error + assert.NotNil(t, err) }) - wg.Wait() + wg.Add(1) + t.Run("ResetStats", func(t *testing.T) { + defer wg.Done() + + mgr := NewManager() + mgr.SetDB(db) + worker := NewLocalWorker(1) + err := mgr.AddWorker(worker) + assert.Nil(t, err) - var jobPersistence JobPersistence - var jobStats JobStats - var jobLogs []JobLog // Assuming JobLog is your log model + err = mgr.Start() + assert.Nil(t, err) + + runner := &CounterRunnable{} + job := NewJob[CounterResult]("job3", runner) + + job.stats = JobStats{ + JobID: job.GetID(), + RunCount: 20, + SuccessCount: 30, + ErrorCount: 40, + } + + scheduler := &InstantScheduler{} + err = mgr.ScheduleJob(job, scheduler) + assert.Nil(t, err) + + time.Sleep(200 * time.Millisecond) + + // check is stats are the values above + var tmpJob JobPersistence + + err = db.Preload("Stats").First(&tmpJob, "id = ?", "job3").Error + assert.Nil(t, err) - // Query JobPersistence - if err := db.First(&jobPersistence, "id = ?", "job1").Error; err != nil { - t.Errorf("Failed to query JobPersistence: %v", err) - } else { // Validate the fields - assert.Equal(t, JobID("job1"), jobPersistence.ID) - } + assert.Equal(t, JobID("job3"), tmpJob.ID) + assert.Equal(t, 21, tmpJob.Stats.RunCount) // +1 because of the first run + assert.Equal(t, 31, tmpJob.Stats.SuccessCount) // +1 because of the first run + assert.Equal(t, 40, tmpJob.Stats.ErrorCount) + + // reset stats + err = mgr.ResetJobStats(job.GetID()) + assert.Nil(t, err) + + time.Sleep(2 * time.Second) + + var tmpJob2 JobPersistence + // check is stats are the values above + err = db.First(&tmpJob2, "id = ?", "job3").Error + err = db.Preload("Stats").First(&tmpJob2, "id = ?", "job3").Error + assert.Nil(t, err) - // Query JobStats - if err := db.First(&jobStats, "job_id = ?", jobPersistence.ID).Error; err != nil { - t.Errorf("Failed to query JobStats: %v", err) - } else { // Validate the fields - assert.Equal(t, jobPersistence.ID, jobStats.JobID) - } + assert.Equal(t, JobID("job3"), tmpJob2.ID) + assert.Equal(t, 0, tmpJob2.Stats.RunCount) + assert.Equal(t, 0, tmpJob2.Stats.SuccessCount) + assert.Equal(t, 0, tmpJob2.Stats.ErrorCount) - // Query JobLogs - if err := db.Find(&jobLogs, "job_id = ?", jobPersistence.ID).Error; err != nil { - t.Errorf("Failed to query JobLogs: %v", err) - } else { - assert.NotEmpty(t, jobLogs) + err = mgr.DeleteJob(job.GetID()) + assert.Nil(t, err) - for _, l := range jobLogs { - assert.Equal(t, jobPersistence.ID, l.JobID) - } - } + }) + + wg.Wait() + + //var jobPersistence JobPersistence + //var jobStats JobStats + //var jobLogs []JobLog // Assuming JobLog is your log model + // + //// Query JobPersistence + //if err := db.First(&jobPersistence, "id = ?", "job1").Error; err != nil { + // t.Errorf("Failed to query JobPersistence: %v", err) + //} else { + // // Validate the fields + // assert.Equal(t, JobID("job1"), jobPersistence.ID) + //} + // + //// Query JobStats + //if err := db.First(&jobStats, "job_id = ?", jobPersistence.ID).Error; err != nil { + // t.Errorf("Failed to query JobStats: %v", err) + //} else { + // // Validate the fields + // assert.Equal(t, jobPersistence.ID, jobStats.JobID) + //} + // + //// Query JobLogs + //if err := db.Find(&jobLogs, "job_id = ?", jobPersistence.ID).Error; err != nil { + // t.Errorf("Failed to query JobLogs: %v", err) + //} else { + // assert.NotEmpty(t, jobLogs) + // + // for _, l := range jobLogs { + // assert.Equal(t, jobPersistence.ID, l.JobID) + // } + //} cancel() diff --git a/job-generic.go b/job-generic.go index aaa37b50546e3a503e1da3f1cddcdd742b38568c..57f24b52e60926381cfffde4c8c96b5e506182c0 100644 --- a/job-generic.go +++ b/job-generic.go @@ -33,4 +33,6 @@ type GenericJob interface { Resume() IsPaused() bool + + ResetStats() } diff --git a/job-stat.go b/job-stat.go index 808310a78481ebaacc659e0e1dfce0dfbb6bcd12..2a784cccbee25ac412bbd6327504aa491f1fe96b 100644 --- a/job-stat.go +++ b/job-stat.go @@ -5,6 +5,7 @@ import ( "time" ) +// important: if you want to add fields to this struct, you have to add them to the ResetStats() method as well type JobStats struct { JobID JobID `json:"job_id" gorm:"primaryKey"` RunCount int `json:"run_count"` diff --git a/job.go b/job.go index 5b7a8a6e195d6ae708e2e11d8dd086d28d32a095..6db30ec2a2174e5495db121c11d5449bd25d325a 100644 --- a/job.go +++ b/job.go @@ -38,7 +38,7 @@ type Job[T any] struct { pause bool pauseReason string - pauseUntil time.Time + pauseUntil *time.Time dependencies []JobID @@ -142,7 +142,7 @@ func (j *Job[T]) PauseUntil(until time.Time) { j.mu.Lock() defer j.mu.Unlock() j.pause = true - j.pauseUntil = until + j.pauseUntil = &until } @@ -150,16 +150,33 @@ func (j *Job[T]) Resume() { j.mu.Lock() defer j.mu.Unlock() j.pause = false - j.pauseUntil = time.Time{} + j.pauseUntil = nil } +func (j *Job[T]) ResetStats() { + j.mu.Lock() + defer j.mu.Unlock() + j.stats = JobStats{ + JobID: j.id, + RunCount: 0, + SuccessCount: 0, + ErrorCount: 0, + TimeMetrics: TimeMetrics{ + TotalRunTime: 0, + MinRunTime: 0, + MaxRunTime: 0, + AvgRunTime: 0, + }, + } +} + // IsPaused returns true if the job is paused func (j *Job[T]) IsPaused() bool { j.mu.Lock() defer j.mu.Unlock() if j.pause { - if j.pauseUntil.IsZero() { + if j.pauseUntil == nil || j.pauseUntil.IsZero() { return true } else { return j.pauseUntil.After(time.Now()) diff --git a/manager_test.go b/manager_test.go index 6406c726bd9668e20bbd4d6cd54408a32e56024a..55c390ff9add5da9be75a88c84cf34edebbb5113 100644 --- a/manager_test.go +++ b/manager_test.go @@ -57,6 +57,10 @@ type MockGenericJob struct { Scheduler Scheduler } +func (m *MockGenericJob) ResetStats() { + +} + func (m *MockGenericJob) GetMaxRetries() uint { return 0 } @@ -126,7 +130,7 @@ func TestNewManager(t *testing.T) { manager := NewManager() eventBus := manager.eventBus - + assert.NotNil(t, manager) assert.Equal(t, ManagerState(ManagerStateStopped), manager.state) assert.NotNil(t, manager.queue) diff --git a/persistence.go b/persistence.go index 67b84a4b1cb9dfcc3307d01037b24e63e8c61219..35f1b260691be9db792580a4c952e71661b9f188 100644 --- a/persistence.go +++ b/persistence.go @@ -22,9 +22,9 @@ type JobPersistence struct { Runnable RunnableImport `yaml:"runnable" json:"runnable" gorm:"embedded;embeddedPrefix:runnable_"` Scheduler SchedulerPersistence `yaml:"scheduler" json:"scheduler,omitempty" gorm:"embedded;embeddedPrefix:scheduler_"` - Pause bool `yaml:"pause" json:"pause" gorm:"column:pause"` - PauseReason string `yaml:"pauseReason" json:"pauseReason" gorm:"column:pause_reason"` - PauseUntil time.Time `yaml:"pauseUntil" json:"pauseUntil" gorm:"column:pause_until"` + Pause bool `yaml:"pause" json:"pause" gorm:"column:pause"` + PauseReason string `yaml:"pauseReason" json:"pauseReason" gorm:"column:pause_reason"` + PauseUntil *time.Time `yaml:"pauseUntil" json:"pauseUntil" gorm:"column:pause_until"` Logs []JobLog `gorm:"foreignKey:JobID;references:ID" json:"-" yaml:"-"` Stats JobStats `gorm:"foreignKey:JobID" json:"stats" yaml:"stats"` diff --git a/worker_test.go b/worker_test.go index d2a7fc8da9473cf9fd23fce2188fd82857cb1c64..2cc3994f51c06083bfc872af9aea49010331bb72 100644 --- a/worker_test.go +++ b/worker_test.go @@ -31,6 +31,10 @@ func (j DummyJob) IsPaused() bool { return false } +func (j DummyJob) ResetStats() { + +} + func (j DummyJob) GetMaxRetries() uint { return 0 }