Skip to content
Snippets Groups Projects
Verified Commit daa40844 authored by Volker Schukai's avatar Volker Schukai :alien:
Browse files

fix: optimize save #52

parent db62cbb4
No related branches found
No related tags found
No related merge requests found
......@@ -18,11 +18,10 @@ func (s *JobSyncer) CheckAndSaveOrUpdate(job GenericJob) error {
return ErrNoDatabaseConnection
}
permJob := job.GetPersistence()
db := s.manager.database
var existing JobPersistence
result := db.Where("id = ?", permJob.ID).First(&existing)
result := db.Where("id = ?", job.GetID()).First(&existing)
if result.Error == nil {
return updateJob(job, db)
......@@ -181,18 +180,28 @@ func save(job *JobPersistence, db *gorm.DB) error {
return db.Transaction(func(tx *gorm.DB) error {
var existingJob JobPersistence
if err := tx.First(&existingJob, "id = ?", job.ID).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
if err := tx.Create(job).Error; err != nil {
return ErrFailedToCreate
}
} else {
return ErrFailedToQueryExistingJob
}
} else {
if err := tx.Save(job).Error; err != nil {
return err
return ErrFailedToSaveJob
}
}
if job.Stats != (JobStats{}) {
job.Stats.JobID = job.ID
if job.Stats.RunCount == 0 {
Info("Stats runCount is 0, skipping update")
}
if err := tx.Save(&job.Stats).Error; err != nil {
if err := tx.Model(job.Stats).
Select("*").
Omit("job_id", "created_at").
Updates(job.Stats).Error; err != nil {
return err
}
}
......
......@@ -9,6 +9,7 @@
blackbox-terminal
coreutils-full
dbeaver
glibc.static
dbeaver
delve
dialog
......
......@@ -43,7 +43,7 @@ var (
ErrSchedulerNotSet = fmt.Errorf("scheduler is not set")
ErrJobNotActive = fmt.Errorf("job is not active")
ErrJobAlreadyActive = fmt.Errorf("job is already active")
ErrChannelAlreadyClosed = fmt.Errorf("channel is already closed")
ErrChannelAlreadyClosed = fmt.Errorf("the channel is already closed")
ErrUnknownScheduleType = fmt.Errorf("unknown schedule type")
ErrNoManager = fmt.Errorf("no manager")
ErrCannotLoadStatsFromDatabase = fmt.Errorf("errors while loading stats from database")
......@@ -54,4 +54,7 @@ var (
ErrJobSyncerNotRunning = fmt.Errorf("JobSyncer is not running")
ErrMaxRetriesReached = fmt.Errorf("maximum number of retries reached")
ErrTimeoutReached = fmt.Errorf("timeout reached")
ErrFailedToCreate = fmt.Errorf("failed to create")
ErrFailedToQueryExistingJob = fmt.Errorf("failed to query an existing job")
ErrFailedToSaveJob = fmt.Errorf("failed to save a job")
)
......@@ -10,6 +10,7 @@ import (
type GenericJob interface {
GetID() JobID
GetDependencies() []JobID
GetPriority() Priority
......@@ -31,6 +32,7 @@ type GenericJob interface {
GetScheduler() Scheduler
Pause()
PauseUntil(until time.Time)
Resume()
......
......@@ -66,7 +66,7 @@ func (js *JobSyncer) Sync(job GenericJob) {
err := js.CheckAndSaveOrUpdate(job)
if err != nil {
Error("Error while creating or updating job", err)
Error("Error while creating or updating the job", err)
}
}()
......
......@@ -95,7 +95,7 @@ func (q *Queue) Enqueue(job GenericJob) error {
currentReadyJobIDs[job.GetID()] = struct{}{}
}
fullJobList := []GenericJob{}
var fullJobList []GenericJob
for _, job := range readyJobList {
fullJobList = append(fullJobList, job)
}
......@@ -182,7 +182,6 @@ func (q *Queue) Dequeue() (GenericJob, error) {
ProcessedTime: time.Now(),
ID: job.GetID(),
})
return job, nil
}
......@@ -194,6 +193,5 @@ func removeJobID(deps []JobID, id JobID) []JobID {
return deps[:len(deps)-1]
}
}
return deps
}
......@@ -30,6 +30,5 @@ func (r RunResult[T]) GetStatus() ResultStatus {
type Runnable[T any] interface {
Run(ctx context.Context) (RunResult[T], error)
GetType() string
GetPersistence() RunnableImport
}
......@@ -101,7 +101,6 @@ func (s *TimeScheduler) JobExists(id JobID) bool {
if s.jobs == nil {
return false
}
_, ok := s.jobs[id]
return ok
}
......
......@@ -17,24 +17,22 @@ type Scheduler interface {
Cancel(id JobID) error
CancelAll() error
JobExists(id JobID) bool
GetType() string
IsAdHoc() bool
GetPersistence() SchedulerPersistence
}
type SchedulerPersistence struct {
Type string `yaml:"type" json:"type" gorm:"column:type"`
Time *time.Time `yaml:"time,omitempty" json:"time,omitempty" gorm:"column:time"`
Interval time.Duration `yaml:"interval,omitempty" json:"interval,omitempty" gorm:"column:interval"`
Spec string `yaml:"spec,omitempty" json:"spec,omitempty" gorm:"column:spec"`
Delay time.Duration `yaml:"delay,omitempty" json:"delay,omitempty" gorm:"column:delay"`
EventFlags fsnotify.Op `yaml:"eventFlags,omitempty" json:"eventFlags,omitempty" gorm:"column:eventFlags"`
Type string `yaml:"type" json:"type" gorm:"column:type"`
Spec string `yaml:"spec,omitempty" json:"spec,omitempty" gorm:"column:spec"`
Event EventName `yaml:"event,omitempty" json:"event,omitempty" gorm:"column:event"`
Time *time.Time `yaml:"time,omitempty" json:"time,omitempty" gorm:"column:time"`
Executed bool `yaml:"executed,omitempty" json:"executed,omitempty" gorm:"column:executed"`
Path string `yaml:"path,omitempty" json:"path,omitempty" gorm:"column:path"`
EventFlags fsnotify.Op `yaml:"eventFlags,omitempty" json:"eventFlags,omitempty" gorm:"column:eventFlags"`
}
type scheduleImportStruct struct {
......@@ -112,15 +110,11 @@ func (sp *SchedulerPersistence) parseAndAssignFields(aux scheduleImportStruct) e
// UnmarshalJSON implements the json.Unmarshaler interface
func (sp *SchedulerPersistence) UnmarshalJSON(data []byte) error {
var aux scheduleImportStruct
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
return sp.parseAndAssignFields(aux)
}
func (sp *SchedulerPersistence) UnmarshalYAML(unmarshal func(interface{}) error) error {
......@@ -128,7 +122,5 @@ func (sp *SchedulerPersistence) UnmarshalYAML(unmarshal func(interface{}) error)
if err := unmarshal(&aux); err != nil {
return err
}
return sp.parseAndAssignFields(aux)
}
......@@ -37,7 +37,7 @@ func TestIntervalScheduler_BasicFunctionality(t *testing.T) {
time.Sleep(time.Millisecond * 500)
if atomic.LoadInt32(&count) < 4 {
t.Errorf("Expected to run at least 4 times, ran %d times", count)
t.Errorf("Expected to run at least four times, ran %d times", count)
}
}
......@@ -175,7 +175,7 @@ func TestDelayScheduler_BasicFunctionality(t *testing.T) {
time.Sleep(time.Millisecond * 200)
if atomic.LoadInt32(&count) != 1 {
t.Errorf("Expected to run 1 time, ran %d times", count)
t.Errorf("Expected to run one time, ran %d times", count)
}
}
......@@ -432,3 +432,19 @@ time: "2023-12-15T12:00:00Z"
assert.Equal(t, expectedInterval, sp.Interval, "Interval should be unmarshalled correctly")
assert.Equal(t, &expectedTime, sp.Time, "Time should be unmarshalled correctly")
}
func TestUnmarshalSchedulerPersistenceIntervalYAML(t *testing.T) {
yamlData := `
type: Interval
interval: "1m1s"
`
var sp SchedulerPersistence
err := yaml.Unmarshal([]byte(yamlData), &sp)
assert.Nil(t, err, "Unmarshalling should not produce an error")
expectedInterval, _ := time.ParseDuration("1m")
assert.Equal(t, "interval", sp.Type, "Type should be unmarshalled correctly")
assert.Equal(t, expectedInterval, sp.Interval, "Interval should be unmarshalled correctly")
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment