Skip to content
Snippets Groups Projects
Verified Commit 2f7bb447 authored by Volker Schukai's avatar Volker Schukai :alien:
Browse files

feat: add EventQueueLength

parent 3e7265f9
No related branches found
No related tags found
No related merge requests found
...@@ -6,6 +6,8 @@ ...@@ -6,6 +6,8 @@
<component name="ChangeListManager"> <component name="ChangeListManager">
<list default="true" id="9979eb22-471e-4f2f-b624-fd3edb5e8c6e" name="Changes" comment=""> <list default="true" id="9979eb22-471e-4f2f-b624-fd3edb5e8c6e" name="Changes" comment="">
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" /> <change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/event-bus.go" beforeDir="false" afterPath="$PROJECT_DIR$/event-bus.go" afterDir="false" />
<change beforePath="$PROJECT_DIR$/go.mod" beforeDir="false" afterPath="$PROJECT_DIR$/go.mod" afterDir="false" />
<change beforePath="$PROJECT_DIR$/manager.go" beforeDir="false" afterPath="$PROJECT_DIR$/manager.go" afterDir="false" /> <change beforePath="$PROJECT_DIR$/manager.go" beforeDir="false" afterPath="$PROJECT_DIR$/manager.go" afterDir="false" />
<change beforePath="$PROJECT_DIR$/queue.go" beforeDir="false" afterPath="$PROJECT_DIR$/queue.go" afterDir="false" /> <change beforePath="$PROJECT_DIR$/queue.go" beforeDir="false" afterPath="$PROJECT_DIR$/queue.go" afterDir="false" />
</list> </list>
...@@ -24,7 +26,7 @@ ...@@ -24,7 +26,7 @@
</list> </list>
</option> </option>
</component> </component>
<component name="GOROOT" url="file:///nix/store/j9hk9x78z301jqldfyvw72z24vb992qi-go-1.22.6/share/go" /> <component name="GOROOT" url="file://$USER_HOME$/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.23.4.linux-amd64" />
<component name="Git.Settings"> <component name="Git.Settings">
<option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$" /> <option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$" />
</component> </component>
...@@ -257,7 +259,8 @@ ...@@ -257,7 +259,8 @@
<workItem from="1734001335693" duration="5602000" /> <workItem from="1734001335693" duration="5602000" />
<workItem from="1734012566872" duration="4517000" /> <workItem from="1734012566872" duration="4517000" />
<workItem from="1734022155342" duration="424000" /> <workItem from="1734022155342" duration="424000" />
<workItem from="1734024671338" duration="491000" /> <workItem from="1734024671338" duration="1103000" />
<workItem from="1734042130391" duration="498000" />
</task> </task>
<servers /> <servers />
</component> </component>
......
...@@ -42,7 +42,7 @@ type Event struct { ...@@ -42,7 +42,7 @@ type Event struct {
// EventBus is a simple event bus // EventBus is a simple event bus
type EventBus struct { type EventBus struct {
subscribers map[EventName][]chan interface{} subscribers map[EventName][]chan any
publishErr map[MessageID]error publishErr map[MessageID]error
mu sync.RWMutex mu sync.RWMutex
shutdownChan chan struct{} shutdownChan chan struct{}
...@@ -53,7 +53,7 @@ type EventBus struct { ...@@ -53,7 +53,7 @@ type EventBus struct {
func NewEventBus() *EventBus { func NewEventBus() *EventBus {
Info("EventBus created") Info("EventBus created")
return &EventBus{ return &EventBus{
subscribers: make(map[EventName][]chan interface{}), subscribers: make(map[EventName][]chan any),
publishErr: make(map[MessageID]error), publishErr: make(map[MessageID]error),
shutdownChan: make(chan struct{}), shutdownChan: make(chan struct{}),
} }
...@@ -66,19 +66,19 @@ func (eb *EventBus) Shutdown() { ...@@ -66,19 +66,19 @@ func (eb *EventBus) Shutdown() {
} }
// Subscribe adds a channel to the subscribers list // Subscribe adds a channel to the subscribers list
func (eb *EventBus) Subscribe(name EventName, ch chan interface{}) { func (eb *EventBus) Subscribe(name EventName, ch chan any) {
eb.mu.Lock() eb.mu.Lock()
defer eb.mu.Unlock() defer eb.mu.Unlock()
if _, found := eb.subscribers[name]; !found { if _, found := eb.subscribers[name]; !found {
eb.subscribers[name] = []chan interface{}{} eb.subscribers[name] = []chan any{}
} }
eb.subscribers[name] = append(eb.subscribers[name], ch) eb.subscribers[name] = append(eb.subscribers[name], ch)
} }
// Unsubscribe removes a channel from the subscribers list // Unsubscribe removes a channel from the subscribers list
func (eb *EventBus) Unsubscribe(name EventName, ch chan interface{}) { func (eb *EventBus) Unsubscribe(name EventName, ch chan any) {
eb.mu.Lock() eb.mu.Lock()
defer eb.mu.Unlock() defer eb.mu.Unlock()
...@@ -126,7 +126,7 @@ func (eb *EventBus) Publish(name EventName, data any) { ...@@ -126,7 +126,7 @@ func (eb *EventBus) Publish(name EventName, data any) {
if channels, found := eb.subscribers[name]; found { if channels, found := eb.subscribers[name]; found {
for _, ch := range channels { for _, ch := range channels {
eb.wg.Add(1) eb.wg.Add(1)
go func(ch chan interface{}) { go func(ch chan any) {
defer eb.wg.Done() defer eb.wg.Done()
msgID := NewMessageID() msgID := NewMessageID()
......
...@@ -29,7 +29,7 @@ type Manager struct { ...@@ -29,7 +29,7 @@ type Manager struct {
activeJobs map[JobID]GenericJob activeJobs map[JobID]GenericJob
jobEventCh chan interface{} jobEventCh chan any
cronInstance *cron.Cron cronInstance *cron.Cron
//logger Logger //logger Logger
...@@ -44,15 +44,18 @@ type Manager struct { ...@@ -44,15 +44,18 @@ type Manager struct {
func NewManager() *Manager { func NewManager() *Manager {
eventBus := NewEventBus() eventBus := NewEventBus()
q := NewQueue(eventBus)
mng := &Manager{ mng := &Manager{
state: ManagerStateStopped, state: ManagerStateStopped,
queue: NewQueue(eventBus), queue: q,
workerMap: make(map[WorkerID]Worker), workerMap: make(map[WorkerID]Worker),
eventBus: eventBus, eventBus: eventBus,
activeJobs: make(map[JobID]GenericJob), activeJobs: make(map[JobID]GenericJob),
} }
q.SetManager(mng)
return mng return mng
} }
...@@ -538,7 +541,7 @@ func (m *Manager) Start() error { ...@@ -538,7 +541,7 @@ func (m *Manager) Start() error {
} }
} }
m.jobEventCh = make(chan interface{}, 100) m.jobEventCh = make(chan any, 1000)
m.eventBus.Subscribe(QueueJob, m.jobEventCh) m.eventBus.Subscribe(QueueJob, m.jobEventCh)
m.eventBus.Subscribe(JobReady, m.jobEventCh) m.eventBus.Subscribe(JobReady, m.jobEventCh)
...@@ -559,7 +562,7 @@ func (m *Manager) Start() error { ...@@ -559,7 +562,7 @@ func (m *Manager) Start() error {
} }
func safeClose(ch chan interface{}) (err error) { func safeClose(ch chan any) (err error) {
defer func() { defer func() {
if recover() != nil { if recover() != nil {
......
...@@ -200,6 +200,7 @@ type QueueStats struct { ...@@ -200,6 +200,7 @@ type QueueStats struct {
MaxAgeReachedCount int `json:"MaxAgeReachedCount"` MaxAgeReachedCount int `json:"MaxAgeReachedCount"`
PendingDependenciesCount int `json:"PendingDependenciesCount"` PendingDependenciesCount int `json:"PendingDependenciesCount"`
OldestProcessedJobAge time.Duration `json:"OldestProcessedJobAge"` OldestProcessedJobAge time.Duration `json:"OldestProcessedJobAge"`
EventQueueLength int `json:"EventQueueLength"`
} }
// Stats computes and returns the statistics of the job queue // Stats computes and returns the statistics of the job queue
...@@ -243,5 +244,6 @@ func (q *Queue) Stats() QueueStats { ...@@ -243,5 +244,6 @@ func (q *Queue) Stats() QueueStats {
MaxAgeReachedCount: maxAgeReachedCount, MaxAgeReachedCount: maxAgeReachedCount,
PendingDependenciesCount: totalPendingDeps, PendingDependenciesCount: totalPendingDeps,
OldestProcessedJobAge: oldestProcessedJobAge, OldestProcessedJobAge: oldestProcessedJobAge,
EventQueueLength: len(q.manger.jobEventCh),
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment