Skip to content
Snippets Groups Projects
Verified Commit 35824db2 authored by Volker Schukai's avatar Volker Schukai :alien:
Browse files

chore: update docs and do code tidy #4

parent 00cbbae0
No related branches found
No related tags found
No related merge requests found
...@@ -24,14 +24,7 @@ type DBSaver struct { ...@@ -24,14 +24,7 @@ type DBSaver struct {
type RunnerData string type RunnerData string
type SchedulerData string type SchedulerData string
//type SavableJob interface { // NewDBSaver creates a new DBSaver
// GetLogs() []JobLog
// GetStats() JobStats
// GetID() JobID
//
// //GetPersistence() JobPersistence
//}
func NewDBSaver() *DBSaver { func NewDBSaver() *DBSaver {
return &DBSaver{ return &DBSaver{
saveChannel: make(chan GenericJob, 100), saveChannel: make(chan GenericJob, 100),
...@@ -39,6 +32,7 @@ func NewDBSaver() *DBSaver { ...@@ -39,6 +32,7 @@ func NewDBSaver() *DBSaver {
} }
} }
// SetManager sets the manager of the DBSaver
func (s *DBSaver) SetManager(manager *Manager) *DBSaver { func (s *DBSaver) SetManager(manager *Manager) *DBSaver {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
...@@ -55,6 +49,7 @@ func (s *DBSaver) setStatus(status DBSaverStatus) *DBSaver { ...@@ -55,6 +49,7 @@ func (s *DBSaver) setStatus(status DBSaverStatus) *DBSaver {
return s return s
} }
// isStatus returns true if the DBSaver has the given status
func (s *DBSaver) isStatus(status DBSaverStatus) bool { func (s *DBSaver) isStatus(status DBSaverStatus) bool {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
...@@ -63,6 +58,7 @@ func (s *DBSaver) isStatus(status DBSaverStatus) bool { ...@@ -63,6 +58,7 @@ func (s *DBSaver) isStatus(status DBSaverStatus) bool {
} }
// Start starts the DBSaver
func (s *DBSaver) Start() error { func (s *DBSaver) Start() error {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
...@@ -115,7 +111,6 @@ func (s *DBSaver) Start() error { ...@@ -115,7 +111,6 @@ func (s *DBSaver) Start() error {
tx.Model(&permJob.Stats).Updates(permJob.Stats) tx.Model(&permJob.Stats).Updates(permJob.Stats)
} }
//logs := permJob.GetLogs()
for _, log := range memLogs { for _, log := range memLogs {
log.LogID = 0 log.LogID = 0
_ = tx.Create(&log) _ = tx.Create(&log)
...@@ -138,6 +133,7 @@ func (s *DBSaver) Start() error { ...@@ -138,6 +133,7 @@ func (s *DBSaver) Start() error {
return nil return nil
} }
// logError logs an error
func (s *DBSaver) logError(msg string, keysAndValues ...interface{}) { func (s *DBSaver) logError(msg string, keysAndValues ...interface{}) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
...@@ -149,6 +145,7 @@ func (s *DBSaver) logError(msg string, keysAndValues ...interface{}) { ...@@ -149,6 +145,7 @@ func (s *DBSaver) logError(msg string, keysAndValues ...interface{}) {
s.manager.logger.Error(msg, keysAndValues...) s.manager.logger.Error(msg, keysAndValues...)
} }
// Stop stops the DBSaver
func (s *DBSaver) Stop() *DBSaver { func (s *DBSaver) Stop() *DBSaver {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
...@@ -157,6 +154,7 @@ func (s *DBSaver) Stop() *DBSaver { ...@@ -157,6 +154,7 @@ func (s *DBSaver) Stop() *DBSaver {
return s return s
} }
// SaveJob saves a job to the database
func (s *DBSaver) SaveJob(job GenericJob) error { func (s *DBSaver) SaveJob(job GenericJob) error {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
......
...@@ -3,11 +3,15 @@ module gitlab.schukai.com/oss/libraries/go/services/job-queues.git ...@@ -3,11 +3,15 @@ module gitlab.schukai.com/oss/libraries/go/services/job-queues.git
go 1.20 go 1.20
require ( require (
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/docker/docker v24.0.6+incompatible github.com/docker/docker v24.0.6+incompatible
github.com/docker/go-connections v0.4.0
github.com/google/uuid v1.4.0
github.com/pkg/sftp v1.13.6 github.com/pkg/sftp v1.13.6
github.com/robfig/cron/v3 v3.0.1 github.com/robfig/cron/v3 v3.0.1
github.com/shirou/gopsutil/v3 v3.23.9 github.com/shirou/gopsutil/v3 v3.23.9
github.com/stretchr/testify v1.8.4 github.com/stretchr/testify v1.8.4
go.uber.org/zap v1.26.0
golang.org/x/crypto v0.14.0 golang.org/x/crypto v0.14.0
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1
gorm.io/driver/mysql v1.5.2 gorm.io/driver/mysql v1.5.2
...@@ -15,17 +19,14 @@ require ( ...@@ -15,17 +19,14 @@ require (
) )
require ( require (
github.com/DATA-DOG/go-sqlmock v1.5.0 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/distribution/reference v0.5.0 // indirect github.com/distribution/reference v0.5.0 // indirect
github.com/docker/distribution v2.8.3+incompatible // indirect github.com/docker/distribution v2.8.3+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect
github.com/go-ole/go-ole v1.3.0 // indirect github.com/go-ole/go-ole v1.3.0 // indirect
github.com/go-sql-driver/mysql v1.7.1 // indirect github.com/go-sql-driver/mysql v1.7.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect github.com/jinzhu/now v1.1.5 // indirect
github.com/kr/fs v0.1.0 // indirect github.com/kr/fs v0.1.0 // indirect
...@@ -42,7 +43,6 @@ require ( ...@@ -42,7 +43,6 @@ require (
github.com/tklauser/numcpus v0.6.1 // indirect github.com/tklauser/numcpus v0.6.1 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect github.com/yusufpapurcu/wmi v1.2.3 // indirect
go.uber.org/multierr v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/mod v0.8.0 // indirect golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.10.0 // indirect golang.org/x/net v0.10.0 // indirect
golang.org/x/sys v0.13.0 // indirect golang.org/x/sys v0.13.0 // indirect
......
...@@ -28,8 +28,8 @@ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ ...@@ -28,8 +28,8 @@ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
...@@ -82,6 +82,7 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec ...@@ -82,6 +82,7 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw= github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw=
github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
......
...@@ -12,6 +12,10 @@ import ( ...@@ -12,6 +12,10 @@ import (
"time" "time"
) )
// TestRoundTrip tests the round trip of jobs from yaml to the manager and back to yaml
// This test runs forever and must be stopped manually
// Use the following command to run this test:
// go test -timeout 6h -run TestRoundTrip -v
func TestRoundTrip(t *testing.T) { func TestRoundTrip(t *testing.T) {
// define test data with jobs in yaml format // define test data with jobs in yaml format
...@@ -80,6 +84,8 @@ func TestRoundTrip(t *testing.T) { ...@@ -80,6 +84,8 @@ func TestRoundTrip(t *testing.T) {
err = ImportJobsAndSchedule(reader, "yaml", manager) err = ImportJobsAndSchedule(reader, "yaml", manager)
assert.Nil(t, err) assert.Nil(t, err)
time.Sleep(10 * time.Hour) for {
time.Sleep(1 * time.Second)
}
} }
...@@ -9,10 +9,12 @@ import ( ...@@ -9,10 +9,12 @@ import (
type JobID string type JobID string
// String returns the string representation of a JobID
func (id JobID) String() string { func (id JobID) String() string {
return string(id) return string(id)
} }
// Priority is the priority of a job
type Priority int type Priority int
const ( const (
...@@ -22,6 +24,7 @@ const ( ...@@ -22,6 +24,7 @@ const (
PriorityCritical PriorityCritical
) )
// Job is a job that can be executed
type Job[T any] struct { type Job[T any] struct {
id JobID id JobID
priority Priority priority Priority
...@@ -53,6 +56,7 @@ func NewJob[T any](id JobID, runner Runnable[T]) *Job[T] { ...@@ -53,6 +56,7 @@ func NewJob[T any](id JobID, runner Runnable[T]) *Job[T] {
} }
} }
// GetLogs returns the logs of the job
func (j *Job[T]) GetLogs() []JobLog { func (j *Job[T]) GetLogs() []JobLog {
j.mu.Lock() j.mu.Lock()
defer j.mu.Unlock() defer j.mu.Unlock()
...@@ -61,6 +65,7 @@ func (j *Job[T]) GetLogs() []JobLog { ...@@ -61,6 +65,7 @@ func (j *Job[T]) GetLogs() []JobLog {
return logs return logs
} }
// GetStats returns the stats of the job
func (j *Job[T]) GetStats() JobStats { func (j *Job[T]) GetStats() JobStats {
j.mu.Lock() j.mu.Lock()
defer j.mu.Unlock() defer j.mu.Unlock()
...@@ -195,6 +200,7 @@ func (j *Job[T]) SetPriority(priority Priority) *Job[T] { ...@@ -195,6 +200,7 @@ func (j *Job[T]) SetPriority(priority Priority) *Job[T] {
return j return j
} }
// GetPriority returns the priority of the job
func (j *Job[T]) GetPriority() Priority { func (j *Job[T]) GetPriority() Priority {
j.mu.Lock() j.mu.Lock()
defer j.mu.Unlock() defer j.mu.Unlock()
...@@ -211,6 +217,7 @@ func (j *Job[T]) SetTimeout(timeout time.Duration) *Job[T] { ...@@ -211,6 +217,7 @@ func (j *Job[T]) SetTimeout(timeout time.Duration) *Job[T] {
return j return j
} }
// GetTimeout returns the timeout of the job
func (j *Job[T]) GetTimeout() time.Duration { func (j *Job[T]) GetTimeout() time.Duration {
j.mu.Lock() j.mu.Lock()
defer j.mu.Unlock() defer j.mu.Unlock()
...@@ -227,6 +234,7 @@ func (j *Job[T]) SetMaxRetries(maxRetries uint) *Job[T] { ...@@ -227,6 +234,7 @@ func (j *Job[T]) SetMaxRetries(maxRetries uint) *Job[T] {
return j return j
} }
// GetMaxRetries returns the max retries of the job
func (j *Job[T]) GetMaxRetries() uint { func (j *Job[T]) GetMaxRetries() uint {
j.mu.Lock() j.mu.Lock()
defer j.mu.Unlock() defer j.mu.Unlock()
...@@ -243,6 +251,7 @@ func (j *Job[T]) SetRetryDelay(retryDelay time.Duration) *Job[T] { ...@@ -243,6 +251,7 @@ func (j *Job[T]) SetRetryDelay(retryDelay time.Duration) *Job[T] {
return j return j
} }
// GetRetryDelay returns the retry delay of the job
func (j *Job[T]) GetRetryDelay() time.Duration { func (j *Job[T]) GetRetryDelay() time.Duration {
j.mu.Lock() j.mu.Lock()
defer j.mu.Unlock() defer j.mu.Unlock()
...@@ -259,6 +268,7 @@ func (j *Job[T]) SetDependencies(dependencies []JobID) *Job[T] { ...@@ -259,6 +268,7 @@ func (j *Job[T]) SetDependencies(dependencies []JobID) *Job[T] {
return j return j
} }
// AddDependency adds a dependency to the job
func (j *Job[T]) AddDependency(dependency JobID) *Job[T] { func (j *Job[T]) AddDependency(dependency JobID) *Job[T] {
j.mu.Lock() j.mu.Lock()
defer j.mu.Unlock() defer j.mu.Unlock()
...@@ -267,6 +277,7 @@ func (j *Job[T]) AddDependency(dependency JobID) *Job[T] { ...@@ -267,6 +277,7 @@ func (j *Job[T]) AddDependency(dependency JobID) *Job[T] {
return j return j
} }
// RemoveDependency removes the dependency of the job
func (j *Job[T]) RemoveDependency(dependency JobID) *Job[T] { func (j *Job[T]) RemoveDependency(dependency JobID) *Job[T] {
j.mu.Lock() j.mu.Lock()
defer j.mu.Unlock() defer j.mu.Unlock()
...@@ -280,6 +291,7 @@ func (j *Job[T]) RemoveDependency(dependency JobID) *Job[T] { ...@@ -280,6 +291,7 @@ func (j *Job[T]) RemoveDependency(dependency JobID) *Job[T] {
return j return j
} }
// GetDependencies returns the dependencies of the job
func (j *Job[T]) GetDependencies() []JobID { func (j *Job[T]) GetDependencies() []JobID {
j.mu.Lock() j.mu.Lock()
defer j.mu.Unlock() defer j.mu.Unlock()
......
...@@ -90,25 +90,14 @@ func (m *Manager) SetDB(db *gorm.DB) *Manager { ...@@ -90,25 +90,14 @@ func (m *Manager) SetDB(db *gorm.DB) *Manager {
return m return m
} }
// GetDB returns the database connection
func (m *Manager) GetDB() *gorm.DB { func (m *Manager) GetDB() *gorm.DB {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
return m.database return m.database
} }
//func (m *Manager) initDBSaver() *Manager { // GetQueue returns the queue
// m.mu.Lock()
// defer m.mu.Unlock()
//
// if m.dbSaver != nil {
// return m
// }
//
// m.dbSaver = NewDBSaver()
// m.dbSaver.SetManager(m)
// return m
//}
func (m *Manager) checkAndSetRunningState() error { func (m *Manager) checkAndSetRunningState() error {
m.state = ManagerStateStopped m.state = ManagerStateStopped
...@@ -200,12 +189,6 @@ func (m *Manager) Start() error { ...@@ -200,12 +189,6 @@ func (m *Manager) Start() error {
return ErrManagerAlreadyRunning return ErrManagerAlreadyRunning
} }
//if m.stateManager != nil {
// if err := m.stateManager.LoadState(); err != nil {
// return err
// }
//}
if m.dbSaver != nil { if m.dbSaver != nil {
err = m.dbSaver.Start() err = m.dbSaver.Start()
if err != nil { if err != nil {
......
...@@ -29,16 +29,6 @@ type SchedulerPersistence struct { ...@@ -29,16 +29,6 @@ type SchedulerPersistence struct {
Event EventName `yaml:"event,omitempty" json:"event,omitempty" gorm:"column:event"` Event EventName `yaml:"event,omitempty" json:"event,omitempty" gorm:"column:event"`
} }
//func (s Scheduler) GetPersistence() SchedulerPersistence {
// return SchedulerPersistence{
// Type: s.Type,
// Interval: s.Interval,
// Spec: s.Spec,
// Delay: s.Delay,
// Event: s.Event,
// }
//}
// IntervalScheduler is a scheduler that schedules a job at a fixed interval // IntervalScheduler is a scheduler that schedules a job at a fixed interval
type IntervalScheduler struct { type IntervalScheduler struct {
Interval time.Duration Interval time.Duration
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment