From 2f7bb447ef045c55ed412e5695978d59c9e8004c Mon Sep 17 00:00:00 2001 From: Volker Schukai <volker.schukai@schukai.com> Date: Thu, 12 Dec 2024 23:32:32 +0100 Subject: [PATCH] feat: add EventQueueLength --- .idea/workspace.xml | 7 +++++-- event-bus.go | 12 ++++++------ go.mod | 2 +- manager.go | 11 +++++++---- queue.go | 2 ++ 5 files changed, 21 insertions(+), 13 deletions(-) diff --git a/.idea/workspace.xml b/.idea/workspace.xml index b6599b1..88095bf 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -6,6 +6,8 @@ <component name="ChangeListManager"> <list default="true" id="9979eb22-471e-4f2f-b624-fd3edb5e8c6e" name="Changes" comment=""> <change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" /> + <change beforePath="$PROJECT_DIR$/event-bus.go" beforeDir="false" afterPath="$PROJECT_DIR$/event-bus.go" afterDir="false" /> + <change beforePath="$PROJECT_DIR$/go.mod" beforeDir="false" afterPath="$PROJECT_DIR$/go.mod" afterDir="false" /> <change beforePath="$PROJECT_DIR$/manager.go" beforeDir="false" afterPath="$PROJECT_DIR$/manager.go" afterDir="false" /> <change beforePath="$PROJECT_DIR$/queue.go" beforeDir="false" afterPath="$PROJECT_DIR$/queue.go" afterDir="false" /> </list> @@ -24,7 +26,7 @@ </list> </option> </component> - <component name="GOROOT" url="file:///nix/store/j9hk9x78z301jqldfyvw72z24vb992qi-go-1.22.6/share/go" /> + <component name="GOROOT" url="file://$USER_HOME$/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.23.4.linux-amd64" /> <component name="Git.Settings"> <option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$" /> </component> @@ -257,7 +259,8 @@ <workItem from="1734001335693" duration="5602000" /> <workItem from="1734012566872" duration="4517000" /> <workItem from="1734022155342" duration="424000" /> - <workItem from="1734024671338" duration="491000" /> + <workItem from="1734024671338" duration="1103000" /> + <workItem from="1734042130391" duration="498000" /> </task> <servers /> </component> diff --git a/event-bus.go b/event-bus.go index fc6988e..599ffc3 100644 --- a/event-bus.go +++ b/event-bus.go @@ -42,7 +42,7 @@ type Event struct { // EventBus is a simple event bus type EventBus struct { - subscribers map[EventName][]chan interface{} + subscribers map[EventName][]chan any publishErr map[MessageID]error mu sync.RWMutex shutdownChan chan struct{} @@ -53,7 +53,7 @@ type EventBus struct { func NewEventBus() *EventBus { Info("EventBus created") return &EventBus{ - subscribers: make(map[EventName][]chan interface{}), + subscribers: make(map[EventName][]chan any), publishErr: make(map[MessageID]error), shutdownChan: make(chan struct{}), } @@ -66,19 +66,19 @@ func (eb *EventBus) Shutdown() { } // Subscribe adds a channel to the subscribers list -func (eb *EventBus) Subscribe(name EventName, ch chan interface{}) { +func (eb *EventBus) Subscribe(name EventName, ch chan any) { eb.mu.Lock() defer eb.mu.Unlock() if _, found := eb.subscribers[name]; !found { - eb.subscribers[name] = []chan interface{}{} + eb.subscribers[name] = []chan any{} } eb.subscribers[name] = append(eb.subscribers[name], ch) } // Unsubscribe removes a channel from the subscribers list -func (eb *EventBus) Unsubscribe(name EventName, ch chan interface{}) { +func (eb *EventBus) Unsubscribe(name EventName, ch chan any) { eb.mu.Lock() defer eb.mu.Unlock() @@ -126,7 +126,7 @@ func (eb *EventBus) Publish(name EventName, data any) { if channels, found := eb.subscribers[name]; found { for _, ch := range channels { eb.wg.Add(1) - go func(ch chan interface{}) { + go func(ch chan any) { defer eb.wg.Done() msgID := NewMessageID() diff --git a/go.mod b/go.mod index 6ffc779..0d3057f 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module gitlab.schukai.com/oss/libraries/go/services/job-queues.git -go 1.22.6 +go 1.23.3 require ( github.com/DATA-DOG/go-sqlmock v1.5.0 diff --git a/manager.go b/manager.go index 351726b..d0b9775 100644 --- a/manager.go +++ b/manager.go @@ -29,7 +29,7 @@ type Manager struct { activeJobs map[JobID]GenericJob - jobEventCh chan interface{} + jobEventCh chan any cronInstance *cron.Cron //logger Logger @@ -44,15 +44,18 @@ type Manager struct { func NewManager() *Manager { eventBus := NewEventBus() + q := NewQueue(eventBus) mng := &Manager{ state: ManagerStateStopped, - queue: NewQueue(eventBus), + queue: q, workerMap: make(map[WorkerID]Worker), eventBus: eventBus, activeJobs: make(map[JobID]GenericJob), } + q.SetManager(mng) + return mng } @@ -538,7 +541,7 @@ func (m *Manager) Start() error { } } - m.jobEventCh = make(chan interface{}, 100) + m.jobEventCh = make(chan any, 1000) m.eventBus.Subscribe(QueueJob, m.jobEventCh) m.eventBus.Subscribe(JobReady, m.jobEventCh) @@ -559,7 +562,7 @@ func (m *Manager) Start() error { } -func safeClose(ch chan interface{}) (err error) { +func safeClose(ch chan any) (err error) { defer func() { if recover() != nil { diff --git a/queue.go b/queue.go index 76cd7d0..900d46b 100644 --- a/queue.go +++ b/queue.go @@ -200,6 +200,7 @@ type QueueStats struct { MaxAgeReachedCount int `json:"MaxAgeReachedCount"` PendingDependenciesCount int `json:"PendingDependenciesCount"` OldestProcessedJobAge time.Duration `json:"OldestProcessedJobAge"` + EventQueueLength int `json:"EventQueueLength"` } // Stats computes and returns the statistics of the job queue @@ -243,5 +244,6 @@ func (q *Queue) Stats() QueueStats { MaxAgeReachedCount: maxAgeReachedCount, PendingDependenciesCount: totalPendingDeps, OldestProcessedJobAge: oldestProcessedJobAge, + EventQueueLength: len(q.manger.jobEventCh), } } -- GitLab