Skip to content
Snippets Groups Projects
Verified Commit addb461c authored by Volker Schukai's avatar Volker Schukai :alien:
Browse files

feat: job serialize and unserialize #1

parent ae10423e
No related branches found
No related tags found
No related merge requests found
......@@ -64,7 +64,3 @@ deploy:
paths:
- /nix/store
artifacts:
paths:
- dist
......@@ -363,10 +363,6 @@ deploy:
paths:
- /nix/store
artifacts:
paths:
- dist
EOF
......
......@@ -29,4 +29,5 @@ var (
ErrUnsupportedFileOption = fmt.Errorf("unsupported file option")
ErrUnsupportedCredentialType = fmt.Errorf("unsupported credential type")
ErrUnsupportedTransferDirection = fmt.Errorf("unsupported transfer direction")
ErrInvalidData = fmt.Errorf("invalid data")
)
......@@ -60,62 +60,87 @@ func ReadJsonFile(filePath string) ([]JobImport, error) {
return jobs, nil
}
func CreateGenericJobFromImport[T any](jobImport JobImport, runner Runnable[T]) GenericJob {
return &Job[T]{
id: JobID(jobImport.ID),
priority: Priority(jobImport.Priority),
timeout: jobImport.Timeout,
maxRetries: jobImport.MaxRetries,
RetryDelay: jobImport.RetryDelay,
runner: runner,
}
}
func CreateJobAndSchedulerFromImport(jobImport JobImport) (GenericJob, Scheduler, error) {
var job GenericJob
switch jobImport.Runnable.Type {
case "Shell":
case "Dummy":
runner := &ShellRunnable{
ScriptPath: jobImport.Runnable.Data["ScriptPath"].(string),
runner, err := NewDummyRunnableFromMap(jobImport.Runnable.Data)
if err != nil {
return nil, nil, err
}
job = GenericJob(&Job[ShellResult]{
id: JobID(jobImport.ID),
priority: Priority(jobImport.Priority),
timout: jobImport.Timeout,
maxRetries: jobImport.MaxRetries,
RetryDelay: jobImport.RetryDelay,
runner: runner,
})
job = CreateGenericJobFromImport[DummyResult](jobImport, runner)
case "Counter":
runner := &CounterRunnable{
Count: jobImport.Runnable.Data["Count"].(int),
runner, err := NewCounterRunnableFromMap(jobImport.Runnable.Data)
if err != nil {
return nil, nil, err
}
job = GenericJob(&Job[CounterResult]{
id: JobID(jobImport.ID),
priority: Priority(jobImport.Priority),
timout: jobImport.Timeout,
maxRetries: jobImport.MaxRetries,
RetryDelay: jobImport.RetryDelay,
runner: runner,
})
case "HTTP":
runner := &HTTPRunnable{
URL: jobImport.Runnable.Data["URL"].(string),
job = CreateGenericJobFromImport[CounterResult](jobImport, runner)
case "FileOperation":
runner, err := NewFileOperationRunnableFromMap(jobImport.Runnable.Data)
if err != nil {
return nil, nil, err
}
job = GenericJob(&Job[HTTPResult]{id: JobID(jobImport.ID),
priority: Priority(jobImport.Priority),
timout: jobImport.Timeout,
maxRetries: jobImport.MaxRetries,
RetryDelay: jobImport.RetryDelay,
runner: runner,
})
job = CreateGenericJobFromImport[FileOperationResult](jobImport, runner)
case "DB":
runner := &DBRunnable{
Query: jobImport.Runnable.Data["Query"].(string),
runner, err := NewDBRunnableFromMap(jobImport.Runnable.Data)
if err != nil {
return nil, nil, err
}
job = GenericJob(&Job[DBResult]{id: JobID(jobImport.ID),
priority: Priority(jobImport.Priority),
timout: jobImport.Timeout,
maxRetries: jobImport.MaxRetries,
RetryDelay: jobImport.RetryDelay,
runner: runner,
})
job = CreateGenericJobFromImport[DBResult](jobImport, runner)
case "HTTP":
runner, err := NewHTTPRunnableFromMap(jobImport.Runnable.Data)
if err != nil {
return nil, nil, err
}
job = CreateGenericJobFromImport[HTTPResult](jobImport, runner)
case "Mail":
runner, err := NewMailRunnableFromMap(jobImport.Runnable.Data)
if err != nil {
return nil, nil, err
}
job = CreateGenericJobFromImport[MailResult](jobImport, runner)
case "SFTP":
runner, err := NewSFTPRunnableFromMap(jobImport.Runnable.Data)
if err != nil {
return nil, nil, err
}
job = CreateGenericJobFromImport[SFTPResult](jobImport, runner)
case "Shell":
runner, err := NewShellRunnableFromMap(jobImport.Runnable.Data)
if err != nil {
return nil, nil, err
}
job = CreateGenericJobFromImport[ShellResult](jobImport, runner)
default:
return nil, nil, ErrUnknownRunnableType
......
package jobqueue
type CompletedJobHandler interface {
HandleCompletedJob(job GenericJob) error
}
type DatabaseArchiver struct {
// ...
}
func (d *DatabaseArchiver) HandleCompletedJob(job GenericJob) error {
return nil
}
type FileLogger struct {
}
func (f *FileLogger) HandleCompletedJob(job GenericJob) error {
return nil
}
type MetricsPublisher struct {
}
func (m *MetricsPublisher) HandleCompletedJob(job GenericJob) error {
return nil
}
......@@ -37,13 +37,15 @@ type GenericJob interface {
GetRetryDelay() time.Duration
GetTimeout() time.Duration
Archive() error
}
type Job[T any] struct {
id JobID
priority Priority
timout time.Duration
timeout time.Duration
maxRetries uint
RetryDelay time.Duration
......@@ -57,6 +59,17 @@ type Job[T any] struct {
logs []JobLog
}
type JobSerializedState struct {
ID JobID `json:"id"`
Priority Priority `json:"priority"`
Timeout time.Duration `json:"timeout" `
MaxRetries uint `json:"maxRetries" `
RetryDelay time.Duration `json:"retryDelay"`
Dependencies []JobID `json:"dependencies" `
Stats JobStats `json:"stats"`
Logs []JobLog `json:"logs"`
}
// NewJob creates a new job with the given id and runner
func NewJob[T any](id JobID, runner Runnable[T]) *Job[T] {
return &Job[T]{
......@@ -66,6 +79,38 @@ func NewJob[T any](id JobID, runner Runnable[T]) *Job[T] {
}
}
func (j *Job[T]) SerializeState() JobSerializedState {
j.mu.Lock()
defer j.mu.Unlock()
return JobSerializedState{
ID: j.id,
Priority: j.priority,
Timeout: j.timeout,
MaxRetries: j.maxRetries,
RetryDelay: j.RetryDelay,
Dependencies: j.dependencies,
Stats: j.stats,
Logs: j.logs,
}
}
func (j *Job[T]) UnserializeState(serializedState JobSerializedState) {
j.mu.Lock()
defer j.mu.Unlock()
j.id = serializedState.ID
j.priority = serializedState.Priority
j.timeout = serializedState.Timeout
j.maxRetries = serializedState.MaxRetries
j.RetryDelay = serializedState.RetryDelay
j.dependencies = serializedState.Dependencies
j.stats = serializedState.Stats
j.logs = serializedState.Logs
}
func (j *Job[T]) Archive() error {
return nil
}
// Execute executes the job
func (j *Job[T]) Execute(ctx context.Context) (RunGenericResult, error) {
startTime := time.Now()
......@@ -150,7 +195,7 @@ func (j *Job[T]) SetTimeout(timeout time.Duration) *Job[T] {
j.mu.Lock()
defer j.mu.Unlock()
j.timout = timeout
j.timeout = timeout
return j
}
......@@ -158,7 +203,7 @@ func (j *Job[T]) GetTimeout() time.Duration {
j.mu.Lock()
defer j.mu.Unlock()
return j.timout
return j.timeout
}
// SetMaxRetries sets the max retries of the job
......
......@@ -142,3 +142,51 @@ echo "Hello World"
assert.Equal(t, 2, stats.SuccessCount)
assert.Equal(t, 0, stats.ErrorCount)
}
func TestSerializeState(t *testing.T) {
// Initialize job
job := NewJob[DummyResult]("testJob", &DummyRunnable{})
job.SetPriority(PriorityHigh)
job.SetTimeout(5 * time.Minute)
job.SetMaxRetries(3)
job.SetRetryDelay(1 * time.Minute)
job.SetDependencies([]JobID{"dep1", "dep2"})
// Serialize state
serializedState := job.SerializeState()
// Verify serialized state
assert.Equal(t, JobID("testJob"), serializedState.ID)
assert.Equal(t, PriorityHigh, serializedState.Priority)
assert.Equal(t, 5*time.Minute, serializedState.Timeout)
assert.Equal(t, uint(3), serializedState.MaxRetries)
assert.Equal(t, 1*time.Minute, serializedState.RetryDelay)
assert.ElementsMatch(t, []JobID{"dep1", "dep2"}, serializedState.Dependencies)
}
func TestUnserializeState(t *testing.T) {
// Initialize serialized state
serializedState := JobSerializedState{
ID: "testJob",
Priority: PriorityHigh,
Timeout: 5 * time.Minute,
MaxRetries: 3,
RetryDelay: 1 * time.Minute,
Dependencies: []JobID{"dep1", "dep2"},
}
// Initialize job
job := NewJob[DummyResult]("initialJob", &DummyRunnable{})
// Unserialize state
job.UnserializeState(serializedState)
// Verify job properties
assert.Equal(t, JobID("testJob"), job.GetID())
assert.Equal(t, PriorityHigh, job.GetPriority())
assert.Equal(t, 5*time.Minute, job.GetTimeout())
assert.Equal(t, uint(3), job.GetMaxRetries())
assert.Equal(t, 1*time.Minute, job.GetRetryDelay())
assert.ElementsMatch(t, []JobID{"dep1", "dep2"}, job.GetDependencies())
}
......@@ -22,6 +22,8 @@ type Manager struct {
jobEventCh chan interface{}
stateManager StateManager
mu sync.Mutex
}
......@@ -134,6 +136,12 @@ func (m *Manager) Start() error {
return ErrManagerAlreadyRunning
}
if m.stateManager != nil {
if err := m.stateManager.LoadState(); err != nil {
return err
}
}
if len(m.workerMap) == 0 {
return ErrNoWorkers
}
......@@ -206,6 +214,12 @@ func (m *Manager) Stop() error {
wrappedErr = fmt.Errorf("%w\n%s", wrappedErr, err.Error())
}
if m.stateManager != nil {
if err := m.stateManager.LoadState(); err != nil {
return err
}
}
return wrappedErr
}
......
......@@ -106,60 +106,79 @@ func TestManager_AddWorker(t *testing.T) {
}
func TestManager_RemoveWorker(t *testing.T) {
var err error
m := NewManager()
w := &MockWorker{id: "worker1", status: WorkerStatusStopped}
m.AddWorker(w)
err := m.RemoveWorker(w)
err = m.AddWorker(w)
assert.Nil(t, err)
err = m.RemoveWorker(w)
assert.Nil(t, err)
assert.Equal(t, int(ManagerStateStopped), int(m.state))
}
func TestManager_Start(t *testing.T) {
var err error
m := NewManager()
w := &MockWorker{id: "worker1", status: WorkerStatusStopped}
m.AddWorker(w)
err = m.AddWorker(w)
assert.Nil(t, err)
err := m.Start()
err = m.Start()
assert.Nil(t, err)
assert.Nil(t, err)
assert.Equal(t, int(ManagerStateRunning), int(m.state))
}
func TestManager_Stop(t *testing.T) {
var err error
m := NewManager()
w := &MockWorker{id: "worker1", status: WorkerStatusStopped}
m.AddWorker(w)
m.Start()
err = m.AddWorker(w)
assert.Nil(t, err)
err = m.Start()
assert.Nil(t, err)
err := m.Stop()
err = m.Stop()
assert.Nil(t, err)
assert.Equal(t, int(ManagerStateStopped), int(m.state))
}
func TestManager_ScheduleJob(t *testing.T) {
var err error
m := NewManager()
w := &MockWorker{id: "worker1", status: WorkerStatusStopped}
m.AddWorker(w)
m.Start()
err = m.AddWorker(w)
assert.Nil(t, err)
err = m.Start()
assert.Nil(t, err)
job := &MockGenericJob{ID: "job1"}
scheduler := InstantScheduler{}
err := m.ScheduleJob(job, &scheduler)
err = m.ScheduleJob(job, &scheduler)
assert.Nil(t, err)
}
func TestManager_CancelJob(t *testing.T) {
var err error
m := NewManager()
w := &MockWorker{id: "worker1", status: WorkerStatusStopped}
m.AddWorker(w)
m.Start()
err = m.AddWorker(w)
assert.Nil(t, err)
err = m.Start()
assert.Nil(t, err)
job := &MockGenericJob{ID: "job1"}
scheduler := InstantScheduler{}
m.ScheduleJob(job, &scheduler)
err = m.ScheduleJob(job, &scheduler)
assert.Nil(t, err)
err := m.CancelJob("job1")
err = m.CancelJob("job1")
assert.Nil(t, err)
}
......
......@@ -4,6 +4,14 @@ import (
"sync"
)
func NewCounterRunnableFromMap(data map[string]any) (*CounterRunnable, error) {
count, ok := data["Count"].(int)
if !ok {
return nil, ErrInvalidData
}
return &CounterRunnable{Count: count}, nil
}
// CounterResult is a result of a counter
type CounterResult struct {
Count int
......@@ -11,7 +19,7 @@ type CounterResult struct {
// CounterRunnable is a runnable that counts
type CounterRunnable struct {
Count int
Count int `json:"count" yaml:"count"`
mu sync.Mutex
}
......
package jobqueue
func NewDummyRunnableFromMap(data map[string]any) (*DummyRunnable, error) {
return &DummyRunnable{}, nil
}
// DummyResult is a dummy result
type DummyResult struct {
}
......
......@@ -4,6 +4,26 @@ import (
"os"
)
func NewFileOperationRunnableFromMap(data map[string]interface{}) (*FileOperationRunnable, error) {
operation, ok := data["Operation"].(string)
if !ok {
return nil, ErrInvalidData
}
filePath, ok := data["FilePath"].(string)
if !ok {
return nil, ErrInvalidData
}
content, _ := data["Content"].(string) // Optional, so no error check
return &FileOperationRunnable{
Operation: operation,
FilePath: filePath,
Content: content,
}, nil
}
type FileOperationResult struct {
Success bool
Content string // Optional, je nach Operation
......
......@@ -5,6 +5,29 @@ import (
"gorm.io/gorm"
)
func NewDBRunnableFromMap(data map[string]interface{}) (*DBRunnable, error) {
t, ok := data["Type"].(string)
if !ok {
return nil, ErrInvalidData
}
dsn, ok := data["DSN"].(string)
if !ok {
return nil, ErrInvalidData
}
query, ok := data["Query"].(string)
if !ok {
return nil, ErrInvalidData
}
return &DBRunnable{
Type: t,
DSN: dsn,
Query: query,
}, nil
}
// DBResult is a result of a db query
type DBResult struct {
RowsAffected int
......
......@@ -6,6 +6,35 @@ import (
"net/http"
)
func NewHTTPRunnableFromMap(data map[string]interface{}) (*HTTPRunnable, error) {
url, ok := data["URL"].(string)
if !ok {
return nil, ErrInvalidData
}
method, ok := data["Method"].(string)
if !ok {
return nil, ErrInvalidData
}
header, ok := data["Header"].(map[string]string)
if !ok {
return nil, ErrInvalidData
}
body, ok := data["Body"].(string)
if !ok {
return nil, ErrInvalidData
}
return &HTTPRunnable{
URL: url,
Method: method,
Header: header,
Body: body,
}, nil
}
// HTTPResult is a result of a http request
type HTTPResult struct {
StatusCode int
......
......@@ -4,6 +4,65 @@ import (
"net/smtp"
)
func NewMailRunnableFromMap(data map[string]interface{}) (*MailRunnable, error) {
to, ok := data["To"].(string)
if !ok {
return nil, ErrInvalidData
}
from, ok := data["From"].(string)
if !ok {
return nil, ErrInvalidData
}
subject, ok := data["Subject"].(string)
if !ok {
return nil, ErrInvalidData
}
body, ok := data["Body"].(string)
if !ok {
return nil, ErrInvalidData
}
server, ok := data["Server"].(string)
if !ok {
return nil, ErrInvalidData
}
port, ok := data["Port"].(string)
if !ok {
return nil, ErrInvalidData
}
username, ok := data["Username"].(string)
if !ok {
return nil, ErrInvalidData
}
password, ok := data["Password"].(string)
if !ok {
return nil, ErrInvalidData
}
headers, ok := data["Headers"].(map[string]string)
if !ok {
return nil, ErrInvalidData
}
return &MailRunnable{
To: to,
From: from,
Subject: subject,
Body: body,
Server: server,
Port: port,
Username: username,
Password: password,
Headers: headers,
}, nil
}
// MailResult is a result of a email
type MailResult struct {
Sent bool
......
......@@ -8,6 +8,28 @@ import (
"os"
)
func NewSFTPRunnableFromMap(data map[string]interface{}) (*SFTPRunnable, error) {
// Your map to struct conversion logic here
// e.g.,
host, ok := data["Host"].(string)
if !ok {
return nil, fmt.Errorf("invalid Host")
}
//... (other fields)
transferDirection, ok := data["TransferDirection"].(string)
if !ok {
return nil, fmt.Errorf("invalid TransferDirection")
}
return &SFTPRunnable{
Host: host,
//... (other fields)
TransferDirection: Direction(transferDirection),
}, nil
}
// SFTPResult is a result of a sftp
type SFTPResult struct {
FilesCopied []string
......
......@@ -5,6 +5,17 @@ import (
"strings"
)
func NewShellRunnableFromMap(data map[string]interface{}) (*ShellRunnable, error) {
scriptPath, ok := data["ScriptPath"].(string)
if !ok {
return nil, ErrInvalidData
}
return &ShellRunnable{
ScriptPath: scriptPath,
}, nil
}
// ShellResult is a result of a shell script
type ShellResult struct {
Output string
......
state.go 0 → 100644
package jobqueue
import (
"encoding/json"
"os"
)
// State repräsentiert den Zustand, den wir speichern wollen
type State struct {
// Jobs enthält alle Jobs, die wir speichern wollen
Jobs []JobSerializedState `json:"jobs"`
}
type StateManager interface {
LoadState() error
SaveState() error
}
// FileStateManager implementiert StateManager
type FileStateManager struct {
filePath string
state *State
}
// LoadState lädt den Zustand aus der Datei
func (f *FileStateManager) LoadState() error {
file, err := os.Open(f.filePath)
if err != nil {
return err
}
defer file.Close()
decoder := json.NewDecoder(file)
if err := decoder.Decode(&f.state); err != nil {
return err
}
return nil
}
// SaveState speichert den Zustand in der Datei
func (f *FileStateManager) SaveState() error {
file, err := os.Create(f.filePath)
if err != nil {
return err
}
defer file.Close()
encoder := json.NewEncoder(file)
if err := encoder.Encode(f.state); err != nil {
return err
}
return nil
}
package jobqueue
import (
"os"
"path"
"testing"
"github.com/stretchr/testify/assert"
)
func TestFileStateManager(t *testing.T) {
tmpDir := t.TempDir()
tempFile := path.Join(tmpDir, "state.json")
defer os.Remove(tempFile)
manager := &FileStateManager{
filePath: tempFile,
state: &State{
Jobs: []JobSerializedState{
{ID: "1"},
{ID: "2"},
},
},
}
err := manager.SaveState()
assert.NoError(t, err)
manager2 := &FileStateManager{
filePath: tempFile,
state: &State{},
}
err = manager2.LoadState()
assert.NoError(t, err)
assert.Equal(t, manager.state, manager2.state)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment