Skip to content
Snippets Groups Projects
Verified Commit 55adbfcd authored by Volker Schukai's avatar Volker Schukai :alien:
Browse files

feat: implement jobqueue

parent 5f8be4ff
Branches
Tags v1.0.0
No related merge requests found
package jobqueue
// Exit codes reported by a Runnable after a job run.
const (
	// SuccessExitCode (0) indicates the runnable completed successfully.
	SuccessExitCode = iota
	// DefaultErrorExitCode (1) is the generic failure exit code.
	DefaultErrorExitCode
	// TimeoutExitCode (2) indicates the run was aborted by a timeout.
	TimeoutExitCode
	// RunnableTerminatedExitCode (3) indicates the runnable was terminated externally.
	RunnableTerminatedExitCode
)
go.mod 0 → 100644
module gitlab.schukai.com/oss/libraries/go/services/job-queues.git
go 1.20
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
github.com/shirou/gopsutil/v3 v3.23.9 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/stretchr/testify v1.8.4 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
golang.org/x/sys v0.12.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
go.sum 0 → 100644
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E=
github.com/robfig/cron/v3 v3.0.0/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil/v3 v3.23.9 h1:ZI5bWVeu2ep4/DIxB4U9okeYJ7zp/QLTO4auRb/ty/E=
github.com/shirou/gopsutil/v3 v3.23.9/go.mod h1:x/NWSb71eMcjFIO0vhyGW5nZ7oSIgVjrCnADckb85GA=
github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM=
github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ=
github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU=
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw=
github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gob.go 0 → 100644
package jobqueue
import (
"bytes"
"encoding/gob"
"sync"
)
// SerializeJob encodes the job's JobData followed by its JobSpecification
// into a single gob byte stream, suitable for DeserializeJob.
func (j *job) SerializeJob() ([]byte, error) {
	var out bytes.Buffer
	encoder := gob.NewEncoder(&out)
	// Encode both halves in a fixed order; the decoder relies on it.
	for _, part := range []any{j.JobData, j.JobSpecification} {
		if err := encoder.Encode(part); err != nil {
			return nil, err
		}
	}
	return out.Bytes(), nil
}
// DeserializeJob decodes a gob byte stream produced by SerializeJob back
// into a job: JobData first, then JobSpecification. The returned job has a
// fresh (zero-valued) mutex and no runnable attached.
func DeserializeJob(data []byte) (*job, error) {
	dec := gob.NewDecoder(bytes.NewReader(data))
	var restored job
	if err := dec.Decode(&restored.JobData); err != nil {
		return nil, err
	}
	if err := dec.Decode(&restored.JobSpecification); err != nil {
		return nil, err
	}
	return &restored, nil
}
package jobqueue
import (
"github.com/stretchr/testify/assert"
"testing"
)
// TestSerializationAndDeserialization round-trips a job through
// SerializeJob / DeserializeJob and verifies the spec fields survive.
func TestSerializationAndDeserialization(t *testing.T) {
	src := &job{
		JobSpecification: JobSpecification{Id: "testJob", Priority: 1},
	}
	raw, err := src.SerializeJob()
	assert.Nil(t, err)
	restored, err := DeserializeJob(raw)
	assert.Nil(t, err)
	assert.Equal(t, src.Id, restored.Id)
	assert.Equal(t, src.Priority, restored.Priority)
}
package jobqueue
import (
"errors"
"testing"
)
// TestRunJobQueueWithDependencies builds the dependency graph below using
// newJob directly (fields mutated after construction):
//
// graph TD
//
//	A[Job1-PriorityDefault]
//	B[Job2-PriorityHigh]
//	C[Job3-PriorityLow]
//	D[Job4-PriorityCritical]
//
//	A --> B
//	A --> C
//	C --> D
func TestRunJobQueueWithDependencies(t *testing.T) {
	if NewJobs() == nil {
		t.Errorf("NewJobs returned nil")
	}
	// Four jobs with distinct priorities; job1 keeps PriorityDefault.
	first := newJob(JobSpecification{Id: "1"})
	second := newJob(JobSpecification{Id: "2"})
	third := newJob(JobSpecification{Id: "3"})
	fourth := newJob(JobSpecification{Id: "4"})
	second.JobSpecification.Priority = PriorityHigh
	third.JobSpecification.Priority = PriorityLow
	fourth.JobSpecification.Priority = PriorityCritical
	// Wire up the dependency edges shown in the graph above.
	second.JobSpecification.Dependencies = []JobIDType{"1"}
	third.JobSpecification.Dependencies = []JobIDType{"1"}
	fourth.JobSpecification.Dependencies = []JobIDType{"3"}
	_ = first
}
// TestIssue1NewJobQueue verifies basic AddJob behavior: distinct ids are
// accepted, a duplicate id yields ErrJobAlreadyExists, and all four jobs
// end up in the queue.
func TestIssue1NewJobQueue(t *testing.T) {
	queue := NewJobs()
	if queue == nil {
		t.Errorf("NewJobs returned nil")
	}
	specs := []JobSpecification{
		{Id: "1"}, // default priority is PriorityDefault
		{Id: "2", Priority: PriorityHigh},
		{Id: "3", Priority: PriorityLow},
		{Id: "4", Priority: PriorityCritical},
	}
	// The very first insert must succeed.
	if err := queue.AddJob(specs[0], nil); err != nil {
		t.Errorf("Failed to add job: %v", err)
	}
	// Re-inserting the same id must be rejected with the sentinel error.
	if err := queue.AddJob(specs[0], nil); err == nil || !errors.Is(err, ErrJobAlreadyExists) {
		t.Errorf("Expected ErrJobAlreadyExists, got %v", err)
	}
	// The remaining distinct jobs all go in cleanly.
	for _, spec := range specs[1:] {
		if err := queue.AddJob(spec, nil); err != nil {
			t.Errorf("Failed to add job: %v", err)
		}
	}
	if len(queue.jobs) != 4 {
		t.Errorf("Failed to add all jobs to jobs")
	}
}
// TestJobQueueWithDependencies adds four jobs whose dependency edges form
// the graph below and checks all of them are accepted.
//
// graph TD
//
//	A[Job1-PriorityDefault]
//	B[Job2-PriorityHigh]
//	C[Job3-PriorityLow]
//	D[Job4-PriorityCritical]
//
//	A --> B
//	A --> C
//	C --> D
func TestJobQueueWithDependencies(t *testing.T) {
	queue := NewJobs()
	if queue == nil {
		t.Errorf("NewJobs returned nil")
	}
	// job1 depends on nothing; job2 and job3 depend on job1; job4 on job3.
	for _, spec := range []JobSpecification{
		{Id: "1"},
		{Id: "2", Priority: PriorityHigh, Dependencies: []JobIDType{"1"}},
		{Id: "3", Priority: PriorityLow, Dependencies: []JobIDType{"1"}},
		{Id: "4", Priority: PriorityCritical, Dependencies: []JobIDType{"3"}},
	} {
		if err := queue.AddJob(spec, nil); err != nil {
			t.Errorf("Failed to add job: %v", err)
		}
	}
	if len(queue.jobs) != 4 {
		t.Errorf("Failed to add all jobs to jobs")
	}
}
package jobqueue
import (
"time"
)
// JobLog captures the outcome of a single job run: timing, exit code,
// result payload, resource/IO snapshots, and success/error information.
type JobLog struct {
	ProcessID int       `json:"process_id"` // PID of the process the job ran in
	StartTime time.Time `json:"start_time"` // when the run began
	EndTime   time.Time `json:"end_time"`   // when the run finished
	ExitCode  int       `json:"exit_code"`  // exit code reported by the runnable
	Result    any       `json:"output"`     // arbitrary result value returned by the runnable
	// ResourceUsage snapshots memory and CPU consumption of the run.
	ResourceUsage struct {
		Memory uint64 `json:"memory"`
		CPU    struct {
			Percentage float64 `json:"percentage"`
		} `json:"cpu"`
	} `json:"resource_usage"`
	// IO tallies disk and network traffic of the run.
	IO struct {
		Disk    int64 `json:"disk"`
		Network int64 `json:"network"`
	} `json:"io"`
	ErrorMsg     string            `json:"error_msg"`     // error text; empty on success
	IsSuccessful bool              `json:"is_successful"` // true when exit code is 0 and no error occurred
	Metadata     map[string]string `json:"metadata"`      // free-form metadata attached to the entry
}
package jobqueue
import (
"context"
"fmt"
"os"
"time"
)
// handleFailover invokes the job's optional failover callback and reports
// its error; with no callback configured it is a no-op.
func (j *job) handleFailover() error {
	if j.failover == nil {
		return nil
	}
	return j.failover()
}
// Run executes the job once (with up to Retries attempts) under the job's
// mutex, then updates status, statistics and scheduling metadata.
//
// Fix: the retry delay is now applied only *between* attempts. The previous
// implementation also slept after the final failed attempt, wasting
// RetryDelay for no benefit.
func (j *job) Run(ctx context.Context) {
	var err error
	// Safety net: never leave the job stuck in JobRunning (this deferred
	// check runs after the mutex is released, as in the original ordering).
	defer func() {
		if j.Status == JobRunning {
			j.Status = JobPending
		}
	}()
	j.mu.Lock()
	defer j.mu.Unlock()
	// Refuse to run while the process exceeds the configured resource limits.
	if j.exceedsResourceLimits() {
		j.updateStats(0, ErrResourceLimitExceeded, 0)
		return
	}
	// Execute fail-over logic if specified.
	if err = j.handleFailover(); err != nil {
		j.updateStats(0, err, 0)
		return
	}
	// Bound the run with a timeout if one is configured.
	if j.Timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, j.Timeout)
		defer cancel()
	}
	// Retries == 0 still means a single attempt.
	maxRetries := j.Retries
	if maxRetries == 0 {
		maxRetries = 1
	}
	for i := 0; i < maxRetries; i++ {
		if err = j.singleRun(ctx); err == nil {
			break
		}
		// Sleep only if another attempt follows.
		if j.RetryDelay > 0 && i < maxRetries-1 {
			time.Sleep(j.RetryDelay)
		}
	}
	j.LastRun = time.Now()
	// Compute the next scheduled run, if the job has a cron schedule.
	if j.scheduleImpl != nil {
		j.NextRun = j.scheduleImpl.Next(j.LastRun)
	}
	if err != nil {
		j.Status = JobFailed
		return
	}
	if j.MaxRuns > 0 && j.Stats.RunCount >= j.MaxRuns {
		j.Status = JobFinished
		return
	}
	j.Status = JobPending
}
// singleRun performs one attempt of the job's runnable, records a log entry
// and folds the outcome into the statistics. The named return lets the
// deferred recover convert a panic in the runnable into an error.
//
// Fix: the redundant second assignment of ExitCode inside the failure
// branch is removed; IsSuccessful is computed directly from exit code and
// error instead of being set and then conditionally cleared.
func (j *job) singleRun(ctx context.Context) (err error) {
	startTime := time.Now()
	// NOTE(review): a panicking run produces no log entry and no stats
	// update, because the recover fires after the normal path is skipped —
	// confirm whether panics should also be logged.
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("job runnable resulted in panic: %v", r)
			j.Status = JobFailed
		}
	}()
	if j.runnable == nil {
		return ErrNoRunDefined
	}
	j.Status = JobRunning
	exitCode, result, err := j.runnable.Run(ctx)
	// Build the log entry for this attempt.
	logEntry := JobLog{
		ProcessID:    os.Getpid(),
		StartTime:    startTime,
		EndTime:      time.Now(),
		ExitCode:     exitCode,
		Result:       result,
		IsSuccessful: exitCode == 0 && err == nil,
	}
	if err != nil {
		logEntry.ErrorMsg = err.Error()
	}
	j.addLogEntry(logEntry)
	j.updateStats(exitCode, err, time.Since(startTime))
	return
}
// addLogEntry appends a log entry, fans it out asynchronously to all
// telemetry hooks, and rotates the in-memory log once it exceeds
// MaxLogEntries (0 disables rotation). Rotated-out entries are flushed to
// the configured Logger on a best-effort basis.
func (j *job) addLogEntry(logEntry JobLog) {
	j.Logs = append(j.Logs, logEntry)
	// Each hook receives a pointer to this function's parameter copy.
	for _, hook := range j.telemetryHooks {
		go hook(&logEntry)
	}
	// MaxLogEntries == 0 means "keep everything in memory".
	if j.MaxLogEntries == 0 {
		return
	}
	overflow := len(j.Logs) - j.MaxLogEntries
	if overflow <= 0 {
		return
	}
	// Send the entries about to be dropped to the logger; write errors are
	// deliberately ignored (best effort).
	if j.Logger != nil {
		for _, old := range j.Logs[:overflow] {
			if _, err := (*j.Logger).Write(old); err != nil {
				continue
			}
		}
	}
	j.Logs = j.Logs[overflow:]
}
// exceedsResourceLimits reports whether the current process CPU or memory
// usage is above this job's configured limits. A zero limit disables the
// corresponding check.
func (j *job) exceedsResourceLimits() bool {
	cpu := GetCpuUsage()
	mem := GetMemoryUsage()
	switch {
	case j.ResourceLimits.CPULimit != 0 && cpu > j.ResourceLimits.CPULimit:
		return true
	case j.ResourceLimits.MemoryLimit != 0 && mem > j.ResourceLimits.MemoryLimit:
		return true
	default:
		return false
	}
}
// updateStats folds one run's outcome into the job statistics: success and
// error counters, the last error code, and the min/max/avg/total run-time
// metrics.
func (j *job) updateStats(exitCode int, err error, duration time.Duration) {
	j.Stats.RunCount++
	if err != nil {
		j.Stats.ErrorCount++
		j.Stats.LastErrorCode = exitCode
	} else {
		j.Stats.SuccessCount++
	}
	// Update the time metrics; the first run seeds min and max.
	metrics := &j.Stats.TimeMetrics
	metrics.TotalRunTime += duration
	if j.Stats.RunCount == 1 {
		metrics.MinRunTime = duration
		metrics.MaxRunTime = duration
	} else {
		if duration < metrics.MinRunTime {
			metrics.MinRunTime = duration
		}
		if duration > metrics.MaxRunTime {
			metrics.MaxRunTime = duration
		}
	}
	metrics.AvgRunTime = metrics.TotalRunTime / time.Duration(j.Stats.RunCount)
}
package jobqueue
import (
"time"
)
// JobStats aggregates metrics over all runs of a job: counters, run-time
// statistics, and resource-usage summaries.
type JobStats struct {
	RunCount     int `json:"run_count"`     // total number of runs
	SuccessCount int `json:"success_count"` // runs that finished without error
	ErrorCount   int `json:"error_count"`   // runs that returned an error
	// TimeMetrics tracks wall-clock durations across runs (see updateStats).
	TimeMetrics struct {
		AvgRunTime   time.Duration `json:"avg"`
		MaxRunTime   time.Duration `json:"max"`
		MinRunTime   time.Duration `json:"min"`
		TotalRunTime time.Duration `json:"total"`
	} `json:"time_metrics"`
	LastErrorCode int `json:"last_error_code"` // exit code of the most recent failed run
	// NOTE(review): LastSuccessCode and PriorityEscalates are declared but
	// never written anywhere in this commit — confirm they are still needed.
	LastSuccessCode   int `json:"last_success_code"`
	PriorityEscalates int `json:"priority_escalates"`
	// ResourceUsage summarizes CPU, memory and IO consumption statistics.
	ResourceUsage struct {
		CPU struct {
			Avg    float64 `json:"avg"`
			StdDev float64 `json:"std_dev"`
		} `json:"cpu"`
		Memory struct {
			Avg    int `json:"avg"`
			StdDev int `json:"std_dev"`
		} `json:"memory"`
		IO struct {
			Disk struct {
				Avg    int64 `json:"avg"`
				StdDev int64 `json:"std_dev"`
			} `json:"disk"`
			Network struct {
				Avg    int64 `json:"avg"`
				StdDev int64 `json:"std_dev"`
			} `json:"network"`
		} `json:"io"`
	} `json:"resource_usage"`
}
package jobqueue
// JobStatus is the status of a job
type JobStatus int

