Skip to content
Snippets Groups Projects
Verified Commit bd92d57d authored by Volker Schukai's avatar Volker Schukai :alien:
Browse files

fix: manger stats

parent 08909157
No related branches found
No related tags found
No related merge requests found
......@@ -6,8 +6,6 @@
<component name="ChangeListManager">
<list default="true" id="9979eb22-471e-4f2f-b624-fd3edb5e8c6e" name="Changes" comment="">
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/event-bus.go" beforeDir="false" afterPath="$PROJECT_DIR$/event-bus.go" afterDir="false" />
<change beforePath="$PROJECT_DIR$/go.mod" beforeDir="false" afterPath="$PROJECT_DIR$/go.mod" afterDir="false" />
<change beforePath="$PROJECT_DIR$/manager.go" beforeDir="false" afterPath="$PROJECT_DIR$/manager.go" afterDir="false" />
<change beforePath="$PROJECT_DIR$/queue.go" beforeDir="false" afterPath="$PROJECT_DIR$/queue.go" afterDir="false" />
</list>
......@@ -260,7 +258,8 @@
<workItem from="1734012566872" duration="4517000" />
<workItem from="1734022155342" duration="424000" />
<workItem from="1734024671338" duration="1103000" />
<workItem from="1734042130391" duration="498000" />
<workItem from="1734042130391" duration="922000" />
<workItem from="1734047047039" duration="578000" />
</task>
<servers />
</component>
......
......@@ -60,11 +60,6 @@ func NewManager() *Manager {
}
func (m *Manager) GetQueueStats() QueueStats {
m.mu.Lock()
defer m.mu.Unlock()
return m.queue.Stats()
}
// GetEventBus returns the event bus
func (m *Manager) GetEventBus() *EventBus {
......@@ -798,3 +793,57 @@ func (m *Manager) handleJobEvents() {
}
}
}
type ManagerStats struct {
ReadyQueueLength int `json:"ReadyQueueLength"`
JobMapLength int `json:"JobMapLength"`
PendingDeps int `json:"PendingDeps"`
ProcessedJobsCount int `json:"ProcessedJobsCount"`
PendingDependenciesCount int `json:"PendingDependenciesCount"`
EventQueueLength int `json:"EventQueueLength"`
Subscriber SubscriberStat `json:"Subscriber"`
}
type SubscriberStat struct {
Count int `json:"Count"`
Added int `json:"Added"`
Ready int `json:"Ready"`
Queued int `json:"Queue"`
Finished int `json:"Finished"`
Done int `json:"Done"`
}
// Stats computes and returns the statistics of the job queue
func (m *Manager) Stats() ManagerStats {
m.mu.Lock()
defer m.mu.Unlock()
readyQueueLength := len(m.queue.readyQueue)
jobMapLength := len(m.activeJobs)
pendingDeps := len(m.queue.pendingDependencies)
processedJobsCount := len(m.queue.processedJobs)
pendingDependenciesCount := len(m.queue.pendingDependencies)
eventQueueLength := len(m.jobEventCh)
subscriberCount := len(m.eventBus.subscribers)
subscriberJobAdded := len(m.eventBus.subscribers[JobAdded])
subscriberJobReady := len(m.eventBus.subscribers[JobReady])
subscriberQueueJob := len(m.eventBus.subscribers[QueueJob])
subscriberJobFinished := len(m.eventBus.subscribers[JobFinished])
subscriberJobDone := len(m.eventBus.subscribers[JobDone)
return ManagerStats{
ReadyQueueLength: readyQueueLength,
JobMapLength: jobMapLength,
PendingDeps: pendingDeps,
ProcessedJobsCount: processedJobsCount,
PendingDependenciesCount: pendingDependenciesCount,
EventQueueLength: eventQueueLength,
Subscriber: SubscriberStat{
Count: subscriberCount,
Added: subscriberJobAdded,
Ready: subscriberJobReady,
Queued: subscriberQueueJob,
Finished: subscriberJobFinished,
Done: subscriberJobDone,
},
}
}
......@@ -191,59 +191,3 @@ func removeJobID(deps []JobID, id JobID) []JobID {
}
return deps
}
type QueueStats struct {
ReadyQueueLength int `json:"ReadyQueueLength"`
JobMapLength int `json:"JobMapLength"`
PendingDeps int `json:"PendingDeps"`
ProcessedJobsCount int `json:"ProcessedJobsCount"`
MaxAgeReachedCount int `json:"MaxAgeReachedCount"`
PendingDependenciesCount int `json:"PendingDependenciesCount"`
OldestProcessedJobAge time.Duration `json:"OldestProcessedJobAge"`
EventQueueLength int `json:"EventQueueLength"`
}
// Stats computes and returns the statistics of the job queue
func (q *Queue) Stats() QueueStats {
q.mu.Lock()
defer q.mu.Unlock()
// Calculate the oldest processed job age
var oldestProcessedJobAge time.Duration
if len(q.processedJobs) > 0 {
oldestAge := time.Now().Sub(q.processedJobs[0].ProcessedTime)
for _, job := range q.processedJobs {
jobAge := time.Now().Sub(job.ProcessedTime)
if jobAge > oldestAge {
oldestAge = jobAge
}
}
oldestProcessedJobAge = oldestAge
}
// Count how many jobs have reached MaxAge
maxAgeReachedCount := 0
currentTime := time.Now()
for _, jobInfo := range q.processedJobs {
if currentTime.Sub(jobInfo.ProcessedTime) > MaxAge {
maxAgeReachedCount++
}
}
// Count all pending dependencies
var totalPendingDeps = 0
for _, deps := range q.pendingDependencies {
totalPendingDeps += len(deps)
}
return QueueStats{
ReadyQueueLength: len(q.readyQueue),
JobMapLength: len(q.jobMap),
PendingDeps: len(q.pendingDependencies),
ProcessedJobsCount: len(q.processedJobs),
MaxAgeReachedCount: maxAgeReachedCount,
PendingDependenciesCount: totalPendingDeps,
OldestProcessedJobAge: oldestProcessedJobAge,
EventQueueLength: len(q.manger.jobEventCh),
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment