Skip to content
Snippets Groups Projects
Select Git revision
  • 735375b2dd2079722cad506ebecce5ecbec097f3
  • master default protected
  • v1.23.2
  • v1.23.1
  • v1.23.0
  • v1.22.0
  • v1.21.1
  • v1.21.0
  • v1.20.3
  • v1.20.2
  • v1.20.1
  • v1.20.0
  • v1.19.4
  • v1.19.3
  • v1.19.2
  • v1.19.1
  • v1.19.0
  • v1.18.2
  • v1.18.1
  • v1.18.0
  • v1.17.0
  • v1.16.1
22 results

database_test.go

Blame
  • database_test.go 7.41 KiB
    //go:build !runOnTask
    
    package jobqueue
    
    import (
    	"context"
    	"fmt"
    	"github.com/docker/docker/api/types"
    	"github.com/docker/docker/api/types/container"
    	"github.com/docker/docker/client"
    	"github.com/docker/go-connections/nat"
    	"github.com/stretchr/testify/assert"
    	"gorm.io/driver/mysql"
    	"gorm.io/gorm"
    	"gorm.io/gorm/logger"
    	"log"
    	"net"
    	"os"
    	"runtime"
    	"strconv"
    	"sync"
    	"testing"
    	"time"
    )
    
    func startTestMySQLDockerImageAndContainer(t *testing.T, port string, ctx context.Context) error {
    	t.Helper()
    
    	cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
    	if err != nil {
    		return err
    	}
    
    	imageName := "mysql:8"
    
    	reader, err := cli.ImagePull(ctx, imageName, types.ImagePullOptions{})
    	if err != nil {
    		return err
    	}
    
    	// if debug image pull, comment out the following lines
    	//_, _ = io.Copy(os.Stdout, reader)
    	_ = reader
    
    	hostConfig := &container.HostConfig{
    		PortBindings: nat.PortMap{
    			"3306/tcp": []nat.PortBinding{
    				{
    					HostIP:   DOCKER_TEST_HOST_IP,
    					HostPort: port,
    				},
    			},
    			// if you want to test the web interface, uncomment the following lines
    			//"8025/tcp": []nat.PortBinding{
    			//	{
    			//		HostIP:   DOCKER_TEST_HOST_IP,
    			//		HostPort: "8025",
    			//	},
    			//},
    		},
    	}
    
    	resp, err := cli.ContainerCreate(ctx, &container.Config{
    		Image: imageName,
    		Env: []string{
    			"MYSQL_ROOT_PASSWORD=secret",
    			"MYSQL_USER=user",
    			"MYSQL_PASSWORD=secret",
    			"MYSQL_DATABASE=test",
    		},
    	}, hostConfig, nil, nil, "")
    
    	if err != nil {
    		return err
    	}
    
    	if err := cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil {
    		return err
    	}
    
    	go func() {
    		<-ctx.Done()
    
    		timeout := 0
    		stopOptions := container.StopOptions{
    			Timeout: &timeout,
    			Signal:  "SIGKILL",
    		}
    		newCtx, _ := context.WithTimeout(context.Background(), 60*time.Second)
    		if err := cli.ContainerStop(newCtx, resp.ID, stopOptions); err != nil {
    			t.Errorf("ContainerStop returned error: %v", err)
    		}
    		if err := cli.ContainerRemove(newCtx, resp.ID, types.ContainerRemoveOptions{
    			Force: true,
    		}); err != nil {
    			t.Errorf("ContainerRemove returned error: %v", err)
    		}
    
    	}()
    
    	statusCh, errCh := cli.ContainerWait(ctx, resp.ID, container.WaitConditionNotRunning)
    	select {
    	case err := <-errCh:
    		if err != nil {
    			// empty error means container exited normally (see container_wait.go)
    			if err.Error() == "" {
    				return nil
    			}
    
    			return err
    		}
    	case <-statusCh:
    
    	}
    
    	return nil
    }
    
    func TestWriteToDB(t *testing.T) {
    
    	// if true, logging and port 3306 is used
    	debugMode := false
    
    	var err error
    
    	ctb := context.Background()
    	ctx, cancel := context.WithCancel(ctb)
    	t.Cleanup(func() {
    		cancel()
    		time.Sleep(1 * time.Second)
    	})
    
    	listener, err := net.Listen("tcp", DOCKER_TEST_HOST_IP+":0")
    	if err != nil {
    		t.Errorf("Unexpected error: %v", err)
    		return
    	}
    	portAsInt := listener.Addr().(*net.TCPAddr).Port
    	portAsString := fmt.Sprintf("%d", portAsInt)
    	_ = listener.Close()
    
    	useMySQLPort := os.Getenv("MYSQL_PORT")
    	if debugMode || useMySQLPort != "" {
    		if useMySQLPort == "" {
    			useMySQLPort = "3306"
    		}
    		portAsString = useMySQLPort
    		i, _ := strconv.Atoi(portAsString)
    
    		portAsInt = i
    	}
    
    	done := make(chan bool)
    	go func() {
    		err = startTestMySQLDockerImageAndContainer(t, portAsString, ctx)
    		if err != nil {
    			t.Errorf("Unexpected error: %v", err)
    			cancel()
    		}
    		done <- true
    	}()
    
    	waitCtx, waitCancel := context.WithTimeout(ctx, 60*time.Second)
    	defer waitCancel()
    	for {
    		conn, err := net.DialTimeout("tcp", net.JoinHostPort(DOCKER_TEST_HOST_IP, portAsString), 1*time.Second)
    		if err == nil {
    			err = conn.Close()
    			assert.Nil(t, err)
    			break
    		}
    		select {
    		case <-waitCtx.Done():
    			t.Error("Timeout waiting for container service")
    			cancel()
    			return
    		default:
    			time.Sleep(1 * time.Second)
    		}
    	}
    
    	dsn := "user:secret@tcp(" + net.JoinHostPort(DOCKER_TEST_HOST_IP, portAsString) + ")/test?charset=utf8mb4&parseTime=True&loc=Local"
    
    	counter := 0
    	var db *gorm.DB
    
    	time.Sleep(20 * time.Second)
    
    	var dbLogger logger.Interface
    
    	if debugMode {
    		dbLogger = logger.New(
    			log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer
    			logger.Config{
    				SlowThreshold: time.Second, // Slow SQL threshold
    				LogLevel:      logger.Info, // Log level
    				Colorful:      false,       // Disable color
    			},
    		)
    	} else {
    
    		dbLogger = logger.Default.LogMode(logger.Silent)
    	}
    
    	for counter < 20 {
    
    		db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
    			Logger: dbLogger,
    		})
    
    		if err == nil {
    			break
    		}
    
    		counter++
    		time.Sleep(1 * time.Second)
    	}
    
    	if err != nil {
    		t.Errorf("Unexpected error: %v", err)
    		return
    	}
    
    	var wg sync.WaitGroup
    
    	// run sub tests
    	wg.Add(1)
    	t.Run("TestWriteToDB", func(t *testing.T) {
    
    		defer wg.Done()
    
    		mgr := NewManager()
    		mgr.SetDB(db)
    		worker := NewLocalWorker(1)
    		err := mgr.AddWorker(worker)
    		assert.Nil(t, err)
    
    		err = mgr.Start()
    		assert.Nil(t, err)
    
    		runner := &CounterRunnable{}
    		job := NewJob[CounterResult]("job1", runner)
    
    		scheduler := &InstantScheduler{}
    		err = mgr.ScheduleJob(job, scheduler)
    		assert.Nil(t, err)
    
    		err = mgr.CancelJobSchedule("job1")
    		assert.Nil(t, err)
    
    		time.Sleep(1 * time.Second)
    
    		scheduler2 := &InstantScheduler{}
    		err = mgr.ScheduleJob(job, scheduler2)
    		assert.Nil(t, err)
    
    		err = mgr.CancelJobSchedule("job1")
    		assert.Nil(t, err)
    
    		time.Sleep(1 * time.Second)
    
    		scheduler3 := &InstantScheduler{}
    		err = mgr.ScheduleJob(job, scheduler3)
    		assert.Nil(t, err)
    
    		time.Sleep(1 * time.Second)
    
    		if mgr.dbSaver == nil {
    			t.Error("mgr.dbSaver == nil")
    			return
    		}
    
    		time.Sleep(1 * time.Second)
    
    		err = mgr.dbSaver.SaveJob(job)
    		assert.Nil(t, err)
    
    		runtime.Gosched()
    		time.Sleep(1 * time.Second)
    		err = mgr.CancelJobSchedule("job1")
    		assert.Nil(t, err)
    
    		runtime.Gosched()
    		time.Sleep(1 * time.Second)
    
    		scheduler4 := &InstantScheduler{}
    		err = mgr.ScheduleJob(job, scheduler4)
    		assert.Nil(t, err)
    
    		runtime.Gosched()
    		time.Sleep(1 * time.Second)
    
    		err = mgr.dbSaver.SaveJob(job)
    		assert.Nil(t, err)
    
    		time.Sleep(2 * time.Second)
    		err = mgr.CancelJobSchedule("job1")
    		assert.Nil(t, err)
    
    		tries := 10
    		for tries > 0 {
    
    			var tmpJob JobPersistence
    
    			if err := db.First(&tmpJob, "id = ?", "job1").Error; err == nil {
    				break
    			}
    
    			tries--
    			time.Sleep(1 * time.Second)
    
    		}
    
    		assert.True(t, tries > 0)
    
    		err = LoadJobsAndScheduleFromDatabase(db, mgr)
    		assert.Nil(t, err)
    
    		time.Sleep(1 * time.Second)
    
    		err = mgr.dbSaver.SaveJob(job)
    		assert.Nil(t, err)
    		time.Sleep(1 * time.Second)
    
    	})
    
    	wg.Wait()
    
    	var jobPersistence JobPersistence
    	var jobStats JobStats
    	var jobLogs []JobLog // Assuming JobLog is your log model
    
    	// Query JobPersistence
    	if err := db.First(&jobPersistence, "id = ?", "job1").Error; err != nil {
    		t.Errorf("Failed to query JobPersistence: %v", err)
    	} else {
    		// Validate the fields
    		assert.Equal(t, JobID("job1"), jobPersistence.ID)
    	}
    
    	// Query JobStats
    	if err := db.First(&jobStats, "job_id = ?", jobPersistence.ID).Error; err != nil {
    		t.Errorf("Failed to query JobStats: %v", err)
    	} else {
    		// Validate the fields
    		assert.Equal(t, jobPersistence.ID, jobStats.JobID)
    	}
    
    	// Query JobLogs
    	if err := db.Find(&jobLogs, "job_id = ?", jobPersistence.ID).Error; err != nil {
    		t.Errorf("Failed to query JobLogs: %v", err)
    	} else {
    		assert.NotEmpty(t, jobLogs)
    
    		for _, l := range jobLogs {
    			assert.Equal(t, jobPersistence.ID, l.JobID)
    		}
    	}
    
    	cancel()
    
    	select {
    	case <-done:
    		time.Sleep(1 * time.Second)
    	case <-time.After(1 * time.Minute):
    		t.Error("test hangs, timeout reached")
    	}
    }