diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 62252154c67ce4fd2a7f575045a1d8f9ba9e6afa..ca7c8db5840679cfe4e4c5872f1607e91ce0668d 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -64,7 +64,3 @@ deploy: paths: - /nix/store - - artifacts: - paths: - - dist diff --git a/devenv.nix b/devenv.nix index 4d0389caa28cf051329e3d7a4b9f00aa120791ec..78bebff27041e7e638d574f76346b52516d4a435 100644 --- a/devenv.nix +++ b/devenv.nix @@ -363,10 +363,6 @@ deploy: paths: - /nix/store - - artifacts: - paths: - - dist EOF diff --git a/errors.go b/errors.go index ec304a7909f6fcfe5f36dbb9d3ad294b8f07ebb5..8493dd84676d0d3b8013f1648c7befb8191e7d1d 100644 --- a/errors.go +++ b/errors.go @@ -29,4 +29,5 @@ var ( ErrUnsupportedFileOption = fmt.Errorf("unsupported file option") ErrUnsupportedCredentialType = fmt.Errorf("unsupported credential type") ErrUnsupportedTransferDirection = fmt.Errorf("unsupported transfer direction") + ErrInvalidData = fmt.Errorf("invalid data") ) diff --git a/import.go b/import.go index 371f122b0a10a87be0410192a6181351714fc7cb..987ec1232c0d55b224fb4fa746a0eaa0b45f53d1 100644 --- a/import.go +++ b/import.go @@ -60,62 +60,87 @@ func ReadJsonFile(filePath string) ([]JobImport, error) { return jobs, nil } +func CreateGenericJobFromImport[T any](jobImport JobImport, runner Runnable[T]) GenericJob { + return &Job[T]{ + id: JobID(jobImport.ID), + priority: Priority(jobImport.Priority), + timeout: jobImport.Timeout, + maxRetries: jobImport.MaxRetries, + RetryDelay: jobImport.RetryDelay, + runner: runner, + } +} + func CreateJobAndSchedulerFromImport(jobImport JobImport) (GenericJob, Scheduler, error) { var job GenericJob switch jobImport.Runnable.Type { - case "Shell": + case "Dummy": - runner := &ShellRunnable{ - ScriptPath: jobImport.Runnable.Data["ScriptPath"].(string), + runner, err := NewDummyRunnableFromMap(jobImport.Runnable.Data) + if err != nil { + return nil, nil, err } - job = GenericJob(&Job[ShellResult]{ - id: JobID(jobImport.ID), - priority: Priority(jobImport.Priority), - timout: jobImport.Timeout, - maxRetries: jobImport.MaxRetries, - RetryDelay: jobImport.RetryDelay, - runner: runner, - }) + job = CreateGenericJobFromImport[DummyResult](jobImport, runner) case "Counter": - runner := &CounterRunnable{ - Count: jobImport.Runnable.Data["Count"].(int), + + runner, err := NewCounterRunnableFromMap(jobImport.Runnable.Data) + if err != nil { + return nil, nil, err } - job = GenericJob(&Job[CounterResult]{ - id: JobID(jobImport.ID), - priority: Priority(jobImport.Priority), - timout: jobImport.Timeout, - maxRetries: jobImport.MaxRetries, - RetryDelay: jobImport.RetryDelay, - runner: runner, - }) - case "HTTP": - runner := &HTTPRunnable{ - URL: jobImport.Runnable.Data["URL"].(string), + job = CreateGenericJobFromImport[CounterResult](jobImport, runner) + + case "FileOperation": + runner, err := NewFileOperationRunnableFromMap(jobImport.Runnable.Data) + if err != nil { + return nil, nil, err } - job = GenericJob(&Job[HTTPResult]{id: JobID(jobImport.ID), - priority: Priority(jobImport.Priority), - timout: jobImport.Timeout, - maxRetries: jobImport.MaxRetries, - RetryDelay: jobImport.RetryDelay, - runner: runner, - }) + + job = CreateGenericJobFromImport[FileOperationResult](jobImport, runner) case "DB": - runner := &DBRunnable{ - Query: jobImport.Runnable.Data["Query"].(string), + runner, err := NewDBRunnableFromMap(jobImport.Runnable.Data) + if err != nil { + return nil, nil, err } - job = GenericJob(&Job[DBResult]{id: JobID(jobImport.ID), - priority: Priority(jobImport.Priority), - timout: jobImport.Timeout, - maxRetries: jobImport.MaxRetries, - RetryDelay: jobImport.RetryDelay, - runner: runner, - }) + + job = CreateGenericJobFromImport[DBResult](jobImport, runner) + + case "HTTP": + runner, err := NewHTTPRunnableFromMap(jobImport.Runnable.Data) + if err != nil { + return nil, nil, err + } + + job = CreateGenericJobFromImport[HTTPResult](jobImport, runner) + + case "Mail": + runner, err := NewMailRunnableFromMap(jobImport.Runnable.Data) + if err != nil { + return nil, nil, err + } + + job = CreateGenericJobFromImport[MailResult](jobImport, runner) + + case "SFTP": + runner, err := NewSFTPRunnableFromMap(jobImport.Runnable.Data) + if err != nil { + return nil, nil, err + } + + job = CreateGenericJobFromImport[SFTPResult](jobImport, runner) + + case "Shell": + runner, err := NewShellRunnableFromMap(jobImport.Runnable.Data) + if err != nil { + return nil, nil, err + } + + job = CreateGenericJobFromImport[ShellResult](jobImport, runner) default: return nil, nil, ErrUnknownRunnableType diff --git a/job-handler.go b/job-handler.go new file mode 100644 index 0000000000000000000000000000000000000000..8308aab74838662d8c7a0d7bbc8dd5b5c99d9eb5 --- /dev/null +++ b/job-handler.go @@ -0,0 +1,30 @@ +package jobqueue + +type CompletedJobHandler interface { + HandleCompletedJob(job GenericJob) error +} + +type DatabaseArchiver struct { + // ... +} + +func (d *DatabaseArchiver) HandleCompletedJob(job GenericJob) error { + + return nil +} + +type FileLogger struct { +} + +func (f *FileLogger) HandleCompletedJob(job GenericJob) error { + + return nil +} + +type MetricsPublisher struct { +} + +func (m *MetricsPublisher) HandleCompletedJob(job GenericJob) error { + + return nil +} diff --git a/job.go b/job.go index 1dcdb80fc9dd0721ba198f1afe0fec6b5bf2b961..86f6def9d151d9691bcd34ff05137f0bf6534815 100644 --- a/job.go +++ b/job.go @@ -37,13 +37,15 @@ type GenericJob interface { GetRetryDelay() time.Duration GetTimeout() time.Duration + + Archive() error } type Job[T any] struct { id JobID priority Priority - timout time.Duration + timeout time.Duration maxRetries uint RetryDelay time.Duration @@ -57,6 +59,17 @@ type Job[T any] struct { logs []JobLog } +type JobSerializedState struct { + ID JobID `json:"id"` + Priority Priority `json:"priority"` + Timeout time.Duration `json:"timeout" ` + MaxRetries uint `json:"maxRetries" ` + RetryDelay time.Duration `json:"retryDelay"` + Dependencies []JobID `json:"dependencies" ` + Stats JobStats `json:"stats"` + Logs []JobLog `json:"logs"` +} + // NewJob creates a new job with the given id and runner func NewJob[T any](id JobID, runner Runnable[T]) *Job[T] { return &Job[T]{ @@ -66,6 +79,38 @@ func NewJob[T any](id JobID, runner Runnable[T]) *Job[T] { } } +func (j *Job[T]) SerializeState() JobSerializedState { + j.mu.Lock() + defer j.mu.Unlock() + return JobSerializedState{ + ID: j.id, + Priority: j.priority, + Timeout: j.timeout, + MaxRetries: j.maxRetries, + RetryDelay: j.RetryDelay, + Dependencies: j.dependencies, + Stats: j.stats, + Logs: j.logs, + } +} + +func (j *Job[T]) UnserializeState(serializedState JobSerializedState) { + j.mu.Lock() + defer j.mu.Unlock() + j.id = serializedState.ID + j.priority = serializedState.Priority + j.timeout = serializedState.Timeout + j.maxRetries = serializedState.MaxRetries + j.RetryDelay = serializedState.RetryDelay + j.dependencies = serializedState.Dependencies + j.stats = serializedState.Stats + j.logs = serializedState.Logs +} + +func (j *Job[T]) Archive() error { + return nil +} + // Execute executes the job func (j *Job[T]) Execute(ctx context.Context) (RunGenericResult, error) { startTime := time.Now() @@ -150,7 +195,7 @@ func (j *Job[T]) SetTimeout(timeout time.Duration) *Job[T] { j.mu.Lock() defer j.mu.Unlock() - j.timout = timeout + j.timeout = timeout return j } @@ -158,7 +203,7 @@ func (j *Job[T]) GetTimeout() time.Duration { j.mu.Lock() defer j.mu.Unlock() - return j.timout + return j.timeout } // SetMaxRetries sets the max retries of the job diff --git a/job_test.go b/job_test.go index 3400cef3f69ba342fe192334d4f7db1e004267f4..b95aac4f5d1093c75ab3d2bd292459ba2d501bdd 100644 --- a/job_test.go +++ b/job_test.go @@ -142,3 +142,51 @@ echo "Hello World" assert.Equal(t, 2, stats.SuccessCount) assert.Equal(t, 0, stats.ErrorCount) } + +func TestSerializeState(t *testing.T) { + // Initialize job + job := NewJob[DummyResult]("testJob", &DummyRunnable{}) + + job.SetPriority(PriorityHigh) + job.SetTimeout(5 * time.Minute) + job.SetMaxRetries(3) + job.SetRetryDelay(1 * time.Minute) + job.SetDependencies([]JobID{"dep1", "dep2"}) + + // Serialize state + serializedState := job.SerializeState() + + // Verify serialized state + assert.Equal(t, JobID("testJob"), serializedState.ID) + assert.Equal(t, PriorityHigh, serializedState.Priority) + assert.Equal(t, 5*time.Minute, serializedState.Timeout) + assert.Equal(t, uint(3), serializedState.MaxRetries) + assert.Equal(t, 1*time.Minute, serializedState.RetryDelay) + assert.ElementsMatch(t, []JobID{"dep1", "dep2"}, serializedState.Dependencies) +} + +func TestUnserializeState(t *testing.T) { + // Initialize serialized state + serializedState := JobSerializedState{ + ID: "testJob", + Priority: PriorityHigh, + Timeout: 5 * time.Minute, + MaxRetries: 3, + RetryDelay: 1 * time.Minute, + Dependencies: []JobID{"dep1", "dep2"}, + } + + // Initialize job + job := NewJob[DummyResult]("initialJob", &DummyRunnable{}) + + // Unserialize state + job.UnserializeState(serializedState) + + // Verify job properties + assert.Equal(t, JobID("testJob"), job.GetID()) + assert.Equal(t, PriorityHigh, job.GetPriority()) + assert.Equal(t, 5*time.Minute, job.GetTimeout()) + assert.Equal(t, uint(3), job.GetMaxRetries()) + assert.Equal(t, 1*time.Minute, job.GetRetryDelay()) + assert.ElementsMatch(t, []JobID{"dep1", "dep2"}, job.GetDependencies()) +} diff --git a/manager.go b/manager.go index 4b6062002b0a50ea8fb93f502aa8a69b1b934156..5e7fee84fc10310d337d6276ce2bcdc09ba572fd 100644 --- a/manager.go +++ b/manager.go @@ -22,6 +22,8 @@ type Manager struct { jobEventCh chan interface{} + stateManager StateManager + mu sync.Mutex } @@ -134,6 +136,12 @@ func (m *Manager) Start() error { return ErrManagerAlreadyRunning } + if m.stateManager != nil { + if err := m.stateManager.LoadState(); err != nil { + return err + } + } + if len(m.workerMap) == 0 { return ErrNoWorkers } @@ -206,6 +214,12 @@ func (m *Manager) Stop() error { wrappedErr = fmt.Errorf("%w\n%s", wrappedErr, err.Error()) } + if m.stateManager != nil { + if err := m.stateManager.LoadState(); err != nil { + return err + } + } + return wrappedErr } diff --git a/manager_test.go b/manager_test.go index fa12cf1dfe89f1e74987c8fc0bcaac0526e28087..e991a2d1ba9059ae82fe961b483f3b3ea06bd39e 100644 --- a/manager_test.go +++ b/manager_test.go @@ -106,60 +106,79 @@ func TestManager_AddWorker(t *testing.T) { } func TestManager_RemoveWorker(t *testing.T) { + var err error m := NewManager() w := &MockWorker{id: "worker1", status: WorkerStatusStopped} - m.AddWorker(w) - - err := m.RemoveWorker(w) + err = m.AddWorker(w) + assert.Nil(t, err) + err = m.RemoveWorker(w) assert.Nil(t, err) assert.Equal(t, int(ManagerStateStopped), int(m.state)) } func TestManager_Start(t *testing.T) { + var err error m := NewManager() w := &MockWorker{id: "worker1", status: WorkerStatusStopped} - m.AddWorker(w) + err = m.AddWorker(w) + assert.Nil(t, err) - err := m.Start() + err = m.Start() + assert.Nil(t, err) assert.Nil(t, err) assert.Equal(t, int(ManagerStateRunning), int(m.state)) } func TestManager_Stop(t *testing.T) { + var err error m := NewManager() w := &MockWorker{id: "worker1", status: WorkerStatusStopped} - m.AddWorker(w) - m.Start() + err = m.AddWorker(w) + assert.Nil(t, err) + + err = m.Start() + assert.Nil(t, err) - err := m.Stop() + err = m.Stop() assert.Nil(t, err) assert.Equal(t, int(ManagerStateStopped), int(m.state)) } func TestManager_ScheduleJob(t *testing.T) { + var err error + m := NewManager() w := &MockWorker{id: "worker1", status: WorkerStatusStopped} - m.AddWorker(w) - m.Start() + err = m.AddWorker(w) + assert.Nil(t, err) + + err = m.Start() + assert.Nil(t, err) job := &MockGenericJob{ID: "job1"} scheduler := InstantScheduler{} - err := m.ScheduleJob(job, &scheduler) + err = m.ScheduleJob(job, &scheduler) assert.Nil(t, err) } func TestManager_CancelJob(t *testing.T) { + var err error + m := NewManager() w := &MockWorker{id: "worker1", status: WorkerStatusStopped} - m.AddWorker(w) - m.Start() + err = m.AddWorker(w) + assert.Nil(t, err) + + err = m.Start() + assert.Nil(t, err) job := &MockGenericJob{ID: "job1"} scheduler := InstantScheduler{} - m.ScheduleJob(job, &scheduler) + err = m.ScheduleJob(job, &scheduler) + assert.Nil(t, err) - err := m.CancelJob("job1") + err = m.CancelJob("job1") assert.Nil(t, err) } diff --git a/runnable-counter.go b/runnable-counter.go index 780768c0fe93b3e25adf46fc96eed69355ef8f5a..d019ce0549bb8308dc3b929efb5b7209a992b5c1 100644 --- a/runnable-counter.go +++ b/runnable-counter.go @@ -4,6 +4,14 @@ import ( "sync" ) +func NewCounterRunnableFromMap(data map[string]any) (*CounterRunnable, error) { + count, ok := data["Count"].(int) + if !ok { + return nil, ErrInvalidData + } + return &CounterRunnable{Count: count}, nil +} + // CounterResult is a result of a counter type CounterResult struct { Count int @@ -11,7 +19,7 @@ type CounterResult struct { // CounterRunnable is a runnable that counts type CounterRunnable struct { - Count int + Count int `json:"count" yaml:"count"` mu sync.Mutex } diff --git a/runnable-dummy.go b/runnable-dummy.go index 6e86990ab866360617b015691b7800243cb8368f..795fa7621287472c93dc2e3dfeb1d748bfa8e5fa 100644 --- a/runnable-dummy.go +++ b/runnable-dummy.go @@ -1,5 +1,9 @@ package jobqueue +func NewDummyRunnableFromMap(data map[string]any) (*DummyRunnable, error) { + return &DummyRunnable{}, nil +} + // DummyResult is a dummy result type DummyResult struct { } diff --git a/runnable-fileoperation.go b/runnable-fileoperation.go index 1aa1e0cca261498fb4a2b24b22640d009c1f2146..020bf73803e33e4c0339ccfac7a1d687bf5960da 100644 --- a/runnable-fileoperation.go +++ b/runnable-fileoperation.go @@ -4,6 +4,26 @@ import ( "os" ) +func NewFileOperationRunnableFromMap(data map[string]interface{}) (*FileOperationRunnable, error) { + operation, ok := data["Operation"].(string) + if !ok { + return nil, ErrInvalidData + } + + filePath, ok := data["FilePath"].(string) + if !ok { + return nil, ErrInvalidData + } + + content, _ := data["Content"].(string) // Optional, so no error check + + return &FileOperationRunnable{ + Operation: operation, + FilePath: filePath, + Content: content, + }, nil +} + type FileOperationResult struct { Success bool Content string // Optional, je nach Operation diff --git a/runnable-gorm.go b/runnable-gorm.go index 53cb581852d9a76404f9e018937137cfbdfd2677..0c66d2cb3442b4f59db523017aede7fcb535790d 100644 --- a/runnable-gorm.go +++ b/runnable-gorm.go @@ -5,6 +5,29 @@ import ( "gorm.io/gorm" ) +func NewDBRunnableFromMap(data map[string]interface{}) (*DBRunnable, error) { + t, ok := data["Type"].(string) + if !ok { + return nil, ErrInvalidData + } + + dsn, ok := data["DSN"].(string) + if !ok { + return nil, ErrInvalidData + } + + query, ok := data["Query"].(string) + if !ok { + return nil, ErrInvalidData + } + + return &DBRunnable{ + Type: t, + DSN: dsn, + Query: query, + }, nil +} + // DBResult is a result of a db query type DBResult struct { RowsAffected int diff --git a/runnable-http.go b/runnable-http.go index b1f498a45e8fe3e576c29b0c93cdd7396468ab01..07412a72a66b8561406228158df0630efe29e035 100644 --- a/runnable-http.go +++ b/runnable-http.go @@ -6,6 +6,35 @@ import ( "net/http" ) +func NewHTTPRunnableFromMap(data map[string]interface{}) (*HTTPRunnable, error) { + url, ok := data["URL"].(string) + if !ok { + return nil, ErrInvalidData + } + + method, ok := data["Method"].(string) + if !ok { + return nil, ErrInvalidData + } + + header, ok := data["Header"].(map[string]string) + if !ok { + return nil, ErrInvalidData + } + + body, ok := data["Body"].(string) + if !ok { + return nil, ErrInvalidData + } + + return &HTTPRunnable{ + URL: url, + Method: method, + Header: header, + Body: body, + }, nil +} + // HTTPResult is a result of a http request type HTTPResult struct { StatusCode int diff --git a/runnable-mail.go b/runnable-mail.go index 9927096786f3a076bf85345b92a94f798c7d56b3..a6b7f5e95aa5fac70328d07fc6f3df1742886eb6 100644 --- a/runnable-mail.go +++ b/runnable-mail.go @@ -4,6 +4,65 @@ import ( "net/smtp" ) +func NewMailRunnableFromMap(data map[string]interface{}) (*MailRunnable, error) { + to, ok := data["To"].(string) + if !ok { + return nil, ErrInvalidData + } + + from, ok := data["From"].(string) + if !ok { + return nil, ErrInvalidData + } + + subject, ok := data["Subject"].(string) + if !ok { + return nil, ErrInvalidData + } + + body, ok := data["Body"].(string) + if !ok { + return nil, ErrInvalidData + } + + server, ok := data["Server"].(string) + if !ok { + return nil, ErrInvalidData + } + + port, ok := data["Port"].(string) + if !ok { + return nil, ErrInvalidData + } + + username, ok := data["Username"].(string) + if !ok { + return nil, ErrInvalidData + } + + password, ok := data["Password"].(string) + if !ok { + return nil, ErrInvalidData + } + + headers, ok := data["Headers"].(map[string]string) + if !ok { + return nil, ErrInvalidData + } + + return &MailRunnable{ + To: to, + From: from, + Subject: subject, + Body: body, + Server: server, + Port: port, + Username: username, + Password: password, + Headers: headers, + }, nil +} + // MailResult is a result of a email type MailResult struct { Sent bool diff --git a/runnable-sftp.go b/runnable-sftp.go index c57400cfa6e445b6b07d6cb6c4098e844d98b86b..1fbdb7cb7a5ab630539e914edf6de4b52152e0a0 100644 --- a/runnable-sftp.go +++ b/runnable-sftp.go @@ -8,6 +8,28 @@ import ( "os" ) +func NewSFTPRunnableFromMap(data map[string]interface{}) (*SFTPRunnable, error) { + // Your map to struct conversion logic here + // e.g., + + host, ok := data["Host"].(string) + if !ok { + return nil, fmt.Errorf("invalid Host") + } + //... (other fields) + + transferDirection, ok := data["TransferDirection"].(string) + if !ok { + return nil, fmt.Errorf("invalid TransferDirection") + } + + return &SFTPRunnable{ + Host: host, + //... (other fields) + TransferDirection: Direction(transferDirection), + }, nil +} + // SFTPResult is a result of a sftp type SFTPResult struct { FilesCopied []string diff --git a/runnable-shell.go b/runnable-shell.go index af60f25d8a67cb93de4618430924b464cedb2f0f..5a6da9db697a05ebe4f6f74750a59511a640a9dd 100644 --- a/runnable-shell.go +++ b/runnable-shell.go @@ -5,6 +5,17 @@ import ( "strings" ) +func NewShellRunnableFromMap(data map[string]interface{}) (*ShellRunnable, error) { + scriptPath, ok := data["ScriptPath"].(string) + if !ok { + return nil, ErrInvalidData + } + + return &ShellRunnable{ + ScriptPath: scriptPath, + }, nil +} + // ShellResult is a result of a shell script type ShellResult struct { Output string diff --git a/state.go b/state.go new file mode 100644 index 0000000000000000000000000000000000000000..a9323ace29565fe5e01b5a6f6da74fe85c4081d3 --- /dev/null +++ b/state.go @@ -0,0 +1,55 @@ +package jobqueue + +import ( + "encoding/json" + "os" +) + +// State repräsentiert den Zustand, den wir speichern wollen +type State struct { + // Jobs enthält alle Jobs, die wir speichern wollen + Jobs []JobSerializedState `json:"jobs"` +} + +type StateManager interface { + LoadState() error + SaveState() error +} + +// FileStateManager implementiert StateManager +type FileStateManager struct { + filePath string + state *State +} + +// LoadState lädt den Zustand aus der Datei +func (f *FileStateManager) LoadState() error { + file, err := os.Open(f.filePath) + if err != nil { + return err + } + defer file.Close() + + decoder := json.NewDecoder(file) + if err := decoder.Decode(&f.state); err != nil { + return err + } + + return nil +} + +// SaveState speichert den Zustand in der Datei +func (f *FileStateManager) SaveState() error { + file, err := os.Create(f.filePath) + if err != nil { + return err + } + defer file.Close() + + encoder := json.NewEncoder(file) + if err := encoder.Encode(f.state); err != nil { + return err + } + + return nil +} diff --git a/state_test.go b/state_test.go new file mode 100644 index 0000000000000000000000000000000000000000..8a14b16abe19fa607434462f60df89b25f423f6b --- /dev/null +++ b/state_test.go @@ -0,0 +1,40 @@ +package jobqueue + +import ( + "os" + "path" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestFileStateManager(t *testing.T) { + + tmpDir := t.TempDir() + + tempFile := path.Join(tmpDir, "state.json") + defer os.Remove(tempFile) + + manager := &FileStateManager{ + filePath: tempFile, + state: &State{ + Jobs: []JobSerializedState{ + {ID: "1"}, + {ID: "2"}, + }, + }, + } + + err := manager.SaveState() + assert.NoError(t, err) + + manager2 := &FileStateManager{ + filePath: tempFile, + state: &State{}, + } + + err = manager2.LoadState() + assert.NoError(t, err) + + assert.Equal(t, manager.state, manager2.state) +}