Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • master
  • v1.0.0
  • v1.0.1
  • v1.1.0
  • v1.10.0
  • v1.10.1
  • v1.10.2
  • v1.11.0
  • v1.12.0
  • v1.12.1
  • v1.12.2
  • v1.12.3
  • v1.12.4
  • v1.12.5
  • v1.12.6
  • v1.12.7
  • v1.12.8
  • v1.13.0
  • v1.13.1
  • v1.13.2
  • v1.14.0
  • v1.15.0
  • v1.15.1
  • v1.15.10
  • v1.15.11
  • v1.15.12
  • v1.15.13
  • v1.15.14
  • v1.15.15
  • v1.15.16
  • v1.15.17
  • v1.15.2
  • v1.15.3
  • v1.15.4
  • v1.15.5
  • v1.15.6
  • v1.15.7
  • v1.15.8
  • v1.15.9
  • v1.16.0
  • v1.16.1
  • v1.17.0
  • v1.18.0
  • v1.18.1
  • v1.18.2
  • v1.19.0
  • v1.19.1
  • v1.19.2
  • v1.19.3
  • v1.19.4
  • v1.2.0
  • v1.20.0
  • v1.20.1
  • v1.20.2
  • v1.20.3
  • v1.21.0
  • v1.21.1
  • v1.22.0
  • v1.23.0
  • v1.23.1
  • v1.23.2
  • v1.3.0
  • v1.3.1
  • v1.3.2
  • v1.4.0
  • v1.5.0
  • v1.5.1
  • v1.6.0
  • v1.6.1
  • v1.7.0
  • v1.7.1
  • v1.7.2
  • v1.7.3
  • v1.8.0
  • v1.8.1
  • v1.9.0
76 results

Target

Select target project
  • oss/libraries/go/services/job-queues
1 result
Select Git revision
  • master
  • v1.0.0
  • v1.0.1
  • v1.1.0
  • v1.10.0
  • v1.10.1
  • v1.10.2
  • v1.11.0
  • v1.12.0
  • v1.12.1
  • v1.12.2
  • v1.12.3
  • v1.12.4
  • v1.12.5
  • v1.12.6
  • v1.12.7
  • v1.12.8
  • v1.13.0
  • v1.13.1
  • v1.13.2
  • v1.14.0
  • v1.15.0
  • v1.15.1
  • v1.15.10
  • v1.15.11
  • v1.15.12
  • v1.15.13
  • v1.15.14
  • v1.15.15
  • v1.15.16
  • v1.15.17
  • v1.15.2
  • v1.15.3
  • v1.15.4
  • v1.15.5
  • v1.15.6
  • v1.15.7
  • v1.15.8
  • v1.15.9
  • v1.16.0
  • v1.16.1
  • v1.17.0
  • v1.18.0
  • v1.18.1
  • v1.18.2
  • v1.19.0
  • v1.19.1
  • v1.19.2
  • v1.19.3
  • v1.19.4
  • v1.2.0
  • v1.20.0
  • v1.20.1
  • v1.20.2
  • v1.20.3
  • v1.21.0
  • v1.21.1
  • v1.22.0
  • v1.23.0
  • v1.23.1
  • v1.23.2
  • v1.3.0
  • v1.3.1
  • v1.3.2
  • v1.4.0
  • v1.5.0
  • v1.5.1
  • v1.6.0
  • v1.6.1
  • v1.7.0
  • v1.7.1
  • v1.7.2
  • v1.7.3
  • v1.8.0
  • v1.8.1
  • v1.9.0
