diff --git a/errors.go b/errors.go index 8493dd84676d0d3b8013f1648c7befb8191e7d1d..c3d3da9d85113ad7649d671ea17bde318c06c29b 100644 --- a/errors.go +++ b/errors.go @@ -30,4 +30,8 @@ var ( ErrUnsupportedCredentialType = fmt.Errorf("unsupported credential type") ErrUnsupportedTransferDirection = fmt.Errorf("unsupported transfer direction") ErrInvalidData = fmt.Errorf("invalid data") + ErrUnknownFormat = fmt.Errorf("unknown format") + ErrFailedToCreateTempFile = fmt.Errorf("failed to create temp file") + ErrFailedToWriteTempFile = fmt.Errorf("failed to write temp file") + ErrJobAlreadyScheduled = fmt.Errorf("job already scheduled") ) diff --git a/event-bus.go b/event-bus.go index 23def45d633126f854a3674309f1ca843a498b1b..86cc84eaf08f1d2252d15d6367ae86446dcfb58a 100644 --- a/event-bus.go +++ b/event-bus.go @@ -7,11 +7,12 @@ import ( type EventName string const ( - JobAdded EventName = "JobAdded" - JobRemoved EventName = "JobRemoved" - ExecuteJob EventName = "ExecuteJob" - JobReady EventName = "JobReady" - QueueJob EventName = "QueueJob" + JobAdded EventName = "JobAdded" + JobRemoved EventName = "JobRemoved" + ExecuteJob EventName = "ExecuteJob" + JobReady EventName = "JobReady" + QueueJob EventName = "QueueJob" + JobFinished EventName = "JobFinished" // add more events as needed ) diff --git a/import.go b/import.go index 987ec1232c0d55b224fb4fa746a0eaa0b45f53d1..18ed76888b3625820e1b18488b5652215128d39e 100644 --- a/import.go +++ b/import.go @@ -1,8 +1,12 @@ package jobqueue import ( + "encoding/json" + "fmt" "gopkg.in/yaml.v3" + "io" "os" + "strings" "time" ) @@ -30,34 +34,40 @@ type SchedulerImport struct { Event string `yaml:"event,omitempty" json:"event,omitempty"` } -func ReadYAMLFile(filePath string) ([]JobImport, error) { - data, err := os.ReadFile(filePath) - if err != nil { +func ReadYAML(r io.Reader) ([]JobImport, error) { + var jobs []JobImport + decoder := yaml.NewDecoder(r) + if err := decoder.Decode(&jobs); err != nil { return nil, err } + return jobs, nil +} +func ReadJSON(r io.Reader) ([]JobImport, error) { var jobs []JobImport - err = yaml.Unmarshal(data, &jobs) - if err != nil { + decoder := json.NewDecoder(r) + if err := decoder.Decode(&jobs); err != nil { return nil, err } - return jobs, nil } -func ReadJsonFile(filePath string) ([]JobImport, error) { - data, err := os.ReadFile(filePath) +func ReadYAMLFile(filePath string) ([]JobImport, error) { + file, err := os.Open(filePath) if err != nil { return nil, err } + defer file.Close() + return ReadYAML(file) +} - var jobs []JobImport - err = yaml.Unmarshal(data, &jobs) +func ReadJsonFile(filePath string) ([]JobImport, error) { + file, err := os.Open(filePath) if err != nil { return nil, err } - - return jobs, nil + defer file.Close() + return ReadJSON(file) } func CreateGenericJobFromImport[T any](jobImport JobImport, runner Runnable[T]) GenericJob { @@ -71,71 +81,78 @@ func CreateGenericJobFromImport[T any](jobImport JobImport, runner Runnable[T]) } } -func CreateJobAndSchedulerFromImport(jobImport JobImport) (GenericJob, Scheduler, error) { +func CreateJobAndSchedulerFromImport(jobImport JobImport, manager *Manager) (GenericJob, Scheduler, error) { var job GenericJob - switch jobImport.Runnable.Type { - case "Dummy": + rType := strings.ToLower(jobImport.Runnable.Type) + + runnableData := make(map[string]interface{}) + for k, v := range jobImport.Runnable.Data { + runnableData[strings.ToLower(k)] = v + } + + switch rType { + case "dummy": - runner, err := NewDummyRunnableFromMap(jobImport.Runnable.Data) + runner, err := 
NewDummyRunnableFromMap(runnableData) if err != nil { return nil, nil, err } job = CreateGenericJobFromImport[DummyResult](jobImport, runner) - case "Counter": + case "counter": - runner, err := NewCounterRunnableFromMap(jobImport.Runnable.Data) + runner, err := NewCounterRunnableFromMap(runnableData) if err != nil { return nil, nil, err } job = CreateGenericJobFromImport[CounterResult](jobImport, runner) - case "FileOperation": - runner, err := NewFileOperationRunnableFromMap(jobImport.Runnable.Data) + case "fileoperation": + runner, err := NewFileOperationRunnableFromMap(runnableData) if err != nil { return nil, nil, err } job = CreateGenericJobFromImport[FileOperationResult](jobImport, runner) - case "DB": - runner, err := NewDBRunnableFromMap(jobImport.Runnable.Data) + case "db": + runner, err := NewDBRunnableFromMap(runnableData) if err != nil { return nil, nil, err } job = CreateGenericJobFromImport[DBResult](jobImport, runner) - case "HTTP": - runner, err := NewHTTPRunnableFromMap(jobImport.Runnable.Data) + case "http": + runner, err := NewHTTPRunnableFromMap(runnableData) if err != nil { return nil, nil, err } job = CreateGenericJobFromImport[HTTPResult](jobImport, runner) - case "Mail": - runner, err := NewMailRunnableFromMap(jobImport.Runnable.Data) + case "mail": + runner, err := NewMailRunnableFromMap(runnableData) if err != nil { return nil, nil, err } job = CreateGenericJobFromImport[MailResult](jobImport, runner) - case "SFTP": - runner, err := NewSFTPRunnableFromMap(jobImport.Runnable.Data) + case "sftp": + runner, err := NewSFTPRunnableFromMap(runnableData) if err != nil { return nil, nil, err } job = CreateGenericJobFromImport[SFTPResult](jobImport, runner) - case "Shell": - runner, err := NewShellRunnableFromMap(jobImport.Runnable.Data) + case "shell": + runner, err := NewShellRunnableFromMap(runnableData) if err != nil { return nil, nil, err } @@ -143,21 +160,37 @@ func CreateJobAndSchedulerFromImport(jobImport JobImport) (GenericJob, Scheduler job = CreateGenericJobFromImport[ShellResult](jobImport, runner) default: - return nil, nil, ErrUnknownRunnableType + return nil, nil, + fmt.Errorf("%w: %s, available types: dummy, counter, fileoperation, db, http, mail, sftp, shell", + ErrUnknownRunnableType, rType) } + + sType := strings.ToLower(jobImport.Scheduler.Type) + var scheduler Scheduler - switch jobImport.Scheduler.Type { - case "Interval": + switch sType { + case "interval": scheduler = &IntervalScheduler{Interval: jobImport.Scheduler.Interval} - case "Cron": - scheduler = &CronScheduler{Spec: jobImport.Scheduler.Spec} + case "cron": + scheduler = &CronScheduler{ + Spec: jobImport.Scheduler.Spec, + } + + if manager != nil { + scheduler.(*CronScheduler).cron = manager.GetCronInstance() + } - case "Delay": + case "delay": scheduler = &DelayScheduler{Delay: jobImport.Scheduler.Delay} - case "Event": + case "event": scheduler = &EventScheduler{Event: EventName(jobImport.Scheduler.Event)} default: @@ -167,19 +200,54 @@ func CreateJobAndSchedulerFromImport(jobImport JobImport) (GenericJob, Scheduler return job, scheduler, nil } -func LoadJobsAndSchedule(filePath string, manager *Manager) error { +// LoadJobsAndScheduleFromFile reads jobs from a file and schedules them.
(json/yaml) +func LoadJobsAndScheduleFromFile(filePath string, manager *Manager) error { var err error var imp []JobImport - switch filePath[len(filePath)-4:] { - case "yaml": + if strings.HasSuffix(filePath, ".json") { imp, err = ReadJsonFile(filePath) - break - case "json": + } else if strings.HasSuffix(filePath, ".yaml") || strings.HasSuffix(filePath, ".yml") { imp, err = ReadYAMLFile(filePath) - break + } else { + return ErrUnknownFormat + } + + if err != nil { + return err + } + + for _, imp := range imp { + job, scheduler, err := CreateJobAndSchedulerFromImport(imp, manager) + if err != nil { + return err + } + + err = manager.ScheduleJob(job, scheduler) + if err != nil { + return err + } + } + + return nil + +} + +// ImportJobsAndSchedule loads jobs from a reader and schedules them. +func ImportJobsAndSchedule(reader io.Reader, format string, manager *Manager) error { + var err error + var imp []JobImport + + // format to lowercase + format = strings.ToLower(format) + if format == "json" { + imp, err = ReadJSON(reader) + } else if format == "yaml" { + imp, err = ReadYAML(reader) + } else { + return fmt.Errorf("%w: %s", ErrUnknownFormat, format) } if err != nil { @@ -187,7 +255,7 @@ func LoadJobsAndSchedule(filePath string, manager *Manager) error { } for _, imp := range imp { - job, scheduler, err := CreateJobAndSchedulerFromImport(imp) + job, scheduler, err := CreateJobAndSchedulerFromImport(imp, manager) if err != nil { return err } diff --git a/import_test.go b/import_test.go index 056ccefd42948fcdbbbcfde58443c48ea4d509a3..8687627c8107c351d0d7828273368d3aac5718aa 100644 --- a/import_test.go +++ b/import_test.go @@ -59,7 +59,7 @@ func TestCreateJobAndSchedulerFromInput(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - gotJob, gotSchedule, err := CreateJobAndSchedulerFromImport(tt.input) + gotJob, gotSchedule, err := CreateJobAndSchedulerFromImport(tt.input, nil) if (err != nil) != tt.wantErr { t.Errorf("CreateJobAndSchedulerFromImport() error = %v, wantErr %v", err, tt.wantErr) diff --git a/issue-1_test.go b/issue-1_test.go new file mode 100644 index 0000000000000000000000000000000000000000..41c0743c86769c07fcf5f64aa4e6a96ea112a782 --- /dev/null +++ b/issue-1_test.go @@ -0,0 +1,71 @@ +package jobqueue + +import ( + "github.com/robfig/cron/v3" + "github.com/stretchr/testify/assert" + "strings" + "testing" + "time" +) + +func TestRoundTrip(t *testing.T) { + + // type JobImport struct { + // ID string `yaml:"id" json:"id"` + // Priority int `yaml:"priority" json:"priority"` + // Timeout time.Duration `yaml:"timeout" json:"timeout"` + // MaxRetries uint `yaml:"maxRetries" json:"maxRetries"` + // RetryDelay time.Duration `yaml:"retryDelay" json:"retryDelay"` + // Dependencies []string `yaml:"dependencies" json:"dependencies,omitempty"` + // Runnable RunnableImport `yaml:"runnable" json:"runnable"` + // Scheduler SchedulerImport `yaml:"scheduler" json:"scheduler,omitempty"` +//} + // +//type RunnableImport struct { + // Type string `yaml:"type" json:"type"` + // Data map[string]any `yaml:"data,omitempty" json:"data,omitempty"` +//} + // +//type SchedulerImport struct { + // Type string `yaml:"type" json:"type"` + // Interval time.Duration `yaml:"interval,omitempty" json:"interval,omitempty"` + // Spec string `yaml:"spec,omitempty" json:"spec,omitempty"` + // Delay time.Duration `yaml:"delay,omitempty" json:"delay,omitempty"` + // Event string `yaml:"event,omitempty" json:"event,omitempty"` +//} + + // define test data with jobs in yaml format + testData := []byte(` +- id: job1 + priority:
1 + timeout: 1s + maxRetries: 3 + retryDelay: 1s + runnable: + type: shell + data: + script: echo "Hello World $(date)" >> /tmp/job1.log + scheduler: + type: cron + spec: "* * * * *" +`) + + var err error + + manager := NewManager() + manager.SetCronInstance(cron.New()) + worker := NewLocalWorker(1) + err = manager.AddWorker(worker) + assert.Nil(t, err) + + err = manager.Start() + assert.Nil(t, err) + + reader := strings.NewReader(string(testData)) + + err = ImportJobsAndSchedule(reader, "yaml", manager) + assert.Nil(t, err) + + time.Sleep(10 * time.Minute) + +} diff --git a/job.go b/job.go index 86f6def9d151d9691bcd34ff05137f0bf6534815..dd747722a0832bc1ae57618d7a364c5698bb9e18 100644 --- a/job.go +++ b/job.go @@ -38,7 +38,9 @@ type GenericJob interface { GetTimeout() time.Duration - Archive() error + SerializeState() JobSerializedState + + UnserializeState(serializedState JobSerializedState) } type Job[T any] struct { @@ -107,10 +109,6 @@ func (j *Job[T]) UnserializeState(serializedState JobSerializedState) { j.logs = serializedState.Logs } -func (j *Job[T]) Archive() error { - return nil -} - // Execute executes the job func (j *Job[T]) Execute(ctx context.Context) (RunGenericResult, error) { startTime := time.Now() diff --git a/manager.go b/manager.go index 5e7fee84fc10310d337d6276ce2bcdc09ba572fd..3edc059d05a7b7dd77aa77bf2eedc12fbd9a4680 100644 --- a/manager.go +++ b/manager.go @@ -2,6 +2,7 @@ package jobqueue import ( "fmt" + "github.com/robfig/cron/v3" "sync" ) @@ -24,6 +25,8 @@ type Manager struct { stateManager StateManager + cronInstance *cron.Cron + mu sync.Mutex } @@ -47,6 +50,18 @@ func (m *Manager) GetEventBus() *EventBus { return m.eventBus } +func (m *Manager) SetCronInstance(cronInstance *cron.Cron) { + m.mu.Lock() + defer m.mu.Unlock() + m.cronInstance = cronInstance +} + +func (m *Manager) GetCronInstance() *cron.Cron { + m.mu.Lock() + defer m.mu.Unlock() + return m.cronInstance +} + func (m *Manager) checkAndSetRunningState() error { m.state = ManagerStateStopped @@ -178,6 +193,10 @@ func (m *Manager) Start() error { wrappedErr = fmt.Errorf("%w\n%s", wrappedErr, err.Error()) } + if m.cronInstance != nil { + m.cronInstance.Start() + } + return wrappedErr } @@ -193,6 +212,7 @@ func (m *Manager) Stop() error { m.eventBus.Unsubscribe(QueueJob, m.jobEventCh) m.eventBus.Unsubscribe(JobReady, m.jobEventCh) + close(m.jobEventCh) var wrappedErr error @@ -220,6 +240,10 @@ func (m *Manager) Stop() error { } } + if m.cronInstance != nil { + m.cronInstance.Stop() + } + return wrappedErr } @@ -250,6 +274,18 @@ func (m *Manager) handleJobEvents() { } } } + + case JobFinished: + job := event.Data.(GenericJob) + + // check whether the job should be archived + // is it a single-run job?
+ schd := m.scheduled[job.GetID()] + if schd == nil { + job.SerializeState() + + } + } } } diff --git a/manager_test.go b/manager_test.go index e991a2d1ba9059ae82fe961b483f3b3ea06bd39e..0fada7d392e90a45420c06b83dd0ceced0735ab1 100644 --- a/manager_test.go +++ b/manager_test.go @@ -64,6 +64,14 @@ func (m *MockGenericJob) GetTimeout() time.Duration { return 0 } +func (m *MockGenericJob) SerializeState() JobSerializedState { + return JobSerializedState{} +} + +func (m *MockGenericJob) UnserializeState(serializedState JobSerializedState) { + return +} + func (m *MockGenericJob) GetID() JobID { return m.ID } diff --git a/runnable-counter.go b/runnable-counter.go index d019ce0549bb8308dc3b929efb5b7209a992b5c1..593a84fc0de764730ada7e48dfcbcdbc7cd1a761 100644 --- a/runnable-counter.go +++ b/runnable-counter.go @@ -1,13 +1,14 @@ package jobqueue import ( + "fmt" "sync" ) func NewCounterRunnableFromMap(data map[string]any) (*CounterRunnable, error) { - count, ok := data["Count"].(int) + count, ok := data["count"].(int) if !ok { - return nil, ErrInvalidData + return nil, fmt.Errorf("%w: Invalid Count: %v", ErrInvalidData, data["count"]) } return &CounterRunnable{Count: count}, nil } diff --git a/runnable-fileoperation.go b/runnable-fileoperation.go index 020bf73803e33e4c0339ccfac7a1d687bf5960da..e79fd76631587ef1b6e1caac34de7362cde9a576 100644 --- a/runnable-fileoperation.go +++ b/runnable-fileoperation.go @@ -1,21 +1,25 @@ package jobqueue import ( + "fmt" "os" ) func NewFileOperationRunnableFromMap(data map[string]interface{}) (*FileOperationRunnable, error) { - operation, ok := data["Operation"].(string) + operation, ok := data["operation"].(string) if !ok { - return nil, ErrInvalidData + return nil, fmt.Errorf("%w: Invalid Operation: %v", ErrInvalidData, data["operation"]) } - filePath, ok := data["FilePath"].(string) + filePath, ok := data["filepath"].(string) if !ok { - return nil, ErrInvalidData + return nil, fmt.Errorf("%w: Invalid FilePath: %v", ErrInvalidData, data["filepath"]) } - content, _ := data["Content"].(string) // Optional, so no error check + content, _ := data["content"].(string) // Optional, so no error check return &FileOperationRunnable{ Operation: operation, diff --git a/runnable-gorm.go b/runnable-gorm.go index 0c66d2cb3442b4f59db523017aede7fcb535790d..65b956b3126987fb1f46d9205c711e7fd97e1106 100644 --- a/runnable-gorm.go +++ b/runnable-gorm.go @@ -1,24 +1,25 @@ package jobqueue import ( + "fmt" "gorm.io/driver/mysql" "gorm.io/gorm" ) func NewDBRunnableFromMap(data map[string]interface{}) (*DBRunnable, error) { - t, ok := data["Type"].(string) + t, ok := data["type"].(string) if !ok { - return nil, ErrInvalidData + return nil, fmt.Errorf("%w: Invalid Type: %v", ErrInvalidData, data["type"]) } - dsn, ok := data["DSN"].(string) + dsn, ok := data["dsn"].(string) if !ok { - return nil, ErrInvalidData + return nil, fmt.Errorf("%w: Invalid DSN: %v", ErrInvalidData, data["dsn"]) } - query, ok := data["Query"].(string) + query, ok := data["query"].(string) if !ok { - return nil, ErrInvalidData + return nil, fmt.Errorf("%w: Invalid Query: %v", ErrInvalidData, data["query"]) } return &DBRunnable{ diff --git a/runnable-http.go b/runnable-http.go index 07412a72a66b8561406228158df0630efe29e035..67ae4cfec7f3993f360ed42a7000183d376b6699 100644 --- a/runnable-http.go +++ b/runnable-http.go @@ -2,29 +2,30 @@ package jobqueue import ( "bytes" + "fmt" "io" "net/http" ) func
NewHTTPRunnableFromMap(data map[string]interface{}) (*HTTPRunnable, error) { - url, ok := data["URL"].(string) + url, ok := data["url"].(string) if !ok { - return nil, ErrInvalidData + return nil, fmt.Errorf("%w: Invalid URL: %v", ErrInvalidData, data["url"]) } - method, ok := data["Method"].(string) + method, ok := data["method"].(string) if !ok { - return nil, ErrInvalidData + return nil, fmt.Errorf("%w: Invalid Method: %v", ErrInvalidData, data["method"]) } - header, ok := data["Header"].(map[string]string) + header, ok := data["header"].(map[string]string) if !ok { - return nil, ErrInvalidData + return nil, fmt.Errorf("%w: Invalid Header: %v", ErrInvalidData, data["header"]) } - body, ok := data["Body"].(string) + body, ok := data["body"].(string) if !ok { - return nil, ErrInvalidData + return nil, fmt.Errorf("%w: Invalid Body: %v", ErrInvalidData, data["body"]) } return &HTTPRunnable{ diff --git a/runnable-mail.go b/runnable-mail.go index a6b7f5e95aa5fac70328d07fc6f3df1742886eb6..0a835fdb13cd60477d83a4e841cc8773d72f697c 100644 --- a/runnable-mail.go +++ b/runnable-mail.go @@ -1,53 +1,54 @@ package jobqueue import ( + "fmt" "net/smtp" ) func NewMailRunnableFromMap(data map[string]interface{}) (*MailRunnable, error) { - to, ok := data["To"].(string) + to, ok := data["to"].(string) if !ok { - return nil, ErrInvalidData + return nil, fmt.Errorf("%w: Invalid To: %v", ErrInvalidData, data["to"]) } - from, ok := data["From"].(string) + from, ok := data["from"].(string) if !ok { - return nil, ErrInvalidData + return nil, fmt.Errorf("%w: Invalid From: %v", ErrInvalidData, data["from"]) } - subject, ok := data["Subject"].(string) + subject, ok := data["subject"].(string) if !ok { - return nil, ErrInvalidData + return nil, fmt.Errorf("%w: Invalid Subject: %v", ErrInvalidData, data["subject"]) } - body, ok := data["Body"].(string) + body, ok := data["body"].(string) if !ok { - return nil, ErrInvalidData + return nil, fmt.Errorf("%w: Invalid Body: %v", ErrInvalidData, data["body"]) } - server, ok := data["Server"].(string) + server, ok := data["server"].(string) if !ok { - return nil, ErrInvalidData + return nil, fmt.Errorf("%w: Invalid Server: %v", ErrInvalidData, data["server"]) } - port, ok := data["Port"].(string) + port, ok := data["port"].(string) if !ok { - return nil, ErrInvalidData + return nil, fmt.Errorf("%w: Invalid Port: %v", ErrInvalidData, data["port"]) } - username, ok := data["Username"].(string) + username, ok := data["username"].(string) if !ok { - return nil, ErrInvalidData + return nil, fmt.Errorf("%w: Invalid Username: %v", ErrInvalidData, data["username"]) } - password, ok := data["Password"].(string) + password, ok := data["password"].(string) if !ok { - return nil, ErrInvalidData + return nil, fmt.Errorf("%w: Invalid Password: %v", ErrInvalidData, data["password"]) } - headers, ok := data["Headers"].(map[string]string) + headers, ok := data["headers"].(map[string]string) if !ok { - return nil, ErrInvalidData + return nil, fmt.Errorf("%w: Invalid Headers: %v", ErrInvalidData, data["headers"]) } return &MailRunnable{ diff --git a/runnable-sftp.go b/runnable-sftp.go index 1fbdb7cb7a5ab630539e914edf6de4b52152e0a0..a2213812dc14d059fb3dca7edf4d559029c7ce8e 100644 --- a/runnable-sftp.go +++ b/runnable-sftp.go @@ -9,23 +9,66 @@ import ( ) func NewSFTPRunnableFromMap(data map[string]interface{}) (*SFTPRunnable, error) { - // Your map to struct conversion logic here - // e.g., + host, ok := data["host"].(string) + if !ok { + return nil, fmt.Errorf("%w: Invalid Host: %v",
ErrInvalidData, data["host"]) + } + + port, ok := data["port"].(int) + if !ok { + return nil, fmt.Errorf("%w: Invalid Port: %v", ErrInvalidData, data["port"]) + } + + user, ok := data["user"].(string) + if !ok { + return nil, fmt.Errorf("%w: Invalid User: %v", ErrInvalidData, data["user"]) + } + + insecure, ok := data["insecure"].(bool) + if !ok { + return nil, fmt.Errorf("%w: Invalid Insecure: %v", ErrInvalidData, data["insecure"]) + } + + credential, ok := data["credential"].(string) + if !ok { + return nil, fmt.Errorf("%w: Invalid Credential: %v", ErrInvalidData, data["credential"]) + } + + credentialType, ok := data["credentialtype"].(string) + if !ok { + return nil, fmt.Errorf("%w: Invalid CredentialType: %v", ErrInvalidData, data["credentialtype"]) + } + + hostKey, ok := data["hostkey"].(string) + if !ok { + return nil, fmt.Errorf("%w: Invalid HostKey: %v", ErrInvalidData, data["hostkey"]) + } + + srcDir, ok := data["srcdir"].(string) + if !ok { + return nil, fmt.Errorf("%w: Invalid SrcDir: %v", ErrInvalidData, data["srcdir"]) + } - host, ok := data["Host"].(string) + dstDir, ok := data["dstdir"].(string) if !ok { - return nil, fmt.Errorf("invalid Host") + return nil, fmt.Errorf("%w: Invalid DstDir: %v", ErrInvalidData, data["dstdir"]) } - //... (other fields) - transferDirection, ok := data["TransferDirection"].(string) + transferDirection, ok := data["transferdirection"].(string) if !ok { - return nil, fmt.Errorf("invalid TransferDirection") + return nil, fmt.Errorf("%w: Invalid TransferDirection: %v", ErrInvalidData, data["TransferDirection"]) } return &SFTPRunnable{ - Host: host, - //... (other fields) + Host: host, + Port: port, + User: user, + Insecure: insecure, + Credential: credential, + CredentialType: credentialType, + HostKey: hostKey, + SrcDir: srcDir, + DstDir: dstDir, TransferDirection: Direction(transferDirection), }, nil } diff --git a/runnable-shell.go b/runnable-shell.go index 5a6da9db697a05ebe4f6f74750a59511a640a9dd..cfa5533a304a81424b7572b1a0589494dc64b5fa 100644 --- a/runnable-shell.go +++ b/runnable-shell.go @@ -1,18 +1,28 @@ package jobqueue import ( + "fmt" + "os" "os/exec" "strings" ) func NewShellRunnableFromMap(data map[string]interface{}) (*ShellRunnable, error) { - scriptPath, ok := data["ScriptPath"].(string) - if !ok { - return nil, ErrInvalidData + + scriptPath, _ := data["scriptpath"].(string) + script, _ := data["script"].(string) + + if scriptPath != "" && script != "" { + return nil, fmt.Errorf("%w: ScriptPath and Script are mutually exclusive", ErrInvalidData) + } + + if scriptPath == "" && script == "" { + return nil, fmt.Errorf("%w: ScriptPath or Script is required", ErrInvalidData) } return &ShellRunnable{ ScriptPath: scriptPath, + Script: script, }, nil } @@ -25,10 +35,46 @@ type ShellResult struct { type ShellRunnable struct { ScriptPath string + Script string } func (s *ShellRunnable) Run() (RunResult[ShellResult], error) { - cmd := exec.Command("sh", s.ScriptPath) + + scriptPath := s.ScriptPath + + if s.Script != "" { + // write to temp + tmp, err := os.CreateTemp("", "script-*.sh") + if err != nil { + return RunResult[ShellResult]{ + Status: ResultStatusFailed, + Data: ShellResult{ + Output: "", + ExitCode: DefaultErrorExitCode, + Error: fmt.Errorf("%w: %v", ErrFailedToCreateTempFile, err).Error(), + }, + }, err + + } + scriptPath = tmp.Name() + defer os.Remove(scriptPath) + + _, err = tmp.WriteString(s.Script) + defer tmp.Close() + if err != nil { + return RunResult[ShellResult]{ + Status: ResultStatusFailed, + Data: ShellResult{ + 
Output: "", + ExitCode: DefaultErrorExitCode, + Error: fmt.Errorf("%w: %v", ErrFailedToWriteTempFile, err).Error(), + }, + }, err + } + + } + + cmd := exec.Command("sh", scriptPath) output, err := cmd.Output() var stderr []byte @@ -36,7 +82,7 @@ func (s *ShellRunnable) Run() (RunResult[ShellResult], error) { stderr = err.(*exec.ExitError).Stderr } - exitCode := 0 + exitCode := SuccessExitCode if err != nil { if exitError, ok := err.(*exec.ExitError); ok { diff --git a/scheduler.go b/scheduler.go index b94b6997b8d400b029df864f62b2d63de7a4552b..c26afcf984bec57a5fb74cc83d0d72b1c784be7e 100644 --- a/scheduler.go +++ b/scheduler.go @@ -35,7 +35,7 @@ func (s *IntervalScheduler) Schedule(job GenericJob, eventBus *EventBus) error { id := job.GetID() if _, ok := s.jobs[id]; ok { - return fmt.Errorf("job %s already scheduled", id) + return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id) } stopChan := make(StopChan) @@ -116,7 +116,7 @@ func (s *CronScheduler) Schedule(job GenericJob, eventBus *EventBus) error { id := job.GetID() if _, ok := s.jobs[id]; ok { - return fmt.Errorf("job %s already scheduled", id) + return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id) } entryId, err := s.cron.AddFunc(s.Spec, func() { @@ -128,8 +128,7 @@ func (s *CronScheduler) Schedule(job GenericJob, eventBus *EventBus) error { if err != nil { return err } - - s.cron.Start() + return nil } @@ -187,7 +186,7 @@ func (s *DelayScheduler) Schedule(job GenericJob, eventBus *EventBus) error { id := job.GetID() if _, ok := s.jobs[id]; ok { - return fmt.Errorf("job %s already scheduled", id) + return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id) } stopChan := make(StopChan) @@ -259,7 +258,7 @@ func (s *EventScheduler) Schedule(job GenericJob, eventBus *EventBus) error { id := job.GetID() if _, ok := s.jobs[id]; ok { - return fmt.Errorf("job %s already scheduled", id) + return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id) } stopChan := make(StopChan) diff --git a/state.go b/state.go index a9323ace29565fe5e01b5a6f6da74fe85c4081d3..ade8cfa7d4e506e1200c84ff4b79f67573c40f3d 100644 --- a/state.go +++ b/state.go @@ -5,7 +5,7 @@ import ( "os" ) -// State repräsentiert den Zustand, den wir speichern wollen +// State represent the state of the job queue type State struct { // Jobs enthält alle Jobs, die wir speichern wollen Jobs []JobSerializedState `json:"jobs"` @@ -22,7 +22,7 @@ type FileStateManager struct { state *State } -// LoadState lädt den Zustand aus der Datei +// LoadState load the state from the file func (f *FileStateManager) LoadState() error { file, err := os.Open(f.filePath) if err != nil { @@ -38,7 +38,7 @@ func (f *FileStateManager) LoadState() error { return nil } -// SaveState speichert den Zustand in der Datei +// SaveState save the state to the file func (f *FileStateManager) SaveState() error { file, err := os.Create(f.filePath) if err != nil { diff --git a/worker_test.go b/worker_test.go index 8ad221a45cf7d893b44900875c497b595cccbf47..d9dcecb0628af8fab2f9288441c4689390eed962 100644 --- a/worker_test.go +++ b/worker_test.go @@ -44,6 +44,14 @@ func (j DummyJob) GetPriority() Priority { return PriorityDefault } +func (j DummyJob) SerializeState() JobSerializedState { + return JobSerializedState{} +} + +func (j DummyJob) UnserializeState(serializedState JobSerializedState) { + return +} + func TestAssignJob(t *testing.T) { worker := NewLocalWorker(1) err := worker.Start()