Skip to content
Snippets Groups Projects
Verified Commit d14a74cc authored by Volker Schukai's avatar Volker Schukai :alien:
Browse files

feat: implement core functions

parent bd62057b
No related branches found
No related tags found
No related merge requests found
Showing with 1921 additions and 184 deletions
package jobqueue
import (
"testing"
)
func TestEnqueueJobAlreadyExists(t *testing.T) {
runner := &DummyRunnable{}
job := NewJob[DummyResult](JobID("1"), runner)
q := NewQueue(nil)
_ = q.Enqueue(job)
err := q.Enqueue(job)
if err != ErrJobAlreadyExists {
t.Fatalf("Expected ErrJobAlreadyExists, got %v", err)
}
}
func TestEnqueueAndDequeue(t *testing.T) {
runner := &DummyRunnable{}
q := NewQueue(nil)
job1 := NewJob[DummyResult](JobID("1"), runner)
job1.SetPriority(PriorityHigh)
job2 := NewJob[DummyResult](JobID("2"), runner)
_ = q.Enqueue(job1)
_ = q.Enqueue(job2)
dequeuedJob, err := q.Dequeue()
if err != nil || dequeuedJob.GetID() != JobID("1") {
t.Fatalf("Unexpected dequeue result: jobID %s, err %v", dequeuedJob.GetID(), err)
}
}
func TestEnqueueAndDequeue2(t *testing.T) {
runner := &DummyRunnable{}
q := NewQueue(nil)
job1 := NewJob[DummyResult](JobID("1"), runner)
job2 := NewJob[DummyResult](JobID("2"), runner)
job2.AddDependency(JobID("1"))
_ = q.Enqueue(job1)
_ = q.Enqueue(job2)
dequeuedJob, err := q.Dequeue()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if dequeuedJob.GetID() != JobID("1") {
t.Fatalf("Unexpected dequeue result: jobID %s", dequeuedJob.GetID())
}
}
func TestDependencyResolution(t *testing.T) {
runner := &DummyRunnable{}
q := NewQueue(nil)
job1 := NewJob[DummyResult](JobID("1"), runner)
job2 := NewJob[DummyResult](JobID("2"), runner)
job3 := NewJob[DummyResult](JobID("3"), runner)
_ = q.Enqueue(job3)
_ = q.Enqueue(job2)
_ = q.Enqueue(job1)
contains := func(arr []JobID, id JobID) bool {
for _, v := range arr {
if v == id {
return true
}
}
return false
}
possibleJobIDs := []JobID{"1", "2", "3"}
job, _ := q.Dequeue()
if !contains(possibleJobIDs, job.GetID()) {
t.Fatalf("Expected jobID in %v, got %s", possibleJobIDs, job.GetID())
}
// remove jobID from possibleJobIDs
for i, v := range possibleJobIDs {
if v == job.GetID() {
possibleJobIDs = append(possibleJobIDs[:i], possibleJobIDs[i+1:]...)
}
}
job, _ = q.Dequeue()
if !contains(possibleJobIDs, job.GetID()) {
t.Fatalf("Expected jobID in %v, got %s", possibleJobIDs, job.GetID())
}
// remove jobID from possibleJobIDs
for i, v := range possibleJobIDs {
if v == job.GetID() {
possibleJobIDs = append(possibleJobIDs[:i], possibleJobIDs[i+1:]...)
}
}
job, _ = q.Dequeue()
if !contains(possibleJobIDs, job.GetID()) {
t.Fatalf("Expected jobID in %v, got %s", possibleJobIDs, job.GetID())
}
// remove jobID from possibleJobIDs
for i, v := range possibleJobIDs {
if v == job.GetID() {
possibleJobIDs = append(possibleJobIDs[:i], possibleJobIDs[i+1:]...)
}
}
if len(possibleJobIDs) != 0 {
t.Fatalf("Expected no jobIDs left in %v", possibleJobIDs)
}
}
func TestDequeueEmptyQueue(t *testing.T) {
q := NewQueue(nil)
_, err := q.Dequeue()
if err != ErrQueueEmpty {
t.Fatalf("Expected ErrQueueEmpty, got %v", err)
}
}
func TestProcessedJobs(t *testing.T) {
q := NewQueue(nil)
runner := &DummyRunnable{}
job1 := NewJob[DummyResult](JobID("1"), runner)
job2 := NewJob[DummyResult](JobID("2"), runner)
_ = q.Enqueue(job1)
_, _ = q.Dequeue()
_ = q.Enqueue(job2)
_, err := q.Dequeue()
if err != nil {
t.Fatalf("Dequeue failed: %v", err)
}
if _, exists := q.processedJobs[job1.GetID()]; !exists {
t.Fatalf("Job 1 not marked as processed")
}
}
func TestCyclicDependencies(t *testing.T) {
runner := &DummyRunnable{}
q := NewQueue(nil)
job1 := NewJob[DummyResult](JobID("1"), runner)
job2 := NewJob[DummyResult](JobID("2"), runner)
job3 := NewJob[DummyResult](JobID("3"), runner)
job1.AddDependency(JobID("2"))
job2.AddDependency(JobID("3"))
job3.AddDependency(JobID("1"))
err := q.Enqueue(job1)
if err != nil {
t.Fatalf("Enqueue failed: %v", err)
}
err = q.Enqueue(job2)
if err != nil {
t.Fatalf("Enqueue failed: %v", err)
}
err = q.Enqueue(job3)
if err == nil || err != ErrCycleDetected {
t.Fatalf("Expected ErrCyclicDependency, got %v", err)
}
}
func TestDuplicateDependencies(t *testing.T) {
runner := &DummyRunnable{}
q := NewQueue(nil)
job1 := NewJob[DummyResult](JobID("1"), runner)
job2 := NewJob[DummyResult](JobID("2"), runner)
job2.AddDependency(JobID("1"))
job2.AddDependency(JobID("1"))
_ = q.Enqueue(job1)
err := q.Enqueue(job2)
if err != nil {
t.Fatalf("Enqueue failed: %v", err)
}
_, err = q.Dequeue()
if err != nil {
t.Fatalf("Dequeue failed: %v", err)
}
_, err = q.Dequeue()
if err != nil {
t.Fatalf("Dequeue failed: %v", err)
}
if len(q.processedJobs) != 2 {
t.Fatalf("Expected 2 processed jobs, got %d", len(q.processedJobs))
}
}
func TestJobWithSelfAsDependency(t *testing.T) {
runner := &DummyRunnable{}
q := NewQueue(nil)
job1 := NewJob[DummyResult](JobID("1"), runner)
job1.AddDependency(JobID("1"))
err := q.Enqueue(job1)
if err == nil || err != ErrCycleDetected {
t.Fatalf("Expected ErrCycleDetected, got %v", err)
}
}
// Continue with other test cases...
package jobqueue
import (
"sync"
)
// CounterResult is a result of a counter
type CounterResult struct {
Count int
}
// CounterRunnable is a runnable that counts
type CounterRunnable struct {
Count int
mu sync.Mutex
}
// GetCount returns the current count
func (c *CounterRunnable) GetCount() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.Count
}
// Run runs the counter
func (c *CounterRunnable) Run() (RunResult[CounterResult], error) {
c.mu.Lock()
defer c.mu.Unlock()
c.Count++
return RunResult[CounterResult]{
Status: ResultStatusSuccess,
Data: CounterResult{
Count: c.Count,
},
}, nil
}
package jobqueue
import (
"testing"
)
func TestRunnableCounter(t *testing.T) {
runner := &CounterRunnable{}
r, err := runner.Run()
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if r.Status != ResultStatusSuccess {
t.Errorf("Unexpected result status: %v", r.Status)
}
if r.Data.Count != 1 {
t.Errorf("Unexpected result data: %v", r.Data)
}
}
package jobqueue
// DummyResult is a dummy result
type DummyResult struct {
}
type DummyRunnable struct{}
func (d *DummyRunnable) Run() (RunResult[DummyResult], error) {
return RunResult[DummyResult]{
Status: ResultStatusSuccess,
}, nil
}
package jobqueue
import (
"testing"
)
func TestDummyRunnable(t *testing.T) {
runner := &DummyRunnable{}
_, err := runner.Run()
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
}
package jobqueue
import (
"os"
)
type FileOperationResult struct {
Success bool
Content string // Optional, je nach Operation
}
const (
FileOperationRead = "read"
FileOperationWrite = "write"
FileOperationDelete = "delete"
FileOperationAppend = "append"
FileOperationCreate = "create"
)
type FileOperationRunnable struct {
Operation string // z.B. "read", "write", "delete"
FilePath string
Content string // Optional, je nach Operation
}
func (f *FileOperationRunnable) Run() (RunResult[FileOperationResult], error) {
switch f.Operation {
case FileOperationRead:
content, err := os.ReadFile(f.FilePath)
if err != nil {
return RunResult[FileOperationResult]{Status: ResultStatusFailed}, err
}
return RunResult[FileOperationResult]{
Status: ResultStatusSuccess,
Data: FileOperationResult{
Success: true,
Content: string(content),
},
}, nil
case FileOperationWrite:
err := os.WriteFile(f.FilePath, []byte(f.Content), 0644)
if err != nil {
return RunResult[FileOperationResult]{Status: ResultStatusFailed}, err
}
return RunResult[FileOperationResult]{
Status: ResultStatusSuccess,
Data: FileOperationResult{
Success: true,
},
}, nil
case FileOperationDelete:
err := os.Remove(f.FilePath)
if err != nil {
return RunResult[FileOperationResult]{Status: ResultStatusFailed}, err
}
return RunResult[FileOperationResult]{
Status: ResultStatusSuccess,
Data: FileOperationResult{
Success: true,
},
}, nil
case FileOperationAppend:
fp, err := os.OpenFile(f.FilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return RunResult[FileOperationResult]{Status: ResultStatusFailed}, err
}
defer fp.Close()
if _, err := fp.WriteString(f.Content); err != nil {
return RunResult[FileOperationResult]{Status: ResultStatusFailed}, err
}
return RunResult[FileOperationResult]{
Status: ResultStatusSuccess,
Data: FileOperationResult{
Success: true,
},
}, nil
case FileOperationCreate:
f, err := os.Create(f.FilePath)
if err != nil {
return RunResult[FileOperationResult]{Status: ResultStatusFailed}, err
}
defer f.Close()
return RunResult[FileOperationResult]{
Status: ResultStatusSuccess,
Data: FileOperationResult{
Success: true,
},
}, nil
default:
return RunResult[FileOperationResult]{Status: ResultStatusFailed}, ErrUnsupportedFileOption
}
}
package jobqueue
import (
"io/ioutil"
"os"
"path"
"testing"
)
func TestFileOperationRunnable(t *testing.T) {
dir := t.TempDir()
err := os.Chdir(dir)
if err != nil {
t.Fatalf("Failed to change directory: %v", err)
}
testFilePath := path.Join(dir, "test.txt")
testContent := "Hello, World!"
// Test FileOperationCreate
createRunner := FileOperationRunnable{Operation: FileOperationCreate, FilePath: testFilePath}
_, err = createRunner.Run()
if err != nil {
t.Fatalf("Failed to create file: %v", err)
}
// Test FileOperationWrite
writeRunner := FileOperationRunnable{Operation: FileOperationWrite, FilePath: testFilePath, Content: testContent}
_, err = writeRunner.Run()
if err != nil {
t.Fatalf("Failed to write to file: %v", err)
}
// Test FileOperationRead
readRunner := FileOperationRunnable{Operation: FileOperationRead, FilePath: testFilePath}
result, err := readRunner.Run()
if err != nil || result.Data.Content != testContent {
t.Fatalf("Failed to read from file: %v", err)
}
// Test FileOperationAppend
appendContent := " Appended."
appendRunner := FileOperationRunnable{Operation: FileOperationAppend, FilePath: testFilePath, Content: appendContent}
_, err = appendRunner.Run()
if err != nil {
t.Fatalf("Failed to append to file: %v", err)
}
// Re-verify content after append
updatedContent, _ := ioutil.ReadFile(testFilePath)
if string(updatedContent) != testContent+appendContent {
t.Fatalf("Append operation failed.")
}
// Test FileOperationDelete
deleteRunner := FileOperationRunnable{Operation: FileOperationDelete, FilePath: testFilePath}
_, err = deleteRunner.Run()
if err != nil {
t.Fatalf("Failed to delete file: %v", err)
}
// Verify the file is deleted
if _, err := os.Stat(testFilePath); !os.IsNotExist(err) {
t.Fatalf("File deletion failed.")
}
}
package jobqueue
import (
"gorm.io/driver/mysql"
"gorm.io/gorm"
)
// DBResult is a result of a db query
type DBResult struct {
RowsAffected int
}
type DBRunnable struct {
Type string
DSN string
Query string
db *gorm.DB // internal for testing
}
func (d *DBRunnable) Run() (RunResult[DBResult], error) {
var db *gorm.DB
var err error
if d.db == nil {
switch d.Type {
case "mysql":
db, err = gorm.Open(mysql.Open(d.DSN), &gorm.Config{})
default:
return RunResult[DBResult]{Status: ResultStatusFailed}, ErrUnsupportedDatabaseType
}
} else {
db = d.db
}
if err != nil {
return RunResult[DBResult]{Status: ResultStatusFailed}, err
}
var result *gorm.DB
result = db.Exec(d.Query)
if result.Error != nil {
return RunResult[DBResult]{Status: ResultStatusFailed}, result.Error
}
return RunResult[DBResult]{
Status: ResultStatusSuccess,
Data: DBResult{
RowsAffected: int(result.RowsAffected),
},
}, nil
}
package jobqueue
import (
"github.com/DATA-DOG/go-sqlmock"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"testing"
)
func TestDBRunnable_Run(t *testing.T) {
// Mock-DB erstellen
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Could not create mock: %s", err)
}
gormDB, _ := gorm.Open(mysql.New(mysql.Config{
Conn: db,
SkipInitializeWithVersion: true,
}), &gorm.Config{})
//existsRows := sqlmock.NewRows([]string{"exists"}).
// AddRow(true)
//mock.ExpectQuery("SELECT EXISTS \\( SELECT 1 FROM information_schema\\.tables WHERE table_schema = 'public' AND table_name = 'myTable3' \\);").
// WillReturnRows(existsRows)
mock.ExpectExec("SELECT \\* FROM table_name").WillReturnResult(sqlmock.NewResult(1, 1))
// Erstellen Sie die zu testende Instanz
runnable := &DBRunnable{
Type: "mysql",
db: gormDB, // Injizierte Mock-DB
Query: "SELECT * FROM table_name",
}
// Rufen Sie die Run()-Methode auf und überprüfen Sie die Ergebnisse
result, err := runnable.Run()
// Überprüfungen hier
if err != nil {
t.Fatalf("Failed to run: %s", err)
}
if result.Status != ResultStatusSuccess {
t.Fatalf("Expected success, got: %d", result.Status)
}
}
package jobqueue
import (
"bytes"
"io"
"net/http"
)
// HTTPResult is a result of a http request
type HTTPResult struct {
StatusCode int
Body string
}
type HTTPRunnable struct {
URL string
Method string
Header map[string]string
Body string
}
func (h *HTTPRunnable) Run() (RunResult[HTTPResult], error) {
client := &http.Client{}
reqBody := bytes.NewBufferString(h.Body)
req, err := http.NewRequest(h.Method, h.URL, reqBody)
if err != nil {
return RunResult[HTTPResult]{Status: ResultStatusFailed}, err
}
for key, value := range h.Header {
req.Header.Set(key, value)
}
resp, err := client.Do(req)
if err != nil {
return RunResult[HTTPResult]{Status: ResultStatusFailed}, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return RunResult[HTTPResult]{Status: ResultStatusFailed}, err
}
return RunResult[HTTPResult]{
Status: ResultStatusSuccess,
Data: HTTPResult{
StatusCode: resp.StatusCode,
Body: string(body),
},
}, nil
}
package jobqueue
import (
"github.com/stretchr/testify/assert"
"net/http"
"net/http/httptest"
"testing"
)
func TestHTTPRunnable_Run(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte("Hello, world!"))
}))
defer server.Close()
httpRunnable := &HTTPRunnable{
URL: server.URL,
Method: "GET",
Header: map[string]string{
"Content-Type": "application/json",
},
Body: "",
}
result, err := httpRunnable.Run()
// Assertions
assert.NoError(t, err)
assert.Equal(t, ResultStatusSuccess, result.Status)
assert.IsType(t, HTTPResult{}, result.Data)
httpResult := result.Data
assert.Equal(t, 200, httpResult.StatusCode)
assert.Equal(t, "Hello, world!", httpResult.Body)
}
package jobqueue
import (
"net/smtp"
)
// MailResult is a result of a email
type MailResult struct {
Sent bool
ServerReply string
SmtpStatusCode uint
}
type MailRunnable struct {
To string
From string
Subject string
Body string
Server string
Port string
Username string
Password string
Headers map[string]string
}
func (m *MailRunnable) Run() (RunResult[MailResult], error) {
smtpServer := m.Server + ":" + m.Port
// Connect to the remote SMTP server.
client, err := smtp.Dial(smtpServer)
if err != nil {
return RunResult[MailResult]{Status: ResultStatusFailed}, err
}
if client != nil {
defer client.Close()
}
if m.Username != "" && m.Password != "" {
if err := client.Auth(smtp.PlainAuth("", m.Username, m.Password, m.Server)); err != nil {
return RunResult[MailResult]{Status: ResultStatusFailed}, err
}
}
// To && From.
if err := client.Mail(m.From); err != nil {
return RunResult[MailResult]{Status: ResultStatusFailed}, err
}
if err := client.Rcpt(m.To); err != nil {
return RunResult[MailResult]{Status: ResultStatusFailed}, err
}
// Headers and Data.
writer, err := client.Data()
if err != nil {
return RunResult[MailResult]{Status: ResultStatusFailed}, err
}
headers := "From: " + m.From + "\r\n"
headers += "To: " + m.To + "\r\n"
headers += "Subject: " + m.Subject + "\r\n"
for key, value := range m.Headers {
headers += key + ": " + value + "\r\n"
}
_, err = writer.Write([]byte(headers + "\r\n" + m.Body))
if err != nil {
return RunResult[MailResult]{Status: ResultStatusFailed}, err
}
_ = writer.Close()
// Quit and get the SMTP status code.
smtpStatusCode, _ := client.Text.Cmd("QUIT")
return RunResult[MailResult]{Status: ResultStatusSuccess, Data: MailResult{Sent: true, SmtpStatusCode: smtpStatusCode}}, nil
}
package jobqueue
import (
"context"
"fmt"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/docker/go-connections/nat"
"github.com/stretchr/testify/assert"
"net"
"testing"
"time"
)
func startTestSMTPDockerImageAndContainer(t *testing.T, host string, port string, ctx context.Context) error {
t.Helper()
cli, err := client.NewClientWithOpts(client.WithVersion("1.41"))
if err != nil {
return err
}
imageName := "axllent/mailpit"
reader, err := cli.ImagePull(ctx, imageName, types.ImagePullOptions{})
if err != nil {
return err
}
// if debug image pull, comment out the following lines
//_, _ = io.Copy(os.Stdout, reader)
_ = reader
hostConfig := &container.HostConfig{
PortBindings: nat.PortMap{
"1025/tcp": []nat.PortBinding{
{
HostIP: host,
HostPort: port,
},
},
"8025/tcp": []nat.PortBinding{
{
HostIP: host,
HostPort: "8025",
},
},
},
}
resp, err := cli.ContainerCreate(ctx, &container.Config{
Image: imageName,
}, hostConfig, nil, nil, "")
if err != nil {
return err
}
if err := cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil {
return err
}
go func() {
<-ctx.Done()
timeout := 0
stopOptions := container.StopOptions{
Timeout: &timeout,
Signal: "SIGKILL",
}
newCtx, _ := context.WithTimeout(context.Background(), 20*time.Second)
if err := cli.ContainerStop(newCtx, resp.ID, stopOptions); err != nil {
t.Errorf("ContainerStop returned error: %v", err)
}
if err := cli.ContainerRemove(newCtx, resp.ID, types.ContainerRemoveOptions{
Force: true,
}); err != nil {
t.Errorf("ContainerRemove returned error: %v", err)
}
}()
statusCh, errCh := cli.ContainerWait(ctx, resp.ID, container.WaitConditionNotRunning)
select {
case err := <-errCh:
if err != nil {
// empty error means container exited normally (see container_wait.go)
if err.Error() == "" {
return nil
}
return err
}
case <-statusCh:
}
return nil
}
func TestMailRunner(t *testing.T) {
ctb := context.Background()
ctx, cancel := context.WithCancel(ctb)
t.Cleanup(func() {
cancel()
time.Sleep(1 * time.Second)
})
host := "127.0.0.1"
listener, err := net.Listen("tcp", host+":0")
if err != nil {
t.Errorf("Unexpected error: %v", err)
return
}
portAsInt := listener.Addr().(*net.TCPAddr).Port
portAsString := fmt.Sprintf("%d", portAsInt)
_ = listener.Close()
done := make(chan bool)
go func() {
err = startTestSMTPDockerImageAndContainer(t, host, portAsString, ctx)
if err != nil {
t.Errorf("Unexpected error: %v", err)
cancel()
}
done <- true
}()
waitCtx, waitCancel := context.WithTimeout(ctx, 30*time.Second)
defer waitCancel()
for {
conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, portAsString), 1*time.Second)
if err == nil {
err = conn.Close()
assert.Nil(t, err)
break
}
select {
case <-waitCtx.Done():
t.Error("Timeout waiting for container service")
cancel()
return
default:
time.Sleep(1 * time.Second)
}
}
time.Sleep(1 * time.Second)
mailRunnable := &MailRunnable{
To: "to@example.com",
From: "from@example.com",
Subject: "this is a test",
Body: "this is the body",
Server: host,
Port: portAsString,
Username: "",
Password: "",
Headers: map[string]string{
"X-Test": "test",
},
}
result, err := mailRunnable.Run()
// Assertions
assert.NoError(t, err)
assert.Equal(t, ResultStatusSuccess, result.Status)
assert.IsType(t, MailResult{}, result.Data)
// check result.Data contains 4 files
mailResult := result.Data.Sent
assert.Equal(t, true, mailResult)
cancel()
select {
case <-done:
time.Sleep(1 * time.Second)
case <-time.After(1 * time.Minute):
t.Error("test hangs, timeout reached")
}
}
package jobqueue
import (
"fmt"
"github.com/pkg/sftp"
"golang.org/x/crypto/ssh"
"io"
"os"
)
// SFTPResult is a result of a sftp
type SFTPResult struct {
FilesCopied []string
}
const (
CredentialTypePassword = "password"
CredentialTypeKey = "key"
)
type Direction string
const (
LocalToRemote Direction = "LocalToRemote"
RemoteToLocal Direction = "RemoteToLocal"
)
type SFTPRunnable struct {
Host string
Port int
User string
Insecure bool
Credential string
CredentialType string
HostKey string
SrcDir string
DstDir string
TransferDirection Direction
}
func (s *SFTPRunnable) Run() (RunResult[SFTPResult], error) {
var authMethod ssh.AuthMethod
// Auth
switch s.CredentialType {
case CredentialTypePassword:
authMethod = ssh.Password(s.Credential)
case CredentialTypeKey:
key, err := ssh.ParsePrivateKey([]byte(s.Credential))
if err != nil {
return RunResult[SFTPResult]{Status: ResultStatusFailed}, err
}
authMethod = ssh.PublicKeys(key)
default:
return RunResult[SFTPResult]{Status: ResultStatusFailed}, ErrUnsupportedCredentialType
}
var hkCallback ssh.HostKeyCallback
if s.HostKey != "" {
hostkeyBytes := []byte(s.HostKey)
hostKey, err := ssh.ParsePublicKey(hostkeyBytes)
if err != nil {
return RunResult[SFTPResult]{Status: ResultStatusFailed}, err
}
hkCallback = ssh.FixedHostKey(hostKey)
} else {
if s.Insecure {
hkCallback = ssh.InsecureIgnoreHostKey()
} else {
hkCallback = ssh.FixedHostKey(nil)
}
}
config := &ssh.ClientConfig{
User: s.User,
Auth: []ssh.AuthMethod{
authMethod,
},
HostKeyCallback: hkCallback,
}
client, err := ssh.Dial("tcp", fmt.Sprintf("%s:%d", s.Host, s.Port), config)
if err != nil {
return RunResult[SFTPResult]{Status: ResultStatusFailed}, err
}
defer client.Close()
sftpClient, err := sftp.NewClient(client)
if err != nil {
return RunResult[SFTPResult]{Status: ResultStatusFailed}, err
}
defer sftpClient.Close()
var filesCopied []string
switch s.TransferDirection {
case LocalToRemote:
filesCopied, err = s.copyLocalToRemote(sftpClient)
case RemoteToLocal:
filesCopied, err = s.copyRemoteToLocal(sftpClient)
default:
return RunResult[SFTPResult]{Status: ResultStatusFailed}, ErrUnsupportedTransferDirection
}
if err != nil {
return RunResult[SFTPResult]{Status: ResultStatusFailed}, err
}
if err != nil {
return RunResult[SFTPResult]{Status: ResultStatusFailed}, err
}
return RunResult[SFTPResult]{Status: ResultStatusSuccess, Data: SFTPResult{FilesCopied: filesCopied}}, nil
}
func copyFile(src io.Reader, dst io.Writer) error {
_, err := io.Copy(dst, src)
return err
}
func (s *SFTPRunnable) copyLocalToRemote(sftpClient *sftp.Client) ([]string, error) {
var filesCopied []string
// create destination directory
err := sftpClient.MkdirAll(s.DstDir)
if err != nil {
return nil, err
}
// copy files
files, err := os.ReadDir(s.SrcDir)
if err != nil {
return nil, err
}
for _, file := range files {
if file.IsDir() {
continue
}
srcFile, err := os.Open(fmt.Sprintf("%s/%s", s.SrcDir, file.Name()))
if err != nil {
return nil, err
}
dstFile, err := sftpClient.Create(fmt.Sprintf("%s/%s", s.DstDir, file.Name()))
if err != nil {
_ = srcFile.Close()
return nil, err
}
err = copyFile(srcFile, dstFile)
_ = srcFile.Close()
_ = dstFile.Close()
if err != nil {
return nil, err
}
filesCopied = append(filesCopied, fmt.Sprintf("%s/%s", s.DstDir, file.Name()))
}
return filesCopied, nil
}
func (s *SFTPRunnable) copyRemoteToLocal(sftpClient *sftp.Client) ([]string, error) {
var filesCopied []string
// create destination directory
err := os.MkdirAll(s.DstDir, 0755)
if err != nil {
return nil, err
}
// copy files
files, err := sftpClient.ReadDir(s.SrcDir)
if err != nil {
return nil, err
}
for _, file := range files {
if file.IsDir() {
continue
}
srcFile, err := sftpClient.Open(fmt.Sprintf("%s/%s", s.SrcDir, file.Name()))
if err != nil {
return nil, err
}
dstFile, err := os.Create(fmt.Sprintf("%s/%s", s.DstDir, file.Name()))
if err != nil {
_ = srcFile.Close()
return nil, err
}
err = copyFile(srcFile, dstFile)
_ = srcFile.Close()
_ = dstFile.Close()
if err != nil {
return nil, err
}
filesCopied = append(filesCopied, fmt.Sprintf("%s/%s", s.DstDir, file.Name()))
}
return filesCopied, nil
}
This diff is collapsed.
package jobqueue
import (
"os/exec"
"strings"
)
// ShellResult is a result of a shell script
type ShellResult struct {
Output string
Error string
ExitCode int
}
type ShellRunnable struct {
ScriptPath string
}
func (s *ShellRunnable) Run() (RunResult[ShellResult], error) {
cmd := exec.Command("sh", s.ScriptPath)
output, err := cmd.Output()
var stderr []byte
if err != nil {
stderr = err.(*exec.ExitError).Stderr
}
exitCode := 0
if err != nil {
if exitError, ok := err.(*exec.ExitError); ok {
exitCode = exitError.ExitCode()
}
return RunResult[ShellResult]{
Status: ResultStatusFailed,
Data: ShellResult{
Output: string(output),
ExitCode: exitCode,
Error: string(stderr),
},
}, err
}
return RunResult[ShellResult]{
Status: ResultStatusSuccess,
Data: ShellResult{
Output: strings.TrimSpace(string(output)),
ExitCode: exitCode,
Error: string(stderr),
},
}, nil
}
package jobqueue
import (
"os"
"path"
"testing"
)
func TestShellRunnable_Run(t *testing.T) {
// Erstellen einer temporären Shell-Datei#
tmpDir := t.TempDir()
tmpFile := "example.sh"
tmpPath := path.Join(tmpDir, tmpFile)
fp, err := os.Create(tmpPath)
if err != nil {
t.Fatal(err)
}
defer fp.Close()
content := []byte("#!/bin/sh\necho 'Hello, world!'\n")
if _, err := fp.Write(content); err != nil {
t.Fatal(err)
}
// ShellRunnable mit dem Pfad der temporären Datei initialisieren
shellRunnable := ShellRunnable{ScriptPath: fp.Name()}
// Run-Methode aufrufen
result, err := shellRunnable.Run()
// Überprüfungen
if err != nil {
t.Errorf("Run() failed with error: %v", err)
}
if result.Status != ResultStatusSuccess {
t.Errorf("Expected status Success, got %v", result.Status)
}
if result.Data.Output != "Hello, world!" {
t.Errorf("Expected output 'Hello, world!', got '%v'", result.Data.Output)
}
if result.Data.ExitCode != 0 {
t.Errorf("Expected exit code 0, got %v", result.Data.ExitCode)
}
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment