From 5f2b8c874bc6ce8465aed4fddd0f3499654d04ce Mon Sep 17 00:00:00 2001 From: Volker Schukai <volker.schukai@schukai.com> Date: Tue, 7 Nov 2023 20:38:06 +0100 Subject: [PATCH] feat: save jobs after execution #8 --- job.go | 2 +- job_test.go | 4 ++-- manager.go | 1 + runnable-counter.go | 3 ++- runnable-counter_test.go | 6 +++++- runnable-dummy.go | 4 +++- runnable-dummy_test.go | 6 ++++-- runnable-fileoperation.go | 3 ++- runnable-fileoperation_test.go | 13 +++++++----- runnable-gorm.go | 36 +++++++++++++++++++++------------- runnable-gorm_test.go | 7 +++++-- runnable-http.go | 5 +++-- runnable-http_test.go | 4 +++- runnable-mail.go | 3 ++- runnable-mail_test.go | 3 ++- runnable-sftp.go | 5 +++-- runnable-sftp_test.go | 6 ++++-- runnable-shell.go | 5 +++-- runnable-shell_test.go | 4 +++- runnable.go | 6 +++++- runnable_test.go | 15 ++++++++------ worker.go | 24 +++++++++++++++++------ 22 files changed, 110 insertions(+), 55 deletions(-) diff --git a/job.go b/job.go index d574898..f922cb3 100644 --- a/job.go +++ b/job.go @@ -131,7 +131,7 @@ func (j *Job[T]) GetScheduler() Scheduler { // Execute executes the job func (j *Job[T]) Execute(ctx context.Context) (RunGenericResult, error) { startTime := time.Now() - r, runnerError := j.runner.Run() + r, runnerError := j.runner.Run(ctx) endTime := time.Now() elapsedTime := endTime.Sub(startTime) diff --git a/job_test.go b/job_test.go index 3400cef..79b54df 100644 --- a/job_test.go +++ b/job_test.go @@ -113,7 +113,7 @@ echo "Hello World" if err != nil { t.Fatal(err) } - tmpfile.Write([]byte(scriptContent)) + _, _ = tmpfile.Write([]byte(scriptContent)) defer tmpfile.Close() @@ -135,7 +135,7 @@ echo "Hello World" assert.NotEqual(t, 0, stats.TimeMetrics.MinRunTime) // Simulate a failed run - job.Execute(context.Background()) // Assume Execute updates stats and returns an error + _, _ = job.Execute(context.Background()) // Assume Execute updates stats and returns an error stats = job.stats assert.Equal(t, 2, stats.RunCount) diff --git a/manager.go b/manager.go index 3d37ee4..6c277d5 100644 --- a/manager.go +++ b/manager.go @@ -326,6 +326,7 @@ func (m *Manager) GetLogger() Logger { return m.logger } +// handleJobEvents handles job events func (m *Manager) handleJobEvents() { for event := range m.jobEventCh { diff --git a/runnable-counter.go b/runnable-counter.go index d5612e9..5ae846f 100644 --- a/runnable-counter.go +++ b/runnable-counter.go @@ -1,6 +1,7 @@ package jobqueue import ( + "context" "fmt" "sync" ) @@ -37,7 +38,7 @@ func (c *CounterRunnable) GetCount() int { } // Run runs the counter -func (c *CounterRunnable) Run() (RunResult[CounterResult], error) { +func (c *CounterRunnable) Run(_ context.Context) (RunResult[CounterResult], error) { c.mu.Lock() defer c.mu.Unlock() diff --git a/runnable-counter_test.go b/runnable-counter_test.go index 55d28ad..16841ff 100644 --- a/runnable-counter_test.go +++ b/runnable-counter_test.go @@ -2,13 +2,17 @@ package jobqueue import ( "testing" + + "context" ) func TestRunnableCounter(t *testing.T) { runner := &CounterRunnable{} - r, err := runner.Run() + ctx := context.Background() + + r, err := runner.Run(ctx) if err != nil { t.Errorf("Unexpected error: %v", err) } diff --git a/runnable-dummy.go b/runnable-dummy.go index 3870973..14b68ab 100644 --- a/runnable-dummy.go +++ b/runnable-dummy.go @@ -1,5 +1,7 @@ package jobqueue +import "context" + func NewDummyRunnableFromMap(data map[string]any) (*DummyRunnable, error) { return &DummyRunnable{}, nil } @@ -10,7 +12,7 @@ type DummyResult struct { type DummyRunnable struct{} -func (d *DummyRunnable) Run() (RunResult[DummyResult], error) { +func (d *DummyRunnable) Run(_ context.Context) (RunResult[DummyResult], error) { return RunResult[DummyResult]{ Status: ResultStatusSuccess, }, nil diff --git a/runnable-dummy_test.go b/runnable-dummy_test.go index b056ba1..7978128 100644 --- a/runnable-dummy_test.go +++ b/runnable-dummy_test.go @@ -1,14 +1,16 @@ package jobqueue import ( + "context" "testing" ) func TestDummyRunnable(t *testing.T) { - + runner := &DummyRunnable{} - _, err := runner.Run() + ctx := context.Background() + _, err := runner.Run(ctx) if err != nil { t.Errorf("Unexpected error: %v", err) } diff --git a/runnable-fileoperation.go b/runnable-fileoperation.go index 600fc6d..97a79ad 100644 --- a/runnable-fileoperation.go +++ b/runnable-fileoperation.go @@ -1,6 +1,7 @@ package jobqueue import ( + "context" "fmt" "os" ) @@ -47,7 +48,7 @@ type FileOperationRunnable struct { Content string // Optional, je nach Operation } -func (f *FileOperationRunnable) Run() (RunResult[FileOperationResult], error) { +func (f *FileOperationRunnable) Run(_ context.Context) (RunResult[FileOperationResult], error) { switch f.Operation { case FileOperationRead: content, err := os.ReadFile(f.FilePath) diff --git a/runnable-fileoperation_test.go b/runnable-fileoperation_test.go index c62c91f..79ebaec 100644 --- a/runnable-fileoperation_test.go +++ b/runnable-fileoperation_test.go @@ -1,6 +1,7 @@ package jobqueue import ( + "context" "io/ioutil" "os" "path" @@ -15,26 +16,28 @@ func TestFileOperationRunnable(t *testing.T) { t.Fatalf("Failed to change directory: %v", err) } + ctx := context.Background() + testFilePath := path.Join(dir, "test.txt") testContent := "Hello, World!" // Test FileOperationCreate createRunner := FileOperationRunnable{Operation: FileOperationCreate, FilePath: testFilePath} - _, err = createRunner.Run() + _, err = createRunner.Run(ctx) if err != nil { t.Fatalf("Failed to create file: %v", err) } // Test FileOperationWrite writeRunner := FileOperationRunnable{Operation: FileOperationWrite, FilePath: testFilePath, Content: testContent} - _, err = writeRunner.Run() + _, err = writeRunner.Run(ctx) if err != nil { t.Fatalf("Failed to write to file: %v", err) } // Test FileOperationRead readRunner := FileOperationRunnable{Operation: FileOperationRead, FilePath: testFilePath} - result, err := readRunner.Run() + result, err := readRunner.Run(ctx) if err != nil || result.Data.Content != testContent { t.Fatalf("Failed to read from file: %v", err) } @@ -42,7 +45,7 @@ func TestFileOperationRunnable(t *testing.T) { // Test FileOperationAppend appendContent := " Appended." appendRunner := FileOperationRunnable{Operation: FileOperationAppend, FilePath: testFilePath, Content: appendContent} - _, err = appendRunner.Run() + _, err = appendRunner.Run(ctx) if err != nil { t.Fatalf("Failed to append to file: %v", err) } @@ -55,7 +58,7 @@ func TestFileOperationRunnable(t *testing.T) { // Test FileOperationDelete deleteRunner := FileOperationRunnable{Operation: FileOperationDelete, FilePath: testFilePath} - _, err = deleteRunner.Run() + _, err = deleteRunner.Run(ctx) if err != nil { t.Fatalf("Failed to delete file: %v", err) } diff --git a/runnable-gorm.go b/runnable-gorm.go index d417071..f9f7239 100644 --- a/runnable-gorm.go +++ b/runnable-gorm.go @@ -1,6 +1,7 @@ package jobqueue import ( + "context" "fmt" "gorm.io/driver/mysql" "gorm.io/gorm" @@ -38,34 +39,41 @@ type DBRunnable struct { Type string DSN string Query string - db *gorm.DB // internal for testing } -func (d *DBRunnable) Run() (RunResult[DBResult], error) { +type dbKey struct{} + +// DBRunnableWithDB returns a new context with the provided gorm.DB injected. +func DBRunnableWithDB(ctx context.Context, db *gorm.DB) context.Context { + return context.WithValue(ctx, dbKey{}, db) +} + +// GetDBFromContext tries to retrieve a *gorm.DB from the context. If it exists, returns the db and true, otherwise returns nil and false. +func getDBFromContext(ctx context.Context) (*gorm.DB, bool) { + db, ok := ctx.Value(dbKey{}).(*gorm.DB) + return db, ok +} + +func (d *DBRunnable) Run(ctx context.Context) (RunResult[DBResult], error) { var db *gorm.DB + var ok bool var err error - if d.db == nil { - + if db, ok = getDBFromContext(ctx); !ok { + // No *gorm.DB in context, create a new connection switch d.Type { case "mysql": db, err = gorm.Open(mysql.Open(d.DSN), &gorm.Config{}) - default: return RunResult[DBResult]{Status: ResultStatusFailed}, ErrUnsupportedDatabaseType } - } else { - db = d.db - } - if err != nil { - return RunResult[DBResult]{Status: ResultStatusFailed}, err + if err != nil { + return RunResult[DBResult]{Status: ResultStatusFailed}, err + } } - var result *gorm.DB - - result = db.Exec(d.Query) - + result := db.Exec(d.Query) if result.Error != nil { return RunResult[DBResult]{Status: ResultStatusFailed}, result.Error } diff --git a/runnable-gorm_test.go b/runnable-gorm_test.go index e124a0e..d9f8f71 100644 --- a/runnable-gorm_test.go +++ b/runnable-gorm_test.go @@ -1,6 +1,7 @@ package jobqueue import ( + "context" "github.com/DATA-DOG/go-sqlmock" "gorm.io/driver/mysql" "gorm.io/gorm" @@ -29,12 +30,14 @@ func TestDBRunnable_Run(t *testing.T) { // Erstellen Sie die zu testende Instanz runnable := &DBRunnable{ Type: "mysql", - db: gormDB, // Injizierte Mock-DB Query: "SELECT * FROM table_name", } + ctx := context.Background() + ctx = DBRunnableWithDB(ctx, gormDB) + // Rufen Sie die Run()-Methode auf und überprüfen Sie die Ergebnisse - result, err := runnable.Run() + result, err := runnable.Run(ctx) // Überprüfungen hier if err != nil { diff --git a/runnable-http.go b/runnable-http.go index a9c4890..b1a56d0 100644 --- a/runnable-http.go +++ b/runnable-http.go @@ -2,6 +2,7 @@ package jobqueue import ( "bytes" + "context" "fmt" "io" "net/http" @@ -49,11 +50,11 @@ type HTTPRunnable struct { Body string } -func (h *HTTPRunnable) Run() (RunResult[HTTPResult], error) { +func (h *HTTPRunnable) Run(ctx context.Context) (RunResult[HTTPResult], error) { client := &http.Client{} reqBody := bytes.NewBufferString(h.Body) - req, err := http.NewRequest(h.Method, h.URL, reqBody) + req, err := http.NewRequestWithContext(ctx, h.Method, h.URL, reqBody) if err != nil { return RunResult[HTTPResult]{Status: ResultStatusFailed}, err } diff --git a/runnable-http_test.go b/runnable-http_test.go index e867793..5fad88d 100644 --- a/runnable-http_test.go +++ b/runnable-http_test.go @@ -1,6 +1,7 @@ package jobqueue import ( + "context" "github.com/stretchr/testify/assert" "net/http" "net/http/httptest" @@ -23,7 +24,8 @@ func TestHTTPRunnable_Run(t *testing.T) { Body: "", } - result, err := httpRunnable.Run() + ctx := context.Background() + result, err := httpRunnable.Run(ctx) // Assertions assert.NoError(t, err) diff --git a/runnable-mail.go b/runnable-mail.go index dfe8364..a656890 100644 --- a/runnable-mail.go +++ b/runnable-mail.go @@ -1,6 +1,7 @@ package jobqueue import ( + "context" "fmt" "net/smtp" ) @@ -83,7 +84,7 @@ type MailRunnable struct { Headers map[string]string } -func (m *MailRunnable) Run() (RunResult[MailResult], error) { +func (m *MailRunnable) Run(_ context.Context) (RunResult[MailResult], error) { smtpServer := m.Server + ":" + m.Port diff --git a/runnable-mail_test.go b/runnable-mail_test.go index 11f509a..8492e6e 100644 --- a/runnable-mail_test.go +++ b/runnable-mail_test.go @@ -170,7 +170,8 @@ func TestMailRunner(t *testing.T) { }, } - result, err := mailRunnable.Run() + xtx := context.Background() + result, err := mailRunnable.Run(xtx) // Assertions assert.NoError(t, err) diff --git a/runnable-sftp.go b/runnable-sftp.go index dd2bc3b..c526e23 100644 --- a/runnable-sftp.go +++ b/runnable-sftp.go @@ -1,6 +1,7 @@ package jobqueue import ( + "context" "fmt" "github.com/pkg/sftp" "golang.org/x/crypto/ssh" @@ -103,7 +104,7 @@ type SFTPRunnable struct { TransferDirection Direction } -func (s *SFTPRunnable) Run() (RunResult[SFTPResult], error) { +func (s *SFTPRunnable) Run(_ context.Context) (RunResult[SFTPResult], error) { var authMethod ssh.AuthMethod @@ -134,7 +135,7 @@ func (s *SFTPRunnable) Run() (RunResult[SFTPResult], error) { hkCallback = ssh.FixedHostKey(hostKey) } else { if s.Insecure { - // #nosec + // #nosec hkCallback = ssh.InsecureIgnoreHostKey() } else { hkCallback = ssh.FixedHostKey(nil) diff --git a/runnable-sftp_test.go b/runnable-sftp_test.go index 6ec2dd3..7396734 100644 --- a/runnable-sftp_test.go +++ b/runnable-sftp_test.go @@ -177,7 +177,8 @@ func TestSFTPCRunnerLocalToRemote(t *testing.T) { TransferDirection: LocalToRemote, } - result, err := sftpRunnable.Run() + ctx = context.Background() + result, err := sftpRunnable.Run(ctx) // Assertions assert.NoError(t, err) @@ -280,7 +281,8 @@ func TestSFTPCRunnerRemoteToLocal(t *testing.T) { } // Methode aufrufen - result, err := sftpRunnable.Run() + ctx = context.Background() + result, err := sftpRunnable.Run(ctx) // Assertions assert.NoError(t, err) diff --git a/runnable-shell.go b/runnable-shell.go index 6297749..cb28e62 100644 --- a/runnable-shell.go +++ b/runnable-shell.go @@ -1,6 +1,7 @@ package jobqueue import ( + "context" "fmt" "os" "os/exec" @@ -38,7 +39,7 @@ type ShellRunnable struct { Script string } -func (s *ShellRunnable) Run() (RunResult[ShellResult], error) { +func (s *ShellRunnable) Run(ctx context.Context) (RunResult[ShellResult], error) { scriptPath := s.ScriptPath @@ -75,7 +76,7 @@ func (s *ShellRunnable) Run() (RunResult[ShellResult], error) { } // #nosec - cmd := exec.Command("sh", scriptPath) + cmd := exec.CommandContext(ctx, "sh", scriptPath) output, err := cmd.Output() var stderr []byte diff --git a/runnable-shell_test.go b/runnable-shell_test.go index eab6496..a67e015 100644 --- a/runnable-shell_test.go +++ b/runnable-shell_test.go @@ -1,6 +1,7 @@ package jobqueue import ( + "context" "os" "path" "testing" @@ -27,7 +28,8 @@ func TestShellRunnable_Run(t *testing.T) { shellRunnable := ShellRunnable{ScriptPath: fp.Name()} // Run-Methode aufrufen - result, err := shellRunnable.Run() + ctx := context.Background() + result, err := shellRunnable.Run(ctx) // Überprüfungen if err != nil { diff --git a/runnable.go b/runnable.go index a55b757..2143e1e 100644 --- a/runnable.go +++ b/runnable.go @@ -1,5 +1,9 @@ package jobqueue +import ( + "context" +) + type ResultStatus int const ( @@ -21,7 +25,7 @@ func (r RunResult[T]) GetStatus() ResultStatus { } type Runnable[T any] interface { - Run() (RunResult[T], error) + Run(ctx context.Context) (RunResult[T], error) GetType() string GetPersistence() RunnableImport diff --git a/runnable_test.go b/runnable_test.go index 5af9a42..037d546 100644 --- a/runnable_test.go +++ b/runnable_test.go @@ -1,6 +1,7 @@ package jobqueue import ( + "context" "errors" "testing" ) @@ -8,21 +9,21 @@ import ( // MockSuccessfulRunnable gibt immer ResultStatusSuccess zurück type MockSuccessfulRunnable struct{} -func (m MockSuccessfulRunnable) Run() (RunResult[string], error) { +func (m MockSuccessfulRunnable) Run(ctx context.Context) (RunResult[string], error) { return RunResult[string]{Status: ResultStatusSuccess, Data: "Success"}, nil } // MockFailedRunnable gibt immer ResultStatusFailed zurück type MockFailedRunnable struct{} -func (m MockFailedRunnable) Run() (RunResult[string], error) { +func (m MockFailedRunnable) Run(ctx context.Context) (RunResult[string], error) { return RunResult[string]{Status: ResultStatusFailed, Data: "Failed"}, nil } // MockErrorRunnable gibt immer einen Fehler zurück type MockErrorRunnable struct{} -func (m MockErrorRunnable) Run() (RunResult[string], error) { +func (m MockErrorRunnable) Run(ctx context.Context) (RunResult[string], error) { return RunResult[string]{}, errors.New("RunError") } @@ -52,23 +53,25 @@ func (m MockSuccessfulRunnable) GetPersistence() RunnableImport { func TestRunnable(t *testing.T) { var run Runnable[string] + ctx := context.Background() + // Test für erfolgreiche Ausführung run = MockSuccessfulRunnable{} - result, err := run.Run() + result, err := run.Run(ctx) if result.Status != ResultStatusSuccess || err != nil { t.Errorf("Expected success, got %v, %v", result.Status, err) } // Test für fehlgeschlagene Ausführung run = MockFailedRunnable{} - result, err = run.Run() + result, err = run.Run(ctx) if result.Status != ResultStatusFailed || err != nil { t.Errorf("Expected failure, got %v, %v", result.Status, err) } // Test für Ausführungsfehler run = MockErrorRunnable{} - result, err = run.Run() + result, err = run.Run(ctx) if err == nil { t.Errorf("Expected error, got nil") } diff --git a/worker.go b/worker.go index d6a8254..45418fe 100644 --- a/worker.go +++ b/worker.go @@ -143,19 +143,22 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel var err error for retries > 0 { + var cancel context.CancelFunc + timeout := job.GetTimeout() - if timeout == 0 { - timeout = 1 * time.Minute + if timeout > 0 { + ctx, cancel = context.WithTimeout(ctx, timeout) } - ctxTimeout, cancelTimeout := context.WithTimeout(ctx, timeout) - if w.manager != nil && w.manager.logger != nil { w.manager.logger.Info("Executing job on worker thread", "worker", w.ID, "thread_id", workerThreadID, "job_id", job.GetID()) } - _, err = job.Execute(ctxTimeout) - cancelTimeout() + _, err = job.Execute(ctx) + + if cancel != nil { + cancel() + } if err == nil || ctx.Err() == context.Canceled { break @@ -170,6 +173,15 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel cancel() + if w.manager != nil && w.manager.dbSaver != nil { + err = w.manager.dbSaver.SaveJob(job) + if err != nil { + if w.manager.logger != nil { + w.manager.logger.Error("Error while saving job", "job_id", job.GetID(), "error", err) + } + } + } + case <-stopChan: stopFlag = true break -- GitLab