Something went wrong on our end
Select Git revision
datasource.mjs
-
Volker Schukai authoredVolker Schukai authored
database_test.go 7.41 KiB
//go:build !runOnTask
package jobqueue
import (
"context"
"fmt"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/docker/go-connections/nat"
"github.com/stretchr/testify/assert"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/logger"
"log"
"net"
"os"
"runtime"
"strconv"
"sync"
"testing"
"time"
)
func startTestMySQLDockerImageAndContainer(t *testing.T, port string, ctx context.Context) error {
t.Helper()
cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
if err != nil {
return err
}
imageName := "mysql:8"
reader, err := cli.ImagePull(ctx, imageName, types.ImagePullOptions{})
if err != nil {
return err
}
// if debug image pull, comment out the following lines
//_, _ = io.Copy(os.Stdout, reader)
_ = reader
hostConfig := &container.HostConfig{
PortBindings: nat.PortMap{
"3306/tcp": []nat.PortBinding{
{
HostIP: DOCKER_TEST_HOST_IP,
HostPort: port,
},
},
// if you want to test the web interface, uncomment the following lines
//"8025/tcp": []nat.PortBinding{
// {
// HostIP: DOCKER_TEST_HOST_IP,
// HostPort: "8025",
// },
//},
},
}
resp, err := cli.ContainerCreate(ctx, &container.Config{
Image: imageName,
Env: []string{
"MYSQL_ROOT_PASSWORD=secret",
"MYSQL_USER=user",
"MYSQL_PASSWORD=secret",
"MYSQL_DATABASE=test",
},
}, hostConfig, nil, nil, "")
if err != nil {
return err
}
if err := cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil {
return err
}
go func() {
<-ctx.Done()
timeout := 0
stopOptions := container.StopOptions{
Timeout: &timeout,
Signal: "SIGKILL",
}
newCtx, _ := context.WithTimeout(context.Background(), 60*time.Second)
if err := cli.ContainerStop(newCtx, resp.ID, stopOptions); err != nil {
t.Errorf("ContainerStop returned error: %v", err)
}
if err := cli.ContainerRemove(newCtx, resp.ID, types.ContainerRemoveOptions{
Force: true,
}); err != nil {
t.Errorf("ContainerRemove returned error: %v", err)
}
}()
statusCh, errCh := cli.ContainerWait(ctx, resp.ID, container.WaitConditionNotRunning)
select {
case err := <-errCh:
if err != nil {
// empty error means container exited normally (see container_wait.go)
if err.Error() == "" {
return nil
}
return err
}
case <-statusCh:
}
return nil
}
func TestWriteToDB(t *testing.T) {
// if true, logging and port 3306 is used
debugMode := false
var err error
ctb := context.Background()
ctx, cancel := context.WithCancel(ctb)
t.Cleanup(func() {
cancel()
time.Sleep(1 * time.Second)
})
listener, err := net.Listen("tcp", DOCKER_TEST_HOST_IP+":0")
if err != nil {
t.Errorf("Unexpected error: %v", err)
return
}
portAsInt := listener.Addr().(*net.TCPAddr).Port
portAsString := fmt.Sprintf("%d", portAsInt)
_ = listener.Close()
useMySQLPort := os.Getenv("MYSQL_PORT")
if debugMode || useMySQLPort != "" {
if useMySQLPort == "" {
useMySQLPort = "3306"
}
portAsString = useMySQLPort
i, _ := strconv.Atoi(portAsString)
portAsInt = i
}
done := make(chan bool)
go func() {
err = startTestMySQLDockerImageAndContainer(t, portAsString, ctx)
if err != nil {
t.Errorf("Unexpected error: %v", err)
cancel()
}
done <- true
}()
waitCtx, waitCancel := context.WithTimeout(ctx, 60*time.Second)
defer waitCancel()
for {
conn, err := net.DialTimeout("tcp", net.JoinHostPort(DOCKER_TEST_HOST_IP, portAsString), 1*time.Second)
if err == nil {
err = conn.Close()
assert.Nil(t, err)
break
}
select {
case <-waitCtx.Done():
t.Error("Timeout waiting for container service")
cancel()
return
default:
time.Sleep(1 * time.Second)
}
}
dsn := "user:secret@tcp(" + net.JoinHostPort(DOCKER_TEST_HOST_IP, portAsString) + ")/test?charset=utf8mb4&parseTime=True&loc=Local"
counter := 0
var db *gorm.DB
time.Sleep(20 * time.Second)
var dbLogger logger.Interface
if debugMode {
dbLogger = logger.New(
log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer
logger.Config{
SlowThreshold: time.Second, // Slow SQL threshold
LogLevel: logger.Info, // Log level
Colorful: false, // Disable color
},
)
} else {
dbLogger = logger.Default.LogMode(logger.Silent)
}
for counter < 20 {
db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
Logger: dbLogger,
})
if err == nil {
break
}
counter++
time.Sleep(1 * time.Second)
}
if err != nil {
t.Errorf("Unexpected error: %v", err)
return
}
var wg sync.WaitGroup
// run sub tests
wg.Add(1)
t.Run("TestWriteToDB", func(t *testing.T) {
defer wg.Done()
mgr := NewManager()
mgr.SetDB(db)
worker := NewLocalWorker(1)
err := mgr.AddWorker(worker)
assert.Nil(t, err)
err = mgr.Start()
assert.Nil(t, err)
runner := &CounterRunnable{}
job := NewJob[CounterResult]("job1", runner)
scheduler := &InstantScheduler{}
err = mgr.ScheduleJob(job, scheduler)
assert.Nil(t, err)
err = mgr.CancelJobSchedule("job1")
assert.Nil(t, err)
time.Sleep(1 * time.Second)
scheduler2 := &InstantScheduler{}
err = mgr.ScheduleJob(job, scheduler2)
assert.Nil(t, err)
err = mgr.CancelJobSchedule("job1")
assert.Nil(t, err)
time.Sleep(1 * time.Second)
scheduler3 := &InstantScheduler{}
err = mgr.ScheduleJob(job, scheduler3)
assert.Nil(t, err)
time.Sleep(1 * time.Second)
if mgr.dbSaver == nil {
t.Error("mgr.dbSaver == nil")
return
}
time.Sleep(1 * time.Second)
err = mgr.dbSaver.SaveJob(job)
assert.Nil(t, err)
runtime.Gosched()
time.Sleep(1 * time.Second)
err = mgr.CancelJobSchedule("job1")
assert.Nil(t, err)
runtime.Gosched()
time.Sleep(1 * time.Second)
scheduler4 := &InstantScheduler{}
err = mgr.ScheduleJob(job, scheduler4)
assert.Nil(t, err)
runtime.Gosched()
time.Sleep(1 * time.Second)
err = mgr.dbSaver.SaveJob(job)
assert.Nil(t, err)
time.Sleep(2 * time.Second)
err = mgr.CancelJobSchedule("job1")
assert.Nil(t, err)
tries := 10
for tries > 0 {
var tmpJob JobPersistence
if err := db.First(&tmpJob, "id = ?", "job1").Error; err == nil {
break
}
tries--
time.Sleep(1 * time.Second)
}
assert.True(t, tries > 0)
err = LoadJobsAndScheduleFromDatabase(db, mgr)
assert.Nil(t, err)
time.Sleep(1 * time.Second)
err = mgr.dbSaver.SaveJob(job)
assert.Nil(t, err)
time.Sleep(1 * time.Second)
})
wg.Wait()
var jobPersistence JobPersistence
var jobStats JobStats
var jobLogs []JobLog // Assuming JobLog is your log model
// Query JobPersistence
if err := db.First(&jobPersistence, "id = ?", "job1").Error; err != nil {
t.Errorf("Failed to query JobPersistence: %v", err)
} else {
// Validate the fields
assert.Equal(t, JobID("job1"), jobPersistence.ID)
}
// Query JobStats
if err := db.First(&jobStats, "job_id = ?", jobPersistence.ID).Error; err != nil {
t.Errorf("Failed to query JobStats: %v", err)
} else {
// Validate the fields
assert.Equal(t, jobPersistence.ID, jobStats.JobID)
}
// Query JobLogs
if err := db.Find(&jobLogs, "job_id = ?", jobPersistence.ID).Error; err != nil {
t.Errorf("Failed to query JobLogs: %v", err)
} else {
assert.NotEmpty(t, jobLogs)
for _, l := range jobLogs {
assert.Equal(t, jobPersistence.ID, l.JobID)
}
}
cancel()
select {
case <-done:
time.Sleep(1 * time.Second)
case <-time.After(1 * time.Minute):
t.Error("test hangs, timeout reached")
}
}