// Copyright 2023 schukai GmbH // SPDX-License-Identifier: AGPL-3.0 package jobqueue import ( "sync" "testing" "time" ) func TestSubscribeAndPublish(t *testing.T) { eb := NewEventBus() jobAddedCh := make(chan interface{}, 1) defer close(jobAddedCh) eb.Subscribe(JobAdded, jobAddedCh) jobData := "New Job Data" eb.Publish(JobAdded, jobData) select { case receivedData := <-jobAddedCh: rd := receivedData.(Event) if rd.Data != jobData { t.Errorf("Received data %v, want %v", receivedData, jobData) } case <-time.After(1 * time.Second): t.Error("Timed out waiting for published event") } } func TestUnsubscribe(t *testing.T) { eb := NewEventBus() jobAddedCh := make(chan interface{}, 1) defer close(jobAddedCh) eb.Subscribe(JobAdded, jobAddedCh) eb.Unsubscribe(JobAdded, jobAddedCh) jobData := "New Job Data" eb.Publish(JobAdded, jobData) select { case <-jobAddedCh: t.Error("Received data after unsubscribing") case <-time.After(1 * time.Second): // Test passes if it times out (no data received) } } func TestMultipleSubscribers(t *testing.T) { eb := NewEventBus() jobAddedCh1 := make(chan interface{}, 1) jobAddedCh2 := make(chan interface{}, 1) eb.Subscribe(JobAdded, jobAddedCh1) eb.Subscribe(JobAdded, jobAddedCh2) defer close(jobAddedCh1) defer close(jobAddedCh2) jobData := "New Job Data" eb.Publish(JobAdded, jobData) var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() select { case receivedData := <-jobAddedCh1: rd := receivedData.(Event) if rd.Data != jobData { t.Errorf("Received data %v, want %v", receivedData, jobData) } case <-time.After(1 * time.Second): t.Error("Timed out waiting for published event on channel 1") } }() go func() { defer wg.Done() select { case receivedData := <-jobAddedCh2: rd := receivedData.(Event) if rd.Data != jobData { t.Errorf("Received data %v, want %v", receivedData, jobData) } case <-time.After(1 * time.Second): t.Error("Timed out waiting for published event on channel 2") } }() wg.Wait() }