// Copyright 2023 schukai GmbH // SPDX-License-Identifier: AGPL-3.0 package jobqueue import ( "sync" "time" ) type JobSyncer struct { mu sync.Mutex migrated bool manager *Manager running sync.WaitGroup lastError error } func NewJobSyncer(manager *Manager) *JobSyncer { js := &JobSyncer{ manager: manager, mu: sync.Mutex{}, } if manager == nil || manager.database == nil { return js } db := js.manager.database if !js.migrated { err := db.AutoMigrate(&JobPersistence{}, &JobLog{}, &JobStats{}) if err != nil { js.lastError = err } else { js.migrated = true } } return js } func (js *JobSyncer) Wait(timeout time.Duration) error { done := make(chan struct{}) go func() { js.running.Wait() close(done) }() select { case <-done: // Die WaitGroup ist fertig return nil case <-time.After(timeout): return ErrTimeoutReached } } func (js *JobSyncer) Sync(job GenericJob) error { js.mu.Lock() defer js.mu.Unlock() persistenceJob := job.GetPersistence() js.running.Add(1) go func() { defer js.running.Done() err := js.CheckAndSaveOrUpdate(persistenceJob) if err != nil { Error("Error while creating or updating the job", err) } }() return nil } func (js *JobSyncer) LastError() error { js.mu.Lock() defer js.mu.Unlock() return js.lastError } // Migrated returns if the database has been migrated. // // No parameters. // Returns a boolean. func (js *JobSyncer) Migrated() bool { js.mu.Lock() defer js.mu.Unlock() return js.migrated }