//go:build !runOnTask package jobqueue import ( "context" "fmt" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" "github.com/docker/docker/client" "github.com/docker/go-connections/nat" "github.com/stretchr/testify/assert" "gorm.io/driver/mysql" "gorm.io/gorm" "gorm.io/gorm/logger" "log" "net" "os" "runtime" "strconv" "sync" "testing" "time" ) func startTestMySQLDockerImageAndContainer(t *testing.T, port string, ctx context.Context) error { t.Helper() cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) if err != nil { return err } imageName := "mysql:8" reader, err := cli.ImagePull(ctx, imageName, types.ImagePullOptions{}) if err != nil { return err } // if debug image pull, comment out the following lines //_, _ = io.Copy(os.Stdout, reader) _ = reader hostConfig := &container.HostConfig{ PortBindings: nat.PortMap{ "3306/tcp": []nat.PortBinding{ { HostIP: DOCKER_TEST_HOST_IP, HostPort: port, }, }, // if you want to test the web interface, uncomment the following lines //"8025/tcp": []nat.PortBinding{ // { // HostIP: DOCKER_TEST_HOST_IP, // HostPort: "8025", // }, //}, }, } resp, err := cli.ContainerCreate(ctx, &container.Config{ Image: imageName, Env: []string{ "MYSQL_ROOT_PASSWORD=secret", "MYSQL_USER=user", "MYSQL_PASSWORD=secret", "MYSQL_DATABASE=test", }, }, hostConfig, nil, nil, "") if err != nil { return err } if err := cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil { return err } go func() { <-ctx.Done() timeout := 0 stopOptions := container.StopOptions{ Timeout: &timeout, Signal: "SIGKILL", } newCtx, _ := context.WithTimeout(context.Background(), 60*time.Second) if err := cli.ContainerStop(newCtx, resp.ID, stopOptions); err != nil { t.Errorf("ContainerStop returned error: %v", err) } if err := cli.ContainerRemove(newCtx, resp.ID, types.ContainerRemoveOptions{ Force: true, }); err != nil { t.Errorf("ContainerRemove returned error: %v", err) } }() statusCh, errCh := cli.ContainerWait(ctx, resp.ID, container.WaitConditionNotRunning) select { case err := <-errCh: if err != nil { // empty error means container exited normally (see container_wait.go) if err.Error() == "" { return nil } return err } case <-statusCh: } return nil } func TestWriteToDB(t *testing.T) { // if true, logging and port 3306 is used debugMode := false var err error ctb := context.Background() ctx, cancel := context.WithCancel(ctb) t.Cleanup(func() { cancel() time.Sleep(1 * time.Second) }) listener, err := net.Listen("tcp", DOCKER_TEST_HOST_IP+":0") if err != nil { t.Errorf("Unexpected error: %v", err) return } portAsInt := listener.Addr().(*net.TCPAddr).Port portAsString := fmt.Sprintf("%d", portAsInt) _ = listener.Close() useMySQLPort := os.Getenv("MYSQL_PORT") if debugMode || useMySQLPort != "" { if useMySQLPort == "" { useMySQLPort = "3306" } portAsString = useMySQLPort i, _ := strconv.Atoi(portAsString) portAsInt = i } done := make(chan bool) go func() { err = startTestMySQLDockerImageAndContainer(t, portAsString, ctx) if err != nil { t.Errorf("Unexpected error: %v", err) cancel() } done <- true }() waitCtx, waitCancel := context.WithTimeout(ctx, 60*time.Second) defer waitCancel() for { conn, err := net.DialTimeout("tcp", net.JoinHostPort(DOCKER_TEST_HOST_IP, portAsString), 1*time.Second) if err == nil { err = conn.Close() assert.Nil(t, err) break } select { case <-waitCtx.Done(): t.Error("Timeout waiting for container service") cancel() return default: time.Sleep(1 * time.Second) } } dsn := "user:secret@tcp(" + net.JoinHostPort(DOCKER_TEST_HOST_IP, portAsString) + ")/test?charset=utf8mb4&parseTime=True&loc=Local" counter := 0 var db *gorm.DB time.Sleep(20 * time.Second) var dbLogger logger.Interface if debugMode { dbLogger = logger.New( log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer logger.Config{ SlowThreshold: time.Second, // Slow SQL threshold LogLevel: logger.Info, // Log level Colorful: false, // Disable color }, ) } else { dbLogger = logger.Default.LogMode(logger.Silent) } for counter < 20 { db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{ Logger: dbLogger, }) if err == nil { break } counter++ time.Sleep(1 * time.Second) } if err != nil { t.Errorf("Unexpected error: %v", err) return } var wg sync.WaitGroup // run sub tests wg.Add(1) t.Run("TestWriteToDB", func(t *testing.T) { defer wg.Done() mgr := NewManager() mgr.SetDB(db) worker := NewLocalWorker(1) err := mgr.AddWorker(worker) assert.Nil(t, err) err = mgr.Start() assert.Nil(t, err) runner := &CounterRunnable{} job := NewJob[CounterResult]("job1", runner) scheduler := &InstantScheduler{} err = mgr.ScheduleJob(job, scheduler) assert.Nil(t, err) err = mgr.CancelJobSchedule("job1") assert.Nil(t, err) time.Sleep(1 * time.Second) scheduler2 := &InstantScheduler{} err = mgr.ScheduleJob(job, scheduler2) assert.Nil(t, err) err = mgr.CancelJobSchedule("job1") assert.Nil(t, err) time.Sleep(1 * time.Second) scheduler3 := &InstantScheduler{} err = mgr.ScheduleJob(job, scheduler3) assert.Nil(t, err) time.Sleep(1 * time.Second) if mgr.dbSaver == nil { t.Error("mgr.dbSaver == nil") return } time.Sleep(1 * time.Second) err = mgr.dbSaver.SaveJob(job) assert.Nil(t, err) runtime.Gosched() time.Sleep(1 * time.Second) err = mgr.CancelJobSchedule("job1") assert.Nil(t, err) runtime.Gosched() time.Sleep(1 * time.Second) scheduler4 := &InstantScheduler{} err = mgr.ScheduleJob(job, scheduler4) assert.Nil(t, err) runtime.Gosched() time.Sleep(1 * time.Second) err = mgr.dbSaver.SaveJob(job) assert.Nil(t, err) time.Sleep(2 * time.Second) err = mgr.CancelJobSchedule("job1") assert.Nil(t, err) tries := 10 for tries > 0 { var tmpJob JobPersistence if err := db.First(&tmpJob, "id = ?", "job1").Error; err == nil { break } tries-- time.Sleep(1 * time.Second) } assert.True(t, tries > 0) err = LoadJobsAndScheduleFromDatabase(db, mgr) assert.Nil(t, err) time.Sleep(1 * time.Second) err = mgr.dbSaver.SaveJob(job) assert.Nil(t, err) time.Sleep(1 * time.Second) }) wg.Wait() var jobPersistence JobPersistence var jobStats JobStats var jobLogs []JobLog // Assuming JobLog is your log model // Query JobPersistence if err := db.First(&jobPersistence, "id = ?", "job1").Error; err != nil { t.Errorf("Failed to query JobPersistence: %v", err) } else { // Validate the fields assert.Equal(t, JobID("job1"), jobPersistence.ID) } // Query JobStats if err := db.First(&jobStats, "job_id = ?", jobPersistence.ID).Error; err != nil { t.Errorf("Failed to query JobStats: %v", err) } else { // Validate the fields assert.Equal(t, jobPersistence.ID, jobStats.JobID) } // Query JobLogs if err := db.Find(&jobLogs, "job_id = ?", jobPersistence.ID).Error; err != nil { t.Errorf("Failed to query JobLogs: %v", err) } else { assert.NotEmpty(t, jobLogs) for _, l := range jobLogs { assert.Equal(t, jobPersistence.ID, l.JobID) } } cancel() select { case <-done: time.Sleep(1 * time.Second) case <-time.After(1 * time.Minute): t.Error("test hangs, timeout reached") } }