Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
Loading items

Target

Select target project
  • oss/libraries/go/services/job-queues
1 result
Select Git revision
Loading items
Show changes
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"fmt" "fmt"
"gorm.io/driver/mysql" "gorm.io/driver/mysql"
"gorm.io/gorm" "gorm.io/gorm"
"sync"
) )
func NewDBRunnableFromMap(data map[string]interface{}) (*DBRunnable, error) { func NewDBRunnableFromMap(data map[string]interface{}) (*DBRunnable, error) {
...@@ -50,6 +51,7 @@ type DBRunnable struct { ...@@ -50,6 +51,7 @@ type DBRunnable struct {
Type string Type string
DSN string DSN string
Query string Query string
mu sync.Mutex
} }
type dbKey struct{} type dbKey struct{}
...@@ -109,6 +111,8 @@ func (d *DBRunnable) GetType() string { ...@@ -109,6 +111,8 @@ func (d *DBRunnable) GetType() string {
} }
func (d *DBRunnable) GetPersistence() RunnableImport { func (d *DBRunnable) GetPersistence() RunnableImport {
d.mu.Lock()
defer d.mu.Unlock()
data := JSONMap{ data := JSONMap{
"type": d.Type, "type": d.Type,
......
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"sync"
) )
func NewHTTPRunnableFromMap(data map[string]interface{}) (*HTTPRunnable, error) { func NewHTTPRunnableFromMap(data map[string]interface{}) (*HTTPRunnable, error) {
...@@ -62,6 +63,7 @@ type HTTPRunnable struct { ...@@ -62,6 +63,7 @@ type HTTPRunnable struct {
Method string Method string
Header map[string]string Header map[string]string
Body string Body string
mu sync.Mutex
} }
func (h *HTTPRunnable) Run(ctx context.Context) (RunResult[HTTPResult], error) { func (h *HTTPRunnable) Run(ctx context.Context) (RunResult[HTTPResult], error) {
...@@ -102,6 +104,8 @@ func (h *HTTPRunnable) GetType() string { ...@@ -102,6 +104,8 @@ func (h *HTTPRunnable) GetType() string {
} }
func (h *HTTPRunnable) GetPersistence() RunnableImport { func (h *HTTPRunnable) GetPersistence() RunnableImport {
h.mu.Lock()
defer h.mu.Unlock()
data := JSONMap{ data := JSONMap{
"url": h.URL, "url": h.URL,
......
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
"context" "context"
"fmt" "fmt"
"net/smtp" "net/smtp"
"sync"
) )
func NewMailRunnableFromMap(data map[string]interface{}) (*MailRunnable, error) { func NewMailRunnableFromMap(data map[string]interface{}) (*MailRunnable, error) {
...@@ -96,6 +97,7 @@ type MailRunnable struct { ...@@ -96,6 +97,7 @@ type MailRunnable struct {
Username string Username string
Password string Password string
Headers map[string]string Headers map[string]string
mu sync.Mutex
} }
func (m *MailRunnable) Run(_ context.Context) (RunResult[MailResult], error) { func (m *MailRunnable) Run(_ context.Context) (RunResult[MailResult], error) {
...@@ -158,6 +160,8 @@ func (m *MailRunnable) GetType() string { ...@@ -158,6 +160,8 @@ func (m *MailRunnable) GetType() string {
} }
func (m *MailRunnable) GetPersistence() RunnableImport { func (m *MailRunnable) GetPersistence() RunnableImport {
m.mu.Lock()
defer m.mu.Unlock()
data := JSONMap{ data := JSONMap{
"to": m.To, "to": m.To,
......
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"golang.org/x/crypto/ssh" "golang.org/x/crypto/ssh"
"io" "io"
"os" "os"
"sync"
) )
func NewSFTPRunnableFromMap(data map[string]interface{}) (*SFTPRunnable, error) { func NewSFTPRunnableFromMap(data map[string]interface{}) (*SFTPRunnable, error) {
...@@ -113,6 +114,7 @@ type SFTPRunnable struct { ...@@ -113,6 +114,7 @@ type SFTPRunnable struct {
SrcDir string SrcDir string
DstDir string DstDir string
TransferDirection Direction TransferDirection Direction
mu sync.Mutex
} }
func (s *SFTPRunnable) Run(_ context.Context) (RunResult[SFTPResult], error) { func (s *SFTPRunnable) Run(_ context.Context) (RunResult[SFTPResult], error) {
...@@ -302,6 +304,8 @@ func (s *SFTPRunnable) GetType() string { ...@@ -302,6 +304,8 @@ func (s *SFTPRunnable) GetType() string {
} }
func (s *SFTPRunnable) GetPersistence() RunnableImport { func (s *SFTPRunnable) GetPersistence() RunnableImport {
s.mu.Lock()
defer s.mu.Unlock()
data := JSONMap{ data := JSONMap{
"host": s.Host, "host": s.Host,
......
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"os" "os"
"os/exec" "os/exec"
"strings" "strings"
"sync"
) )
func NewShellRunnableFromMap(data map[string]interface{}) (*ShellRunnable, error) { func NewShellRunnableFromMap(data map[string]interface{}) (*ShellRunnable, error) {
...@@ -51,6 +52,7 @@ func (s *ShellResult) GetError() (string, int) { ...@@ -51,6 +52,7 @@ func (s *ShellResult) GetError() (string, int) {
type ShellRunnable struct { type ShellRunnable struct {
ScriptPath string ScriptPath string
Script string Script string
mu sync.Mutex
} }
func (s *ShellRunnable) Run(ctx context.Context) (RunResult[ShellResult], error) { func (s *ShellRunnable) Run(ctx context.Context) (RunResult[ShellResult], error) {
...@@ -129,6 +131,8 @@ func (s *ShellRunnable) GetType() string { ...@@ -129,6 +131,8 @@ func (s *ShellRunnable) GetType() string {
} }
func (s *ShellRunnable) GetPersistence() RunnableImport { func (s *ShellRunnable) GetPersistence() RunnableImport {
s.mu.Lock()
defer s.mu.Unlock()
data := JSONMap{ data := JSONMap{
"scriptPath": s.ScriptPath, "scriptPath": s.ScriptPath,
......
...@@ -37,7 +37,6 @@ func (s *IntervalScheduler) Schedule(job GenericJob, eventBus *EventBus) error { ...@@ -37,7 +37,6 @@ func (s *IntervalScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
if !job.IsPaused() { if !job.IsPaused() {
eventBus.Publish(QueueJob, job) eventBus.Publish(QueueJob, job)
} }
......
...@@ -181,6 +181,10 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel ...@@ -181,6 +181,10 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel
select { select {
case job := <-jobChannel: case job := <-jobChannel:
if stopFlag {
break
}
w.statisticMu.Lock() w.statisticMu.Lock()
w.statistic.JobsAssigned++ w.statistic.JobsAssigned++
w.statistic.ActiveThreads++ w.statistic.ActiveThreads++
...@@ -233,12 +237,14 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel ...@@ -233,12 +237,14 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel
cancel() cancel()
if w.manager != nil && w.manager.dbSaver != nil { if w.manager != nil {
err = w.manager.dbSaver.SaveJob(job) go func() {
if err != nil { w.manager.mu.Lock()
Error("Error while saving job", "job_id", job.GetID(), "error", err) if w.manager.jobSyncer != nil {
w.manager.jobSyncer.AddJob(job)
} }
w.manager.mu.Unlock()
}()
} }
w.statisticMu.Lock() w.statisticMu.Lock()
...@@ -246,6 +252,7 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel ...@@ -246,6 +252,7 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel
w.statisticMu.Unlock() w.statisticMu.Unlock()
case <-stopChan: case <-stopChan:
Info("Stopping worker thread", "worker", w.ID, "thread_id", workerThreadID)
stopFlag = true stopFlag = true
break break
} }
......
...@@ -39,6 +39,10 @@ func (j DummyJob) ResetStats() { ...@@ -39,6 +39,10 @@ func (j DummyJob) ResetStats() {
} }
func (j DummyJob) GetStats() JobStats {
return JobStats{}
}
func (j DummyJob) GetMaxRetries() uint { func (j DummyJob) GetMaxRetries() uint {
return 0 return 0
} }
......