Skip to content
Snippets Groups Projects
Select Git revision
  • 993d1d2642c6d17b9fc3880fa196c81622ab33ef
  • master default protected
  • v1.23.2
  • v1.23.1
  • v1.23.0
  • v1.22.0
  • v1.21.1
  • v1.21.0
  • v1.20.3
  • v1.20.2
  • v1.20.1
  • v1.20.0
  • v1.19.4
  • v1.19.3
  • v1.19.2
  • v1.19.1
  • v1.19.0
  • v1.18.2
  • v1.18.1
  • v1.18.0
  • v1.17.0
  • v1.16.1
22 results

database.go

Blame
  • database.go 4.84 KiB
    // Copyright 2024 schukai GmbH
    // SPDX-License-Identifier: AGPL-3.0
    
    package jobqueue
    
    import (
    	"errors"
    	"fmt"
    	"gorm.io/gorm"
    	"strings"
    	"time"
    )
    
    // CheckAndSaveOrUpdate persists job, choosing between an update and an insert
    // depending on whether a row with the job's ID can be loaded.
    //
    // The syncer mutex is held for the whole call. ErrNoDatabaseConnection is
    // returned when the syncer has no manager or the manager has no database
    // handle. A successful lookup delegates to updateJob, a lookup that reports
    // gorm.ErrRecordNotFound delegates to saveJob, and any other lookup error is
    // returned wrapped with context (it stays matchable via errors.Is/errors.As).
    func (s *JobSyncer) CheckAndSaveOrUpdate(job JobPersistence) error {
    	s.mu.Lock()
    	defer s.mu.Unlock()
    
    	if s.manager == nil || s.manager.database == nil {
    		return ErrNoDatabaseConnection
    	}
    
    	db := s.manager.database
    
    	// The loaded row is only used as an existence probe; its contents are
    	// not read afterwards.
    	var existing JobPersistence
    	err := db.Where("id = ?", job.GetID()).First(&existing).Error
    
    	switch {
    	case err == nil:
    		return updateJob(job, db)
    	case errors.Is(err, gorm.ErrRecordNotFound):
    		return saveJob(job, db)
    	default:
    		return fmt.Errorf("looking up job %v: %w", job.GetID(), err)
    	}
    }
    
    // DeleteJob removes the persisted form of job together with its dependent
    // rows, all inside one database transaction.
    //
    // Deletion order: JobLog rows matching the job's ID on job_id, then JobStats
    // rows matching on job_id, then the job record itself. The first failing
    // step's error is returned from the transaction callback.
    // ErrNoDatabaseConnection is returned when no manager or database handle is
    // available. The syncer mutex is held for the duration of the call.
    func (s *JobSyncer) DeleteJob(job GenericJob) error {
    	s.mu.Lock()
    	defer s.mu.Unlock()
    
    	if s.manager == nil || s.manager.database == nil {
    		return ErrNoDatabaseConnection
    	}
    
    	removeAll := func(tx *gorm.DB) error {
    		permJob := job.GetPersistence()
    
    		// Dependent rows are removed before the job row itself.
    		logs := tx.Where("job_id = ?", permJob.GetID()).Delete(&JobLog{})
    		if logs.Error != nil {
    			return logs.Error
    		}
    
    		stats := tx.Where("job_id = ?", permJob.GetID()).Delete(&JobStats{})
    		if stats.Error != nil {
    			return stats.Error
    		}
    
    		return tx.Delete(&permJob).Error
    	}
    
    	return s.manager.database.Transaction(removeAll)
    }
    
    // ResetLogs deletes every JobLog row whose job_id matches the given job.
    //
    // The delete runs inside a transaction and goes through Unscoped()
    // (in GORM this bypasses soft-delete handling, if the model uses it).
    // ErrNoDatabaseConnection is returned when no manager or database handle is
    // available; otherwise the delete's error, if any, is returned.
    func (s *JobSyncer) ResetLogs(job GenericJob) error {
    	s.mu.Lock()
    	defer s.mu.Unlock()
    
    	if s.manager == nil || s.manager.database == nil {
    		return ErrNoDatabaseConnection
    	}
    
    	purgeLogs := func(tx *gorm.DB) error {
    		permJob := job.GetPersistence()
    		return tx.Unscoped().
    			Where("job_id = ?", permJob.GetID()).
    			Delete(&JobLog{}).
    			Error
    	}
    
    	return s.manager.database.Transaction(purgeLogs)
    }
    
    // ResetStats invokes job.ResetStats() and then writes the job's persistence
    // representation back to the database through updateJob.
    //
    // The syncer mutex is held for the whole call. ErrNoDatabaseConnection is
    // returned (before the job is touched) when no manager or database handle
    // is available; otherwise the result of updateJob is returned.
    func (s *JobSyncer) ResetStats(job GenericJob) error {
    	s.mu.Lock()
    	defer s.mu.Unlock()
    
    	if s.manager == nil || s.manager.database == nil {
    		return ErrNoDatabaseConnection
    	}
    
    	// The connection guard above is sufficient: the mutex is held, and
    	// updateJob itself rejects a nil database handle.
    	job.ResetStats()
    
    	return updateJob(job.GetPersistence(), s.manager.database)
    }
    
    // UpdateJob writes the persistence representation of job to the database via
    // updateJob while holding the syncer mutex.
    //
    // ErrNoDatabaseConnection is returned when no manager or database handle is
    // available.
    func (s *JobSyncer) UpdateJob(job GenericJob) error {
    	s.mu.Lock()
    	defer s.mu.Unlock()
    
    	if mgr := s.manager; mgr != nil && mgr.database != nil {
    		return updateJob(job.GetPersistence(), mgr.database)
    	}
    
    	return ErrNoDatabaseConnection
    }
    
    // updateJob runs update for job against db, retrying when the database
    // reports a lock-related failure.
    //
    // An error counts as retryable when its text contains "lock". Up to
    // maxRetries attempts are made with retryDelay between consecutive attempts;
    // no delay is spent after the final attempt. Return values:
    //   - nil once an attempt succeeds,
    //   - the error itself for any non-retryable failure,
    //   - ErrMaxRetriesReached when every attempt failed with a retryable error,
    //   - ErrNoDatabaseConnection when db is nil.
    func updateJob(job JobPersistence, db *gorm.DB) error {
    	if db == nil {
    		return ErrNoDatabaseConnection
    	}
    
    	const (
    		maxRetries = 3
    		retryDelay = 100 * time.Millisecond
    	)
    
    	for attempt := 1; ; attempt++ {
    		err := update(&job, db)
    		if err == nil {
    			return nil
    		}
    
    		// Only lock contention is considered transient.
    		if !strings.Contains(err.Error(), "lock") {
    			return err
    		}
    
    		if attempt == maxRetries {
    			return ErrMaxRetriesReached
    		}
    
    		time.Sleep(retryDelay)
    	}
    }
    
    // update writes the stats and log entries of job inside one transaction.
    //
    // When job.Stats differs from the zero JobStats value, all of its columns
    // except job_id and created_at are written with Updates; a failure there is
    // returned from the transaction callback. Afterwards every entry in job.Logs
    // gets its LogID zeroed and is inserted with Create; the outcome of those
    // inserts is intentionally ignored. ErrNoDatabaseConnection is returned when
    // db is nil.
    func update(job *JobPersistence, db *gorm.DB) error {
    	if db == nil {
    		return ErrNoDatabaseConnection
    	}
    
    	persist := func(tx *gorm.DB) error {
    		if hasStats := job.Stats != (JobStats{}); hasStats {
    			Info("Updating stats for job %s", job.ID)
    
    			// NOTE(review): the message below announces a skip, but execution
    			// falls through to the Updates call regardless — confirm which
    			// of the two is intended.
    			if job.Stats.RunCount == 0 {
    				Info("Stats runCount is 0, skipping update")
    			}
    
    			res := tx.Model(job.Stats).
    				Select("*").
    				Omit("job_id", "created_at").
    				Updates(job.Stats)
    			if res.Error != nil {
    				return res.Error // the stats update failed
    			}
    		}
    
    		for idx := range job.Logs {
    			entry := &job.Logs[idx]
    			entry.LogID = 0
    			// Best effort: a failed log insert is deliberately not reported.
    			_ = tx.Create(entry)
    		}
    
    		return nil
    	}
    
    	return db.Transaction(persist)
    }
    
    // SaveJob stores the persistence representation of job through saveJob while
    // holding the syncer mutex.
    //
    // ErrNoDatabaseConnection is returned when no manager or database handle is
    // available.
    func (s *JobSyncer) SaveJob(job GenericJob) error {
    	s.mu.Lock()
    	defer s.mu.Unlock()
    
    	unavailable := s.manager == nil || s.manager.database == nil
    	if unavailable {
    		return ErrNoDatabaseConnection
    	}
    
    	persisted := job.GetPersistence()
    	return saveJob(persisted, s.manager.database)
    }
    
    // save inserts or overwrites job and then writes its stats and log entries,
    // all inside one transaction.
    //
    // The job row is looked up by job.ID: when found, job is written with Save;
    // when the lookup reports gorm.ErrRecordNotFound, job is written with Create.
    // Failures of these steps are returned as ErrFailedToSaveJob,
    // ErrFailedToCreate or ErrFailedToQueryExistingJob respectively, each wrapped
    // together with the underlying database error text. The sentinel stays
    // matchable with errors.Is, and the cause (for example a lock error that
    // saveJob wants to retry on) is no longer hidden.
    //
    // When job.Stats differs from the zero JobStats value, its JobID is set to
    // job.ID and all columns except job_id and created_at are written with
    // Updates. Afterwards every entry in job.Logs gets its LogID zeroed and is
    // inserted with Create; the outcome of those inserts is intentionally
    // ignored. ErrNoDatabaseConnection is returned when db is nil.
    func save(job *JobPersistence, db *gorm.DB) error {
    	if db == nil {
    		return ErrNoDatabaseConnection
    	}
    
    	return db.Transaction(func(tx *gorm.DB) error {
    		var existingJob JobPersistence
    		lookupErr := tx.First(&existingJob, "id = ?", job.ID).Error
    
    		switch {
    		case lookupErr == nil:
    			if err := tx.Save(job).Error; err != nil {
    				return fmt.Errorf("%w: %v", ErrFailedToSaveJob, err)
    			}
    		case errors.Is(lookupErr, gorm.ErrRecordNotFound):
    			if err := tx.Create(job).Error; err != nil {
    				return fmt.Errorf("%w: %v", ErrFailedToCreate, err)
    			}
    		default:
    			return fmt.Errorf("%w: %v", ErrFailedToQueryExistingJob, lookupErr)
    		}
    
    		if job.Stats != (JobStats{}) {
    			job.Stats.JobID = job.ID
    
    			if err := tx.Model(job.Stats).
    				Select("*").
    				Omit("job_id", "created_at").
    				Updates(job.Stats).Error; err != nil {
    				return err
    			}
    		}
    
    		for i := range job.Logs {
    			job.Logs[i].LogID = 0
    			// Best effort: a failed log insert is deliberately not reported.
    			_ = tx.Create(&job.Logs[i])
    		}
    
    		return nil
    	})
    }
    
    // saveJob runs save for job against db, retrying when the database reports a
    // lock-related failure.
    //
    // An error counts as retryable when its text contains "lock". Up to
    // maxRetries attempts are made with retryDelay between consecutive attempts;
    // no delay is spent after the final attempt. Return values:
    //   - nil once an attempt succeeds,
    //   - the error itself for any non-retryable failure,
    //   - ErrMaxRetriesReached when every attempt failed with a retryable error,
    //   - ErrNoDatabaseConnection when db is nil.
    func saveJob(job JobPersistence, db *gorm.DB) error {
    	if db == nil {
    		return ErrNoDatabaseConnection
    	}
    
    	const (
    		maxRetries = 3
    		retryDelay = 100 * time.Millisecond
    	)
    
    	for attempt := 1; ; attempt++ {
    		err := save(&job, db)
    		if err == nil {
    			return nil
    		}
    
    		// Only lock contention is considered transient.
    		if !strings.Contains(err.Error(), "lock") {
    			return err
    		}
    
    		if attempt == maxRetries {
    			return ErrMaxRetriesReached
    		}
    
    		time.Sleep(retryDelay)
    	}
    }