// Copyright 2023 schukai GmbH // SPDX-License-Identifier: AGPL-3.0 package jobqueue import ( "fmt" "github.com/google/uuid" "sync" "time" ) type EventName string func (e EventName) String() string { return string(e) } const ( JobAdded EventName = "JobAdded" JobReady EventName = "JobReady" QueueJob EventName = "QueueJob" JobFinished EventName = "JobFinished" JobDone EventName = "JobDone" ) type MessageID string func (m MessageID) String() string { return string(m) } func NewMessageID() MessageID { return MessageID(uuid.New().String()) } type Event struct { Name EventName Data any MessageID MessageID } // EventBus is a simple event bus type EventBus struct { subscribers map[EventName][]chan interface{} publishErr map[MessageID]error mu sync.RWMutex shutdownChan chan struct{} wg sync.WaitGroup } // NewEventBus creates a new event bus func NewEventBus() *EventBus { Info("EventBus created") return &EventBus{ subscribers: make(map[EventName][]chan interface{}), publishErr: make(map[MessageID]error), shutdownChan: make(chan struct{}), } } func (eb *EventBus) Shutdown() { close(eb.shutdownChan) eb.wg.Wait() Info("EventBus shutdown") } // Subscribe adds a channel to the subscribers list func (eb *EventBus) Subscribe(name EventName, ch chan interface{}) { eb.mu.Lock() defer eb.mu.Unlock() if _, found := eb.subscribers[name]; !found { eb.subscribers[name] = []chan interface{}{} } eb.subscribers[name] = append(eb.subscribers[name], ch) } // Unsubscribe removes a channel from the subscribers list func (eb *EventBus) Unsubscribe(name EventName, ch chan interface{}) { eb.mu.Lock() defer eb.mu.Unlock() if channels, found := eb.subscribers[name]; found { for i := range channels { if channels[i] == ch { eb.subscribers[name] = append(channels[:i], channels[i+1:]...) if len(eb.subscribers[name]) == 0 { delete(eb.subscribers, name) } break } } } } func (eb *EventBus) GetPublishError(msgID MessageID) error { eb.mu.RLock() defer eb.mu.RUnlock() return eb.publishErr[msgID] } func (eb *EventBus) SetPublishError(msgID MessageID, err error) { eb.mu.Lock() defer eb.mu.Unlock() eb.publishErr[msgID] = err } // Publish publishes an event to all subscribers func (eb *EventBus) Publish(name EventName, data any) { eb.mu.RLock() defer eb.mu.RUnlock() Info("EventBus: publishing event " + name.String()) select { case <-eb.shutdownChan: return default: } if channels, found := eb.subscribers[name]; found { for _, ch := range channels { eb.wg.Add(1) go func(ch chan interface{}) { defer eb.wg.Done() msgID := NewMessageID() defer func() { if r := recover(); r != nil { Error("EventBus: publish panic: %v", r) eb.SetPublishError(msgID, fmt.Errorf("publish panic: %v", r)) } }() select { case ch <- Event{ Name: name, Data: data, MessageID: msgID, }: case <-time.After(time.Second * 1): eb.SetPublishError(msgID, fmt.Errorf("publish timeout")) } }(ch) } } }