From 190b3f51e70ca3c97ba192898eac19c5c1fb05a7 Mon Sep 17 00:00:00 2001
From: Volker Schukai <volker.schukai@schukai.com>
Date: Tue, 24 Oct 2023 00:28:32 +0200
Subject: [PATCH] feat: #1

---
 errors.go                 |   4 +
 event-bus.go              |  11 +--
 import.go                 | 156 +++++++++++++++++++++++++++-----------
 import_test.go            |   2 +-
 issue-1_test.go           |  71 +++++++++++++++++
 job.go                    |   8 +-
 manager.go                |  36 +++++++++
 manager_test.go           |   8 ++
 runnable-counter.go       |   5 +-
 runnable-fileoperation.go |  14 ++--
 runnable-gorm.go          |  13 ++--
 runnable-http.go          |  17 +++--
 runnable-mail.go          |  37 ++++-----
 runnable-sftp.go          |  61 ++++++++++++---
 runnable-shell.go         |  56 ++++++++++++--
 scheduler.go              |  11 ++-
 state.go                  |   6 +-
 worker_test.go            |   8 ++
 18 files changed, 407 insertions(+), 117 deletions(-)
 create mode 100644 issue-1_test.go

diff --git a/errors.go b/errors.go
index 8493dd8..c3d3da9 100644
--- a/errors.go
+++ b/errors.go
@@ -30,4 +30,8 @@ var (
 	ErrUnsupportedCredentialType    = fmt.Errorf("unsupported credential type")
 	ErrUnsupportedTransferDirection = fmt.Errorf("unsupported transfer direction")
 	ErrInvalidData                  = fmt.Errorf("invalid data")
+	ErrUnknownFormat                = fmt.Errorf("unknown format")
+	ErrFailedToCreateTempFile       = fmt.Errorf("failed to create temp file")
+	ErrFailedToWriteTempFile        = fmt.Errorf("failed to write temp file")
+	ErrJobAlreadyScheduled          = fmt.Errorf("job already scheduled")
 )
