From ffa01504aa544cf61a09543f217e3af07e6c982d Mon Sep 17 00:00:00 2001 From: Volker Schukai <volker.schukai@schukai.com> Date: Wed, 13 Mar 2024 10:48:03 +0100 Subject: [PATCH] fix: race condition in runnables #46 --- runnable-counter.go | 2 ++ runnable-dummy.go | 11 +++++++++-- runnable-fileoperation.go | 4 ++++ runnable-gorm.go | 4 ++++ runnable-http.go | 4 ++++ runnable-mail.go | 4 ++++ runnable-sftp.go | 4 ++++ runnable-shell.go | 4 ++++ 8 files changed, 35 insertions(+), 2 deletions(-) diff --git a/runnable-counter.go b/runnable-counter.go index 82c27b3..25f8208 100644 --- a/runnable-counter.go +++ b/runnable-counter.go @@ -68,6 +68,8 @@ func (c *CounterRunnable) GetType() string { } func (c *CounterRunnable) GetPersistence() RunnableImport { + c.mu.Lock() + defer c.mu.Unlock() data := JSONMap{ "count": c.Count, diff --git a/runnable-dummy.go b/runnable-dummy.go index 86d2c62..ec4ec1c 100644 --- a/runnable-dummy.go +++ b/runnable-dummy.go @@ -3,7 +3,10 @@ package jobqueue -import "context" +import ( + "context" + "sync" +) func NewDummyRunnableFromMap(data map[string]any) (*DummyRunnable, error) { return &DummyRunnable{}, nil @@ -13,7 +16,9 @@ func NewDummyRunnableFromMap(data map[string]any) (*DummyRunnable, error) { type DummyResult struct { } -type DummyRunnable struct{} +type DummyRunnable struct { + mu sync.Mutex +} func (d *DummyRunnable) Run(_ context.Context) (RunResult[DummyResult], error) { return RunResult[DummyResult]{ @@ -26,6 +31,8 @@ func (d *DummyRunnable) GetType() string { } func (c *DummyRunnable) GetPersistence() RunnableImport { + c.mu.Lock() + defer c.mu.Unlock() data := JSONMap{} diff --git a/runnable-fileoperation.go b/runnable-fileoperation.go index 9631fc8..333b8b2 100644 --- a/runnable-fileoperation.go +++ b/runnable-fileoperation.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "os" + "sync" ) func NewFileOperationRunnableFromMap(data map[string]interface{}) (*FileOperationRunnable, error) { @@ -60,6 +61,7 @@ type FileOperationRunnable struct { Operation string // z.B. "read", "write", "delete" FilePath string Content string // Optional, je nach Operation + mu sync.Mutex } func (f *FileOperationRunnable) Run(_ context.Context) (RunResult[FileOperationResult], error) { @@ -142,6 +144,8 @@ func (f *FileOperationRunnable) GetType() string { } func (f *FileOperationRunnable) GetPersistence() RunnableImport { + f.mu.Lock() + defer f.mu.Unlock() data := JSONMap{ "operation": f.Operation, diff --git a/runnable-gorm.go b/runnable-gorm.go index 8215f63..57f8d27 100644 --- a/runnable-gorm.go +++ b/runnable-gorm.go @@ -8,6 +8,7 @@ import ( "fmt" "gorm.io/driver/mysql" "gorm.io/gorm" + "sync" ) func NewDBRunnableFromMap(data map[string]interface{}) (*DBRunnable, error) { @@ -50,6 +51,7 @@ type DBRunnable struct { Type string DSN string Query string + mu sync.Mutex } type dbKey struct{} @@ -109,6 +111,8 @@ func (d *DBRunnable) GetType() string { } func (d *DBRunnable) GetPersistence() RunnableImport { + d.mu.Lock() + defer d.mu.Unlock() data := JSONMap{ "type": d.Type, diff --git a/runnable-http.go b/runnable-http.go index 8f3db27..d442377 100644 --- a/runnable-http.go +++ b/runnable-http.go @@ -9,6 +9,7 @@ import ( "fmt" "io" "net/http" + "sync" ) func NewHTTPRunnableFromMap(data map[string]interface{}) (*HTTPRunnable, error) { @@ -62,6 +63,7 @@ type HTTPRunnable struct { Method string Header map[string]string Body string + mu sync.Mutex } func (h *HTTPRunnable) Run(ctx context.Context) (RunResult[HTTPResult], error) { @@ -102,6 +104,8 @@ func (h *HTTPRunnable) GetType() string { } func (h *HTTPRunnable) GetPersistence() RunnableImport { + h.mu.Lock() + defer h.mu.Unlock() data := JSONMap{ "url": h.URL, diff --git a/runnable-mail.go b/runnable-mail.go index c942763..c75fa9c 100644 --- a/runnable-mail.go +++ b/runnable-mail.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "net/smtp" + "sync" ) func NewMailRunnableFromMap(data map[string]interface{}) (*MailRunnable, error) { @@ -96,6 +97,7 @@ type MailRunnable struct { Username string Password string Headers map[string]string + mu sync.Mutex } func (m *MailRunnable) Run(_ context.Context) (RunResult[MailResult], error) { @@ -158,6 +160,8 @@ func (m *MailRunnable) GetType() string { } func (m *MailRunnable) GetPersistence() RunnableImport { + m.mu.Lock() + defer m.mu.Unlock() data := JSONMap{ "to": m.To, diff --git a/runnable-sftp.go b/runnable-sftp.go index 1226f35..5ac2042 100644 --- a/runnable-sftp.go +++ b/runnable-sftp.go @@ -10,6 +10,7 @@ import ( "golang.org/x/crypto/ssh" "io" "os" + "sync" ) func NewSFTPRunnableFromMap(data map[string]interface{}) (*SFTPRunnable, error) { @@ -113,6 +114,7 @@ type SFTPRunnable struct { SrcDir string DstDir string TransferDirection Direction + mu sync.Mutex } func (s *SFTPRunnable) Run(_ context.Context) (RunResult[SFTPResult], error) { @@ -302,6 +304,8 @@ func (s *SFTPRunnable) GetType() string { } func (s *SFTPRunnable) GetPersistence() RunnableImport { + s.mu.Lock() + defer s.mu.Unlock() data := JSONMap{ "host": s.Host, diff --git a/runnable-shell.go b/runnable-shell.go index dbe09f7..2d2c54d 100644 --- a/runnable-shell.go +++ b/runnable-shell.go @@ -9,6 +9,7 @@ import ( "os" "os/exec" "strings" + "sync" ) func NewShellRunnableFromMap(data map[string]interface{}) (*ShellRunnable, error) { @@ -51,6 +52,7 @@ func (s *ShellResult) GetError() (string, int) { type ShellRunnable struct { ScriptPath string Script string + mu sync.Mutex } func (s *ShellRunnable) Run(ctx context.Context) (RunResult[ShellResult], error) { @@ -129,6 +131,8 @@ func (s *ShellRunnable) GetType() string { } func (s *ShellRunnable) GetPersistence() RunnableImport { + s.mu.Lock() + defer s.mu.Unlock() data := JSONMap{ "scriptPath": s.ScriptPath, -- GitLab