// Lifecycle states of a job.
const (
	JobPending JobStatus = iota
	JobScheduled
	JobRunning
	JobFailed
	JobFinished
)

// String returns the string representation of a JobStatus.
//
// Fix: the previous array-index implementation panicked with an
// out-of-range index for any value outside the declared constants; unknown
// values now yield "Unknown" instead.
func (js JobStatus) String() string {
	switch js {
	case JobPending:
		return "Pending"
	case JobScheduled:
		return "Scheduled"
	case JobRunning:
		return "Running"
	case JobFailed:
		return "Failed"
	case JobFinished:
		return "Finished"
	default:
		return "Unknown"
	}
}
job.go 0 → 100644
package jobqueue
import (
"context"
"github.com/robfig/cron/v3"
"sync"
"time"
)
// JobIDType is the unique identifier of a job within the queue.
type JobIDType string

// String returns the id as a plain string.
func (j JobIDType) String() string {
	return string(j)
}
// newJob constructs a job from its specification. A non-empty Schedule is
// compiled as a standard cron expression; a zero Priority falls back to
// PriorityDefault.
// NOTE(review): an unparsable cron expression panics here, which makes
// jobs.AddJob panic instead of returning an error — confirm this is the
// intended contract or switch to an error return.
func newJob(spec JobSpecification) *job {
	j := &job{
		JobSpecification: spec,
		mu:               sync.Mutex{},
	}
	if spec.Schedule != "" {
		schedule, err := cron.ParseStandard(spec.Schedule)
		if err != nil {
			panic(err)
		}
		j.scheduleImpl = schedule
	}
	// An unset (zero) priority is promoted to the default priority.
	if spec.Priority == 0 {
		j.Priority = PriorityDefault
	}
	return j
}
// ResourceLimits caps process resource usage under which a job may run.
// A zero value disables the respective check (see exceedsResourceLimits).
type ResourceLimits struct {
	CPULimit    float64 `json:"cpu_limit,omitempty"`    // threshold compared against GetCpuUsage()
	MemoryLimit uint64  `json:"memory_limit,omitempty"` // threshold compared against GetMemoryUsage()
}
// JobSpecification is the caller-supplied, serializable configuration of a
// job: identity, scheduling, retry policy, limits and dependencies.
type JobSpecification struct {
	Id             JobIDType              `json:"id,omitempty"`       // unique job id; duplicates are rejected by AddJob
	Priority       int                    `json:"priority,omitempty"` // scheduling priority; 0 becomes PriorityDefault in newJob
	MaxRuns        int                    `json:"max_runs,omitempty"` // finish the job after this many runs; 0 = unlimited
	Concurrency    int                    `json:"concurrency,omitempty"`
	Schedule       string                 `json:"schedule,omitempty"`    // standard cron expression, parsed in newJob
	Timeout        time.Duration          `json:"timeout,omitempty"`     // per-run deadline applied in Run; 0 = none
	Retries        int                    `json:"retries,omitempty"`     // attempts per Run; 0 is treated as 1
	RetryDelay     time.Duration          `json:"retry_delay,omitempty"` // pause between failed attempts
	ResourceLimits ResourceLimits         `json:"resource_limits"`       // CPU/memory gates checked before each Run
	Dependencies   []JobIDType            `json:"dependencies,omitempty"` // ids this job depends on; must exist at AddJob time
	Tags           []string               `json:"tags,omitempty"`
	Metadata       map[string]interface{} `json:"metadata,omitempty"`
}
// JobData is the mutable, serializable runtime state of a job.
type JobData struct {
	Status  JobStatus `json:"status,omitempty"` // current lifecycle state
	LastRun time.Time `json:"last_run"`         // wall-clock time of the most recent run
	NextRun time.Time `json:"next_run"`         // next scheduled run; zero when unscheduled
	Stats   JobStats  `json:"stats"`            // aggregated run statistics
	Logs    []JobLog  `json:"logs,omitempty"`   // per-run log entries, rotated by MaxLogEntries
	// Logger receives rotated-out log entries (see addLogEntry); may be nil.
	Logger *Logger
	// MaxLogEntries bounds the in-memory Logs slice; 0 keeps everything.
	MaxLogEntries int
}
// job contains both serializable data and functional fields
type job struct {
	JobData          `json:"data,omitempty"` // mutable runtime state (status, runs, logs, stats)
	JobSpecification `json:"spec,omitempty"` // caller-supplied configuration
	runnable         Runnable                // the actual work to execute; may be nil
	scheduleImpl     cron.Schedule           // parsed cron schedule; nil when Schedule is empty
	telemetryHooks   []func(*JobLog)         // async observers invoked for every new log entry
	failover         func() error            // optional hook executed at the start of Run
	mu               sync.Mutex              // guards the fields above; job must not be copied
}
// ReadOnlyJob is the accessor view of a job handed out by the queue.
// All getters take the job's internal lock; Run executes the job.
type ReadOnlyJob interface {
	GetId() JobIDType
	GetPriority() int
	//GetExclusive() bool
	GetMaxRuns() int
	GetConcurrency() int
	GetLastRun() time.Time
	GetNextRun() time.Time
	GetRunnable() Runnable
	GetSchedule() cron.Schedule
	GetDependencies() []JobIDType
	GetDependents() []JobIDType
	GetRuns() int
	GetLogs() []JobLog
	GetStats() JobStats
	Run(context.Context)
}
// GetStats returns a copy of the job's statistics.
func (j *job) GetStats() JobStats {
	j.mu.Lock()
	defer j.mu.Unlock()
	return j.Stats
}

