Skip to content
Snippets Groups Projects
Select Git revision
  • f077f3dd6f184be15bf09928ae8336593c8d6abc
  • master default protected
  • 1.31
  • 4.38.7
  • 4.38.6
  • 4.38.5
  • 4.38.4
  • 4.38.3
  • 4.38.2
  • 4.38.1
  • 4.38.0
  • 4.37.2
  • 4.37.1
  • 4.37.0
  • 4.36.0
  • 4.35.0
  • 4.34.1
  • 4.34.0
  • 4.33.1
  • 4.33.0
  • 4.32.2
  • 4.32.1
  • 4.32.0
23 results

template.js

Blame
  • database.go 4.84 KiB
    // Copyright 2024 schukai GmbH
    // SPDX-License-Identifier: AGPL-3.0
    
    package jobqueue
    
    import (
    	"errors"
    	"fmt"
    	"gorm.io/gorm"
    	"strings"
    	"time"
    )
    
    // CheckAndSaveOrUpdate persists job, choosing between an update and an insert.
    //
    // It looks up a JobPersistence row whose id equals job.GetID(). When the
    // lookup succeeds the job is handed to updateJob; when the lookup reports
    // gorm.ErrRecordNotFound it is handed to saveJob. Any other lookup error is
    // returned wrapped with the job ID, so errors.Is / errors.As still match the
    // underlying error.
    //
    // Returns ErrNoDatabaseConnection when the syncer has no manager or the
    // manager has no database handle. The syncer mutex is held for the whole call.
    func (s *JobSyncer) CheckAndSaveOrUpdate(job JobPersistence) error {
    	s.mu.Lock()
    	defer s.mu.Unlock()

    	if s.manager == nil || s.manager.database == nil {
    		return ErrNoDatabaseConnection
    	}
    	db := s.manager.database
    	id := job.GetID()

    	// existing is only a lookup target; its contents are not used afterwards.
    	var existing JobPersistence
    	err := db.Where("id = ?", id).First(&existing).Error
    	switch {
    	case err == nil:
    		return updateJob(job, db)
    	case errors.Is(err, gorm.ErrRecordNotFound):
    		return saveJob(job, db)
    	default:
    		return fmt.Errorf("looking up job %v: %w", id, err)
    	}
    }
    
    // DeleteJob removes a job's stored data inside a single transaction.
    //
    // In order, it deletes the JobLog rows and then the JobStats rows whose job_id
    // matches the job's persistence ID, and finally deletes the persistence record
    // itself. The error of the first failing step is returned from the
    // transaction callback. Returns ErrNoDatabaseConnection when the syncer has no
    // manager or the manager has no database handle.
    func (s *JobSyncer) DeleteJob(job GenericJob) error {
    	s.mu.Lock()
    	defer s.mu.Unlock()

    	if s.manager == nil || s.manager.database == nil {
    		return ErrNoDatabaseConnection
    	}

    	removeAll := func(tx *gorm.DB) error {
    		record := job.GetPersistence()

    		// Dependent rows go first, each keyed by the owning job's ID.
    		for _, dependent := range []interface{}{&JobLog{}, &JobStats{}} {
    			if err := tx.Where("job_id = ?", record.GetID()).Delete(dependent).Error; err != nil {
    				return err
    			}
    		}

    		return tx.Delete(&record).Error
    	}

    	return s.manager.database.Transaction(removeAll)
    }
    
    // ResetLogs deletes every JobLog row whose job_id matches the job's
    // persistence ID. The delete is issued through tx.Unscoped() inside a
    // transaction.
    //
    // Returns ErrNoDatabaseConnection when the syncer has no manager or the
    // manager has no database handle; otherwise returns the transaction's result.
    func (s *JobSyncer) ResetLogs(job GenericJob) error {
    	s.mu.Lock()
    	defer s.mu.Unlock()

    	if s.manager == nil || s.manager.database == nil {
    		return ErrNoDatabaseConnection
    	}

    	purgeLogs := func(tx *gorm.DB) error {
    		record := job.GetPersistence()
    		return tx.Unscoped().Where("job_id = ?", record.GetID()).Delete(&JobLog{}).Error
    	}

    	return s.manager.database.Transaction(purgeLogs)
    }
    
    // ResetStats calls job.ResetStats() and then writes the job's persistence
    // record to the database through updateJob.
    //
    // Returns ErrNoDatabaseConnection (without touching the job) when the syncer
    // has no manager or the manager has no database handle; otherwise returns the
    // result of updateJob. The syncer mutex is held for the whole call.
    func (s *JobSyncer) ResetStats(job GenericJob) error {
    	s.mu.Lock()
    	defer s.mu.Unlock()

    	// One guard is enough: updateJob also rejects a nil db on its own.
    	if s.manager == nil || s.manager.database == nil {
    		return ErrNoDatabaseConnection
    	}

    	job.ResetStats()
    	return updateJob(job.GetPersistence(), s.manager.database)
    }
    
    // UpdateJob writes the job's persistence record to the database via updateJob
    // while holding the syncer mutex.
    //
    // Returns ErrNoDatabaseConnection when the syncer has no manager or the
    // manager has no database handle.
    func (s *JobSyncer) UpdateJob(job GenericJob) error {
    	s.mu.Lock()
    	defer s.mu.Unlock()

    	if mgr := s.manager; mgr != nil && mgr.database != nil {
    		return updateJob(job.GetPersistence(), mgr.database)
    	}
    	return ErrNoDatabaseConnection
    }
    
    // updateJob runs update for job, retrying when the error text mentions a lock.
    //
    // Up to three attempts are made. An error whose message contains "lock"
    // triggers a 100ms pause followed by the next attempt; any other error is
    // returned immediately. When all attempts end in lock errors,
    // ErrMaxRetriesReached is returned. A nil db yields ErrNoDatabaseConnection.
    func updateJob(job JobPersistence, db *gorm.DB) error {
    	if db == nil {
    		return ErrNoDatabaseConnection
    	}

    	const retryLimit = 3
    	for remaining := retryLimit; remaining > 0; remaining-- {
    		err := update(&job, db)
    		if err == nil {
    			return nil
    		}
    		if !strings.Contains(err.Error(), "lock") {
    			return err
    		}
    		// Lock contention: back off briefly before the next attempt.
    		time.Sleep(time.Millisecond * 100)
    	}

    	return ErrMaxRetriesReached
    }
    
    // update writes the job's statistics and log entries to db in one transaction.
    //
    // When job.Stats differs from the zero JobStats value, all stats columns
    // except job_id and created_at are updated. Each entry of job.Logs then has
    // its LogID reset to 0 and is inserted; the outcome of those inserts is
    // ignored on purpose. Returns ErrNoDatabaseConnection for a nil db, otherwise
    // whatever the transaction returns.
    func update(job *JobPersistence, db *gorm.DB) error {
    	if db == nil {
    		return ErrNoDatabaseConnection
    	}

    	writeAll := func(tx *gorm.DB) error {
    		if statsPresent := job.Stats != (JobStats{}); statsPresent {
    			Info("Updating stats for job %s", job.ID)

    			// NOTE(review): the message says the update is skipped, but
    			// execution continues into the Updates call below — confirm
    			// which of the two is intended.
    			if job.Stats.RunCount == 0 {
    				Info("Stats runCount is 0, skipping update")
    			}

    			statsUpdate := tx.Model(job.Stats).
    				Select("*").
    				Omit("job_id", "created_at").
    				Updates(job.Stats)
    			if statsUpdate.Error != nil {
    				return statsUpdate.Error // the stats update failed
    			}
    		}

    		for idx := range job.Logs {
    			// Presumably cleared so the database assigns a fresh ID — confirm.
    			job.Logs[idx].LogID = 0
    			// Best-effort insert: a failure here is deliberately ignored.
    			_ = tx.Create(&job.Logs[idx])
    		}

    		return nil
    	}

    	return db.Transaction(writeAll)
    }
    
    // SaveJob stores the job's persistence record in the database via saveJob
    // while holding the syncer mutex.
    //
    // Returns ErrNoDatabaseConnection when the syncer has no manager or the
    // manager has no database handle.
    func (s *JobSyncer) SaveJob(job GenericJob) error {
    	s.mu.Lock()
    	defer s.mu.Unlock()

    	var db *gorm.DB
    	if s.manager != nil {
    		db = s.manager.database
    	}
    	if db == nil {
    		return ErrNoDatabaseConnection
    	}

    	return saveJob(job.GetPersistence(), db)
    }
    
    // save writes job, its statistics and its log entries to db in one transaction.
    //
    // The job row is looked up by ID first: when no row exists the job is created,
    // when one exists the job is saved over it. Failures of the lookup, create or
    // save step are reported as ErrFailedToQueryExistingJob, ErrFailedToCreate or
    // ErrFailedToSaveJob respectively; the sentinel is wrapped (so errors.Is still
    // matches it) and the database's own message is appended, so the cause is not
    // lost and callers that inspect the error text — such as the lock retry in
    // saveJob — can see it.
    //
    // When job.Stats differs from the zero JobStats value, its JobID is set to
    // job.ID and all stats columns except job_id and created_at are updated. Each
    // entry of job.Logs then has its LogID reset to 0 and is inserted; the outcome
    // of those inserts is ignored on purpose.
    //
    // Returns ErrNoDatabaseConnection for a nil db.
    func save(job *JobPersistence, db *gorm.DB) error {
    	if db == nil {
    		return ErrNoDatabaseConnection
    	}

    	return db.Transaction(func(tx *gorm.DB) error {
    		var existingJob JobPersistence
    		lookupErr := tx.First(&existingJob, "id = ?", job.ID).Error
    		switch {
    		case lookupErr == nil:
    			if err := tx.Save(job).Error; err != nil {
    				return fmt.Errorf("%w: %v", ErrFailedToSaveJob, err)
    			}
    		case errors.Is(lookupErr, gorm.ErrRecordNotFound):
    			if err := tx.Create(job).Error; err != nil {
    				return fmt.Errorf("%w: %v", ErrFailedToCreate, err)
    			}
    		default:
    			return fmt.Errorf("%w: %v", ErrFailedToQueryExistingJob, lookupErr)
    		}

    		if job.Stats != (JobStats{}) {
    			job.Stats.JobID = job.ID

    			if err := tx.Model(job.Stats).
    				Select("*").
    				Omit("job_id", "created_at").
    				Updates(job.Stats).Error; err != nil {
    				return err
    			}
    		}

    		for i := range job.Logs {
    			// Presumably cleared so the database assigns a fresh ID — confirm.
    			job.Logs[i].LogID = 0
    			// Best-effort insert: a failure here is deliberately ignored.
    			_ = tx.Create(&job.Logs[i])
    		}

    		return nil
    	})
    }
    
    // saveJob runs save for job, retrying when the error text mentions a lock.
    //
    // At most three attempts are made. An error whose message contains "lock"
    // causes a 100ms pause and another attempt; any other error is returned
    // as-is. When every attempt ends in a lock error, ErrMaxRetriesReached is
    // returned. A nil db yields ErrNoDatabaseConnection.
    func saveJob(job JobPersistence, db *gorm.DB) error {
    	if db == nil {
    		return ErrNoDatabaseConnection
    	}

    	const retryLimit = 3
    	tries := 0
    	for tries < retryLimit {
    		err := save(&job, db)
    		switch {
    		case err == nil:
    			return nil
    		case strings.Contains(err.Error(), "lock"):
    			// Lock contention: wait a moment, then count the attempt.
    			time.Sleep(time.Millisecond * 100)
    			tries++
    		default:
    			return err
    		}
    	}

    	return ErrMaxRetriesReached
    }