Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
Loading items

Target

Select target project
  • oss/libraries/go/services/job-queues
1 result
Select Git revision
Loading items
Show changes
Showing with 726 additions and 2 deletions
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue package jobqueue
import ( import (
......
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue package jobqueue
import ( import (
......
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue package jobqueue
import ( import (
......
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue package jobqueue
import ( import (
......
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue package jobqueue
import ( import (
......
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue package jobqueue
import ( import (
......
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue package jobqueue
import ( import (
......
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
//go:build !bench && !race
package jobqueue package jobqueue
import ( import (
......
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue package jobqueue
import ( import (
......
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
//go:build !bench && !race && !runOnTask
// the creation of the container is not working on the CI server
// nor on the task command. use this test manually to test the
// sftp functionality
package jobqueue package jobqueue
import ( import (
...@@ -302,7 +311,7 @@ func TestSFTPCRunnerRemoteToLocal(t *testing.T) { ...@@ -302,7 +311,7 @@ func TestSFTPCRunnerRemoteToLocal(t *testing.T) {
select { select {
case <-done: case <-done:
time.Sleep(1 * time.Second) time.Sleep(5 * time.Second)
case <-time.After(1 * time.Minute): case <-time.After(1 * time.Minute):
t.Error("test hangs, timeout reached") t.Error("test hangs, timeout reached")
} }
......
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue package jobqueue
import ( import (
...@@ -117,7 +120,7 @@ func (s *ShellRunnable) GetType() string { ...@@ -117,7 +120,7 @@ func (s *ShellRunnable) GetType() string {
func (c *ShellRunnable) GetPersistence() RunnableImport { func (c *ShellRunnable) GetPersistence() RunnableImport {
data := JSONMap{ data := JSONMap{
"script_path": c.ScriptPath, "scriptPath": c.ScriptPath,
"script": c.Script, "script": c.Script,
} }
......
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue package jobqueue
import ( import (
......
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue package jobqueue
import ( import (
......
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue package jobqueue
import ( import (
......
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
"fmt"
"github.com/robfig/cron/v3"
)
// CronScheduler is a scheduler that uses the cron library to schedule jobs
type CronScheduler struct {
cron *cron.Cron
Spec string
jobs map[JobID]cron.EntryID
}
func (s *CronScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
var err error
if s.cron == nil {
return ErrCronNotInitialized
}
if s.jobs == nil {
s.jobs = make(map[JobID]cron.EntryID)
}
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
}
entryId, err := s.cron.AddFunc(s.Spec, func() {
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
}
})
s.jobs[id] = entryId
if err != nil {
return err
}
return nil
}
func (s *CronScheduler) GetType() string {
return "Cron"
}
func (s *CronScheduler) IsAdHoc() bool {
return false
}
func (s *CronScheduler) Cancel(id JobID) error {
if s.jobs == nil {
return nil
}
if entryId, ok := s.jobs[id]; ok {
s.cron.Remove(entryId)
}
return nil
}
func (s *CronScheduler) CancelAll() error {
if s.jobs == nil {
return nil
}
for _, entryId := range s.jobs {
s.cron.Remove(entryId)
}
s.jobs = nil
return nil
}
func (s *CronScheduler) JobExists(id JobID) bool {
if s.jobs == nil {
return false
}
_, ok := s.jobs[id]
return ok
}
func (s *CronScheduler) GetPersistence() SchedulerPersistence {
return SchedulerPersistence{
Type: s.GetType(),
Spec: s.Spec,
}
}
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
"fmt"
"time"
)
// DelayScheduler is a scheduler that schedules a job after a delay
type DelayScheduler struct {
Delay time.Duration
jobs map[JobID]StopChan
}
func (s *DelayScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
if s.jobs == nil {
s.jobs = make(map[JobID]StopChan)
}
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
}
stopChan := make(StopChan)
s.jobs[id] = stopChan
timer := time.NewTimer(s.Delay)
go func() {
select {
case <-timer.C:
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
}
case <-stopChan:
timer.Stop()
return
}
}()
return nil
}
func (s *DelayScheduler) GetType() string {
return "Delay"
}
func (s *DelayScheduler) IsAdHoc() bool {
return true
}
func (s *DelayScheduler) Cancel(id JobID) error {
if s.jobs == nil {
return nil
}
if stopChan, ok := s.jobs[id]; ok {
select {
case stopChan <- true:
default:
}
delete(s.jobs, id)
}
return nil
}
func (s *DelayScheduler) CancelAll() error {
if s.jobs == nil {
return nil
}
for _, stopChan := range s.jobs {
select {
case stopChan <- true:
default:
}
}
s.jobs = nil
return nil
}
func (s *DelayScheduler) JobExists(id JobID) bool {
if s.jobs == nil {
return false
}
_, ok := s.jobs[id]
return ok
}
func (s *DelayScheduler) GetPersistence() SchedulerPersistence {
return SchedulerPersistence{
Type: s.GetType(),
Delay: s.Delay,
}
}
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
"fmt"
"time"
)
// IntervalScheduler is a scheduler that schedules a job at a fixed interval
type IntervalScheduler struct {
Interval time.Duration
jobs map[JobID]StopChan
}
func (s *IntervalScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
if s.Interval <= 0 {
return fmt.Errorf("invalid interval: %v", s.Interval)
}
if s.jobs == nil {
s.jobs = make(map[JobID]StopChan)
}
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
}
stopChan := make(StopChan)
s.jobs[id] = stopChan
ticker := time.NewTicker(s.Interval)
go func() {
for {
select {
case <-ticker.C:
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
}
case <-stopChan:
ticker.Stop()
return
}
}
}()
return nil
}
func (s *IntervalScheduler) GetType() string {
return "Interval"
}
func (s *IntervalScheduler) IsAdHoc() bool {
return false
}
func (s *IntervalScheduler) Cancel(id JobID) error {
if s.jobs == nil {
return nil
}
if stopChan, ok := s.jobs[id]; ok {
select {
case stopChan <- true:
default:
}
delete(s.jobs, id)
}
return nil
}
func (s *IntervalScheduler) CancelAll() error {
if s.jobs == nil {
return nil
}
for _, stopChan := range s.jobs {
select {
case stopChan <- true:
default:
}
}
s.jobs = nil
return nil
}
func (s *IntervalScheduler) JobExists(id JobID) bool {
if s.jobs == nil {
return false
}
_, ok := s.jobs[id]
return ok
}
func (s *IntervalScheduler) GetPersistence() SchedulerPersistence {
return SchedulerPersistence{
Type: s.GetType(),
Interval: s.Interval,
}
}
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
"fmt"
"time"
)
// TimeScheduler is a scheduler that schedules at a specific time
type TimeScheduler struct {
Time time.Time
jobs map[JobID]StopChan
executed bool
}
func (s *TimeScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
if s.executed {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, job.GetID())
}
if s.Time.Before(time.Now()) {
return fmt.Errorf("%w: scheduled time is in the past", ErrInvalidTime)
}
if s.jobs == nil {
s.jobs = make(map[JobID]StopChan)
}
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
}
stopChan := make(StopChan)
s.jobs[id] = stopChan
timer := time.NewTimer(s.Time.Sub(time.Now()))
go func() {
select {
case <-timer.C:
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
s.executed = true
} else {
timer.Stop()
stopChan <- true
}
case <-stopChan:
timer.Stop()
return
}
}()
return nil
}
func (s *TimeScheduler) GetType() string {
return "Time"
}
func (s *TimeScheduler) IsAdHoc() bool {
return false
}
func (s *TimeScheduler) Cancel(id JobID) error {
if s.jobs == nil {
return nil
}
if stopChan, ok := s.jobs[id]; ok {
select {
case stopChan <- true:
default:
}
delete(s.jobs, id)
}
return nil
}
func (s *TimeScheduler) CancelAll() error {
if s.jobs == nil {
return nil
}
for _, stopChan := range s.jobs {
select {
case stopChan <- true:
default:
}
}
s.jobs = nil
return nil
}
func (s *TimeScheduler) JobExists(id JobID) bool {
if s.jobs == nil {
return false
}
_, ok := s.jobs[id]
return ok
}
func (s *TimeScheduler) GetPersistence() SchedulerPersistence {
return SchedulerPersistence{
Type: s.GetType(),
Time: &s.Time,
Executed: s.executed,
}
}
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import "fmt"
// EventScheduler is a scheduler that schedules a job when an event is received
type EventScheduler struct {
Event EventName
EventBus *EventBus
jobs map[JobID]StopChan
}
func (s *EventScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
ch := make(chan interface{})
if s.EventBus != nil {
s.EventBus.Subscribe(s.Event, ch)
} else {
eventBus.Subscribe(s.Event, ch)
}
if s.jobs == nil {
s.jobs = make(map[JobID]StopChan)
}
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
}
stopChan := make(StopChan)
s.jobs[id] = stopChan
go func() {
for {
select {
case <-ch:
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
}
case <-stopChan:
eventBus.Unsubscribe(s.Event, ch)
return
}
}
}()
return nil
}
func (s *EventScheduler) GetType() string {
return "Event"
}
func (s *EventScheduler) IsAdHoc() bool {
return false
}
func (s *EventScheduler) Cancel(id JobID) error {
if s.jobs == nil {
return nil
}
if stopChan, ok := s.jobs[id]; ok {
select {
case stopChan <- true:
default:
}
delete(s.jobs, id)
}
return nil
}
func (s *EventScheduler) CancelAll() error {
if s.jobs == nil {
return nil
}
for _, stopChan := range s.jobs {
select {
case stopChan <- true:
default:
}
}
s.jobs = nil
return nil
}
func (s *EventScheduler) JobExists(id JobID) bool {
if s.jobs == nil {
return false
}
_, ok := s.jobs[id]
return ok
}
func (s *EventScheduler) GetPersistence() SchedulerPersistence {
return SchedulerPersistence{
Type: s.GetType(),
Event: s.Event,
}
}
// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
"fmt"
"github.com/fsnotify/fsnotify"
"os"
)
// InotifyScheduler is a scheduler that schedules a job when a file changes
type InotifyScheduler struct {
Path string
EventFlags fsnotify.Op
jobs map[JobID]StopChan
}
func (s *InotifyScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
if s.EventFlags == 0 {
return fmt.Errorf("event flags are empty, at least one flag must be set. Valid flags are: 1 (Create), 2 (Write), 4 (Remove), 8 (Rename), 16 (Chmod)")
}
if s.Path == "" {
return fmt.Errorf("path is empty")
}
// exists and a directory
if fi, err := os.Stat(s.Path); err != nil || !fi.IsDir() {
return fmt.Errorf("path %s does not exist or is not a directory", s.Path)
}
if s.jobs == nil {
s.jobs = make(map[JobID]StopChan)
}
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
}
stopChan := make(StopChan)
s.jobs[id] = stopChan
watcher, err := fsnotify.NewWatcher()
if err != nil {
return err
}
go func() {
for {
select {
case event, ok := <-watcher.Events:
if !ok {
continue
}
if event.Op&s.EventFlags != 0 {
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
}
}
case _, _ = <-watcher.Errors:
case <-stopChan:
_ = watcher.Close()
return
}
}
}()
err = watcher.Add(s.Path)
if err != nil {
select {
case stopChan <- true:
default:
}
return err
}
return nil
}
func (s *InotifyScheduler) GetType() string {
return "Inotify"
}
func (s *InotifyScheduler) IsAdHoc() bool {
return false
}
func (s *InotifyScheduler) Cancel(id JobID) error {
if s.jobs == nil {
return nil
}
if stopChan, ok := s.jobs[id]; ok {
select {
case stopChan <- true:
default:
}
delete(s.jobs, id)
}
return nil
}
func (s *InotifyScheduler) CancelAll() error {
if s.jobs == nil {
return nil
}
for _, stopChan := range s.jobs {
select {
case stopChan <- true:
default:
}
}
s.jobs = nil
return nil
}
func (s *InotifyScheduler) JobExists(id JobID) bool {
if s.jobs == nil {
return false
}
_, ok := s.jobs[id]
return ok
}
func (s *InotifyScheduler) GetPersistence() SchedulerPersistence {
return SchedulerPersistence{
Type: s.GetType(),
Path: s.Path,
EventFlags: s.EventFlags,
}
}