Skip to content
Snippets Groups Projects
Verified Commit b8bcd593 authored by Volker Schukai's avatar Volker Schukai :alien:
Browse files

x

parent 205aa841
No related branches found
No related tags found
No related merge requests found
package jobqueue
import (
"github.com/google/uuid"
"sync"
)
......@@ -19,6 +20,7 @@ const (
type Event struct {
Name EventName
Data any
MessageID string
}
// EventBus is a simple event bus
......@@ -64,7 +66,11 @@ func (eb *EventBus) Publish(name EventName, data interface{}) {
defer eb.mu.RUnlock()
if channels, found := eb.subscribers[name]; found {
for _, ch := range channels {
ch <- Event{Name: name, Data: data}
ch <- Event{
Name: name,
Data: data,
MessageID: uuid.New().String(),
}
}
}
}
......@@ -25,6 +25,7 @@ require (
github.com/go-ole/go-ole v1.3.0 // indirect
github.com/go-sql-driver/mysql v1.7.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/kr/fs v0.1.0 // indirect
......
package jobqueue
import (
"github.com/robfig/cron/v3"
"github.com/stretchr/testify/assert"
"io/ioutil"
"os"
......@@ -48,7 +49,7 @@ func TestCreateJobAndSchedulerFromInput(t *testing.T) {
},
Scheduler: SchedulerImport{
Type: "Cron",
Spec: "*/5 * * * *",
Spec: "* * * * * *",
},
},
wantJob: GenericJob(&Job[ShellResult]{ /* Initialization */ }),
......@@ -60,6 +61,15 @@ func TestCreateJobAndSchedulerFromInput(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotJob, gotSchedule, err := CreateJobAndSchedulerFromImport(tt.input, nil)
if gotSchedule != nil {
if gotSchedule.GetType() == "Cron" {
cronInst := cron.New(cron.WithSeconds())
gotSchedule.(*CronScheduler).cron = cronInst
cronInst.Start()
defer cronInst.Stop()
}
}
if (err != nil) != tt.wantErr {
t.Errorf("CreateJobAndSchedulerFromImport() error = %v, wantErr %v", err, tt.wantErr)
......
......@@ -3,6 +3,7 @@ package jobqueue
import (
"github.com/robfig/cron/v3"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"os"
"strings"
"testing"
......@@ -49,18 +50,51 @@ func TestRoundTrip(t *testing.T) {
runnable:
type: shell
data:
script: echo "Hello World $(date)" >> /tmp/job1.log
script: echo "~1~ $(date "+%M:%S")" >> /tmp/job1.log
scheduler:
type: cron
spec: "* * * * *"
spec: "*/10 * * * * *"
- id: job2
priority: 1
timeout: 1s
maxRetries: 3
retryDelay: 1s
runnable:
type: shell
data:
script: echo "~~~~2~ $(date "+%M:%S")" >> /tmp/job1.log
scheduler:
type: cron
spec: "*/5 * * * * *"
- id: job3
priority: 1
timeout: 1s
maxRetries: 3
retryDelay: 1s
runnable:
type: shell
data:
script: echo "~~~~~~3~ $(date "+%M:%S")" >> /tmp/job1.log
scheduler:
type: interval
interval: 20s
`)
var err error
cronInstance := cron.New(cron.WithSeconds())
cronInstance.Start()
zapLogger, _ := zap.NewDevelopment()
manager := NewManager()
manager.SetCronInstance(cron.New())
worker := NewLocalWorker(1)
manager.SetLogger(&ZapAdapter{logger: zapLogger})
manager.SetCronInstance(cronInstance)
worker := NewLocalWorker(10)
worker.SetManager(manager)
err = manager.AddWorker(worker)
assert.Nil(t, err)
err = manager.Start()
......
......@@ -184,8 +184,10 @@ func (m *Manager) Start() error {
}
m.jobEventCh = make(chan interface{}, 100)
m.eventBus.Subscribe(QueueJob, m.jobEventCh)
m.eventBus.Subscribe(JobReady, m.jobEventCh)
go m.handleJobEvents()
err := m.checkAndSetRunningState()
......@@ -249,35 +251,66 @@ func (m *Manager) Stop() error {
}
func (m *Manager) SetLogger(logger Logger) {
m.mu.Lock()
defer m.mu.Unlock()
m.logger = logger
}
func (m *Manager) GetLogger() Logger {
m.mu.Lock()
defer m.mu.Unlock()
return m.logger
}
func (m *Manager) handleJobEvents() {
for event := range m.jobEventCh {
switch event := event.(type) {
case Event:
if m.logger != nil {
m.logger.Info("Event received", "event", event.Name, "data", event.Data, "message_id", event.MessageID)
}
switch event.Name {
case QueueJob:
if m.logger != nil {
m.logger.Info("Job queued", "job_id", event.Data.(GenericJob).GetID())
}
job := event.Data.(GenericJob)
err := m.queue.Enqueue(job)
if err != nil && err != ErrJobAlreadyExists {
fmt.Println(err)
}
case JobReady:
for {
nextJob, err := m.queue.Dequeue()
if err != nil {
break
}
if m.logger != nil {
m.logger.Info("Job ready", "job_id", nextJob.GetID())
}
assigned := false
for _, worker := range m.workerMap {
if err := worker.AssignJob(nextJob); err == nil {
assigned = true
break
}
}
if !assigned {
fmt.Println("Job not assigned")
err = m.queue.Enqueue(nextJob)
if err != nil && err != ErrJobAlreadyExists {
fmt.Println(err)
}
}
}
case JobFinished:
......
......@@ -30,6 +30,10 @@ func (m *MockWorker) Status() WorkerStatus {
return m.status
}
func (m *MockWorker) SetManager(manager *Manager) {
return
}
func (m *MockWorker) AssignJob(job GenericJob) error {
return nil
}
......@@ -80,6 +84,10 @@ func (m *MockGenericJob) GetDependencies() []JobID {
return nil
}
func (m *MockGenericJob) GetDependentJobs() []JobID {
return nil
}
func (m *MockGenericJob) GetPriority() Priority {
return PriorityDefault
}
......
package jobqueue
import (
"fmt"
"sync"
)
......@@ -11,6 +12,7 @@ type Queue struct {
processedJobs map[JobID]struct{}
eventBus *EventBus
mu sync.Mutex
manger *Manager
}
func NewQueue(EventBus *EventBus) *Queue {
......@@ -23,13 +25,16 @@ func NewQueue(EventBus *EventBus) *Queue {
}
}
func (q *Queue) SetManager(m *Manager) {
q.manger = m
}
func (q *Queue) Enqueue(job GenericJob) error {
q.mu.Lock()
defer q.mu.Unlock()
if _, exists := q.jobMap[job.GetID()]; !exists {
q.jobMap[job.GetID()] = job
//return ErrJobAlreadyExists
}
for _, readyJob := range q.readyQueue {
......@@ -83,6 +88,10 @@ func (q *Queue) Enqueue(job GenericJob) error {
q.readyQueue = newReadyQueue
if q.eventBus != nil && len(q.readyQueue) > 0 {
if q.manger != nil && q.manger.logger != nil {
q.manger.logger.Info("Job ready", "job_id", job.GetID())
}
q.eventBus.Publish(JobReady, job.GetID())
}
......@@ -105,6 +114,8 @@ func (q *Queue) Dequeue() (GenericJob, error) {
// Mark the job as processed but keep it in the jobMap for dependency resolution
q.processedJobs[job.GetID()] = struct{}{}
fmt.Printf("Dequeue: jobs in ready queue: %v, jobs in processed: %v, current job: %v\n", q.readyQueue, q.processedJobs, job.GetID())
return job, nil
}
......
......@@ -2,6 +2,8 @@ package jobqueue
import (
"context"
"fmt"
"github.com/google/uuid"
"sync"
"time"
)
......@@ -15,6 +17,10 @@ const (
type WorkerID string
func (id WorkerID) String() string {
return string(id)
}
// Worker is a worker
type Worker interface {
Start() error
......@@ -23,6 +29,8 @@ type Worker interface {
AssignJob(job GenericJob) error
GetID() WorkerID
SetManager(manager *Manager)
}
// GenericWorker is a generic worker
......@@ -40,6 +48,7 @@ type LocalWorker struct {
maxJobs int
mu sync.Mutex
wg sync.WaitGroup
manager *Manager
}
// GetID returns the ID of the worker
......@@ -53,6 +62,7 @@ func NewLocalWorker(maxJobs int) *LocalWorker {
w.jobChannels = make([]chan GenericJob, maxJobs)
w.stopChans = make([]chan bool, maxJobs)
w.cancelChans = make([]chan bool, maxJobs)
w.ID = WorkerID(uuid.New().String())
return w
}
......@@ -76,9 +86,17 @@ func (w *LocalWorker) Start() error {
w.wg.Wait()
w.status = WorkerStatusRunning
if w.manager != nil {
w.manager.logger.Info("Worker started", "worker", w.ID)
}
return nil
}
func (w *LocalWorker) SetManager(manager *Manager) {
w.manager = manager
}
// Stop stops the worker
func (w *LocalWorker) Stop() error {
w.mu.Lock()
......@@ -93,12 +111,24 @@ func (w *LocalWorker) Stop() error {
stopChan <- true
}
if w.manager != nil {
w.manager.logger.Info("Worker stopped", "worker", w.ID)
}
return nil
}
func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancelChan chan bool) {
w.wg.Done()
workerThreadID := w.ID.String() + "-" + fmt.Sprintf("%p", &w)
if w.manager != nil && w.manager.logger != nil {
w.manager.logger.Info("Worker thread with id started", "worker", w.ID, "thread_id", workerThreadID)
}
stopFlag := false
for {
select {
case job := <-jobChannel:
......@@ -119,6 +149,11 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel
}
ctxTimeout, cancelTimeout := context.WithTimeout(ctx, timeout)
if w.manager != nil && w.manager.logger != nil {
w.manager.logger.Info("Executing job on worker thread", "worker", w.ID, "thread_id", workerThreadID, "job_id", job.GetID())
}
_, err = job.Execute(ctxTimeout)
cancelTimeout()
......@@ -133,20 +168,30 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel
retries--
}
select {
case <-cancelChan:
cancel()
//select {
//case <-cancelChan:
// cancel()
case <-ctx.Done():
cancel()
//case <-ctx.Done():
// cancel()
default:
//default:
cancel()
}
//}
case <-stopChan:
return
stopFlag = true
break
}
if stopFlag {
break
}
}
if w.manager != nil && w.manager.logger != nil {
w.manager.logger.Info("Worker thread with id stopped", "worker", w.ID, "thread_id", workerThreadID)
}
}
// AssignJob assigns a job to the worker
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment