Skip to content
Snippets Groups Projects
Verified Commit 5f2b8c87 authored by Volker Schukai's avatar Volker Schukai :alien:
Browse files

feat: save jobs after execution #8

parent 8d575c14
Branches
Tags v1.6.0
No related merge requests found
...@@ -131,7 +131,7 @@ func (j *Job[T]) GetScheduler() Scheduler { ...@@ -131,7 +131,7 @@ func (j *Job[T]) GetScheduler() Scheduler {
// Execute executes the job // Execute executes the job
func (j *Job[T]) Execute(ctx context.Context) (RunGenericResult, error) { func (j *Job[T]) Execute(ctx context.Context) (RunGenericResult, error) {
startTime := time.Now() startTime := time.Now()
r, runnerError := j.runner.Run() r, runnerError := j.runner.Run(ctx)
endTime := time.Now() endTime := time.Now()
elapsedTime := endTime.Sub(startTime) elapsedTime := endTime.Sub(startTime)
......
...@@ -113,7 +113,7 @@ echo "Hello World" ...@@ -113,7 +113,7 @@ echo "Hello World"
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
tmpfile.Write([]byte(scriptContent)) _, _ = tmpfile.Write([]byte(scriptContent))
defer tmpfile.Close() defer tmpfile.Close()
...@@ -135,7 +135,7 @@ echo "Hello World" ...@@ -135,7 +135,7 @@ echo "Hello World"
assert.NotEqual(t, 0, stats.TimeMetrics.MinRunTime) assert.NotEqual(t, 0, stats.TimeMetrics.MinRunTime)
// Simulate a failed run // Simulate a failed run
job.Execute(context.Background()) // Assume Execute updates stats and returns an error _, _ = job.Execute(context.Background()) // Assume Execute updates stats and returns an error
stats = job.stats stats = job.stats
assert.Equal(t, 2, stats.RunCount) assert.Equal(t, 2, stats.RunCount)
......
...@@ -326,6 +326,7 @@ func (m *Manager) GetLogger() Logger { ...@@ -326,6 +326,7 @@ func (m *Manager) GetLogger() Logger {
return m.logger return m.logger
} }
// handleJobEvents handles job events
func (m *Manager) handleJobEvents() { func (m *Manager) handleJobEvents() {
for event := range m.jobEventCh { for event := range m.jobEventCh {
......
package jobqueue package jobqueue
import ( import (
"context"
"fmt" "fmt"
"sync" "sync"
) )
...@@ -37,7 +38,7 @@ func (c *CounterRunnable) GetCount() int { ...@@ -37,7 +38,7 @@ func (c *CounterRunnable) GetCount() int {
} }
// Run runs the counter // Run runs the counter
func (c *CounterRunnable) Run() (RunResult[CounterResult], error) { func (c *CounterRunnable) Run(_ context.Context) (RunResult[CounterResult], error) {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
......
...@@ -2,13 +2,17 @@ package jobqueue ...@@ -2,13 +2,17 @@ package jobqueue
import ( import (
"testing" "testing"
"context"
) )
func TestRunnableCounter(t *testing.T) { func TestRunnableCounter(t *testing.T) {
runner := &CounterRunnable{} runner := &CounterRunnable{}
r, err := runner.Run() ctx := context.Background()
r, err := runner.Run(ctx)
if err != nil { if err != nil {
t.Errorf("Unexpected error: %v", err) t.Errorf("Unexpected error: %v", err)
} }
......
package jobqueue package jobqueue
import "context"
func NewDummyRunnableFromMap(data map[string]any) (*DummyRunnable, error) { func NewDummyRunnableFromMap(data map[string]any) (*DummyRunnable, error) {
return &DummyRunnable{}, nil return &DummyRunnable{}, nil
} }
...@@ -10,7 +12,7 @@ type DummyResult struct { ...@@ -10,7 +12,7 @@ type DummyResult struct {
type DummyRunnable struct{} type DummyRunnable struct{}
func (d *DummyRunnable) Run() (RunResult[DummyResult], error) { func (d *DummyRunnable) Run(_ context.Context) (RunResult[DummyResult], error) {
return RunResult[DummyResult]{ return RunResult[DummyResult]{
Status: ResultStatusSuccess, Status: ResultStatusSuccess,
}, nil }, nil
......
package jobqueue package jobqueue
import ( import (
"context"
"testing" "testing"
) )
...@@ -8,7 +9,8 @@ func TestDummyRunnable(t *testing.T) { ...@@ -8,7 +9,8 @@ func TestDummyRunnable(t *testing.T) {
runner := &DummyRunnable{} runner := &DummyRunnable{}
_, err := runner.Run() ctx := context.Background()
_, err := runner.Run(ctx)
if err != nil { if err != nil {
t.Errorf("Unexpected error: %v", err) t.Errorf("Unexpected error: %v", err)
} }
......
package jobqueue package jobqueue
import ( import (
"context"
"fmt" "fmt"
"os" "os"
) )
...@@ -47,7 +48,7 @@ type FileOperationRunnable struct { ...@@ -47,7 +48,7 @@ type FileOperationRunnable struct {
Content string // Optional, je nach Operation Content string // Optional, je nach Operation
} }
func (f *FileOperationRunnable) Run() (RunResult[FileOperationResult], error) { func (f *FileOperationRunnable) Run(_ context.Context) (RunResult[FileOperationResult], error) {
switch f.Operation { switch f.Operation {
case FileOperationRead: case FileOperationRead:
content, err := os.ReadFile(f.FilePath) content, err := os.ReadFile(f.FilePath)
......
package jobqueue package jobqueue
import ( import (
"context"
"io/ioutil" "io/ioutil"
"os" "os"
"path" "path"
...@@ -15,26 +16,28 @@ func TestFileOperationRunnable(t *testing.T) { ...@@ -15,26 +16,28 @@ func TestFileOperationRunnable(t *testing.T) {
t.Fatalf("Failed to change directory: %v", err) t.Fatalf("Failed to change directory: %v", err)
} }
ctx := context.Background()
testFilePath := path.Join(dir, "test.txt") testFilePath := path.Join(dir, "test.txt")
testContent := "Hello, World!" testContent := "Hello, World!"
// Test FileOperationCreate // Test FileOperationCreate
createRunner := FileOperationRunnable{Operation: FileOperationCreate, FilePath: testFilePath} createRunner := FileOperationRunnable{Operation: FileOperationCreate, FilePath: testFilePath}
_, err = createRunner.Run() _, err = createRunner.Run(ctx)
if err != nil { if err != nil {
t.Fatalf("Failed to create file: %v", err) t.Fatalf("Failed to create file: %v", err)
} }
// Test FileOperationWrite // Test FileOperationWrite
writeRunner := FileOperationRunnable{Operation: FileOperationWrite, FilePath: testFilePath, Content: testContent} writeRunner := FileOperationRunnable{Operation: FileOperationWrite, FilePath: testFilePath, Content: testContent}
_, err = writeRunner.Run() _, err = writeRunner.Run(ctx)
if err != nil { if err != nil {
t.Fatalf("Failed to write to file: %v", err) t.Fatalf("Failed to write to file: %v", err)
} }
// Test FileOperationRead // Test FileOperationRead
readRunner := FileOperationRunnable{Operation: FileOperationRead, FilePath: testFilePath} readRunner := FileOperationRunnable{Operation: FileOperationRead, FilePath: testFilePath}
result, err := readRunner.Run() result, err := readRunner.Run(ctx)
if err != nil || result.Data.Content != testContent { if err != nil || result.Data.Content != testContent {
t.Fatalf("Failed to read from file: %v", err) t.Fatalf("Failed to read from file: %v", err)
} }
...@@ -42,7 +45,7 @@ func TestFileOperationRunnable(t *testing.T) { ...@@ -42,7 +45,7 @@ func TestFileOperationRunnable(t *testing.T) {
// Test FileOperationAppend // Test FileOperationAppend
appendContent := " Appended." appendContent := " Appended."
appendRunner := FileOperationRunnable{Operation: FileOperationAppend, FilePath: testFilePath, Content: appendContent} appendRunner := FileOperationRunnable{Operation: FileOperationAppend, FilePath: testFilePath, Content: appendContent}
_, err = appendRunner.Run() _, err = appendRunner.Run(ctx)
if err != nil { if err != nil {
t.Fatalf("Failed to append to file: %v", err) t.Fatalf("Failed to append to file: %v", err)
} }
...@@ -55,7 +58,7 @@ func TestFileOperationRunnable(t *testing.T) { ...@@ -55,7 +58,7 @@ func TestFileOperationRunnable(t *testing.T) {
// Test FileOperationDelete // Test FileOperationDelete
deleteRunner := FileOperationRunnable{Operation: FileOperationDelete, FilePath: testFilePath} deleteRunner := FileOperationRunnable{Operation: FileOperationDelete, FilePath: testFilePath}
_, err = deleteRunner.Run() _, err = deleteRunner.Run(ctx)
if err != nil { if err != nil {
t.Fatalf("Failed to delete file: %v", err) t.Fatalf("Failed to delete file: %v", err)
} }
......
package jobqueue package jobqueue
import ( import (
"context"
"fmt" "fmt"
"gorm.io/driver/mysql" "gorm.io/driver/mysql"
"gorm.io/gorm" "gorm.io/gorm"
...@@ -38,34 +39,41 @@ type DBRunnable struct { ...@@ -38,34 +39,41 @@ type DBRunnable struct {
Type string Type string
DSN string DSN string
Query string Query string
db *gorm.DB // internal for testing
} }
func (d *DBRunnable) Run() (RunResult[DBResult], error) { type dbKey struct{}
// DBRunnableWithDB returns a new context with the provided gorm.DB injected.
func DBRunnableWithDB(ctx context.Context, db *gorm.DB) context.Context {
return context.WithValue(ctx, dbKey{}, db)
}
// GetDBFromContext tries to retrieve a *gorm.DB from the context. If it exists, returns the db and true, otherwise returns nil and false.
func getDBFromContext(ctx context.Context) (*gorm.DB, bool) {
db, ok := ctx.Value(dbKey{}).(*gorm.DB)
return db, ok
}
func (d *DBRunnable) Run(ctx context.Context) (RunResult[DBResult], error) {
var db *gorm.DB var db *gorm.DB
var ok bool
var err error var err error
if d.db == nil { if db, ok = getDBFromContext(ctx); !ok {
// No *gorm.DB in context, create a new connection
switch d.Type { switch d.Type {
case "mysql": case "mysql":
db, err = gorm.Open(mysql.Open(d.DSN), &gorm.Config{}) db, err = gorm.Open(mysql.Open(d.DSN), &gorm.Config{})
default: default:
return RunResult[DBResult]{Status: ResultStatusFailed}, ErrUnsupportedDatabaseType return RunResult[DBResult]{Status: ResultStatusFailed}, ErrUnsupportedDatabaseType
} }
} else {
db = d.db
}
if err != nil { if err != nil {
return RunResult[DBResult]{Status: ResultStatusFailed}, err return RunResult[DBResult]{Status: ResultStatusFailed}, err
} }
}
var result *gorm.DB result := db.Exec(d.Query)
result = db.Exec(d.Query)
if result.Error != nil { if result.Error != nil {
return RunResult[DBResult]{Status: ResultStatusFailed}, result.Error return RunResult[DBResult]{Status: ResultStatusFailed}, result.Error
} }
......
package jobqueue package jobqueue
import ( import (
"context"
"github.com/DATA-DOG/go-sqlmock" "github.com/DATA-DOG/go-sqlmock"
"gorm.io/driver/mysql" "gorm.io/driver/mysql"
"gorm.io/gorm" "gorm.io/gorm"
...@@ -29,12 +30,14 @@ func TestDBRunnable_Run(t *testing.T) { ...@@ -29,12 +30,14 @@ func TestDBRunnable_Run(t *testing.T) {
// Erstellen Sie die zu testende Instanz // Erstellen Sie die zu testende Instanz
runnable := &DBRunnable{ runnable := &DBRunnable{
Type: "mysql", Type: "mysql",
db: gormDB, // Injizierte Mock-DB
Query: "SELECT * FROM table_name", Query: "SELECT * FROM table_name",
} }
ctx := context.Background()
ctx = DBRunnableWithDB(ctx, gormDB)
// Rufen Sie die Run()-Methode auf und überprüfen Sie die Ergebnisse // Rufen Sie die Run()-Methode auf und überprüfen Sie die Ergebnisse
result, err := runnable.Run() result, err := runnable.Run(ctx)
// Überprüfungen hier // Überprüfungen hier
if err != nil { if err != nil {
......
...@@ -2,6 +2,7 @@ package jobqueue ...@@ -2,6 +2,7 @@ package jobqueue
import ( import (
"bytes" "bytes"
"context"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
...@@ -49,11 +50,11 @@ type HTTPRunnable struct { ...@@ -49,11 +50,11 @@ type HTTPRunnable struct {
Body string Body string
} }
func (h *HTTPRunnable) Run() (RunResult[HTTPResult], error) { func (h *HTTPRunnable) Run(ctx context.Context) (RunResult[HTTPResult], error) {
client := &http.Client{} client := &http.Client{}
reqBody := bytes.NewBufferString(h.Body) reqBody := bytes.NewBufferString(h.Body)
req, err := http.NewRequest(h.Method, h.URL, reqBody) req, err := http.NewRequestWithContext(ctx, h.Method, h.URL, reqBody)
if err != nil { if err != nil {
return RunResult[HTTPResult]{Status: ResultStatusFailed}, err return RunResult[HTTPResult]{Status: ResultStatusFailed}, err
} }
......
package jobqueue package jobqueue
import ( import (
"context"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
...@@ -23,7 +24,8 @@ func TestHTTPRunnable_Run(t *testing.T) { ...@@ -23,7 +24,8 @@ func TestHTTPRunnable_Run(t *testing.T) {
Body: "", Body: "",
} }
result, err := httpRunnable.Run() ctx := context.Background()
result, err := httpRunnable.Run(ctx)
// Assertions // Assertions
assert.NoError(t, err) assert.NoError(t, err)
......
package jobqueue package jobqueue
import ( import (
"context"
"fmt" "fmt"
"net/smtp" "net/smtp"
) )
...@@ -83,7 +84,7 @@ type MailRunnable struct { ...@@ -83,7 +84,7 @@ type MailRunnable struct {
Headers map[string]string Headers map[string]string
} }
func (m *MailRunnable) Run() (RunResult[MailResult], error) { func (m *MailRunnable) Run(_ context.Context) (RunResult[MailResult], error) {
smtpServer := m.Server + ":" + m.Port smtpServer := m.Server + ":" + m.Port
......
...@@ -170,7 +170,8 @@ func TestMailRunner(t *testing.T) { ...@@ -170,7 +170,8 @@ func TestMailRunner(t *testing.T) {
}, },
} }
result, err := mailRunnable.Run() xtx := context.Background()
result, err := mailRunnable.Run(xtx)
// Assertions // Assertions
assert.NoError(t, err) assert.NoError(t, err)
......
package jobqueue package jobqueue
import ( import (
"context"
"fmt" "fmt"
"github.com/pkg/sftp" "github.com/pkg/sftp"
"golang.org/x/crypto/ssh" "golang.org/x/crypto/ssh"
...@@ -103,7 +104,7 @@ type SFTPRunnable struct { ...@@ -103,7 +104,7 @@ type SFTPRunnable struct {
TransferDirection Direction TransferDirection Direction
} }
func (s *SFTPRunnable) Run() (RunResult[SFTPResult], error) { func (s *SFTPRunnable) Run(_ context.Context) (RunResult[SFTPResult], error) {
var authMethod ssh.AuthMethod var authMethod ssh.AuthMethod
......
...@@ -177,7 +177,8 @@ func TestSFTPCRunnerLocalToRemote(t *testing.T) { ...@@ -177,7 +177,8 @@ func TestSFTPCRunnerLocalToRemote(t *testing.T) {
TransferDirection: LocalToRemote, TransferDirection: LocalToRemote,
} }
result, err := sftpRunnable.Run() ctx = context.Background()
result, err := sftpRunnable.Run(ctx)
// Assertions // Assertions
assert.NoError(t, err) assert.NoError(t, err)
...@@ -280,7 +281,8 @@ func TestSFTPCRunnerRemoteToLocal(t *testing.T) { ...@@ -280,7 +281,8 @@ func TestSFTPCRunnerRemoteToLocal(t *testing.T) {
} }
// Methode aufrufen // Methode aufrufen
result, err := sftpRunnable.Run() ctx = context.Background()
result, err := sftpRunnable.Run(ctx)
// Assertions // Assertions
assert.NoError(t, err) assert.NoError(t, err)
......
package jobqueue package jobqueue
import ( import (
"context"
"fmt" "fmt"
"os" "os"
"os/exec" "os/exec"
...@@ -38,7 +39,7 @@ type ShellRunnable struct { ...@@ -38,7 +39,7 @@ type ShellRunnable struct {
Script string Script string
} }
func (s *ShellRunnable) Run() (RunResult[ShellResult], error) { func (s *ShellRunnable) Run(ctx context.Context) (RunResult[ShellResult], error) {
scriptPath := s.ScriptPath scriptPath := s.ScriptPath
...@@ -75,7 +76,7 @@ func (s *ShellRunnable) Run() (RunResult[ShellResult], error) { ...@@ -75,7 +76,7 @@ func (s *ShellRunnable) Run() (RunResult[ShellResult], error) {
} }
// #nosec // #nosec
cmd := exec.Command("sh", scriptPath) cmd := exec.CommandContext(ctx, "sh", scriptPath)
output, err := cmd.Output() output, err := cmd.Output()
var stderr []byte var stderr []byte
......
package jobqueue package jobqueue
import ( import (
"context"
"os" "os"
"path" "path"
"testing" "testing"
...@@ -27,7 +28,8 @@ func TestShellRunnable_Run(t *testing.T) { ...@@ -27,7 +28,8 @@ func TestShellRunnable_Run(t *testing.T) {
shellRunnable := ShellRunnable{ScriptPath: fp.Name()} shellRunnable := ShellRunnable{ScriptPath: fp.Name()}
// Run-Methode aufrufen // Run-Methode aufrufen
result, err := shellRunnable.Run() ctx := context.Background()
result, err := shellRunnable.Run(ctx)
// Überprüfungen // Überprüfungen
if err != nil { if err != nil {
......
package jobqueue package jobqueue
import (
"context"
)
type ResultStatus int type ResultStatus int
const ( const (
...@@ -21,7 +25,7 @@ func (r RunResult[T]) GetStatus() ResultStatus { ...@@ -21,7 +25,7 @@ func (r RunResult[T]) GetStatus() ResultStatus {
} }
type Runnable[T any] interface { type Runnable[T any] interface {
Run() (RunResult[T], error) Run(ctx context.Context) (RunResult[T], error)
GetType() string GetType() string
GetPersistence() RunnableImport GetPersistence() RunnableImport
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment