Skip to content
Snippets Groups Projects
Verified Commit 7483d879 authored by Volker Schukai's avatar Volker Schukai :alien:
Browse files

chore: tidy code and add doc files #6

parent 00e29577
No related branches found
No related tags found
No related merge requests found
...@@ -10,3 +10,4 @@ devenv.local.nix ...@@ -10,3 +10,4 @@ devenv.local.nix
.pre-commit-config.yaml .pre-commit-config.yaml
smell.go smell.go
/.attach_*
...@@ -3,7 +3,10 @@ ...@@ -3,7 +3,10 @@
<component name="Go" enabled="true" /> <component name="Go" enabled="true" />
<component name="NewModuleRootManager" inherit-compiler-output="true"> <component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output /> <exclude-output />
<content url="file://$MODULE_DIR$" /> <content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/.devenv" />
<excludeFolder url="file://$MODULE_DIR$/.direnv" />
</content>
<orderEntry type="inheritedJdk" /> <orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" /> <orderEntry type="sourceFolder" forTests="false" />
</component> </component>
......
# Contributing to schukai GmbH Projects
## Code of Conduct
Be a human, not an asshole. Common sense and basic human decency apply.
## Getting Started
### Setting up the Project
1. Fork the project on GitLab.
2. Clone your fork locally. Replace `[your-username]` with your GitLab username and `[project-name]` with the actual project name:
```bash
git clone $(git config --get remote.origin.url)
```
3. Add the upstream repository. Replace `[original-username]` and `[project-name]` with the original repository's username and project name:
```bash
git remote add upstream https://gitlab.schukai.com/[original-username]/[project-name].git
```
### Making Changes
1. Create a new branch:
```bash
git checkout -b new-feature-branch
```
2. Make your changes.
3. Commit your changes:
```bash
git commit -m "Description of change"
```
### Submitting a Merge Request
1. Push your changes to your fork:
```bash
git push origin new-feature-branch
```
2. Navigate to the original project repository on `gitlab.schukai.com`.
3. Open a Merge Request and provide a clear description of the changes.
## Coding Guidelines
- Follow the coding style used in the project.
- Write unit tests for new features.
- Ensure that all tests pass before submitting a Merge Request.
## Reporting Issues
If you find an issue, please create a new issue on `gitlab.schukai.com`.
## Additional Resources
- [GitLab Flow](https://docs.gitlab.com/ee/topics/gitlab_flow.html)
- [GitLab Merge Request Guidelines](https://docs.gitlab.com/ee/user/project/merge_requests/)
Thank you for your contribution!
Copyright (C) 2022 by schukai GmbH. Copyright (C) 2023 schukai GmbH
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published it under the terms of the GNU Affero General Public License as published
......
# Job Queues # Job Queues
## Overview
The `jobQueue` library in Go aims to serve as a Cron replacement, enabling the scheduled and event-driven execution of tasks in an organized manner. The `jobQueue` library in Go aims to serve as a Cron replacement, enabling the scheduled and event-driven execution of tasks in an organized manner.
## Library Requirements ## Getting Started
### Prerequisites
- Go 1.20+
```bash
go get -u
```
### Installation
```bash
git clone https://github.com/yourusername/GoJobQueue.git
cd GoJobQueue
go build
```
## Usage
Import the package and create a new queue.
```bash
goCopy codeimport "github.com/yourusername/GoJobQueue"
queue := GoJobQueue.New()
```
Adding a job:
```bash
goCopy codejob := func() {
// Your code here
}
### Core Requirements queue.Add(job)
1. **Job Management Structure**: A structure to catalog all jobs, providing methods to add and remove jobs. ```
2. **YAML Import**: A function to import job definitions from a YAML file. ## API Reference
3. **Job Exclusivity**: Jobs should be either concurrently runnable or exclusive. ### Manager
4. **System Resource Monitoring**: Real-time monitoring of system resources like CPU, memory, etc., consumed by the jobs. The `Manager` is the central orchestrator for job execution, worker assignment, and event handling. It provides methods for managing workers, scheduling jobs, and handling the state of the job queue.
5. **Priority and Distribution**: Each job should have a priority. Jobs should also be evenly distributed across available resources. #### Initializing a Manager
6. **Single and Recurring Jobs**: Support for both one-time and recurring jobs. To create a new `Manager` instance, use the `NewManager` function.
7. **Logging**: Each job should have a log written that includes Process ID, memory usage, start time, end time, and exit code. ```go
mng := jobqueue.NewManager()
```
8. **Priority Escalation**: If a job is not run due to its priority, its priority should escalate over time. #### Manager State
### Additional Requirements A Manager can be in one of two states:
1. **Error Handling**: Mechanisms for effective error handling, especially for failed or terminated jobs. - `ManagerStateStopped`: The manager is stopped.
- `ManagerStateRunning`: The manager is running and actively scheduling jobs.
2. **Notification System**: Optionally, an interface for notifications on job failures or completions. #### Configuration
3. **Resource Limits**: Ability to set resource limits per job. You can configure the manager using various setter methods:
4. **Job Dependencies**: Optional support for jobs that depend on the successful completion of other jobs. - `SetCronInstance(cronInstance *cron.Cron) *Manager` : Set a Cron instance for scheduled jobs.
- `SetDB(db *gorm.DB) *Manager` : Set a Gorm DB instance for database storage.
5. **Testability**: The library should be easy to test, ideally via unit tests. ```go
mng.SetCronInstance(cronInstance).SetDB(db)
6. **Documentation**: Comprehensive API documentation and a guide on how the library works. ```
#### Managing Workers
You can add or remove workers to/from the manager using the following methods:
## Documentation - `AddWorker(worker Worker) error`
- `RemoveWorker(worker Worker) error`
### `JobData` Structure ```go
err := mng.AddWorker(worker)
err = mng.RemoveWorker(worker)
#### Fields ```
- **Id (JobIDType)** #### Starting and Stopping the Manager
- Unique identifier for the job.
- **Priority (int)** The manager can be started or stopped using:
- The priority level of the job in the queue.
- **Exclusive (bool)** - `Start() error`
- Specifies whether the job should be executed exclusively or not. - `Stop() error`
- **MaxRuns (int)** ```go
- The maximum number of times the job should be executed. err := mng.Start()
err = mng.Stop()
- **Concurrency (int)** ```
- The maximum number of concurrent executions of the job.
- **LastRun (time.Time)** #### Scheduling Jobs
- Timestamp for when the job was last executed.
- **NextRun (time.Time)** To schedule a job for execution, use the `ScheduleJob` method.
- Timestamp for the next scheduled execution of the job.
- **Logs ([]JobLog)** ```go
- An array of log entries for the job. err := mng.ScheduleJob(job, scheduler)
- **Schedule (string)** ```
- A cron expression that defines the execution schedule.
- **Status (JobStatus)** #### Canceling Job Schedules
- The current status of the job (e.g., `Running`, `Completed`).
- **Timeout (time.Duration)** To cancel a scheduled job, use `CancelJobSchedule`.
- The maximum duration the job is allowed to run.
- **Retries (int)** ```go
- The number of times the job will be retried if it fails. err := mng.CancelJobSchedule(jobID)
- **RetryDelay (time.Duration)** ```
- The delay before retrying the job if it fails.
- **ResourceLimits (struct)** #### Event Bus
- Resource limits for CPU and Memory.
- **CPULimit (float64)** To get the EventBus instance, use `GetEventBus`.
- CPU limit for the job.
- **MemoryLimit (uint64)** ```go
- Memory limit for the job. eventBus := mng.GetEventBus()
- **Dependencies ([]JobIDType)** ```
- List of job IDs this job depends on.
- **Tags ([]string)** ### Error Handling
- Tags associated with the job for easier categorization.
- **Metadata (map[string]interface{})** Errors returned by the Manager methods should be handled appropriately to ensure smooth operation.
- Metadata for extending the structure with additional information.
- **Stats (JobStats)** ## Tests
- Job-related statistics.
### `Job` Structure Run tests using:
Inherits fields from `JobData` and adds additional functional fields. ```bash
go test ./...
```
#### Fields ## Contributing
- **Runnable (Runnable)** Please read [CONTRIBUTING.md](https://chat.openai.com/c/93eb4e0d-55a7-41a8-81a4-6e40c9d44dc2CONTRIBUTING.md) for the process for submitting pull requests.
- A function or method that defines the action of the job. This is what gets executed.
- **scheduleImpl (cron.Schedule)** ## License
- Internal cron schedule implementation that interprets the `Schedule` string in `JobData`.
- **TelemetryHooks ([]func(*JobLog))** This project is licensed under the AGPL-3.0 License - see the [LICENSE.md](LICENSE.md) file for details.
- An array of functions that will be called for telemetry during the job's execution. These hooks can update a `JobLog` object.
- **Failover (func() error)** ## Contact
- A function that gets called if the primary execution fails for some reason. It's meant for fail over mechanisms.
- **ctx (context.Context)** - schukai GmbH - [schukai.de](https://www.schukai.com)
- A context that can carry deadlines, cancellations, and other request-scoped values across API boundaries and between processes. - [info@schukai.com](mailto:info@schukai.com)
- **mu (sync.Mutex)**
- Mutex for synchronizing access to the job's fields, making the job safe for concurrent use.
...@@ -21,6 +21,7 @@ tasks: ...@@ -21,6 +21,7 @@ tasks:
TEST_BY_TASK: true TEST_BY_TASK: true
cmds: cmds:
- echo "Execute unit tests in Go." - echo "Execute unit tests in Go."
- gosec .
- go test -tags=runOnTask -cover -v ./... - go test -tags=runOnTask -cover -v ./...
- go test -tags=runOnTask -bench -v ./... - go test -tags=runOnTask -bench -v ./...
- go test -tags=runOnTask -race -v ./... - go test -tags=runOnTask -race -v ./...
......
...@@ -3,11 +3,11 @@ ...@@ -3,11 +3,11 @@
"devenv": { "devenv": {
"locked": { "locked": {
"dir": "src/modules", "dir": "src/modules",
"lastModified": 1697058441, "lastModified": 1698243190,
"narHash": "sha256-gjtW+nkM9suMsjyid63HPmt6WZQEvuVqA5cOAf4lLM0=", "narHash": "sha256-n+SbyNQRhUcaZoU00d+7wi17HJpw/kAUrXOL4zRcqE8=",
"owner": "cachix", "owner": "cachix",
"repo": "devenv", "repo": "devenv",
"rev": "55294461a62d90c8626feca22f52b0d3d0e18e39", "rev": "86f476f7edb86159fd20764489ab4e4df6edb4b6",
"type": "github" "type": "github"
}, },
"original": { "original": {
...@@ -74,11 +74,11 @@ ...@@ -74,11 +74,11 @@
}, },
"nixpkgs": { "nixpkgs": {
"locked": { "locked": {
"lastModified": 1697226376, "lastModified": 1698288402,
"narHash": "sha256-cumLLb1QOUtWieUnLGqo+ylNt3+fU8Lcv5Zl+tYbRUE=", "narHash": "sha256-jIIjApPdm+4yt8PglX8pUOexAdEiAax/DXW3S/Mb21E=",
"owner": "nixos", "owner": "nixos",
"repo": "nixpkgs", "repo": "nixpkgs",
"rev": "898cb2064b6e98b8c5499f37e81adbdf2925f7c5", "rev": "60b9db998f71ea49e1a9c41824d09aa274be1344",
"type": "github" "type": "github"
}, },
"original": { "original": {
...@@ -106,11 +106,11 @@ ...@@ -106,11 +106,11 @@
}, },
"nixpkgs_2": { "nixpkgs_2": {
"locked": { "locked": {
"lastModified": 1697226376, "lastModified": 1698288402,
"narHash": "sha256-cumLLb1QOUtWieUnLGqo+ylNt3+fU8Lcv5Zl+tYbRUE=", "narHash": "sha256-jIIjApPdm+4yt8PglX8pUOexAdEiAax/DXW3S/Mb21E=",
"owner": "NixOS", "owner": "NixOS",
"repo": "nixpkgs", "repo": "nixpkgs",
"rev": "898cb2064b6e98b8c5499f37e81adbdf2925f7c5", "rev": "60b9db998f71ea49e1a9c41824d09aa274be1344",
"type": "github" "type": "github"
}, },
"original": { "original": {
...@@ -130,11 +130,11 @@ ...@@ -130,11 +130,11 @@
"nixpkgs-stable": "nixpkgs-stable" "nixpkgs-stable": "nixpkgs-stable"
}, },
"locked": { "locked": {
"lastModified": 1696846637, "lastModified": 1698227354,
"narHash": "sha256-0hv4kbXxci2+pxhuXlVgftj/Jq79VSmtAyvfabCCtYk=", "narHash": "sha256-Fi5H9jbaQLmLw9qBi/mkR33CoFjNbobo5xWdX4tKz1Q=",
"owner": "cachix", "owner": "cachix",
"repo": "pre-commit-hooks.nix", "repo": "pre-commit-hooks.nix",
"rev": "42e1b6095ef80a51f79595d9951eb38e91c4e6ca", "rev": "bd38df3d508dfcdff52cd243d297f218ed2257bf",
"type": "github" "type": "github"
}, },
"original": { "original": {
......
{ pkgs, inputs, phps, lib, config, modulesPath, ... }: { pkgs ? import <nixpkgs> {}, inputs, phps, lib, config, modulesPath, ... }:
{ {
# https://devenv.sh/packages/ # https://devenv.sh/packages/
...@@ -235,6 +235,84 @@ EOF ...@@ -235,6 +235,84 @@ EOF
enterShell = '' enterShell = ''
cat <<'EOF' > CONTRIBUTING.md
# Contributing to schukai GmbH Projects
## Code of Conduct
Be a human, not an asshole. Common sense and basic human decency apply.
## Getting Started
### Setting up the Project
1. Fork the project on GitLab.
2. Clone your fork locally. Replace `[your-username]` with your GitLab username and `[project-name]` with the actual project name:
```bash
git clone $(git config --get remote.origin.url)
```
3. Add the upstream repository. Replace `[original-username]` and `[project-name]` with the original repository's username and project name:
```bash
git remote add upstream https://gitlab.schukai.com/[original-username]/[project-name].git
```
### Making Changes
1. Create a new branch:
```bash
git checkout -b new-feature-branch
```
2. Make your changes.
3. Commit your changes:
```bash
git commit -m "Description of change"
```
### Submitting a Merge Request
1. Push your changes to your fork:
```bash
git push origin new-feature-branch
```
2. Navigate to the original project repository on `gitlab.schukai.com`.
3. Open a Merge Request and provide a clear description of the changes.
## Coding Guidelines
- Follow the coding style used in the project.
- Write unit tests for new features.
- Ensure that all tests pass before submitting a Merge Request.
## Reporting Issues
If you find an issue, please create a new issue on `gitlab.schukai.com`.
## Additional Resources
- [GitLab Flow](https://docs.gitlab.com/ee/topics/gitlab_flow.html)
- [GitLab Merge Request Guidelines](https://docs.gitlab.com/ee/user/project/merge_requests/)
Thank you for your contribution!
EOF
cat <<'EOF' > LICENSE
Copyright (C) 2023 schukai GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
EOF
cat <<'EOF' > Taskfile.yml cat <<'EOF' > Taskfile.yml
# THIS FILE IS AUTOGENERATED BY THE DEVENVSHELL # THIS FILE IS AUTOGENERATED BY THE DEVENVSHELL
...@@ -259,6 +337,7 @@ tasks: ...@@ -259,6 +337,7 @@ tasks:
TEST_BY_TASK: true TEST_BY_TASK: true
cmds: cmds:
- echo "Execute unit tests in Go." - echo "Execute unit tests in Go."
- gosec .
- go test -tags=runOnTask -cover -v ./... - go test -tags=runOnTask -cover -v ./...
- go test -tags=runOnTask -bench -v ./... - go test -tags=runOnTask -bench -v ./...
- go test -tags=runOnTask -race -v ./... - go test -tags=runOnTask -race -v ./...
...@@ -370,8 +449,6 @@ deploy: ...@@ -370,8 +449,6 @@ deploy:
EOF EOF
''; '';
scripts.do-git-commit.exec = '' scripts.do-git-commit.exec = ''
......
package jobqueue
import (
"github.com/robfig/cron/v3"
"go.uber.org/zap"
"strings"
"time"
)
// ExampleManager shows how to use the manager
func ExampleManager() {
testData := []byte(`
- id: job1
priority: 1
timeout: 1s
maxRetries: 3
retryDelay: 1s
runnable:
type: shell
data:
script: echo "~1~ $(date "+%M:%S")" >> /tmp/job1.log
scheduler:
type: cron
spec: "*/10 * * * * *"
`)
var err error
cronInstance := cron.New(cron.WithSeconds())
cronInstance.Start()
zapLogger, _ := zap.NewDevelopment()
_ = zapLogger
manager := NewManager()
manager.SetLogger(&ZapAdapter{logger: zapLogger})
manager.SetCronInstance(cronInstance)
worker := NewLocalWorker(10)
worker.SetManager(manager)
err = manager.AddWorker(worker)
err = manager.Start()
if err != nil {
panic(err)
}
reader := strings.NewReader(string(testData))
err = ImportJobsAndSchedule(reader, "yaml", manager)
if err != nil {
panic(err)
}
time.Sleep(30 * time.Second)
}
...@@ -23,7 +23,6 @@ type Manager struct { ...@@ -23,7 +23,6 @@ type Manager struct {
eventBus *EventBus eventBus *EventBus
activeJobs map[JobID]GenericJob activeJobs map[JobID]GenericJob
//scheduled map[JobID]Scheduler
jobEventCh chan interface{} jobEventCh chan interface{}
...@@ -47,11 +46,8 @@ func NewManager() *Manager { ...@@ -47,11 +46,8 @@ func NewManager() *Manager {
workerMap: make(map[WorkerID]Worker), workerMap: make(map[WorkerID]Worker),
eventBus: eventBus, eventBus: eventBus,
activeJobs: make(map[JobID]GenericJob), activeJobs: make(map[JobID]GenericJob),
//scheduled: make(map[JobID]Scheduler),
//dbSaver: NewDBSaver(),
} }
//mng.dbSaver.SetManager(mng)
return mng return mng
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment