diff --git a/.gitignore b/.gitignore index 7a3019331efd9ceee783133ff4eced3b5116f718..3f921d231b45aef7a4570d0132bc7477199d5ad8 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ devenv.local.nix .pre-commit-config.yaml smell.go +/.attach_* diff --git a/.idea/job-queues.iml b/.idea/job-queues.iml index 25ed3f6e7b6e344b6ca91ebcc5d005f35357f9cf..9b4b74933ef63213f3b3ee6492b3da131df6c11b 100644 --- a/.idea/job-queues.iml +++ b/.idea/job-queues.iml @@ -3,7 +3,10 @@ <component name="Go" enabled="true" /> <component name="NewModuleRootManager" inherit-compiler-output="true"> <exclude-output /> - <content url="file://$MODULE_DIR$" /> + <content url="file://$MODULE_DIR$"> + <excludeFolder url="file://$MODULE_DIR$/.devenv" /> + <excludeFolder url="file://$MODULE_DIR$/.direnv" /> + </content> <orderEntry type="inheritedJdk" /> <orderEntry type="sourceFolder" forTests="false" /> </component> diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000000000000000000000000000000000000..2713a857d50d9b7ea2e90606657b251583e4bca7 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,58 @@ +# Contributing to schukai GmbH Projects + +## Code of Conduct + +Be a human, not an asshole. Common sense and basic human decency apply. + +## Getting Started + +### Setting up the Project + +1. Fork the project on GitLab. +2. Clone your fork locally. Replace `[your-username]` with your GitLab username and `[project-name]` with the actual project name: + ```bash + git clone $(git config --get remote.origin.url) + ``` +3. Add the upstream repository. Replace `[original-username]` and `[project-name]` with the original repository's username and project name: + ```bash + git remote add upstream https://gitlab.schukai.com/[original-username]/[project-name].git + ``` + +### Making Changes + +1. Create a new branch: + ```bash + git checkout -b new-feature-branch + ``` +2. Make your changes. +3. Commit your changes: + ```bash + git commit -m "Description of change" + ``` + +### Submitting a Merge Request + +1. Push your changes to your fork: + ```bash + git push origin new-feature-branch + ``` +2. Navigate to the original project repository on `gitlab.schukai.com`. +3. Open a Merge Request and provide a clear description of the changes. + +## Coding Guidelines + +- Follow the coding style used in the project. +- Write unit tests for new features. +- Ensure that all tests pass before submitting a Merge Request. + +## Reporting Issues + +If you find an issue, please create a new issue on `gitlab.schukai.com`. + +## Additional Resources + +- [GitLab Flow](https://docs.gitlab.com/ee/topics/gitlab_flow.html) +- [GitLab Merge Request Guidelines](https://docs.gitlab.com/ee/user/project/merge_requests/) + +Thank you for your contribution! + diff --git a/LICENSE b/LICENSE index 8aa3080082f117124214e2d4c96b5f14d6fbf54c..5694d302432e91a1be35bb39ea0a4bad0fe1cc17 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -Copyright (C) 2022 by schukai GmbH. +Copyright (C) 2023 schukai GmbH This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published @@ -10,5 +10,5 @@ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License -along with this program. If not, see <https://www.gnu.org/licenses/>. - +along with this program. If not, see <https://www.gnu.org/licenses/>. + diff --git a/README.md b/README.md index 728356c3551769cb2d2b084141df68b35ff40bd6..381dacef4ed34ab98d52c8eac6b4099faf8a6d5c 100644 --- a/README.md +++ b/README.md @@ -1,130 +1,159 @@ # Job Queues +## Overview + The `jobQueue` library in Go aims to serve as a Cron replacement, enabling the scheduled and event-driven execution of tasks in an organized manner. -## Library Requirements +## Getting Started + +### Prerequisites + +- Go 1.20+ + +```bash +go get -u +``` + +### Installation + +```bash +git clone https://github.com/yourusername/GoJobQueue.git +cd GoJobQueue +go build + +``` + +## Usage + +Import the package and create a new queue. + +```bash +goCopy codeimport "github.com/yourusername/GoJobQueue" + +queue := GoJobQueue.New() + +``` + +Adding a job: + +```bash +goCopy codejob := func() { + // Your code here +} -### Core Requirements +queue.Add(job) -1. **Job Management Structure**: A structure to catalog all jobs, providing methods to add and remove jobs. +``` -2. **YAML Import**: A function to import job definitions from a YAML file. +## API Reference -3. **Job Exclusivity**: Jobs should be either concurrently runnable or exclusive. +### Manager -4. **System Resource Monitoring**: Real-time monitoring of system resources like CPU, memory, etc., consumed by the jobs. +The `Manager` is the central orchestrator for job execution, worker assignment, and event handling. It provides methods for managing workers, scheduling jobs, and handling the state of the job queue. -5. **Priority and Distribution**: Each job should have a priority. Jobs should also be evenly distributed across available resources. +#### Initializing a Manager -6. **Single and Recurring Jobs**: Support for both one-time and recurring jobs. +To create a new `Manager` instance, use the `NewManager` function. -7. **Logging**: Each job should have a log written that includes Process ID, memory usage, start time, end time, and exit code. +```go +mng := jobqueue.NewManager() +``` -8. **Priority Escalation**: If a job is not run due to its priority, its priority should escalate over time. +#### Manager State -### Additional Requirements +A Manager can be in one of two states: -1. **Error Handling**: Mechanisms for effective error handling, especially for failed or terminated jobs. +- `ManagerStateStopped`: The manager is stopped. +- `ManagerStateRunning`: The manager is running and actively scheduling jobs. -2. **Notification System**: Optionally, an interface for notifications on job failures or completions. +#### Configuration -3. **Resource Limits**: Ability to set resource limits per job. +You can configure the manager using various setter methods: -4. **Job Dependencies**: Optional support for jobs that depend on the successful completion of other jobs. +- `SetCronInstance(cronInstance *cron.Cron) *Manager` : Set a Cron instance for scheduled jobs. +- `SetDB(db *gorm.DB) *Manager` : Set a Gorm DB instance for database storage. -5. **Testability**: The library should be easy to test, ideally via unit tests. +```go +mng.SetCronInstance(cronInstance).SetDB(db) -6. **Documentation**: Comprehensive API documentation and a guide on how the library works. +``` +#### Managing Workers +You can add or remove workers to/from the manager using the following methods: -## Documentation +- `AddWorker(worker Worker) error` +- `RemoveWorker(worker Worker) error` -### `JobData` Structure +```go +err := mng.AddWorker(worker) +err = mng.RemoveWorker(worker) -#### Fields +``` -- **Id (JobIDType)** - - Unique identifier for the job. +#### Starting and Stopping the Manager -- **Priority (int)** - - The priority level of the job in the queue. +The manager can be started or stopped using: -- **Exclusive (bool)** - - Specifies whether the job should be executed exclusively or not. +- `Start() error` +- `Stop() error` -- **MaxRuns (int)** - - The maximum number of times the job should be executed. +```go +err := mng.Start() +err = mng.Stop() -- **Concurrency (int)** - - The maximum number of concurrent executions of the job. +``` -- **LastRun (time.Time)** - - Timestamp for when the job was last executed. +#### Scheduling Jobs -- **NextRun (time.Time)** - - Timestamp for the next scheduled execution of the job. +To schedule a job for execution, use the `ScheduleJob` method. -- **Logs ([]JobLog)** - - An array of log entries for the job. +```go +err := mng.ScheduleJob(job, scheduler) -- **Schedule (string)** - - A cron expression that defines the execution schedule. +``` -- **Status (JobStatus)** - - The current status of the job (e.g., `Running`, `Completed`). +#### Canceling Job Schedules -- **Timeout (time.Duration)** - - The maximum duration the job is allowed to run. +To cancel a scheduled job, use `CancelJobSchedule`. -- **Retries (int)** - - The number of times the job will be retried if it fails. +```go +err := mng.CancelJobSchedule(jobID) -- **RetryDelay (time.Duration)** - - The delay before retrying the job if it fails. +``` -- **ResourceLimits (struct)** - - Resource limits for CPU and Memory. +#### Event Bus - - **CPULimit (float64)** - - CPU limit for the job. +To get the EventBus instance, use `GetEventBus`. - - **MemoryLimit (uint64)** - - Memory limit for the job. +```go +eventBus := mng.GetEventBus() -- **Dependencies ([]JobIDType)** - - List of job IDs this job depends on. +``` -- **Tags ([]string)** - - Tags associated with the job for easier categorization. +### Error Handling -- **Metadata (map[string]interface{})** - - Metadata for extending the structure with additional information. +Errors returned by the Manager methods should be handled appropriately to ensure smooth operation. -- **Stats (JobStats)** - - Job-related statistics. +## Tests -### `Job` Structure +Run tests using: -Inherits fields from `JobData` and adds additional functional fields. +```bash +go test ./... +``` -#### Fields +## Contributing -- **Runnable (Runnable)** - - A function or method that defines the action of the job. This is what gets executed. +Please read [CONTRIBUTING.md](https://chat.openai.com/c/93eb4e0d-55a7-41a8-81a4-6e40c9d44dc2CONTRIBUTING.md) for the process for submitting pull requests. -- **scheduleImpl (cron.Schedule)** - - Internal cron schedule implementation that interprets the `Schedule` string in `JobData`. +## License -- **TelemetryHooks ([]func(*JobLog))** - - An array of functions that will be called for telemetry during the job's execution. These hooks can update a `JobLog` object. +This project is licensed under the AGPL-3.0 License - see the [LICENSE.md](LICENSE.md) file for details. -- **Failover (func() error)** - - A function that gets called if the primary execution fails for some reason. It's meant for fail over mechanisms. +## Contact -- **ctx (context.Context)** - - A context that can carry deadlines, cancellations, and other request-scoped values across API boundaries and between processes. +- schukai GmbH - [schukai.de](https://www.schukai.com) +- [info@schukai.com](mailto:info@schukai.com) -- **mu (sync.Mutex)** - - Mutex for synchronizing access to the job's fields, making the job safe for concurrent use. diff --git a/Taskfile.yml b/Taskfile.yml index f3aa11124c8bd4d5bed9f931bb155c694c1aba99..b48a18a1f7c7a047e8f4dde1a7d02a53d8b6fcd4 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -10,7 +10,7 @@ version: '3' tasks: - default: + default: cmds: - task --list silent: true @@ -21,6 +21,7 @@ tasks: TEST_BY_TASK: true cmds: - echo "Execute unit tests in Go." + - gosec . - go test -tags=runOnTask -cover -v ./... - go test -tags=runOnTask -bench -v ./... - go test -tags=runOnTask -race -v ./... diff --git a/devenv.lock b/devenv.lock index 3f091a6bdecc910ed160fa01f4a719414160d07a..a9b3b229863f84225ad9b061845a54d8a07b7718 100644 --- a/devenv.lock +++ b/devenv.lock @@ -3,11 +3,11 @@ "devenv": { "locked": { "dir": "src/modules", - "lastModified": 1697058441, - "narHash": "sha256-gjtW+nkM9suMsjyid63HPmt6WZQEvuVqA5cOAf4lLM0=", + "lastModified": 1698243190, + "narHash": "sha256-n+SbyNQRhUcaZoU00d+7wi17HJpw/kAUrXOL4zRcqE8=", "owner": "cachix", "repo": "devenv", - "rev": "55294461a62d90c8626feca22f52b0d3d0e18e39", + "rev": "86f476f7edb86159fd20764489ab4e4df6edb4b6", "type": "github" }, "original": { @@ -74,11 +74,11 @@ }, "nixpkgs": { "locked": { - "lastModified": 1697226376, - "narHash": "sha256-cumLLb1QOUtWieUnLGqo+ylNt3+fU8Lcv5Zl+tYbRUE=", + "lastModified": 1698288402, + "narHash": "sha256-jIIjApPdm+4yt8PglX8pUOexAdEiAax/DXW3S/Mb21E=", "owner": "nixos", "repo": "nixpkgs", - "rev": "898cb2064b6e98b8c5499f37e81adbdf2925f7c5", + "rev": "60b9db998f71ea49e1a9c41824d09aa274be1344", "type": "github" }, "original": { @@ -106,11 +106,11 @@ }, "nixpkgs_2": { "locked": { - "lastModified": 1697226376, - "narHash": "sha256-cumLLb1QOUtWieUnLGqo+ylNt3+fU8Lcv5Zl+tYbRUE=", + "lastModified": 1698288402, + "narHash": "sha256-jIIjApPdm+4yt8PglX8pUOexAdEiAax/DXW3S/Mb21E=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "898cb2064b6e98b8c5499f37e81adbdf2925f7c5", + "rev": "60b9db998f71ea49e1a9c41824d09aa274be1344", "type": "github" }, "original": { @@ -130,11 +130,11 @@ "nixpkgs-stable": "nixpkgs-stable" }, "locked": { - "lastModified": 1696846637, - "narHash": "sha256-0hv4kbXxci2+pxhuXlVgftj/Jq79VSmtAyvfabCCtYk=", + "lastModified": 1698227354, + "narHash": "sha256-Fi5H9jbaQLmLw9qBi/mkR33CoFjNbobo5xWdX4tKz1Q=", "owner": "cachix", "repo": "pre-commit-hooks.nix", - "rev": "42e1b6095ef80a51f79595d9951eb38e91c4e6ca", + "rev": "bd38df3d508dfcdff52cd243d297f218ed2257bf", "type": "github" }, "original": { diff --git a/devenv.nix b/devenv.nix index 7d8af9a71bae8350aad9fef10026123207be46da..e55f2ceea75c6f150c6edf3be02261cf1a2657a4 100644 --- a/devenv.nix +++ b/devenv.nix @@ -1,4 +1,4 @@ -{ pkgs, inputs, phps, lib, config, modulesPath, ... }: +{ pkgs ? import <nixpkgs> {}, inputs, phps, lib, config, modulesPath, ... }: { # https://devenv.sh/packages/ @@ -234,8 +234,86 @@ EOF ''; enterShell = '' - - cat <<'EOF' > Taskfile.yml + +cat <<'EOF' > CONTRIBUTING.md +# Contributing to schukai GmbH Projects + +## Code of Conduct + +Be a human, not an asshole. Common sense and basic human decency apply. + +## Getting Started + +### Setting up the Project + +1. Fork the project on GitLab. +2. Clone your fork locally. Replace `[your-username]` with your GitLab username and `[project-name]` with the actual project name: + ```bash + git clone $(git config --get remote.origin.url) + ``` +3. Add the upstream repository. Replace `[original-username]` and `[project-name]` with the original repository's username and project name: + ```bash + git remote add upstream https://gitlab.schukai.com/[original-username]/[project-name].git + ``` + +### Making Changes + +1. Create a new branch: + ```bash + git checkout -b new-feature-branch + ``` +2. Make your changes. +3. Commit your changes: + ```bash + git commit -m "Description of change" + ``` + +### Submitting a Merge Request + +1. Push your changes to your fork: + ```bash + git push origin new-feature-branch + ``` +2. Navigate to the original project repository on `gitlab.schukai.com`. +3. Open a Merge Request and provide a clear description of the changes. + +## Coding Guidelines + +- Follow the coding style used in the project. +- Write unit tests for new features. +- Ensure that all tests pass before submitting a Merge Request. + +## Reporting Issues + +If you find an issue, please create a new issue on `gitlab.schukai.com`. + +## Additional Resources + +- [GitLab Flow](https://docs.gitlab.com/ee/topics/gitlab_flow.html) +- [GitLab Merge Request Guidelines](https://docs.gitlab.com/ee/user/project/merge_requests/) + +Thank you for your contribution! + +EOF + +cat <<'EOF' > LICENSE +Copyright (C) 2023 schukai GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published +by the Free Software Foundation, either version 3 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see <https://www.gnu.org/licenses/>. + +EOF + +cat <<'EOF' > Taskfile.yml # THIS FILE IS AUTOGENERATED BY THE DEVENVSHELL # DO NOT EDIT THIS FILE MANUALLY @@ -248,7 +326,7 @@ EOF version: '3' tasks: - default: + default: cmds: - task --list silent: true @@ -259,6 +337,7 @@ tasks: TEST_BY_TASK: true cmds: - echo "Execute unit tests in Go." + - gosec . - go test -tags=runOnTask -cover -v ./... - go test -tags=runOnTask -bench -v ./... - go test -tags=runOnTask -race -v ./... @@ -301,7 +380,7 @@ tasks: - do-git-commit EOF - cat <<'EOF' > .gitlab-ci.yml +cat <<'EOF' > .gitlab-ci.yml # THIS FILE IS AUTOGENERATED BY THE DEVENVSHELL # DO NOT EDIT THIS FILE MANUALLY @@ -369,8 +448,6 @@ deploy: - /nix/store EOF - - ''; diff --git a/example_test.go b/example_test.go new file mode 100644 index 0000000000000000000000000000000000000000..d0a9b760a8e4d3dc32de6b2ee008dda1a845964d --- /dev/null +++ b/example_test.go @@ -0,0 +1,58 @@ +package jobqueue + +import ( + "github.com/robfig/cron/v3" + "go.uber.org/zap" + "strings" + "time" +) + +// ExampleManager shows how to use the manager +func ExampleManager() { + + testData := []byte(` +- id: job1 + priority: 1 + timeout: 1s + maxRetries: 3 + retryDelay: 1s + runnable: + type: shell + data: + script: echo "~1~ $(date "+%M:%S")" >> /tmp/job1.log + scheduler: + type: cron + spec: "*/10 * * * * *" +`) + + var err error + + cronInstance := cron.New(cron.WithSeconds()) + cronInstance.Start() + + zapLogger, _ := zap.NewDevelopment() + _ = zapLogger + + manager := NewManager() + manager.SetLogger(&ZapAdapter{logger: zapLogger}) + + manager.SetCronInstance(cronInstance) + worker := NewLocalWorker(10) + worker.SetManager(manager) + err = manager.AddWorker(worker) + + err = manager.Start() + if err != nil { + panic(err) + } + + reader := strings.NewReader(string(testData)) + + err = ImportJobsAndSchedule(reader, "yaml", manager) + if err != nil { + panic(err) + } + + time.Sleep(30 * time.Second) + +} diff --git a/manager.go b/manager.go index 6d3bc66e4fc045a307c02840b36d3796ed2d3ca1..6b400de285c2c5f8e481a6ea82cd09db18ba32be 100644 --- a/manager.go +++ b/manager.go @@ -23,7 +23,6 @@ type Manager struct { eventBus *EventBus activeJobs map[JobID]GenericJob - //scheduled map[JobID]Scheduler jobEventCh chan interface{} @@ -47,11 +46,8 @@ func NewManager() *Manager { workerMap: make(map[WorkerID]Worker), eventBus: eventBus, activeJobs: make(map[JobID]GenericJob), - //scheduled: make(map[JobID]Scheduler), - //dbSaver: NewDBSaver(), } - //mng.dbSaver.SetManager(mng) return mng }