Skip to content
Snippets Groups Projects
Select Git revision
  • master
  • v1.23.2
  • v1.23.1
  • v1.23.0
  • v1.22.0
  • v1.21.1
  • v1.21.0
  • v1.20.3
  • v1.20.2
  • v1.20.1
  • v1.20.0
  • v1.19.4
  • v1.19.3
  • v1.19.2
  • v1.19.1
  • v1.19.0
  • v1.18.2
  • v1.18.1
  • v1.18.0
  • v1.17.0
  • v1.16.1
21 results

runnable-sftp_test.go

Blame
  • runnable-sftp_test.go 6.94 KiB
    // Copyright 2023 schukai GmbH
    // SPDX-License-Identifier: AGPL-3.0
    
    //go:build !bench && !race && !runOnTask
    
    // the creation of the container is not working on the CI server
    // nor on the task command. use this test manually to test the
    // sftp functionality
    
    package jobqueue
    
    import (
    	"context"
    	"fmt"
    	"github.com/docker/docker/api/types"
    	"github.com/docker/docker/api/types/container"
    	"github.com/docker/docker/client"
    	"github.com/docker/go-connections/nat"
    	"github.com/stretchr/testify/assert"
    	"net"
    	"os"
    	"testing"
    	"time"
    )
    
    func startSFTPTestDockerImageAndContainer(t *testing.T, host string, port string, volume string, ctx context.Context) error {
    	t.Helper()
    
    	cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
    	if err != nil {
    		return err
    	}
    
    	imageName := "atmoz/sftp:alpine"
    
    	reader, err := cli.ImagePull(ctx, imageName, types.ImagePullOptions{})
    	if err != nil {
    		return err
    	}
    
    	//if debug image pull, comment out the following lines
    	//_, _ = io.Copy(os.Stdout, reader)
    	_ = reader
    
    	hostConfig := &container.HostConfig{
    		PortBindings: nat.PortMap{
    			"22/tcp": []nat.PortBinding{
    				{
    					HostIP:   host,
    					HostPort: port,
    				},
    			},
    		},
    	}
    
    	if volume != "" {
    		hostConfig.Binds = append(hostConfig.Binds, volume+":/home/demo/upload")
    	}
    
    	resp, err := cli.ContainerCreate(ctx, &container.Config{
    		Image: imageName,
    		Cmd:   []string{"demo:secret:::upload"},
    	}, hostConfig, nil, nil, "")
    
    	if err != nil {
    		return err
    	}
    
    	if err := cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil {
    		return err
    	}
    
    	go func() {
    		<-ctx.Done()
    
    		timeout := 0
    		stopOptions := container.StopOptions{
    			Timeout: &timeout,
    			Signal:  "SIGKILL",
    		}
    
    		newCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
    		defer cancel()
    
    		if err := cli.ContainerStop(newCtx, resp.ID, stopOptions); err != nil {
    			t.Errorf("ContainerStop returned error: %v", err)
    		}
    		if err := cli.ContainerRemove(newCtx, resp.ID, types.ContainerRemoveOptions{
    			Force: true,
    		}); err != nil {
    			t.Errorf("ContainerRemove returned error: %v", err)
    		}
    
    	}()
    
    	statusCh, errCh := cli.ContainerWait(ctx, resp.ID, container.WaitConditionNotRunning)
    	select {
    	case err := <-errCh:
    		if err != nil {
    			// empty error means container exited normally (see container_wait.go)
    			if err.Error() == "" {
    				return nil
    			}
    
    			return err
    		}
    	case <-statusCh:
    
    	}
    
    	return nil
    }
    
    func TestSFTPCRunnerLocalToRemote(t *testing.T) {
    	if os.Getenv("CI_SERVER") != "" {
    		t.Skip("Skipping test because CI_SERVER is set")
    		// TODO: run this test in CI
    	}
    
    	ctb := context.Background()
    	ctx, cancel := context.WithCancel(ctb)
    	t.Cleanup(func() {
    		cancel()
    		time.Sleep(1 * time.Second)
    	})
    
    	host := "0.0.0.0"
    
    	listener, err := net.Listen("tcp", host+":0")
    	if err != nil {
    		t.Errorf("Unexpected error: %v", err)
    		return
    	}
    	portAsInt := listener.Addr().(*net.TCPAddr).Port
    	portAsString := fmt.Sprintf("%d", portAsInt)
    	_ = listener.Close()
    
    	done := make(chan bool)
    	go func() {
    		err = startSFTPTestDockerImageAndContainer(t, host, portAsString, "", ctx)
    		if err != nil {
    			t.Errorf("Unexpected error: %v", err)
    			cancel()
    		}
    		done <- true
    	}()
    
    	waitCtx, waitCancel := context.WithTimeout(ctx, 60*time.Second)
    	defer waitCancel()
    	for {
    		conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, portAsString), 1*time.Second)
    		if err == nil {
    			err = conn.Close()
    			assert.Nil(t, err)
    			break
    		}
    		select {
    		case <-waitCtx.Done():
    			t.Error("Timeout waiting for container service")
    			cancel()
    			return
    		default:
    			time.Sleep(1 * time.Second)
    		}
    	}
    
    	time.Sleep(1 * time.Second)
    
    	tempDir := t.TempDir()
    	// create 4 test files
    	for i := 0; i < 4; i++ {
    		_, err := os.Create(fmt.Sprintf("%s/testfile%d.txt", tempDir, i))
    		if err != nil {
    			t.Errorf("Unexpected error: %v", err)
    			return
    		}
    	}
    
    	sftpRunnable := &SFTPRunnable{
    		Host:           host,
    		Port:           portAsInt,
    		User:           "demo",
    		Insecure:       true,
    		Credential:     "secret",
    		CredentialType: "password",
    
    		SrcDir:            tempDir,
    		DstDir:            "upload",
    		TransferDirection: LocalToRemote,
    	}
    
    	ctx = context.Background()
    	result, err := sftpRunnable.Run(ctx)
    
    	// Assertions
    	assert.NoError(t, err)
    	assert.Equal(t, ResultStatusSuccess, result.Status)
    	assert.IsType(t, SFTPResult{}, result.Data)
    
    	// check result.Data contains 4 files
    	sftpResult := result.Data.FilesCopied
    	assert.Equal(t, 4, len(sftpResult))
    
    	cancel()
    
    	select {
    	case <-done:
    		time.Sleep(1 * time.Second)
    	case <-time.After(1 * time.Minute):
    		t.Error("test hangs, timeout reached")
    	}
    
    }
    
    func TestSFTPCRunnerRemoteToLocal(t *testing.T) {
    
    	if os.Getenv("CI_SERVER") != "" {
    		t.Skip("Skipping test because CI_SERVER is set")
    		// TODO: run this test in CI
    	}
    
    	ctb := context.Background()
    	ctx, cancel := context.WithCancel(ctb)
    	t.Cleanup(func() {
    		cancel()
    		time.Sleep(1 * time.Second)
    	})
    
    	host := "127.0.0.1"
    
    	listener, err := net.Listen("tcp", host+":0")
    	if err != nil {
    		t.Errorf("Unexpected error: %v", err)
    		return
    	}
    	portAsInt := listener.Addr().(*net.TCPAddr).Port
    	portAsString := fmt.Sprintf("%d", portAsInt)
    	_ = listener.Close()
    
    	tempSrcDir := t.TempDir()
    	// create 4 test files
    	for i := 0; i < 4; i++ {
    		_, err := os.Create(fmt.Sprintf("%s/testfile%d.txt", tempSrcDir, i))
    		if err != nil {
    			t.Errorf("Unexpected error: %v", err)
    			return
    		}
    	}
    
    	done := make(chan bool)
    	go func() {
    		err = startSFTPTestDockerImageAndContainer(t, host, portAsString, tempSrcDir, ctx)
    		if err != nil {
    			t.Errorf("Unexpected error: %v", err)
    			cancel()
    		}
    		done <- true
    	}()
    
    	waitCtx, waitCancel := context.WithTimeout(ctx, 60*time.Second)
    	defer waitCancel()
    	for {
    		conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, portAsString), 1*time.Second)
    		if err == nil {
    			err = conn.Close()
    			assert.Nil(t, err)
    			break
    		}
    		select {
    		case <-waitCtx.Done():
    			t.Error("Timeout waiting for container service")
    			cancel()
    			return
    		default:
    			time.Sleep(1 * time.Second)
    		}
    	}
    
    	time.Sleep(1 * time.Second)
    
    	tempDir := t.TempDir()
    
    	sftpRunnable := &SFTPRunnable{
    		Host:              host,
    		Port:              portAsInt,
    		User:              "demo",
    		Insecure:          true,
    		Credential:        "secret",
    		CredentialType:    "password",
    		SrcDir:            "upload", // Remote-Verzeichnis mit Dateien
    		DstDir:            tempDir,
    		TransferDirection: RemoteToLocal,
    	}
    
    	// Methode aufrufen
    	ctx = context.Background()
    	result, err := sftpRunnable.Run(ctx)
    
    	// Assertions
    	assert.NoError(t, err)
    	assert.Equal(t, ResultStatusSuccess, result.Status)
    	assert.IsType(t, SFTPResult{}, result.Data)
    
    	// check result.Data contains 4 files
    	sftpResult := result.Data.FilesCopied
    	assert.Equal(t, 4, len(sftpResult))
    
    	// check files in tempDir
    	files, err := os.ReadDir(tempDir)
    	assert.NoError(t, err)
    	assert.Equal(t, 4, len(files))
    
    	cancel()
    
    	select {
    	case <-done:
    		time.Sleep(5 * time.Second)
    	case <-time.After(1 * time.Minute):
    		t.Error("test hangs, timeout reached")
    	}
    }