diff --git a/database.go b/database.go index e8591df9bb641a1d579f92a4462a20304dc8545a..170cc06065edc5b695d3aca99c345ca3db4cd098 100644 --- a/database.go +++ b/database.go @@ -18,11 +18,10 @@ func (s *JobSyncer) CheckAndSaveOrUpdate(job GenericJob) error { return ErrNoDatabaseConnection } - permJob := job.GetPersistence() db := s.manager.database var existing JobPersistence - result := db.Where("id = ?", permJob.ID).First(&existing) + result := db.Where("id = ?", job.GetID()).First(&existing) if result.Error == nil { return updateJob(job, db) @@ -181,18 +180,28 @@ func save(job *JobPersistence, db *gorm.DB) error { return db.Transaction(func(tx *gorm.DB) error { - if err := tx.Save(job).Error; err != nil { - return err + var existingJob JobPersistence + if err := tx.First(&existingJob, "id = ?", job.ID).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + if err := tx.Create(job).Error; err != nil { + return ErrFailedToCreate + } + } else { + return ErrFailedToQueryExistingJob + } + } else { + if err := tx.Save(job).Error; err != nil { + return ErrFailedToSaveJob + } } if job.Stats != (JobStats{}) { job.Stats.JobID = job.ID - if job.Stats.RunCount == 0 { - Info("Stats runCount is 0, skipping update") - } - - if err := tx.Save(&job.Stats).Error; err != nil { + if err := tx.Model(job.Stats). + Select("*"). + Omit("job_id", "created_at"). + Updates(job.Stats).Error; err != nil { return err } } diff --git a/devenv.nix b/devenv.nix index 31e72c236ace18fdfe811e66eb6bb89f8365d6a4..fc622ce6fe256d1d82e7f9c9d8a1a80deab4e63c 100644 --- a/devenv.nix +++ b/devenv.nix @@ -9,6 +9,7 @@ blackbox-terminal coreutils-full dbeaver + glibc.static dbeaver delve dialog diff --git a/errors.go b/errors.go index 97a20f12bf51a2beb495d89ad329bb0231d5cbbd..b231eec230fa777aee87ba6537665c48eda2d2ac 100644 --- a/errors.go +++ b/errors.go @@ -43,7 +43,7 @@ var ( ErrSchedulerNotSet = fmt.Errorf("scheduler is not set") ErrJobNotActive = fmt.Errorf("job is not active") ErrJobAlreadyActive = fmt.Errorf("job is already active") - ErrChannelAlreadyClosed = fmt.Errorf("channel is already closed") + ErrChannelAlreadyClosed = fmt.Errorf("the channel is already closed") ErrUnknownScheduleType = fmt.Errorf("unknown schedule type") ErrNoManager = fmt.Errorf("no manager") ErrCannotLoadStatsFromDatabase = fmt.Errorf("errors while loading stats from database") @@ -54,4 +54,7 @@ var ( ErrJobSyncerNotRunning = fmt.Errorf("JobSyncer is not running") ErrMaxRetriesReached = fmt.Errorf("maximum number of retries reached") ErrTimeoutReached = fmt.Errorf("timeout reached") + ErrFailedToCreate = fmt.Errorf("failed to create") + ErrFailedToQueryExistingJob = fmt.Errorf("failed to query an existing job") + ErrFailedToSaveJob = fmt.Errorf("failed to save a job") ) diff --git a/job-generic.go b/job-generic.go index 4c3c9fa92208af30f5d654f9577ae95933edad92..79c5f571731f9690c066fcea902d56293bafd190 100644 --- a/job-generic.go +++ b/job-generic.go @@ -10,6 +10,7 @@ import ( type GenericJob interface { GetID() JobID + GetDependencies() []JobID GetPriority() Priority @@ -31,6 +32,7 @@ type GenericJob interface { GetScheduler() Scheduler Pause() + PauseUntil(until time.Time) Resume() diff --git a/job-syncer.go b/job-syncer.go index 57b453557f4dcd80ed46adb8fad54710714efdd0..250705b54a049f39ccda9848ba9cb2d41ea91994 100644 --- a/job-syncer.go +++ b/job-syncer.go @@ -66,7 +66,7 @@ func (js *JobSyncer) Sync(job GenericJob) { err := js.CheckAndSaveOrUpdate(job) if err != nil { - Error("Error while creating or updating job", err) + Error("Error while creating or updating the job", err) } }() diff --git a/queue.go b/queue.go index a1651123193c2252b2113cc790655fb9ef4b0346..4415df61c7a07619f30b4c9c1b4ecd63f5e2d816 100644 --- a/queue.go +++ b/queue.go @@ -95,7 +95,7 @@ func (q *Queue) Enqueue(job GenericJob) error { currentReadyJobIDs[job.GetID()] = struct{}{} } - fullJobList := []GenericJob{} + var fullJobList []GenericJob for _, job := range readyJobList { fullJobList = append(fullJobList, job) } @@ -122,7 +122,7 @@ func (q *Queue) Enqueue(job GenericJob) error { if q.eventBus != nil && len(q.readyQueue) > 0 { Info("Job ready", "job_id", job.GetID()) - + q.eventBus.Publish(JobReady, nil) } @@ -182,7 +182,6 @@ func (q *Queue) Dequeue() (GenericJob, error) { ProcessedTime: time.Now(), ID: job.GetID(), }) - return job, nil } @@ -194,6 +193,5 @@ func removeJobID(deps []JobID, id JobID) []JobID { return deps[:len(deps)-1] } } - return deps } diff --git a/runnable.go b/runnable.go index c0bab856c5658fdf30a5c56e3f8f172d7ccd1ba7..b48de027273ba91178dfe57b6e554de5141cb931 100644 --- a/runnable.go +++ b/runnable.go @@ -30,6 +30,5 @@ func (r RunResult[T]) GetStatus() ResultStatus { type Runnable[T any] interface { Run(ctx context.Context) (RunResult[T], error) GetType() string - GetPersistence() RunnableImport } diff --git a/schedule-time.go b/schedule-time.go index f6aa75486eac85181ff937ffa7beb014a51f911c..f04c32d33ee9009a0d4cac20adc1dd1514633cc2 100644 --- a/schedule-time.go +++ b/schedule-time.go @@ -101,7 +101,6 @@ func (s *TimeScheduler) JobExists(id JobID) bool { if s.jobs == nil { return false } - _, ok := s.jobs[id] return ok } diff --git a/scheduler.go b/scheduler.go index 1cf128d9cff3e7bda87b8087e8676b1503f1bcf7..cb1c3fbf9612a280ae420ec4a44b6e2e907ebe03 100644 --- a/scheduler.go +++ b/scheduler.go @@ -17,24 +17,22 @@ type Scheduler interface { Cancel(id JobID) error CancelAll() error JobExists(id JobID) bool - GetType() string - IsAdHoc() bool - GetPersistence() SchedulerPersistence } type SchedulerPersistence struct { - Type string `yaml:"type" json:"type" gorm:"column:type"` + Time *time.Time `yaml:"time,omitempty" json:"time,omitempty" gorm:"column:time"` Interval time.Duration `yaml:"interval,omitempty" json:"interval,omitempty" gorm:"column:interval"` - Spec string `yaml:"spec,omitempty" json:"spec,omitempty" gorm:"column:spec"` Delay time.Duration `yaml:"delay,omitempty" json:"delay,omitempty" gorm:"column:delay"` - Event EventName `yaml:"event,omitempty" json:"event,omitempty" gorm:"column:event"` - Time *time.Time `yaml:"time,omitempty" json:"time,omitempty" gorm:"column:time"` - Executed bool `yaml:"executed,omitempty" json:"executed,omitempty" gorm:"column:executed"` - Path string `yaml:"path,omitempty" json:"path,omitempty" gorm:"column:path"` EventFlags fsnotify.Op `yaml:"eventFlags,omitempty" json:"eventFlags,omitempty" gorm:"column:eventFlags"` + + Type string `yaml:"type" json:"type" gorm:"column:type"` + Spec string `yaml:"spec,omitempty" json:"spec,omitempty" gorm:"column:spec"` + Event EventName `yaml:"event,omitempty" json:"event,omitempty" gorm:"column:event"` + Executed bool `yaml:"executed,omitempty" json:"executed,omitempty" gorm:"column:executed"` + Path string `yaml:"path,omitempty" json:"path,omitempty" gorm:"column:path"` } type scheduleImportStruct struct { @@ -112,15 +110,11 @@ func (sp *SchedulerPersistence) parseAndAssignFields(aux scheduleImportStruct) e // UnmarshalJSON implements the json.Unmarshaler interface func (sp *SchedulerPersistence) UnmarshalJSON(data []byte) error { - var aux scheduleImportStruct - if err := json.Unmarshal(data, &aux); err != nil { return err } - return sp.parseAndAssignFields(aux) - } func (sp *SchedulerPersistence) UnmarshalYAML(unmarshal func(interface{}) error) error { @@ -128,7 +122,5 @@ func (sp *SchedulerPersistence) UnmarshalYAML(unmarshal func(interface{}) error) if err := unmarshal(&aux); err != nil { return err } - return sp.parseAndAssignFields(aux) - } diff --git a/scheduler_test.go b/scheduler_test.go index 40a81dbc71302ef5b12e5eca976d292a5c43cbe7..61edd28eb5da9f52af13981b1da1df67e3a5263e 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -37,7 +37,7 @@ func TestIntervalScheduler_BasicFunctionality(t *testing.T) { time.Sleep(time.Millisecond * 500) if atomic.LoadInt32(&count) < 4 { - t.Errorf("Expected to run at least 4 times, ran %d times", count) + t.Errorf("Expected to run at least four times, ran %d times", count) } } @@ -175,7 +175,7 @@ func TestDelayScheduler_BasicFunctionality(t *testing.T) { time.Sleep(time.Millisecond * 200) if atomic.LoadInt32(&count) != 1 { - t.Errorf("Expected to run 1 time, ran %d times", count) + t.Errorf("Expected to run one time, ran %d times", count) } } @@ -432,3 +432,19 @@ time: "2023-12-15T12:00:00Z" assert.Equal(t, expectedInterval, sp.Interval, "Interval should be unmarshalled correctly") assert.Equal(t, &expectedTime, sp.Time, "Time should be unmarshalled correctly") } + +func TestUnmarshalSchedulerPersistenceIntervalYAML(t *testing.T) { + + yamlData := ` +type: Interval +interval: "1m1s" +` + + var sp SchedulerPersistence + err := yaml.Unmarshal([]byte(yamlData), &sp) + assert.Nil(t, err, "Unmarshalling should not produce an error") + + expectedInterval, _ := time.ParseDuration("1m") + assert.Equal(t, "interval", sp.Type, "Type should be unmarshalled correctly") + assert.Equal(t, expectedInterval, sp.Interval, "Interval should be unmarshalled correctly") +}