// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0

package jobqueue

import (
	"fmt"
	"github.com/google/uuid"
	"sync"
	"time"
)

type EventName string

func (e EventName) String() string {
	return string(e)
}

const (
	JobAdded    EventName = "JobAdded"
	JobReady    EventName = "JobReady"
	QueueJob    EventName = "QueueJob"
	JobFinished EventName = "JobFinished"
	JobDone     EventName = "JobDone"
)

type MessageID string

func (m MessageID) String() string {
	return string(m)
}

func NewMessageID() MessageID {
	return MessageID(uuid.New().String())
}

type Event struct {
	Name      EventName
	Data      any
	MessageID MessageID
}

// EventBus is a simple event bus
type EventBus struct {
	subscribers  map[EventName][]chan interface{}
	publishErr   map[MessageID]error
	mu           sync.RWMutex
	shutdownChan chan struct{}
	wg           sync.WaitGroup
}

// NewEventBus creates a new event bus
func NewEventBus() *EventBus {
	Info("EventBus created")
	return &EventBus{
		subscribers:  make(map[EventName][]chan interface{}),
		publishErr:   make(map[MessageID]error),
		shutdownChan: make(chan struct{}),
	}
}

func (eb *EventBus) Shutdown() {
	close(eb.shutdownChan)
	eb.wg.Wait()
	Info("EventBus shutdown")
}

// Subscribe adds a channel to the subscribers list
func (eb *EventBus) Subscribe(name EventName, ch chan interface{}) {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	if _, found := eb.subscribers[name]; !found {
		eb.subscribers[name] = []chan interface{}{}
	}
	eb.subscribers[name] = append(eb.subscribers[name], ch)

}

// Unsubscribe removes a channel from the subscribers list
func (eb *EventBus) Unsubscribe(name EventName, ch chan interface{}) {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	if channels, found := eb.subscribers[name]; found {
		for i := range channels {
			if channels[i] == ch {
				eb.subscribers[name] = append(channels[:i], channels[i+1:]...)

				if len(eb.subscribers[name]) == 0 {
					delete(eb.subscribers, name)
				}

				break
			}
		}
	}
}

func (eb *EventBus) GetPublishError(msgID MessageID) error {
	eb.mu.RLock()
	defer eb.mu.RUnlock()
	return eb.publishErr[msgID]
}

func (eb *EventBus) SetPublishError(msgID MessageID, err error) {
	eb.mu.Lock()
	defer eb.mu.Unlock()
	eb.publishErr[msgID] = err
}

// Publish publishes an event to all subscribers
func (eb *EventBus) Publish(name EventName, data any) {
	eb.mu.RLock()
	defer eb.mu.RUnlock()

	Info("EventBus: publishing event " + name.String())

	select {
	case <-eb.shutdownChan:
		return
	default:

	}

	if channels, found := eb.subscribers[name]; found {
		for _, ch := range channels {
			eb.wg.Add(1)
			go func(ch chan interface{}) {
				defer eb.wg.Done()

				msgID := NewMessageID()

				defer func() {
					if r := recover(); r != nil {
						Error("EventBus: publish panic: %v", r)
						eb.SetPublishError(msgID, fmt.Errorf("publish panic: %v", r))
					}
				}()

				select {
				case ch <- Event{
					Name:      name,
					Data:      data,
					MessageID: msgID,
				}:

				case <-time.After(time.Second * 1):
					eb.SetPublishError(msgID, fmt.Errorf("publish timeout"))
				}

			}(ch)
		}
	}
}