Skip to content
Snippets Groups Projects
Verified Commit e0d03ed3 authored by Volker Schukai's avatar Volker Schukai :alien:
Browse files

feat: add delete job to manager #16

parent 027a52fb
No related branches found
No related tags found
No related merge requests found
......@@ -2,7 +2,14 @@
## Overview
The `jobQueue` library in Go aims to serve as a Cron replacement, enabling the scheduled and event-driven execution of tasks in an organized manner.
These `jobQueue` library written in Go provides a simple interface for managing jobs and workers.
It is designed to be used in a distributed environment where multiple workers can be assigned to a job
queue. The library provides a `Manager` that can be used to manage workers, schedule jobs, and handle
the state of the job queue.
Jobs can also be persisted to a database using Gorm.
The library also provides a `Cron` instance for scheduling jobs.
## Getting Started
......@@ -10,143 +17,48 @@ The `jobQueue` library in Go aims to serve as a Cron replacement, enabling the s
- Go 1.20+
```bash
go get -u
```
### Installation
```bash
git clone https://github.com/yourusername/GoJobQueue.git
cd GoJobQueue
go build
go get -u gitlab.schukai.com/oss/libraries/go/services/job-queues
```
## Usage
Import the package and create a new queue.
```bash
goCopy codeimport "github.com/yourusername/GoJobQueue"
queue := GoJobQueue.New()
```
Adding a job:
```bash
goCopy codejob := func() {
// Your code here
}
queue.Add(job)
```
## API Reference
### Manager
The `Manager` is the central orchestrator for job execution, worker assignment, and event handling. It provides methods for managing workers, scheduling jobs, and handling the state of the job queue.
#### Initializing a Manager
To create a new `Manager` instance, use the `NewManager` function.
```go
mng := jobqueue.NewManager()
```
import "gitlab.schukai.com/oss/libraries/go/services/job-queues"
#### Manager State
A Manager can be in one of two states:
- `ManagerStateStopped`: The manager is stopped.
- `ManagerStateRunning`: The manager is running and actively scheduling jobs.
#### Configuration
You can configure the manager using various setter methods:
- `SetCronInstance(cronInstance *cron.Cron) *Manager` : Set a Cron instance for scheduled jobs.
- `SetDB(db *gorm.DB) *Manager` : Set a Gorm DB instance for database storage.
```go
mng.SetCronInstance(cronInstance).SetDB(db)
```
func main() {
var err error
#### Managing Workers
m := jobqueue.NewManager()
w := jobqueue.NewWorker("worker1")
_ = m.AddWorker(w)
You can add or remove workers to/from the manager using the following methods:
_ = m.Start()
- `AddWorker(worker Worker) error`
- `RemoveWorker(worker Worker) error`
```go
err := mng.AddWorker(worker)
err = mng.RemoveWorker(worker)
```
#### Starting and Stopping the Manager
The manager can be started or stopped using:
- `Start() error`
- `Stop() error`
```go
err := mng.Start()
err = mng.Stop()
```
#### Scheduling Jobs
To schedule a job for execution, use the `ScheduleJob` method.
```go
err := mng.ScheduleJob(job, scheduler)
```
#### Canceling Job Schedules
To cancel a scheduled job, use `CancelJobSchedule`.
```go
err := mng.CancelJobSchedule(jobID)
```
#### Event Bus
job := jobqueue.Job{
ID: "job1",
}
scheduler := jobqueue.InstantScheduler{}
To get the EventBus instance, use `GetEventBus`.
_ = m.ScheduleJob(job, &scheduler)
```go
eventBus := mng.GetEventBus()
}
```
### Error Handling
Errors returned by the Manager methods should be handled appropriately to ensure smooth operation.
## Tests
Run tests using:
```bash
go test ./...
task test
```
## Contributing
Please read [CONTRIBUTING.md](https://chat.openai.com/c/93eb4e0d-55a7-41a8-81a4-6e40c9d44dc2CONTRIBUTING.md) for the process for submitting pull requests.
## License
This project is licensed under the AGPL-3.0 License - see the [LICENSE.md](LICENSE.md) file for details.
......
......@@ -170,6 +170,63 @@ func (s *DBSaver) SaveJob(job GenericJob) error {
return ErrDBSaverNotRunning
}
defer func() {
if r := recover(); r != nil {
s.logError("Error while saving job", "error", r)
}
}()
s.saveChannel <- job
return nil
}
func checkRunningSaver(s *DBSaver) (*gorm.DB, error) {
if s.manager == nil {
return nil, ErrNoManager
}
if s.manager.database == nil {
return nil, ErrNoDatabaseConnection
}
if !s.isStatus(DBSaverStatusRunning, false) {
return nil, ErrDBSaverNotRunning
}
return s.manager.database, nil
}
// DeleteJob deletes a job from the database
func (s *DBSaver) DeleteJob(job GenericJob) error {
s.mu.Lock()
defer s.mu.Unlock()
var db *gorm.DB
var err error
if db, err = checkRunningSaver(s); err != nil {
return err
}
return db.Transaction(func(tx *gorm.DB) error {
permJob := job.GetPersistence()
dbErr := tx.Where("job_id = ?", permJob.GetID()).Delete(&JobLog{}).Error
if dbErr != nil {
return dbErr
}
dbErr = tx.Where("job_id = ?", permJob.GetID()).Delete(&JobStats{}).Error
if dbErr != nil {
return dbErr
}
dbErr = tx.Delete(&permJob).Error
if dbErr != nil {
return dbErr
}
return nil
})
}
......@@ -41,4 +41,6 @@ var (
ErrJobNotActive = fmt.Errorf("job is not active")
ErrJobAlreadyActive = fmt.Errorf("job is already active")
ErrChannelAlreadyClosed = fmt.Errorf("channel is already closed")
ErrUnknownScheduleType = fmt.Errorf("unknown schedule type")
ErrNoManager = fmt.Errorf("no manager")
)
......@@ -107,6 +107,37 @@ func (m *Manager) GetActiveJobs() map[JobID]GenericJob {
return m.activeJobs
}
// DeleteJob removes a job from the active jobs and the database
func (m *Manager) DeleteJob(id JobID) error {
m.mu.Lock()
defer m.mu.Unlock()
var job GenericJob
var ok bool
if job, ok = m.activeJobs[id]; !ok {
return ErrJobNotActive
}
err := m.removeJobInternal(id)
if err != nil {
return err
}
if m.dbSaver != nil {
err := m.dbSaver.DeleteJob(job)
if err != nil {
return err
}
}
return nil
}
// RemoveJob removes a job from the active jobs
// If you want to remove a job from the active jobs and the database, use DeleteJob instead
func (m *Manager) RemoveJob(id JobID) error {
m.mu.Lock()
defer m.mu.Unlock()
......@@ -115,6 +146,11 @@ func (m *Manager) RemoveJob(id JobID) error {
return ErrJobNotActive
}
return m.removeJobInternal(id)
}
func (m *Manager) removeJobInternal(id JobID) error {
scheduler := m.activeJobs[id].GetScheduler()
switch scheduler.(type) {
......@@ -139,7 +175,7 @@ func (m *Manager) RemoveJob(id JobID) error {
return err
}
default:
return fmt.Errorf("Unknown scheduler type")
return ErrUnknownScheduleType
}
......@@ -407,6 +443,76 @@ func (m *Manager) GetLogger() Logger {
return m.logger
}
// ScheduleJob schedules a job
func (m *Manager) ScheduleJob(job GenericJob, scheduler Scheduler) error {
m.mu.Lock()
defer m.mu.Unlock()
if scheduler == nil {
return ErrSchedulerNotSet
}
if m.state != ManagerStateRunning {
return ErrManagerNotRunning
}
if job.GetScheduler() != nil {
return ErrJobAlreadyScheduled
}
if _, ok := m.activeJobs[job.GetID()]; ok {
return ErrJobAlreadyActive
}
job.SetScheduler(scheduler)
err := scheduler.Schedule(job, m.eventBus)
if err != nil {
return err
}
m.activeJobs[job.GetID()] = job
if m.dbSaver != nil {
err := m.dbSaver.SaveJob(job)
if err != nil {
return err
}
}
return nil
}
func (m *Manager) CancelJobSchedule(id JobID) error {
m.mu.Lock()
defer m.mu.Unlock()
if m.state != ManagerStateRunning {
return ErrManagerNotRunning
}
job, ok := m.activeJobs[id]
if !ok {
return ErrJobNotActive
}
if job.GetScheduler() == nil {
return ErrJobNotScheduled
}
scheduler := job.GetScheduler()
err := scheduler.Cancel(job.GetID())
if err != nil {
return err
}
job.SetScheduler(nil)
delete(m.activeJobs, job.GetID())
return nil
}
// handleJobEvents handles job events
func (m *Manager) handleJobEvents() {
......@@ -498,65 +604,3 @@ func (m *Manager) handleJobEvents() {
}
}
}
// ScheduleJob schedules a job
func (m *Manager) ScheduleJob(job GenericJob, scheduler Scheduler) error {
m.mu.Lock()
defer m.mu.Unlock()
if scheduler == nil {
return ErrSchedulerNotSet
}
if m.state != ManagerStateRunning {
return ErrManagerNotRunning
}
if job.GetScheduler() != nil {
return ErrJobAlreadyScheduled
}
if _, ok := m.activeJobs[job.GetID()]; ok {
return ErrJobAlreadyActive
}
job.SetScheduler(scheduler)
err := scheduler.Schedule(job, m.eventBus)
if err != nil {
return err
}
m.activeJobs[job.GetID()] = job
return nil
}
func (m *Manager) CancelJobSchedule(id JobID) error {
m.mu.Lock()
defer m.mu.Unlock()
if m.state != ManagerStateRunning {
return ErrManagerNotRunning
}
job, ok := m.activeJobs[id]
if !ok {
return ErrJobNotActive
}
if job.GetScheduler() == nil {
return ErrJobNotScheduled
}
scheduler := job.GetScheduler()
err := scheduler.Cancel(job.GetID())
if err != nil {
return err
}
job.SetScheduler(nil)
delete(m.activeJobs, job.GetID())
return nil
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment