// Copyright 2023 schukai GmbH // SPDX-License-Identifier: AGPL-3.0 //go:build !bench && !race && !runOnTask // the creation of the container is not working on the CI server // nor on the task command. use this test manually to test the // sftp functionality package jobqueue import ( "context" "fmt" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" "github.com/docker/docker/client" "github.com/docker/go-connections/nat" "github.com/stretchr/testify/assert" "net" "os" "testing" "time" ) func startSFTPTestDockerImageAndContainer(t *testing.T, host string, port string, volume string, ctx context.Context) error { t.Helper() cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) if err != nil { return err } imageName := "atmoz/sftp:alpine" reader, err := cli.ImagePull(ctx, imageName, types.ImagePullOptions{}) if err != nil { return err } //if debug image pull, comment out the following lines //_, _ = io.Copy(os.Stdout, reader) _ = reader hostConfig := &container.HostConfig{ PortBindings: nat.PortMap{ "22/tcp": []nat.PortBinding{ { HostIP: host, HostPort: port, }, }, }, } if volume != "" { hostConfig.Binds = append(hostConfig.Binds, volume+":/home/demo/upload") } resp, err := cli.ContainerCreate(ctx, &container.Config{ Image: imageName, Cmd: []string{"demo:secret:::upload"}, }, hostConfig, nil, nil, "") if err != nil { return err } if err := cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil { return err } go func() { <-ctx.Done() timeout := 0 stopOptions := container.StopOptions{ Timeout: &timeout, Signal: "SIGKILL", } newCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() if err := cli.ContainerStop(newCtx, resp.ID, stopOptions); err != nil { t.Errorf("ContainerStop returned error: %v", err) } if err := cli.ContainerRemove(newCtx, resp.ID, types.ContainerRemoveOptions{ Force: true, }); err != nil { t.Errorf("ContainerRemove returned error: %v", err) } }() statusCh, errCh := cli.ContainerWait(ctx, resp.ID, container.WaitConditionNotRunning) select { case err := <-errCh: if err != nil { // empty error means container exited normally (see container_wait.go) if err.Error() == "" { return nil } return err } case <-statusCh: } return nil } func TestSFTPCRunnerLocalToRemote(t *testing.T) { if os.Getenv("CI_SERVER") != "" { t.Skip("Skipping test because CI_SERVER is set") // TODO: run this test in CI } ctb := context.Background() ctx, cancel := context.WithCancel(ctb) t.Cleanup(func() { cancel() time.Sleep(1 * time.Second) }) host := "0.0.0.0" listener, err := net.Listen("tcp", host+":0") if err != nil { t.Errorf("Unexpected error: %v", err) return } portAsInt := listener.Addr().(*net.TCPAddr).Port portAsString := fmt.Sprintf("%d", portAsInt) _ = listener.Close() done := make(chan bool) go func() { err = startSFTPTestDockerImageAndContainer(t, host, portAsString, "", ctx) if err != nil { t.Errorf("Unexpected error: %v", err) cancel() } done <- true }() waitCtx, waitCancel := context.WithTimeout(ctx, 60*time.Second) defer waitCancel() for { conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, portAsString), 1*time.Second) if err == nil { err = conn.Close() assert.Nil(t, err) break } select { case <-waitCtx.Done(): t.Error("Timeout waiting for container service") cancel() return default: time.Sleep(1 * time.Second) } } time.Sleep(1 * time.Second) tempDir := t.TempDir() // create 4 test files for i := 0; i < 4; i++ { _, err := os.Create(fmt.Sprintf("%s/testfile%d.txt", tempDir, i)) if err != nil { t.Errorf("Unexpected error: %v", err) return } } sftpRunnable := &SFTPRunnable{ Host: host, Port: portAsInt, User: "demo", Insecure: true, Credential: "secret", CredentialType: "password", SrcDir: tempDir, DstDir: "upload", TransferDirection: LocalToRemote, } ctx = context.Background() result, err := sftpRunnable.Run(ctx) // Assertions assert.NoError(t, err) assert.Equal(t, ResultStatusSuccess, result.Status) assert.IsType(t, SFTPResult{}, result.Data) // check result.Data contains 4 files sftpResult := result.Data.FilesCopied assert.Equal(t, 4, len(sftpResult)) cancel() select { case <-done: time.Sleep(1 * time.Second) case <-time.After(1 * time.Minute): t.Error("test hangs, timeout reached") } } func TestSFTPCRunnerRemoteToLocal(t *testing.T) { if os.Getenv("CI_SERVER") != "" { t.Skip("Skipping test because CI_SERVER is set") // TODO: run this test in CI } ctb := context.Background() ctx, cancel := context.WithCancel(ctb) t.Cleanup(func() { cancel() time.Sleep(1 * time.Second) }) host := "127.0.0.1" listener, err := net.Listen("tcp", host+":0") if err != nil { t.Errorf("Unexpected error: %v", err) return } portAsInt := listener.Addr().(*net.TCPAddr).Port portAsString := fmt.Sprintf("%d", portAsInt) _ = listener.Close() tempSrcDir := t.TempDir() // create 4 test files for i := 0; i < 4; i++ { _, err := os.Create(fmt.Sprintf("%s/testfile%d.txt", tempSrcDir, i)) if err != nil { t.Errorf("Unexpected error: %v", err) return } } done := make(chan bool) go func() { err = startSFTPTestDockerImageAndContainer(t, host, portAsString, tempSrcDir, ctx) if err != nil { t.Errorf("Unexpected error: %v", err) cancel() } done <- true }() waitCtx, waitCancel := context.WithTimeout(ctx, 60*time.Second) defer waitCancel() for { conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, portAsString), 1*time.Second) if err == nil { err = conn.Close() assert.Nil(t, err) break } select { case <-waitCtx.Done(): t.Error("Timeout waiting for container service") cancel() return default: time.Sleep(1 * time.Second) } } time.Sleep(1 * time.Second) tempDir := t.TempDir() sftpRunnable := &SFTPRunnable{ Host: host, Port: portAsInt, User: "demo", Insecure: true, Credential: "secret", CredentialType: "password", SrcDir: "upload", // Remote-Verzeichnis mit Dateien DstDir: tempDir, TransferDirection: RemoteToLocal, } // Methode aufrufen ctx = context.Background() result, err := sftpRunnable.Run(ctx) // Assertions assert.NoError(t, err) assert.Equal(t, ResultStatusSuccess, result.Status) assert.IsType(t, SFTPResult{}, result.Data) // check result.Data contains 4 files sftpResult := result.Data.FilesCopied assert.Equal(t, 4, len(sftpResult)) // check files in tempDir files, err := os.ReadDir(tempDir) assert.NoError(t, err) assert.Equal(t, 4, len(files)) cancel() select { case <-done: time.Sleep(5 * time.Second) case <-time.After(1 * time.Minute): t.Error("test hangs, timeout reached") } }