diff --git a/event-bus.go b/event-bus.go
index 23def45..86cc84e 100644
--- a/event-bus.go
+++ b/event-bus.go
@@ -7,11 +7,12 @@ import (
 type EventName string
 
 const (
-	JobAdded   EventName = "JobAdded"
-	JobRemoved EventName = "JobRemoved"
-	ExecuteJob EventName = "ExecuteJob"
-	JobReady   EventName = "JobReady"
-	QueueJob   EventName = "QueueJob"
+	JobAdded    EventName = "JobAdded"
+	JobRemoved  EventName = "JobRemoved"
+	ExecuteJob  EventName = "ExecuteJob"
+	JobReady    EventName = "JobReady"
+	QueueJob    EventName = "QueueJob"
+	JobFinished EventName = "JobFinished"
 	// add more events as needed
 )
 
diff --git a/import.go b/import.go
index 987ec12..18ed768 100644
--- a/import.go
+++ b/import.go
@@ -1,8 +1,12 @@
 package jobqueue
 
 import (
+	"encoding/json"
+	"fmt"
 	"gopkg.in/yaml.v3"
+	"io"
 	"os"
+	"strings"
 	"time"
 )
 
@@ -30,34 +34,40 @@ type SchedulerImport struct {
 	Event    string        `yaml:"event,omitempty" json:"event,omitempty"`
 }
 
-func ReadYAMLFile(filePath string) ([]JobImport, error) {
-	data, err := os.ReadFile(filePath)
-	if err != nil {
+func ReadYAML(r io.Reader) ([]JobImport, error) {
+	var jobs []JobImport
+	decoder := yaml.NewDecoder(r)
+	if err := decoder.Decode(&jobs); err != nil {
 		return nil, err
 	}
+	return jobs, nil
+}
 
+func ReadJSON(r io.Reader) ([]JobImport, error) {
 	var jobs []JobImport
-	err = yaml.Unmarshal(data, &jobs)
-	if err != nil {
+	decoder := json.NewDecoder(r)
+	if err := decoder.Decode(&jobs); err != nil {
 		return nil, err
 	}
-
 	return jobs, nil
 }
 
-func ReadJsonFile(filePath string) ([]JobImport, error) {
-	data, err := os.ReadFile(filePath)
+func ReadYAMLFile(filePath string) ([]JobImport, error) {
+	file, err := os.Open(filePath)
 	if err != nil {
 		return nil, err
 	}
+	defer file.Close()
+	return ReadYAML(file)
+}
 
-	var jobs []JobImport
-	err = yaml.Unmarshal(data, &jobs)
+func ReadJsonFile(filePath string) ([]JobImport, error) {
+	file, err := os.Open(filePath)
 	if err != nil {
 		return nil, err
 	}
-
-	return jobs, nil
+	defer file.Close()
+	return ReadJSON(file)
 }
 
 func CreateGenericJobFromImport[T any](jobImport JobImport, runner Runnable[T]) GenericJob {
@@ -71,71 +81,78 @@ func CreateGenericJobFromImport[T any](jobImport JobImport, runner Runnable[T])
 	}
 }
 
-func CreateJobAndSchedulerFromImport(jobImport JobImport) (GenericJob, Scheduler, error) {
+func CreateJobAndSchedulerFromImport(jobImport JobImport, manager *Manager) (GenericJob, Scheduler, error) {
 
 	var job GenericJob
 
-	switch jobImport.Runnable.Type {
-	case "Dummy":
+	rType := strings.ToLower(jobImport.Runnable.Type)
+
+	runnableData := make(map[string]interface{})
+	for k, v := range jobImport.Runnable.Data {
+		runnableData[strings.ToLower(k)] = v
+	}
+
+	switch rType {
+	case "dummy":
 
-		runner, err := NewDummyRunnableFromMap(jobImport.Runnable.Data)
+		runner, err := NewDummyRunnableFromMap(runnableData)
 		if err != nil {
 			return nil, nil, err
 		}
 
 		job = CreateGenericJobFromImport[DummyResult](jobImport, runner)
 
-	case "Counter":
+	case "counter":
 
-		runner, err := NewCounterRunnableFromMap(jobImport.Runnable.Data)
+		runner, err := NewCounterRunnableFromMap(runnableData)
 		if err != nil {
 			return nil, nil, err
 		}
 
 		job = CreateGenericJobFromImport[CounterResult](jobImport, runner)
 
-	case "FileOperation":
-		runner, err := NewFileOperationRunnableFromMap(jobImport.Runnable.Data)
+	case "fileoperation":
+		runner, err := NewFileOperationRunnableFromMap(runnableData)
 		if err != nil {
 			return nil, nil, err
 		}
 
 		job = CreateGenericJobFromImport[FileOperationResult](jobImport, runner)
 
-	case "DB":
-		runner, err := NewDBRunnableFromMap(jobImport.Runnable.Data)
+	case "db":
+		runner, err := NewDBRunnableFromMap(runnableData)
 		if err != nil {
 			return nil, nil, err
 		}
 
 		job = CreateGenericJobFromImport[DBResult](jobImport, runner)
 
-	case "HTTP":
-		runner, err := NewHTTPRunnableFromMap(jobImport.Runnable.Data)
+	case "http":
+		runner, err := NewHTTPRunnableFromMap(runnableData)
 		if err != nil {
 			return nil, nil, err
 		}
 
 		job = CreateGenericJobFromImport[HTTPResult](jobImport, runner)
 
-	case "Mail":
-		runner, err := NewMailRunnableFromMap(jobImport.Runnable.Data)
+	case "mail":
+		runner, err := NewMailRunnableFromMap(runnableData)
 		if err != nil {
 			return nil, nil, err
 		}
 
 		job = CreateGenericJobFromImport[MailResult](jobImport, runner)
 
-	case "SFTP":
-		runner, err := NewSFTPRunnableFromMap(jobImport.Runnable.Data)
+	case "sftp":
+		runner, err := NewSFTPRunnableFromMap(runnableData)
 		if err != nil {
 			return nil, nil, err
 		}
 
 		job = CreateGenericJobFromImport[SFTPResult](jobImport, runner)
 
-	case "Shell":
-		runner, err := NewShellRunnableFromMap(jobImport.Runnable.Data)
+	case "shell":
+		runner, err := NewShellRunnableFromMap(runnableData)
 		if err != nil {
 			return nil, nil, err
 		}
@@ -143,21 +160,37 @@ func CreateJobAndSchedulerFromImport(jobImport JobImport) (GenericJob, Scheduler
 		job = CreateGenericJobFromImport[ShellResult](jobImport, runner)
 
 	default:
-		return nil, nil, ErrUnknownRunnableType
+		return nil, nil,
+			fmt.Errorf("%w: %s, available types: dummy, counter, fileoperation, db, http, mail, sftp, shell",
+				ErrUnknownRunnableType, rType)
+
+	}
+
+	sType := strings.ToLower(jobImport.Scheduler.Type)
+
+	scheduleData := make(map[string]interface{})
+	for k, v := range jobImport.Runnable.Data {
+		scheduleData[strings.ToLower(k)] = v
 	}
 
 	var scheduler Scheduler
-	switch jobImport.Scheduler.Type {
-	case "Interval":
+	switch sType {
+	case "interval":
 		scheduler = &IntervalScheduler{Interval: jobImport.Scheduler.Interval}
 
-	case "Cron":
-		scheduler = &CronScheduler{Spec: jobImport.Scheduler.Spec}
+	case "cron":
+		scheduler = &CronScheduler{
+			Spec: jobImport.Scheduler.Spec,
+		}
+
+		if manager != nil {
+			scheduler.(*CronScheduler).cron = manager.GetCronInstance()
+		}
 
-	case "Delay":
+	case "delay":
 		scheduler = &DelayScheduler{Delay: jobImport.Scheduler.Delay}
 
-	case "Event":
+	case "event":
 		scheduler = &EventScheduler{Event: EventName(jobImport.Scheduler.Event)}
 
 	default:
@@ -167,19 +200,54 @@ func CreateJobAndSchedulerFromImport(jobImport JobImport) (GenericJob, Scheduler
 	return job, scheduler, nil
 }
 
-func LoadJobsAndSchedule(filePath string, manager *Manager) error {
+// LoadJobsAndScheduleFromFile read jobs from a file and schedule them. (json/yaml)
+func LoadJobsAndScheduleFromFile(filePath string, manager *Manager) error {
 
 	var err error
 	var imp []JobImport
 
-	switch filePath[len(filePath)-4:] {
-	case "yaml":
+	if filePath[len(filePath)-4:] == "json" {
 		imp, err = ReadJsonFile(filePath)
-		break
-	case "json":
+	} else if filePath[len(filePath)-4:] == "yaml" {
 		imp, err = ReadYAMLFile(filePath)
-		break
+	} else {
+		return ErrUnknownFormat
+	}
+
+	if err != nil {
+		return err
+	}
+
+	for _, imp := range imp {
+		job, scheduler, err := CreateJobAndSchedulerFromImport(imp, manager)
+		if err != nil {
+			return err
+		}
+
+		err = manager.ScheduleJob(job, scheduler)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+
+}
+
+// ImportJobsAndSchedule lädt Jobs aus einem Reader und plant sie ein.
+func ImportJobsAndSchedule(reader io.Reader, format string, manager *Manager) error {
+	var err error
+	var imp []JobImport
+
+	// format to lowercase
+	format = strings.ToLower(format)
 
+	if format == "json" {
+		imp, err = ReadJSON(reader)
+	} else if format == "yaml" {
+		imp, err = ReadYAML(reader)
+	} else {
+		return fmt.Errorf("%w: %s", ErrUnknownFormat, format)
 	}
 
 	if err != nil {
@@ -187,7 +255,7 @@ func LoadJobsAndSchedule(filePath string, manager *Manager) error {
 	}
 
 	for _, imp := range imp {
-		job, scheduler, err := CreateJobAndSchedulerFromImport(imp)
+		job, scheduler, err := CreateJobAndSchedulerFromImport(imp, manager)
 		if err != nil {
 			return err
 		}
diff --git a/import_test.go b/import_test.go
index 056ccef..8687627 100644
--- a/import_test.go
+++ b/import_test.go
@@ -59,7 +59,7 @@ func TestCreateJobAndSchedulerFromInput(t *testing.T) {
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			gotJob, gotSchedule, err := CreateJobAndSchedulerFromImport(tt.input)
+			gotJob, gotSchedule, err := CreateJobAndSchedulerFromImport(tt.input, nil)
 
 			if (err != nil) != tt.wantErr {
 				t.Errorf("CreateJobAndSchedulerFromImport() error = %v, wantErr %v", err, tt.wantErr)
diff --git a/issue-1_test.go b/issue-1_test.go
new file mode 100644
index 0000000..41c0743
--- /dev/null
+++ b/issue-1_test.go
@@ -0,0 +1,71 @@
+package jobqueue
+
+import (
+	"github.com/robfig/cron/v3"
+	"github.com/stretchr/testify/assert"
+	"strings"
+	"testing"
+	"time"
+)
+
+func TestRoundTrip(t *testing.T) {
+
+	// type JobImport struct {
+	//	ID           string          `yaml:"id" json:"id"`
+	//	Priority     int             `yaml:"priority" json:"priority"`
+	//	Timeout      time.Duration   `yaml:"timeout" json:"timeout"`
+	//	MaxRetries   uint            `yaml:"maxRetries" json:"maxRetries"`
+	//	RetryDelay   time.Duration   `yaml:"retryDelay" json:"retryDelay"`
+	//	Dependencies []string        `yaml:"dependencies" json:"dependencies,omitempty"`
+	//	Runnable     RunnableImport  `yaml:"runnable" json:"runnable"`
+	//	Scheduler    SchedulerImport `yaml:"scheduler" json:"scheduler,omitempty"`
+	//}
+	//
+	//type RunnableImport struct {
+	//	Type string         `yaml:"type" json:"type"`
+	//	Data map[string]any `yaml:"data,omitempty" json:"data,omitempty"`
+	//}
+	//
+	//type SchedulerImport struct {
+	//	Type     string        `yaml:"type" json:"type"`
+	//	Interval time.Duration `yaml:"interval,omitempty" json:"interval,omitempty"`
+	//	Spec     string        `yaml:"spec,omitempty" json:"spec,omitempty"`
+	//	Delay    time.Duration `yaml:"delay,omitempty" json:"delay,omitempty"`
+	//	Event    string        `yaml:"event,omitempty" json:"event,omitempty"`
+	//}
+
+	// define test data with jobs in yaml format
+	testData := []byte(`
+- id: job1
+  priority: 1
+  timeout: 1s
+  maxRetries: 3
+  retryDelay: 1s
+  runnable:
+    type: shell
+    data:
+      script: echo "Hello World $(date)" >> /tmp/job1.log
+  scheduler:
+    type: cron
+    spec: "* * * * *"
+`)
+
+	var err error
+
+	manager := NewManager()
+	manager.SetCronInstance(cron.New())
+	worker := NewLocalWorker(1)
+	err = manager.AddWorker(worker)
+	assert.Nil(t, err)
+
+	err = manager.Start()
+	assert.Nil(t, err)
+
+	reader := strings.NewReader(string(testData))
+
+	err = ImportJobsAndSchedule(reader, "yaml", manager)
+	assert.Nil(t, err)
+
+	time.Sleep(10 * time.Minute)
+
+}
diff --git a/job.go b/job.go
index 86f6def..dd74772 100644
--- a/job.go
+++ b/job.go
@@ -38,7 +38,9 @@ type GenericJob interface {
 
 	GetTimeout() time.Duration
 
-	Archive() error
+	SerializeState() JobSerializedState
+
+	UnserializeState(serializedState JobSerializedState)
 }
 
 type Job[T any] struct {
@@ -107,10 +109,6 @@ func (j *Job[T]) UnserializeState(serializedState JobSerializedState) {
 	j.logs = serializedState.Logs
 }
 
-func (j *Job[T]) Archive() error {
-	return nil
-}
-
 // Execute executes the job
 func (j *Job[T]) Execute(ctx context.Context) (RunGenericResult, error) {
 	startTime := time.Now()
diff --git a/manager.go b/manager.go
index 5e7fee8..3edc059 100644
--- a/manager.go
+++ b/manager.go
@@ -2,6 +2,7 @@ package jobqueue
 
 import (
 	"fmt"
+	"github.com/robfig/cron/v3"
 	"sync"
 )
 
@@ -24,6 +25,8 @@ type Manager struct {
 
 	stateManager StateManager
 
+	cronInstance *cron.Cron
+
 	mu sync.Mutex
 }
 
@@ -47,6 +50,18 @@ func (m *Manager) GetEventBus() *EventBus {
 	return m.eventBus
 }
 
+func (m *Manager) SetCronInstance(cronInstance *cron.Cron) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.cronInstance = cronInstance
+}
+
+func (m *Manager) GetCronInstance() *cron.Cron {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	return m.cronInstance
+}
+
 func (m *Manager) checkAndSetRunningState() error {
 
 	m.state = ManagerStateStopped
@@ -178,6 +193,10 @@ func (m *Manager) Start() error {
 		wrappedErr = fmt.Errorf("%w\n%s", wrappedErr, err.Error())
 	}
 
+	if m.cronInstance != nil {
+		m.cronInstance.Start()
+	}
+
 	return wrappedErr
 
 }
@@ -193,6 +212,7 @@ func (m *Manager) Stop() error {
 
 	m.eventBus.Unsubscribe(QueueJob, m.jobEventCh)
 	m.eventBus.Unsubscribe(JobReady, m.jobEventCh)
+	
 	close(m.jobEventCh)
 
 	var wrappedErr error
@@ -220,6 +240,10 @@ func (m *Manager) Stop() error {
 		}
 	}
 
+	if m.cronInstance != nil {
+		m.cronInstance.Stop()
+	}
+
 	return wrappedErr
 }
 
@@ -250,6 +274,18 @@ func (m *Manager) handleJobEvents() {
 						}
 					}
 				}
+
+			case JobFinished:
+				job := event.Data.(GenericJob)
+
+				// check if job should archived
+				// is it an single run job?
+				schd := m.scheduled[job.GetID()]
+				if schd == nil {
+					job.SerializeState()
+
+				}
+
 			}
 		}
 	}
diff --git a/manager_test.go b/manager_test.go
index e991a2d..0fada7d 100644
--- a/manager_test.go
+++ b/manager_test.go
@@ -64,6 +64,14 @@ func (m *MockGenericJob) GetTimeout() time.Duration {
 	return 0
 }
 
+func (m *MockGenericJob) SerializeState() JobSerializedState {
+	return JobSerializedState{}
+}
+
+func (m *MockGenericJob) UnserializeState(serializedState JobSerializedState) {
+	return
+}
+
 func (m *MockGenericJob) GetID() JobID {
 	return m.ID
 }
diff --git a/runnable-counter.go b/runnable-counter.go
index d019ce0..593a84f 100644
--- a/runnable-counter.go
+++ b/runnable-counter.go
@@ -1,13 +1,14 @@
 package jobqueue
 
 import (
+	"fmt"
 	"sync"
 )
 
 func NewCounterRunnableFromMap(data map[string]any) (*CounterRunnable, error) {
-	count, ok := data["Count"].(int)
+	count, ok := data["count"].(int)
 	if !ok {
-		return nil, ErrInvalidData
+		return nil, fmt.Errorf("%w: Invalid count: %v", ErrInvalidData, data["count"])
 	}
 	return &CounterRunnable{Count: count}, nil
 }
diff --git a/runnable-fileoperation.go b/runnable-fileoperation.go
index 020bf73..e79fd76 100644
--- a/runnable-fileoperation.go
+++ b/runnable-fileoperation.go
@@ -1,21 +1,25 @@
 package jobqueue
 
 import (
+	"fmt"
 	"os"
 )
 
 func NewFileOperationRunnableFromMap(data map[string]interface{}) (*FileOperationRunnable, error) {
-	operation, ok := data["Operation"].(string)
+	operation, ok := data["operation"].(string)
 	if !ok {
-		return nil, ErrInvalidData
+		return nil, fmt.Errorf("%w: Invalid Operation: %v", ErrInvalidData, data["operation"])
 	}
 
-	filePath, ok := data["FilePath"].(string)
+	filePath, ok := data["filepath"].(string)
 	if !ok {
-		return nil, ErrInvalidData
+		return nil, fmt.Errorf("%w: Invalid FilePath: %v", ErrInvalidData, data["filepath"])
 	}
 
-	content, _ := data["Content"].(string) // Optional, so no error check
+	content, ok := data["content"].(string) // Optional, so no error check
+	if !ok {
+		return nil, fmt.Errorf("%w: Invalid Content: %v", ErrInvalidData, data["content"])
+	}
 
 	return &FileOperationRunnable{
 		Operation: operation,
diff --git a/runnable-gorm.go b/runnable-gorm.go
index 0c66d2c..65b956b 100644
--- a/runnable-gorm.go
+++ b/runnable-gorm.go
@@ -1,24 +1,25 @@
 package jobqueue
 
 import (
+	"fmt"
 	"gorm.io/driver/mysql"
 	"gorm.io/gorm"
 )
 
 func NewDBRunnableFromMap(data map[string]interface{}) (*DBRunnable, error) {
-	t, ok := data["Type"].(string)
+	t, ok := data["type"].(string)
 	if !ok {
-		return nil, ErrInvalidData
+		return nil, fmt.Errorf("%w: Invalid Type: %v", ErrInvalidData, data["type"])
 	}
 
-	dsn, ok := data["DSN"].(string)
+	dsn, ok := data["dsn"].(string)
 	if !ok {
-		return nil, ErrInvalidData
+		return nil, fmt.Errorf("%w: Invalid DSN: %v", ErrInvalidData, data["dsn"])
 	}
 
-	query, ok := data["Query"].(string)
+	query, ok := data["query"].(string)
 	if !ok {
-		return nil, ErrInvalidData
+		return nil, fmt.Errorf("%w: Invalid Query: %v", ErrInvalidData, data["query"])
 	}
 
 	return &DBRunnable{
diff --git a/runnable-http.go b/runnable-http.go
index 07412a7..67ae4cf 100644
--- a/runnable-http.go
+++ b/runnable-http.go
@@ -2,29 +2,30 @@ package jobqueue
 
 import (
 	"bytes"
+	"fmt"
 	"io"
 	"net/http"
 )
 
 func NewHTTPRunnableFromMap(data map[string]interface{}) (*HTTPRunnable, error) {
-	url, ok := data["URL"].(string)
+	url, ok := data["url"].(string)
 	if !ok {
-		return nil, ErrInvalidData
+		return nil, fmt.Errorf("%w: Invalid URL: %v", ErrInvalidData, data["url"])
 	}
 
-	method, ok := data["Method"].(string)
+	method, ok := data["method"].(string)
 	if !ok {
-		return nil, ErrInvalidData
+		return nil, fmt.Errorf("%w: Invalid Method: %v", ErrInvalidData, data["method"])
 	}
 
-	header, ok := data["Header"].(map[string]string)
+	header, ok := data["header"].(map[string]string)
 	if !ok {
-		return nil, ErrInvalidData
+		return nil, fmt.Errorf("%w: Invalid Header: %v", ErrInvalidData, data["header"])
 	}
 
-	body, ok := data["Body"].(string)
+	body, ok := data["body"].(string)
 	if !ok {
-		return nil, ErrInvalidData
+		return nil, fmt.Errorf("%w: Invalid Body: %v", ErrInvalidData, data["body"])
 	}
 
 	return &HTTPRunnable{
diff --git a/runnable-mail.go b/runnable-mail.go
index a6b7f5e..0a835fd 100644
--- a/runnable-mail.go
+++ b/runnable-mail.go
@@ -1,53 +1,54 @@
 package jobqueue
 
 import (
+	"fmt"
 	"net/smtp"
 )
 
 func NewMailRunnableFromMap(data map[string]interface{}) (*MailRunnable, error) {
-	to, ok := data["To"].(string)
+	to, ok := data["to"].(string)
 	if !ok {
-		return nil, ErrInvalidData
+		return nil, fmt.Errorf("%w: Invalid To: %v", ErrInvalidData, data["to"])
 	}
 
-	from, ok := data["From"].(string)
+	from, ok := data["from"].(string)
 	if !ok {
-		return nil, ErrInvalidData
+		return nil, fmt.Errorf("%w: Invalid From: %v", ErrInvalidData, data["from"])
 	}
 
-	subject, ok := data["Subject"].(string)
+	subject, ok := data["subject"].(string)
 	if !ok {
-		return nil, ErrInvalidData
+		return nil, fmt.Errorf("%w: Invalid Subject: %v", ErrInvalidData, data["subject"])
 	}
 
-	body, ok := data["Body"].(string)
+	body, ok := data["l"].(string)
 	if !ok {
-		return nil, ErrInvalidData
+		return nil, fmt.Errorf("%w: Invalid Body: %v", ErrInvalidData, data["body"])
 	}
 
-	server, ok := data["Server"].(string)
+	server, ok := data["server"].(string)
 	if !ok {
-		return nil, ErrInvalidData
+		return nil, fmt.Errorf("%w: Invalid Server: %v", ErrInvalidData, data["server"])
 	}
 
-	port, ok := data["Port"].(string)
+	port, ok := data["port"].(string)
 	if !ok {
-		return nil, ErrInvalidData
+		return nil, fmt.Errorf("%w: Invalid Port: %v", ErrInvalidData, data["port"])
 	}
 
-	username, ok := data["Username"].(string)
+	username, ok := data["username"].(string)
 	if !ok {
-		return nil, ErrInvalidData
+		return nil, fmt.Errorf("%w: Invalid Username: %v", ErrInvalidData, data["username"])
 	}
 
-	password, ok := data["Password"].(string)
+	password, ok := data["password"].(string)
 	if !ok {
-		return nil, ErrInvalidData
+		return nil, fmt.Errorf("%w: Invalid Password: %v", ErrInvalidData, data["password"])
 	}
 
-	headers, ok := data["Headers"].(map[string]string)
+	headers, ok := data["headers"].(map[string]string)
 	if !ok {
-		return nil, ErrInvalidData
+		return nil, fmt.Errorf("%w: Invalid Headers: %v", ErrInvalidData, data["headers"])
 	}
 
 	return &MailRunnable{
diff --git a/runnable-sftp.go b/runnable-sftp.go
index 1fbdb7c..a221381 100644
--- a/runnable-sftp.go
+++ b/runnable-sftp.go
@@ -9,23 +9,66 @@ import (
 )
 
 func NewSFTPRunnableFromMap(data map[string]interface{}) (*SFTPRunnable, error) {
-	// Your map to struct conversion logic here
-	// e.g.,
+	host, ok := data["host"].(string)
+	if !ok {
+		return nil, fmt.Errorf("%w: Invalid Host: %v", ErrInvalidData, data["host"])
+	}
+
+	port, ok := data["port"].(int)
+	if !ok {
+		return nil, fmt.Errorf("%w: Invalid Port: %v", ErrInvalidData, data["port"])
+	}
+
+	user, ok := data["user"].(string)
+	if !ok {
+		return nil, fmt.Errorf("%w: Invalid User: %v", ErrInvalidData, data["user"])
+	}
+
+	insecure, ok := data["insecure"].(bool)
+	if !ok {
+		return nil, fmt.Errorf("%w: Invalid Insecure: %v", ErrInvalidData, data["insecure"])
+	}
+
+	credential, ok := data["credential"].(string)
+	if !ok {
+		return nil, fmt.Errorf("%w: Invalid Credential: %v", ErrInvalidData, data["credential"])
+	}
+
+	credentialType, ok := data["credentialtype"].(string)
+	if !ok {
+		return nil, fmt.Errorf("%w: Invalid CredentialType: %v", ErrInvalidData, data["credentialtype"])
+	}
+
+	hostKey, ok := data["hostkey"].(string)
+	if !ok {
+		return nil, fmt.Errorf("%w: Invalid HostKey: %v", ErrInvalidData, data["hostkey"])
+	}
+
+	srcDir, ok := data["srcdir"].(string)
+	if !ok {
+		return nil, fmt.Errorf("%w: Invalid SrcDir: %v", ErrInvalidData, data["srcdir"])
+	}
 
-	host, ok := data["Host"].(string)
+	dstDir, ok := data["dstdir"].(string)
 	if !ok {
-		return nil, fmt.Errorf("invalid Host")
+		return nil, fmt.Errorf("%w: Invalid DstDir: %v", ErrInvalidData, data["dstdir"])
 	}
-	//... (other fields)
 
-	transferDirection, ok := data["TransferDirection"].(string)
+	transferDirection, ok := data["transferdirection"].(string)
 	if !ok {
-		return nil, fmt.Errorf("invalid TransferDirection")
+		return nil, fmt.Errorf("%w: Invalid TransferDirection: %v", ErrInvalidData, data["TransferDirection"])
 	}
 
 	return &SFTPRunnable{
-		Host: host,
-		//... (other fields)
+		Host:              host,
+		Port:              port,
+		User:              user,
+		Insecure:          insecure,
+		Credential:        credential,
+		CredentialType:    credentialType,
+		HostKey:           hostKey,
+		SrcDir:            srcDir,
+		DstDir:            dstDir,
 		TransferDirection: Direction(transferDirection),
 	}, nil
 }
diff --git a/runnable-shell.go b/runnable-shell.go
index 5a6da9d..cfa5533 100644
--- a/runnable-shell.go
+++ b/runnable-shell.go
@@ -1,18 +1,28 @@
 package jobqueue
 
 import (
+	"fmt"
+	"os"
 	"os/exec"
 	"strings"
 )
 
 func NewShellRunnableFromMap(data map[string]interface{}) (*ShellRunnable, error) {
-	scriptPath, ok := data["ScriptPath"].(string)
-	if !ok {
-		return nil, ErrInvalidData
+
+	scriptPath, _ := data["scriptpath"].(string)
+	script, _ := data["script"].(string)
+
+	if scriptPath != "" && script != "" {
+		return nil, fmt.Errorf("%w: ScriptPath and Script are mutually exclusive", ErrInvalidData)
+	}
+
+	if scriptPath == "" && script == "" {
+		return nil, fmt.Errorf("%w: ScriptPath or Script is required", ErrInvalidData)
 	}
 
 	return &ShellRunnable{
 		ScriptPath: scriptPath,
+		Script:     script,
 	}, nil
 }
 
@@ -25,10 +35,46 @@ type ShellResult struct {
 
 type ShellRunnable struct {
 	ScriptPath string
+	Script     string
 }
 
 func (s *ShellRunnable) Run() (RunResult[ShellResult], error) {
-	cmd := exec.Command("sh", s.ScriptPath)
+
+	scriptPath := s.ScriptPath
+
+	if s.Script != "" {
+		// write to temp
+		tmp, err := os.CreateTemp("", "script-*.sh")
+		if err != nil {
+			return RunResult[ShellResult]{
+				Status: ResultStatusFailed,
+				Data: ShellResult{
+					Output:   "",
+					ExitCode: DefaultErrorExitCode,
+					Error:    fmt.Errorf("%w: %v", ErrFailedToCreateTempFile, err).Error(),
+				},
+			}, err
+
+		}
+		scriptPath = tmp.Name()
+		defer os.Remove(scriptPath)
+
+		_, err = tmp.WriteString(s.Script)
+		defer tmp.Close()
+		if err != nil {
+			return RunResult[ShellResult]{
+				Status: ResultStatusFailed,
+				Data: ShellResult{
+					Output:   "",
+					ExitCode: DefaultErrorExitCode,
+					Error:    fmt.Errorf("%w: %v", ErrFailedToWriteTempFile, err).Error(),
+				},
+			}, err
+		}
+
+	}
+
+	cmd := exec.Command("sh", scriptPath)
 	output, err := cmd.Output()
 
 	var stderr []byte
@@ -36,7 +82,7 @@ func (s *ShellRunnable) Run() (RunResult[ShellResult], error) {
 		stderr = err.(*exec.ExitError).Stderr
 	}
 
-	exitCode := 0
+	exitCode := SuccessExitCode
 
 	if err != nil {
 		if exitError, ok := err.(*exec.ExitError); ok {
diff --git a/scheduler.go b/scheduler.go
index b94b699..c26afcf 100644
--- a/scheduler.go
+++ b/scheduler.go
@@ -35,7 +35,7 @@ func (s *IntervalScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
 
 	id := job.GetID()
 	if _, ok := s.jobs[id]; ok {
-		return fmt.Errorf("job %s already scheduled", id)
+		return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
 	}
 
 	stopChan := make(StopChan)
@@ -116,7 +116,7 @@ func (s *CronScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
 
 	id := job.GetID()
 	if _, ok := s.jobs[id]; ok {
-		return fmt.Errorf("job %s already scheduled", id)
+		return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
 	}
 
 	entryId, err := s.cron.AddFunc(s.Spec, func() {
@@ -128,8 +128,7 @@ func (s *CronScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
 	if err != nil {
 		return err
 	}
-
-	s.cron.Start()
+	
 	return nil
 }
 
@@ -187,7 +186,7 @@ func (s *DelayScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
 
 	id := job.GetID()
 	if _, ok := s.jobs[id]; ok {
-		return fmt.Errorf("job %s already scheduled", id)
+		return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
 	}
 
 	stopChan := make(StopChan)
@@ -259,7 +258,7 @@ func (s *EventScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
 
 	id := job.GetID()
 	if _, ok := s.jobs[id]; ok {
-		return fmt.Errorf("job %s already scheduled", id)
+		return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
 	}
 
 	stopChan := make(StopChan)
diff --git a/state.go b/state.go
index a9323ac..ade8cfa 100644
--- a/state.go
+++ b/state.go
@@ -5,7 +5,7 @@ import (
 	"os"
 )
 
-// State repräsentiert den Zustand, den wir speichern wollen
+// State represent the state of the job queue
 type State struct {
 	// Jobs enthält alle Jobs, die wir speichern wollen
 	Jobs []JobSerializedState `json:"jobs"`
@@ -22,7 +22,7 @@ type FileStateManager struct {
 	state    *State
 }
 
-// LoadState lädt den Zustand aus der Datei
+// LoadState load the state from the file
 func (f *FileStateManager) LoadState() error {
 	file, err := os.Open(f.filePath)
 	if err != nil {
@@ -38,7 +38,7 @@ func (f *FileStateManager) LoadState() error {
 	return nil
 }
 
-// SaveState speichert den Zustand in der Datei
+// SaveState save the state to the file
 func (f *FileStateManager) SaveState() error {
 	file, err := os.Create(f.filePath)
 	if err != nil {
diff --git a/worker_test.go b/worker_test.go
index 8ad221a..d9dcecb 100644
--- a/worker_test.go
+++ b/worker_test.go
@@ -44,6 +44,14 @@ func (j DummyJob) GetPriority() Priority {
 	return PriorityDefault
 }
 
+func (j DummyJob) SerializeState() JobSerializedState {
+	return JobSerializedState{}
+}
+
+func (j DummyJob) UnserializeState(serializedState JobSerializedState) {
+	return
+}
+
 func TestAssignJob(t *testing.T) {
 	worker := NewLocalWorker(1)
 	err := worker.Start()
-- 
GitLab