Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • oss/libraries/go/services/job-queues
1 result
Select Git revision
Show changes
Commits on Source (1)
......@@ -105,6 +105,25 @@ func (j *JobPersistence) UnmarshalJSON(data []byte) error {
return nil
}
func parseDateFormats(value string) (time.Time, error) {
return parseMultipleDateFormats(value, []string{
time.RFC3339, time.RFC3339Nano,
"2006-01-02T15:04:05",
"2006-01-02T15:04:05.999999999",
"2006-01-02T15:04:05Z07:00",
"2006-01-02T15:04:05",
"2006-01-02T15:04",
"2006-01-02T15",
"2006-01-02 15:04:05",
"2006-01-02 15:04:05.999999999",
"2006-01-02 15:04:05Z07:00",
"2006-01-02 15:04:05",
"2006-01-02 15:04",
"2006-01-02 15",
"2006-01-02",
})
}
func parseMultipleDateFormats(value string, formats []string) (time.Time, error) {
var t time.Time
var err error
......@@ -356,11 +375,11 @@ func CreateJobAndSchedulerFromPersistence(jobImport JobPersistence, manager *Man
switch sType {
case "interval":
if jobImport.Scheduler.Interval == 0 {
if *jobImport.Scheduler.Interval == 0 {
return nil, nil, fmt.Errorf("%w: interval is 0", ErrSchedulerMisconfiguration)
}
scheduler = &IntervalScheduler{Interval: jobImport.Scheduler.Interval}
scheduler = &IntervalScheduler{Interval: *jobImport.Scheduler.Interval}
case "cron":
......@@ -385,11 +404,11 @@ func CreateJobAndSchedulerFromPersistence(jobImport JobPersistence, manager *Man
case "delay":
if jobImport.Scheduler.Delay == 0 {
if *jobImport.Scheduler.Delay == 0 {
return nil, nil, fmt.Errorf("%w: delay is 0", ErrSchedulerMisconfiguration)
}
scheduler = &DelayScheduler{Delay: jobImport.Scheduler.Delay}
scheduler = &DelayScheduler{Delay: *jobImport.Scheduler.Delay}
case "event":
......
......@@ -17,6 +17,7 @@ func TestCreateJobAndSchedulerFromInput(t *testing.T) {
time10s := 10 * time.Second
time2s := 5 * time.Minute
time1m := 1 * time.Minute
tests := []struct {
name string
......@@ -39,7 +40,7 @@ func TestCreateJobAndSchedulerFromInput(t *testing.T) {
},
Scheduler: SchedulerPersistence{
Type: "Interval",
Interval: 1 * time.Minute,
Interval: &time1m,
},
},
wantJob: GenericJob(&Job[ShellResult]{ /* Initialization */ }),
......
......@@ -97,6 +97,6 @@ func (s *DelayScheduler) JobExists(id JobID) bool {
func (s *DelayScheduler) GetPersistence() SchedulerPersistence {
return SchedulerPersistence{
Type: s.GetType(),
Delay: s.Delay,
Delay: &s.Delay,
}
}
......@@ -105,6 +105,6 @@ func (s *IntervalScheduler) JobExists(id JobID) bool {
func (s *IntervalScheduler) GetPersistence() SchedulerPersistence {
return SchedulerPersistence{
Type: s.GetType(),
Interval: s.Interval,
Interval: &s.Interval,
}
}
......@@ -28,7 +28,7 @@ interval: "1h30m"
assert.Equal(t, expectedTime, *sp.Time, "the time should be parsed correctly")
expectedInterval, _ := time.ParseDuration("1h30m")
assert.Equal(t, expectedInterval, sp.Interval, "the interval should be parsed correctly")
assert.Equal(t, &expectedInterval, sp.Interval, "the interval should be parsed correctly")
}
// TestUnmarshalJSON tests the UnmarshalJSON method
......@@ -56,5 +56,5 @@ func TestUnmarshalJSON(t *testing.T) {
// Verify that the Delay field is correctly parsed
expectedInterval, _ := time.ParseDuration("1h30m")
assert.Equal(t, expectedInterval, sp.Interval, "Interval should be correctly parsed")
assert.Equal(t, &expectedInterval, sp.Interval, "Interval should be correctly parsed")
}
......@@ -23,10 +23,15 @@ type Scheduler interface {
}
type SchedulerPersistence struct {
Time *time.Time `yaml:"time,omitempty" json:"time,omitempty" gorm:"column:time"`
Interval time.Duration `yaml:"interval,omitempty" json:"interval,omitempty" gorm:"column:interval"`
Delay time.Duration `yaml:"delay,omitempty" json:"delay,omitempty" gorm:"column:delay"`
EventFlags fsnotify.Op `yaml:"eventFlags,omitempty" json:"eventFlags,omitempty" gorm:"column:eventFlags"`
Time *time.Time `yaml:"-" json:"-" gorm:"column:time"`
Interval *time.Duration `yaml:"-" json:"-" gorm:"column:interval"`
Delay *time.Duration `yaml:"-" json:"-" gorm:"column:delay"`
EventFlags fsnotify.Op `yaml:"-" json:"-" gorm:"column:eventFlags"`
TimeString *string `yaml:"time,omitempty" json:"time,omitempty" gorm:"-"`
IntervalString *string `yaml:"interval,omitempty" json:"interval,omitempty" gorm:"-"`
DelayString *string `yaml:"delay,omitempty" json:"delay,omitempty" gorm:"-"`
EventFlagsString *string `yaml:"eventFlags,omitempty" json:"eventFlags,omitempty" gorm:"-"`
Type string `yaml:"type" json:"type" gorm:"column:type"`
Spec string `yaml:"spec,omitempty" json:"spec,omitempty" gorm:"column:spec"`
......@@ -35,61 +40,155 @@ type SchedulerPersistence struct {
Path string `yaml:"path,omitempty" json:"path,omitempty" gorm:"column:path"`
}
type scheduleImportStruct struct {
Time *string `yaml:"time,omitempty" json:"time,omitempty"`
Interval *string `yaml:"interval,omitempty" json:"interval,omitempty"`
Delay *string `yaml:"delay,omitempty" json:"delay,omitempty"`
EventFlags *string `yaml:"eventFlags,omitempty" json:"eventFlags,omitempty"`
Type string `yaml:"type" json:"type"`
Spec string `yaml:"spec,omitempty" json:"spec,omitempty"`
Event string `yaml:"event,omitempty" json:"event,omitempty"`
Executed bool `yaml:"executed,omitempty" json:"executed,omitempty"`
Path string `yaml:"path,omitempty" json:"path,omitempty"`
func formatEventFlags(flags fsnotify.Op) string {
var flagStrings []string
if flags&fsnotify.Create != 0 {
flagStrings = append(flagStrings, "Create")
}
if flags&fsnotify.Write != 0 {
flagStrings = append(flagStrings, "Write")
}
if flags&fsnotify.Remove != 0 {
flagStrings = append(flagStrings, "Remove")
}
if flags&fsnotify.Rename != 0 {
flagStrings = append(flagStrings, "Rename")
}
if flags&fsnotify.Chmod != 0 {
flagStrings = append(flagStrings, "Chmod")
}
return strings.Join(flagStrings, "|")
}
func (j *SchedulerPersistence) MarshalJSON() ([]byte, error) {
type Alias SchedulerPersistence
return json.Marshal(&struct {
*Alias
TimeString string `json:"time,omitempty"`
IntervalString string `json:"interval,omitempty"`
DelayString string `json:"delay,omitempty"`
EventFlagsString string `json:"eventFlags,omitempty"`
}{
Alias: (*Alias)(j),
TimeString: formatTime(j.Time),
IntervalString: formatDuration(j.Interval),
DelayString: formatDuration(j.Delay),
EventFlagsString: formatEventFlags(j.EventFlags),
})
}
func (sp *SchedulerPersistence) parseAndAssignFields(aux scheduleImportStruct) error {
sp.Type = aux.Type
sp.Spec = aux.Spec
sp.Event = EventName(aux.Event)
sp.Executed = aux.Executed
sp.Path = aux.Path
if aux.Time != nil && *aux.Time != "" {
var t time.Time
var err error
for _, format := range SupportedTimeFormats {
t, err = time.Parse(format, *aux.Time)
if err == nil {
break
func (j *SchedulerPersistence) UnmarshalJSON(data []byte) error {
type Alias SchedulerPersistence
aux := &struct {
*Alias
TimeString *string `json:"time,omitempty"`
IntervalString *string `json:"interval,omitempty"`
DelayString *string `json:"delay,omitempty"`
EventFlagsString *string `json:"eventFlags,omitempty"`
}{
Alias: (*Alias)(j),
}
if err := json.Unmarshal(data, aux); err != nil {
return err
}
if aux.TimeString != nil && *aux.TimeString != "" {
t, err := parseDateFormats(*aux.TimeString)
if err != nil {
return err
}
j.Time = &t
}
if aux.DelayString != nil && *aux.DelayString != "" {
d, err := time.ParseDuration(*aux.DelayString)
if err != nil {
return err
}
j.Delay = &d
}
if aux.IntervalString != nil && *aux.IntervalString != "" {
d, err := time.ParseDuration(*aux.IntervalString)
if err != nil {
return err
}
j.Interval = &d
}
if aux.DelayString != nil && *aux.DelayString != "" {
d, err := time.ParseDuration(*aux.DelayString)
if err != nil {
return err
}
j.Delay = &d
}
if aux.EventFlagsString != nil && *aux.EventFlagsString != "" {
j.EventFlags = fsnotify.Op(0)
for _, flag := range strings.Split(*aux.EventFlagsString, "|") {
switch flag {
case "Create":
j.EventFlags |= fsnotify.Create
case "Write":
j.EventFlags |= fsnotify.Write
case "Remove":
j.EventFlags |= fsnotify.Remove
case "Rename":
j.EventFlags |= fsnotify.Rename
case "Chmod":
j.EventFlags |= fsnotify.Chmod
}
}
}
return nil
}
func (sp *SchedulerPersistence) UnmarshalYAML(unmarshal func(interface{}) error) error {
type Alias SchedulerPersistence
aux := &struct {
*Alias `yaml:",inline"`
//TimeString *string `yaml:"time,omitempty"`
//IntervalString *string `yaml:"interval,omitempty"`
//DelayString *string `yaml:"delay,omitempty"`
//EventFlagsString *string `yaml:"eventFlags,omitempty"`
}{
Alias: (*Alias)(sp),
}
if err := unmarshal(aux); err != nil {
return err
}
if aux.TimeString != nil && *aux.TimeString != "" {
t, err := parseDateFormats(*aux.TimeString)
if err != nil {
return err
}
sp.Time = &t
}
if aux.Interval != nil && *aux.Interval != "" {
d, err := time.ParseDuration(*aux.Interval)
if aux.DelayString != nil && *aux.DelayString != "" {
d, err := time.ParseDuration(*aux.DelayString)
if err != nil {
return err
}
sp.Interval = d
sp.Delay = &d
}
if aux.Delay != nil && *aux.Delay != "" {
d, err := time.ParseDuration(*aux.Delay)
if aux.IntervalString != nil && *aux.IntervalString != "" {
d, err := time.ParseDuration(*aux.IntervalString)
if err != nil {
return err
}
sp.Delay = d
sp.Interval = &d
}
if aux.EventFlags != nil && *aux.EventFlags != "" {
if aux.DelayString != nil && *aux.DelayString != "" {
d, err := time.ParseDuration(*aux.DelayString)
if err != nil {
return err
}
sp.Delay = &d
}
if aux.EventFlagsString != nil && *aux.EventFlagsString != "" {
sp.EventFlags = fsnotify.Op(0)
for _, flag := range strings.Split(*aux.EventFlags, "|") {
for _, flag := range strings.Split(*aux.EventFlagsString, "|") {
switch flag {
case "Create":
sp.EventFlags |= fsnotify.Create
......@@ -101,26 +200,99 @@ func (sp *SchedulerPersistence) parseAndAssignFields(aux scheduleImportStruct) e
sp.EventFlags |= fsnotify.Rename
case "Chmod":
sp.EventFlags |= fsnotify.Chmod
}
}
}
return nil
}
// UnmarshalJSON implements the json.Unmarshaler interface
func (sp *SchedulerPersistence) UnmarshalJSON(data []byte) error {
var aux scheduleImportStruct
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
return sp.parseAndAssignFields(aux)
}
//type scheduleImportStruct struct {
// Time *string `yaml:"time,omitempty" json:"time,omitempty"`
// Interval *string `yaml:"interval,omitempty" json:"interval,omitempty"`
// Delay *string `yaml:"delay,omitempty" json:"delay,omitempty"`
// EventFlags *string `yaml:"eventFlags,omitempty" json:"eventFlags,omitempty"`
//
// Type string `yaml:"type" json:"type"`
// Spec string `yaml:"spec,omitempty" json:"spec,omitempty"`
// Event string `yaml:"event,omitempty" json:"event,omitempty"`
// Executed bool `yaml:"executed,omitempty" json:"executed,omitempty"`
// Path string `yaml:"path,omitempty" json:"path,omitempty"`
//}
//
//func (sp *SchedulerPersistence) parseAndAssignFields(aux scheduleImportStruct) error {
//
// sp.Type = aux.Type
// sp.Spec = aux.Spec
// sp.Event = EventName(aux.Event)
// sp.Executed = aux.Executed
// sp.Path = aux.Path
//
// if aux.Time != nil && *aux.Time != "" {
// var t time.Time
// var err error
// for _, format := range SupportedTimeFormats {
// t, err = time.Parse(format, *aux.Time)
// if err == nil {
// break
// }
// }
// if err != nil {
// return err
// }
// sp.Time = &t
// }
//
// if aux.Interval != nil && *aux.Interval != "" {
// d, err := time.ParseDuration(*aux.Interval)
// if err != nil {
// return err
// }
// sp.Interval = d
// }
//
// if aux.Delay != nil && *aux.Delay != "" {
// d, err := time.ParseDuration(*aux.Delay)
// if err != nil {
// return err
// }
// sp.Delay = d
// }
//
// if aux.EventFlags != nil && *aux.EventFlags != "" {
// sp.EventFlags = fsnotify.Op(0)
// for _, flag := range strings.Split(*aux.EventFlags, "|") {
// switch flag {
// case "Create":
// sp.EventFlags |= fsnotify.Create
// case "Write":
// sp.EventFlags |= fsnotify.Write
// case "Remove":
// sp.EventFlags |= fsnotify.Remove
// case "Rename":
// sp.EventFlags |= fsnotify.Rename
// case "Chmod":
// sp.EventFlags |= fsnotify.Chmod
// }
// }
// }
//
// return nil
//}
func (sp *SchedulerPersistence) UnmarshalYAML(unmarshal func(interface{}) error) error {
var aux scheduleImportStruct
if err := unmarshal(&aux); err != nil {
return err
}
return sp.parseAndAssignFields(aux)
}
//// UnmarshalJSON implements the json.Unmarshaler interface
//func (sp *SchedulerPersistence) UnmarshalJSON(data []byte) error {
// var aux scheduleImportStruct
// if err := json.Unmarshal(data, &aux); err != nil {
// return err
// }
// return sp.parseAndAssignFields(aux)
//}
//
//func (sp *SchedulerPersistence) UnmarshalYAML(unmarshal func(interface{}) error) error {
// var aux scheduleImportStruct
// if err := unmarshal(&aux); err != nil {
// return err
// }
// return sp.parseAndAssignFields(aux)
//}
......@@ -429,7 +429,7 @@ time: "2023-12-15T12:00:00Z"
expectedTime, _ := time.Parse(time.RFC3339, "2023-12-15T12:00:00Z")
assert.Equal(t, "interval", sp.Type, "Type should be unmarshalled correctly")
assert.Equal(t, expectedInterval, sp.Interval, "Interval should be unmarshalled correctly")
assert.Equal(t, &expectedInterval, sp.Interval, "Interval should be unmarshalled correctly")
assert.Equal(t, &expectedTime, sp.Time, "Time should be unmarshalled correctly")
}
......@@ -446,5 +446,5 @@ interval: "1m1s"
expectedInterval, _ := time.ParseDuration("1m1s")
assert.Equal(t, "Interval", sp.Type, "Type should be unmarshalled correctly")
assert.Equal(t, expectedInterval, sp.Interval, "Interval should be unmarshalled correctly")
assert.Equal(t, &expectedInterval, sp.Interval, "Interval should be unmarshalled correctly")
}