// GetLogs returns the job's log entries.
// NOTE(review): this hands out the internal slice; callers share its
// backing array with the job.
func (j *job) GetLogs() []JobLog {
	j.mu.Lock()
	defer j.mu.Unlock()
	return j.Logs
}

// GetId returns the job's identifier.
func (j *job) GetId() JobIDType {
	j.mu.Lock()
	defer j.mu.Unlock()
	return j.Id
}

// GetPriority returns the job's scheduling priority.
func (j *job) GetPriority() int {
	j.mu.Lock()
	defer j.mu.Unlock()
	return j.Priority
}

// GetMaxRuns returns the configured maximum number of runs (0 = unlimited).
func (j *job) GetMaxRuns() int {
	j.mu.Lock()
	defer j.mu.Unlock()
	return j.MaxRuns
}

// GetConcurrency returns the configured concurrency value.
func (j *job) GetConcurrency() int {
	j.mu.Lock()
	defer j.mu.Unlock()
	return j.Concurrency
}

// GetLastRun returns the time of the most recent run (zero if never run).
func (j *job) GetLastRun() time.Time {
	j.mu.Lock()
	defer j.mu.Unlock()
	return j.LastRun
}

// GetNextRun returns the next scheduled run time (zero if unscheduled).
func (j *job) GetNextRun() time.Time {
	j.mu.Lock()
	defer j.mu.Unlock()
	return j.NextRun
}