76 results
Show changes
......@@ -8,6 +8,7 @@ import (
"fmt"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"sync"
)
func NewDBRunnableFromMap(data map[string]interface{}) (*DBRunnable, error) {
......@@ -50,6 +51,7 @@ type DBRunnable struct {
Type string
DSN string
Query string
mu sync.Mutex
}
type dbKey struct{}
......@@ -109,6 +111,8 @@ func (d *DBRunnable) GetType() string {
}
func (d *DBRunnable) GetPersistence() RunnableImport {
d.mu.Lock()
defer d.mu.Unlock()
data := JSONMap{
"type": d.Type,
......
......@@ -9,6 +9,7 @@ import (
"fmt"
"io"
"net/http"
"sync"
)
func NewHTTPRunnableFromMap(data map[string]interface{}) (*HTTPRunnable, error) {
......@@ -62,6 +63,7 @@ type HTTPRunnable struct {
Method string
Header map[string]string
Body string
mu sync.Mutex
}
func (h *HTTPRunnable) Run(ctx context.Context) (RunResult[HTTPResult], error) {
......@@ -102,6 +104,8 @@ func (h *HTTPRunnable) GetType() string {
}
func (h *HTTPRunnable) GetPersistence() RunnableImport {
h.mu.Lock()
defer h.mu.Unlock()
data := JSONMap{
"url": h.URL,
......
......@@ -7,6 +7,7 @@ import (
"context"
"fmt"
"net/smtp"
"sync"
)
func NewMailRunnableFromMap(data map[string]interface{}) (*MailRunnable, error) {
......@@ -96,6 +97,7 @@ type MailRunnable struct {
Username string
Password string
Headers map[string]string
mu sync.Mutex
}
func (m *MailRunnable) Run(_ context.Context) (RunResult[MailResult], error) {
......@@ -158,6 +160,8 @@ func (m *MailRunnable) GetType() string {
}
func (m *MailRunnable) GetPersistence() RunnableImport {
m.mu.Lock()
defer m.mu.Unlock()
data := JSONMap{
"to": m.To,
......
......@@ -10,6 +10,7 @@ import (
"golang.org/x/crypto/ssh"
"io"
"os"
"sync"
)
func NewSFTPRunnableFromMap(data map[string]interface{}) (*SFTPRunnable, error) {
......@@ -113,6 +114,7 @@ type SFTPRunnable struct {
SrcDir string
DstDir string
TransferDirection Direction
mu sync.Mutex
}
func (s *SFTPRunnable) Run(_ context.Context) (RunResult[SFTPResult], error) {
......@@ -302,6 +304,8 @@ func (s *SFTPRunnable) GetType() string {
}
func (s *SFTPRunnable) GetPersistence() RunnableImport {
s.mu.Lock()
defer s.mu.Unlock()
data := JSONMap{
"host": s.Host,
......
......@@ -9,6 +9,7 @@ import (
"os"
"os/exec"
"strings"
"sync"
)
func NewShellRunnableFromMap(data map[string]interface{}) (*ShellRunnable, error) {
......@@ -51,6 +52,7 @@ func (s *ShellResult) GetError() (string, int) {
type ShellRunnable struct {
ScriptPath string
Script string
mu sync.Mutex
}
func (s *ShellRunnable) Run(ctx context.Context) (RunResult[ShellResult], error) {
......@@ -129,6 +131,8 @@ func (s *ShellRunnable) GetType() string {
}
func (s *ShellRunnable) GetPersistence() RunnableImport {
s.mu.Lock()
defer s.mu.Unlock()
data := JSONMap{
"scriptPath": s.ScriptPath,
......
......@@ -37,7 +37,6 @@ func (s *IntervalScheduler) Schedule(job GenericJob, eventBus *EventBus) error {
for {
select {
case <-ticker.C:
if !job.IsPaused() {
eventBus.Publish(QueueJob, job)
}
......
......@@ -181,6 +181,10 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel
select {
case job := <-jobChannel:
if stopFlag {
break
}
w.statisticMu.Lock()
w.statistic.JobsAssigned++
w.statistic.ActiveThreads++
......@@ -233,12 +237,14 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel
cancel()
if w.manager != nil && w.manager.dbSaver != nil {
err = w.manager.dbSaver.SaveJob(job)
if err != nil {
Error("Error while saving job", "job_id", job.GetID(), "error", err)
if w.manager != nil {
go func() {
w.manager.mu.Lock()
if w.manager.jobSyncer != nil {
w.manager.jobSyncer.AddJob(job)
}
w.manager.mu.Unlock()
}()
}
w.statisticMu.Lock()
......@@ -246,6 +252,7 @@ func (w *LocalWorker) run(jobChannel chan GenericJob, stopChan chan bool, cancel
w.statisticMu.Unlock()
case <-stopChan:
Info("Stopping worker thread", "worker", w.ID, "thread_id", workerThreadID)
stopFlag = true
break
}
......
......@@ -39,6 +39,10 @@ func (j DummyJob) ResetStats() {
}
func (j DummyJob) GetStats() JobStats {
return JobStats{}
}
func (j DummyJob) GetMaxRetries() uint {
return 0
}
......