diff --git a/README.md b/README.md index 381dacef4ed34ab98d52c8eac6b4099faf8a6d5c..c76caf7a247e6c9701c47f6d01a5f01959ca6862 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,14 @@ ## Overview -The `jobQueue` library in Go aims to serve as a Cron replacement, enabling the scheduled and event-driven execution of tasks in an organized manner. +These `jobQueue` library written in Go provides a simple interface for managing jobs and workers. +It is designed to be used in a distributed environment where multiple workers can be assigned to a job +queue. The library provides a `Manager` that can be used to manage workers, schedule jobs, and handle +the state of the job queue. + +Jobs can also be persisted to a database using Gorm. + +The library also provides a `Cron` instance for scheduling jobs. ## Getting Started @@ -10,143 +17,48 @@ The `jobQueue` library in Go aims to serve as a Cron replacement, enabling the s - Go 1.20+ -```bash -go get -u -``` - ### Installation ```bash -git clone https://github.com/yourusername/GoJobQueue.git -cd GoJobQueue -go build - +go get -u gitlab.schukai.com/oss/libraries/go/services/job-queues ``` ## Usage Import the package and create a new queue. -```bash -goCopy codeimport "github.com/yourusername/GoJobQueue" - -queue := GoJobQueue.New() - -``` - -Adding a job: - -```bash -goCopy codejob := func() { - // Your code here -} - -queue.Add(job) - -``` - -## API Reference - -### Manager - -The `Manager` is the central orchestrator for job execution, worker assignment, and event handling. It provides methods for managing workers, scheduling jobs, and handling the state of the job queue. - -#### Initializing a Manager - -To create a new `Manager` instance, use the `NewManager` function. - ```go -mng := jobqueue.NewManager() -``` - -#### Manager State - -A Manager can be in one of two states: - -- `ManagerStateStopped`: The manager is stopped. -- `ManagerStateRunning`: The manager is running and actively scheduling jobs. - -#### Configuration - -You can configure the manager using various setter methods: - -- `SetCronInstance(cronInstance *cron.Cron) *Manager` : Set a Cron instance for scheduled jobs. -- `SetDB(db *gorm.DB) *Manager` : Set a Gorm DB instance for database storage. - -```go -mng.SetCronInstance(cronInstance).SetDB(db) - -``` - -#### Managing Workers - -You can add or remove workers to/from the manager using the following methods: - -- `AddWorker(worker Worker) error` -- `RemoveWorker(worker Worker) error` - -```go -err := mng.AddWorker(worker) -err = mng.RemoveWorker(worker) - -``` - -#### Starting and Stopping the Manager - -The manager can be started or stopped using: - -- `Start() error` -- `Stop() error` - -```go -err := mng.Start() -err = mng.Stop() - -``` - -#### Scheduling Jobs - -To schedule a job for execution, use the `ScheduleJob` method. - -```go -err := mng.ScheduleJob(job, scheduler) - -``` - -#### Canceling Job Schedules - -To cancel a scheduled job, use `CancelJobSchedule`. - -```go -err := mng.CancelJobSchedule(jobID) - -``` - -#### Event Bus - -To get the EventBus instance, use `GetEventBus`. - -```go -eventBus := mng.GetEventBus() +import "gitlab.schukai.com/oss/libraries/go/services/job-queues" + + +func main() { + var err error + + m := jobqueue.NewManager() + w := jobqueue.NewWorker("worker1") + _ = m.AddWorker(w) + + _ = m.Start() + + job := jobqueue.Job{ + ID: "job1", + } + scheduler := jobqueue.InstantScheduler{} + + _ = m.ScheduleJob(job, &scheduler) + +} ``` -### Error Handling - -Errors returned by the Manager methods should be handled appropriately to ensure smooth operation. - ## Tests Run tests using: ```bash -go test ./... +task test ``` -## Contributing - -Please read [CONTRIBUTING.md](https://chat.openai.com/c/93eb4e0d-55a7-41a8-81a4-6e40c9d44dc2CONTRIBUTING.md) for the process for submitting pull requests. - ## License This project is licensed under the AGPL-3.0 License - see the [LICENSE.md](LICENSE.md) file for details. diff --git a/database.go b/database.go index 98d32e75d2a5cee06e0de51984352665e2e7933c..0bde58478fb4231e9878dd707743fff802d227bb 100644 --- a/database.go +++ b/database.go @@ -170,6 +170,63 @@ func (s *DBSaver) SaveJob(job GenericJob) error { return ErrDBSaverNotRunning } + defer func() { + if r := recover(); r != nil { + s.logError("Error while saving job", "error", r) + } + }() + s.saveChannel <- job return nil } + +func checkRunningSaver(s *DBSaver) (*gorm.DB, error) { + if s.manager == nil { + return nil, ErrNoManager + } + + if s.manager.database == nil { + return nil, ErrNoDatabaseConnection + } + + if !s.isStatus(DBSaverStatusRunning, false) { + return nil, ErrDBSaverNotRunning + } + + return s.manager.database, nil + +} + +// DeleteJob deletes a job from the database +func (s *DBSaver) DeleteJob(job GenericJob) error { + s.mu.Lock() + defer s.mu.Unlock() + var db *gorm.DB + var err error + + if db, err = checkRunningSaver(s); err != nil { + return err + } + + return db.Transaction(func(tx *gorm.DB) error { + permJob := job.GetPersistence() + + dbErr := tx.Where("job_id = ?", permJob.GetID()).Delete(&JobLog{}).Error + if dbErr != nil { + return dbErr + } + + dbErr = tx.Where("job_id = ?", permJob.GetID()).Delete(&JobStats{}).Error + if dbErr != nil { + return dbErr + } + + dbErr = tx.Delete(&permJob).Error + if dbErr != nil { + return dbErr + } + + return nil + }) + +} diff --git a/errors.go b/errors.go index 6936c041a1e901d90439611dc88e05f77fb4b102..a159e2192f42c95797059afbe7babda48e42a98d 100644 --- a/errors.go +++ b/errors.go @@ -41,4 +41,6 @@ var ( ErrJobNotActive = fmt.Errorf("job is not active") ErrJobAlreadyActive = fmt.Errorf("job is already active") ErrChannelAlreadyClosed = fmt.Errorf("channel is already closed") + ErrUnknownScheduleType = fmt.Errorf("unknown schedule type") + ErrNoManager = fmt.Errorf("no manager") ) diff --git a/manager.go b/manager.go index 1bf57d3fd5b7e759221ce4df6304a2eb22c20418..362dde614696e8b8f94459f0535e33d8d5d17ee3 100644 --- a/manager.go +++ b/manager.go @@ -107,6 +107,37 @@ func (m *Manager) GetActiveJobs() map[JobID]GenericJob { return m.activeJobs } +// DeleteJob removes a job from the active jobs and the database +func (m *Manager) DeleteJob(id JobID) error { + m.mu.Lock() + defer m.mu.Unlock() + + var job GenericJob + var ok bool + + if job, ok = m.activeJobs[id]; !ok { + return ErrJobNotActive + } + + err := m.removeJobInternal(id) + if err != nil { + return err + } + + if m.dbSaver != nil { + + err := m.dbSaver.DeleteJob(job) + if err != nil { + return err + } + } + + return nil + +} + +// RemoveJob removes a job from the active jobs +// If you want to remove a job from the active jobs and the database, use DeleteJob instead func (m *Manager) RemoveJob(id JobID) error { m.mu.Lock() defer m.mu.Unlock() @@ -115,6 +146,11 @@ func (m *Manager) RemoveJob(id JobID) error { return ErrJobNotActive } + return m.removeJobInternal(id) +} + +func (m *Manager) removeJobInternal(id JobID) error { + scheduler := m.activeJobs[id].GetScheduler() switch scheduler.(type) { @@ -139,7 +175,7 @@ func (m *Manager) RemoveJob(id JobID) error { return err } default: - return fmt.Errorf("Unknown scheduler type") + return ErrUnknownScheduleType } @@ -407,6 +443,76 @@ func (m *Manager) GetLogger() Logger { return m.logger } +// ScheduleJob schedules a job +func (m *Manager) ScheduleJob(job GenericJob, scheduler Scheduler) error { + m.mu.Lock() + defer m.mu.Unlock() + + if scheduler == nil { + return ErrSchedulerNotSet + } + + if m.state != ManagerStateRunning { + return ErrManagerNotRunning + } + + if job.GetScheduler() != nil { + return ErrJobAlreadyScheduled + } + + if _, ok := m.activeJobs[job.GetID()]; ok { + return ErrJobAlreadyActive + } + + job.SetScheduler(scheduler) + err := scheduler.Schedule(job, m.eventBus) + if err != nil { + return err + } + + m.activeJobs[job.GetID()] = job + + if m.dbSaver != nil { + err := m.dbSaver.SaveJob(job) + if err != nil { + return err + } + } + + return nil + +} + +func (m *Manager) CancelJobSchedule(id JobID) error { + m.mu.Lock() + defer m.mu.Unlock() + + if m.state != ManagerStateRunning { + return ErrManagerNotRunning + } + + job, ok := m.activeJobs[id] + if !ok { + return ErrJobNotActive + } + + if job.GetScheduler() == nil { + return ErrJobNotScheduled + } + + scheduler := job.GetScheduler() + + err := scheduler.Cancel(job.GetID()) + if err != nil { + return err + } + + job.SetScheduler(nil) + delete(m.activeJobs, job.GetID()) + + return nil +} + // handleJobEvents handles job events func (m *Manager) handleJobEvents() { @@ -498,65 +604,3 @@ func (m *Manager) handleJobEvents() { } } } - -// ScheduleJob schedules a job -func (m *Manager) ScheduleJob(job GenericJob, scheduler Scheduler) error { - m.mu.Lock() - defer m.mu.Unlock() - - if scheduler == nil { - return ErrSchedulerNotSet - } - - if m.state != ManagerStateRunning { - return ErrManagerNotRunning - } - - if job.GetScheduler() != nil { - return ErrJobAlreadyScheduled - } - - if _, ok := m.activeJobs[job.GetID()]; ok { - return ErrJobAlreadyActive - } - - job.SetScheduler(scheduler) - err := scheduler.Schedule(job, m.eventBus) - if err != nil { - return err - } - - m.activeJobs[job.GetID()] = job - return nil - -} - -func (m *Manager) CancelJobSchedule(id JobID) error { - m.mu.Lock() - defer m.mu.Unlock() - - if m.state != ManagerStateRunning { - return ErrManagerNotRunning - } - - job, ok := m.activeJobs[id] - if !ok { - return ErrJobNotActive - } - - if job.GetScheduler() == nil { - return ErrJobNotScheduled - } - - scheduler := job.GetScheduler() - - err := scheduler.Cancel(job.GetID()) - if err != nil { - return err - } - - job.SetScheduler(nil) - delete(m.activeJobs, job.GetID()) - - return nil -}