// GetRunnable returns the work unit attached to this job; may be nil.
func (j *job) GetRunnable() Runnable {
	j.mu.Lock()
	defer j.mu.Unlock()
	return j.runnable
}

// GetSchedule returns the parsed cron schedule; nil when none is set.
func (j *job) GetSchedule() cron.Schedule {
	j.mu.Lock()
	defer j.mu.Unlock()
	return j.scheduleImpl
}

// GetDependencies returns the ids this job depends on.
// NOTE(review): this hands out the internal slice; callers can mutate it.
func (j *job) GetDependencies() []JobIDType {
	j.mu.Lock()
	defer j.mu.Unlock()
	return j.Dependencies
}
// GetDependents is meant to return the jobs that depend on this job.
// NOTE(review): the body returns j.Dependencies — the jobs this job depends
// ON, not its dependents — and the job struct tracks no dependents list to
// return instead. This looks like a bug; confirm the intended semantics
// before callers rely on it.
func (j *job) GetDependents() []JobIDType {
	j.mu.Lock()
	defer j.mu.Unlock()
	return j.Dependencies
}
// GetRuns returns the total number of completed runs (Stats.RunCount).
func (j *job) GetRuns() int {
	j.mu.Lock()
	defer j.mu.Unlock()
	return j.Stats.RunCount
}
package jobqueue
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/stretchr/testify/assert"
"testing"
"time"
)
// mockRunnable is a test double for Runnable. It fails the next shouldFail
// invocations and can simulate a long-running job via sleep.
type mockRunnable struct {
	shouldFail int           // number of upcoming runs that should return an error
	sleep      time.Duration // artificial work duration before completing
}

// Run simulates a job in a background goroutine and races it against ctx.
// It returns TimeoutExitCode on deadline expiry, DefaultErrorExitCode on a
// simulated failure or other context error, and SuccessExitCode otherwise.
func (r *mockRunnable) Run(ctx context.Context) (int, any, error) {
	// Create a new context with a timeout
	done := make(chan struct{})
	var err error
	go func() {
		if r.sleep > 0 {
			// Simulate long-running job
			time.Sleep(r.sleep)
		}
		if r.shouldFail > 0 {
			r.shouldFail--
			err = errors.New("failed")
		}
		time.Sleep(1 * time.Millisecond)
		// close(done) publishes the err write to the select below
		// (happens-before via channel close).
		close(done)
	}()
	// Wait until either the job is done or the timeout expires
	select {
	case <-done:
		if err != nil {
			return DefaultErrorExitCode, nil, err
		}
		return SuccessExitCode, nil, nil
	case <-ctx.Done():
		if ctx.Err() == context.DeadlineExceeded {
			// It was a timeout
			return TimeoutExitCode, nil, fmt.Errorf("timeout")
		}
		return DefaultErrorExitCode, nil, ctx.Err()
	}
}
// TestJobResourceLimitExceeded configures limits so low (0.1 CPU, 1 byte of
// memory) that Run should refuse to execute the runnable.
// NOTE(review): assert.NotNil on a time.Time value is always true (it is a
// non-pointer), so this assertion cannot fail — and Run returns before
// setting LastRun when limits are exceeded. Consider asserting on
// Stats.ErrorCount instead.
func TestJobResourceLimitExceeded(t *testing.T) {
	_ = StartResourceMonitoring(1 * time.Second)
	defer resetResourceStatsForTesting()
	j := &job{
		//ctx: context.Background(),
		runnable: &mockRunnable{shouldFail: 0},
		JobSpecification: JobSpecification{
			ResourceLimits: ResourceLimits{
				CPULimit:    0.1,
				MemoryLimit: 1,
			},
		},
	}
	j.Run(context.Background())
	assert.NotNil(t, j.GetLastRun())
}
// TestJobSuccessful runs a never-failing runnable once.
// (assert.NotNil on a time.Time value is always true — kept for parity.)
func TestJobSuccessful(t *testing.T) {
	testee := &job{runnable: &mockRunnable{}}
	testee.Run(context.Background())
	assert.NotNil(t, testee.GetLastRun())
}
// TestJobFailed runs a runnable whose single attempt fails.
func TestJobFailed(t *testing.T) {
	testee := &job{runnable: &mockRunnable{shouldFail: 1}}
	testee.Run(context.Background())
	assert.NotNil(t, testee.GetLastRun())
}
// TestJobRetry runs a job configured with Retries: 1 against a runnable
// that fails once, and checks exactly one error lands in the stats.
func TestJobRetry(t *testing.T) {
	testee := &job{
		runnable:         &mockRunnable{shouldFail: 1},
		JobSpecification: JobSpecification{Retries: 1},
	}
	testee.Run(context.Background())
	assert.NotNil(t, testee.GetLastRun())
	assert.Equal(t, 1, testee.Stats.ErrorCount)
}
// TestJobTimeout lets the runnable sleep longer than the job's configured
// timeout, so the run is cancelled via the context deadline.
func TestJobTimeout(t *testing.T) {
	testee := &job{
		runnable:         &mockRunnable{sleep: 4 * time.Millisecond},
		JobSpecification: JobSpecification{Timeout: 1 * time.Millisecond},
	}
	testee.Run(context.Background())
	assert.NotNil(t, testee.GetLastRun())
}
// TestNewJobFromJSON builds a job from a JSON specification string and
// checks id and priority survive the round trip.
func TestNewJobFromJSON(t *testing.T) {
	payload := `{"id":"testJob","Priority":1}`
	created, err := NewJobFromJSON(payload)
	assert.Nil(t, err)
	assert.Equal(t, "testJob", created.Id.String())
	assert.Equal(t, 1, created.Priority)
}
// TestToFromJSON serializes a job to JSON and reads it back into a fresh
// instance via FromJSON.
func TestToFromJSON(t *testing.T) {
	original := job{
		JobSpecification: JobSpecification{Id: "testJob", Priority: 1},
		//ctx: context.Background(),
	}
	encoded, err := original.ToJSON()
	assert.Nil(t, err)
	var decoded job
	err = decoded.FromJSON(encoded)
	assert.Nil(t, err)
	assert.Equal(t, "testJob", decoded.Id.String())
}
// TestUnmarshalJSON decodes the full two-part (data + spec) wire format
// directly through encoding/json and the custom UnmarshalJSON.
func TestUnmarshalJSON(t *testing.T) {
	payload := `{"data":{"last_run":"0001-01-01T00:00:00Z","next_run":"0001-01-01T00:00:00Z","stats":{"run_count":0,"success_count":0,"error_count":0,"time_metrics":{"avg":0,"max":0,"min":0,"total":0},"last_error_code":0,"last_success_code":0,"priority_escalates":0,"resource_usage":{"cpu":{"avg":0,"std_dev":0},"memory":{"avg":0,"std_dev":0},"io":{"disk":{"avg":0,"std_dev":0},"network":{"avg":0,"std_dev":0}}}}},"spec":{"id":"testJob","priority":1,"resource_limits":{}}}`
	var decoded job
	err := json.Unmarshal([]byte(payload), &decoded)
	assert.Nil(t, err)
	assert.Equal(t, "testJob", decoded.Id.String())
	assert.Equal(t, 1, decoded.Priority)
}
// TestNewJob checks that newJob copies the id and applies the default
// priority when none is given.
func TestNewJob(t *testing.T) {
	created := newJob(JobSpecification{Id: "testJob"})
	assert.NotNil(t, created)
	assert.Equal(t, "testJob", created.Id.String())
	assert.Equal(t, PriorityDefault, created.Priority)
}
jobs.go 0 → 100644
package jobqueue
import (
"sync"
"time"
)
// JobsInterface is the management surface of the job queue: adding,
// removing and querying active jobs, plus access to the finished-job set
// populated by Cleanup.
type JobsInterface interface {
	GetJobs() map[JobIDType]ReadOnlyJob
	GetExecutableJobs() map[JobIDType]ReadOnlyJob
	AddJob(jobSpec JobSpecification, runnable Runnable) error
	RemoveJob(id JobIDType) (bool, error)
	GetJobStatus(id JobIDType) (JobStatus, error)
	Cleanup()
	GetFinishedJobs() map[JobIDType]ReadOnlyJob
	GetFinishedJob(id JobIDType) ReadOnlyJob
	RemoveFinishedJob(id JobIDType) (bool, error)
	JobExists(id JobIDType) bool
	GetJob(id JobIDType) ReadOnlyJob
	GetJobsCount() int
}
// jobs is the default JobsInterface implementation: two id-keyed maps
// (active and finished jobs) guarded by a single mutex.
type jobs struct {
	jobs         map[JobIDType]*job // active jobs by id
	finishedJobs map[JobIDType]*job // jobs moved here by Cleanup once finished
	mutex        sync.Mutex         // guards both maps
}

