diff --git a/runnable-counter.go b/runnable-counter.go index 82c27b3ef6bfda06f6fcecf74e7623cef8efac86..25f8208309ff3ca18d841e8e8b63298ab0d1bbcf 100644 --- a/runnable-counter.go +++ b/runnable-counter.go @@ -68,6 +68,8 @@ func (c *CounterRunnable) GetType() string { } func (c *CounterRunnable) GetPersistence() RunnableImport { + c.mu.Lock() + defer c.mu.Unlock() data := JSONMap{ "count": c.Count, diff --git a/runnable-dummy.go b/runnable-dummy.go index 86d2c62ba151b80ccbd8f24c5da97488e80d6335..ec4ec1cbb32e9583c2213bf5d6912f50bbe4b2f2 100644 --- a/runnable-dummy.go +++ b/runnable-dummy.go @@ -3,7 +3,10 @@ package jobqueue -import "context" +import ( + "context" + "sync" +) func NewDummyRunnableFromMap(data map[string]any) (*DummyRunnable, error) { return &DummyRunnable{}, nil @@ -13,7 +16,9 @@ func NewDummyRunnableFromMap(data map[string]any) (*DummyRunnable, error) { type DummyResult struct { } -type DummyRunnable struct{} +type DummyRunnable struct { + mu sync.Mutex +} func (d *DummyRunnable) Run(_ context.Context) (RunResult[DummyResult], error) { return RunResult[DummyResult]{ @@ -26,6 +31,8 @@ func (d *DummyRunnable) GetType() string { } func (c *DummyRunnable) GetPersistence() RunnableImport { + c.mu.Lock() + defer c.mu.Unlock() data := JSONMap{} diff --git a/runnable-fileoperation.go b/runnable-fileoperation.go index 9631fc84b212ef4db6ed97b11b6865929e93bb52..333b8b2e89484bc307c134a0508e7ebdf40f0a14 100644 --- a/runnable-fileoperation.go +++ b/runnable-fileoperation.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "os" + "sync" ) func NewFileOperationRunnableFromMap(data map[string]interface{}) (*FileOperationRunnable, error) { @@ -60,6 +61,7 @@ type FileOperationRunnable struct { Operation string // z.B. "read", "write", "delete" FilePath string Content string // Optional, je nach Operation + mu sync.Mutex } func (f *FileOperationRunnable) Run(_ context.Context) (RunResult[FileOperationResult], error) { @@ -142,6 +144,8 @@ func (f *FileOperationRunnable) GetType() string { } func (f *FileOperationRunnable) GetPersistence() RunnableImport { + f.mu.Lock() + defer f.mu.Unlock() data := JSONMap{ "operation": f.Operation, diff --git a/runnable-gorm.go b/runnable-gorm.go index 8215f63e521387730f56cd702511f093f2835fb6..57f8d273e0dacc8af15067f3db0fdb69850ebc08 100644 --- a/runnable-gorm.go +++ b/runnable-gorm.go @@ -8,6 +8,7 @@ import ( "fmt" "gorm.io/driver/mysql" "gorm.io/gorm" + "sync" ) func NewDBRunnableFromMap(data map[string]interface{}) (*DBRunnable, error) { @@ -50,6 +51,7 @@ type DBRunnable struct { Type string DSN string Query string + mu sync.Mutex } type dbKey struct{} @@ -109,6 +111,8 @@ func (d *DBRunnable) GetType() string { } func (d *DBRunnable) GetPersistence() RunnableImport { + d.mu.Lock() + defer d.mu.Unlock() data := JSONMap{ "type": d.Type, diff --git a/runnable-http.go b/runnable-http.go index 8f3db274fc516f8870b881fd9f2df7da72dc3c79..d4423773890c1546c8715952093a4751de8b4491 100644 --- a/runnable-http.go +++ b/runnable-http.go @@ -9,6 +9,7 @@ import ( "fmt" "io" "net/http" + "sync" ) func NewHTTPRunnableFromMap(data map[string]interface{}) (*HTTPRunnable, error) { @@ -62,6 +63,7 @@ type HTTPRunnable struct { Method string Header map[string]string Body string + mu sync.Mutex } func (h *HTTPRunnable) Run(ctx context.Context) (RunResult[HTTPResult], error) { @@ -102,6 +104,8 @@ func (h *HTTPRunnable) GetType() string { } func (h *HTTPRunnable) GetPersistence() RunnableImport { + h.mu.Lock() + defer h.mu.Unlock() data := JSONMap{ "url": h.URL, diff --git a/runnable-mail.go b/runnable-mail.go index c94276331440335ac229a0b97916137351d1f857..c75fa9c2418289f50a604ca8e8221252a67e973f 100644 --- a/runnable-mail.go +++ b/runnable-mail.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "net/smtp" + "sync" ) func NewMailRunnableFromMap(data map[string]interface{}) (*MailRunnable, error) { @@ -96,6 +97,7 @@ type MailRunnable struct { Username string Password string Headers map[string]string + mu sync.Mutex } func (m *MailRunnable) Run(_ context.Context) (RunResult[MailResult], error) { @@ -158,6 +160,8 @@ func (m *MailRunnable) GetType() string { } func (m *MailRunnable) GetPersistence() RunnableImport { + m.mu.Lock() + defer m.mu.Unlock() data := JSONMap{ "to": m.To, diff --git a/runnable-sftp.go b/runnable-sftp.go index 1226f3526f59004e9d229764f8bb2bc381bd3059..5ac20421ab2d726a6210ff31d15d4810f2ec2957 100644 --- a/runnable-sftp.go +++ b/runnable-sftp.go @@ -10,6 +10,7 @@ import ( "golang.org/x/crypto/ssh" "io" "os" + "sync" ) func NewSFTPRunnableFromMap(data map[string]interface{}) (*SFTPRunnable, error) { @@ -113,6 +114,7 @@ type SFTPRunnable struct { SrcDir string DstDir string TransferDirection Direction + mu sync.Mutex } func (s *SFTPRunnable) Run(_ context.Context) (RunResult[SFTPResult], error) { @@ -302,6 +304,8 @@ func (s *SFTPRunnable) GetType() string { } func (s *SFTPRunnable) GetPersistence() RunnableImport { + s.mu.Lock() + defer s.mu.Unlock() data := JSONMap{ "host": s.Host, diff --git a/runnable-shell.go b/runnable-shell.go index dbe09f7650eb31893c9d7df81d77ffa14ab3df14..2d2c54d868c49e1a7ea540f19b50c10d5be95bef 100644 --- a/runnable-shell.go +++ b/runnable-shell.go @@ -9,6 +9,7 @@ import ( "os" "os/exec" "strings" + "sync" ) func NewShellRunnableFromMap(data map[string]interface{}) (*ShellRunnable, error) { @@ -51,6 +52,7 @@ func (s *ShellResult) GetError() (string, int) { type ShellRunnable struct { ScriptPath string Script string + mu sync.Mutex } func (s *ShellRunnable) Run(ctx context.Context) (RunResult[ShellResult], error) { @@ -129,6 +131,8 @@ func (s *ShellRunnable) GetType() string { } func (s *ShellRunnable) GetPersistence() RunnableImport { + s.mu.Lock() + defer s.mu.Unlock() data := JSONMap{ "scriptPath": s.ScriptPath,