Skip to content
Snippets Groups Projects
Verified Commit 190b3f51 authored by Volker Schukai's avatar Volker Schukai :alien:
Browse files

feat: #1

parent addb461c
Branches
Tags
No related merge requests found
......@@ -30,4 +30,8 @@ var (
ErrUnsupportedCredentialType = fmt.Errorf("unsupported credential type")
ErrUnsupportedTransferDirection = fmt.Errorf("unsupported transfer direction")
ErrInvalidData = fmt.Errorf("invalid data")
ErrUnknownFormat = fmt.Errorf("unknown format")
ErrFailedToCreateTempFile = fmt.Errorf("failed to create temp file")
ErrFailedToWriteTempFile = fmt.Errorf("failed to write temp file")
ErrJobAlreadyScheduled = fmt.Errorf("job already scheduled")
)
......@@ -12,6 +12,7 @@ const (
ExecuteJob EventName = "ExecuteJob"
JobReady EventName = "JobReady"
QueueJob EventName = "QueueJob"
JobFinished EventName = "JobFinished"
// add more events as needed
)
......
package jobqueue
import (
"encoding/json"
"fmt"
"gopkg.in/yaml.v3"
"io"
"os"
"strings"
"time"
)
......@@ -30,34 +34,40 @@ type SchedulerImport struct {
Event string `yaml:"event,omitempty" json:"event,omitempty"`
}
func ReadYAMLFile(filePath string) ([]JobImport, error) {
data, err := os.ReadFile(filePath)
if err != nil {
func ReadYAML(r io.Reader) ([]JobImport, error) {
var jobs []JobImport
decoder := yaml.NewDecoder(r)
if err := decoder.Decode(&jobs); err != nil {
return nil, err
}
return jobs, nil
}
func ReadJSON(r io.Reader) ([]JobImport, error) {
var jobs []JobImport
err = yaml.Unmarshal(data, &jobs)
if err != nil {
decoder := json.NewDecoder(r)
if err := decoder.Decode(&jobs); err != nil {
return nil, err
}
return jobs, nil
}
func ReadJsonFile(filePath string) ([]JobImport, error) {
data, err := os.ReadFile(filePath)
func ReadYAMLFile(filePath string) ([]JobImport, error) {
file, err := os.Open(filePath)
if err != nil {
return nil, err
}
defer file.Close()
return ReadYAML(file)
}
var jobs []JobImport
err = yaml.Unmarshal(data, &jobs)
func ReadJsonFile(filePath string) ([]JobImport, error) {
file, err := os.Open(filePath)
if err != nil {
return nil, err
}
return jobs, nil
defer file.Close()
return ReadJSON(file)
}
func CreateGenericJobFromImport[T any](jobImport JobImport, runner Runnable[T]) GenericJob {
......@@ -71,71 +81,78 @@ func CreateGenericJobFromImport[T any](jobImport JobImport, runner Runnable[T])
}
}
func CreateJobAndSchedulerFromImport(jobImport JobImport) (GenericJob, Scheduler, error) {
func CreateJobAndSchedulerFromImport(jobImport JobImport, manager *Manager) (GenericJob, Scheduler, error) {
var job GenericJob
switch jobImport.Runnable.Type {
case "Dummy":
rType := strings.ToLower(jobImport.Runnable.Type)
runnableData := make(map[string]interface{})
for k, v := range jobImport.Runnable.Data {
runnableData[strings.ToLower(k)] = v
}
switch rType {
case "dummy":
runner, err := NewDummyRunnableFromMap(jobImport.Runnable.Data)
runner, err := NewDummyRunnableFromMap(runnableData)
if err != nil {
return nil, nil, err
}
job = CreateGenericJobFromImport[DummyResult](jobImport, runner)
case "Counter":
case "counter":
runner, err := NewCounterRunnableFromMap(jobImport.Runnable.Data)
runner, err := NewCounterRunnableFromMap(runnableData)
if err != nil {
return nil, nil, err
}
job = CreateGenericJobFromImport[CounterResult](jobImport, runner)
case "FileOperation":
runner, err := NewFileOperationRunnableFromMap(jobImport.Runnable.Data)
case "fileoperation":
runner, err := NewFileOperationRunnableFromMap(runnableData)
if err != nil {
return nil, nil, err
}
job = CreateGenericJobFromImport[FileOperationResult](jobImport, runner)
case "DB":
runner, err := NewDBRunnableFromMap(jobImport.Runnable.Data)
case "db":
runner, err := NewDBRunnableFromMap(runnableData)
if err != nil {
return nil, nil, err
}
job = CreateGenericJobFromImport[DBResult](jobImport, runner)
case "HTTP":
runner, err := NewHTTPRunnableFromMap(jobImport.Runnable.Data)
case "http":
runner, err := NewHTTPRunnableFromMap(runnableData)
if err != nil {
return nil, nil, err
}
job = CreateGenericJobFromImport[HTTPResult](jobImport, runner)
case "Mail":
runner, err := NewMailRunnableFromMap(jobImport.Runnable.Data)
case "mail":
runner, err := NewMailRunnableFromMap(runnableData)
if err != nil {
return nil, nil, err
}
job = CreateGenericJobFromImport[MailResult](jobImport, runner)
case "SFTP":
runner, err := NewSFTPRunnableFromMap(jobImport.Runnable.Data)
case "sftp":
runner, err := NewSFTPRunnableFromMap(runnableData)
if err != nil {
return nil, nil, err
}
job = CreateGenericJobFromImport[SFTPResult](jobImport, runner)
case "Shell":
runner, err := NewShellRunnableFromMap(jobImport.Runnable.Data)
case "shell":
runner, err := NewShellRunnableFromMap(runnableData)
if err != nil {
return nil, nil, err
}
......@@ -143,21 +160,37 @@ func CreateJobAndSchedulerFromImport(jobImport JobImport) (GenericJob, Scheduler
job = CreateGenericJobFromImport[ShellResult](jobImport, runner)
default:
return nil, nil, ErrUnknownRunnableType
return nil, nil,
fmt.Errorf("%w: %s, available types: dummy, counter, fileoperation, db, http, mail, sftp, shell",
ErrUnknownRunnableType, rType)
}
sType := strings.ToLower(jobImport.Scheduler.Type)
scheduleData := make(map[string]interface{})
for k, v := range jobImport.Runnable.Data {
scheduleData[strings.ToLower(k)] = v
}
var scheduler Scheduler
switch jobImport.Scheduler.Type {
case "Interval":
switch sType {
case "interval":
scheduler = &IntervalScheduler{Interval: jobImport.Scheduler.Interval}
case "Cron":
scheduler = &CronScheduler{Spec: jobImport.Scheduler.Spec}
case "cron":
scheduler = &CronScheduler{
Spec: jobImport.Scheduler.Spec,
}
if manager != nil {
scheduler.(*CronScheduler).cron = manager.GetCronInstance()
}
case "Delay":
case "delay":
scheduler = &DelayScheduler{Delay: jobImport.Scheduler.Delay}
case "Event":
case "event":
scheduler = &EventScheduler{Event: EventName(jobImport.Scheduler.Event)}
default:
......@@ -167,19 +200,54 @@ func CreateJobAndSchedulerFromImport(jobImport JobImport) (GenericJob, Scheduler
return job, scheduler, nil
}
func LoadJobsAndSchedule(filePath string, manager *Manager) error {
// LoadJobsAndScheduleFromFile read jobs from a file and schedule them. (json/yaml)
func LoadJobsAndScheduleFromFile(filePath string, manager *Manager) error {
var err error
var imp []JobImport
switch filePath[len(filePath)-4:] {
case "yaml":
if filePath[len(filePath)-4:] == "json" {
imp, err = ReadJsonFile(filePath)
break
case "json":
} else if filePath[len(filePath)-4:] == "yaml" {
imp, err = ReadYAMLFile(filePath)
break
} else {
return ErrUnknownFormat
}
if err != nil {
return err
}
for _, imp := range imp {
job, scheduler, err := CreateJobAndSchedulerFromImport(imp, manager)
if err != nil {
return err
}
err = manager.ScheduleJob(job, scheduler)
if err != nil {
return err
}
}
return nil
}
// ImportJobsAndSchedule lädt Jobs aus einem Reader und plant sie ein.
func ImportJobsAndSchedule(reader io.Reader, format string, manager *Manager) error {
var err error
var imp []JobImport
// format to lowercase
format = strings.ToLower(format)
if format == "json" {
imp, err = ReadJSON(reader)
} else if format == "yaml" {
imp, err = ReadYAML(reader)
} else {
return fmt.Errorf("%w: %s", ErrUnknownFormat, format)
}
if err != nil {
......@@ -187,7 +255,7 @@ func LoadJobsAndSchedule(filePath string, manager *Manager) error {
}
for _, imp := range imp {
job, scheduler, err := CreateJobAndSchedulerFromImport(imp)
job, scheduler, err := CreateJobAndSchedulerFromImport(imp, manager)
if err != nil {
return err
}
......
......@@ -59,7 +59,7 @@ func TestCreateJobAndSchedulerFromInput(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotJob, gotSchedule, err := CreateJobAndSchedulerFromImport(tt.input)
gotJob, gotSchedule, err := CreateJobAndSchedulerFromImport(tt.input, nil)
if (err != nil) != tt.wantErr {
t.Errorf("CreateJobAndSchedulerFromImport() error = %v, wantErr %v", err, tt.wantErr)
......
package jobqueue
import (
"github.com/robfig/cron/v3"
"github.com/stretchr/testify/assert"
"strings"
"testing"
"time"
)
func TestRoundTrip(t *testing.T) {
// type JobImport struct {
// ID string `yaml:"id" json:"id"`
// Priority int `yaml:"priority" json:"priority"`
// Timeout time.Duration `yaml:"timeout" json:"timeout"`
// MaxRetries uint `yaml:"maxRetries" json:"maxRetries"`
// RetryDelay time.Duration `yaml:"retryDelay" json:"retryDelay"`
// Dependencies []string `yaml:"dependencies" json:"dependencies,omitempty"`
// Runnable RunnableImport `yaml:"runnable" json:"runnable"`
// Scheduler SchedulerImport `yaml:"scheduler" json:"scheduler,omitempty"`
//}
//
//type RunnableImport struct {
// Type string `yaml:"type" json:"type"`
// Data map[string]any `yaml:"data,omitempty" json:"data,omitempty"`
//}
//
//type SchedulerImport struct {
// Type string `yaml:"type" json:"type"`
// Interval time.Duration `yaml:"interval,omitempty" json:"interval,omitempty"`
// Spec string `yaml:"spec,omitempty" json:"spec,omitempty"`
// Delay time.Duration `yaml:"delay,omitempty" json:"delay,omitempty"`
// Event string `yaml:"event,omitempty" json:"event,omitempty"`
//}
// define test data with jobs in yaml format
testData := []byte(`
- id: job1
priority: 1
timeout: 1s
maxRetries: 3
retryDelay: 1s
runnable:
type: shell
data:
script: echo "Hello World $(date)" >> /tmp/job1.log
scheduler:
type: cron
spec: "* * * * *"
`)
var err error
manager := NewManager()
manager.SetCronInstance(cron.New())
worker := NewLocalWorker(1)
err = manager.AddWorker(worker)
assert.Nil(t, err)
err = manager.Start()
assert.Nil(t, err)
reader := strings.NewReader(string(testData))
err = ImportJobsAndSchedule(reader, "yaml", manager)
assert.Nil(t, err)
time.Sleep(10 * time.Minute)
}
......@@ -38,7 +38,9 @@ type GenericJob interface {
GetTimeout() time.Duration
Archive() error
SerializeState() JobSerializedState
UnserializeState(serializedState JobSerializedState)
}
type Job[T any] struct {
......@@ -107,10 +109,6 @@ func (j *Job[T]) UnserializeState(serializedState JobSerializedState) {
j.logs = serializedState.Logs
}
func (j *Job[T]) Archive() error {
return nil
}
// Execute executes the job
func (j *Job[T]) Execute(ctx context.Context) (RunGenericResult, error) {
startTime := time.Now()
......
......@@ -2,6 +2,7 @@ package jobqueue
import (
"fmt"
"github.com/robfig/cron/v3"
"sync"
)
......@@ -24,6 +25,8 @@ type Manager struct {
stateManager StateManager
cronInstance *cron.Cron
mu sync.Mutex
}
......@@ -47,6 +50,18 @@ func (m *Manager) GetEventBus() *EventBus {
return m.eventBus
}
func (m *Manager) SetCronInstance(cronInstance *cron.Cron) {
m.mu.Lock()
defer m.mu.Unlock()
m.cronInstance = cronInstance
}
func (m *Manager) GetCronInstance() *cron.Cron {
m.mu.Lock()
defer m.mu.Unlock()
return m.cronInstance
}
func (m *Manager) checkAndSetRunningState() error {
m.state = ManagerStateStopped
......@@ -178,6 +193,10 @@ func (m *Manager) Start() error {
wrappedErr = fmt.Errorf("%w\n%s", wrappedErr, err.Error())
}
if m.cronInstance != nil {
m.cronInstance.Start()
}
return wrappedErr
}
......@@ -193,6 +212,7 @@ func (m *Manager) Stop() error {
m.eventBus.Unsubscribe(QueueJob, m.jobEventCh)
m.eventBus.Unsubscribe(JobReady, m.jobEventCh)
close(m.jobEventCh)
var wrappedErr error
......@@ -220,6 +240,10 @@ func (m *Manager) Stop() error {
}
}
if m.cronInstance != nil {
m.cronInstance.Stop()
}
return wrappedErr
}
......@@ -250,6 +274,18 @@ func (m *Manager) handleJobEvents() {
}
}
}
case JobFinished:
job := event.Data.(GenericJob)
// check if job should archived
// is it an single run job?
schd := m.scheduled[job.GetID()]
if schd == nil {
job.SerializeState()
}
}
}
}
......
......@@ -64,6 +64,14 @@ func (m *MockGenericJob) GetTimeout() time.Duration {
return 0
}
func (m *MockGenericJob) SerializeState() JobSerializedState {
return JobSerializedState{}
}
func (m *MockGenericJob) UnserializeState(serializedState JobSerializedState) {
return
}
func (m *MockGenericJob) GetID() JobID {
return m.ID
}
......
package jobqueue
import (
"fmt"
"sync"
)
func NewCounterRunnableFromMap(data map[string]any) (*CounterRunnable, error) {
count, ok := data["Count"].(int)
count, ok := data["count"].(int)
if !ok {
return nil, ErrInvalidData
return nil, fmt.Errorf("%w: Invalid count: %v", ErrInvalidData, data["count"])
}
return &CounterRunnable{Count: count}, nil
}
......
package jobqueue
import (
"fmt"
"os"
)
func NewFileOperationRunnableFromMap(data map[string]interface{}) (*FileOperationRunnable, error) {
operation, ok := data["Operation"].(string)
operation, ok := data["operation"].(string)
if !ok {
return nil, ErrInvalidData
return nil, fmt.Errorf("%w: Invalid Operation: %v", ErrInvalidData, data["operation"])
}
filePath, ok := data["FilePath"].(string)
filePath, ok := data["filepath"].(string)
if !ok {
return nil, ErrInvalidData
return nil, fmt.Errorf("%w: Invalid FilePath: %v", ErrInvalidData, data["filepath"])
}
content, _ := data["Content"].(string) // Optional, so no error check
content, ok := data["content"].(string) // Optional, so no error check
if !ok {
return nil, fmt.Errorf("%w: Invalid Content: %v", ErrInvalidData, data["content"])
}
return &FileOperationRunnable{
Operation: operation,
......
package jobqueue
import (
"fmt"
"gorm.io/driver/mysql"
"gorm.io/gorm"
)
func NewDBRunnableFromMap(data map[string]interface{}) (*DBRunnable, error) {
t, ok := data["Type"].(string)
t, ok := data["type"].(string)
if !ok {
return nil, ErrInvalidData
return nil, fmt.Errorf("%w: Invalid Type: %v", ErrInvalidData, data["type"])
}
dsn, ok := data["DSN"].(string)
dsn, ok := data["dsn"].(string)
if !ok {
return nil, ErrInvalidData
return nil, fmt.Errorf("%w: Invalid DSN: %v", ErrInvalidData, data["dsn"])
}
query, ok := data["Query"].(string)
query, ok := data["query"].(string)
if !ok {
return nil, ErrInvalidData
return nil, fmt.Errorf("%w: Invalid Query: %v", ErrInvalidData, data["query"])
}
return &DBRunnable{
......
......@@ -2,29 +2,30 @@ package jobqueue
import (
"bytes"
"fmt"
"io"
"net/http"
)
func NewHTTPRunnableFromMap(data map[string]interface{}) (*HTTPRunnable, error) {
url, ok := data["URL"].(string)
url, ok := data["url"].(string)
if !ok {
return nil, ErrInvalidData
return nil, fmt.Errorf("%w: Invalid URL: %v", ErrInvalidData, data["url"])
}
method, ok := data["Method"].(string)
method, ok := data["method"].(string)
if !ok {
return nil, ErrInvalidData
return nil, fmt.Errorf("%w: Invalid Method: %v", ErrInvalidData, data["method"])
}
header, ok := data["Header"].(map[string]string)
header, ok := data["header"].(map[string]string)
if !ok {
return nil, ErrInvalidData
return nil, fmt.Errorf("%w: Invalid Header: %v", ErrInvalidData, data["header"])
}
body, ok := data["Body"].(string)
body, ok := data["body"].(string)
if !ok {
return nil, ErrInvalidData
return nil, fmt.Errorf("%w: Invalid Body: %v", ErrInvalidData, data["body"])
}
return &HTTPRunnable{
......
package jobqueue
import (
"fmt"
"net/smtp"
)
func NewMailRunnableFromMap(data map[string]interface{}) (*MailRunnable, error) {
to, ok := data["To"].(string)
to, ok := data["to"].(string)
if !ok {
return nil, ErrInvalidData
return nil, fmt.Errorf("%w: Invalid To: %v", ErrInvalidData, data["to"])
}
from, ok := data["From"].(string)
from, ok := data["from"].(string)
if !ok {
return nil, ErrInvalidData
return nil, fmt.Errorf("%w: Invalid From: %v", ErrInvalidData, data["from"])
}
subject, ok := data["Subject"].(string)
subject, ok := data["subject"].(string)
if !ok {
return nil, ErrInvalidData
return nil, fmt.Errorf("%w: Invalid Subject: %v", ErrInvalidData, data["subject"])
}
body, ok := data["Body"].(string)
body, ok := data["l"].(string)
if !ok {
return nil, ErrInvalidData
return nil, fmt.Errorf("%w: Invalid Body: %v", ErrInvalidData, data["body"])
}
server, ok := data["Server"].(string)
server, ok := data["server"].(string)
if !ok {
return nil, ErrInvalidData
return nil, fmt.Errorf("%w: Invalid Server: %v", ErrInvalidData, data["server"])
}
port, ok := data["Port"].(string)
port, ok := data["port"].(string)
if !ok {
return nil, ErrInvalidData
return nil, fmt.Errorf("%w: Invalid Port: %v", ErrInvalidData, data["port"])
}
username, ok := data["Username"].(string)
username, ok := data["username"].(string)
if !ok {
return nil, ErrInvalidData
return nil, fmt.Errorf("%w: Invalid Username: %v", ErrInvalidData, data["username"])
}
password, ok := data["Password"].(string)
password, ok := data["password"].(string)
if !ok {
return nil, ErrInvalidData
return nil, fmt.Errorf("%w: Invalid Password: %v", ErrInvalidData, data["password"])
}
headers, ok := data["Headers"].(map[string]string)
headers, ok := data["headers"].(map[string]string)
if !ok {
return nil, ErrInvalidData
return nil, fmt.Errorf("%w: Invalid Headers: %v", ErrInvalidData, data["headers"])
}
return &MailRunnable{
......
......@@ -9,23 +9,66 @@ import (
)
func NewSFTPRunnableFromMap(data map[string]interface{}) (*SFTPRunnable, error) {
// Your map to struct conversion logic here
// e.g.,
host, ok := data["host"].(string)
if !ok {
return nil, fmt.Errorf("%w: Invalid Host: %v", ErrInvalidData, data["host"])
}
port, ok := data["port"].(int)
if !ok {
return nil, fmt.Errorf("%w: Invalid Port: %v", ErrInvalidData, data["port"])
}
user, ok := data["user"].(string)
if !ok {
return nil, fmt.Errorf("%w: Invalid User: %v", ErrInvalidData, data["user"])
}
insecure, ok := data["insecure"].(bool)
if !ok {
return nil, fmt.Errorf("%w: Invalid Insecure: %v", ErrInvalidData, data["insecure"])
}
credential, ok := data["credential"].(string)
if !ok {
return nil, fmt.Errorf("%w: Invalid Credential: %v", ErrInvalidData, data["credential"])
}
credentialType, ok := data["credentialtype"].(string)
if !ok {
return nil, fmt.Errorf("%w: Invalid CredentialType: %v", ErrInvalidData, data["credentialtype"])
}
hostKey, ok := data["hostkey"].(string)
if !ok {
return nil, fmt.Errorf("%w: Invalid HostKey: %v", ErrInvalidData, data["hostkey"])
}
srcDir, ok := data["srcdir"].(string)
if !ok {
return nil, fmt.Errorf("%w: Invalid SrcDir: %v", ErrInvalidData, data["srcdir"])
}
host, ok := data["Host"].(string)
dstDir, ok := data["dstdir"].(string)
if !ok {
return nil, fmt.Errorf("invalid Host")
return nil, fmt.Errorf("%w: Invalid DstDir: %v", ErrInvalidData, data["dstdir"])
}
//... (other fields)
transferDirection, ok := data["TransferDirection"].(string)
transferDirection, ok := data["transferdirection"].(string)
if !ok {
return nil, fmt.Errorf("invalid TransferDirection")
return nil, fmt.Errorf("%w: Invalid TransferDirection: %v", ErrInvalidData, data["TransferDirection"])
}
return &SFTPRunnable{
Host: host,
//... (other fields)
Port: port,
User: user,
Insecure: insecure,
Credential: credential,
CredentialType: credentialType,
HostKey: hostKey,
SrcDir: srcDir,
DstDir: dstDir,
TransferDirection: Direction(transferDirection),
}, nil
}
......
package jobqueue
import (
"fmt"
"os"
"os/exec"
"strings"
)
func NewShellRunnableFromMap(data map[string]interface{}) (*ShellRunnable, error) {
scriptPath, ok := data["ScriptPath"].(string)
if !ok {
return nil, ErrInvalidData
scriptPath, _ := data["scriptpath"].(string)
script, _ := data["script"].(string)
if scriptPath != "" && script != "" {
return nil, fmt.Errorf("%w: ScriptPath and Script are mutually exclusive", ErrInvalidData)
}
if scriptPath == "" && script == "" {
return nil, fmt.Errorf("%w: ScriptPath or Script is required", ErrInvalidData)
}
return &ShellRunnable{
ScriptPath: scriptPath,
Script: script,
}, nil
}
......@@ -25,10 +35,46 @@ type ShellResult struct {
type ShellRunnable struct {
ScriptPath string
Script string
}
func (s *ShellRunnable) Run() (RunResult[ShellResult], error) {
cmd := exec.Command("sh", s.ScriptPath)
scriptPath := s.ScriptPath
if s.Script != "" {
// write to temp
tmp, err := os.CreateTemp("", "script-*.sh")
if err != nil {
return RunResult[ShellResult]{
Status: ResultStatusFailed,
Data: ShellResult{
Output: "",
ExitCode: DefaultErrorExitCode,
Error: fmt.Errorf("%w: %v", ErrFailedToCreateTempFile, err).Error(),
},
}, err
}
scriptPath = tmp.Name()
defer os.Remove(scriptPath)
_, err = tmp.WriteString(s.Script)
defer tmp.Close()
if err != nil {
return RunResult[ShellResult]{
Status: ResultStatusFailed,
Data: ShellResult{
Output: "",
ExitCode: DefaultErrorExitCode,
Error: fmt.Errorf("%w: %v", ErrFailedToWriteTempFile, err).Error(),
},
}, err
}
}
cmd := exec.Command("sh", scriptPath)
output, err := cmd.Output()
var stderr []byte
......@@ -36,7 +82,7 @@ func (s *ShellRunnable) Run() (RunResult[ShellResult], error) {
stderr = err.(*exec.ExitError).Stderr
}
exitCode := 0
exitCode := SuccessExitCode
if err != nil {
if exitError, ok := err.(*exec.ExitError); ok {
......
......@@ -35,7 +35,7 @@ func (s *IntervalScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("job %s already scheduled", id)
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
}
stopChan := make(StopChan)
......@@ -116,7 +116,7 @@ func (s *CronScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("job %s already scheduled", id)
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
}
entryId, err := s.cron.AddFunc(s.Spec, func() {
......@@ -129,7 +129,6 @@ func (s *CronScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
return err
}
s.cron.Start()
return nil
}
......@@ -187,7 +186,7 @@ func (s *DelayScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("job %s already scheduled", id)
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
}
stopChan := make(StopChan)
......@@ -259,7 +258,7 @@ func (s *EventScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("job %s already scheduled", id)
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
}
stopChan := make(StopChan)
......
......@@ -5,7 +5,7 @@ import (
"os"
)
// State repräsentiert den Zustand, den wir speichern wollen
// State represent the state of the job queue
type State struct {
// Jobs enthält alle Jobs, die wir speichern wollen
Jobs []JobSerializedState `json:"jobs"`
......@@ -22,7 +22,7 @@ type FileStateManager struct {
state *State
}
// LoadState lädt den Zustand aus der Datei
// LoadState load the state from the file
func (f *FileStateManager) LoadState() error {
file, err := os.Open(f.filePath)
if err != nil {
......@@ -38,7 +38,7 @@ func (f *FileStateManager) LoadState() error {
return nil
}
// SaveState speichert den Zustand in der Datei
// SaveState save the state to the file
func (f *FileStateManager) SaveState() error {
file, err := os.Create(f.filePath)
if err != nil {
......
......@@ -44,6 +44,14 @@ func (j DummyJob) GetPriority() Priority {
return PriorityDefault
}
func (j DummyJob) SerializeState() JobSerializedState {
return JobSerializedState{}
}
func (j DummyJob) UnserializeState(serializedState JobSerializedState) {
return
}
func TestAssignJob(t *testing.T) {
worker := NewLocalWorker(1)
err := worker.Start()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment