From bd92d57d8d411156821ec8cd89c94d76703fff3a Mon Sep 17 00:00:00 2001
From: Volker Schukai <volker.schukai@schukai.com>
Date: Fri, 13 Dec 2024 00:54:15 +0100
Subject: [PATCH] fix: manger stats

---
 .idea/workspace.xml |  5 ++--
 manager.go          | 59 +++++++++++++++++++++++++++++++++++++++++----
 queue.go            | 56 ------------------------------------------
 3 files changed, 56 insertions(+), 64 deletions(-)

diff --git a/.idea/workspace.xml b/.idea/workspace.xml
index 88095bf..b2407e4 100644
--- a/.idea/workspace.xml
+++ b/.idea/workspace.xml
@@ -6,8 +6,6 @@
   <component name="ChangeListManager">
     <list default="true" id="9979eb22-471e-4f2f-b624-fd3edb5e8c6e" name="Changes" comment="">
       <change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/event-bus.go" beforeDir="false" afterPath="$PROJECT_DIR$/event-bus.go" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/go.mod" beforeDir="false" afterPath="$PROJECT_DIR$/go.mod" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/manager.go" beforeDir="false" afterPath="$PROJECT_DIR$/manager.go" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/queue.go" beforeDir="false" afterPath="$PROJECT_DIR$/queue.go" afterDir="false" />
     </list>
@@ -260,7 +258,8 @@
       <workItem from="1734012566872" duration="4517000" />
       <workItem from="1734022155342" duration="424000" />
       <workItem from="1734024671338" duration="1103000" />
-      <workItem from="1734042130391" duration="498000" />
+      <workItem from="1734042130391" duration="922000" />
+      <workItem from="1734047047039" duration="578000" />
     </task>
     <servers />
   </component>
diff --git a/manager.go b/manager.go
index d0b9775..e84f4b2 100644
--- a/manager.go
+++ b/manager.go
@@ -60,11 +60,6 @@ func NewManager() *Manager {
 
 }
 
-func (m *Manager) GetQueueStats() QueueStats {
-	m.mu.Lock()
-	defer m.mu.Unlock()
-	return m.queue.Stats()
-}
 
 // GetEventBus returns the event bus
 func (m *Manager) GetEventBus() *EventBus {
@@ -798,3 +793,57 @@ func (m *Manager) handleJobEvents() {
 		}
 	}
 }
+
+type ManagerStats struct {
+	ReadyQueueLength         int            `json:"ReadyQueueLength"`
+	JobMapLength             int            `json:"JobMapLength"`
+	PendingDeps              int            `json:"PendingDeps"`
+	ProcessedJobsCount       int            `json:"ProcessedJobsCount"`
+	PendingDependenciesCount int            `json:"PendingDependenciesCount"`
+	EventQueueLength         int            `json:"EventQueueLength"`
+	Subscriber               SubscriberStat `json:"Subscriber"`
+}
+type SubscriberStat struct {
+	Count    int `json:"Count"`
+	Added    int `json:"Added"`
+	Ready    int `json:"Ready"`
+	Queued   int `json:"Queue"`
+	Finished int `json:"Finished"`
+	Done     int `json:"Done"`
+}
+
+// Stats computes and returns the statistics of the job queue
+func (m *Manager) Stats() ManagerStats {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	readyQueueLength := len(m.queue.readyQueue)
+	jobMapLength := len(m.activeJobs)
+	pendingDeps := len(m.queue.pendingDependencies)
+	processedJobsCount := len(m.queue.processedJobs)
+	pendingDependenciesCount := len(m.queue.pendingDependencies)
+	eventQueueLength := len(m.jobEventCh)
+	subscriberCount := len(m.eventBus.subscribers)
+	subscriberJobAdded := len(m.eventBus.subscribers[JobAdded])
+	subscriberJobReady := len(m.eventBus.subscribers[JobReady])
+	subscriberQueueJob := len(m.eventBus.subscribers[QueueJob])
+	subscriberJobFinished := len(m.eventBus.subscribers[JobFinished])
+	subscriberJobDone := len(m.eventBus.subscribers[JobDone)
+
+	return ManagerStats{
+		ReadyQueueLength:         readyQueueLength,
+		JobMapLength:             jobMapLength,
+		PendingDeps:              pendingDeps,
+		ProcessedJobsCount:       processedJobsCount,
+		PendingDependenciesCount: pendingDependenciesCount,
+		EventQueueLength:         eventQueueLength,
+		Subscriber: SubscriberStat{
+			Count:    subscriberCount,
+			Added:    subscriberJobAdded,
+			Ready:    subscriberJobReady,
+			Queued:   subscriberQueueJob,
+			Finished: subscriberJobFinished,
+			Done:     subscriberJobDone,
+		},
+	}
+}
diff --git a/queue.go b/queue.go
index 900d46b..7bf6a35 100644
--- a/queue.go
+++ b/queue.go
@@ -191,59 +191,3 @@ func removeJobID(deps []JobID, id JobID) []JobID {
 	}
 	return deps
 }
-
-type QueueStats struct {
-	ReadyQueueLength         int           `json:"ReadyQueueLength"`
-	JobMapLength             int           `json:"JobMapLength"`
-	PendingDeps              int           `json:"PendingDeps"`
-	ProcessedJobsCount       int           `json:"ProcessedJobsCount"`
-	MaxAgeReachedCount       int           `json:"MaxAgeReachedCount"`
-	PendingDependenciesCount int           `json:"PendingDependenciesCount"`
-	OldestProcessedJobAge    time.Duration `json:"OldestProcessedJobAge"`
-	EventQueueLength         int           `json:"EventQueueLength"`
-}
-
-// Stats computes and returns the statistics of the job queue
-func (q *Queue) Stats() QueueStats {
-	q.mu.Lock()
-	defer q.mu.Unlock()
-
-	// Calculate the oldest processed job age
-	var oldestProcessedJobAge time.Duration
-	if len(q.processedJobs) > 0 {
-		oldestAge := time.Now().Sub(q.processedJobs[0].ProcessedTime)
-		for _, job := range q.processedJobs {
-			jobAge := time.Now().Sub(job.ProcessedTime)
-			if jobAge > oldestAge {
-				oldestAge = jobAge
-			}
-		}
-		oldestProcessedJobAge = oldestAge
-	}
-
-	// Count how many jobs have reached MaxAge
-	maxAgeReachedCount := 0
-	currentTime := time.Now()
-	for _, jobInfo := range q.processedJobs {
-		if currentTime.Sub(jobInfo.ProcessedTime) > MaxAge {
-			maxAgeReachedCount++
-		}
-	}
-
-	// Count all pending dependencies
-	var totalPendingDeps = 0
-	for _, deps := range q.pendingDependencies {
-		totalPendingDeps += len(deps)
-	}
-
-	return QueueStats{
-		ReadyQueueLength:         len(q.readyQueue),
-		JobMapLength:             len(q.jobMap),
-		PendingDeps:              len(q.pendingDependencies),
-		ProcessedJobsCount:       len(q.processedJobs),
-		MaxAgeReachedCount:       maxAgeReachedCount,
-		PendingDependenciesCount: totalPendingDeps,
-		OldestProcessedJobAge:    oldestProcessedJobAge,
-		EventQueueLength:         len(q.manger.jobEventCh),
-	}
-}
-- 
GitLab