Skip to content
Snippets Groups Projects
Verified Commit ffa01504 authored by Volker Schukai's avatar Volker Schukai :alien:
Browse files

fix: race condition in runnables #46

parent 6b8861d7
No related branches found
No related tags found
No related merge requests found
......@@ -68,6 +68,8 @@ func (c *CounterRunnable) GetType() string {
}
func (c *CounterRunnable) GetPersistence() RunnableImport {
c.mu.Lock()
defer c.mu.Unlock()
data := JSONMap{
"count": c.Count,
......
......@@ -3,7 +3,10 @@
package jobqueue
import "context"
import (
"context"
"sync"
)
func NewDummyRunnableFromMap(data map[string]any) (*DummyRunnable, error) {
return &DummyRunnable{}, nil
......@@ -13,7 +16,9 @@ func NewDummyRunnableFromMap(data map[string]any) (*DummyRunnable, error) {
type DummyResult struct {
}
type DummyRunnable struct{}
type DummyRunnable struct {
mu sync.Mutex
}
func (d *DummyRunnable) Run(_ context.Context) (RunResult[DummyResult], error) {
return RunResult[DummyResult]{
......@@ -26,6 +31,8 @@ func (d *DummyRunnable) GetType() string {
}
func (c *DummyRunnable) GetPersistence() RunnableImport {
c.mu.Lock()
defer c.mu.Unlock()
data := JSONMap{}
......
......@@ -7,6 +7,7 @@ import (
"context"
"fmt"
"os"
"sync"
)
func NewFileOperationRunnableFromMap(data map[string]interface{}) (*FileOperationRunnable, error) {
......@@ -60,6 +61,7 @@ type FileOperationRunnable struct {
Operation string // z.B. "read", "write", "delete"
FilePath string
Content string // Optional, je nach Operation
mu sync.Mutex
}
func (f *FileOperationRunnable) Run(_ context.Context) (RunResult[FileOperationResult], error) {
......@@ -142,6 +144,8 @@ func (f *FileOperationRunnable) GetType() string {
}
func (f *FileOperationRunnable) GetPersistence() RunnableImport {
f.mu.Lock()
defer f.mu.Unlock()
data := JSONMap{
"operation": f.Operation,
......
......@@ -8,6 +8,7 @@ import (
"fmt"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"sync"
)
func NewDBRunnableFromMap(data map[string]interface{}) (*DBRunnable, error) {
......@@ -50,6 +51,7 @@ type DBRunnable struct {
Type string
DSN string
Query string
mu sync.Mutex
}
type dbKey struct{}
......@@ -109,6 +111,8 @@ func (d *DBRunnable) GetType() string {
}
func (d *DBRunnable) GetPersistence() RunnableImport {
d.mu.Lock()
defer d.mu.Unlock()
data := JSONMap{
"type": d.Type,
......
......@@ -9,6 +9,7 @@ import (
"fmt"
"io"
"net/http"
"sync"
)
func NewHTTPRunnableFromMap(data map[string]interface{}) (*HTTPRunnable, error) {
......@@ -62,6 +63,7 @@ type HTTPRunnable struct {
Method string
Header map[string]string
Body string
mu sync.Mutex
}
func (h *HTTPRunnable) Run(ctx context.Context) (RunResult[HTTPResult], error) {
......@@ -102,6 +104,8 @@ func (h *HTTPRunnable) GetType() string {
}
func (h *HTTPRunnable) GetPersistence() RunnableImport {
h.mu.Lock()
defer h.mu.Unlock()
data := JSONMap{
"url": h.URL,
......
......@@ -7,6 +7,7 @@ import (
"context"
"fmt"
"net/smtp"
"sync"
)
func NewMailRunnableFromMap(data map[string]interface{}) (*MailRunnable, error) {
......@@ -96,6 +97,7 @@ type MailRunnable struct {
Username string
Password string
Headers map[string]string
mu sync.Mutex
}
func (m *MailRunnable) Run(_ context.Context) (RunResult[MailResult], error) {
......@@ -158,6 +160,8 @@ func (m *MailRunnable) GetType() string {
}
func (m *MailRunnable) GetPersistence() RunnableImport {
m.mu.Lock()
defer m.mu.Unlock()
data := JSONMap{
"to": m.To,
......
......@@ -10,6 +10,7 @@ import (
"golang.org/x/crypto/ssh"
"io"
"os"
"sync"
)
func NewSFTPRunnableFromMap(data map[string]interface{}) (*SFTPRunnable, error) {
......@@ -113,6 +114,7 @@ type SFTPRunnable struct {
SrcDir string
DstDir string
TransferDirection Direction
mu sync.Mutex
}
func (s *SFTPRunnable) Run(_ context.Context) (RunResult[SFTPResult], error) {
......@@ -302,6 +304,8 @@ func (s *SFTPRunnable) GetType() string {
}
func (s *SFTPRunnable) GetPersistence() RunnableImport {
s.mu.Lock()
defer s.mu.Unlock()
data := JSONMap{
"host": s.Host,
......
......@@ -9,6 +9,7 @@ import (
"os"
"os/exec"
"strings"
"sync"
)
func NewShellRunnableFromMap(data map[string]interface{}) (*ShellRunnable, error) {
......@@ -51,6 +52,7 @@ func (s *ShellResult) GetError() (string, int) {
type ShellRunnable struct {
ScriptPath string
Script string
mu sync.Mutex
}
func (s *ShellRunnable) Run(ctx context.Context) (RunResult[ShellResult], error) {
......@@ -129,6 +131,8 @@ func (s *ShellRunnable) GetType() string {
}
func (s *ShellRunnable) GetPersistence() RunnableImport {
s.mu.Lock()
defer s.mu.Unlock()
data := JSONMap{
"scriptPath": s.ScriptPath,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment