Skip to content
Snippets Groups Projects
Verified Commit 1a93bb54 authored by Volker Schukai's avatar Volker Schukai :alien:
Browse files

feat: new fsnotify scheduler #33

parent 9d774f0a
No related branches found
No related tags found
No related merge requests found
module gitlab.schukai.com/oss/libraries/go/services/job-queues module gitlab.schukai.com/oss/libraries/go/services/job-queues
go 1.20 go 1.21
require ( require (
github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/docker/docker v24.0.6+incompatible github.com/docker/docker v24.0.6+incompatible
github.com/docker/go-connections v0.4.0 github.com/docker/go-connections v0.4.0
github.com/fsnotify/fsnotify v1.7.0
github.com/go-chi/chi/v5 v5.0.10 github.com/go-chi/chi/v5 v5.0.10
github.com/google/uuid v1.4.0 github.com/google/uuid v1.4.0
github.com/pkg/sftp v1.13.6 github.com/pkg/sftp v1.13.6
...@@ -13,7 +14,7 @@ require ( ...@@ -13,7 +14,7 @@ require (
github.com/shirou/gopsutil/v3 v3.23.10 github.com/shirou/gopsutil/v3 v3.23.10
github.com/stretchr/testify v1.8.4 github.com/stretchr/testify v1.8.4
go.uber.org/zap v1.26.0 go.uber.org/zap v1.26.0
golang.org/x/crypto v0.14.0 golang.org/x/crypto v0.16.0
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1
gorm.io/driver/mysql v1.5.2 gorm.io/driver/mysql v1.5.2
gorm.io/gorm v1.25.5 gorm.io/gorm v1.25.5
...@@ -46,7 +47,7 @@ require ( ...@@ -46,7 +47,7 @@ require (
go.uber.org/multierr v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect
golang.org/x/mod v0.8.0 // indirect golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.10.0 // indirect golang.org/x/net v0.10.0 // indirect
golang.org/x/sys v0.14.0 // indirect golang.org/x/sys v0.15.0 // indirect
golang.org/x/time v0.3.0 // indirect golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.6.0 // indirect golang.org/x/tools v0.6.0 // indirect
gotest.tools/v3 v3.5.1 // indirect gotest.tools/v3 v3.5.1 // indirect
......
...@@ -16,6 +16,8 @@ github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKoh ...@@ -16,6 +16,8 @@ github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKoh
github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/go-chi/chi/v5 v5.0.10 h1:rLz5avzKpjqxrYwXNfmjkrYYXOyLJd37pz53UFHC6vk= github.com/go-chi/chi/v5 v5.0.10 h1:rLz5avzKpjqxrYwXNfmjkrYYXOyLJd37pz53UFHC6vk=
github.com/go-chi/chi/v5 v5.0.10/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= github.com/go-chi/chi/v5 v5.0.10/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
...@@ -98,6 +100,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y ...@@ -98,6 +100,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY=
golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
...@@ -134,10 +138,13 @@ golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= ...@@ -134,10 +138,13 @@ golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek=
golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
......
...@@ -2,6 +2,7 @@ package jobqueue ...@@ -2,6 +2,7 @@ package jobqueue
import ( import (
"fmt" "fmt"
"github.com/fsnotify/fsnotify"
"github.com/robfig/cron/v3" "github.com/robfig/cron/v3"
"gorm.io/gorm" "gorm.io/gorm"
"sync" "sync"
...@@ -52,12 +53,14 @@ func NewManager() *Manager { ...@@ -52,12 +53,14 @@ func NewManager() *Manager {
} }
// GetEventBus returns the event bus
func (m *Manager) GetEventBus() *EventBus { func (m *Manager) GetEventBus() *EventBus {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
return m.eventBus return m.eventBus
} }
// SetCronInstance sets the cron instance
func (m *Manager) SetCronInstance(cronInstance *cron.Cron) *Manager { func (m *Manager) SetCronInstance(cronInstance *cron.Cron) *Manager {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
...@@ -65,12 +68,14 @@ func (m *Manager) SetCronInstance(cronInstance *cron.Cron) *Manager { ...@@ -65,12 +68,14 @@ func (m *Manager) SetCronInstance(cronInstance *cron.Cron) *Manager {
return m return m
} }
// GetCronInstance returns the cron instance
func (m *Manager) GetCronInstance() *cron.Cron { func (m *Manager) GetCronInstance() *cron.Cron {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
return m.cronInstance return m.cronInstance
} }
// NewCronScheduler creates a new cron scheduler
func (m *Manager) NewCronScheduler(spec string) *CronScheduler { func (m *Manager) NewCronScheduler(spec string) *CronScheduler {
return &CronScheduler{ return &CronScheduler{
Spec: spec, Spec: spec,
...@@ -78,28 +83,47 @@ func (m *Manager) NewCronScheduler(spec string) *CronScheduler { ...@@ -78,28 +83,47 @@ func (m *Manager) NewCronScheduler(spec string) *CronScheduler {
} }
} }
// NewInstantScheduler creates a new instant scheduler
func (m *Manager) NewInstantScheduler() *InstantScheduler { func (m *Manager) NewInstantScheduler() *InstantScheduler {
return &InstantScheduler{} return &InstantScheduler{}
} }
// NewIntervalScheduler creates a new interval scheduler
func (m *Manager) NewIntervalScheduler(interval time.Duration) *IntervalScheduler { func (m *Manager) NewIntervalScheduler(interval time.Duration) *IntervalScheduler {
return &IntervalScheduler{ return &IntervalScheduler{
Interval: interval, Interval: interval,
} }
} }
// NewDelayScheduler creates a new delay scheduler
func (m *Manager) NewDelayScheduler(delay time.Duration) *DelayScheduler { func (m *Manager) NewDelayScheduler(delay time.Duration) *DelayScheduler {
return &DelayScheduler{ return &DelayScheduler{
Delay: delay, Delay: delay,
} }
} }
// NewEventScheduler creates a new event scheduler
func (m *Manager) NewEventScheduler(event EventName) *EventScheduler { func (m *Manager) NewEventScheduler(event EventName) *EventScheduler {
return &EventScheduler{ return &EventScheduler{
Event: event, Event: event,
} }
} }
// NewTimeScheduler creates a new time scheduler
func (m *Manager) NewTimeScheduler(t time.Time) *TimeScheduler {
return &TimeScheduler{
Time: t,
}
}
// NewInotifyScheduler creates a new inotify scheduler
func (m *Manager) NewInotifyScheduler(path string, eventFlags fsnotify.Op) *InotifyScheduler {
return &InotifyScheduler{
Path: path,
EventFlags: eventFlags,
}
}
// GetActiveJobs returns the active jobs // GetActiveJobs returns the active jobs
func (m *Manager) GetActiveJobs() map[JobID]GenericJob { func (m *Manager) GetActiveJobs() map[JobID]GenericJob {
m.mu.Lock() m.mu.Lock()
...@@ -189,38 +213,35 @@ func (m *Manager) ResetJobStats(id JobID) error { ...@@ -189,38 +213,35 @@ func (m *Manager) ResetJobStats(id JobID) error {
return nil return nil
} }
func (m *Manager) removeJobInternal(id JobID) error { type CancelScheduler interface {
Cancel(id JobID) error
}
scheduler := m.activeJobs[id].GetScheduler() func cancelSchedulerByManager(scheduler CancelScheduler, id JobID) error {
switch scheduler.(type) { err := scheduler.Cancel(id)
case *CronScheduler: if err != nil {
if err := scheduler.(*CronScheduler).Cancel(id); err != nil {
return err
}
case *DelayScheduler:
if err := scheduler.(*DelayScheduler).Cancel(id); err != nil {
return err
}
case *EventScheduler:
if err := scheduler.(*EventScheduler).Cancel(id); err != nil {
return err return err
} }
case *InstantScheduler:
if err := scheduler.(*InstantScheduler).Cancel(id); err != nil { return nil
return err
} }
case *IntervalScheduler:
if err := scheduler.(*IntervalScheduler).Cancel(id); err != nil { func (m *Manager) removeJobInternal(id JobID) error {
return err
scheduler := m.activeJobs[id].GetScheduler()
if scheduler == nil {
return ErrJobNotScheduled
} }
case *TimeScheduler:
if err := scheduler.(*TimeScheduler).Cancel(id); err != nil { if cancelScheduler, ok := scheduler.(CancelScheduler); ok {
err := cancelSchedulerByManager(cancelScheduler, id)
if err != nil {
return err return err
} }
default: } else {
return ErrUnknownScheduleType return ErrUnknownScheduleType
} }
delete(m.activeJobs, id) delete(m.activeJobs, id)
......
...@@ -2,14 +2,26 @@ package jobqueue ...@@ -2,14 +2,26 @@ package jobqueue
import ( import (
"sync" "sync"
"time"
) )
// MaxAge is the maximum age of a job in the processed jobs map
const MaxAge = 24 * time.Hour
// MaxProcessedJobs is the maximum number of jobs in the processed jobs map
const MaxProcessedJobs = 50000
type ProcessedJobInfo struct {
ProcessedTime time.Time
ID JobID
}
// Queue is a job queue // Queue is a job queue
type Queue struct { type Queue struct {
jobMap map[JobID]GenericJob jobMap map[JobID]GenericJob
pendingDependencies map[JobID][]JobID pendingDependencies map[JobID][]JobID
readyQueue []GenericJob readyQueue []GenericJob
processedJobs map[JobID]struct{} processedJobs []ProcessedJobInfo
eventBus *EventBus eventBus *EventBus
mu sync.Mutex mu sync.Mutex
manger *Manager manger *Manager
...@@ -21,7 +33,7 @@ func NewQueue(EventBus *EventBus) *Queue { ...@@ -21,7 +33,7 @@ func NewQueue(EventBus *EventBus) *Queue {
jobMap: make(map[JobID]GenericJob), jobMap: make(map[JobID]GenericJob),
pendingDependencies: make(map[JobID][]JobID), pendingDependencies: make(map[JobID][]JobID),
readyQueue: []GenericJob{}, readyQueue: []GenericJob{},
processedJobs: make(map[JobID]struct{}), processedJobs: []ProcessedJobInfo{},
eventBus: EventBus, eventBus: EventBus,
} }
} }
...@@ -85,7 +97,8 @@ func (q *Queue) Enqueue(job GenericJob) error { ...@@ -85,7 +97,8 @@ func (q *Queue) Enqueue(job GenericJob) error {
fullJobList = append(fullJobList, job) fullJobList = append(fullJobList, job)
} }
for id := range q.processedJobs { for i := range q.processedJobs {
id := q.processedJobs[i].ID
fullJobList = append(fullJobList, q.jobMap[id]) fullJobList = append(fullJobList, q.jobMap[id])
} }
...@@ -117,6 +130,40 @@ func (q *Queue) Enqueue(job GenericJob) error { ...@@ -117,6 +130,40 @@ func (q *Queue) Enqueue(job GenericJob) error {
return nil return nil
} }
// ClearProcessedJobs removes old jobs from the processed jobs map
func (q *Queue) ClearProcessedJobs() {
q.mu.Lock()
defer q.mu.Unlock()
// remove old jobs
currentTime := time.Now()
cutoffIndex := 0
for i, jobInfo := range q.processedJobs {
if currentTime.Sub(jobInfo.ProcessedTime) <= MaxAge {
cutoffIndex = i
break
}
}
q.processedJobs = q.processedJobs[cutoffIndex:]
// if the processed jobs map is too large, remove the oldest jobs
if len(q.processedJobs) > MaxProcessedJobs {
startIdx := len(q.processedJobs) - MaxProcessedJobs
q.processedJobs = q.processedJobs[startIdx:]
}
}
func (q *Queue) isDependency(id JobID) bool {
for _, deps := range q.pendingDependencies {
for _, depID := range deps {
if depID == id {
return true
}
}
}
return false
}
// Dequeue removes a job from the queue // Dequeue removes a job from the queue
func (q *Queue) Dequeue() (GenericJob, error) { func (q *Queue) Dequeue() (GenericJob, error) {
q.mu.Lock() q.mu.Lock()
...@@ -130,7 +177,10 @@ func (q *Queue) Dequeue() (GenericJob, error) { ...@@ -130,7 +177,10 @@ func (q *Queue) Dequeue() (GenericJob, error) {
q.readyQueue = q.readyQueue[1:] q.readyQueue = q.readyQueue[1:]
// Mark the job as processed but keep it in the jobMap for dependency resolution // Mark the job as processed but keep it in the jobMap for dependency resolution
q.processedJobs[job.GetID()] = struct{}{} q.processedJobs = append(q.processedJobs, ProcessedJobInfo{
ProcessedTime: time.Now(),
ID: job.GetID(),
})
return job, nil return job, nil
} }
......
...@@ -145,11 +145,14 @@ func TestProcessedJobs(t *testing.T) { ...@@ -145,11 +145,14 @@ func TestProcessedJobs(t *testing.T) {
t.Fatalf("Dequeue failed: %v", err) t.Fatalf("Dequeue failed: %v", err)
} }
if _, exists := q.processedJobs[job1.GetID()]; !exists { for _, jobInfo := range q.processedJobs {
t.Fatalf("Job 1 not marked as processed") if jobInfo.ID == job1.GetID() {
t.Fatalf("Job 1 should not be in processedJobs")
} }
} }
}
func TestCyclicDependencies(t *testing.T) { func TestCyclicDependencies(t *testing.T) {
runner := &DummyRunnable{} runner := &DummyRunnable{}
q := NewQueue(nil) q := NewQueue(nil)
......
...@@ -117,7 +117,7 @@ func (s *ShellRunnable) GetType() string { ...@@ -117,7 +117,7 @@ func (s *ShellRunnable) GetType() string {
func (c *ShellRunnable) GetPersistence() RunnableImport { func (c *ShellRunnable) GetPersistence() RunnableImport {
data := JSONMap{ data := JSONMap{
"script_path": c.ScriptPath, "scriptPath": c.ScriptPath,
"script": c.Script, "script": c.Script,
} }
......
package jobqueue
import (
"fmt"
"github.com/robfig/cron/v3"
)
// CronScheduler is a scheduler that uses the cron library to schedule jobs
type CronScheduler struct {
cron *cron.Cron
Spec string
jobs map[JobID]cron.EntryID
}
func (s *CronScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
var err error
if s.cron == nil {
return ErrCronNotInitialized
}
if s.jobs == nil {
s.jobs = make(map[JobID]cron.EntryID)
}
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
}
entryId, err := s.cron.AddFunc(s.Spec, func() {
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
}
})
s.jobs[id] = entryId
if err != nil {
return err
}
return nil
}
func (s *CronScheduler) GetType() string {
return "Cron"
}
func (s *CronScheduler) IsAdHoc() bool {
return false
}
func (s *CronScheduler) Cancel(id JobID) error {
if s.jobs == nil {
return nil
}
if entryId, ok := s.jobs[id]; ok {
s.cron.Remove(entryId)
}
return nil
}
func (s *CronScheduler) CancelAll() error {
if s.jobs == nil {
return nil
}
for _, entryId := range s.jobs {
s.cron.Remove(entryId)
}
s.jobs = nil
return nil
}
func (s *CronScheduler) JobExists(id JobID) bool {
if s.jobs == nil {
return false
}
_, ok := s.jobs[id]
return ok
}
func (s *CronScheduler) GetPersistence() SchedulerPersistence {
return SchedulerPersistence{
Type: s.GetType(),
Spec: s.Spec,
}
}
package jobqueue
import (
"fmt"
"time"
)
// DelayScheduler is a scheduler that schedules a job after a delay
type DelayScheduler struct {
Delay time.Duration
jobs map[JobID]StopChan
}
func (s *DelayScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
if s.jobs == nil {
s.jobs = make(map[JobID]StopChan)
}
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
}
stopChan := make(StopChan)
s.jobs[id] = stopChan
timer := time.NewTimer(s.Delay)
go func() {
select {
case <-timer.C:
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
}
case <-stopChan:
timer.Stop()
return
}
}()
return nil
}
func (s *DelayScheduler) GetType() string {
return "Delay"
}
func (s *DelayScheduler) IsAdHoc() bool {
return true
}
func (s *DelayScheduler) Cancel(id JobID) error {
if s.jobs == nil {
return nil
}
if stopChan, ok := s.jobs[id]; ok {
select {
case stopChan <- true:
default:
}
delete(s.jobs, id)
}
return nil
}
func (s *DelayScheduler) CancelAll() error {
if s.jobs == nil {
return nil
}
for _, stopChan := range s.jobs {
select {
case stopChan <- true:
default:
}
}
s.jobs = nil
return nil
}
func (s *DelayScheduler) JobExists(id JobID) bool {
if s.jobs == nil {
return false
}
_, ok := s.jobs[id]
return ok
}
func (s *DelayScheduler) GetPersistence() SchedulerPersistence {
return SchedulerPersistence{
Type: s.GetType(),
Delay: s.Delay,
}
}
package jobqueue
import (
"fmt"
"time"
)
// IntervalScheduler is a scheduler that schedules a job at a fixed interval
type IntervalScheduler struct {
Interval time.Duration
jobs map[JobID]StopChan
}
func (s *IntervalScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
if s.Interval <= 0 {
return fmt.Errorf("invalid interval: %v", s.Interval)
}
if s.jobs == nil {
s.jobs = make(map[JobID]StopChan)
}
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
}
stopChan := make(StopChan)
s.jobs[id] = stopChan
ticker := time.NewTicker(s.Interval)
go func() {
for {
select {
case <-ticker.C:
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
}
case <-stopChan:
ticker.Stop()
return
}
}
}()
return nil
}
func (s *IntervalScheduler) GetType() string {
return "Interval"
}
func (s *IntervalScheduler) IsAdHoc() bool {
return false
}
func (s *IntervalScheduler) Cancel(id JobID) error {
if s.jobs == nil {
return nil
}
if stopChan, ok := s.jobs[id]; ok {
select {
case stopChan <- true:
default:
}
delete(s.jobs, id)
}
return nil
}
func (s *IntervalScheduler) CancelAll() error {
if s.jobs == nil {
return nil
}
for _, stopChan := range s.jobs {
select {
case stopChan <- true:
default:
}
}
s.jobs = nil
return nil
}
func (s *IntervalScheduler) JobExists(id JobID) bool {
if s.jobs == nil {
return false
}
_, ok := s.jobs[id]
return ok
}
func (s *IntervalScheduler) GetPersistence() SchedulerPersistence {
return SchedulerPersistence{
Type: s.GetType(),
Interval: s.Interval,
}
}
package jobqueue
import (
"fmt"
"time"
)
// TimeScheduler is a scheduler that schedules at a specific time
type TimeScheduler struct {
Time time.Time
jobs map[JobID]StopChan
executed bool
}
func (s *TimeScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
if s.executed {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, job.GetID())
}
if s.Time.Before(time.Now()) {
return fmt.Errorf("%w: scheduled time is in the past", ErrInvalidTime)
}
if s.jobs == nil {
s.jobs = make(map[JobID]StopChan)
}
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
}
stopChan := make(StopChan)
s.jobs[id] = stopChan
timer := time.NewTimer(s.Time.Sub(time.Now()))
go func() {
select {
case <-timer.C:
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
s.executed = true
} else {
timer.Stop()
stopChan <- true
}
case <-stopChan:
timer.Stop()
return
}
}()
return nil
}
func (s *TimeScheduler) GetType() string {
return "Time"
}
func (s *TimeScheduler) IsAdHoc() bool {
return false
}
func (s *TimeScheduler) Cancel(id JobID) error {
if s.jobs == nil {
return nil
}
if stopChan, ok := s.jobs[id]; ok {
select {
case stopChan <- true:
default:
}
delete(s.jobs, id)
}
return nil
}
func (s *TimeScheduler) CancelAll() error {
if s.jobs == nil {
return nil
}
for _, stopChan := range s.jobs {
select {
case stopChan <- true:
default:
}
}
s.jobs = nil
return nil
}
func (s *TimeScheduler) JobExists(id JobID) bool {
if s.jobs == nil {
return false
}
_, ok := s.jobs[id]
return ok
}
func (s *TimeScheduler) GetPersistence() SchedulerPersistence {
return SchedulerPersistence{
Type: s.GetType(),
Time: &s.Time,
Executed: s.executed,
}
}
package jobqueue
import "fmt"
// EventScheduler is a scheduler that schedules a job when an event is received
type EventScheduler struct {
Event EventName
EventBus *EventBus
jobs map[JobID]StopChan
}
func (s *EventScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
ch := make(chan interface{})
if s.EventBus != nil {
s.EventBus.Subscribe(s.Event, ch)
} else {
eventBus.Subscribe(s.Event, ch)
}
if s.jobs == nil {
s.jobs = make(map[JobID]StopChan)
}
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
}
stopChan := make(StopChan)
s.jobs[id] = stopChan
go func() {
for {
select {
case <-ch:
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
}
case <-stopChan:
eventBus.Unsubscribe(s.Event, ch)
return
}
}
}()
return nil
}
func (s *EventScheduler) GetType() string {
return "Event"
}
func (s *EventScheduler) IsAdHoc() bool {
return false
}
func (s *EventScheduler) Cancel(id JobID) error {
if s.jobs == nil {
return nil
}
if stopChan, ok := s.jobs[id]; ok {
select {
case stopChan <- true:
default:
}
delete(s.jobs, id)
}
return nil
}
func (s *EventScheduler) CancelAll() error {
if s.jobs == nil {
return nil
}
for _, stopChan := range s.jobs {
select {
case stopChan <- true:
default:
}
}
s.jobs = nil
return nil
}
func (s *EventScheduler) JobExists(id JobID) bool {
if s.jobs == nil {
return false
}
_, ok := s.jobs[id]
return ok
}
func (s *EventScheduler) GetPersistence() SchedulerPersistence {
return SchedulerPersistence{
Type: s.GetType(),
Event: s.Event,
}
}
package jobqueue
import (
"fmt"
"github.com/fsnotify/fsnotify"
"os"
)
// InotifyScheduler is a scheduler that schedules a job when a file changes
type InotifyScheduler struct {
Path string
EventFlags fsnotify.Op
jobs map[JobID]StopChan
}
func (s *InotifyScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
if s.EventFlags == 0 {
return fmt.Errorf("event flags are empty, at least one flag must be set. Valid flags are: 1 (Create), 2 (Write), 4 (Remove), 8 (Rename), 16 (Chmod)")
}
if s.Path == "" {
return fmt.Errorf("path is empty")
}
// exists and a directory
if fi, err := os.Stat(s.Path); err != nil || !fi.IsDir() {
return fmt.Errorf("path %s does not exist or is not a directory", s.Path)
}
if s.jobs == nil {
s.jobs = make(map[JobID]StopChan)
}
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
}
stopChan := make(StopChan)
s.jobs[id] = stopChan
watcher, err := fsnotify.NewWatcher()
if err != nil {
return err
}
go func() {
for {
select {
case event, ok := <-watcher.Events:
if !ok {
continue
}
if event.Op&s.EventFlags != 0 {
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
}
}
case _, _ = <-watcher.Errors:
case <-stopChan:
_ = watcher.Close()
return
}
}
}()
err = watcher.Add(s.Path)
if err != nil {
select {
case stopChan <- true:
default:
}
return err
}
return nil
}
func (s *InotifyScheduler) GetType() string {
return "Inotify"
}
func (s *InotifyScheduler) IsAdHoc() bool {
return false
}
func (s *InotifyScheduler) Cancel(id JobID) error {
if s.jobs == nil {
return nil
}
if stopChan, ok := s.jobs[id]; ok {
select {
case stopChan <- true:
default:
}
delete(s.jobs, id)
}
return nil
}
func (s *InotifyScheduler) CancelAll() error {
if s.jobs == nil {
return nil
}
for _, stopChan := range s.jobs {
select {
case stopChan <- true:
default:
}
}
s.jobs = nil
return nil
}
func (s *InotifyScheduler) JobExists(id JobID) bool {
if s.jobs == nil {
return false
}
_, ok := s.jobs[id]
return ok
}
func (s *InotifyScheduler) GetPersistence() SchedulerPersistence {
return SchedulerPersistence{
Type: s.GetType(),
Path: s.Path,
EventFlags: s.EventFlags,
}
}
package jobqueue
// InstantScheduler is a scheduler that schedules a job instantly
type InstantScheduler struct{}
func (s *InstantScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
}
return nil
}
func (s *InstantScheduler) GetType() string {
return "Instant"
}
func (s *InstantScheduler) IsAdHoc() bool {
return true
}
func (s *InstantScheduler) Cancel(id JobID) error {
return nil
}
func (s *InstantScheduler) CancelAll() error {
return nil
}
func (s *InstantScheduler) JobExists(id JobID) bool {
return false
}
func (s *InstantScheduler) GetPersistence() SchedulerPersistence {
return SchedulerPersistence{
Type: s.GetType(),
}
}
...@@ -2,8 +2,7 @@ package jobqueue ...@@ -2,8 +2,7 @@ package jobqueue
import ( import (
"encoding/json" "encoding/json"
"fmt" "github.com/fsnotify/fsnotify"
"github.com/robfig/cron/v3"
"time" "time"
) )
...@@ -30,6 +29,8 @@ type SchedulerPersistence struct { ...@@ -30,6 +29,8 @@ type SchedulerPersistence struct {
Event EventName `yaml:"event,omitempty" json:"event,omitempty" gorm:"column:event"` Event EventName `yaml:"event,omitempty" json:"event,omitempty" gorm:"column:event"`
Time *time.Time `yaml:"time,omitempty" json:"time,omitempty" gorm:"column:time"` Time *time.Time `yaml:"time,omitempty" json:"time,omitempty" gorm:"column:time"`
Executed bool `yaml:"executed,omitempty" json:"executed,omitempty" gorm:"column:executed"` Executed bool `yaml:"executed,omitempty" json:"executed,omitempty" gorm:"column:executed"`
Path string `yaml:"path,omitempty" json:"path,omitempty" gorm:"column:path"`
EventFlags fsnotify.Op `yaml:"eventFlags,omitempty" json:"eventFlags,omitempty" gorm:"column:eventFlags"`
} }
// UnmarshalJSON implements the json.Unmarshaler interface // UnmarshalJSON implements the json.Unmarshaler interface
...@@ -64,529 +65,3 @@ func (sp *SchedulerPersistence) UnmarshalJSON(data []byte) error { ...@@ -64,529 +65,3 @@ func (sp *SchedulerPersistence) UnmarshalJSON(data []byte) error {
return nil return nil
} }
// IntervalScheduler is a scheduler that schedules a job at a fixed interval
type IntervalScheduler struct {
Interval time.Duration
jobs map[JobID]StopChan
}
func (s *IntervalScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
if s.Interval <= 0 {
return fmt.Errorf("invalid interval: %v", s.Interval)
}
if s.jobs == nil {
s.jobs = make(map[JobID]StopChan)
}
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
}
stopChan := make(StopChan)
s.jobs[id] = stopChan
ticker := time.NewTicker(s.Interval)
go func() {
for {
select {
case <-ticker.C:
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
}
case <-stopChan:
ticker.Stop()
return
}
}
}()
return nil
}
func (s *IntervalScheduler) GetType() string {
return "Interval"
}
func (s *IntervalScheduler) IsAdHoc() bool {
return false
}
func (s *IntervalScheduler) Cancel(id JobID) error {
if s.jobs == nil {
return nil
}
if stopChan, ok := s.jobs[id]; ok {
select {
case stopChan <- true:
default:
}
delete(s.jobs, id)
}
return nil
}
func (s *IntervalScheduler) CancelAll() error {
if s.jobs == nil {
return nil
}
for _, stopChan := range s.jobs {
select {
case stopChan <- true:
default:
}
}
s.jobs = nil
return nil
}
func (s *IntervalScheduler) JobExists(id JobID) bool {
if s.jobs == nil {
return false
}
_, ok := s.jobs[id]
return ok
}
func (s *IntervalScheduler) GetPersistence() SchedulerPersistence {
return SchedulerPersistence{
Type: s.GetType(),
Interval: s.Interval,
}
}
// CronScheduler is a scheduler that uses the cron library to schedule jobs
type CronScheduler struct {
cron *cron.Cron
Spec string
jobs map[JobID]cron.EntryID
}
func (s *CronScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
var err error
if s.cron == nil {
return ErrCronNotInitialized
}
if s.jobs == nil {
s.jobs = make(map[JobID]cron.EntryID)
}
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
}
entryId, err := s.cron.AddFunc(s.Spec, func() {
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
}
})
s.jobs[id] = entryId
if err != nil {
return err
}
return nil
}
func (s *CronScheduler) GetType() string {
return "Cron"
}
func (s *CronScheduler) IsAdHoc() bool {
return false
}
func (s *CronScheduler) Cancel(id JobID) error {
if s.jobs == nil {
return nil
}
if entryId, ok := s.jobs[id]; ok {
s.cron.Remove(entryId)
}
return nil
}
func (s *CronScheduler) CancelAll() error {
if s.jobs == nil {
return nil
}
for _, entryId := range s.jobs {
s.cron.Remove(entryId)
}
s.jobs = nil
return nil
}
func (s *CronScheduler) JobExists(id JobID) bool {
if s.jobs == nil {
return false
}
_, ok := s.jobs[id]
return ok
}
func (s *CronScheduler) GetPersistence() SchedulerPersistence {
return SchedulerPersistence{
Type: s.GetType(),
Spec: s.Spec,
}
}
// DelayScheduler is a scheduler that schedules a job after a delay
type DelayScheduler struct {
Delay time.Duration
jobs map[JobID]StopChan
}
func (s *DelayScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
if s.jobs == nil {
s.jobs = make(map[JobID]StopChan)
}
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
}
stopChan := make(StopChan)
s.jobs[id] = stopChan
timer := time.NewTimer(s.Delay)
go func() {
select {
case <-timer.C:
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
}
case <-stopChan:
timer.Stop()
return
}
}()
return nil
}
func (s *DelayScheduler) GetType() string {
return "Delay"
}
func (s *DelayScheduler) IsAdHoc() bool {
return true
}
func (s *DelayScheduler) Cancel(id JobID) error {
if s.jobs == nil {
return nil
}
if stopChan, ok := s.jobs[id]; ok {
select {
case stopChan <- true:
default:
}
delete(s.jobs, id)
}
return nil
}
func (s *DelayScheduler) CancelAll() error {
if s.jobs == nil {
return nil
}
for _, stopChan := range s.jobs {
select {
case stopChan <- true:
default:
}
}
s.jobs = nil
return nil
}
func (s *DelayScheduler) JobExists(id JobID) bool {
if s.jobs == nil {
return false
}
_, ok := s.jobs[id]
return ok
}
func (s *DelayScheduler) GetPersistence() SchedulerPersistence {
return SchedulerPersistence{
Type: s.GetType(),
Delay: s.Delay,
}
}
// TimeScheduler is a scheduler that schedules at a specific time
type TimeScheduler struct {
Time time.Time
jobs map[JobID]StopChan
executed bool
}
func (s *TimeScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
if s.executed {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, job.GetID())
}
if s.Time.Before(time.Now()) {
return fmt.Errorf("%w: scheduled time is in the past", ErrInvalidTime)
}
if s.jobs == nil {
s.jobs = make(map[JobID]StopChan)
}
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
}
stopChan := make(StopChan)
s.jobs[id] = stopChan
timer := time.NewTimer(s.Time.Sub(time.Now()))
go func() {
select {
case <-timer.C:
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
s.executed = true
} else {
timer.Stop()
stopChan <- true
}
case <-stopChan:
timer.Stop()
return
}
}()
return nil
}
func (s *TimeScheduler) GetType() string {
return "Time"
}
func (s *TimeScheduler) IsAdHoc() bool {
return false
}
func (s *TimeScheduler) Cancel(id JobID) error {
if s.jobs == nil {
return nil
}
if stopChan, ok := s.jobs[id]; ok {
select {
case stopChan <- true:
default:
}
delete(s.jobs, id)
}
return nil
}
func (s *TimeScheduler) CancelAll() error {
if s.jobs == nil {
return nil
}
for _, stopChan := range s.jobs {
select {
case stopChan <- true:
default:
}
}
s.jobs = nil
return nil
}
func (s *TimeScheduler) JobExists(id JobID) bool {
if s.jobs == nil {
return false
}
_, ok := s.jobs[id]
return ok
}
func (s *TimeScheduler) GetPersistence() SchedulerPersistence {
return SchedulerPersistence{
Type: s.GetType(),
Time: &s.Time,
Executed: s.executed,
}
}
// EventScheduler is a scheduler that schedules a job when an event is received
type EventScheduler struct {
Event EventName
EventBus *EventBus
jobs map[JobID]StopChan
}
func (s *EventScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
ch := make(chan interface{})
if s.EventBus != nil {
s.EventBus.Subscribe(s.Event, ch)
} else {
eventBus.Subscribe(s.Event, ch)
}
if s.jobs == nil {
s.jobs = make(map[JobID]StopChan)
}
id := job.GetID()
if _, ok := s.jobs[id]; ok {
return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
}
stopChan := make(StopChan)
s.jobs[id] = stopChan
go func() {
for {
select {
case <-ch:
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
}
case <-stopChan:
eventBus.Unsubscribe(s.Event, ch)
return
}
}
}()
return nil
}
func (s *EventScheduler) GetType() string {
return "Event"
}
func (s *EventScheduler) IsAdHoc() bool {
return false
}
func (s *EventScheduler) Cancel(id JobID) error {
if s.jobs == nil {
return nil
}
if stopChan, ok := s.jobs[id]; ok {
select {
case stopChan <- true:
default:
}
delete(s.jobs, id)
}
return nil
}
func (s *EventScheduler) CancelAll() error {
if s.jobs == nil {
return nil
}
for _, stopChan := range s.jobs {
select {
case stopChan <- true:
default:
}
}
s.jobs = nil
return nil
}
func (s *EventScheduler) JobExists(id JobID) bool {
if s.jobs == nil {
return false
}
_, ok := s.jobs[id]
return ok
}
func (s *EventScheduler) GetPersistence() SchedulerPersistence {
return SchedulerPersistence{
Type: s.GetType(),
Event: s.Event,
}
}
// InstantScheduler is a scheduler that schedules a job instantly
type InstantScheduler struct{}
func (s *InstantScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
}
return nil
}
func (s *InstantScheduler) GetType() string {
return "Instant"
}
func (s *InstantScheduler) IsAdHoc() bool {
return true
}
func (s *InstantScheduler) Cancel(id JobID) error {
return nil
}
func (s *InstantScheduler) CancelAll() error {
return nil
}
func (s *InstantScheduler) JobExists(id JobID) bool {
return false
}
func (s *InstantScheduler) GetPersistence() SchedulerPersistence {
return SchedulerPersistence{
Type: s.GetType(),
}
}
...@@ -2,8 +2,10 @@ package jobqueue ...@@ -2,8 +2,10 @@ package jobqueue
import ( import (
"encoding/json" "encoding/json"
"github.com/fsnotify/fsnotify"
"github.com/robfig/cron/v3" "github.com/robfig/cron/v3"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"os"
"sync/atomic" "sync/atomic"
"testing" "testing"
"time" "time"
...@@ -366,3 +368,43 @@ func parseTimeForTesting(t *testing.T, value string) *time.Time { ...@@ -366,3 +368,43 @@ func parseTimeForTesting(t *testing.T, value string) *time.Time {
t.Fatalf("Failed to parse time '%s' in any known format", value) t.Fatalf("Failed to parse time '%s' in any known format", value)
return nil return nil
} }
func TestInotifyScheduler_BasicFunctionality(t *testing.T) {
var count int32
eventBus := NewEventBus()
tmpPath := t.TempDir()
inotifyScheduler := InotifyScheduler{Path: tmpPath, EventFlags: fsnotify.Create | fsnotify.Write | fsnotify.Remove}
job := NewJob[DummyResult]("test-job", &DummyRunnable{})
genericJob := GenericJob(job)
_ = inotifyScheduler.Schedule(genericJob, eventBus)
jobChannel := make(chan interface{})
go func() {
for _ = range jobChannel {
atomic.AddInt32(&count, 1)
}
}()
eventBus.Subscribe(QueueJob, jobChannel)
time.Sleep(time.Millisecond * 100)
tmpFile := tmpPath + "/test.txt"
_, err := os.Create(tmpFile)
assert.NoError(t, err)
time.Sleep(time.Millisecond * 100)
err = os.Remove(tmpFile)
assert.NoError(t, err)
time.Sleep(time.Millisecond * 100)
if atomic.LoadInt32(&count) != 2 {
t.Errorf("Expected to run 2 times, ran %d times", count)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment