// Copyright 2023 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0

package jobqueue

import "fmt"

// EventScheduler is a scheduler that schedules a job when an event is received
type EventScheduler struct {
	Event    EventName
	EventBus *EventBus
	jobs     map[JobID]StopChan
}

func (s *EventScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
	ch := make(chan interface{})

	if s.EventBus != nil {
		s.EventBus.Subscribe(s.Event, ch)
	} else {
		eventBus.Subscribe(s.Event, ch)
	}

	if s.jobs == nil {
		s.jobs = make(map[JobID]StopChan)
	}

	id := job.GetID()
	if _, ok := s.jobs[id]; ok {
		return fmt.Errorf("%w: job %s already scheduled", ErrJobAlreadyScheduled, id)
	}

	stopChan := make(StopChan)
	s.jobs[id] = stopChan

	go func() {
		for {
			select {
			case <-ch:
				if !job.IsPaused() {
					Info("EventScheduler: received event %s, scheduling job %s", s.Event, id)
					eventBus.Publish(QueueJob, job)
				}
			case <-stopChan:
				eventBus.Unsubscribe(s.Event, ch)
				return
			}
		}
	}()
	return nil
}

func (s *EventScheduler) GetType() string {
	return "Event"
}

func (s *EventScheduler) IsAdHoc() bool {
	return false
}

func (s *EventScheduler) Cancel(id JobID) error {
	if s.jobs == nil {
		return nil
	}

	if stopChan, ok := s.jobs[id]; ok {
		select {
		case stopChan <- true:
		default:
		}
		delete(s.jobs, id)
	}

	return nil

}

func (s *EventScheduler) CancelAll() error {
	if s.jobs == nil {
		return nil
	}

	for _, stopChan := range s.jobs {
		select {
		case stopChan <- true:
		default:
		}
	}

	s.jobs = nil
	return nil
}

func (s *EventScheduler) JobExists(id JobID) bool {
	if s.jobs == nil {
		return false
	}

	_, ok := s.jobs[id]
	return ok
}

func (s *EventScheduler) GetPersistence() SchedulerPersistence {
	return SchedulerPersistence{
		Type:  s.GetType(),
		Event: s.Event,
	}
}