From daa4084496a7fa468cd78caa9c906088d45a9973 Mon Sep 17 00:00:00 2001 From: Volker Schukai <volker.schukai@schukai.com> Date: Thu, 18 Apr 2024 02:20:44 +0200 Subject: [PATCH] fix: optimize save #52 --- database.go | 27 ++++++++++++++++++--------- devenv.nix | 1 + errors.go | 5 ++++- job-generic.go | 2 ++ job-syncer.go | 2 +- queue.go | 6 ++---- runnable.go | 1 - schedule-time.go | 1 - scheduler.go | 22 +++++++--------------- scheduler_test.go | 20 ++++++++++++++++++-- 10 files changed, 53 insertions(+), 34 deletions(-) diff --git a/database.go b/database.go index e8591df..170cc06 100644 --- a/database.go +++ b/database.go @@ -18,11 +18,10 @@ func (s *JobSyncer) CheckAndSaveOrUpdate(job GenericJob) error { return ErrNoDatabaseConnection } - permJob := job.GetPersistence() db := s.manager.database var existing JobPersistence - result := db.Where("id = ?", permJob.ID).First(&existing) + result := db.Where("id = ?", job.GetID()).First(&existing) if result.Error == nil { return updateJob(job, db) @@ -181,18 +180,28 @@ func save(job *JobPersistence, db *gorm.DB) error { return db.Transaction(func(tx *gorm.DB) error { - if err := tx.Save(job).Error; err != nil { - return err + var existingJob JobPersistence + if err := tx.First(&existingJob, "id = ?", job.ID).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + if err := tx.Create(job).Error; err != nil { + return ErrFailedToCreate + } + } else { + return ErrFailedToQueryExistingJob + } + } else { + if err := tx.Save(job).Error; err != nil { + return ErrFailedToSaveJob + } } if job.Stats != (JobStats{}) { job.Stats.JobID = job.ID - if job.Stats.RunCount == 0 { - Info("Stats runCount is 0, skipping update") - } - - if err := tx.Save(&job.Stats).Error; err != nil { + if err := tx.Model(job.Stats). + Select("*"). + Omit("job_id", "created_at"). + Updates(job.Stats).Error; err != nil { return err } } diff --git a/devenv.nix b/devenv.nix index 31e72c2..fc622ce 100644 --- a/devenv.nix +++ b/devenv.nix @@ -9,6 +9,7 @@ blackbox-terminal coreutils-full dbeaver + glibc.static dbeaver delve dialog diff --git a/errors.go b/errors.go index 97a20f1..b231eec 100644 --- a/errors.go +++ b/errors.go @@ -43,7 +43,7 @@ var ( ErrSchedulerNotSet = fmt.Errorf("scheduler is not set") ErrJobNotActive = fmt.Errorf("job is not active") ErrJobAlreadyActive = fmt.Errorf("job is already active") - ErrChannelAlreadyClosed = fmt.Errorf("channel is already closed") + ErrChannelAlreadyClosed = fmt.Errorf("the channel is already closed") ErrUnknownScheduleType = fmt.Errorf("unknown schedule type") ErrNoManager = fmt.Errorf("no manager") ErrCannotLoadStatsFromDatabase = fmt.Errorf("errors while loading stats from database") @@ -54,4 +54,7 @@ var ( ErrJobSyncerNotRunning = fmt.Errorf("JobSyncer is not running") ErrMaxRetriesReached = fmt.Errorf("maximum number of retries reached") ErrTimeoutReached = fmt.Errorf("timeout reached") + ErrFailedToCreate = fmt.Errorf("failed to create") + ErrFailedToQueryExistingJob = fmt.Errorf("failed to query an existing job") + ErrFailedToSaveJob = fmt.Errorf("failed to save a job") ) diff --git a/job-generic.go b/job-generic.go index 4c3c9fa..79c5f57 100644 --- a/job-generic.go +++ b/job-generic.go @@ -10,6 +10,7 @@ import ( type GenericJob interface { GetID() JobID + GetDependencies() []JobID GetPriority() Priority @@ -31,6 +32,7 @@ type GenericJob interface { GetScheduler() Scheduler Pause() + PauseUntil(until time.Time) Resume() diff --git a/job-syncer.go b/job-syncer.go index 57b4535..250705b 100644 --- a/job-syncer.go +++ b/job-syncer.go @@ -66,7 +66,7 @@ func (js *JobSyncer) Sync(job GenericJob) { err := js.CheckAndSaveOrUpdate(job) if err != nil { - Error("Error while creating or updating job", err) + Error("Error while creating or updating the job", err) } }() diff --git a/queue.go b/queue.go index a165112..4415df6 100644 --- a/queue.go +++ b/queue.go @@ -95,7 +95,7 @@ func (q *Queue) Enqueue(job GenericJob) error { currentReadyJobIDs[job.GetID()] = struct{}{} } - fullJobList := []GenericJob{} + var fullJobList []GenericJob for _, job := range readyJobList { fullJobList = append(fullJobList, job) } @@ -122,7 +122,7 @@ func (q *Queue) Enqueue(job GenericJob) error { if q.eventBus != nil && len(q.readyQueue) > 0 { Info("Job ready", "job_id", job.GetID()) - + q.eventBus.Publish(JobReady, nil) } @@ -182,7 +182,6 @@ func (q *Queue) Dequeue() (GenericJob, error) { ProcessedTime: time.Now(), ID: job.GetID(), }) - return job, nil } @@ -194,6 +193,5 @@ func removeJobID(deps []JobID, id JobID) []JobID { return deps[:len(deps)-1] } } - return deps } diff --git a/runnable.go b/runnable.go index c0bab85..b48de02 100644 --- a/runnable.go +++ b/runnable.go @@ -30,6 +30,5 @@ func (r RunResult[T]) GetStatus() ResultStatus { type Runnable[T any] interface { Run(ctx context.Context) (RunResult[T], error) GetType() string - GetPersistence() RunnableImport } diff --git a/schedule-time.go b/schedule-time.go index f6aa754..f04c32d 100644 --- a/schedule-time.go +++ b/schedule-time.go @@ -101,7 +101,6 @@ func (s *TimeScheduler) JobExists(id JobID) bool { if s.jobs == nil { return false } - _, ok := s.jobs[id] return ok } diff --git a/scheduler.go b/scheduler.go index 1cf128d..cb1c3fb 100644 --- a/scheduler.go +++ b/scheduler.go @@ -17,24 +17,22 @@ type Scheduler interface { Cancel(id JobID) error CancelAll() error JobExists(id JobID) bool - GetType() string - IsAdHoc() bool - GetPersistence() SchedulerPersistence } type SchedulerPersistence struct { - Type string `yaml:"type" json:"type" gorm:"column:type"` + Time *time.Time `yaml:"time,omitempty" json:"time,omitempty" gorm:"column:time"` Interval time.Duration `yaml:"interval,omitempty" json:"interval,omitempty" gorm:"column:interval"` - Spec string `yaml:"spec,omitempty" json:"spec,omitempty" gorm:"column:spec"` Delay time.Duration `yaml:"delay,omitempty" json:"delay,omitempty" gorm:"column:delay"` - Event EventName `yaml:"event,omitempty" json:"event,omitempty" gorm:"column:event"` - Time *time.Time `yaml:"time,omitempty" json:"time,omitempty" gorm:"column:time"` - Executed bool `yaml:"executed,omitempty" json:"executed,omitempty" gorm:"column:executed"` - Path string `yaml:"path,omitempty" json:"path,omitempty" gorm:"column:path"` EventFlags fsnotify.Op `yaml:"eventFlags,omitempty" json:"eventFlags,omitempty" gorm:"column:eventFlags"` + + Type string `yaml:"type" json:"type" gorm:"column:type"` + Spec string `yaml:"spec,omitempty" json:"spec,omitempty" gorm:"column:spec"` + Event EventName `yaml:"event,omitempty" json:"event,omitempty" gorm:"column:event"` + Executed bool `yaml:"executed,omitempty" json:"executed,omitempty" gorm:"column:executed"` + Path string `yaml:"path,omitempty" json:"path,omitempty" gorm:"column:path"` } type scheduleImportStruct struct { @@ -112,15 +110,11 @@ func (sp *SchedulerPersistence) parseAndAssignFields(aux scheduleImportStruct) e // UnmarshalJSON implements the json.Unmarshaler interface func (sp *SchedulerPersistence) UnmarshalJSON(data []byte) error { - var aux scheduleImportStruct - if err := json.Unmarshal(data, &aux); err != nil { return err } - return sp.parseAndAssignFields(aux) - } func (sp *SchedulerPersistence) UnmarshalYAML(unmarshal func(interface{}) error) error { @@ -128,7 +122,5 @@ func (sp *SchedulerPersistence) UnmarshalYAML(unmarshal func(interface{}) error) if err := unmarshal(&aux); err != nil { return err } - return sp.parseAndAssignFields(aux) - } diff --git a/scheduler_test.go b/scheduler_test.go index 40a81db..61edd28 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -37,7 +37,7 @@ func TestIntervalScheduler_BasicFunctionality(t *testing.T) { time.Sleep(time.Millisecond * 500) if atomic.LoadInt32(&count) < 4 { - t.Errorf("Expected to run at least 4 times, ran %d times", count) + t.Errorf("Expected to run at least four times, ran %d times", count) } } @@ -175,7 +175,7 @@ func TestDelayScheduler_BasicFunctionality(t *testing.T) { time.Sleep(time.Millisecond * 200) if atomic.LoadInt32(&count) != 1 { - t.Errorf("Expected to run 1 time, ran %d times", count) + t.Errorf("Expected to run one time, ran %d times", count) } } @@ -432,3 +432,19 @@ time: "2023-12-15T12:00:00Z" assert.Equal(t, expectedInterval, sp.Interval, "Interval should be unmarshalled correctly") assert.Equal(t, &expectedTime, sp.Time, "Time should be unmarshalled correctly") } + +func TestUnmarshalSchedulerPersistenceIntervalYAML(t *testing.T) { + + yamlData := ` +type: Interval +interval: "1m1s" +` + + var sp SchedulerPersistence + err := yaml.Unmarshal([]byte(yamlData), &sp) + assert.Nil(t, err, "Unmarshalling should not produce an error") + + expectedInterval, _ := time.ParseDuration("1m") + assert.Equal(t, "interval", sp.Type, "Type should be unmarshalled correctly") + assert.Equal(t, expectedInterval, sp.Interval, "Interval should be unmarshalled correctly") +} -- GitLab