diff --git a/job.go b/job.go index d574898870544ebfa6b1cc592cb0845bcd5e932c..f922cb3af4b90956c70074fca247d14777d6d279 100644 --- a/job.go +++ b/job.go @@ -131,7 +131,7 @@ func (j *Job[T]) GetScheduler() Scheduler { // Execute executes the job func (j *Job[T]) Execute(ctx context.Context) (RunGenericResult, error) { startTime := time.Now() - r, runnerError := j.runner.Run() + r, runnerError := j.runner.Run(ctx) endTime := time.Now() elapsedTime := endTime.Sub(startTime) diff --git a/job_test.go b/job_test.go index 3400cef3f69ba342fe192334d4f7db1e004267f4..79b54df160f438fa9154a21e86d805c19cac418f 100644 --- a/job_test.go +++ b/job_test.go @@ -113,7 +113,7 @@ echo "Hello World" if err != nil { t.Fatal(err) } - tmpfile.Write([]byte(scriptContent)) + _, _ = tmpfile.Write([]byte(scriptContent)) defer tmpfile.Close() @@ -135,7 +135,7 @@ echo "Hello World" assert.NotEqual(t, 0, stats.TimeMetrics.MinRunTime) // Simulate a failed run - job.Execute(context.Background()) // Assume Execute updates stats and returns an error + _, _ = job.Execute(context.Background()) // Assume Execute updates stats and returns an error stats = job.stats assert.Equal(t, 2, stats.RunCount) diff --git a/manager.go b/manager.go index 3d37ee42820f81d626d5a5099686bf98aa0ebe27..6c277d5ddc1c2b7759c437e7805b11dc95648db6 100644 --- a/manager.go +++ b/manager.go @@ -326,6 +326,7 @@ func (m *Manager) GetLogger() Logger { return m.logger } +// handleJobEvents handles job events func (m *Manager) handleJobEvents() { for event := range m.jobEventCh { diff --git a/runnable-counter.go b/runnable-counter.go index d5612e91bb48935f4b10c38847085b8f3f2ce733..5ae846fb7a6dff774b9f642aadc0c0149a28ad85 100644 --- a/runnable-counter.go +++ b/runnable-counter.go @@ -1,6 +1,7 @@ package jobqueue import ( + "context" "fmt" "sync" ) @@ -37,7 +38,7 @@ func (c *CounterRunnable) GetCount() int { } // Run runs the counter -func (c *CounterRunnable) Run() (RunResult[CounterResult], error) { +func (c *CounterRunnable) Run(_ context.Context) (RunResult[CounterResult], error) { c.mu.Lock() defer c.mu.Unlock() diff --git a/runnable-counter_test.go b/runnable-counter_test.go index 55d28ada1cf48a07c0612b6be703132637189598..16841ff84a38639d9b1ffd4b81127eed75baa86b 100644 --- a/runnable-counter_test.go +++ b/runnable-counter_test.go @@ -2,13 +2,17 @@ package jobqueue import ( "testing" + + "context" ) func TestRunnableCounter(t *testing.T) { runner := &CounterRunnable{} - r, err := runner.Run() + ctx := context.Background() + + r, err := runner.Run(ctx) if err != nil { t.Errorf("Unexpected error: %v", err) } diff --git a/runnable-dummy.go b/runnable-dummy.go index 387097317981c9d9170b63f0c072647d838169ce..14b68abca406bf1114a3e93f003b55370cbe3777 100644 --- a/runnable-dummy.go +++ b/runnable-dummy.go @@ -1,5 +1,7 @@ package jobqueue +import "context" + func NewDummyRunnableFromMap(data map[string]any) (*DummyRunnable, error) { return &DummyRunnable{}, nil } @@ -10,7 +12,7 @@ type DummyResult struct { type DummyRunnable struct{} -func (d *DummyRunnable) Run() (RunResult[DummyResult], error) { +func (d *DummyRunnable) Run(_ context.Context) (RunResult[DummyResult], error) { return RunResult[DummyResult]{ Status: ResultStatusSuccess, }, nil diff --git a/runnable-dummy_test.go b/runnable-dummy_test.go index b056ba1ed6b257f79190c3d14cca56e48f68cedc..797812807da7d8d7166782ecccfcb61ab58ab8c5 100644 --- a/runnable-dummy_test.go +++ b/runnable-dummy_test.go @@ -1,14 +1,16 @@ package jobqueue import ( + "context" "testing" ) func TestDummyRunnable(t *testing.T) { - + runner := &DummyRunnable{} - _, err := runner.Run() + ctx := context.Background() + _, err := runner.Run(ctx) if err != nil { t.Errorf("Unexpected error: %v", err) } diff --git a/runnable-fileoperation.go b/runnable-fileoperation.go index 600fc6d3d8e47e4c410ff65a894b7a4e82c96497..97a79ada01f09cf53dc1612af93cdedc454f023a 100644 --- a/runnable-fileoperation.go +++ b/runnable-fileoperation.go @@ -1,6 +1,7 @@ package jobqueue import ( + "context" "fmt" "os" ) @@ -47,7 +48,7 @@ type FileOperationRunnable struct { Content string // Optional, je nach Operation } -func (f *FileOperationRunnable) Run() (RunResult[FileOperationResult], error) { +func (f *FileOperationRunnable) Run(_ context.Context) (RunResult[FileOperationResult], error) { switch f.Operation { case FileOperationRead: content, err := os.ReadFile(f.FilePath) diff --git a/runnable-fileoperation_test.go b/runnable-fileoperation_test.go index c62c91fef58b59ba76a249e0e1488c17bb93e47d..79ebaecfd9226f6bd91c04b7c1979d701dabf86e 100644 --- a/runnable-fileoperation_test.go +++ b/runnable-fileoperation_test.go @@ -1,6 +1,7 @@ package jobqueue import ( + "context" "io/ioutil" "os" "path" @@ -15,26 +16,28 @@ func TestFileOperationRunnable(t *testing.T) { t.Fatalf("Failed to change directory: %v", err) } + ctx := context.Background() + testFilePath := path.Join(dir, "test.txt") testContent := "Hello, World!" // Test FileOperationCreate createRunner := FileOperationRunnable{Operation: FileOperationCreate, FilePath: testFilePath} - _, err = createRunner.Run() + _, err = createRunner.Run(ctx) if err != nil { t.Fatalf("Failed to create file: %v", err) } // Test FileOperationWrite writeRunner := FileOperationRunnable{Operation: FileOperationWrite, FilePath: testFilePath, Content: testContent} - _, err = writeRunner.Run() + _, err = writeRunner.Run(ctx) if err != nil { t.Fatalf("Failed to write to file: %v", err) } // Test FileOperationRead readRunner := FileOperationRunnable{Operation: FileOperationRead, FilePath: testFilePath} - result, err := readRunner.Run() + result, err := readRunner.Run(ctx) if err != nil || result.Data.Content != testContent { t.Fatalf("Failed to read from file: %v", err) } @@ -42,7 +45,7 @@ func TestFileOperationRunnable(t *testing.T) { // Test FileOperationAppend appendContent := " Appended." appendRunner := FileOperationRunnable{Operation: FileOperationAppend, FilePath: testFilePath, Content: appendContent} - _, err = appendRunner.Run() + _, err = appendRunner.Run(ctx) if err != nil { t.Fatalf("Failed to append to file: %v", err) } @@ -55,7 +58,7 @@ func TestFileOperationRunnable(t *testing.T) { // Test FileOperationDelete deleteRunner := FileOperationRunnable{Operation: FileOperationDelete, FilePath: testFilePath} - _, err = deleteRunner.Run() + _, err = deleteRunner.Run(ctx) if err != nil { t.Fatalf("Failed to delete file: %v", err) } diff --git a/runnable-gorm.go b/runnable-gorm.go index d4170711a57522008e0a6918957fbadb283e7c69..f9f7239bbc123ea65383a82249c6b0e0146e66d9 100644 --- a/runnable-gorm.go +++ b/runnable-gorm.go @@ -1,6 +1,7 @@ package jobqueue import ( + "context" "fmt" "gorm.io/driver/mysql" "gorm.io/gorm" @@ -38,34 +39,41 @@ type DBRunnable struct { Type string DSN string Query string - db *gorm.DB // internal for testing } -func (d *DBRunnable) Run() (RunResult[DBResult], error) { +type dbKey struct{} + +// DBRunnableWithDB returns a new context with the provided gorm.DB injected. +func DBRunnableWithDB(ctx context.Context, db *gorm.DB) context.Context { + return context.WithValue(ctx, dbKey{}, db) +} + +// GetDBFromContext tries to retrieve a *gorm.DB from the context. If it exists, returns the db and true, otherwise returns nil and false. +func getDBFromContext(ctx context.Context) (*gorm.DB, bool) { + db, ok := ctx.Value(dbKey{}).(*gorm.DB) + return db, ok +} + +func (d *DBRunnable) Run(ctx context.Context) (RunResult[DBResult], error) { var db *gorm.DB + var ok bool var err error - if d.db == nil { - + if db, ok = getDBFromContext(ctx); !ok { + // No *gorm.DB in context, create a new connection switch d.Type { case "mysql": db, err = gorm.Open(mysql.Open(d.DSN), &gorm.Config{}) - default: return RunResult[DBResult]{Status: ResultStatusFailed}, ErrUnsupportedDatabaseType } - } else { - db = d.db - } - if err != nil { - return RunResult[DBResult]{Status: ResultStatusFailed}, err + if err != nil { + return RunResult[DBResult]{Status: ResultStatusFailed}, err + } } - var result *gorm.DB - - result = db.Exec(d.Query) - + result := db.Exec(d.Query) if result.Error != nil { return RunResult[DBResult]{Status: ResultStatusFailed}, result.Error } diff --git a/runnable-gorm_test.go b/runnable-gorm_test.go index e124a0ea785ac1d3438779ddb6700220f2e10a8e..d9f8f71d1a474c20c3e2e28a66dcc3a60a1a1467 100644 --- a/runnable-gorm_test.go +++ b/runnable-gorm_test.go @@ -1,6 +1,7 @@ package jobqueue import ( + "context" "github.com/DATA-DOG/go-sqlmock" "gorm.io/driver/mysql" "gorm.io/gorm" @@ -29,12 +30,14 @@ func TestDBRunnable_Run(t *testing.T) { // Erstellen Sie die zu testende Instanz runnable := &DBRunnable{ Type: "mysql", - db: gormDB, // Injizierte Mock-DB Query: "SELECT * FROM table_name", } + ctx := context.Background() + ctx = DBRunnableWithDB(ctx, gormDB) + // Rufen Sie die Run()-Methode auf und überprüfen Sie die Ergebnisse - result, err := runnable.Run() + result, err := runnable.Run(ctx) // Überprüfungen hier if err != nil { diff --git a/runnable-http.go b/runnable-http.go index a9c48906027f703a348675747afa91219a97c0cb..b1a56d050e20e9636f1509819f8760e9e67395cb 100644 --- a/runnable-http.go +++ b/runnable-http.go @@ -2,6 +2,7 @@ package jobqueue import ( "bytes" + "context" "fmt" "io" "net/http" @@ -49,11 +50,11 @@ type HTTPRunnable struct { Body string } -func (h *HTTPRunnable) Run() (RunResult[HTTPResult], error) { +func (h *HTTPRunnable) Run(ctx context.Context) (RunResult[HTTPResult], error) { client := &http.Client{} reqBody := bytes.NewBufferString(h.Body) - req, err := http.NewRequest(h.Method, h.URL, reqBody) + req, err := http.NewRequestWithContext(ctx, h.Method, h.URL, reqBody) if err != nil { return RunResult[HTTPResult]{Status: ResultStatusFailed}, err } diff --git a/runnable-http_test.go b/runnable-http_test.go index e86779376da5f73690f5ef9397d032b7c75a0616..5fad88dfedc168de46932434faaa10d77fa40506 100644 --- a/runnable-http_test.go +++ b/runnable-http_test.go @@ -1,6 +1,7 @@ package jobqueue import ( + "context" "github.com/stretchr/testify/assert" "net/http" "net/http/httptest" @@ -23,7 +24,8 @@ func TestHTTPRunnable_Run(t *testing.T) { Body: "", } - result, err := httpRunnable.Run() + ctx := context.Background() + result, err := httpRunnable.Run(ctx) // Assertions assert.NoError(t, err) diff --git a/runnable-mail.go b/runnable-mail.go index dfe8364cae8c87c68f59710ab5daacaf387a7bb5..a6568901f3e6786cbfd0ffc005ec4d2486cf580a 100644 --- a/runnable-mail.go +++ b/runnable-mail.go @@ -1,6 +1,7 @@ package jobqueue import ( + "context" "fmt" "net/smtp" ) @@ -83,7 +84,7 @@ type MailRunnable struct { Headers map[string]string } -func (m *MailRunnable) Run() (RunResult[MailResult], error) { +func (m *MailRunnable) Run(_ context.Context) (RunResult[MailResult], error) { smtpServer := m.Server + ":" + m.Port diff --git a/runnable-mail_test.go b/runnable-mail_test.go index 11f509a60fcd48882087279a71d85310154f98e2..8492e6e37175c474937da9f30c01ca51d24bc030 100644 --- a/runnable-mail_test.go +++ b/runnable-mail_test.go @@ -170,7 +170,8 @@ func TestMailRunner(t *testing.T) { }, } - result, err := mailRunnable.Run() + xtx := context.Background() + result, err := mailRunnable.Run(xtx) // Assertions assert.NoError(t, err) diff --git a/runnable-sftp.go b/runnable-sftp.go index dd2bc3b13c31b97ca6161e4c9704903576b9c214..c526e235ea7b2f53ad550cc257f33aeea029f246 100644 --- a/runnable-sftp.go +++ b/runnable-sftp.go @@ -1,6 +1,7 @@ package jobqueue import ( + "context" "fmt" "github.com/pkg/sftp" "golang.org/x/crypto/ssh" @@ -103,7 +104,7 @@ type SFTPRunnable struct { TransferDirection Direction } -func (s *SFTPRunnable) Run() (RunResult[SFTPResult], error) { +func (s *SFTPRunnable) Run(_ context.Context) (RunResult[SFTPResult], error) { var authMethod ssh.AuthMethod @@ -134,7 +135,7 @@ func (s *SFTPRunnable) Run() (RunResult[SFTPResult], error) { hkCallback = ssh.FixedHostKey(hostKey) } else { if s.Insecure { - // #nosec + // #nosec hkCallback = ssh.InsecureIgnoreHostKey() } else { hkCallback = ssh.FixedHostKey(nil) diff --git a/runnable-sftp_test.go b/runnable-sftp_test.go index 6ec2dd31dacdedb42b249255c465c112f5c330cf..73967345b2c5b7d4535c4374aa4cc20063af438f 100644 --- a/runnable-sftp_test.go +++ b/runnable-sftp_test.go @@ -177,7 +177,8 @@ func TestSFTPCRunnerLocalToRemote(t *testing.T) { TransferDirection: LocalToRemote, } - result, err := sftpRunnable.Run() + ctx = context.Background() + result, err := sftpRunnable.Run(ctx) // Assertions assert.NoError(t, err) @@ -280,7 +281,8 @@ func TestSFTPCRunnerRemoteToLocal(t *testing.T) { } // Methode aufrufen - result, err := sftpRunnable.Run() + ctx = context.Background() + result, err := sftpRunnable.Run(ctx) // Assertions assert.NoError(t, err) diff --git a/runnable-shell.go b/runnable-shell.go index 62977494d9a99023f8b62f1ba2254c6c914967e0..cb28e62b2edfc27dc6c1749ab9472ca8f369a4d3 100644 --- a/runnable-shell.go +++ b/runnable-shell.go @@ -1,6 +1,7 @@ package jobqueue import ( + "context" "fmt" "os" "os/exec" @@ -38,7 +39,7 @@ type ShellRunnable struct { Script string } -func (s *ShellRunnable) Run() (RunResult[ShellResult], error) { +func (s *ShellRunnable) Run(ctx context.Context) (RunResult[ShellResult], error) { scriptPath := s.ScriptPath @@ -75,7 +76,7 @@ func (s *ShellRunnable) Run() (RunResult[ShellResult], error) { } // #nosec - cmd := exec.Command("sh", scriptPath) + cmd := exec.CommandContext(ctx, "sh", scriptPath) output, err := cmd.Output() var stderr []byte diff --git a/runnable-shell_test.go b/runnable-shell_test.go index eab6496e0f24514eaced8d0a6e472909f4ea1232..a67e015025725eaf3ae4d00859d1b39b0b04a483 100644 --- a/runnable-shell_test.go +++ b/runnable-shell_test.go @@ -1,6 +1,7 @@ package jobqueue import ( + "context" "os" "path" "testing" @@ -27,7 +28,8 @@ func TestShellRunnable_Run(t *testing.T) { shellRunnable := ShellRunnable{ScriptPath: fp.Name()} // Run-Methode aufrufen - result, err := shellRunnable.Run() + ctx := context.Background() + result, err := shellRunnable.Run(ctx) // Überprüfungen if err != nil { diff --git a/runnable.go b/runnable.go index a55b7572d7f3b26dc4c58362b89bd49c3df600a2..2143e1e9a5eea79c330ec761841c275656e69dfd 100644 --- a/runnable.go +++ b/runnable.go @@ -1,5 +1,9 @@ package jobqueue +import ( + "context" +) + type ResultStatus int const ( @@ -21,7 +25,7 @@ func (r RunResult[T]) GetStatus() ResultStatus { } type Runnable[T any] interface { - Run() (RunResult[T], error) + Run(ctx context.Context) (RunResult[T], error) GetType() string GetPersistence() RunnableImport diff --git a/runnable_test.go b/runnable_test.go index 5af9a42c6880e6fc8b2889d297f255baad31ccf0..037d54631498ffa970a4678e5e0cc4f04f5822ed 100644 --- a/runnable_test.go +++ b/runnable_test.go @@ -1,6 +1,7 @@ package jobqueue import ( + "context" "errors" "testing" ) @@ -8,21 +9,21 @@ import ( // MockSuccessfulRunnable gibt immer ResultStatusSuccess zurück type MockSuccessfulRunnable struct{} -func (m MockSuccessfulRunnable) Run() (RunResult[string], error) { +func (m MockSuccessfulRunnable) Run(ctx context.Context) (RunResult[string], error) { return RunResult[string]{Status: ResultStatusSuccess, Data: "Success"}, nil } // MockFailedRunnable gibt immer ResultStatusFailed zurück type MockFailedRunnable struct{} -func (m MockFailedRunnable) Run() (RunResult[string], error) { +func (m MockFailedRunnable) Run(ctx context.Context) (RunResult[string], error) { return RunResult[string]{Status: ResultStatusFailed, Data: "Failed"}, nil } // MockErrorRunnable gibt immer einen Fehler zurück type MockErrorRunnable struct{} -func (m MockErrorRunnable) Run() (RunResult[string], error) { +func (m MockErrorRunnable) Run(ctx context.Context) (RunResult[string], error) { return RunResult[string]{}, errors.New("RunError") } @@ -52,23 +53,25 @@ func (m MockSuccessfulRunnable) GetPersistence() RunnableImport { func TestRunnable(t *testing.T) { var run Runnable[string] + ctx := context.Background() + // Test für erfolgreiche Ausführung run = MockSuccessfulRunnable{} - result, err := run.Run() + result, err := run.Run(ctx) if result.Status != ResultStatusSuccess || err != nil { t.Errorf("Expected success, got %v, %v", result.Status, err) } // Test für fehlgeschlagene Ausführung run = MockFailedRunnable{} - result, err = run.Run() + result, err = run.Run(ctx) if result.Status != ResultStatusFailed || err != nil { t.Errorf("Expected failure, got %v, %v", result.Status, err) } // Test für Ausführungsfehler run = MockErrorRunnable{} - result, err = run.Run() + result, err = run.Run(ctx) if err == nil { t.Errorf("Expected error, got nil") } diff --git a/worker.go b/worker.go index d6a8254262392ebba50ad15c51ee6169d70827c6..45418fe31869ce9f695759abcc54a0334e56ab01 100644 --- a/worker.go +++ b/worker.go @@ -143,19 +143,22 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel var err error for retries > 0 { + var cancel context.CancelFunc + timeout := job.GetTimeout() - if timeout == 0 { - timeout = 1 * time.Minute + if timeout > 0 { + ctx, cancel = context.WithTimeout(ctx, timeout) } - ctxTimeout, cancelTimeout := context.WithTimeout(ctx, timeout) - if w.manager != nil && w.manager.logger != nil { w.manager.logger.Info("Executing job on worker thread", "worker", w.ID, "thread_id", workerThreadID, "job_id", job.GetID()) } - _, err = job.Execute(ctxTimeout) - cancelTimeout() + _, err = job.Execute(ctx) + + if cancel != nil { + cancel() + } if err == nil || ctx.Err() == context.Canceled { break @@ -170,6 +173,15 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel cancel() + if w.manager != nil && w.manager.dbSaver != nil { + err = w.manager.dbSaver.SaveJob(job) + if err != nil { + if w.manager.logger != nil { + w.manager.logger.Error("Error while saving job", "job_id", job.GetID(), "error", err) + } + } + } + case <-stopChan: stopFlag = true break