// compile time check if jobs implements JobsInterface
var _ JobsInterface = (*jobs)(nil)
// GetJobsCount returns the number of active (non-finished) jobs.
func (jq *jobs) GetJobsCount() int {
	jq.mutex.Lock()
	defer jq.mutex.Unlock()
	count := len(jq.jobs)
	return count
}
// Cleanup moves every finished job from the active map into finishedJobs.
// Deleting map entries while ranging over the map is well-defined in Go.
func (jq *jobs) Cleanup() {
	jq.mutex.Lock()
	defer jq.mutex.Unlock()
	for id, candidate := range jq.jobs {
		if candidate.Status != JobFinished {
			continue
		}
		jq.finishedJobs[id] = candidate
		delete(jq.jobs, id)
	}
}
// GetFinishedJobs returns a snapshot map of all finished jobs, exposed
// through the read-only interface.
func (jq *jobs) GetFinishedJobs() map[JobIDType]ReadOnlyJob {
	jq.mutex.Lock()
	defer jq.mutex.Unlock()
	snapshot := make(map[JobIDType]ReadOnlyJob, len(jq.finishedJobs))
	for id, finished := range jq.finishedJobs {
		snapshot[id] = finished // implicit upcast to ReadOnlyJob
	}
	return snapshot
}
// GetFinishedJob returns the finished job with the given id, or nil when no
// such job exists.
//
// Fix: a single map lookup replaces the previous existence check followed
// by a second lookup of the same key.
func (jq *jobs) GetFinishedJob(id JobIDType) ReadOnlyJob {
	jq.mutex.Lock()
	defer jq.mutex.Unlock()
	if finished, exists := jq.finishedJobs[id]; exists {
		return finished
	}
	return nil
}
// RemoveFinishedJob deletes a finished job by id, reporting true on success
// and ErrJobNotFound when the id is unknown.
func (jq *jobs) RemoveFinishedJob(id JobIDType) (bool, error) {
	jq.mutex.Lock()
	defer jq.mutex.Unlock()
	_, exists := jq.finishedJobs[id]
	if !exists {
		return false, ErrJobNotFound
	}
	delete(jq.finishedJobs, id)
	return true, nil
}
// GetJobs returns a snapshot map of all active jobs, exposed through the
// read-only interface.
func (jq *jobs) GetJobs() map[JobIDType]ReadOnlyJob {
	jq.mutex.Lock()
	defer jq.mutex.Unlock()
	snapshot := make(map[JobIDType]ReadOnlyJob, len(jq.jobs))
	for id, active := range jq.jobs {
		snapshot[id] = active // implicit upcast to ReadOnlyJob
	}
	return snapshot
}
// GetExecutableJobs selects every pending job whose NextRun is not in the
// future, orders the selection topologically by dependencies, marks each
// selected job JobScheduled, and returns them keyed by id.
// NOTE(review): a topological-sort failure (e.g. a dependency cycle) is
// swallowed and surfaces as a nil map, indistinguishable from "nothing to
// run". Also note this getter mutates job status as a side effect.
func (jq *jobs) GetExecutableJobs() map[JobIDType]ReadOnlyJob {
	jq.mutex.Lock()
	defer jq.mutex.Unlock()
	jobs := make(map[JobIDType]ReadOnlyJob)
	tempJobs := make(map[JobIDType]*job)
	// Collect candidates: pending jobs whose scheduled time has arrived.
	for _, job := range jq.jobs {
		if job.Status != JobPending {
			continue
		}
		if job.NextRun.After(time.Now()) {
			continue
		}
		tempJobs[job.Id] = job
	}
	sortedJobIDs, err := topologicalSortJobs(tempJobs)
	if err != nil {
		return nil
	}
	// Mark the selected jobs as scheduled and hand them out read-only.
	for _, id := range sortedJobIDs {
		job := jq.jobs[id]
		job.Status = JobScheduled
		jobs[id] = jq.jobs[id]
	}
	return jobs
}
// JobExists reports whether a job with the given id is in the active set.
func (jq *jobs) JobExists(id JobIDType) bool {
	jq.mutex.Lock()
	defer jq.mutex.Unlock()
	_, exists := jq.jobs[id]
	return exists
}
// GetJob returns the active job with the given id, or nil when no such job
// exists.
//
// Fix: a single map lookup replaces the previous existence check followed
// by a second lookup of the same key.
func (jq *jobs) GetJob(id JobIDType) ReadOnlyJob {
	jq.mutex.Lock()
	defer jq.mutex.Unlock()
	if active, exists := jq.jobs[id]; exists {
		return active
	}
	return nil
}
// NewJobs creates a new, empty job queue. The mutex's zero value is ready
// to use and needs no explicit initialization.
func NewJobs() *jobs {
	return &jobs{
		jobs:         make(map[JobIDType]*job),
		finishedJobs: make(map[JobIDType]*job),
	}
}
// AddJob adds a new job to the queue. It fails with ErrJobAlreadyExists for
// a duplicate id and ErrUnknownDependency when any listed dependency has
// not been added yet — dependencies must be inserted before dependents.
// NOTE(review): newJob panics on an unparsable cron schedule, so AddJob can
// panic instead of returning an error — confirm this is intended.
func (jq *jobs) AddJob(jobSpec JobSpecification, runnable Runnable) error {
	jq.mutex.Lock()
	defer jq.mutex.Unlock()
	job := newJob(jobSpec)
	job.runnable = runnable
	if _, exists := jq.jobs[job.Id]; exists {
		return ErrJobAlreadyExists
	}
	// Every dependency must already be present in the queue.
	for _, dep := range job.Dependencies {
		if _, exists := jq.jobs[dep]; !exists {
			return ErrUnknownDependency
		}
	}
	jq.jobs[job.Id] = job
	return nil
}
// RemoveJob removes a job from the queue. Removal is refused with
// ErrJobIsDependency while any other job still lists the id as a
// dependency, and with ErrJobNotFound for an unknown id.
func (jq *jobs) RemoveJob(id JobIDType) (bool, error) {
	jq.mutex.Lock()
	defer jq.mutex.Unlock()
	if _, exists := jq.jobs[id]; !exists {
		return false, ErrJobNotFound
	}
	// Refuse removal while the job is still depended upon.
	for _, other := range jq.jobs {
		for _, dep := range other.Dependencies {
			if dep == id {
				return false, ErrJobIsDependency
			}
		}
	}
	delete(jq.jobs, id)
	return true, nil
}
// GetJobStatus returns the status of the job with the given id, or
// ErrJobNotFound for an unknown id.
//
// Fix: a single map lookup replaces the previous existence check followed
// by a second lookup of the same key.
func (jq *jobs) GetJobStatus(id JobIDType) (JobStatus, error) {
	jq.mutex.Lock()
	defer jq.mutex.Unlock()
	current, exists := jq.jobs[id]
	if !exists {
		return JobStatus(0), ErrJobNotFound
	}
	return current.Status, nil
}
package jobqueue
import (
"testing"
)
// TestNewJobQueue verifies the constructor hands back a usable queue.
func TestNewJobQueue(t *testing.T) {
	if queue := NewJobs(); queue == nil {
		t.Errorf("NewJobs returned nil")
	}
}
// TestAddJob inserts one fully-populated specification and expects success.
func TestAddJob(t *testing.T) {
	queue := NewJobs()
	spec := JobSpecification{
		Id:       "1",
		Priority: 1,
		//Exclusive: true,
		MaxRuns:     3,
		Concurrency: 2,
	}
	if err := queue.AddJob(spec, nil); err != nil {
		t.Errorf("Failed to add job: %v", err)
	}
}
// TestRemoveJob adds one job and then removes it again.
func TestRemoveJob(t *testing.T) {
	queue := NewJobs()
	if err := queue.AddJob(JobSpecification{Id: "1"}, nil); err != nil {
		t.Errorf("Failed to add job: %v", err)
	}
	ok, _ := queue.RemoveJob("1")
	if !ok {
		t.Errorf("Failed to remove job")
	}
}
json.go 0 → 100644
package jobqueue
import "github.com/robfig/cron/v3"
import "encoding/json"
// UnmarshalJSON unmarshals a job from json. The alias type strips the
// custom method set to avoid recursion; a non-empty schedule expression is
// additionally compiled into the job's cron schedule implementation.
func (j *job) UnmarshalJSON(data []byte) error {
	type plainJob job
	wrapper := struct {
		Schedule string `json:"schedule"`
		*plainJob
	}{plainJob: (*plainJob)(j)}
	if err := json.Unmarshal(data, &wrapper); err != nil {
		return err
	}
	if wrapper.Schedule == "" {
		return nil
	}
	schedule, err := cron.ParseStandard(wrapper.Schedule)
	if err != nil {
		return err
	}
	j.scheduleImpl = schedule
	return nil
}
// MarshalJSON marshals a job to json. The alias type drops the custom
// MarshalJSON method, preventing infinite recursion while producing the
// same field layout (the embedded alias promotes all fields).
func (j *job) MarshalJSON() ([]byte, error) {
	type plainJob job
	return json.Marshal((*plainJob)(j))
}
// NewJobFromJSON creates a new job from a json string containing a
// JobSpecification. (The local variable no longer shadows the job type.)
func NewJobFromJSON(jsonStr string) (*job, error) {
	var spec JobSpecification
	if err := json.Unmarshal([]byte(jsonStr), &spec); err != nil {
		return nil, err
	}
	return newJob(spec), nil
}
// ToJSON marshals the job into a json string via the custom MarshalJSON.
func (j *job) ToJSON() (string, error) {
	raw, err := json.Marshal(j)
	if err != nil {
		return "", err
	}
	return string(raw), nil
}
// FromJSON unmarshals a job from a json string, populating the receiver via
// the custom UnmarshalJSON implementation above.
func (j *job) FromJSON(jsonStr string) error {
	return json.Unmarshal([]byte(jsonStr), j)
}
package jobqueue
package jobqueue
import (
"bytes"
"fmt"
"os"
"sync"
"text/template"
)
// Logger is a generic log interface supporting different output targets.
type Logger interface {
	// Write writes a single log entry and reports the bytes written.
	Write(entry JobLog) (n int, err error)
	// Close closes the logger and releases its resources.
	Close() error
}
// FileLogger is a logger that writes to a file.
// Files are rotated by size; the file index wraps around after maxLogFiles,
// overwriting the oldest file.
type FileLogger struct {
	mu           sync.Mutex // guards all fields below
	logDir       string     // directory holding the rotated log files
	maxLogSize   int64      // size threshold that triggers rotation
	maxLogFiles  int        // highest file index before wrapping back to 1
	currentLog   *os.File   // currently open log file
	currentSize  int64      // bytes written to the current file so far
	logFileIndex int        // index used for the next file name
}
// NewFileLogger creates a new FileLogger writing size-rotated log files
// into logDir; maxLogSize bounds a single file and maxLogFiles bounds the
// file index before it wraps back to 1.
func NewFileLogger(logDir string, maxLogSize int64, maxLogFiles int) (*FileLogger, error) {
	if err := os.MkdirAll(logDir, 0755); err != nil {
		return nil, err
	}
	logger := &FileLogger{
		logDir:       logDir,
		maxLogSize:   maxLogSize,
		maxLogFiles:  maxLogFiles,
		currentSize:  0,
		logFileIndex: 1,
	}
	// Open the first log file right away so Write can proceed immediately.
	if err := logger.rotateLog(); err != nil {
		return nil, err
	}
	return logger, nil
}
// fileLogTemplate renders one JobLog line. It is parsed once at package
// init (template.Must) instead of on every Write call.
// NOTE(review): the referenced fields (Timestamp, JobID, JobName, JobStatus,
// Duration, Message) do not exist on JobLog, so Execute will fail at
// runtime — confirm the intended field names.
var fileLogTemplate = template.Must(template.New("log").Parse(
	"{{.Timestamp}} {{.JobID}} {{.JobName}} {{.JobStatus}} {{.ExitCode}} {{.Duration}} {{.Message}}"))

