diff --git a/event-bus.go b/event-bus.go index 86cc84eaf08f1d2252d15d6367ae86446dcfb58a..6f957d6c7c7c424168686b859d636a8822bacf4e 100644 --- a/event-bus.go +++ b/event-bus.go @@ -1,6 +1,7 @@ package jobqueue import ( + "github.com/google/uuid" "sync" ) @@ -17,8 +18,9 @@ const ( ) type Event struct { - Name EventName - Data any + Name EventName + Data any + MessageID string } // EventBus is a simple event bus @@ -64,7 +66,11 @@ func (eb *EventBus) Publish(name EventName, data interface{}) { defer eb.mu.RUnlock() if channels, found := eb.subscribers[name]; found { for _, ch := range channels { - ch <- Event{Name: name, Data: data} + ch <- Event{ + Name: name, + Data: data, + MessageID: uuid.New().String(), + } } } } diff --git a/go.mod b/go.mod index 453b642b900262b3bcdf1f2b98b3bef15e81ef98..6cfd282066755c4c99d998765f3a26ed2bd9e41f 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/go-ole/go-ole v1.3.0 // indirect github.com/go-sql-driver/mysql v1.7.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/google/uuid v1.3.1 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/kr/fs v0.1.0 // indirect diff --git a/go.sum b/go.sum index 996f88a55e76433e7690f3b90894f5a5f24de26e..49b6057457b82993b5605649d56a3263a4b1141c 100644 --- a/go.sum +++ b/go.sum @@ -28,6 +28,8 @@ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= diff --git a/import_test.go b/import_test.go index 8687627c8107c351d0d7828273368d3aac5718aa..39e1d6d4fa73c9ae661389f0bd762e786d6e5f16 100644 --- a/import_test.go +++ b/import_test.go @@ -1,6 +1,7 @@ package jobqueue import ( + "github.com/robfig/cron/v3" "github.com/stretchr/testify/assert" "io/ioutil" "os" @@ -48,7 +49,7 @@ func TestCreateJobAndSchedulerFromInput(t *testing.T) { }, Scheduler: SchedulerImport{ Type: "Cron", - Spec: "*/5 * * * *", + Spec: "* * * * * *", }, }, wantJob: GenericJob(&Job[ShellResult]{ /* Initialization */ }), @@ -60,6 +61,15 @@ func TestCreateJobAndSchedulerFromInput(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { gotJob, gotSchedule, err := CreateJobAndSchedulerFromImport(tt.input, nil) + if gotSchedule != nil { + if gotSchedule.GetType() == "Cron" { + cronInst := cron.New(cron.WithSeconds()) + gotSchedule.(*CronScheduler).cron = cronInst + cronInst.Start() + defer cronInst.Stop() + + } + } if (err != nil) != tt.wantErr { t.Errorf("CreateJobAndSchedulerFromImport() error = %v, wantErr %v", err, tt.wantErr) diff --git a/issue-1_test.go b/issue-1_test.go index ce9a16c7818569048c2183b259b7686b949198af..bd90bb876af9f82c2a8dfbaf669f1af62001879c 100644 --- a/issue-1_test.go +++ b/issue-1_test.go @@ -3,6 +3,7 @@ package jobqueue import ( "github.com/robfig/cron/v3" "github.com/stretchr/testify/assert" + "go.uber.org/zap" "os" "strings" "testing" @@ -42,6 +43,30 @@ func TestRoundTrip(t *testing.T) { // define test data with jobs in yaml format testData := []byte(` - id: job1 + priority: 1 + timeout: 1s + maxRetries: 3 + retryDelay: 1s + runnable: + type: shell + data: + script: echo "~1~ $(date "+%M:%S")" >> /tmp/job1.log + scheduler: + type: cron + spec: "*/10 * * * * *" +- id: job2 + priority: 1 + timeout: 1s + maxRetries: 3 + retryDelay: 1s + runnable: + type: shell + data: + script: echo "~~~~2~ $(date "+%M:%S")" >> /tmp/job1.log + scheduler: + type: cron + spec: "*/5 * * * * *" +- id: job3 priority: 1 timeout: 1s maxRetries: 3 @@ -49,18 +74,27 @@ func TestRoundTrip(t *testing.T) { runnable: type: shell data: - script: echo "Hello World $(date)" >> /tmp/job1.log + script: echo "~~~~~~3~ $(date "+%M:%S")" >> /tmp/job1.log scheduler: - type: cron - spec: "* * * * *" + type: interval + interval: 20s `) var err error + cronInstance := cron.New(cron.WithSeconds()) + cronInstance.Start() + + zapLogger, _ := zap.NewDevelopment() + manager := NewManager() - manager.SetCronInstance(cron.New()) - worker := NewLocalWorker(1) + manager.SetLogger(&ZapAdapter{logger: zapLogger}) + + manager.SetCronInstance(cronInstance) + worker := NewLocalWorker(10) + worker.SetManager(manager) err = manager.AddWorker(worker) + assert.Nil(t, err) err = manager.Start() diff --git a/manager.go b/manager.go index 7771d2eb3589423d054e30017da27375c05fccb0..5a8468f4007b9b3133463de1ea625fd21c473a50 100644 --- a/manager.go +++ b/manager.go @@ -184,8 +184,10 @@ func (m *Manager) Start() error { } m.jobEventCh = make(chan interface{}, 100) + m.eventBus.Subscribe(QueueJob, m.jobEventCh) m.eventBus.Subscribe(JobReady, m.jobEventCh) + go m.handleJobEvents() err := m.checkAndSetRunningState() @@ -249,35 +251,66 @@ func (m *Manager) Stop() error { } func (m *Manager) SetLogger(logger Logger) { + m.mu.Lock() + defer m.mu.Unlock() m.logger = logger } +func (m *Manager) GetLogger() Logger { + m.mu.Lock() + defer m.mu.Unlock() + return m.logger +} + func (m *Manager) handleJobEvents() { for event := range m.jobEventCh { switch event := event.(type) { case Event: + if m.logger != nil { + m.logger.Info("Event received", "event", event.Name, "data", event.Data, "message_id", event.MessageID) + } + switch event.Name { case QueueJob: + + if m.logger != nil { + m.logger.Info("Job queued", "job_id", event.Data.(GenericJob).GetID()) + } + job := event.Data.(GenericJob) err := m.queue.Enqueue(job) if err != nil && err != ErrJobAlreadyExists { fmt.Println(err) - } case JobReady: for { + nextJob, err := m.queue.Dequeue() if err != nil { break } + if m.logger != nil { + m.logger.Info("Job ready", "job_id", nextJob.GetID()) + } + + assigned := false for _, worker := range m.workerMap { if err := worker.AssignJob(nextJob); err == nil { + assigned = true break } } + + if !assigned { + fmt.Println("Job not assigned") + err = m.queue.Enqueue(nextJob) + if err != nil && err != ErrJobAlreadyExists { + fmt.Println(err) + } + } } case JobFinished: diff --git a/manager_test.go b/manager_test.go index 0fada7d392e90a45420c06b83dd0ceced0735ab1..bbf2fb04665e0eca5595553ff8c68937a40c5d67 100644 --- a/manager_test.go +++ b/manager_test.go @@ -30,6 +30,10 @@ func (m *MockWorker) Status() WorkerStatus { return m.status } +func (m *MockWorker) SetManager(manager *Manager) { + return +} + func (m *MockWorker) AssignJob(job GenericJob) error { return nil } @@ -80,6 +84,10 @@ func (m *MockGenericJob) GetDependencies() []JobID { return nil } +func (m *MockGenericJob) GetDependentJobs() []JobID { + return nil +} + func (m *MockGenericJob) GetPriority() Priority { return PriorityDefault } diff --git a/queue.go b/queue.go index 5e2f65318075f49d95909e919a59586ca36c8fc4..c850f61917090b72476ac122439a7370026e8a26 100644 --- a/queue.go +++ b/queue.go @@ -1,6 +1,7 @@ package jobqueue import ( + "fmt" "sync" ) @@ -11,6 +12,7 @@ type Queue struct { processedJobs map[JobID]struct{} eventBus *EventBus mu sync.Mutex + manger *Manager } func NewQueue(EventBus *EventBus) *Queue { @@ -23,13 +25,16 @@ func NewQueue(EventBus *EventBus) *Queue { } } +func (q *Queue) SetManager(m *Manager) { + q.manger = m +} + func (q *Queue) Enqueue(job GenericJob) error { q.mu.Lock() defer q.mu.Unlock() if _, exists := q.jobMap[job.GetID()]; !exists { q.jobMap[job.GetID()] = job - //return ErrJobAlreadyExists } for _, readyJob := range q.readyQueue { @@ -83,6 +88,10 @@ func (q *Queue) Enqueue(job GenericJob) error { q.readyQueue = newReadyQueue if q.eventBus != nil && len(q.readyQueue) > 0 { + if q.manger != nil && q.manger.logger != nil { + q.manger.logger.Info("Job ready", "job_id", job.GetID()) + } + q.eventBus.Publish(JobReady, job.GetID()) } @@ -105,6 +114,8 @@ func (q *Queue) Dequeue() (GenericJob, error) { // Mark the job as processed but keep it in the jobMap for dependency resolution q.processedJobs[job.GetID()] = struct{}{} + fmt.Printf("Dequeue: jobs in ready queue: %v, jobs in processed: %v, current job: %v\n", q.readyQueue, q.processedJobs, job.GetID()) + return job, nil } diff --git a/scheduler.go b/scheduler.go index c26afcf984bec57a5fb74cc83d0d72b1c784be7e..9b702aeb235bfe66dde82796c5b34f1b6b4662fc 100644 --- a/scheduler.go +++ b/scheduler.go @@ -128,7 +128,7 @@ func (s *CronScheduler) Schedule(job GenericJob, eventBus *EventBus) error { if err != nil { return err } - + return nil } diff --git a/worker.go b/worker.go index 9f38e8a8142b3f4f319caec7e8fb9b527a900c7e..f09dd8c1068dc2d532d1a9c23fd41bf176069022 100644 --- a/worker.go +++ b/worker.go @@ -2,6 +2,8 @@ package jobqueue import ( "context" + "fmt" + "github.com/google/uuid" "sync" "time" ) @@ -15,6 +17,10 @@ const ( type WorkerID string +func (id WorkerID) String() string { + return string(id) +} + // Worker is a worker type Worker interface { Start() error @@ -23,6 +29,8 @@ type Worker interface { AssignJob(job GenericJob) error GetID() WorkerID + + SetManager(manager *Manager) } // GenericWorker is a generic worker @@ -40,6 +48,7 @@ type LocalWorker struct { maxJobs int mu sync.Mutex wg sync.WaitGroup + manager *Manager } // GetID returns the ID of the worker @@ -53,6 +62,7 @@ func NewLocalWorker(maxJobs int) *LocalWorker { w.jobChannels = make([]chan GenericJob, maxJobs) w.stopChans = make([]chan bool, maxJobs) w.cancelChans = make([]chan bool, maxJobs) + w.ID = WorkerID(uuid.New().String()) return w } @@ -76,9 +86,17 @@ func (w *LocalWorker) Start() error { w.wg.Wait() w.status = WorkerStatusRunning + if w.manager != nil { + w.manager.logger.Info("Worker started", "worker", w.ID) + } + return nil } +func (w *LocalWorker) SetManager(manager *Manager) { + w.manager = manager +} + // Stop stops the worker func (w *LocalWorker) Stop() error { w.mu.Lock() @@ -93,12 +111,24 @@ func (w *LocalWorker) Stop() error { stopChan <- true } + if w.manager != nil { + w.manager.logger.Info("Worker stopped", "worker", w.ID) + } + return nil } func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancelChan chan bool) { w.wg.Done() + workerThreadID := w.ID.String() + "-" + fmt.Sprintf("%p", &w) + + if w.manager != nil && w.manager.logger != nil { + w.manager.logger.Info("Worker thread with id started", "worker", w.ID, "thread_id", workerThreadID) + } + + stopFlag := false + for { select { case job := <-jobChannel: @@ -119,6 +149,11 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel } ctxTimeout, cancelTimeout := context.WithTimeout(ctx, timeout) + + if w.manager != nil && w.manager.logger != nil { + w.manager.logger.Info("Executing job on worker thread", "worker", w.ID, "thread_id", workerThreadID, "job_id", job.GetID()) + } + _, err = job.Execute(ctxTimeout) cancelTimeout() @@ -133,20 +168,30 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel retries-- } - select { - case <-cancelChan: - cancel() + //select { + //case <-cancelChan: + // cancel() - case <-ctx.Done(): - cancel() + //case <-ctx.Done(): + // cancel() - default: - cancel() - } + //default: + cancel() + //} case <-stopChan: - return + stopFlag = true + break + } + + if stopFlag { + break } } + + if w.manager != nil && w.manager.logger != nil { + w.manager.logger.Info("Worker thread with id stopped", "worker", w.ID, "thread_id", workerThreadID) + } + } // AssignJob assigns a job to the worker