Select Git revision
database.go
database.go 4.84 KiB
// Copyright 2024 schukai GmbH
// SPDX-License-Identifier: AGPL-3.0
package jobqueue
import (
"errors"
"fmt"
"gorm.io/gorm"
"strings"
"time"
)
// CheckAndSaveOrUpdate looks the job up by its ID and persists it accordingly:
// updateJob is used when a row already exists, saveJob when the lookup reports
// gorm.ErrRecordNotFound.
//
// The syncer mutex is held for the whole call. ErrNoDatabaseConnection is
// returned when the syncer has no manager or the manager has no database.
// Any other lookup failure is returned wrapped with context; the original
// error stays reachable through errors.Is / errors.As.
func (s *JobSyncer) CheckAndSaveOrUpdate(job JobPersistence) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.manager == nil || s.manager.database == nil {
		return ErrNoDatabaseConnection
	}
	db := s.manager.database

	var existing JobPersistence
	err := db.Where("id = ?", job.GetID()).First(&existing).Error
	switch {
	case err == nil:
		// A row with this ID exists already.
		return updateJob(job, db)
	case errors.Is(err, gorm.ErrRecordNotFound):
		// No row yet, so this is the first write for the job.
		return saveJob(job, db)
	default:
		return fmt.Errorf("looking up job %v: %w", job.GetID(), err)
	}
}
// DeleteJob removes the job's log rows, its stats rows and finally the job
// record itself, all inside one database transaction.
//
// The syncer mutex is held for the whole call. ErrNoDatabaseConnection is
// returned when the syncer has no manager or the manager has no database;
// otherwise the first error reported by a delete statement (or by the
// transaction itself) is returned.
func (s *JobSyncer) DeleteJob(job GenericJob) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.manager == nil || s.manager.database == nil {
		return ErrNoDatabaseConnection
	}

	remove := func(tx *gorm.DB) error {
		record := job.GetPersistence()

		// Rows keyed by job_id are removed first: logs, then stats.
		for _, dependent := range []interface{}{&JobLog{}, &JobStats{}} {
			if err := tx.Where("job_id = ?", record.GetID()).Delete(dependent).Error; err != nil {
				return err
			}
		}

		// Finally the job record itself.
		return tx.Delete(&record).Error
	}

	return s.manager.database.Transaction(remove)
}
// ResetLogs deletes every JobLog row whose job_id matches the given job,
// inside a database transaction.
//
// The delete runs through Unscoped(); in GORM that bypasses the soft-delete
// scope, so the rows are removed for good if JobLog uses soft deletes
// (NOTE(review): JobLog's definition is not visible here — confirm).
//
// ErrNoDatabaseConnection is returned when the syncer has no manager or the
// manager has no database.
func (s *JobSyncer) ResetLogs(job GenericJob) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	mgr := s.manager
	if mgr == nil || mgr.database == nil {
		return ErrNoDatabaseConnection
	}

	return mgr.database.Transaction(func(tx *gorm.DB) error {
		record := job.GetPersistence()
		return tx.Unscoped().
			Where("job_id = ?", record.GetID()).
			Delete(&JobLog{}).
			Error
	})
}
// ResetStats calls job.ResetStats and then persists the job's persistence
// snapshot through updateJob.
//
// The syncer mutex is held for the whole call. ErrNoDatabaseConnection is
// returned (before the job is touched) when the syncer has no manager or the
// manager has no database; otherwise the result of updateJob is returned.
func (s *JobSyncer) ResetStats(job GenericJob) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	// One guard is enough: the mutex is held from here until return, and
	// updateJob performs its own nil check on the database handle.
	if s.manager == nil || s.manager.database == nil {
		return ErrNoDatabaseConnection
	}

	job.ResetStats()

	return updateJob(job.GetPersistence(), s.manager.database)
}
// UpdateJob persists the job's current persistence snapshot via updateJob.
//
// The syncer mutex is held for the whole call. ErrNoDatabaseConnection is
// returned when the syncer has no manager or the manager has no database.
func (s *JobSyncer) UpdateJob(job GenericJob) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if mgr := s.manager; mgr != nil && mgr.database != nil {
		return updateJob(job.GetPersistence(), mgr.database)
	}
	return ErrNoDatabaseConnection
}
// updateJob runs update for the job and retries when the failure looks like a
// database lock conflict.
//
// At most maxRetries attempts are made. A failure counts as a lock conflict
// when its message contains "lock", compared case-insensitively so that
// messages such as "Lock wait timeout exceeded" qualify as well. Between
// attempts the function waits retryDelay; there is no wait after the last
// attempt because nothing follows it.
//
// Returns ErrNoDatabaseConnection when db is nil, the first non-lock error
// from update unchanged, ErrMaxRetriesReached when every attempt ended in a
// lock conflict, and nil on success.
func updateJob(job JobPersistence, db *gorm.DB) error {
	if db == nil {
		return ErrNoDatabaseConnection
	}

	const (
		maxRetries = 3
		retryDelay = 100 * time.Millisecond
	)

	for attempt := 1; attempt <= maxRetries; attempt++ {
		err := update(&job, db)
		if err == nil {
			return nil
		}

		// Only lock conflicts are worth retrying; everything else is final.
		if !strings.Contains(strings.ToLower(err.Error()), "lock") {
			return err
		}

		if attempt < maxRetries {
			time.Sleep(retryDelay)
		}
	}

	return ErrMaxRetriesReached
}
// update writes the job's statistics and inserts its log entries inside a
// single database transaction.
//
// Statistics are written only when job.Stats differs from the zero JobStats
// value. Log inserts are best-effort: their results are discarded on purpose.
//
// Returns ErrNoDatabaseConnection when db is nil, otherwise the error of the
// statistics update or of the transaction, or nil.
func update(job *JobPersistence, db *gorm.DB) error {
	if db == nil {
		return ErrNoDatabaseConnection
	}

	persist := func(tx *gorm.DB) error {
		if hasStats := job.Stats != (JobStats{}); hasStats {
			Info("Updating stats for job %s", job.ID)

			if job.Stats.RunCount == 0 {
				// NOTE(review): the message announces a skip, but execution
				// continues into the update below — confirm which is intended.
				Info("Stats runCount is 0, skipping update")
			}

			// Select("*") makes Updates write every column, zero values
			// included; job_id and created_at are left out of the SET list.
			statsQuery := tx.Model(job.Stats).
				Select("*").
				Omit("job_id", "created_at")
			if err := statsQuery.Updates(job.Stats).Error; err != nil {
				return err // the stats update failed
			}
		}

		for idx := range job.Logs {
			// LogID is zeroed before the insert — presumably so the database
			// assigns a fresh key; verify against JobLog's schema.
			job.Logs[idx].LogID = 0

			// Deliberately unchecked: a failed log insert must not fail the
			// whole update.
			_ = tx.Create(&job.Logs[idx])
		}

		return nil
	}

	return db.Transaction(persist)
}
// SaveJob persists the job's current persistence snapshot via saveJob.
//
// The syncer mutex is held for the whole call. ErrNoDatabaseConnection is
// returned when the syncer has no manager or the manager has no database.
func (s *JobSyncer) SaveJob(job GenericJob) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	mgr := s.manager
	if mgr == nil {
		return ErrNoDatabaseConnection
	}

	db := mgr.database
	if db == nil {
		return ErrNoDatabaseConnection
	}

	return saveJob(job.GetPersistence(), db)
}
// save creates or overwrites the job record, then writes its statistics and
// inserts its log entries, all inside a single database transaction.
//
// The job is looked up by ID first: a missing row leads to Create, an existing
// row to Save. Statistics are written only when job.Stats differs from the
// zero JobStats value. Log inserts are best-effort: their results are
// discarded on purpose.
//
// Errors:
//   - ErrNoDatabaseConnection when db is nil.
//   - ErrFailedToCreate, ErrFailedToSaveJob or ErrFailedToQueryExistingJob,
//     each wrapped (errors.Is still matches) with the database error text
//     appended. Keeping that text matters: saveJob decides whether to retry
//     by inspecting the error message, which a bare sentinel would hide.
//   - The unmodified error of the statistics update or of the transaction.
func save(job *JobPersistence, db *gorm.DB) error {
	if db == nil {
		return ErrNoDatabaseConnection
	}

	return db.Transaction(func(tx *gorm.DB) error {
		var existingJob JobPersistence
		lookupErr := tx.First(&existingJob, "id = ?", job.ID).Error

		switch {
		case lookupErr == nil:
			// The row exists already: overwrite it.
			if err := tx.Save(job).Error; err != nil {
				return fmt.Errorf("%w: %v", ErrFailedToSaveJob, err)
			}
		case errors.Is(lookupErr, gorm.ErrRecordNotFound):
			// First time this job is stored.
			if err := tx.Create(job).Error; err != nil {
				return fmt.Errorf("%w: %v", ErrFailedToCreate, err)
			}
		default:
			return fmt.Errorf("%w: %v", ErrFailedToQueryExistingJob, lookupErr)
		}

		if job.Stats != (JobStats{}) {
			job.Stats.JobID = job.ID

			// Select("*") makes Updates write every column, zero values
			// included; job_id and created_at are left out of the SET list.
			if err := tx.Model(job.Stats).
				Select("*").
				Omit("job_id", "created_at").
				Updates(job.Stats).Error; err != nil {
				return err
			}
		}

		for i := range job.Logs {
			// LogID is zeroed before the insert — presumably so the database
			// assigns a fresh key; verify against JobLog's schema.
			job.Logs[i].LogID = 0

			// Deliberately unchecked: a failed log insert must not fail the
			// whole save.
			_ = tx.Create(&job.Logs[i])
		}

		return nil
	})
}
// saveJob runs save for the job and retries when the failure looks like a
// database lock conflict.
//
// At most maxRetries attempts are made. A failure counts as a lock conflict
// when its message contains "lock", compared case-insensitively so that
// messages such as "Lock wait timeout exceeded" qualify as well. Between
// attempts the function waits retryDelay; there is no wait after the last
// attempt because nothing follows it.
//
// Returns ErrNoDatabaseConnection when db is nil, the first non-lock error
// from save unchanged, ErrMaxRetriesReached when every attempt ended in a
// lock conflict, and nil on success.
func saveJob(job JobPersistence, db *gorm.DB) error {
	if db == nil {
		return ErrNoDatabaseConnection
	}

	const (
		maxRetries = 3
		retryDelay = 100 * time.Millisecond
	)

	for attempt := 1; attempt <= maxRetries; attempt++ {
		err := save(&job, db)
		if err == nil {
			return nil
		}

		// Only lock conflicts are worth retrying; everything else is final.
		if !strings.Contains(strings.ToLower(err.Error()), "lock") {
			return err
		}

		if attempt < maxRetries {
			time.Sleep(retryDelay)
		}
	}

	return ErrMaxRetriesReached
}