// Write renders the log entry and appends it to the current log file,
// rotating once the configured size limit is reached.
//
// Fixes: the template is no longer re-parsed on every call, and template
// execution errors are reported instead of being silently ignored.
func (lw *FileLogger) Write(entry JobLog) (n int, err error) {
	lw.mu.Lock()
	defer lw.mu.Unlock()
	buffer := new(bytes.Buffer)
	if err = fileLogTemplate.Execute(buffer, entry); err != nil {
		return 0, err
	}
	n, err = lw.currentLog.Write(buffer.Bytes())
	lw.currentSize += int64(n)
	// Rotation failure is deliberately best-effort here.
	if lw.currentSize >= lw.maxLogSize {
		_ = lw.rotateLog()
	}
	return n, err
}
// Close closes the current log file. Further Write calls after Close will
// fail on the closed file handle.
func (lw *FileLogger) Close() error {
	lw.mu.Lock()
	defer lw.mu.Unlock()
	return lw.currentLog.Close()
}
// rotateLog closes the current log file (if any) and opens the next one,
// wrapping the file index back to 1 after maxLogFiles so old files get
// overwritten.
func (lw *FileLogger) rotateLog() error {
	lw.currentSize = 0
	if lw.currentLog != nil {
		if err := lw.currentLog.Close(); err != nil {
			return err
		}
	}
	name := fmt.Sprintf("%s/job_log_%d.log", lw.logDir, lw.logFileIndex)
	lw.logFileIndex++
	if lw.logFileIndex > lw.maxLogFiles {
		lw.logFileIndex = 1
	}
	next, err := os.Create(name)
	if err != nil {
		return err
	}
	lw.currentLog = next
	return nil
}
package jobqueue
import (
"context"
"errors"
"sync"
"time"
)
// JobManager couples a job queue with its executor and serializes all
// manager-level operations behind a mutex.
type JobManager struct {
	queue          *jobs        // the active job queue
	executor       *jobExecutor // runs queued jobs (see NewJobExecutor)
	mutex          sync.Mutex   // guards manager-level operations
	maxConcurrency int          // maximum number of jobs executed in parallel
}
// NewJobManager creates a new job manager with configurable concurrency and
// interval. With stopOnEmpty set, the executor's stop condition also fires
// once the queue runs dry; it always fires when the executor context errs.
func NewJobManager(maxConcurrency int, interval time.Duration, stopOnEmpty bool) *JobManager {
	queue := NewJobs()
	stopCondition := func(executor *jobExecutor) bool {
		switch {
		case executor.Ctx.Err() != nil:
			return true
		case stopOnEmpty && executor.Queue.GetJobsCount() == 0:
			return true
		default:
			return false
		}
	}
	return &JobManager{
		queue:          queue,
		executor:       NewJobExecutor(queue, maxConcurrency, interval, stopCondition),
		maxConcurrency: maxConcurrency,
	}
}
func (jm *JobManager) GetJobs() map[JobIDType]ReadOnlyJob {
return jm.queue.GetJobs()
}
func (jm *JobManager) AddJob(jobSpec JobSpecification, runnable Runnable) error {
jm.mutex.Lock()
defer jm.mutex.Unlock()
return jm.queue.AddJob(jobSpec, runnable)
}
func (jm *JobManager) GetJob(id JobIDType) ReadOnlyJob {
return jm.queue.GetJob(id)
}
func (jm *JobManager) JobExists(id JobIDType) bool {
return jm.queue.JobExists(id)
}
func (jm *JobManager) RemoveJob(id JobIDType) (bool, error) {
jm.mutex.Lock()
defer jm.mutex.Unlock()
return jm.queue.RemoveJob(id)
}
func (jm *JobManager) Start() error {
jm.mutex.Lock()
defer jm.mutex.Unlock()
return jm.executor.Start()
}
func (jm *JobManager) Stop() error {
jm.mutex.Lock()
defer jm.mutex.Unlock()
return jm.executor.Stop()
}
// Pause suspends the underlying executor, serialized by the manager mutex.
func (jm *JobManager) Pause() error {
	jm.mutex.Lock()
	defer jm.mutex.Unlock()

	err := jm.executor.Pause()
	return err
}
// Resume continues a paused executor, serialized by the manager mutex.
func (jm *JobManager) Resume() error {
	jm.mutex.Lock()
	defer jm.mutex.Unlock()

	err := jm.executor.Resume()
	return err
}
// GetFinishedJobs returns read-only views of all completed jobs,
// keyed by job ID.
func (jm *JobManager) GetFinishedJobs() map[JobIDType]ReadOnlyJob {
	jm.mutex.Lock()
	defer jm.mutex.Unlock()

	finished := jm.queue.GetFinishedJobs()
	return finished
}
// GetFinishedJob returns a read-only view of the completed job with
// the given ID.
func (jm *JobManager) GetFinishedJob(id JobIDType) ReadOnlyJob {
	jm.mutex.Lock()
	defer jm.mutex.Unlock()

	job := jm.queue.GetFinishedJob(id)
	return job
}
// RemoveFinishedJob deletes the completed job with the given ID and
// reports whether it was removed.
func (jm *JobManager) RemoveFinishedJob(id JobIDType) (bool, error) {
	jm.mutex.Lock()
	defer jm.mutex.Unlock()

	removed, err := jm.queue.RemoveFinishedJob(id)
	return removed, err
}
// OnStarted blocks until the executor reports that it is running, then
// invokes hook and returns nil. If the executor has not started within
// 5 seconds, ErrTimeout is returned and hook is never called.
func (jm *JobManager) OnStarted(hook func()) error {
	const timeout = 5 * time.Second
	deadline := time.Now().Add(timeout)

	// Poll the executor state. A single reused ticker avoids the
	// per-iteration timer allocation that <-time.After(...) incurs
	// inside a loop.
	ticker := time.NewTicker(1 * time.Millisecond)
	defer ticker.Stop()

	for {
		if jm.executor.IsRunning() {
			hook()
			return nil
		}
		// Same bound as the original time.Since(startTime) >= timeout.
		if !time.Now().Before(deadline) {
			return ErrTimeout
		}
		<-ticker.C
	}
}
// Wait blocks until the executor's context is done and returns the
// reason it stopped: nil for a plain cancellation, the context error
// otherwise. If the executor is not running (and its context is still
// live), ErrNotRunning is returned immediately.
func (jm *JobManager) Wait() error {
	ctx := jm.executor.Ctx

	// Context already finished: report its error without waiting.
	if err := ctx.Err(); err != nil {
		return err
	}

	if !jm.executor.IsRunning() {
		return ErrNotRunning
	}

	<-ctx.Done()
	if errors.Is(ctx.Err(), context.Canceled) {
		// Deliberate cancellation counts as a clean shutdown.
		return nil
	}
	return ctx.Err()
}
// IsRunning reports whether the executor is currently running.
func (jm *JobManager) IsRunning() bool {
	jm.mutex.Lock()
	defer jm.mutex.Unlock()

	running := jm.executor.IsRunning()
	return running
}
package jobqueue
import (
"github.com/stretchr/testify/assert"
//_ "net/http/pprof"
"sync"
"sync/atomic"
"testing"
"time"
)
// TestRepeatNewJobManagerPauseAndResume runs the pause/resume scenario
// several times in a row to surface timing-dependent flakiness.
func TestRepeatNewJobManagerPauseAndResume(t *testing.T) {
	const repeats = 10 // number of repetitions
	for run := 0; run < repeats; run++ {
		t.Logf("Repeat %d\n", run+1)
		dewJobManagerPauseAndResume(t)
	}
}
// dewJobManagerPauseAndResume exercises one pause/resume cycle: a job
// manager with a single long-running job is started, paused shortly
// afterwards, resumed, and then awaited. The test asserts that the job
// finishes and that the manager is no longer paused by the time Wait
// returns. (NOTE(review): the name looks like a typo for
// "doJobManagerPauseAndResume"; kept as-is to avoid touching callers.)
func dewJobManagerPauseAndResume(t *testing.T) {
	var wg sync.WaitGroup

	// Timing knobs: pause is triggered 5 ticks after start and held
	// for 5 ticks.
	tickerTime := 1 * time.Microsecond
	waitBeforePause := 5 * tickerTime
	doPause := 5 * tickerTime
	maxRuns := 3

	jm := NewJobManager(1, tickerTime, false)
	if jm == nil {
		t.Errorf("NewJobManager returned nil")
	}

	// One job that sleeps for a second each run, so the pause window
	// overlaps actual execution.
	err := jm.AddJob(JobSpecification{
		Id:      "1",
		MaxRuns: maxRuns,
	}, &ExternalProcessRunner{
		Command: "sleep",
		Args:    []string{"1"},
	})
	assert.Nil(t, err)

	job := jm.GetJob("1")
	assert.NotNil(t, job)

	// 1 while the manager is paused, 0 otherwise (read from the hook).
	var isPaused int32

	err = jm.Start()
	assert.Nil(t, err)

	// Safety timer: force-stop the manager after 10 seconds so a hung
	// run cannot block the test forever.
	timer := time.NewTimer(10 * time.Second)

	// anonymous function to stop the job manager after the timer has expired
	go func() {
		<-timer.C // wait for the timer to expire
		err := jm.Stop()
		assert.Nil(t, err)
	}()

	// Best-effort cleanup; Stop may legitimately report that the
	// manager was already stopped by the timer goroutine above.
	defer func() {
		err := jm.Stop()
		if err != nil {
			assert.ErrorIs(t, err, ErrAlreadyStopped)
		}
	}()

	// Go routine to pause and resume the job manager
	go func() {
		time.Sleep(waitBeforePause)
		err := jm.Pause()
		assert.Nil(t, err)
		atomic.StoreInt32(&isPaused, 1)
		time.Sleep(doPause)
		err = jm.Resume()
		assert.Nil(t, err)
		atomic.StoreInt32(&isPaused, 0)
	}()

	wg.Add(1)
	// The hook runs once the executor reports running; it waits for
	// completion and checks the final state.
	err = jm.OnStarted(func() {
		defer wg.Done()
		err := jm.Wait()
		assert.Nil(t, err)
		finishedJob := jm.GetFinishedJob("1")
		assert.NotNil(t, finishedJob)
		paused := atomic.LoadInt32(&isPaused) == 1
		assert.False(t, paused, "Job manager should not be paused")
	})
	assert.Nil(t, err)
	wg.Wait()
}
// TestNewJobManager runs a single "echo" job doRuns times with
// stopOnEmpty enabled and asserts run count, log count, and success
// statistics once the job has finished.
func TestNewJobManager(t *testing.T) {
	doRuns := 5
	jm := NewJobManager(10, 1*time.Microsecond, true)
	if jm == nil {
		t.Errorf("NewJobManager returned nil")
	}

	err := jm.AddJob(JobSpecification{
		Id:      "1",
		MaxRuns: doRuns,
	}, &ExternalProcessRunner{
		Command: "echo",
		Args:    []string{"hello"},
	})
	assert.Nil(t, err)

	// Fix: the original never checked Start's error directly — err was
	// reassigned by OnStarted before the in-hook assertion could run
	// deterministically against it.
	err = jm.Start()
	assert.Nil(t, err)

	err = jm.OnStarted(func() {
		assert.True(t, jm.IsRunning())
		job := jm.GetJob("1")
		assert.NotNil(t, job)

		// Stop may report ErrAlreadyStopped if the executor shut
		// itself down after the queue drained (stopOnEmpty).
		defer func() {
			err := jm.Stop()
			if err != nil {
				assert.ErrorIs(t, err, ErrAlreadyStopped)
			}
		}()

		err := jm.Wait()
		assert.Nil(t, err)

		finishedJob := jm.GetFinishedJob("1")
		assert.NotNil(t, finishedJob)

		runs := finishedJob.GetRuns()
		assert.Equal(t, doRuns, runs)

		logs := finishedJob.GetLogs()
		assert.Equal(t, doRuns, len(logs))

		stats := finishedJob.GetStats()
		assert.Equal(t, doRuns, stats.SuccessCount)
		assert.Equal(t, 0, stats.ErrorCount)
	})
	assert.Nil(t, err)
}
package jobqueue
// Job priority levels, spaced 10 apart so intermediate levels can be
// introduced later without renumbering. The blank identifier consumes
// iota 0, so the lowest priority starts at 10.
const (
	_ int = iota * 10
	PriorityLow      // 10
	PriorityDefault  // 20
	PriorityHigh     // 30
	PriorityCritical // 40
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment