From 7d668250ca4b8c1b68b3660c2d9fb4294a85298e Mon Sep 17 00:00:00 2001 From: Volker Schukai <volker.schukai@schukai.com> Date: Tue, 14 Nov 2023 22:22:27 +0100 Subject: [PATCH] fix: update database and persitence #26 --- database.go | 47 +++++++++++++-- database_test.go | 93 +++++++++++++++-------------- errors.go | 1 + scheduler.go | 145 +++++++++++++++++++++++++++++++++++++++++++--- scheduler_test.go | 31 ++++++++++ 5 files changed, 263 insertions(+), 54 deletions(-) diff --git a/database.go b/database.go index 4de0bda..707c62c 100644 --- a/database.go +++ b/database.go @@ -98,7 +98,8 @@ func (s *DBSaver) Start() error { // this runs after the function returns // and needs to be protected by the lock // of the setStatus method - s.setStatus(DBSaverStatusStopped) + s.status = DBSaverStatusStopped + //s.setStatus(DBSaverStatusStopped) }() for { @@ -125,6 +126,26 @@ func (s *DBSaver) Start() error { return result.Error } } else { + + tx.Model(&existingJob.Scheduler).Select( + []string{ + "type", + "interval", + "spec", + "delay", + "event", + "time", + "executed", + }).UpdateColumns(SchedulerPersistence{ + Type: "", + Interval: 0, + Spec: "", + Delay: 0, + Event: "", + Time: nil, + Executed: false, + }) + err := tx.Model(&existingJob).Updates(permJob).Error if err != nil { return err @@ -186,7 +207,12 @@ func (s *DBSaver) Stop() *DBSaver { s.mu.Lock() defer s.mu.Unlock() - s.stopChan <- struct{}{} + select { + case s.stopChan <- struct{}{}: + default: + s.logError("DBSaver stop channel is full") + } + return s } @@ -209,7 +235,15 @@ func (s *DBSaver) SaveJob(job GenericJob) error { } }() - s.saveChannel <- job + select { + case s.saveChannel <- job: + default: + // if the channel is full, we just drop the job + // this is not ideal, but better than blocking + // the job queue + s.logError("DBSaver channel is full, dropping job with ID", "job_id", job.GetID()) + } + return nil } @@ -307,6 +341,11 @@ func (s *DBSaver) ResetStats(job GenericJob) error { job.ResetStats() - s.saveChannel <- job + select { + case s.saveChannel <- job: + default: + s.logError("DBSaver channel is full, dropping job with ID", "job_id", job.GetID()) + } + return nil } diff --git a/database_test.go b/database_test.go index 2c3904d..b1b2a74 100644 --- a/database_test.go +++ b/database_test.go @@ -50,13 +50,6 @@ func startTestMySQLDockerImageAndContainer(t *testing.T, port string, ctx contex HostPort: port, }, }, - // if you want to test the web interface, uncomment the following lines - //"8025/tcp": []nat.PortBinding{ - // { - // HostIP: DOCKER_TEST_HOST_IP, - // HostPort: "8025", - // }, - //}, }, } @@ -122,7 +115,7 @@ func TestWriteToDB(t *testing.T) { useMySQLPort := os.Getenv("MYSQL_PORT") //useMySQLPort = "3306" printLogging := os.Getenv("MYSQL_LOGGING") - //printLogging = "true" + printLogging = "true" var err error @@ -223,9 +216,55 @@ func TestWriteToDB(t *testing.T) { var wg sync.WaitGroup // run sub tests - wg.Add(1) - t.Run("TestWriteToDB", func(t *testing.T) { + t.Run("TestUpdateToDB", func(t *testing.T) { + wg.Add(1) + defer wg.Done() + + mgr := NewManager() + mgr.SetDB(db) + //worker := NewLocalWorker(1) + //err := mgr.AddWorker(worker) + //assert.Nil(t, err) + // + //err = mgr.Start() + //assert.Nil(t, err) + + dbSaver := NewDBSaver() + dbSaver.SetManager(mgr) + + err = dbSaver.Start() + assert.Nil(t, err) + + runner := &CounterRunnable{} + job := NewJob[CounterResult]("job1", runner) + + scheduler := &TimeScheduler{ + Time: time.Now().Add(1 * time.Second), + jobs: nil, + } + + job.scheduler = scheduler + + err = dbSaver.SaveJob(job) + assert.Nil(t, err) + + //err = mgr.ScheduleJob(job, scheduler) + //assert.Nil(t, err) + + err := dbSaver.SaveJob(job) + assert.Nil(t, err) + + time.Sleep(1 * time.Second) + + dbSaver.Stop() + + }) + + // run sub tests + + t.Run("TestAddToDB", func(t *testing.T) { + wg.Add(1) defer wg.Done() mgr := NewManager() @@ -323,8 +362,8 @@ func TestWriteToDB(t *testing.T) { }) - wg.Add(1) t.Run("TestDeleteJob", func(t *testing.T) { + wg.Add(1) defer wg.Done() mgr := NewManager() @@ -419,38 +458,6 @@ func TestWriteToDB(t *testing.T) { }) wg.Wait() - - //var jobPersistence JobPersistence - //var jobStats JobStats - //var jobLogs []JobLog // Assuming JobLog is your log model - // - //// Query JobPersistence - //if err := db.First(&jobPersistence, "id = ?", "job1").Error; err != nil { - // t.Errorf("Failed to query JobPersistence: %v", err) - //} else { - // // Validate the fields - // assert.Equal(t, JobID("job1"), jobPersistence.ID) - //} - // - //// Query JobStats - //if err := db.First(&jobStats, "job_id = ?", jobPersistence.ID).Error; err != nil { - // t.Errorf("Failed to query JobStats: %v", err) - //} else { - // // Validate the fields - // assert.Equal(t, jobPersistence.ID, jobStats.JobID) - //} - // - //// Query JobLogs - //if err := db.Find(&jobLogs, "job_id = ?", jobPersistence.ID).Error; err != nil { - // t.Errorf("Failed to query JobLogs: %v", err) - //} else { - // assert.NotEmpty(t, jobLogs) - // - // for _, l := range jobLogs { - // assert.Equal(t, jobPersistence.ID, l.JobID) - // } - //} - cancel() select { diff --git a/errors.go b/errors.go index 69b2f3e..f42af5e 100644 --- a/errors.go +++ b/errors.go @@ -44,4 +44,5 @@ var ( ErrUnknownScheduleType = fmt.Errorf("unknown schedule type") ErrNoManager = fmt.Errorf("no manager") ErrCannotLoadStatsFromDatabase = fmt.Errorf("errors while loading stats from database") + ErrInvalidTime = fmt.Errorf("invalid time") ) diff --git a/scheduler.go b/scheduler.go index 5fce6f1..7bce73c 100644 --- a/scheduler.go +++ b/scheduler.go @@ -27,6 +27,8 @@ type SchedulerPersistence struct { Spec string `yaml:"spec,omitempty" json:"spec,omitempty" gorm:"column:spec"` Delay time.Duration `yaml:"delay,omitempty" json:"delay,omitempty" gorm:"column:delay"` Event EventName `yaml:"event,omitempty" json:"event,omitempty" gorm:"column:event"` + Time *time.Time `yaml:"time,omitempty" json:"time,omitempty" gorm:"column:time"` + Executed bool `yaml:"executed,omitempty" json:"executed,omitempty" gorm:"column:executed"` } // IntervalScheduler is a scheduler that schedules a job at a fixed interval @@ -87,7 +89,11 @@ func (s *IntervalScheduler) Cancel(id JobID) error { } if stopChan, ok := s.jobs[id]; ok { - stopChan <- true + select { + case stopChan <- true: + default: + } + delete(s.jobs, id) } @@ -100,7 +106,11 @@ func (s *IntervalScheduler) CancelAll() error { } for _, stopChan := range s.jobs { - stopChan <- true + + select { + case stopChan <- true: + default: + } } s.jobs = nil @@ -218,7 +228,6 @@ type DelayScheduler struct { } func (s *DelayScheduler) Schedule(job GenericJob, eventBus *EventBus) error { - timer := time.NewTimer(s.Delay) if s.jobs == nil { s.jobs = make(map[JobID]StopChan) @@ -232,6 +241,8 @@ func (s *DelayScheduler) Schedule(job GenericJob, eventBus *EventBus) error { stopChan := make(StopChan) s.jobs[id] = stopChan + timer := time.NewTimer(s.Delay) + go func() { select { case <-timer.C: @@ -240,8 +251,10 @@ func (s *DelayScheduler) Schedule(job GenericJob, eventBus *EventBus) error { } case <-stopChan: timer.Stop() + return } }() + return nil } @@ -259,7 +272,10 @@ func (s *DelayScheduler) Cancel(id JobID) error { } if stopChan, ok := s.jobs[id]; ok { - stopChan <- true + select { + case stopChan <- true: + default: + } delete(s.jobs, id) } @@ -272,7 +288,10 @@ func (s *DelayScheduler) CancelAll() error { } for _, stopChan := range s.jobs { - stopChan <- true + select { + case stopChan <- true: + default: + } } s.jobs = nil @@ -295,6 +314,112 @@ func (s *DelayScheduler) GetPersistence() SchedulerPersistence { } } +// TimeScheduler is a scheduler that schedules at a specific time +type TimeScheduler struct { + Time time.Time + jobs map[JobID]StopChan + executed bool +} + +func (s *TimeScheduler) Schedule(job GenericJob, eventBus *EventBus) error { + if s.executed { + return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, job.GetID()) + } + + if s.Time.Before(time.Now()) { + return fmt.Errorf("%w: scheduled time is in the past", ErrInvalidTime) + } + + if s.jobs == nil { + s.jobs = make(map[JobID]StopChan) + } + + id := job.GetID() + if _, ok := s.jobs[id]; ok { + return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id) + } + + stopChan := make(StopChan) + s.jobs[id] = stopChan + + timer := time.NewTimer(s.Time.Sub(time.Now())) + + go func() { + select { + case <-timer.C: + if !job.IsPaused() { + eventBus.Publish(QueueJob, job) + s.executed = true + } else { + timer.Stop() + stopChan <- true + } + case <-stopChan: + timer.Stop() + return + } + }() + + return nil +} + +func (s *TimeScheduler) GetType() string { + return "Time" +} + +func (s *TimeScheduler) IsAdHoc() bool { + return false +} + +func (s *TimeScheduler) Cancel(id JobID) error { + if s.jobs == nil { + return nil + } + + if stopChan, ok := s.jobs[id]; ok { + select { + case stopChan <- true: + default: + } + delete(s.jobs, id) + } + + return nil +} + +func (s *TimeScheduler) CancelAll() error { + if s.jobs == nil { + return nil + } + + for _, stopChan := range s.jobs { + select { + case stopChan <- true: + default: + } + } + + s.jobs = nil + return nil +} + +func (s *TimeScheduler) JobExists(id JobID) bool { + if s.jobs == nil { + return false + } + + _, ok := s.jobs[id] + return ok +} + +func (s *TimeScheduler) GetPersistence() SchedulerPersistence { + return SchedulerPersistence{ + Type: s.GetType(), + Time: &s.Time, + Executed: s.executed, + } +} + // EventScheduler is a scheduler that schedules a job when an event is received type EventScheduler struct { Event EventName @@ -353,7 +478,10 @@ func (s *EventScheduler) Cancel(id JobID) error { } if stopChan, ok := s.jobs[id]; ok { - stopChan <- true + select { + case stopChan <- true: + default: + } delete(s.jobs, id) } @@ -367,7 +495,10 @@ func (s *EventScheduler) CancelAll() error { } for _, stopChan := range s.jobs { - stopChan <- true + select { + case stopChan <- true: + default: + } } s.jobs = nil diff --git a/scheduler_test.go b/scheduler_test.go index c2ce7f5..167931e 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -261,3 +261,34 @@ func TestEventScheduler_BasicFunctionality(t *testing.T) { t.Errorf("Expected to run 1 time, ran %d times", count) } } + +func TestTimeScheduler_BasicFunctionality(t *testing.T) { + + var count int32 + eventBus := NewEventBus() + timeScheduler := TimeScheduler{Time: time.Now().Add(time.Second * 1)} + + job := NewJob[DummyResult]("test-job", &DummyRunnable{}) + + genericJob := GenericJob(job) + _ = timeScheduler.Schedule(genericJob, eventBus) + + jobChannel := make(chan interface{}) + + go func() { + for _ = range jobChannel { + atomic.AddInt32(&count, 1) + } + }() + + eventBus.Subscribe(QueueJob, jobChannel) + + time.Sleep(time.Second * 2) + timeScheduler.Cancel(job.GetID()) + time.Sleep(time.Millisecond * 100) + + if atomic.LoadInt32(&count) != 1 { + t.Errorf("Expected to not run, ran %d times", count) + } + +} -- GitLab