Skip to content
Snippets Groups Projects
Verified Commit 7d668250 authored by Volker Schukai's avatar Volker Schukai :alien:
Browse files

fix: update database and persitence #26

parent d00fb6fa
No related branches found
No related tags found
No related merge requests found
...@@ -98,7 +98,8 @@ func (s *DBSaver) Start() error { ...@@ -98,7 +98,8 @@ func (s *DBSaver) Start() error {
// this runs after the function returns // this runs after the function returns
// and needs to be protected by the lock // and needs to be protected by the lock
// of the setStatus method // of the setStatus method
s.setStatus(DBSaverStatusStopped) s.status = DBSaverStatusStopped
//s.setStatus(DBSaverStatusStopped)
}() }()
for { for {
...@@ -125,6 +126,26 @@ func (s *DBSaver) Start() error { ...@@ -125,6 +126,26 @@ func (s *DBSaver) Start() error {
return result.Error return result.Error
} }
} else { } else {
tx.Model(&existingJob.Scheduler).Select(
[]string{
"type",
"interval",
"spec",
"delay",
"event",
"time",
"executed",
}).UpdateColumns(SchedulerPersistence{
Type: "",
Interval: 0,
Spec: "",
Delay: 0,
Event: "",
Time: nil,
Executed: false,
})
err := tx.Model(&existingJob).Updates(permJob).Error err := tx.Model(&existingJob).Updates(permJob).Error
if err != nil { if err != nil {
return err return err
...@@ -186,7 +207,12 @@ func (s *DBSaver) Stop() *DBSaver { ...@@ -186,7 +207,12 @@ func (s *DBSaver) Stop() *DBSaver {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
s.stopChan <- struct{}{} select {
case s.stopChan <- struct{}{}:
default:
s.logError("DBSaver stop channel is full")
}
return s return s
} }
...@@ -209,7 +235,15 @@ func (s *DBSaver) SaveJob(job GenericJob) error { ...@@ -209,7 +235,15 @@ func (s *DBSaver) SaveJob(job GenericJob) error {
} }
}() }()
s.saveChannel <- job select {
case s.saveChannel <- job:
default:
// if the channel is full, we just drop the job
// this is not ideal, but better than blocking
// the job queue
s.logError("DBSaver channel is full, dropping job with ID", "job_id", job.GetID())
}
return nil return nil
} }
...@@ -307,6 +341,11 @@ func (s *DBSaver) ResetStats(job GenericJob) error { ...@@ -307,6 +341,11 @@ func (s *DBSaver) ResetStats(job GenericJob) error {
job.ResetStats() job.ResetStats()
s.saveChannel <- job select {
case s.saveChannel <- job:
default:
s.logError("DBSaver channel is full, dropping job with ID", "job_id", job.GetID())
}
return nil return nil
} }
...@@ -50,13 +50,6 @@ func startTestMySQLDockerImageAndContainer(t *testing.T, port string, ctx contex ...@@ -50,13 +50,6 @@ func startTestMySQLDockerImageAndContainer(t *testing.T, port string, ctx contex
HostPort: port, HostPort: port,
}, },
}, },
// if you want to test the web interface, uncomment the following lines
//"8025/tcp": []nat.PortBinding{
// {
// HostIP: DOCKER_TEST_HOST_IP,
// HostPort: "8025",
// },
//},
}, },
} }
...@@ -122,7 +115,7 @@ func TestWriteToDB(t *testing.T) { ...@@ -122,7 +115,7 @@ func TestWriteToDB(t *testing.T) {
useMySQLPort := os.Getenv("MYSQL_PORT") useMySQLPort := os.Getenv("MYSQL_PORT")
//useMySQLPort = "3306" //useMySQLPort = "3306"
printLogging := os.Getenv("MYSQL_LOGGING") printLogging := os.Getenv("MYSQL_LOGGING")
//printLogging = "true" printLogging = "true"
var err error var err error
...@@ -223,9 +216,55 @@ func TestWriteToDB(t *testing.T) { ...@@ -223,9 +216,55 @@ func TestWriteToDB(t *testing.T) {
var wg sync.WaitGroup var wg sync.WaitGroup
// run sub tests // run sub tests
t.Run("TestUpdateToDB", func(t *testing.T) {
wg.Add(1) wg.Add(1)
t.Run("TestWriteToDB", func(t *testing.T) { defer wg.Done()
mgr := NewManager()
mgr.SetDB(db)
//worker := NewLocalWorker(1)
//err := mgr.AddWorker(worker)
//assert.Nil(t, err)
//
//err = mgr.Start()
//assert.Nil(t, err)
dbSaver := NewDBSaver()
dbSaver.SetManager(mgr)
err = dbSaver.Start()
assert.Nil(t, err)
runner := &CounterRunnable{}
job := NewJob[CounterResult]("job1", runner)
scheduler := &TimeScheduler{
Time: time.Now().Add(1 * time.Second),
jobs: nil,
}
job.scheduler = scheduler
err = dbSaver.SaveJob(job)
assert.Nil(t, err)
//err = mgr.ScheduleJob(job, scheduler)
//assert.Nil(t, err)
err := dbSaver.SaveJob(job)
assert.Nil(t, err)
time.Sleep(1 * time.Second)
dbSaver.Stop()
})
// run sub tests
t.Run("TestAddToDB", func(t *testing.T) {
wg.Add(1)
defer wg.Done() defer wg.Done()
mgr := NewManager() mgr := NewManager()
...@@ -323,8 +362,8 @@ func TestWriteToDB(t *testing.T) { ...@@ -323,8 +362,8 @@ func TestWriteToDB(t *testing.T) {
}) })
wg.Add(1)
t.Run("TestDeleteJob", func(t *testing.T) { t.Run("TestDeleteJob", func(t *testing.T) {
wg.Add(1)
defer wg.Done() defer wg.Done()
mgr := NewManager() mgr := NewManager()
...@@ -419,38 +458,6 @@ func TestWriteToDB(t *testing.T) { ...@@ -419,38 +458,6 @@ func TestWriteToDB(t *testing.T) {
}) })
wg.Wait() wg.Wait()
//var jobPersistence JobPersistence
//var jobStats JobStats
//var jobLogs []JobLog // Assuming JobLog is your log model
//
//// Query JobPersistence
//if err := db.First(&jobPersistence, "id = ?", "job1").Error; err != nil {
// t.Errorf("Failed to query JobPersistence: %v", err)
//} else {
// // Validate the fields
// assert.Equal(t, JobID("job1"), jobPersistence.ID)
//}
//
//// Query JobStats
//if err := db.First(&jobStats, "job_id = ?", jobPersistence.ID).Error; err != nil {
// t.Errorf("Failed to query JobStats: %v", err)
//} else {
// // Validate the fields
// assert.Equal(t, jobPersistence.ID, jobStats.JobID)
//}
//
//// Query JobLogs
//if err := db.Find(&jobLogs, "job_id = ?", jobPersistence.ID).Error; err != nil {
// t.Errorf("Failed to query JobLogs: %v", err)
//} else {
// assert.NotEmpty(t, jobLogs)
//
// for _, l := range jobLogs {
// assert.Equal(t, jobPersistence.ID, l.JobID)
// }
//}
cancel() cancel()
select { select {
......
...@@ -44,4 +44,5 @@ var ( ...@@ -44,4 +44,5 @@ var (
ErrUnknownScheduleType = fmt.Errorf("unknown schedule type") ErrUnknownScheduleType = fmt.Errorf("unknown schedule type")
ErrNoManager = fmt.Errorf("no manager") ErrNoManager = fmt.Errorf("no manager")
ErrCannotLoadStatsFromDatabase = fmt.Errorf("errors while loading stats from database") ErrCannotLoadStatsFromDatabase = fmt.Errorf("errors while loading stats from database")
ErrInvalidTime = fmt.Errorf("invalid time")
) )
...@@ -27,6 +27,8 @@ type SchedulerPersistence struct { ...@@ -27,6 +27,8 @@ type SchedulerPersistence struct {
Spec string `yaml:"spec,omitempty" json:"spec,omitempty" gorm:"column:spec"` Spec string `yaml:"spec,omitempty" json:"spec,omitempty" gorm:"column:spec"`
Delay time.Duration `yaml:"delay,omitempty" json:"delay,omitempty" gorm:"column:delay"` Delay time.Duration `yaml:"delay,omitempty" json:"delay,omitempty" gorm:"column:delay"`
Event EventName `yaml:"event,omitempty" json:"event,omitempty" gorm:"column:event"` Event EventName `yaml:"event,omitempty" json:"event,omitempty" gorm:"column:event"`
Time *time.Time `yaml:"time,omitempty" json:"time,omitempty" gorm:"column:time"`
Executed bool `yaml:"executed,omitempty" json:"executed,omitempty" gorm:"column:executed"`
} }
// IntervalScheduler is a scheduler that schedules a job at a fixed interval // IntervalScheduler is a scheduler that schedules a job at a fixed interval
...@@ -87,7 +89,11 @@ func (s *IntervalScheduler) Cancel(id JobID) error { ...@@ -87,7 +89,11 @@ func (s *IntervalScheduler) Cancel(id JobID) error {
} }
if stopChan, ok := s.jobs[id]; ok { if stopChan, ok := s.jobs[id]; ok {
stopChan <- true select {
case stopChan <- true:
default:
}
delete(s.jobs, id) delete(s.jobs, id)
} }
...@@ -100,7 +106,11 @@ func (s *IntervalScheduler) CancelAll() error { ...@@ -100,7 +106,11 @@ func (s *IntervalScheduler) CancelAll() error {
} }
for _, stopChan := range s.jobs { for _, stopChan := range s.jobs {
stopChan <- true
select {
case stopChan <- true:
default:
}
} }
s.jobs = nil s.jobs = nil
...@@ -218,7 +228,6 @@ type DelayScheduler struct { ...@@ -218,7 +228,6 @@ type DelayScheduler struct {
} }
func (s *DelayScheduler) Schedule(job GenericJob, eventBus *EventBus) error { func (s *DelayScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
timer := time.NewTimer(s.Delay)
if s.jobs == nil { if s.jobs == nil {
s.jobs = make(map[JobID]StopChan) s.jobs = make(map[JobID]StopChan)
...@@ -232,6 +241,8 @@ func (s *DelayScheduler) Schedule(job GenericJob, eventBus *EventBus) error { ...@@ -232,6 +241,8 @@ func (s *DelayScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
stopChan := make(StopChan) stopChan := make(StopChan)
s.jobs[id] = stopChan s.jobs[id] = stopChan
timer := time.NewTimer(s.Delay)
go func() { go func() {
select { select {
case <-timer.C: case <-timer.C:
...@@ -240,8 +251,10 @@ func (s *DelayScheduler) Schedule(job GenericJob, eventBus *EventBus) error { ...@@ -240,8 +251,10 @@ func (s *DelayScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
} }
case <-stopChan: case <-stopChan:
timer.Stop() timer.Stop()
return
} }
}() }()
return nil return nil
} }
...@@ -259,7 +272,10 @@ func (s *DelayScheduler) Cancel(id JobID) error { ...@@ -259,7 +272,10 @@ func (s *DelayScheduler) Cancel(id JobID) error {
} }
if stopChan, ok := s.jobs[id]; ok { if stopChan, ok := s.jobs[id]; ok {
stopChan <- true select {
case stopChan <- true:
default:
}
delete(s.jobs, id) delete(s.jobs, id)
} }
...@@ -272,7 +288,10 @@ func (s *DelayScheduler) CancelAll() error { ...@@ -272,7 +288,10 @@ func (s *DelayScheduler) CancelAll() error {
} }
for _, stopChan := range s.jobs { for _, stopChan := range s.jobs {
stopChan <- true select {
case stopChan <- true:
default:
}
} }
s.jobs = nil s.jobs = nil
...@@ -295,6 +314,112 @@ func (s *DelayScheduler) GetPersistence() SchedulerPersistence { ...@@ -295,6 +314,112 @@ func (s *DelayScheduler) GetPersistence() SchedulerPersistence {
} }
} }
// TimeScheduler is a scheduler that schedules at a specific time
type TimeScheduler struct {
Time time.Time
jobs map[JobID]StopChan
executed bool
}
func (s *TimeScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
if s.executed {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, job.GetID())
}
if s.Time.Before(time.Now()) {
return fmt.Errorf("%w: scheduled time is in the past", ErrInvalidTime)
}
if s.jobs == nil {
s.jobs = make(map[JobID]StopChan)
}
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
}
stopChan := make(StopChan)
s.jobs[id] = stopChan
timer := time.NewTimer(s.Time.Sub(time.Now()))
go func() {
select {
case <-timer.C:
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
s.executed = true
} else {
timer.Stop()
stopChan <- true
}
case <-stopChan:
timer.Stop()
return
}
}()
return nil
}
func (s *TimeScheduler) GetType() string {
return "Time"
}
func (s *TimeScheduler) IsAdHoc() bool {
return false
}
func (s *TimeScheduler) Cancel(id JobID) error {
if s.jobs == nil {
return nil
}
if stopChan, ok := s.jobs[id]; ok {
select {
case stopChan <- true:
default:
}
delete(s.jobs, id)
}
return nil
}
func (s *TimeScheduler) CancelAll() error {
if s.jobs == nil {
return nil
}
for _, stopChan := range s.jobs {
select {
case stopChan <- true:
default:
}
}
s.jobs = nil
return nil
}
func (s *TimeScheduler) JobExists(id JobID) bool {
if s.jobs == nil {
return false
}
_, ok := s.jobs[id]
return ok
}
func (s *TimeScheduler) GetPersistence() SchedulerPersistence {
return SchedulerPersistence{
Type: s.GetType(),
Time: &s.Time,
Executed: s.executed,
}
}
// EventScheduler is a scheduler that schedules a job when an event is received // EventScheduler is a scheduler that schedules a job when an event is received
type EventScheduler struct { type EventScheduler struct {
Event EventName Event EventName
...@@ -353,7 +478,10 @@ func (s *EventScheduler) Cancel(id JobID) error { ...@@ -353,7 +478,10 @@ func (s *EventScheduler) Cancel(id JobID) error {
} }
if stopChan, ok := s.jobs[id]; ok { if stopChan, ok := s.jobs[id]; ok {
stopChan <- true select {
case stopChan <- true:
default:
}
delete(s.jobs, id) delete(s.jobs, id)
} }
...@@ -367,7 +495,10 @@ func (s *EventScheduler) CancelAll() error { ...@@ -367,7 +495,10 @@ func (s *EventScheduler) CancelAll() error {
} }
for _, stopChan := range s.jobs { for _, stopChan := range s.jobs {
stopChan <- true select {
case stopChan <- true:
default:
}
} }
s.jobs = nil s.jobs = nil
......
...@@ -261,3 +261,34 @@ func TestEventScheduler_BasicFunctionality(t *testing.T) { ...@@ -261,3 +261,34 @@ func TestEventScheduler_BasicFunctionality(t *testing.T) {
t.Errorf("Expected to run 1 time, ran %d times", count) t.Errorf("Expected to run 1 time, ran %d times", count)
} }
} }
func TestTimeScheduler_BasicFunctionality(t *testing.T) {
var count int32
eventBus := NewEventBus()
timeScheduler := TimeScheduler{Time: time.Now().Add(time.Second * 1)}
job := NewJob[DummyResult]("test-job", &DummyRunnable{})
genericJob := GenericJob(job)
_ = timeScheduler.Schedule(genericJob, eventBus)
jobChannel := make(chan interface{})
go func() {
for _ = range jobChannel {
atomic.AddInt32(&count, 1)
}
}()
eventBus.Subscribe(QueueJob, jobChannel)
time.Sleep(time.Second * 2)
timeScheduler.Cancel(job.GetID())
time.Sleep(time.Millisecond * 100)
if atomic.LoadInt32(&count) != 1 {
t.Errorf("Expected to not run, ran %d times", count)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment