diff --git a/Taskfile.yml b/Taskfile.yml index 1c9a46effeb030aca31e4a26e5efd6dd1bb82d5f..f3aa11124c8bd4d5bed9f931bb155c694c1aba99 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -21,9 +21,9 @@ tasks: TEST_BY_TASK: true cmds: - echo "Execute unit tests in Go." - - go test -cover -v ./... - - go test -bench -v ./... - - go test -race -v ./... + - go test -tags=runOnTask -cover -v ./... + - go test -tags=runOnTask -bench -v ./... + - go test -tags=runOnTask -race -v ./... test-fuzz: desc: Conduct fuzzing tests.# @@ -31,7 +31,7 @@ tasks: TEST_BY_TASK: true cmds: - echo "Conduct fuzzing tests." - - go test -v -fuzztime=30s -fuzz=Fuzz ./... + - go test -tags=runOnTask -v -fuzztime=30s -fuzz=Fuzz ./... add-licenses: desc: Attach license headers to Go files. diff --git a/config.go b/config.go new file mode 100644 index 0000000000000000000000000000000000000000..3d1ca53abd00a151d47cdd0f62d4ef09ba28b085 --- /dev/null +++ b/config.go @@ -0,0 +1,11 @@ +package jobqueue + +var globalTableNamePrefix = "job_queue_" + +// SetTableNamePrefix sets the global table name prefix +// This is useful for when you want to use the same database for multiple applications +// and you want to avoid name collisions +// The default value is "job_queue_" +func SetTableNamePrefix(prefix string) { + globalTableNamePrefix = prefix +} diff --git a/database.go b/database.go new file mode 100644 index 0000000000000000000000000000000000000000..d6757aacd3cc58d5f03136d125e08a3811159daa --- /dev/null +++ b/database.go @@ -0,0 +1,174 @@ +package jobqueue + +import ( + "gorm.io/gorm" + "sync" +) + +type DBSaverStatus int + +const ( + DBSaverStatusStopped = iota + DBSaverStatusRunning +) + +type DBSaver struct { + saveChannel chan GenericJob + stopChan chan struct{} + migrateFlag bool + manager *Manager + status DBSaverStatus + mu sync.Mutex +} + +type RunnerData string +type SchedulerData string + +//type SavableJob interface { +// GetLogs() []JobLog +// GetStats() JobStats +// GetID() JobID +// +// //GetPersistence() JobPersistence +//} + +func NewDBSaver() *DBSaver { + return &DBSaver{ + saveChannel: make(chan GenericJob, 100), + stopChan: make(chan struct{}), + } +} + +func (s *DBSaver) SetManager(manager *Manager) *DBSaver { + s.mu.Lock() + defer s.mu.Unlock() + + s.manager = manager + return s +} + +func (s *DBSaver) setStatus(status DBSaverStatus) *DBSaver { + s.mu.Lock() + defer s.mu.Unlock() + + s.status = status + return s +} + +func (s *DBSaver) isStatus(status DBSaverStatus) bool { + s.mu.Lock() + defer s.mu.Unlock() + + return s.status == status + +} + +func (s *DBSaver) Start() error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.manager == nil || s.manager.database == nil { + return ErrNoDatabaseConnection + } + + if s.isStatus(DBSaverStatusRunning) { + return nil + } + + db := s.manager.database + + if !s.migrateFlag { + err := db.AutoMigrate(&JobPersistence{}, &JobLog{}, &JobStats{}) + if err != nil { + return err + } + s.migrateFlag = true + } + + go func() { + s.setStatus(DBSaverStatusRunning) + defer func() { + s.setStatus(DBSaverStatusStopped) + }() + for { + select { + case job := <-s.saveChannel: + + err := db.Transaction(func(tx *gorm.DB) error { + + permJob := job.GetPersistence() + + memLogs := permJob.Logs + permJob.Logs = nil + + result := tx.FirstOrCreate(&permJob, map[string]interface{}{"id": permJob.GetID()}). + Assign(permJob) + + err := result.Error + + // Find or create JobMeta + if err != nil { + s.logError("Error while saving job", "error", err) + } + + if result.RowsAffected == 0 { + tx.Model(&permJob.Stats).Updates(permJob.Stats) + } + + //logs := permJob.GetLogs() + for _, log := range memLogs { + log.LogID = 0 + _ = tx.Create(&log) + // no error handling, if it fails, it fails + } + + return nil // Commit the transaction + }) + + if err != nil { + s.logError("Error while saving job", "error", err) + } + + case <-s.stopChan: + return + } + } + }() + + return nil +} + +func (s *DBSaver) logError(msg string, keysAndValues ...interface{}) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.manager == nil || s.manager.logger == nil { + return + } + + s.manager.logger.Error(msg, keysAndValues...) +} + +func (s *DBSaver) Stop() *DBSaver { + s.mu.Lock() + defer s.mu.Unlock() + + s.stopChan <- struct{}{} + return s +} + +func (s *DBSaver) SaveJob(job GenericJob) error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.saveChannel == nil { + return ErrDBSaverNotInitialized + } + + if s.status != DBSaverStatusRunning { + return ErrDBSaverNotRunning + } + + s.saveChannel <- job + return nil +} diff --git a/database_test.go b/database_test.go new file mode 100644 index 0000000000000000000000000000000000000000..37f4c7d5f6309ec22e3b42d1fe1fdc1f5ae4eea4 --- /dev/null +++ b/database_test.go @@ -0,0 +1,368 @@ +//go:build !runOnTask + +package jobqueue + +import ( + "context" + "fmt" + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/client" + "github.com/docker/go-connections/nat" + "github.com/stretchr/testify/assert" + "gorm.io/driver/mysql" + "gorm.io/gorm" + "gorm.io/gorm/logger" + "log" + "net" + "os" + "runtime" + "strconv" + "sync" + "testing" + "time" +) + +func startTestMySQLDockerImageAndContainer(t *testing.T, port string, ctx context.Context) error { + t.Helper() + + cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) + if err != nil { + return err + } + + imageName := "mysql:8" + + reader, err := cli.ImagePull(ctx, imageName, types.ImagePullOptions{}) + if err != nil { + return err + } + + // if debug image pull, comment out the following lines + //_, _ = io.Copy(os.Stdout, reader) + _ = reader + + hostConfig := &container.HostConfig{ + PortBindings: nat.PortMap{ + "3306/tcp": []nat.PortBinding{ + { + HostIP: DOCKER_TEST_HOST_IP, + HostPort: port, + }, + }, + // if you want to test the web interface, uncomment the following lines + //"8025/tcp": []nat.PortBinding{ + // { + // HostIP: DOCKER_TEST_HOST_IP, + // HostPort: "8025", + // }, + //}, + }, + } + + resp, err := cli.ContainerCreate(ctx, &container.Config{ + Image: imageName, + Env: []string{ + "MYSQL_ROOT_PASSWORD=secret", + "MYSQL_USER=user", + "MYSQL_PASSWORD=secret", + "MYSQL_DATABASE=test", + }, + }, hostConfig, nil, nil, "") + + if err != nil { + return err + } + + if err := cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil { + return err + } + + go func() { + <-ctx.Done() + + timeout := 0 + stopOptions := container.StopOptions{ + Timeout: &timeout, + Signal: "SIGKILL", + } + newCtx, _ := context.WithTimeout(context.Background(), 60*time.Second) + if err := cli.ContainerStop(newCtx, resp.ID, stopOptions); err != nil { + t.Errorf("ContainerStop returned error: %v", err) + } + if err := cli.ContainerRemove(newCtx, resp.ID, types.ContainerRemoveOptions{ + Force: true, + }); err != nil { + t.Errorf("ContainerRemove returned error: %v", err) + } + + }() + + statusCh, errCh := cli.ContainerWait(ctx, resp.ID, container.WaitConditionNotRunning) + select { + case err := <-errCh: + if err != nil { + // empty error means container exited normally (see container_wait.go) + if err.Error() == "" { + return nil + } + + return err + } + case <-statusCh: + + } + + return nil +} + +func TestWriteToDB(t *testing.T) { + + // if true, logging and port 3306 is used + debugMode := false + + var err error + + ctb := context.Background() + ctx, cancel := context.WithCancel(ctb) + t.Cleanup(func() { + cancel() + time.Sleep(1 * time.Second) + }) + + listener, err := net.Listen("tcp", DOCKER_TEST_HOST_IP+":0") + if err != nil { + t.Errorf("Unexpected error: %v", err) + return + } + portAsInt := listener.Addr().(*net.TCPAddr).Port + portAsString := fmt.Sprintf("%d", portAsInt) + _ = listener.Close() + + useMySQLPort := os.Getenv("MYSQL_PORT") + if debugMode || useMySQLPort != "" { + if useMySQLPort == "" { + useMySQLPort = "3306" + } + portAsString = useMySQLPort + i, _ := strconv.Atoi(portAsString) + + portAsInt = i + } + + done := make(chan bool) + go func() { + err = startTestMySQLDockerImageAndContainer(t, portAsString, ctx) + if err != nil { + t.Errorf("Unexpected error: %v", err) + cancel() + } + done <- true + }() + + waitCtx, waitCancel := context.WithTimeout(ctx, 60*time.Second) + defer waitCancel() + for { + conn, err := net.DialTimeout("tcp", net.JoinHostPort(DOCKER_TEST_HOST_IP, portAsString), 1*time.Second) + if err == nil { + err = conn.Close() + assert.Nil(t, err) + break + } + select { + case <-waitCtx.Done(): + t.Error("Timeout waiting for container service") + cancel() + return + default: + time.Sleep(1 * time.Second) + } + } + + dsn := "user:secret@tcp(" + net.JoinHostPort(DOCKER_TEST_HOST_IP, portAsString) + ")/test?charset=utf8mb4&parseTime=True&loc=Local" + + counter := 0 + var db *gorm.DB + + time.Sleep(20 * time.Second) + + var dbLogger logger.Interface + + if debugMode { + dbLogger = logger.New( + log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer + logger.Config{ + SlowThreshold: time.Second, // Slow SQL threshold + LogLevel: logger.Info, // Log level + Colorful: false, // Disable color + }, + ) + } else { + + dbLogger = logger.Default.LogMode(logger.Silent) + } + + for counter < 20 { + + db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{ + Logger: dbLogger, + }) + + if err == nil { + break + } + + counter++ + time.Sleep(1 * time.Second) + } + + if err != nil { + t.Errorf("Unexpected error: %v", err) + return + } + + var wg sync.WaitGroup + + // run sub tests + wg.Add(1) + t.Run("TestWriteToDB", func(t *testing.T) { + + defer wg.Done() + + mgr := NewManager() + mgr.SetDB(db) + worker := NewLocalWorker(1) + err := mgr.AddWorker(worker) + assert.Nil(t, err) + + err = mgr.Start() + assert.Nil(t, err) + + runner := &CounterRunnable{} + job := NewJob[CounterResult]("job1", runner) + + scheduler := &InstantScheduler{} + err = mgr.ScheduleJob(job, scheduler) + assert.Nil(t, err) + + err = mgr.CancelJobSchedule("job1") + assert.Nil(t, err) + + time.Sleep(1 * time.Second) + + scheduler2 := &InstantScheduler{} + err = mgr.ScheduleJob(job, scheduler2) + assert.Nil(t, err) + + err = mgr.CancelJobSchedule("job1") + assert.Nil(t, err) + + time.Sleep(1 * time.Second) + + scheduler3 := &InstantScheduler{} + err = mgr.ScheduleJob(job, scheduler3) + assert.Nil(t, err) + + time.Sleep(1 * time.Second) + + if mgr.dbSaver == nil { + t.Error("mgr.dbSaver == nil") + return + } + + time.Sleep(1 * time.Second) + + err = mgr.dbSaver.SaveJob(job) + assert.Nil(t, err) + + runtime.Gosched() + time.Sleep(1 * time.Second) + err = mgr.CancelJobSchedule("job1") + assert.Nil(t, err) + + runtime.Gosched() + time.Sleep(1 * time.Second) + + scheduler4 := &InstantScheduler{} + err = mgr.ScheduleJob(job, scheduler4) + assert.Nil(t, err) + + runtime.Gosched() + time.Sleep(1 * time.Second) + + err = mgr.dbSaver.SaveJob(job) + assert.Nil(t, err) + + time.Sleep(2 * time.Second) + err = mgr.CancelJobSchedule("job1") + assert.Nil(t, err) + + tries := 10 + for tries > 0 { + + var tmpJob JobPersistence + + if err := db.First(&tmpJob, "id = ?", "job1").Error; err == nil { + break + } + + tries-- + time.Sleep(1 * time.Second) + + } + + assert.True(t, tries > 0) + + err = LoadJobsAndScheduleFromDatabase(db, mgr) + assert.Nil(t, err) + + time.Sleep(1 * time.Second) + + err = mgr.dbSaver.SaveJob(job) + assert.Nil(t, err) + time.Sleep(1 * time.Second) + + }) + + wg.Wait() + + var jobPersistence JobPersistence + var jobStats JobStats + var jobLogs []JobLog // Assuming JobLog is your log model + + // Query JobPersistence + if err := db.First(&jobPersistence, "id = ?", "job1").Error; err != nil { + t.Errorf("Failed to query JobPersistence: %v", err) + } else { + // Validate the fields + assert.Equal(t, JobID("job1"), jobPersistence.ID) + } + + // Query JobStats + if err := db.First(&jobStats, "job_id = ?", jobPersistence.ID).Error; err != nil { + t.Errorf("Failed to query JobStats: %v", err) + } else { + // Validate the fields + assert.Equal(t, jobPersistence.ID, jobStats.JobID) + } + + // Query JobLogs + if err := db.Find(&jobLogs, "job_id = ?", jobPersistence.ID).Error; err != nil { + t.Errorf("Failed to query JobLogs: %v", err) + } else { + assert.NotEmpty(t, jobLogs) + + for _, l := range jobLogs { + assert.Equal(t, jobPersistence.ID, l.JobID) + } + } + + cancel() + + select { + case <-done: + time.Sleep(1 * time.Second) + case <-time.After(1 * time.Minute): + t.Error("test hangs, timeout reached") + } +} diff --git a/devenv.nix b/devenv.nix index 9c0b1eb81a0f3cd9c3ac85a057ac5ac581411f9f..da8f4c789e89b01a8804ac3855b99e913743480d 100644 --- a/devenv.nix +++ b/devenv.nix @@ -37,6 +37,7 @@ procps ranger unixtools.xxd + dbeaver unzip util-linux wget @@ -258,9 +259,9 @@ tasks: TEST_BY_TASK: true cmds: - echo "Execute unit tests in Go." - - go test -cover -v ./... - - go test -bench -v ./... - - go test -race -v ./... + - go test -tags=runOnTask -cover -v ./... + - go test -tags=runOnTask -bench -v ./... + - go test -tags=runOnTask -race -v ./... test-fuzz: desc: Conduct fuzzing tests.# @@ -268,7 +269,7 @@ tasks: TEST_BY_TASK: true cmds: - echo "Conduct fuzzing tests." - - go test -v -fuzztime=30s -fuzz=Fuzz ./... + - go test -tags=runOnTask -v -fuzztime=30s -fuzz=Fuzz ./... add-licenses: desc: Attach license headers to Go files. diff --git a/errors.go b/errors.go index c3d3da9d85113ad7649d671ea17bde318c06c29b..22efbaa18eb3b8f058bc6783191a786be59f89a0 100644 --- a/errors.go +++ b/errors.go @@ -34,4 +34,10 @@ var ( ErrFailedToCreateTempFile = fmt.Errorf("failed to create temp file") ErrFailedToWriteTempFile = fmt.Errorf("failed to write temp file") ErrJobAlreadyScheduled = fmt.Errorf("job already scheduled") + ErrNoDatabaseConnection = fmt.Errorf("no database connection") + ErrDBSaverNotRunning = fmt.Errorf("dbsaver is not running") + ErrDBSaverNotInitialized = fmt.Errorf("dbsaver is not initialized") + ErrSchedulerNotSet = fmt.Errorf("scheduler is not set") + ErrJobNotActive = fmt.Errorf("job is not active") + ErrJobAlreadyActive = fmt.Errorf("job is already active") ) diff --git a/import.go b/import.go deleted file mode 100644 index 18ed76888b3625820e1b18488b5652215128d39e..0000000000000000000000000000000000000000 --- a/import.go +++ /dev/null @@ -1,270 +0,0 @@ -package jobqueue - -import ( - "encoding/json" - "fmt" - "gopkg.in/yaml.v3" - "io" - "os" - "strings" - "time" -) - -type JobImport struct { - ID string `yaml:"id" json:"id"` - Priority int `yaml:"priority" json:"priority"` - Timeout time.Duration `yaml:"timeout" json:"timeout"` - MaxRetries uint `yaml:"maxRetries" json:"maxRetries"` - RetryDelay time.Duration `yaml:"retryDelay" json:"retryDelay"` - Dependencies []string `yaml:"dependencies" json:"dependencies,omitempty"` - Runnable RunnableImport `yaml:"runnable" json:"runnable"` - Scheduler SchedulerImport `yaml:"scheduler" json:"scheduler,omitempty"` -} - -type RunnableImport struct { - Type string `yaml:"type" json:"type"` - Data map[string]any `yaml:"data,omitempty" json:"data,omitempty"` -} - -type SchedulerImport struct { - Type string `yaml:"type" json:"type"` - Interval time.Duration `yaml:"interval,omitempty" json:"interval,omitempty"` - Spec string `yaml:"spec,omitempty" json:"spec,omitempty"` - Delay time.Duration `yaml:"delay,omitempty" json:"delay,omitempty"` - Event string `yaml:"event,omitempty" json:"event,omitempty"` -} - -func ReadYAML(r io.Reader) ([]JobImport, error) { - var jobs []JobImport - decoder := yaml.NewDecoder(r) - if err := decoder.Decode(&jobs); err != nil { - return nil, err - } - return jobs, nil -} - -func ReadJSON(r io.Reader) ([]JobImport, error) { - var jobs []JobImport - decoder := json.NewDecoder(r) - if err := decoder.Decode(&jobs); err != nil { - return nil, err - } - return jobs, nil -} - -func ReadYAMLFile(filePath string) ([]JobImport, error) { - file, err := os.Open(filePath) - if err != nil { - return nil, err - } - defer file.Close() - return ReadYAML(file) -} - -func ReadJsonFile(filePath string) ([]JobImport, error) { - file, err := os.Open(filePath) - if err != nil { - return nil, err - } - defer file.Close() - return ReadJSON(file) -} - -func CreateGenericJobFromImport[T any](jobImport JobImport, runner Runnable[T]) GenericJob { - return &Job[T]{ - id: JobID(jobImport.ID), - priority: Priority(jobImport.Priority), - timeout: jobImport.Timeout, - maxRetries: jobImport.MaxRetries, - RetryDelay: jobImport.RetryDelay, - runner: runner, - } -} - -func CreateJobAndSchedulerFromImport(jobImport JobImport, manager *Manager) (GenericJob, Scheduler, error) { - - var job GenericJob - - rType := strings.ToLower(jobImport.Runnable.Type) - - runnableData := make(map[string]interface{}) - for k, v := range jobImport.Runnable.Data { - runnableData[strings.ToLower(k)] = v - } - - switch rType { - case "dummy": - - runner, err := NewDummyRunnableFromMap(runnableData) - if err != nil { - return nil, nil, err - } - - job = CreateGenericJobFromImport[DummyResult](jobImport, runner) - - case "counter": - - runner, err := NewCounterRunnableFromMap(runnableData) - if err != nil { - return nil, nil, err - } - - job = CreateGenericJobFromImport[CounterResult](jobImport, runner) - - case "fileoperation": - runner, err := NewFileOperationRunnableFromMap(runnableData) - if err != nil { - return nil, nil, err - } - - job = CreateGenericJobFromImport[FileOperationResult](jobImport, runner) - - case "db": - runner, err := NewDBRunnableFromMap(runnableData) - if err != nil { - return nil, nil, err - } - - job = CreateGenericJobFromImport[DBResult](jobImport, runner) - - case "http": - runner, err := NewHTTPRunnableFromMap(runnableData) - if err != nil { - return nil, nil, err - } - - job = CreateGenericJobFromImport[HTTPResult](jobImport, runner) - - case "mail": - runner, err := NewMailRunnableFromMap(runnableData) - if err != nil { - return nil, nil, err - } - - job = CreateGenericJobFromImport[MailResult](jobImport, runner) - - case "sftp": - runner, err := NewSFTPRunnableFromMap(runnableData) - if err != nil { - return nil, nil, err - } - - job = CreateGenericJobFromImport[SFTPResult](jobImport, runner) - - case "shell": - runner, err := NewShellRunnableFromMap(runnableData) - if err != nil { - return nil, nil, err - } - - job = CreateGenericJobFromImport[ShellResult](jobImport, runner) - - default: - return nil, nil, - fmt.Errorf("%w: %s, available types: dummy, counter, fileoperation, db, http, mail, sftp, shell", - ErrUnknownRunnableType, rType) - - } - - sType := strings.ToLower(jobImport.Scheduler.Type) - - scheduleData := make(map[string]interface{}) - for k, v := range jobImport.Runnable.Data { - scheduleData[strings.ToLower(k)] = v - } - - var scheduler Scheduler - switch sType { - case "interval": - scheduler = &IntervalScheduler{Interval: jobImport.Scheduler.Interval} - - case "cron": - scheduler = &CronScheduler{ - Spec: jobImport.Scheduler.Spec, - } - - if manager != nil { - scheduler.(*CronScheduler).cron = manager.GetCronInstance() - } - - case "delay": - scheduler = &DelayScheduler{Delay: jobImport.Scheduler.Delay} - - case "event": - scheduler = &EventScheduler{Event: EventName(jobImport.Scheduler.Event)} - - default: - return nil, nil, ErrUnknownSchedulerType - } - - return job, scheduler, nil -} - -// LoadJobsAndScheduleFromFile read jobs from a file and schedule them. (json/yaml) -func LoadJobsAndScheduleFromFile(filePath string, manager *Manager) error { - - var err error - var imp []JobImport - - if filePath[len(filePath)-4:] == "json" { - imp, err = ReadJsonFile(filePath) - } else if filePath[len(filePath)-4:] == "yaml" { - imp, err = ReadYAMLFile(filePath) - } else { - return ErrUnknownFormat - } - - if err != nil { - return err - } - - for _, imp := range imp { - job, scheduler, err := CreateJobAndSchedulerFromImport(imp, manager) - if err != nil { - return err - } - - err = manager.ScheduleJob(job, scheduler) - if err != nil { - return err - } - } - - return nil - -} - -// ImportJobsAndSchedule lädt Jobs aus einem Reader und plant sie ein. -func ImportJobsAndSchedule(reader io.Reader, format string, manager *Manager) error { - var err error - var imp []JobImport - - // format to lowercase - format = strings.ToLower(format) - - if format == "json" { - imp, err = ReadJSON(reader) - } else if format == "yaml" { - imp, err = ReadYAML(reader) - } else { - return fmt.Errorf("%w: %s", ErrUnknownFormat, format) - } - - if err != nil { - return err - } - - for _, imp := range imp { - job, scheduler, err := CreateJobAndSchedulerFromImport(imp, manager) - if err != nil { - return err - } - - err = manager.ScheduleJob(job, scheduler) - if err != nil { - return err - } - } - - return nil -} diff --git a/issue-1_test.go b/issue-1_test.go index e0153b2274cce411346ffe29d95c9730a5cc89ff..da72d65cd92732e38099970065a58efc6d9f125d 100644 --- a/issue-1_test.go +++ b/issue-1_test.go @@ -1,10 +1,12 @@ +//go:build !runOnTask + package jobqueue import ( "github.com/robfig/cron/v3" "github.com/stretchr/testify/assert" "go.uber.org/zap" - "os" + "strings" "testing" "time" @@ -12,34 +14,6 @@ import ( func TestRoundTrip(t *testing.T) { - if os.Getenv("TEST_BY_TASK") != "" { - t.Skip("Skipping test because TEST_BY_TASK is set") - } - - // type JobImport struct { - // ID string `yaml:"id" json:"id"` - // Priority int `yaml:"priority" json:"priority"` - // Timeout time.Duration `yaml:"timeout" json:"timeout"` - // MaxRetries uint `yaml:"maxRetries" json:"maxRetries"` - // RetryDelay time.Duration `yaml:"retryDelay" json:"retryDelay"` - // Dependencies []string `yaml:"dependencies" json:"dependencies,omitempty"` - // Runnable RunnableImport `yaml:"runnable" json:"runnable"` - // Scheduler SchedulerImport `yaml:"scheduler" json:"scheduler,omitempty"` - //} - // - //type RunnableImport struct { - // Type string `yaml:"type" json:"type"` - // Data map[string]any `yaml:"data,omitempty" json:"data,omitempty"` - //} - // - //type SchedulerImport struct { - // Type string `yaml:"type" json:"type"` - // Interval time.Duration `yaml:"interval,omitempty" json:"interval,omitempty"` - // Spec string `yaml:"spec,omitempty" json:"spec,omitempty"` - // Delay time.Duration `yaml:"delay,omitempty" json:"delay,omitempty"` - // Event string `yaml:"event,omitempty" json:"event,omitempty"` - //} - // define test data with jobs in yaml format testData := []byte(` - id: job1 @@ -106,6 +80,6 @@ func TestRoundTrip(t *testing.T) { err = ImportJobsAndSchedule(reader, "yaml", manager) assert.Nil(t, err) - time.Sleep(10 * time.Minute) + time.Sleep(10 * time.Hour) } diff --git a/job-generic.go b/job-generic.go new file mode 100644 index 0000000000000000000000000000000000000000..07d2d43f44b87e2d80010b2b1c6672be75d019b1 --- /dev/null +++ b/job-generic.go @@ -0,0 +1,29 @@ +package jobqueue + +import ( + "context" + "time" +) + +type GenericJob interface { + GetID() JobID + GetDependencies() []JobID + + GetPriority() Priority + + Execute(ctx context.Context) (RunGenericResult, error) + + Cancel() error + + GetMaxRetries() uint + + GetRetryDelay() time.Duration + + GetTimeout() time.Duration + + GetPersistence() JobPersistence + + SetScheduler(scheduler Scheduler) + + GetScheduler() Scheduler +} diff --git a/job-log.go b/job-log.go index 171039ea04363fc9e57c3b6e64e7c4bdcd93d740..655a52d4365a0d5b677d7520bda34fdf9976375c 100644 --- a/job-log.go +++ b/job-log.go @@ -1,26 +1,38 @@ package jobqueue import ( + "gorm.io/gorm" "time" ) type JobLog struct { - ProcessID int `json:"process_id"` - StartTime time.Time `json:"start_time"` - EndTime time.Time `json:"end_time"` - ExitCode int `json:"exit_code"` - Result any `json:"output"` - ResourceUsage struct { - Memory uint64 `json:"memory"` - CPU struct { - Percentage float64 `json:"percentage"` - } `json:"cpu"` - } `json:"resource_usage"` - IO struct { - Disk int64 `json:"disk"` - Network int64 `json:"network"` - } `json:"io"` - ErrorMsg string `json:"error_msg"` - IsSuccessful bool `json:"is_successful"` - //Metadata map[string]string `json:"metadata"` + LogID uint `gorm:"primarykey;autoIncrement:true"` + JobID JobID `json:"job_id" gorm:"type:varchar(255);foreignKey:JobID;references:ID"` + ProcessID int `json:"process_id"` + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time"` + ExitCode int `json:"exit_code"` + Result string `json:"output" gorm:"type:TEXT"` // Assuming serialized JSON for any type + ResourceUsage ResourceUsage `json:"resource_usage" gorm:"embedded;embeddedPrefix:resource_usage_"` + IO IO `json:"io" gorm:"embedded;embeddedPrefix:io_"` + ErrorMsg string `json:"error_msg"` + IsSuccessful bool `json:"is_successful"` + + CreatedAt time.Time + UpdatedAt time.Time + DeletedAt gorm.DeletedAt `gorm:"index"` +} + +type ResourceUsage struct { + Memory uint64 `json:"memory"` + CPU float64 `json:"cpu"` +} + +type IO struct { + Disk int64 `json:"disk"` + Network int64 `json:"network"` +} + +func (JobLog) TableName() string { + return globalTableNamePrefix + "job_logs" } diff --git a/job-stat.go b/job-stat.go index 68599e0414d59f62a15ea6180b1d4e1ae2edec29..44c8682b9c990e273fad8053746ed5071ef97b0b 100644 --- a/job-stat.go +++ b/job-stat.go @@ -1,17 +1,28 @@ package jobqueue import ( + "gorm.io/gorm" "time" ) type JobStats struct { - RunCount int `json:"run_count"` - SuccessCount int `json:"success_count"` - ErrorCount int `json:"error_count"` - TimeMetrics struct { - AvgRunTime time.Duration `json:"avg"` - MaxRunTime time.Duration `json:"max"` - MinRunTime time.Duration `json:"min"` - TotalRunTime time.Duration `json:"total"` - } `json:"time_metrics"` + JobID JobID `json:"job_id" gorm:"primaryKey"` + RunCount int `json:"run_count"` + SuccessCount int `json:"success_count"` + ErrorCount int `json:"error_count"` + TimeMetrics TimeMetrics `json:"time_metrics" gorm:"embedded;embeddedPrefix:time_metrics_"` + CreatedAt time.Time + UpdatedAt time.Time + DeletedAt gorm.DeletedAt `gorm:"index"` +} + +type TimeMetrics struct { + AvgRunTime time.Duration `json:"avg"` + MaxRunTime time.Duration `json:"max"` + MinRunTime time.Duration `json:"min"` + TotalRunTime time.Duration `json:"total"` +} + +func (JobStats) TableName() string { + return globalTableNamePrefix + "job_stats" } diff --git a/job.go b/job.go index dd747722a0832bc1ae57618d7a364c5698bb9e18..bfe9620d14b3fab3bdcb4c3a1e42c07300a76d64 100644 --- a/job.go +++ b/job.go @@ -22,27 +22,6 @@ const ( PriorityCritical ) -type GenericJob interface { - GetID() JobID - GetDependencies() []JobID - - GetPriority() Priority - - Execute(ctx context.Context) (RunGenericResult, error) - - Cancel() error - - GetMaxRetries() uint - - GetRetryDelay() time.Duration - - GetTimeout() time.Duration - - SerializeState() JobSerializedState - - UnserializeState(serializedState JobSerializedState) -} - type Job[T any] struct { id JobID priority Priority @@ -51,6 +30,8 @@ type Job[T any] struct { maxRetries uint RetryDelay time.Duration + scheduler Scheduler + dependencies []JobID mu sync.Mutex @@ -61,52 +42,85 @@ type Job[T any] struct { logs []JobLog } -type JobSerializedState struct { - ID JobID `json:"id"` - Priority Priority `json:"priority"` - Timeout time.Duration `json:"timeout" ` - MaxRetries uint `json:"maxRetries" ` - RetryDelay time.Duration `json:"retryDelay"` - Dependencies []JobID `json:"dependencies" ` - Stats JobStats `json:"stats"` - Logs []JobLog `json:"logs"` -} - // NewJob creates a new job with the given id and runner func NewJob[T any](id JobID, runner Runnable[T]) *Job[T] { return &Job[T]{ id: id, runner: runner, priority: PriorityDefault, + logs: make([]JobLog, 0), + stats: JobStats{}, } } -func (j *Job[T]) SerializeState() JobSerializedState { +func (j *Job[T]) GetLogs() []JobLog { + j.mu.Lock() + defer j.mu.Unlock() + logs := j.logs + j.logs = make([]JobLog, 0) + return logs +} + +func (j *Job[T]) GetStats() JobStats { + j.mu.Lock() + defer j.mu.Unlock() + // workaround for gorm + j.stats.JobID = j.id + return j.stats +} + +// GetPersistence returns the persistence of the job +// and clears the logs. After this call, the logs are +// no longer available in the job. +func (j *Job[T]) GetPersistence() JobPersistence { + j.mu.Lock() defer j.mu.Unlock() - return JobSerializedState{ + + job := JobPersistence{ ID: j.id, Priority: j.priority, Timeout: j.timeout, MaxRetries: j.maxRetries, RetryDelay: j.RetryDelay, Dependencies: j.dependencies, - Stats: j.stats, - Logs: j.logs, + Runnable: j.runner.GetPersistence(), + + Logs: j.logs, + Stats: j.stats, + } + + if j.scheduler != nil { + job.Scheduler = j.scheduler.GetPersistence() } + + job.Stats.JobID = job.ID + + for i, _ := range job.Logs { + job.Logs[i].JobID = job.ID + } + + // Clear logs + j.logs = make([]JobLog, 0) + + return job + +} + +// SetScheduler sets the scheduler of the job +func (j *Job[T]) SetScheduler(scheduler Scheduler) { + j.mu.Lock() + defer j.mu.Unlock() + + j.scheduler = scheduler } -func (j *Job[T]) UnserializeState(serializedState JobSerializedState) { +// GetScheduler returns the scheduler of the job +func (j *Job[T]) GetScheduler() Scheduler { j.mu.Lock() defer j.mu.Unlock() - j.id = serializedState.ID - j.priority = serializedState.Priority - j.timeout = serializedState.Timeout - j.maxRetries = serializedState.MaxRetries - j.RetryDelay = serializedState.RetryDelay - j.dependencies = serializedState.Dependencies - j.stats = serializedState.Stats - j.logs = serializedState.Logs + + return j.scheduler } // Execute executes the job @@ -269,7 +283,6 @@ func (j *Job[T]) RemoveDependency(dependency JobID) *Job[T] { func (j *Job[T]) GetDependencies() []JobID { j.mu.Lock() defer j.mu.Unlock() - return j.dependencies } @@ -277,7 +290,6 @@ func (j *Job[T]) GetDependencies() []JobID { func (j *Job[T]) GetID() JobID { j.mu.Lock() defer j.mu.Unlock() - return j.id } diff --git a/job_test.go b/job_test.go index b95aac4f5d1093c75ab3d2bd292459ba2d501bdd..3400cef3f69ba342fe192334d4f7db1e004267f4 100644 --- a/job_test.go +++ b/job_test.go @@ -142,51 +142,3 @@ echo "Hello World" assert.Equal(t, 2, stats.SuccessCount) assert.Equal(t, 0, stats.ErrorCount) } - -func TestSerializeState(t *testing.T) { - // Initialize job - job := NewJob[DummyResult]("testJob", &DummyRunnable{}) - - job.SetPriority(PriorityHigh) - job.SetTimeout(5 * time.Minute) - job.SetMaxRetries(3) - job.SetRetryDelay(1 * time.Minute) - job.SetDependencies([]JobID{"dep1", "dep2"}) - - // Serialize state - serializedState := job.SerializeState() - - // Verify serialized state - assert.Equal(t, JobID("testJob"), serializedState.ID) - assert.Equal(t, PriorityHigh, serializedState.Priority) - assert.Equal(t, 5*time.Minute, serializedState.Timeout) - assert.Equal(t, uint(3), serializedState.MaxRetries) - assert.Equal(t, 1*time.Minute, serializedState.RetryDelay) - assert.ElementsMatch(t, []JobID{"dep1", "dep2"}, serializedState.Dependencies) -} - -func TestUnserializeState(t *testing.T) { - // Initialize serialized state - serializedState := JobSerializedState{ - ID: "testJob", - Priority: PriorityHigh, - Timeout: 5 * time.Minute, - MaxRetries: 3, - RetryDelay: 1 * time.Minute, - Dependencies: []JobID{"dep1", "dep2"}, - } - - // Initialize job - job := NewJob[DummyResult]("initialJob", &DummyRunnable{}) - - // Unserialize state - job.UnserializeState(serializedState) - - // Verify job properties - assert.Equal(t, JobID("testJob"), job.GetID()) - assert.Equal(t, PriorityHigh, job.GetPriority()) - assert.Equal(t, 5*time.Minute, job.GetTimeout()) - assert.Equal(t, uint(3), job.GetMaxRetries()) - assert.Equal(t, 1*time.Minute, job.GetRetryDelay()) - assert.ElementsMatch(t, []JobID{"dep1", "dep2"}, job.GetDependencies()) -} diff --git a/json-map.go b/json-map.go new file mode 100644 index 0000000000000000000000000000000000000000..69e4b0608bcdd73fe4f107f1eb26ba6d90d05886 --- /dev/null +++ b/json-map.go @@ -0,0 +1,34 @@ +package jobqueue + +import ( + "database/sql/driver" + "encoding/json" + "errors" +) + +type JSONMap map[string]interface{} + +func (m *JSONMap) Scan(value interface{}) error { + bytes, ok := value.([]byte) + if !ok { + return errors.New("Scan source is not []byte") + } + + if err := json.Unmarshal(bytes, m); err != nil { + return err + } + return nil +} + +func (m JSONMap) Value() (driver.Value, error) { + if m == nil { + // Return NULL if the map is nil + return nil, nil + } + + bytes, err := json.Marshal(m) + if err != nil { + return nil, err + } + return bytes, nil +} diff --git a/manager.go b/manager.go index 5a8468f4007b9b3133463de1ea625fd21c473a50..9b531ecaf89a4a5312ad9ea313291eaa8e75bf74 100644 --- a/manager.go +++ b/manager.go @@ -3,7 +3,9 @@ package jobqueue import ( "fmt" "github.com/robfig/cron/v3" + "gorm.io/gorm" "sync" + "time" ) type ManagerState int @@ -19,15 +21,18 @@ type Manager struct { queue *Queue workerMap map[WorkerID]Worker eventBus *EventBus - scheduled map[JobID]Scheduler - jobEventCh chan interface{} + activeJobs map[JobID]GenericJob + //scheduled map[JobID]Scheduler - stateManager StateManager + jobEventCh chan interface{} cronInstance *cron.Cron logger Logger + database *gorm.DB + dbSaver *DBSaver + mu sync.Mutex } @@ -36,13 +41,19 @@ func NewManager() *Manager { eventBus := NewEventBus() - return &Manager{ - state: ManagerStateStopped, - queue: NewQueue(eventBus), - workerMap: make(map[WorkerID]Worker), - eventBus: eventBus, - scheduled: make(map[JobID]Scheduler), + mng := &Manager{ + state: ManagerStateStopped, + queue: NewQueue(eventBus), + workerMap: make(map[WorkerID]Worker), + eventBus: eventBus, + activeJobs: make(map[JobID]GenericJob), + //scheduled: make(map[JobID]Scheduler), + //dbSaver: NewDBSaver(), } + + //mng.dbSaver.SetManager(mng) + return mng + } func (m *Manager) GetEventBus() *EventBus { @@ -51,10 +62,11 @@ func (m *Manager) GetEventBus() *EventBus { return m.eventBus } -func (m *Manager) SetCronInstance(cronInstance *cron.Cron) { +func (m *Manager) SetCronInstance(cronInstance *cron.Cron) *Manager { m.mu.Lock() defer m.mu.Unlock() m.cronInstance = cronInstance + return m } func (m *Manager) GetCronInstance() *cron.Cron { @@ -63,6 +75,40 @@ func (m *Manager) GetCronInstance() *cron.Cron { return m.cronInstance } +func (m *Manager) SetDB(db *gorm.DB) *Manager { + m.mu.Lock() + defer m.mu.Unlock() + m.database = db + + if m.dbSaver != nil { + return m + } + + m.dbSaver = NewDBSaver() + m.dbSaver.SetManager(m) + + return m +} + +func (m *Manager) GetDB() *gorm.DB { + m.mu.Lock() + defer m.mu.Unlock() + return m.database +} + +//func (m *Manager) initDBSaver() *Manager { +// m.mu.Lock() +// defer m.mu.Unlock() +// +// if m.dbSaver != nil { +// return m +// } +// +// m.dbSaver = NewDBSaver() +// m.dbSaver.SetManager(m) +// return m +//} + func (m *Manager) checkAndSetRunningState() error { m.state = ManagerStateStopped @@ -145,6 +191,8 @@ func (m *Manager) RemoveWorker(worker Worker) error { // Start starts the manager func (m *Manager) Start() error { + var err error + m.mu.Lock() defer m.mu.Unlock() @@ -152,9 +200,18 @@ func (m *Manager) Start() error { return ErrManagerAlreadyRunning } - if m.stateManager != nil { - if err := m.stateManager.LoadState(); err != nil { - return err + //if m.stateManager != nil { + // if err := m.stateManager.LoadState(); err != nil { + // return err + // } + //} + + if m.dbSaver != nil { + err = m.dbSaver.Start() + if err != nil { + if m.logger != nil { + m.logger.Error("Error while starting db saver", "error", err) + } } } @@ -190,7 +247,7 @@ func (m *Manager) Start() error { go m.handleJobEvents() - err := m.checkAndSetRunningState() + err = m.checkAndSetRunningState() if err != nil { wrappedErr = fmt.Errorf("%w\n%s", wrappedErr, err.Error()) @@ -237,23 +294,22 @@ func (m *Manager) Stop() error { wrappedErr = fmt.Errorf("%w\n%s", wrappedErr, err.Error()) } - if m.stateManager != nil { - if err := m.stateManager.LoadState(); err != nil { - return err - } - } - if m.cronInstance != nil { m.cronInstance.Stop() } + if m.dbSaver != nil { + m.dbSaver.Stop() + } + return wrappedErr } -func (m *Manager) SetLogger(logger Logger) { +func (m *Manager) SetLogger(logger Logger) *Manager { m.mu.Lock() defer m.mu.Unlock() m.logger = logger + return m } func (m *Manager) GetLogger() Logger { @@ -282,8 +338,11 @@ func (m *Manager) handleJobEvents() { job := event.Data.(GenericJob) err := m.queue.Enqueue(job) if err != nil && err != ErrJobAlreadyExists { - fmt.Println(err) + if m.logger != nil { + m.logger.Error("Error while queueing job", "error", err) + } } + case JobReady: for { @@ -297,33 +356,54 @@ func (m *Manager) handleJobEvents() { } assigned := false - for _, worker := range m.workerMap { - if err := worker.AssignJob(nextJob); err == nil { - assigned = true + maxTries := 10 + + for maxTries > 0 { + maxTries-- + + for _, worker := range m.workerMap { + if err := worker.AssignJob(nextJob); err == nil { + assigned = true + break + } + } + + if assigned == true { break } + + time.Sleep(1 * time.Second) } if !assigned { - fmt.Println("Job not assigned") - err = m.queue.Enqueue(nextJob) - if err != nil && err != ErrJobAlreadyExists { - fmt.Println(err) + if m.logger != nil { + m.logger.Info("No worker available for job", "job_id", nextJob.GetID()) + } + } else { + + if nextJob.GetScheduler() != nil { + if nextJob.GetScheduler().IsAdHoc() { + eventBus := m.GetEventBus() + eventBus.Publish(JobFinished, nextJob) + } + } + } } case JobFinished: - job := event.Data.(GenericJob) + // job is finished and should be archived - // check if job should archived - // is it an single run job? - schd := m.scheduled[job.GetID()] - if schd == nil { - job.SerializeState() + job := event.Data.(GenericJob) + _ = job + err := m.CancelJobSchedule(job.GetID()) + if err != nil { + if m.logger != nil { + m.logger.Error("Error while canceling job schedule", "error", err) + } } - } } } @@ -334,17 +414,34 @@ func (m *Manager) ScheduleJob(job GenericJob, scheduler Scheduler) error { m.mu.Lock() defer m.mu.Unlock() + if scheduler == nil { + return ErrSchedulerNotSet + } + if m.state != ManagerStateRunning { return ErrManagerNotRunning } - m.scheduled[job.GetID()] = scheduler + if job.GetScheduler() != nil { + return ErrJobAlreadyScheduled + } + + if _, ok := m.activeJobs[job.GetID()]; ok { + return ErrJobAlreadyActive + } + + job.SetScheduler(scheduler) + err := scheduler.Schedule(job, m.eventBus) + if err != nil { + return err + } + + m.activeJobs[job.GetID()] = job + return nil - return scheduler.Schedule(job, m.eventBus) } -// CancelJob cancels a scheduled job -func (m *Manager) CancelJob(id JobID) error { +func (m *Manager) CancelJobSchedule(id JobID) error { m.mu.Lock() defer m.mu.Unlock() @@ -352,22 +449,24 @@ func (m *Manager) CancelJob(id JobID) error { return ErrManagerNotRunning } - if _, ok := m.scheduled[id]; !ok { - return ErrJobNotScheduled + job, ok := m.activeJobs[id] + if !ok { + return ErrJobNotActive } - scheduler, ok := m.scheduled[id] - if !ok { + if job.GetScheduler() == nil { return ErrJobNotScheduled } - err := scheduler.Cancel(id) + scheduler := job.GetScheduler() + + err := scheduler.Cancel(job.GetID()) if err != nil { return err } - delete(m.scheduled, id) + job.SetScheduler(nil) + delete(m.activeJobs, job.GetID()) return nil - } diff --git a/manager_test.go b/manager_test.go index bbf2fb04665e0eca5595553ff8c68937a40c5d67..b5abdd614500a74cfdf971a6edf7b4c654f10f80 100644 --- a/manager_test.go +++ b/manager_test.go @@ -53,7 +53,8 @@ func (s *MockScheduler) GetNextRunTime(jobID JobID) time.Time { } type MockGenericJob struct { - ID JobID + ID JobID + Scheduler Scheduler } func (m *MockGenericJob) GetMaxRetries() uint { @@ -68,14 +69,6 @@ func (m *MockGenericJob) GetTimeout() time.Duration { return 0 } -func (m *MockGenericJob) SerializeState() JobSerializedState { - return JobSerializedState{} -} - -func (m *MockGenericJob) UnserializeState(serializedState JobSerializedState) { - return -} - func (m *MockGenericJob) GetID() JobID { return m.ID } @@ -100,6 +93,19 @@ func (m *MockGenericJob) Cancel() error { return nil } +func (m *MockGenericJob) GetPersistence() JobPersistence { + return JobPersistence{} +} + +func (m *MockGenericJob) SetScheduler(scheduler Scheduler) { + m.Scheduler = scheduler + return +} + +func (m *MockGenericJob) GetScheduler() Scheduler { + return m.Scheduler +} + func TestNewManager(t *testing.T) { eventBus := NewEventBus() manager := NewManager() @@ -190,11 +196,13 @@ func TestManager_CancelJob(t *testing.T) { assert.Nil(t, err) job := &MockGenericJob{ID: "job1"} - scheduler := InstantScheduler{} + scheduler := EventScheduler{} err = m.ScheduleJob(job, &scheduler) assert.Nil(t, err) - err = m.CancelJob("job1") + time.Sleep(1 * time.Second) + + err = m.CancelJobSchedule(job.GetID()) assert.Nil(t, err) } diff --git a/persistence.go b/persistence.go new file mode 100644 index 0000000000000000000000000000000000000000..33660016dad80d65e9d25461821b7fa5196ba14c --- /dev/null +++ b/persistence.go @@ -0,0 +1,358 @@ +package jobqueue + +import ( + "encoding/json" + "fmt" + "gopkg.in/yaml.v3" + "gorm.io/gorm" + "io" + "os" + "strings" + "time" +) + +type JobPersistence struct { + ID JobID `yaml:"id" json:"id" gorm:"type:varchar(255);primaryKey"` + Priority Priority `yaml:"priority" json:"priority" gorm:"column:priority"` + Timeout time.Duration `yaml:"timeout" json:"timeout" gorm:"column:timeout"` + MaxRetries uint `yaml:"maxRetries" json:"maxRetries" gorm:"column:max_retries"` + RetryDelay time.Duration `yaml:"retryDelay" json:"retryDelay" gorm:"column:retry_delay"` + Dependencies []JobID `yaml:"dependencies" json:"dependencies,omitempty" gorm:"column:dependencies;type:json"` + Runnable RunnableImport `yaml:"runnable" json:"runnable" gorm:"embedded;embeddedPrefix:runnable_"` + Scheduler SchedulerPersistence `yaml:"scheduler" json:"scheduler,omitempty" gorm:"embedded;embeddedPrefix:scheduler_"` + + Logs []JobLog `gorm:"foreignKey:JobID;references:ID"` + Stats JobStats `gorm:"foreignKey:JobID"` + + CreatedAt time.Time `gorm:"column:created_at"` + UpdatedAt time.Time `gorm:"column:updated_at"` + DeletedAt gorm.DeletedAt `gorm:"column:deleted_at;index"` +} + +func (jp JobPersistence) GetLogs() []JobLog { + return jp.Logs +} + +func (jp JobPersistence) GetStats() JobStats { + return jp.Stats +} + +func (jp JobPersistence) GetID() JobID { + return jp.ID +} + +func (jp JobPersistence) GetPersistence() JobPersistence { + return jp +} + +func (JobPersistence) TableName() string { + return globalTableNamePrefix + "jobs" +} + +type RunnableImport struct { + Type string `yaml:"type" json:"type" gorm:"column:type"` + Data JSONMap `yaml:"data,omitempty" json:"data,omitempty" gorm:"column:data;type:json"` +} + +func ReadYAML(r io.Reader) ([]JobPersistence, error) { + var jobs []JobPersistence + decoder := yaml.NewDecoder(r) + if err := decoder.Decode(&jobs); err != nil { + return nil, err + } + return jobs, nil +} + +func ReadJSON(r io.Reader) ([]JobPersistence, error) { + var jobs []JobPersistence + decoder := json.NewDecoder(r) + if err := decoder.Decode(&jobs); err != nil { + return nil, err + } + return jobs, nil +} + +func ReadYAMLFile(filePath string) ([]JobPersistence, error) { + file, err := os.Open(filePath) + if err != nil { + return nil, err + } + defer file.Close() + return ReadYAML(file) +} + +func ReadJsonFile(filePath string) ([]JobPersistence, error) { + file, err := os.Open(filePath) + if err != nil { + return nil, err + } + defer file.Close() + return ReadJSON(file) +} + +func ReadFromGORM(db *gorm.DB) ([]JobPersistence, error) { + var jobs []JobPersistence + + err := db.Transaction(func(tx *gorm.DB) error { + + if err := tx.Find(&jobs).Error; err != nil { + return err + } + + var wrappedErr []error + // load stats too + for i := range jobs { + if err := db.Model(&jobs[i]).Association("Stats").Find(&jobs[i].Stats); err != nil { + wrappedErr = append(wrappedErr, err) + } + } + + if len(wrappedErr) > 0 { + returnErr := fmt.Errorf("errors while loading stats from database") + for _, err := range wrappedErr { + returnErr = fmt.Errorf("%w: %v", returnErr, err) + } + return returnErr + } + + return nil + }) + + return jobs, err +} + +func CreateGenericJobFromPersistence[T any](jobImport JobPersistence, runner Runnable[T]) GenericJob { + return &Job[T]{ + id: jobImport.ID, + priority: jobImport.Priority, + timeout: jobImport.Timeout, + maxRetries: jobImport.MaxRetries, + RetryDelay: jobImport.RetryDelay, + runner: runner, + } +} + +func CreateJobAndSchedulerFromPersistence(jobImport JobPersistence, manager *Manager) (GenericJob, Scheduler, error) { + + var job GenericJob + + rType := strings.ToLower(jobImport.Runnable.Type) + + runnableData := make(map[string]interface{}) + for k, v := range jobImport.Runnable.Data { + runnableData[strings.ToLower(k)] = v + } + + switch rType { + case "dummy": + + runner, err := NewDummyRunnableFromMap(runnableData) + if err != nil { + return nil, nil, err + } + + job = CreateGenericJobFromPersistence[DummyResult](jobImport, runner) + + case "counter": + + runner, err := NewCounterRunnableFromMap(runnableData) + if err != nil { + return nil, nil, err + } + + job = CreateGenericJobFromPersistence[CounterResult](jobImport, runner) + + case "fileoperation": + runner, err := NewFileOperationRunnableFromMap(runnableData) + if err != nil { + return nil, nil, err + } + + job = CreateGenericJobFromPersistence[FileOperationResult](jobImport, runner) + + case "db": + runner, err := NewDBRunnableFromMap(runnableData) + if err != nil { + return nil, nil, err + } + + job = CreateGenericJobFromPersistence[DBResult](jobImport, runner) + + case "http": + runner, err := NewHTTPRunnableFromMap(runnableData) + if err != nil { + return nil, nil, err + } + + job = CreateGenericJobFromPersistence[HTTPResult](jobImport, runner) + + case "mail": + runner, err := NewMailRunnableFromMap(runnableData) + if err != nil { + return nil, nil, err + } + + job = CreateGenericJobFromPersistence[MailResult](jobImport, runner) + + case "sftp": + runner, err := NewSFTPRunnableFromMap(runnableData) + if err != nil { + return nil, nil, err + } + + job = CreateGenericJobFromPersistence[SFTPResult](jobImport, runner) + + case "shell": + runner, err := NewShellRunnableFromMap(runnableData) + if err != nil { + return nil, nil, err + } + + job = CreateGenericJobFromPersistence[ShellResult](jobImport, runner) + + default: + return nil, nil, + fmt.Errorf("%w: %s, available types: dummy, counter, fileoperation, db, http, mail, sftp, shell", + ErrUnknownRunnableType, rType) + + } + + sType := strings.ToLower(jobImport.Scheduler.Type) + + scheduleData := make(map[string]interface{}) + for k, v := range jobImport.Runnable.Data { + scheduleData[strings.ToLower(k)] = v + } + + var scheduler Scheduler + switch sType { + case "interval": + scheduler = &IntervalScheduler{Interval: jobImport.Scheduler.Interval} + + case "cron": + scheduler = &CronScheduler{ + Spec: jobImport.Scheduler.Spec, + } + + if manager != nil { + scheduler.(*CronScheduler).cron = manager.GetCronInstance() + } + + case "delay": + scheduler = &DelayScheduler{Delay: jobImport.Scheduler.Delay} + + case "event": + scheduler = &EventScheduler{Event: EventName(jobImport.Scheduler.Event)} + + case "instant": + scheduler = &InstantScheduler{} + + default: + return nil, nil, ErrUnknownSchedulerType + } + + return job, scheduler, nil +} + +// LoadJobsAndScheduleFromFile read jobs from a file and schedule them. (json/yaml) +func LoadJobsAndScheduleFromFile(filePath string, manager *Manager) error { + + var err error + var imp []JobPersistence + + if filePath[len(filePath)-4:] == "json" { + imp, err = ReadJsonFile(filePath) + } else if filePath[len(filePath)-4:] == "yaml" { + imp, err = ReadYAMLFile(filePath) + } else { + return ErrUnknownFormat + } + + if err != nil { + return err + } + + for _, imp := range imp { + job, scheduler, err := CreateJobAndSchedulerFromPersistence(imp, manager) + if err != nil { + return err + } + + err = manager.ScheduleJob(job, scheduler) + if err != nil { + return err + } + } + + return nil + +} + +func LoadJobsAndScheduleFromDatabase(db *gorm.DB, manager *Manager) error { + jobs, err := ReadFromGORM(db) + if err != nil { + return err + } + + var errs []error + + for _, job := range jobs { + j, s, err := CreateJobAndSchedulerFromPersistence(job, manager) + if err != nil { + errs = append(errs, err) + continue + + } + + err = manager.ScheduleJob(j, s) + if err != nil { + errs = append(errs, err) + continue + } + } + + if len(errs) > 0 { + returnErr := fmt.Errorf("errors while loading jobs from database") + for _, err := range errs { + returnErr = fmt.Errorf("%w: %v", returnErr, err) + } + return returnErr + } + + return nil +} + +// ImportJobsAndSchedule lädt Jobs aus einem Reader und plant sie ein. +func ImportJobsAndSchedule(reader io.Reader, format string, manager *Manager) error { + var err error + var imp []JobPersistence + + // format to lowercase + format = strings.ToLower(format) + + if format == "json" { + imp, err = ReadJSON(reader) + } else if format == "yaml" { + imp, err = ReadYAML(reader) + } else { + return fmt.Errorf("%w: %s", ErrUnknownFormat, format) + } + + if err != nil { + return err + } + + for _, imp := range imp { + job, scheduler, err := CreateJobAndSchedulerFromPersistence(imp, manager) + if err != nil { + return err + } + + err = manager.ScheduleJob(job, scheduler) + if err != nil { + return err + } + } + + return nil +} diff --git a/import_test.go b/persistence_test.go similarity index 89% rename from import_test.go rename to persistence_test.go index 39e1d6d4fa73c9ae661389f0bd762e786d6e5f16..9f070220138f390a64c0a3c082bd5ed988069f4b 100644 --- a/import_test.go +++ b/persistence_test.go @@ -12,14 +12,14 @@ import ( func TestCreateJobAndSchedulerFromInput(t *testing.T) { tests := []struct { name string - input JobImport + input JobPersistence wantJob GenericJob wantSched Scheduler wantErr bool }{ { name: "Shell Runnable and Interval Scheduler", - input: JobImport{ + input: JobPersistence{ ID: "1", Priority: 1, Timeout: 10 * time.Second, @@ -29,7 +29,7 @@ func TestCreateJobAndSchedulerFromInput(t *testing.T) { Type: "Shell", Data: map[string]any{"ScriptPath": "script.sh"}, }, - Scheduler: SchedulerImport{ + Scheduler: SchedulerPersistence{ Type: "Interval", Interval: 1 * time.Minute, }, @@ -40,14 +40,14 @@ func TestCreateJobAndSchedulerFromInput(t *testing.T) { }, { name: "Shell Runnable and Cron Scheduler", - input: JobImport{ + input: JobPersistence{ ID: "1", Priority: 1, Runnable: RunnableImport{ Type: "Shell", Data: map[string]any{"ScriptPath": "script.sh"}, }, - Scheduler: SchedulerImport{ + Scheduler: SchedulerPersistence{ Type: "Cron", Spec: "* * * * * *", }, @@ -60,7 +60,7 @@ func TestCreateJobAndSchedulerFromInput(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - gotJob, gotSchedule, err := CreateJobAndSchedulerFromImport(tt.input, nil) + gotJob, gotSchedule, err := CreateJobAndSchedulerFromPersistence(tt.input, nil) if gotSchedule != nil { if gotSchedule.GetType() == "Cron" { cronInst := cron.New(cron.WithSeconds()) @@ -72,7 +72,7 @@ func TestCreateJobAndSchedulerFromInput(t *testing.T) { } if (err != nil) != tt.wantErr { - t.Errorf("CreateJobAndSchedulerFromImport() error = %v, wantErr %v", err, tt.wantErr) + t.Errorf("CreateJobAndSchedulerFromPersistence() error = %v, wantErr %v", err, tt.wantErr) return } diff --git a/runnable-counter.go b/runnable-counter.go index 593a84fc0de764730ada7e48dfcbcdbc7cd1a761..d5612e91bb48935f4b10c38847085b8f3f2ce733 100644 --- a/runnable-counter.go +++ b/runnable-counter.go @@ -6,10 +6,15 @@ import ( ) func NewCounterRunnableFromMap(data map[string]any) (*CounterRunnable, error) { - count, ok := data["count"].(int) + + // in go numbers are float64 by default + floatCount, ok := data["count"].(float64) if !ok { return nil, fmt.Errorf("%w: Invalid count: %v", ErrInvalidData, data["count"]) } + + count := int(floatCount) + return &CounterRunnable{Count: count}, nil } @@ -45,3 +50,19 @@ func (c *CounterRunnable) Run() (RunResult[CounterResult], error) { }, }, nil } + +func (c *CounterRunnable) GetType() string { + return "counter" +} + +func (c *CounterRunnable) GetPersistence() RunnableImport { + + data := JSONMap{ + "count": c.Count, + } + + return RunnableImport{ + Type: c.GetType(), + Data: data, + } +} diff --git a/runnable-dummy.go b/runnable-dummy.go index 795fa7621287472c93dc2e3dfeb1d748bfa8e5fa..387097317981c9d9170b63f0c072647d838169ce 100644 --- a/runnable-dummy.go +++ b/runnable-dummy.go @@ -15,3 +15,17 @@ func (d *DummyRunnable) Run() (RunResult[DummyResult], error) { Status: ResultStatusSuccess, }, nil } + +func (d *DummyRunnable) GetType() string { + return "dummy" +} + +func (c *DummyRunnable) GetPersistence() RunnableImport { + + data := JSONMap{} + + return RunnableImport{ + Type: c.GetType(), + Data: data, + } +} diff --git a/runnable-fileoperation.go b/runnable-fileoperation.go index e79fd76631587ef1b6e1caac34de7362cde9a576..09d39408e8ed1399dd7f91f0f6842f9e7f7b5ac2 100644 --- a/runnable-fileoperation.go +++ b/runnable-fileoperation.go @@ -121,3 +121,21 @@ func (f *FileOperationRunnable) Run() (RunResult[FileOperationResult], error) { return RunResult[FileOperationResult]{Status: ResultStatusFailed}, ErrUnsupportedFileOption } } + +func (f *FileOperationRunnable) GetType() string { + return "fileoperation" +} + +func (c *FileOperationRunnable) GetPersistence() RunnableImport { + + data := JSONMap{ + "operation": c.Operation, + "filepath": c.FilePath, + "content": c.Content, + } + + return RunnableImport{ + Type: c.GetType(), + Data: data, + } +} diff --git a/runnable-gorm.go b/runnable-gorm.go index 65b956b3126987fb1f46d9205c711e7fd97e1106..d4170711a57522008e0a6918957fbadb283e7c69 100644 --- a/runnable-gorm.go +++ b/runnable-gorm.go @@ -77,3 +77,21 @@ func (d *DBRunnable) Run() (RunResult[DBResult], error) { }, }, nil } + +func (d *DBRunnable) GetType() string { + return "db" +} + +func (c *DBRunnable) GetPersistence() RunnableImport { + + data := JSONMap{ + "type": c.Type, + "dsn": c.DSN, + "query": c.Query, + } + + return RunnableImport{ + Type: c.GetType(), + Data: data, + } +} diff --git a/runnable-http.go b/runnable-http.go index 67ae4cfec7f3993f360ed42a7000183d376b6699..a9c48906027f703a348675747afa91219a97c0cb 100644 --- a/runnable-http.go +++ b/runnable-http.go @@ -81,3 +81,22 @@ func (h *HTTPRunnable) Run() (RunResult[HTTPResult], error) { }, }, nil } + +func (h *HTTPRunnable) GetType() string { + return "http" +} + +func (c *HTTPRunnable) GetPersistence() RunnableImport { + + data := JSONMap{ + "url": c.URL, + "method": c.Method, + "header": c.Header, + "body": c.Body, + } + + return RunnableImport{ + Type: c.GetType(), + Data: data, + } +} diff --git a/runnable-mail.go b/runnable-mail.go index 0a835fdb13cd60477d83a4e841cc8773d72f697c..dfe8364cae8c87c68f59710ab5daacaf387a7bb5 100644 --- a/runnable-mail.go +++ b/runnable-mail.go @@ -137,3 +137,27 @@ func (m *MailRunnable) Run() (RunResult[MailResult], error) { return RunResult[MailResult]{Status: ResultStatusSuccess, Data: MailResult{Sent: true, SmtpStatusCode: smtpStatusCode}}, nil } + +func (m *MailRunnable) GetType() string { + return "mail" +} + +func (c *MailRunnable) GetPersistence() RunnableImport { + + data := JSONMap{ + "to": c.To, + "from": c.From, + "subject": c.Subject, + "body": c.Body, + "server": c.Server, + "port": c.Port, + "username": c.Username, + "password": c.Password, + "headers": c.Headers, + } + + return RunnableImport{ + Type: c.GetType(), + Data: data, + } +} diff --git a/runnable-sftp.go b/runnable-sftp.go index a2213812dc14d059fb3dca7edf4d559029c7ce8e..7c4625fb318ef3014aec9cc96f09c3cfb9363c41 100644 --- a/runnable-sftp.go +++ b/runnable-sftp.go @@ -272,3 +272,28 @@ func (s *SFTPRunnable) copyRemoteToLocal(sftpClient *sftp.Client) ([]string, err return filesCopied, nil } + +func (s *SFTPRunnable) GetType() string { + return "sftp" +} + +func (c *SFTPRunnable) GetPersistence() RunnableImport { + + data := JSONMap{ + "host": c.Host, + "port": c.Port, + "user": c.User, + "insecure": c.Insecure, + "credential": c.Credential, + "credential_type": c.CredentialType, + "hostkey": c.HostKey, + "src_dir": c.SrcDir, + "dst_dir": c.DstDir, + "transfer_direction": c.TransferDirection, + } + + return RunnableImport{ + Type: c.GetType(), + Data: data, + } +} diff --git a/runnable-shell.go b/runnable-shell.go index cfa5533a304a81424b7572b1a0589494dc64b5fa..a3762898a21da8b84e9010358b6f6e13617bbac6 100644 --- a/runnable-shell.go +++ b/runnable-shell.go @@ -107,3 +107,20 @@ func (s *ShellRunnable) Run() (RunResult[ShellResult], error) { }, }, nil } + +func (s *ShellRunnable) GetType() string { + return "shell" +} + +func (c *ShellRunnable) GetPersistence() RunnableImport { + + data := JSONMap{ + "script_path": c.ScriptPath, + "script": c.Script, + } + + return RunnableImport{ + Type: c.GetType(), + Data: data, + } +} diff --git a/runnable.go b/runnable.go index 99f66a1c1f7d929ff90685fe775c869b497c7664..a55b7572d7f3b26dc4c58362b89bd49c3df600a2 100644 --- a/runnable.go +++ b/runnable.go @@ -22,4 +22,7 @@ func (r RunResult[T]) GetStatus() ResultStatus { type Runnable[T any] interface { Run() (RunResult[T], error) + GetType() string + + GetPersistence() RunnableImport } diff --git a/runnable_test.go b/runnable_test.go index 437ef151e29b865b07eb2894574d03aa0ef75917..5af9a42c6880e6fc8b2889d297f255baad31ccf0 100644 --- a/runnable_test.go +++ b/runnable_test.go @@ -26,6 +26,29 @@ func (m MockErrorRunnable) Run() (RunResult[string], error) { return RunResult[string]{}, errors.New("RunError") } +func (m MockErrorRunnable) GetType() string { + return "mock-error" +} + +func (m MockFailedRunnable) GetType() string { + return "mock-failed" +} + +func (m MockSuccessfulRunnable) GetType() string { + return "mock-success" +} + +func (m MockErrorRunnable) GetPersistence() RunnableImport { + return RunnableImport{} +} + +func (m MockFailedRunnable) GetPersistence() RunnableImport { + return RunnableImport{} +} +func (m MockSuccessfulRunnable) GetPersistence() RunnableImport { + return RunnableImport{} +} + func TestRunnable(t *testing.T) { var run Runnable[string] diff --git a/scheduler.go b/scheduler.go index 9b702aeb235bfe66dde82796c5b34f1b6b4662fc..4e4b6b51a3400255b625b894a19f4e1088df7dd9 100644 --- a/scheduler.go +++ b/scheduler.go @@ -15,8 +15,30 @@ type Scheduler interface { JobExists(id JobID) bool GetType() string + + IsAdHoc() bool + + GetPersistence() SchedulerPersistence } +type SchedulerPersistence struct { + Type string `yaml:"type" json:"type" gorm:"column:type"` + Interval time.Duration `yaml:"interval,omitempty" json:"interval,omitempty" gorm:"column:interval"` + Spec string `yaml:"spec,omitempty" json:"spec,omitempty" gorm:"column:spec"` + Delay time.Duration `yaml:"delay,omitempty" json:"delay,omitempty" gorm:"column:delay"` + Event EventName `yaml:"event,omitempty" json:"event,omitempty" gorm:"column:event"` +} + +//func (s Scheduler) GetPersistence() SchedulerPersistence { +// return SchedulerPersistence{ +// Type: s.Type, +// Interval: s.Interval, +// Spec: s.Spec, +// Delay: s.Delay, +// Event: s.Event, +// } +//} + // IntervalScheduler is a scheduler that schedules a job at a fixed interval type IntervalScheduler struct { Interval time.Duration @@ -61,6 +83,10 @@ func (s *IntervalScheduler) GetType() string { return "Interval" } +func (s *IntervalScheduler) IsAdHoc() bool { + return false +} + func (s *IntervalScheduler) Cancel(id JobID) error { if s.jobs == nil { return nil @@ -96,6 +122,13 @@ func (s *IntervalScheduler) JobExists(id JobID) bool { return ok } +func (s *IntervalScheduler) GetPersistence() SchedulerPersistence { + return SchedulerPersistence{ + Type: s.GetType(), + Interval: s.Interval, + } +} + // CronScheduler is a scheduler that uses the cron library to schedule jobs type CronScheduler struct { cron *cron.Cron @@ -136,6 +169,10 @@ func (s *CronScheduler) GetType() string { return "Cron" } +func (s *CronScheduler) IsAdHoc() bool { + return false +} + func (s *CronScheduler) Cancel(id JobID) error { if s.jobs == nil { @@ -171,6 +208,13 @@ func (s *CronScheduler) JobExists(id JobID) bool { return ok } +func (s *CronScheduler) GetPersistence() SchedulerPersistence { + return SchedulerPersistence{ + Type: s.GetType(), + Spec: s.Spec, + } +} + // DelayScheduler is a scheduler that schedules a job after a delay type DelayScheduler struct { Delay time.Duration @@ -207,6 +251,10 @@ func (s *DelayScheduler) GetType() string { return "Delay" } +func (s *DelayScheduler) IsAdHoc() bool { + return true +} + func (s *DelayScheduler) Cancel(id JobID) error { if s.jobs == nil { return nil @@ -242,6 +290,13 @@ func (s *DelayScheduler) JobExists(id JobID) bool { return ok } +func (s *DelayScheduler) GetPersistence() SchedulerPersistence { + return SchedulerPersistence{ + Type: s.GetType(), + Delay: s.Delay, + } +} + // EventScheduler is a scheduler that schedules a job when an event is received type EventScheduler struct { Event EventName @@ -282,6 +337,10 @@ func (s *EventScheduler) GetType() string { return "Event" } +func (s *EventScheduler) IsAdHoc() bool { + return false +} + func (s *EventScheduler) Cancel(id JobID) error { if s.jobs == nil { return nil @@ -318,6 +377,13 @@ func (s *EventScheduler) JobExists(id JobID) bool { return ok } +func (s *EventScheduler) GetPersistence() SchedulerPersistence { + return SchedulerPersistence{ + Type: s.GetType(), + Event: s.Event, + } +} + // InstantScheduler is a scheduler that schedules a job instantly type InstantScheduler struct{} @@ -330,6 +396,10 @@ func (s *InstantScheduler) GetType() string { return "Instant" } +func (s *InstantScheduler) IsAdHoc() bool { + return true +} + func (s *InstantScheduler) Cancel(id JobID) error { return nil } @@ -341,3 +411,9 @@ func (s *InstantScheduler) CancelAll() error { func (s *InstantScheduler) JobExists(id JobID) bool { return false } + +func (s *InstantScheduler) GetPersistence() SchedulerPersistence { + return SchedulerPersistence{ + Type: s.GetType(), + } +} diff --git a/state.go b/state.go deleted file mode 100644 index ade8cfa7d4e506e1200c84ff4b79f67573c40f3d..0000000000000000000000000000000000000000 --- a/state.go +++ /dev/null @@ -1,55 +0,0 @@ -package jobqueue - -import ( - "encoding/json" - "os" -) - -// State represent the state of the job queue -type State struct { - // Jobs enthält alle Jobs, die wir speichern wollen - Jobs []JobSerializedState `json:"jobs"` -} - -type StateManager interface { - LoadState() error - SaveState() error -} - -// FileStateManager implementiert StateManager -type FileStateManager struct { - filePath string - state *State -} - -// LoadState load the state from the file -func (f *FileStateManager) LoadState() error { - file, err := os.Open(f.filePath) - if err != nil { - return err - } - defer file.Close() - - decoder := json.NewDecoder(file) - if err := decoder.Decode(&f.state); err != nil { - return err - } - - return nil -} - -// SaveState save the state to the file -func (f *FileStateManager) SaveState() error { - file, err := os.Create(f.filePath) - if err != nil { - return err - } - defer file.Close() - - encoder := json.NewEncoder(file) - if err := encoder.Encode(f.state); err != nil { - return err - } - - return nil -} diff --git a/state_test.go b/state_test.go deleted file mode 100644 index 8a14b16abe19fa607434462f60df89b25f423f6b..0000000000000000000000000000000000000000 --- a/state_test.go +++ /dev/null @@ -1,40 +0,0 @@ -package jobqueue - -import ( - "os" - "path" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestFileStateManager(t *testing.T) { - - tmpDir := t.TempDir() - - tempFile := path.Join(tmpDir, "state.json") - defer os.Remove(tempFile) - - manager := &FileStateManager{ - filePath: tempFile, - state: &State{ - Jobs: []JobSerializedState{ - {ID: "1"}, - {ID: "2"}, - }, - }, - } - - err := manager.SaveState() - assert.NoError(t, err) - - manager2 := &FileStateManager{ - filePath: tempFile, - state: &State{}, - } - - err = manager2.LoadState() - assert.NoError(t, err) - - assert.Equal(t, manager.state, manager2.state) -} diff --git a/worker_test.go b/worker_test.go index d9dcecb0628af8fab2f9288441c4689390eed962..a940f5f3c1534131d88cc091490f1ab566b3850b 100644 --- a/worker_test.go +++ b/worker_test.go @@ -40,16 +40,20 @@ func (j DummyJob) GetDependencies() []JobID { return []JobID{} } +func (j DummyJob) GetPersistence() JobPersistence { + return JobPersistence{} +} + func (j DummyJob) GetPriority() Priority { return PriorityDefault } -func (j DummyJob) SerializeState() JobSerializedState { - return JobSerializedState{} +func (j DummyJob) SetScheduler(scheduler Scheduler) { + return } -func (j DummyJob) UnserializeState(serializedState JobSerializedState) { - return +func (j DummyJob) GetScheduler() Scheduler { + return nil } func TestAssignJob(t *testing.T) {