diff --git a/.envrc b/.envrc new file mode 100644 index 0000000000000000000000000000000000000000..6de8a8acd7e0568480ae14eb70819c9036a8666a --- /dev/null +++ b/.envrc @@ -0,0 +1,3 @@ +source_url "https://raw.githubusercontent.com/cachix/devenv/d1f7b48e35e6dee421cfd0f51481d17f77586997/direnvrc" "sha256-YBzqskFZxmNb3kYVoKD9ZixoPXJh1C9ZvTLGFRkauZ0=" + +use devenv \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..7a3019331efd9ceee783133ff4eced3b5116f718 --- /dev/null +++ b/.gitignore @@ -0,0 +1,12 @@ + +# Devenv +.devenv* +devenv.local.nix + +# direnv +.direnv + +# pre-commit +.pre-commit-config.yaml + +smell.go diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml new file mode 100644 index 0000000000000000000000000000000000000000..43392807c3d88aed530af1314dbec75879e5257c --- /dev/null +++ b/.gitlab-ci.yml @@ -0,0 +1,69 @@ + +# THIS FILE IS AUTOGENERATED BY THE DEVENVSHELL +# DO NOT EDIT THIS FILE MANUALLY +# INSTEAD EDIT THE DEVENVSHELL CONFIGURATION FILE devenv.nix +# AND OPEN A SHELL WITH THE COMMAND devenv shell +# + +image: docker-registry.schukai.com:443/nixos-ci-devenv:latest + +services: + - docker:dind + +variables: + # The repo name as used in + # https://github.com/nix-community/NUR/blob/master/repos.json + NIXOS_VERSION: "23.05" + NIXPKGS_ALLOW_UNFREE: "1" + NIXPKGS_ALLOW_INSECURE: "1" + DOCKER_DRIVER: overlay2 + GIT_DEPTH: 10 + +stages: + - test + - deploy + +before_script: + - nix shell nixpkgs#coreutils-full -c mkdir -p /certs/client/ + - nix shell nixpkgs#coreutils-full -c ln -fs /etc/ssl/certs/ca-bundle.crt /certs/client/ca.pem + - echo > .env-gitlab-ci + - variables=("HOME=$HOME" "CI_COMMIT_REF_NAME=$CI_COMMIT_REF_NAME" "CI_REPOSITORY_URL=$CI_REPOSITORY_URL" "GITLAB_TOKEN=$GITLAB_TOKEN" "CI_JOB_TOKEN=$CI_JOB_TOKEN" "GITLAB_USER_EMAIL=$GITLAB_USER_EMAIL" "GITLAB_USER_NAME=\"$GITLAB_USER_NAME\"" "CI_REGISTRY_USER=$CI_REGISTRY_USER" "CI_PROJECT_ID=$CI_PROJECT_ID" "CI_PROJECT_DIR=$CI_PROJECT_DIR" "CI_API_V4_URL=$CI_API_V4_URL" "CI_PROJECT_NAME=$CI_PROJECT_NAME" "CI_COMMIT_SHORT_SHA=$CI_COMMIT_SHORT_SHA"); for var in "${variables[@]}"; do echo "$var" >> .env-gitlab-ci; done + - cat .env-gitlab-ci + +after_script: + - if [ -f .env-gitlab-ci ]; then rm .env-gitlab-ci; fi + +test: + stage: test + tags: + - nixos + script: + - devenv shell test-lib + + cache: + - key: nixos + paths: + - /nix/store + + artifacts: + paths: + - dist + +deploy: + stage: deploy + tags: + - nixos + script: + - devenv shell -c deploy-lib + + when: on_success + + cache: + - key: nixos + paths: + - /nix/store + + + artifacts: + paths: + - dist diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..13566b81b018ad684f3a35fee301741b2734c8f4 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/aws.xml b/.idea/aws.xml new file mode 100644 index 0000000000000000000000000000000000000000..ec328d0bbf68db9e7322932181cc811412e3ca87 --- /dev/null +++ b/.idea/aws.xml @@ -0,0 +1,17 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project version="4"> + <component name="accountSettings"> + <option name="activeProfile" value="profile:default" /> + <option name="activeRegion" value="eu-west-1" /> + <option name="recentlyUsedProfiles"> + <list> + <option value="profile:default" /> + </list> + </option> + <option name="recentlyUsedRegions"> + <list> + <option value="eu-west-1" /> + </list> + </option> + </component> +</project> \ No newline at end of file diff --git a/.idea/job-queues.iml b/.idea/job-queues.iml new file mode 100644 index 0000000000000000000000000000000000000000..25ed3f6e7b6e344b6ca91ebcc5d005f35357f9cf --- /dev/null +++ b/.idea/job-queues.iml @@ -0,0 +1,10 @@ +<?xml version="1.0" encoding="UTF-8"?> +<module type="JAVA_MODULE" version="4"> + <component name="Go" enabled="true" /> + <component name="NewModuleRootManager" inherit-compiler-output="true"> + <exclude-output /> + <content url="file://$MODULE_DIR$" /> + <orderEntry type="inheritedJdk" /> + <orderEntry type="sourceFolder" forTests="false" /> + </component> +</module> \ No newline at end of file diff --git a/.idea/markdown.xml b/.idea/markdown.xml new file mode 100644 index 0000000000000000000000000000000000000000..ec0b30fa7ea2824af6923493653e32595b0907a8 --- /dev/null +++ b/.idea/markdown.xml @@ -0,0 +1,9 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project version="4"> + <component name="MarkdownSettings"> + <enabledExtensions> + <entry key="MermaidLanguageExtension" value="false" /> + <entry key="PlantUMLLanguageExtension" value="true" /> + </enabledExtensions> + </component> +</project> \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000000000000000000000000000000000000..639900d13c6182e452e33a3bd638e70a0146c785 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,6 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project version="4"> + <component name="ProjectRootManager"> + <output url="file://$PROJECT_DIR$/out" /> + </component> +</project> \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000000000000000000000000000000000000..f095315fb2465a83ac56a163761565ae21b34c74 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project version="4"> + <component name="ProjectModuleManager"> + <modules> + <module fileurl="file://$PROJECT_DIR$/.idea/job-queues.iml" filepath="$PROJECT_DIR$/.idea/job-queues.iml" /> + </modules> + </component> +</project> \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000000000000000000000000000000000000..35eb1ddfbbc029bcab630581847471d7f238ec53 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project version="4"> + <component name="VcsDirectoryMappings"> + <mapping directory="" vcs="Git" /> + </component> +</project> \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000000000000000000000000000000000000..8aa3080082f117124214e2d4c96b5f14d6fbf54c --- /dev/null +++ b/LICENSE @@ -0,0 +1,14 @@ +Copyright (C) 2022 by schukai GmbH. + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published +by the Free Software Foundation, either version 3 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see <https://www.gnu.org/licenses/>. + diff --git a/README.md b/README.md index ddf35880dbd821e018b25e751ad57d3f471721db..728356c3551769cb2d2b084141df68b35ff40bd6 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,130 @@ # Job Queues -The jobque allows the processing of tasks. \ No newline at end of file +The `jobQueue` library in Go aims to serve as a Cron replacement, enabling the scheduled and event-driven execution of tasks in an organized manner. + +## Library Requirements + +### Core Requirements + +1. **Job Management Structure**: A structure to catalog all jobs, providing methods to add and remove jobs. + +2. **YAML Import**: A function to import job definitions from a YAML file. + +3. **Job Exclusivity**: Jobs should be either concurrently runnable or exclusive. + +4. **System Resource Monitoring**: Real-time monitoring of system resources like CPU, memory, etc., consumed by the jobs. + +5. **Priority and Distribution**: Each job should have a priority. Jobs should also be evenly distributed across available resources. + +6. **Single and Recurring Jobs**: Support for both one-time and recurring jobs. + +7. **Logging**: Each job should have a log written that includes Process ID, memory usage, start time, end time, and exit code. + +8. **Priority Escalation**: If a job is not run due to its priority, its priority should escalate over time. + +### Additional Requirements + +1. **Error Handling**: Mechanisms for effective error handling, especially for failed or terminated jobs. + +2. **Notification System**: Optionally, an interface for notifications on job failures or completions. + +3. **Resource Limits**: Ability to set resource limits per job. + +4. **Job Dependencies**: Optional support for jobs that depend on the successful completion of other jobs. + +5. **Testability**: The library should be easy to test, ideally via unit tests. + +6. **Documentation**: Comprehensive API documentation and a guide on how the library works. + + + +## Documentation + +### `JobData` Structure + +#### Fields + +- **Id (JobIDType)** + - Unique identifier for the job. + +- **Priority (int)** + - The priority level of the job in the queue. + +- **Exclusive (bool)** + - Specifies whether the job should be executed exclusively or not. + +- **MaxRuns (int)** + - The maximum number of times the job should be executed. + +- **Concurrency (int)** + - The maximum number of concurrent executions of the job. + +- **LastRun (time.Time)** + - Timestamp for when the job was last executed. + +- **NextRun (time.Time)** + - Timestamp for the next scheduled execution of the job. + +- **Logs ([]JobLog)** + - An array of log entries for the job. + +- **Schedule (string)** + - A cron expression that defines the execution schedule. + +- **Status (JobStatus)** + - The current status of the job (e.g., `Running`, `Completed`). + +- **Timeout (time.Duration)** + - The maximum duration the job is allowed to run. + +- **Retries (int)** + - The number of times the job will be retried if it fails. + +- **RetryDelay (time.Duration)** + - The delay before retrying the job if it fails. + +- **ResourceLimits (struct)** + - Resource limits for CPU and Memory. + + - **CPULimit (float64)** + - CPU limit for the job. + + - **MemoryLimit (uint64)** + - Memory limit for the job. + +- **Dependencies ([]JobIDType)** + - List of job IDs this job depends on. + +- **Tags ([]string)** + - Tags associated with the job for easier categorization. + +- **Metadata (map[string]interface{})** + - Metadata for extending the structure with additional information. + +- **Stats (JobStats)** + - Job-related statistics. + +### `Job` Structure + +Inherits fields from `JobData` and adds additional functional fields. + +#### Fields + +- **Runnable (Runnable)** + - A function or method that defines the action of the job. This is what gets executed. + +- **scheduleImpl (cron.Schedule)** + - Internal cron schedule implementation that interprets the `Schedule` string in `JobData`. + +- **TelemetryHooks ([]func(*JobLog))** + - An array of functions that will be called for telemetry during the job's execution. These hooks can update a `JobLog` object. + +- **Failover (func() error)** + - A function that gets called if the primary execution fails for some reason. It's meant for fail over mechanisms. + +- **ctx (context.Context)** + - A context that can carry deadlines, cancellations, and other request-scoped values across API boundaries and between processes. + +- **mu (sync.Mutex)** + - Mutex for synchronizing access to the job's fields, making the job safe for concurrent use. + diff --git a/Taskfile.yml b/Taskfile.yml new file mode 100644 index 0000000000000000000000000000000000000000..b7313f4d13319627727838e8df5302c52eefc042 --- /dev/null +++ b/Taskfile.yml @@ -0,0 +1,59 @@ + +# THIS FILE IS AUTOGENERATED BY THE DEVENVSHELL +# DO NOT EDIT THIS FILE MANUALLY +# INSTEAD EDIT THE DEVENVSHELL CONFIGURATION FILE devenv.nix +# AND OPEN A SHELL WITH THE COMMAND devenv shell +# +# Information about the task runner can be found here: +# https://taskfile.dev + +version: '3' + +tasks: + default: + cmds: + - task --list + silent: true + + test: + desc: Execute unit tests in Go. + cmds: + - echo "Execute unit tests in Go." + - go test -cover -v ./... + - go test -bench -v ./... + - go test -race -v ./... + + test-fuzz: + desc: Conduct fuzzing tests.# + cmds: + - echo "Conduct fuzzing tests." + - go test -v -fuzztime=30s -fuzz=Fuzz ./... + + add-licenses: + desc: Attach license headers to Go files. + cmds: + - echo "Attach license headers to Go files." + - go install github.com/google/addlicense@latest + - addlicense -c "schukai GmbH" -s -l "AGPL-3.0" ./*.go + silent: true + + check-licenses: + desc: Check license headers of Go files. + silent: true + cmds: + - go-licenses save "$(get-go-default-packages)" --ignore "gitlab.schukai.com" --force --save_path ${DEVENV_ROOT}/licenses/ + + check: + desc: Confirm repository status. + cmds: + - git diff-index --quiet HEAD || (echo "There are uncommitted changes after running make. Please commit or stash them before running make."; exit 1) + silent: true + + commit: + desc: Commit changes to the repository. + aliases: + - c + - ci + - git-commit + cmds: + - do-git-commit diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 0000000000000000000000000000000000000000..4ee4766a1a9efd9595784407cfe2403a8cabb860 --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,39 @@ +package main + +import ( + "fmt" + jobqueue "gitlab.schukai.com/oss/libraries/go/services/job-queues.git" +) + +func main() { + jq := jobqueue.NewJobs() + if jq == nil { + panic("NewJobs returned nil") + } + + // Hinzufügen eines neuen Jobs + err := jq.AddJob(jobqueue.JobSpecification{ + Id: "test", + Priority: 1, + }, &jobqueue.ExternalProcessRunner{ + Command: "sleep", + Args: []string{"1"}, + }) + if err != nil { + panic(err) + } + + // Abrufen aller Jobs + allJobs := jq.GetJobs() + fmt.Println("Alle Jobs:", allJobs) + + // Entfernen eines Jobs + removed, err := jq.RemoveJob("test") + if err != nil { + panic(err) + } + if removed { + fmt.Println("Job wurde entfernt.") + } + +} diff --git a/devenv.lock b/devenv.lock new file mode 100644 index 0000000000000000000000000000000000000000..4daa0b38d8565c0fc68525c74b3a5f97bd431458 --- /dev/null +++ b/devenv.lock @@ -0,0 +1,190 @@ +{ + "nodes": { + "devenv": { + "locked": { + "dir": "src/modules", + "lastModified": 1696344641, + "narHash": "sha256-cfGsdtDvzYaFA7oGWSgcd1yST6LFwvjMcHvtVj56VcU=", + "owner": "cachix", + "repo": "devenv", + "rev": "05e26941f34486bff6ebeb4b9c169b6f637f1758", + "type": "github" + }, + "original": { + "dir": "src/modules", + "owner": "cachix", + "repo": "devenv", + "type": "github" + } + }, + "flake-compat": { + "flake": false, + "locked": { + "lastModified": 1673956053, + "narHash": "sha256-4gtG9iQuiKITOjNQQeQIpoIB6b16fm+504Ch3sNKLd8=", + "owner": "edolstra", + "repo": "flake-compat", + "rev": "35bb57c0c8d8b62bbfd284272c928ceb64ddbde9", + "type": "github" + }, + "original": { + "owner": "edolstra", + "repo": "flake-compat", + "type": "github" + } + }, + "flake-utils": { + "inputs": { + "systems": "systems" + }, + "locked": { + "lastModified": 1685518550, + "narHash": "sha256-o2d0KcvaXzTrPRIo0kOLV0/QXHhDQ5DTi+OxcjO8xqY=", + "owner": "numtide", + "repo": "flake-utils", + "rev": "a1720a10a6cfe8234c0e93907ffe81be440f4cef", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "flake-utils", + "type": "github" + } + }, + "gitignore": { + "inputs": { + "nixpkgs": [ + "pre-commit-hooks", + "nixpkgs" + ] + }, + "locked": { + "lastModified": 1660459072, + "narHash": "sha256-8DFJjXG8zqoONA1vXtgeKXy68KdJL5UaXR8NtVMUbx8=", + "owner": "hercules-ci", + "repo": "gitignore.nix", + "rev": "a20de23b925fd8264fd7fad6454652e142fd7f73", + "type": "github" + }, + "original": { + "owner": "hercules-ci", + "repo": "gitignore.nix", + "type": "github" + } + }, + "nixpkgs": { + "locked": { + "lastModified": 1696374741, + "narHash": "sha256-gt8B3G0ryizT9HSB4cCO8QoxdbsHnrQH+/BdKxOwqF0=", + "owner": "nixos", + "repo": "nixpkgs", + "rev": "8a4c17493e5c39769f79117937c79e1c88de6729", + "type": "github" + }, + "original": { + "owner": "nixos", + "ref": "nixos-23.05", + "repo": "nixpkgs", + "type": "github" + } + }, + "nixpkgs-stable": { + "locked": { + "lastModified": 1685801374, + "narHash": "sha256-otaSUoFEMM+LjBI1XL/xGB5ao6IwnZOXc47qhIgJe8U=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "c37ca420157f4abc31e26f436c1145f8951ff373", + "type": "github" + }, + "original": { + "owner": "NixOS", + "ref": "nixos-23.05", + "repo": "nixpkgs", + "type": "github" + } + }, + "nixpkgs_2": { + "locked": { + "lastModified": 1696374741, + "narHash": "sha256-gt8B3G0ryizT9HSB4cCO8QoxdbsHnrQH+/BdKxOwqF0=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "8a4c17493e5c39769f79117937c79e1c88de6729", + "type": "github" + }, + "original": { + "id": "nixpkgs", + "ref": "nixos-23.05", + "type": "indirect" + } + }, + "pre-commit-hooks": { + "inputs": { + "flake-compat": "flake-compat", + "flake-utils": "flake-utils", + "gitignore": "gitignore", + "nixpkgs": [ + "nixpkgs" + ], + "nixpkgs-stable": "nixpkgs-stable" + }, + "locked": { + "lastModified": 1696516544, + "narHash": "sha256-8rKE8Je6twTNFRTGF63P9mE3lZIq917RAicdc4XJO80=", + "owner": "cachix", + "repo": "pre-commit-hooks.nix", + "rev": "66c352d33e0907239e4a69416334f64af2c685cc", + "type": "github" + }, + "original": { + "owner": "cachix", + "repo": "pre-commit-hooks.nix", + "type": "github" + } + }, + "root": { + "inputs": { + "devenv": "devenv", + "nixpkgs": "nixpkgs", + "pre-commit-hooks": "pre-commit-hooks", + "version": "version" + } + }, + "systems": { + "locked": { + "lastModified": 1681028828, + "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=", + "owner": "nix-systems", + "repo": "default", + "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e", + "type": "github" + }, + "original": { + "owner": "nix-systems", + "repo": "default", + "type": "github" + } + }, + "version": { + "inputs": { + "nixpkgs": "nixpkgs_2" + }, + "locked": { + "lastModified": 1690668568, + "narHash": "sha256-jzixQKFFW4oxO0S4GYqbkFCXzhBd6com6Z9+MtVKakU=", + "ref": "refs/heads/master", + "rev": "3838f03165b726e47d586c04a1821749375e1001", + "revCount": 37, + "type": "git", + "url": "https://gitlab.schukai.com/oss/utilities/version.git" + }, + "original": { + "type": "git", + "url": "https://gitlab.schukai.com/oss/utilities/version.git" + } + } + }, + "root": "root", + "version": 7 +} diff --git a/devenv.nix b/devenv.nix new file mode 100644 index 0000000000000000000000000000000000000000..f0ce8c2437f756120d722c1cb14f893fd60febaf --- /dev/null +++ b/devenv.nix @@ -0,0 +1,663 @@ +{ pkgs, inputs, phps, lib, config, modulesPath, ... }: + +{ + # https://devenv.sh/packages/ + packages = with pkgs; [ + inputs.version.defaultPackage."${builtins.currentSystem}" + appimage-run + blackbox + blackbox-terminal + coreutils-full + dbeaver + delve + dialog + drill + exa + fd + fd + gcc12 + gdlv + git + glab + gnugrep + gnumake + gnused + go-licenses + go-task + gum + httpie + hurl + jq + libffi + logrotate + meld + memcached + netcat + nixfmt + procps + ranger + unixtools.xxd + unzip + util-linux + wget + zlib + nodePackages.mermaid-cli + feh + + ]; + + # https://devenv.sh/languages/ + # languages.nix.enable = true; + languages = { go = { enable = true; }; }; + + difftastic.enable = true; + + scripts.draw-graph.exec = '' + echo -e "Enter Meirmaid graph definition. ''${RED}End with Ctrl+D''${RESET}\n" + diagram=$(${pkgs.gum}/bin/gum write --placeholder "Enter Meirmaid graph definition. End with Ctrl+D") + + tmpOutput=$(mktemp).png + + echo "$diagram" | ${pkgs.nodePackages.mermaid-cli}/bin/mmdc -i - -o "$tmpOutput" + ${pkgs.feh}/bin/feh $tmpOutput + + # should delte the file, but does not work ask with gum + ${pkgs.gum}/bin/gum confirm "Delete temporary file?" + if [ $? -eq 0 ]; then + rm "$tmpOutput" + else + echo "not deleting; file is at $tmpOutput" + fi + + + + + ''; + + scripts.get-go-default-packages.exec = '' + #!${pkgs.bash}/bin/bash + echo $(awk -F ' ' '/^module / { print $2 }' go.mod) + ''; + + # This script is executed when the app is built + # You can use it to build the app + scripts.test-lib.exec = '' + #!${pkgs.bash}/bin/bash + #set -euo pipefail + set -x + + PATH="''${PATH}":${pkgs.coreutils}/bin + PATH="''${PATH}":${pkgs.findutils}/bin + PATH="''${PATH}":${pkgs.jq}/bin/ + PATH="''${PATH}":${pkgs.rsync}/bin/ + PATH="''${PATH}":${pkgs.bash}/bin/ + PATH="''${PATH}":${pkgs.curl}/bin/ + PATH="''${PATH}":${pkgs.moreutils}/bin/ + PATH="''${PATH}":${pkgs.gnutar}/bin + PATH="''${PATH}":${pkgs.gzip}/bin/ + PATH="''${PATH}":${pkgs.procps}/bin/ + PATH="''${PATH}":${pkgs.exa}/bin/ + PATH="''${PATH}":${pkgs.git}/bin/ + PATH="''${PATH}":${pkgs.gnugrep}/bin/ + PATH="''${PATH}":${ + inputs.version.defaultPackage."${builtins.currentSystem}" + }/bin/ + + export PATH + + task test + + ''; + + # This scritp is used to deploy the app to the gitlab registry + # It is used by the gitlab-ci.yml file + # The environment variables are set in the gitlab project settings + scripts.deploy-lib.exec = '' + #!${pkgs.bash}/bin/bash + + PATH="''${PATH}":${pkgs.coreutils}/bin + PATH="''${PATH}":${pkgs.jq}/bin/ + PATH="''${PATH}":${pkgs.curl}/bin/ + PATH="''${PATH}":${pkgs.moreutils}/bin/ + PATH="''${PATH}":${pkgs.gnutar}/bin + PATH="''${PATH}":${pkgs.gzip}/bin/ + PATH="''${PATH}":${pkgs.exa}/bin/ + PATH="''${PATH}":${pkgs.git}/bin/ + PATH="''${PATH}":${ + inputs.version.defaultPackage."${builtins.currentSystem}" + }/bin/ + + export PATH + + if [[ -f .env-gitlab-ci ]]; then + source .env-gitlab-ci + rm .env-gitlab-ci + fi + + set -x + ## if $HOME not set, set it to current directory + if [[ -z "''${HOME}" ]]; then + HOME=$(pwd) + fi + + export HOME + + git config user.email "''${GITLAB_USER_EMAIL}" + git config user.name "''${GITLAB_USER_NAME:-"Gitlab CI"}" + git config pull.rebase true + git config http.sslVerify "false" + git remote set-url origin https://pad:''${GITLAB_TOKEN}@''${CI_REPOSITORY_URL#*@} + + git fetch --all --tags --unshallow + git reset --hard origin/master + git checkout $CI_COMMIT_REF_NAME + git pull origin $CI_COMMIT_REF_NAME + + if [ ! -z "''${CI_PROJECT_DIR}" ]; then + echo "CI_PROJECT_DIR is set, using it as project root." + project_root=$(realpath "''${CI_PROJECT_DIR}")/ + elif [ ! -z "''${DEVENV_ROOT}" ]; then + echo "DEVENV_ROOT is set, using it as project root." + project_root=$(realpath "''${DEVENV_ROOT}")/ + else + echo "Error: DEVENV_ROOT or CI_PROJECT_DIR environment variables are not set." + exit 1 + fi + + if [ ! -d "''${project_root}" ]; then + echo "Error: Project root directory does not seem to be valid." + echo "Check the DEVENV_ROOT or CI_PROJECT_DIR environment variables." + exit 1 + fi + + if [ -z "'CI_JOB_TOKEN" ]; then + echo "Error: CI_JOB_TOKEN variable is not set." + exit 1 + fi + + git --no-pager log --decorate=short --pretty=oneline + gitVersion=v$(version predict) + git tag -a $gitVersion -m"chore: bump version" + git --no-pager log --decorate=short --pretty=oneline + git push -o ci.skip origin ''${CI_COMMIT_REF_NAME} --tags + + echo "done" + + ''; + + enterShell = '' + + cat <<'EOF' > Taskfile.yml + + # THIS FILE IS AUTOGENERATED BY THE DEVENVSHELL + # DO NOT EDIT THIS FILE MANUALLY + # INSTEAD EDIT THE DEVENVSHELL CONFIGURATION FILE devenv.nix + # AND OPEN A SHELL WITH THE COMMAND devenv shell + # + # Information about the task runner can be found here: + # https://taskfile.dev + + version: '3' + + tasks: + default: + cmds: + - task --list + silent: true + + test: + desc: Execute unit tests in Go. + cmds: + - echo "Execute unit tests in Go." + - go test -cover -v ./... + - go test -bench -v ./... + - go test -race -v ./... + + test-fuzz: + desc: Conduct fuzzing tests.# + cmds: + - echo "Conduct fuzzing tests." + - go test -v -fuzztime=30s -fuzz=Fuzz ./... + + add-licenses: + desc: Attach license headers to Go files. + cmds: + - echo "Attach license headers to Go files." + - go install github.com/google/addlicense@latest + - addlicense -c "schukai GmbH" -s -l "AGPL-3.0" ./*.go + silent: true + + check-licenses: + desc: Check license headers of Go files. + silent: true + cmds: + - go-licenses save "$(get-go-default-packages)" --ignore "gitlab.schukai.com" --force --save_path ''${DEVENV_ROOT}/licenses/ + + check: + desc: Confirm repository status. + cmds: + - git diff-index --quiet HEAD || (echo "There are uncommitted changes after running make. Please commit or stash them before running make."; exit 1) + silent: true + + commit: + desc: Commit changes to the repository. + aliases: + - c + - ci + - git-commit + cmds: + - do-git-commit + EOF + + cat <<'EOF' > .gitlab-ci.yml + + # THIS FILE IS AUTOGENERATED BY THE DEVENVSHELL + # DO NOT EDIT THIS FILE MANUALLY + # INSTEAD EDIT THE DEVENVSHELL CONFIGURATION FILE devenv.nix + # AND OPEN A SHELL WITH THE COMMAND devenv shell + # + + image: docker-registry.schukai.com:443/nixos-ci-devenv:latest + + services: + - docker:dind + + variables: + # The repo name as used in + # https://github.com/nix-community/NUR/blob/master/repos.json + NIXOS_VERSION: "23.05" + NIXPKGS_ALLOW_UNFREE: "1" + NIXPKGS_ALLOW_INSECURE: "1" + DOCKER_DRIVER: overlay2 + GIT_DEPTH: 10 + + stages: + - test + - deploy + + before_script: + - nix shell nixpkgs#coreutils-full -c mkdir -p /certs/client/ + - nix shell nixpkgs#coreutils-full -c ln -fs /etc/ssl/certs/ca-bundle.crt /certs/client/ca.pem + - echo > .env-gitlab-ci + - variables=("HOME=$HOME" "CI_COMMIT_REF_NAME=$CI_COMMIT_REF_NAME" "CI_REPOSITORY_URL=$CI_REPOSITORY_URL" "GITLAB_TOKEN=$GITLAB_TOKEN" "CI_JOB_TOKEN=$CI_JOB_TOKEN" "GITLAB_USER_EMAIL=$GITLAB_USER_EMAIL" "GITLAB_USER_NAME=\"$GITLAB_USER_NAME\"" "CI_REGISTRY_USER=$CI_REGISTRY_USER" "CI_PROJECT_ID=$CI_PROJECT_ID" "CI_PROJECT_DIR=$CI_PROJECT_DIR" "CI_API_V4_URL=$CI_API_V4_URL" "CI_PROJECT_NAME=$CI_PROJECT_NAME" "CI_COMMIT_SHORT_SHA=$CI_COMMIT_SHORT_SHA"); for var in "''${variables[@]}"; do echo "$var" >> .env-gitlab-ci; done + - cat .env-gitlab-ci + + after_script: + - if [ -f .env-gitlab-ci ]; then rm .env-gitlab-ci; fi + + test: + stage: test + tags: + - nixos + script: + - devenv shell test-lib + + cache: + - key: nixos + paths: + - /nix/store + + artifacts: + paths: + - dist + + deploy: + stage: deploy + tags: + - nixos + script: + - devenv shell -c deploy-lib + + when: on_success + + cache: + - key: nixos + paths: + - /nix/store + + + artifacts: + paths: + - dist + EOF + + + + ''; + + scripts.do-git-commit.exec = '' + #!/usr/bin/env bash + + # Define colors if the terminal supports it + if [ -t 1 ]; then + RED='\033[0;31m' + GREEN='\033[0;32m' + RESET='\033[0m' + BOLD='\033[1m' + else + RED="" + GREEN="" + RESET="" + fi + + step=1 + + reset + clear + + # create random log file + LOGFILE="$(mktemp)" + if [ $? -ne 0 ]; then + echo -e "''${RED}✖ Could not create temporary log file. Exiting.''${RESET}" + exit 1 + fi + + log_and_display() { + echo -e "''${GREEN}==> $step. $1''${RESET}" | tee -a $LOGFILE + step=$((step + 1)) + } + + log_error_and_display() { + echo -e "''${RED}==> $step. $1''${RESET}" | tee -a $LOGFILE + } + + printLogfileAndExit() { + exit_code=$1 + echo -e "\n\n========================================\n\n\n" + + echo -e "\n\n''${BOLD}Git and GitLab Automation Script''${RESET}\n\nI have performed the following actions:\n\n" + cat "$LOGFILE" + + # Optional: Remove log file + rm -f "$LOGFILE" + + if [ $exit_code -eq 0 ]; then + echo -e "\n''${GREEN}✔''${RESET} All actions were successful" | tee -a $LOGFILE + elif [ $exit_code -eq -1 ]; then + echo -e "\n''${RED}✖''${RESET} The script was manually cancelled" | tee -a $LOGFILE + exit_code=0 + else + echo -e "\n''${RED}✖''${RESET} Some actions failed" | tee -a $LOGFILE + fi + + exit $exit_code + } + + print_headline() { + local title=$1 + local underline=$(printf '─%.0s' $(seq 1 ''${#title})) + echo -e "\n\n''${BOLD}''${title}\n''${underline}''${RESET}\n" + } + + do_cancel() { + echo -e "''${RED}==> ✖ Cancelled.''${RESET}" | tee -a $LOGFILE + printLogfileAndExit -1 + } + + # Function for unified logging and display + log_action() { + if [ $? -eq 0 ]; then + echo -e " ''${GREEN}✔''${RESET} $1: Successful" | tee -a $LOGFILE + else + echo -e " ''${RED}✖''${RESET} $1: Failed" | tee -a $LOGFILE + printLogfileAndExit 1 + fi + } + + print_what_to_do() { + echo -e "\n\nWhat do you want to do?\n" + } + + git_status=$(git status --porcelain) + if [[ -z "$git_status" ]]; then + log_error_and_display "No changes to commit. Exiting." + printLogfileAndExit 0 + fi + + print_headline "Choose commit type" + selection=$(gum choose "feat: (new feature for the user, not a new feature for build script)" "fix: (bug fix for the user, not a fix to a build script)" "chore: (updating grunt tasks etc.; no production code change)" "docs: (changes to the documentation)" "style: (formatting, missing semi colons, etc; no production code change)" "refactor: (refactoring production code, eg. renaming a variable)" "test: (adding missing tests, refactoring tests; no production code change)" "Cancel") + + commit_type=$(echo "$selection" | awk -F':' '{print $1}') + + if [[ "$commit_type" == "Cancel" ]]; then + do_cancel + fi + + log_and_display "You chose the commit type: $commit_type" + + # NEXT STEP ISSUE HANDLING ############################################################################################################ + #log_and_display "Issue handling" + + gitlabIssues=() + while IFS= read -r line; do + if [[ $line =~ ^# ]]; then + id=$(echo "$line" | awk '{print substr($1, 2)}') + title=$(echo "$line" | awk -F'about' '{print $1}' | awk '{$1=$2=""; print substr($0, 3)}') + gitlabIssues+=("$id > $title") + fi + done < <(gum spin --spinner dot --show-output --title "Ask gitlab ..." -- glab issue list --output-format=details) + + ## if issues are available, ask if user wants to use an existing issue or create a new one + createOption="Create new issue" + existingOption="Use existing issue" + cancelOption="Cancel" + + print_headline "Choose issue handling" + if [ ''${#gitlabIssues[@]} -eq 0 ]; then + log_and_display "There are no issues available." + + print_what_to_do + choice=$(gum choose "$createOption" "$cancelOption") + + else + log_and_display "There are ''${#gitlabIssues[@]} issues available." + print_what_to_do + choice=$(gum choose "$createOption" "$existingOption" "$cancelOption") + fi + + if [[ "$choice" == "$cancelOption" ]]; then + do_cancel + fi + + ## array of issue ids + work_on_issue_ids=() + + issue_text="" + + if [[ "$choice" == "$createOption" ]]; then + print_headline "Create new issue" + issue_text=$(gum input --placeholder "Enter issue title") + echo -e "Enter issue description. ''${RED}End with Ctrl+D''${RESET}\n" + issue_description=$(gum write --placeholder "Enter issue description. End with Ctrl+D") + + if [[ -z "$issue_text" ]]; then + log_error_and_display "Issue title is empty. Exiting." + printLogfileAndExit 1 + fi + + log_and_display "You entered the issue title: $issue_text" + log_and_display "You entered the issue description: $issue_description" + echo -e "\n" + + gum confirm "Do you want to create this issue?" + # gum confirm exits with status 0 if confirmed and status 1 if cancelled. + if [ $? -eq 1 ]; then + do_cancel + fi + + issue_output=$(glab issue create -t"$issue_text" --no-editor --description "$issue_description") + issue_id=$(echo "$issue_output" | grep -oP '(?<=/issues/)\d+') + + work_on_issue_ids+=("$issue_id") + + log_action "glab issue with id $issue_id created" + + else + + print_headline "Use existing issue" + echo -e "Select issue with arrow keys and press tab or space to select. Press enter to confirm.\n" + issue_ids=$(gum choose --no-limit "''${gitlabIssues[@]}") + + # assign issue_ids to work_on_issue_ids. iterate over lines and take integer from beginning of line + while IFS= read -r line; do + work_on_issue_ids+=($(echo "$line" | grep -oP '^\d+')) + done <<<"$issue_ids" + + fi + + if [ ''${#work_on_issue_ids[@]} -eq 0 ]; then + log_and_display "No issue selected. Exiting." + printLogfileAndExit 0 + fi + + # NEXT STEP COMMIT MESSAGE ############################################################################################################ + # print work_on_issue_ids + work_on_issue_ids_string="" + for i in "''${work_on_issue_ids[@]}"; do + work_on_issue_ids_string+="#$i " + done + + log_and_display "You chose to work on the following issues: ''${work_on_issue_ids_string}" + + + print_headline "Check for changes to commit" + + # ' ' = unmodified + # M = modified + # T = file type changed (regular file, symbolic link or submodule) + # A = added + # D = deleted + # R = renamed + # C = copied (if config option status.renames is set to "copies") + # U = updated but unmerged + # https://man.freebsd.org/cgi/man.cgi?query=git-status&sektion=1&manpath=freebsd-release-ports + + count_all_changes=$(echo "$git_status" | wc -l) + count_staged_changes=$(echo "$git_status" | grep -c '^M') + count_new_staged_files=$(echo "$git_status" | grep -c '^A') + count_staged_changes=$((count_staged_changes + count_new_staged_files)) + + + + git_options_all="All $count_all_changes changes" + git_options_staged="Only the $count_staged_changes staged changes" + git_options_select_files="Select files" + git_options_cancel="Cancel" + + git_options_array=() + if [[ $count_all_changes -gt 0 ]]; then + git_options_array+=("$git_options_all") + fi + + if [[ $count_staged_changes -gt 0 ]]; then + git_options_array+=("$git_options_staged") + fi + + git_options_array+=( "$git_options_select_files" ) + git_options_array+=( "$git_options_cancel" ) + + + selection=$(gum choose "''${git_options_array[@]}") + if [[ "$selection" == "$git_options_cancel" ]]; then + do_cancel + fi + + if [[ "$selection" == "$git_options_all" ]]; then + git add -A + echo "1" + elif [[ "$selection" == "$git_options_select_files" ]]; then + + files=() + while IFS= read -r line; do + files+=("$line") + done <<<"$git_status" + + selected_files=$(gum choose --no-limit "''${files[@]}") + + # no files selected + if [[ -z "$selected_files" ]]; then + log_and_display "No files selected. Exiting." + printLogfileAndExit 0 + fi + + # add selected files + while IFS= read -r line; do + ## git proclimne could have letter, ? or ! at the beginning of the line + file=$(echo "$line" | awk '{print $2}') + if [[ -z "$file" || ! -f "$file" ]]; then + log_and_display "No file found in line: $line" + continue + fi + + git add "$file" + done <<<"$selected_files" + + fi + + ## count staged changes again and print + count_staged_changes=$(echo "$git_status" | grep -c '^M') + count_new_staged_files=$(echo "$git_status" | grep -c '^A') + count_staged_changes=$((count_staged_changes + count_new_staged_files)) + + log_and_display "You have $count_staged_changes staged changes to commit." + + # NEXT STEP COMMIT MESSAGE ############################################################################################################ + + print_headline "Enter commit message" + commit_message=$(gum input --placeholder "Enter commit message" --value "$commit_type: $issue_text $work_on_issue_ids_string") + + if [[ -z "$commit_message" ]]; then + log_error_and_display "Commit message is empty. Exiting." + printLogfileAndExit 1 + fi + + log_and_display "You entered the commit message: $commit_message" + + gum confirm "Do you want to commit with this message?" + if [ $? -eq 1 ]; then + do_cancel + fi + + # NEXT STEP COMMIT #################################################################################################################### + print_headline "Committing changes" + + if ! git commit -m "$commit_message" ; then + log_error_and_display "Commit failed. Exiting." + printLogfileAndExit 1 + fi + + log_and_display "Commit successful." + + # NEXT STEP PUSH ###################################################################################################################### + + print_headline "Pushing changes" + + if ! git push ; then + log_error_and_display "Push failed. Exiting." + printLogfileAndExit 1 + fi + + log_and_display "Push successful." + + # Close issue ###################################################################################################################### + + print_headline "Closing issues" + + for issue_id in "''${work_on_issue_ids[@]}"; do + + gum confirm "Do you want to close issue #$issue_id?" + if [ $? -eq 1 ]; then + continue + fi + + if ! glab issue close "$issue_id" ; then + log_error_and_display "Closing issue $issue_id failed. Exiting." + else + log_and_display "Closing issue $issue_id successful." + fi + done + + printLogfileAndExit 0 + ''; + +} diff --git a/devenv.yaml b/devenv.yaml new file mode 100644 index 0000000000000000000000000000000000000000..525a6f02f172bb9108a23c335a4ced3e80f78502 --- /dev/null +++ b/devenv.yaml @@ -0,0 +1,7 @@ +inputs: + nixpkgs: + url: github:nixos/nixpkgs/nixos-23.05 + + version: + url: git+https://gitlab.schukai.com/oss/utilities/version.git + flake: true diff --git a/error.go b/error.go new file mode 100644 index 0000000000000000000000000000000000000000..85dfb4bf080e553de9e83bb41691acc67b086452 --- /dev/null +++ b/error.go @@ -0,0 +1,23 @@ +package jobqueue + +import "fmt" + +var ( + ErrCPUPercentage = fmt.Errorf("could not get CPU percentage") + ErrIntervalIsZero = fmt.Errorf("interval must not be 0") + ErrResourceLimitExceeded = fmt.Errorf("resource limit exceeded") + ErrNoRunDefined = fmt.Errorf("no runnable function defined") + ErrCycleDetected = fmt.Errorf("cycle detected") + ErrJobAlreadyExists = fmt.Errorf("job already exists") + ErrUnknownDependency = fmt.Errorf("unknown dependency") + ErrMissingDependency = fmt.Errorf("missing dependency") + ErrJobNotFound = fmt.Errorf("job not found") + ErrJobIsDependency = fmt.Errorf("job is a dependency") + ErrNotRunning = fmt.Errorf("job queue is not running") + ErrAlreadyPaused = fmt.Errorf("job queue is already paused") + ErrAlreadyStopped = fmt.Errorf("job queue is already stopped") + ErrNotPaused = fmt.Errorf("job queue is not paused") + ErrAlreadyStarted = fmt.Errorf("job queue is already started") + ErrTimeout = fmt.Errorf("job timed out") + //ErrInitializationFailed = fmt.Errorf("resource monitoring initialization failed") +) diff --git a/executor.go b/executor.go new file mode 100644 index 0000000000000000000000000000000000000000..d9b767e0ae121fc1252036f473806231dbf7932a --- /dev/null +++ b/executor.go @@ -0,0 +1,246 @@ +package jobqueue + +import ( + "context" + "sync" + "sync/atomic" + "time" +) + +const ( + MODE_STOPPED = iota + MODE_RUNNING + MODE_PAUSED +) + +type mode int32 + +type shouldStopFunc func(executor *jobExecutor) bool + +type jobExecutor struct { + mutex sync.Mutex + + Queue JobsInterface + Ctx context.Context + CancelFunc context.CancelFunc + MaxParallel int + interval time.Duration + Ticker *time.Ticker + cleanupTimer *time.Ticker + //stopChan chan struct{} + sem chan struct{} + + pauseChan chan struct{} + resumeChan chan struct{} + + runningMode int32 + + doneChan chan struct{} + + shouldStop shouldStopFunc +} + +func NewJobExecutor(queue JobsInterface, maxParallel int, interval time.Duration, shouldStopFunc shouldStopFunc) *jobExecutor { + + execCtx, cancelFunc := context.WithCancel(context.Background()) + + return &jobExecutor{ + Queue: queue, + Ctx: execCtx, + CancelFunc: cancelFunc, + MaxParallel: maxParallel, + interval: interval, + Ticker: nil, + cleanupTimer: nil, + //stopChan: make(chan struct{}), + sem: make(chan struct{}, maxParallel), + + pauseChan: make(chan struct{}), + resumeChan: make(chan struct{}), + runningMode: MODE_STOPPED, + shouldStop: shouldStopFunc, + } +} + +func (je *jobExecutor) executeJobs() { + + je.mutex.Lock() + je.Ticker = time.NewTicker(je.interval) + je.cleanupTimer = time.NewTicker(10 * je.interval) + je.setRunningFlag(MODE_RUNNING) + je.mutex.Unlock() + + for { + select { + case <-je.Ticker.C: + + if !je.IsPaused() { + je.runJobs() + if je.shouldStop != nil && je.shouldStop(je) { + _ = je.Stop() + return + } + } + + case <-je.cleanupTimer.C: + je.Queue.Cleanup() + case <-je.pauseChan: + je.setRunningFlag(MODE_PAUSED) + case <-je.resumeChan: + je.setRunningFlag(MODE_RUNNING) + case <-je.Ctx.Done(): + + je.mutex.Lock() + je.Ticker.Stop() + je.cleanupTimer.Stop() + je.setRunningFlag(MODE_STOPPED) + je.mutex.Unlock() + + return + } + } + +} + +func (je *jobExecutor) runJobs() { + // Get jobs that can be executed + jobs := je.Queue.GetExecutableJobs() + if len(jobs) == 0 { + return + } + + var wg sync.WaitGroup + + // Map to track the status of executed jobs + jobStatus := make(map[JobIDType]bool) + for _, job := range jobs { + jobStatus[job.GetId()] = false + } + + // Channel for coordinating job execution + jobChan := make(chan ReadOnlyJob) + + // Determine the number of jobs to be sent to the channel + for _, job := range jobs { + dependencies := job.GetDependencies() + canRun := true + for _, dependencyID := range dependencies { + // Check if dependencies have already been executed + if !jobStatus[dependencyID] { + canRun = false + break + } + } + + if canRun { + wg.Add(1) // Increment the WaitGroup counter for each job to be sent + } + } + + // Loop through all jobs and execute only if dependencies are met + go func() { + for _, job := range jobs { + dependencies := job.GetDependencies() + canRun := true + for _, dependencyID := range dependencies { + // Check if dependencies have already been executed + if !jobStatus[dependencyID] { + canRun = false + break + } + } + + if canRun { + // Send the job to the job channel + jobChan <- job + } + } + close(jobChan) // Close the channel after all jobs have been sent + }() + + maxParallel := je.MaxParallel + if len(jobs) < maxParallel { + maxParallel = len(jobs) + } + + // Execute jobs in parallel + for i := 0; i < maxParallel; i++ { + go func() { + for job := range jobChan { + job.Run(je.Ctx) + // Mark the job as executed + jobStatus[job.GetId()] = true + wg.Done() + } + }() + } + + // Wait for all jobs to complete before returning + wg.Wait() +} + +func (je *jobExecutor) Start() error { + + if je.IsRunning() { + return ErrAlreadyStarted + } + + if je.IsPaused() { + return je.Resume() + } + + go je.executeJobs() + return nil +} + +func (je *jobExecutor) Stop() error { + + if !je.IsRunning() && !je.IsPaused() { + return ErrAlreadyStopped + } + + je.CancelFunc() + return nil +} + +func (je *jobExecutor) Pause() error { + + if je.IsPaused() { + return ErrAlreadyPaused + } + + je.mutex.Lock() + je.pauseChan <- struct{}{} + je.setRunningFlag(MODE_PAUSED) + je.mutex.Unlock() + + return nil + +} + +func (je *jobExecutor) Resume() error { + + if !je.IsPaused() { + return ErrNotPaused + } + + je.mutex.Lock() + je.resumeChan <- struct{}{} + je.setRunningFlag(MODE_RUNNING) + je.mutex.Unlock() + + return nil + +} + +func (je *jobExecutor) IsPaused() bool { + return atomic.LoadInt32(&je.runningMode) == MODE_PAUSED +} + +func (je *jobExecutor) IsRunning() bool { + return atomic.LoadInt32(&je.runningMode) == MODE_RUNNING +} + +func (je *jobExecutor) setRunningFlag(mode int32) { + atomic.StoreInt32(&je.runningMode, mode) +} diff --git a/executor_test.go b/executor_test.go new file mode 100644 index 0000000000000000000000000000000000000000..f38f6aa07ea26cc1f6ef6317fb1d0e30896f10d0 --- /dev/null +++ b/executor_test.go @@ -0,0 +1,137 @@ +package jobqueue + +import ( + "context" + "testing" + "time" +) + +type TestRunnable struct{} + +func (r TestRunnable) Run(ctx context.Context) (int, any, error) { + // Dummy run implementation + return 0, nil, nil +} + +func TestJobExecutorStartAndStop(t *testing.T) { + queue := NewJobs() + executor := NewJobExecutor(queue, 1, time.Millisecond*50, nil) + + // Fügen Sie einen Job zur Warteschlange hinzu + err := queue.AddJob(JobSpecification{ + Id: "test-job", + Priority: 1, + Concurrency: 1, + }, TestRunnable{}) + + if err != nil { + t.Fatalf("Failed to add job: %v", err) + } + + err = executor.Start() + if err != nil { + t.Errorf("Failed to start executor: %v", err) + } + + for i := 0; i < 10; i++ { + time.Sleep(time.Millisecond * 100) + if executor.IsRunning() { + break + } + } + + if !executor.IsRunning() { + t.Errorf("Executor should be running") + } + + err = executor.Stop() + for i := 0; i < 10; i++ { + time.Sleep(time.Millisecond * 100) + if !executor.IsRunning() { + break + } + } + + if err != nil { + t.Errorf("Failed to stop executor: %v", err) + } + + if executor.IsRunning() { + t.Errorf("Executor should not be running") + } +} + +func TestJobExecutorPauseAndResume(t *testing.T) { + queue := NewJobs() + executor := NewJobExecutor(queue, 1, time.Millisecond*50, nil) + + // Fügen Sie einen Job zur Warteschlange hinzu + err := queue.AddJob(JobSpecification{ + Id: "test-job", + Priority: 1, + Concurrency: 1, + }, TestRunnable{}) + if err != nil { + t.Fatalf("Failed to add job: %v", err) + } + + err = executor.Start() + if err != nil { + t.Errorf("Failed to start executor: %v", err) + } + + for i := 0; i < 10; i++ { + time.Sleep(time.Millisecond * 100) + if executor.IsRunning() { + break + } + } + + err = executor.Pause() + if err != nil { + t.Errorf("Failed to pause executor: %v", err) + } + + for i := 0; i < 10; i++ { + time.Sleep(time.Millisecond * 100) + if executor.IsPaused() { + break + } + } + + if !executor.IsPaused() { + t.Errorf("Executor should be paused") + } + + err = executor.Resume() + if err != nil { + t.Errorf("Failed to resume executor: %v", err) + } + + for i := 0; i < 10; i++ { + time.Sleep(time.Millisecond * 100) + if executor.IsRunning() { + break + } + } + + if executor.IsPaused() { + t.Errorf("Executor should not be paused") + } + + err = executor.Stop() + if err != nil { + t.Errorf("Failed to stop executor: %v", err) + } + + for i := 0; i < 10; i++ { + time.Sleep(time.Millisecond * 100) + if !executor.IsRunning() { + break + } + } + + if executor.IsRunning() { + t.Errorf("Executor should not be running") + } +} diff --git a/exit-codes.go b/exit-codes.go new file mode 100644 index 0000000000000000000000000000000000000000..c0efd01f8630b5fc7271bf286f250a3a235448d2 --- /dev/null +++ b/exit-codes.go @@ -0,0 +1,8 @@ +package jobqueue + +const ( + SuccessExitCode = iota + DefaultErrorExitCode + TimeoutExitCode + RunnableTerminatedExitCode +) diff --git a/go.mod b/go.mod new file mode 100644 index 0000000000000000000000000000000000000000..adcd800a20576cce2dc6d96ab51fa90bceba7385 --- /dev/null +++ b/go.mod @@ -0,0 +1,21 @@ +module gitlab.schukai.com/oss/libraries/go/services/job-queues.git + +go 1.20 + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/go-ole/go-ole v1.2.6 // indirect + github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect + github.com/shirou/gopsutil v3.21.11+incompatible // indirect + github.com/shirou/gopsutil/v3 v3.23.9 // indirect + github.com/shoenig/go-m1cpu v0.1.6 // indirect + github.com/stretchr/testify v1.8.4 // indirect + github.com/tklauser/go-sysconf v0.3.12 // indirect + github.com/tklauser/numcpus v0.6.1 // indirect + github.com/yusufpapurcu/wmi v1.2.3 // indirect + golang.org/x/sys v0.12.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000000000000000000000000000000000000..9bed6c8828575311fcfb49fbb10b2c144c3fbd97 --- /dev/null +++ b/go.sum @@ -0,0 +1,49 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= +github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= +github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= +github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E= +github.com/robfig/cron/v3 v3.0.0/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= +github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= +github.com/shirou/gopsutil/v3 v3.23.9 h1:ZI5bWVeu2ep4/DIxB4U9okeYJ7zp/QLTO4auRb/ty/E= +github.com/shirou/gopsutil/v3 v3.23.9/go.mod h1:x/NWSb71eMcjFIO0vhyGW5nZ7oSIgVjrCnADckb85GA= +github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= +github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= +github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= +github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= +github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= +github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= +github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw= +github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/gob.go b/gob.go new file mode 100644 index 0000000000000000000000000000000000000000..ec53366cfaecf2ef3077e8b529ced33fd4376031 --- /dev/null +++ b/gob.go @@ -0,0 +1,47 @@ +package jobqueue + +import ( + "bytes" + "encoding/gob" + "sync" +) + +// SerializeJob serializes JobData and JobSpecification into a byte slice +func (j *job) SerializeJob() ([]byte, error) { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + + if err := enc.Encode(j.JobData); err != nil { + return nil, err + } + if err := enc.Encode(j.JobSpecification); err != nil { + return nil, err + } + + return buf.Bytes(), nil +} + +// DeserializeJob deserializes a byte slice into a job struct +func DeserializeJob(data []byte) (*job, error) { + var jobData JobData + var jobSpec JobSpecification + + buf := bytes.NewBuffer(data) + dec := gob.NewDecoder(buf) + + if err := dec.Decode(&jobData); err != nil { + return nil, err + } + if err := dec.Decode(&jobSpec); err != nil { + return nil, err + } + + job := &job{ + JobData: jobData, + JobSpecification: jobSpec, + mu: sync.Mutex{}, + //ctx: context.Background(), + } + + return job, nil +} diff --git a/gob_test.go b/gob_test.go new file mode 100644 index 0000000000000000000000000000000000000000..170ec9ade9f53dc2c33935073afce2807595e2b5 --- /dev/null +++ b/gob_test.go @@ -0,0 +1,29 @@ +package jobqueue + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestSerializationAndDeserialization(t *testing.T) { + // Create a job instance and populate its fields + originalJob := &job{ + JobSpecification: JobSpecification{ + Id: "testJob", + Priority: 1, + }, + } + + // Serialize the job to a byte slice + data, err := originalJob.SerializeJob() + assert.Nil(t, err) + + // Deserialize the byte slice back to a job struct + deserializedJob, err := DeserializeJob(data) + assert.Nil(t, err) + + // Compare the original and deserialized jobs + assert.Equal(t, originalJob.Id, deserializedJob.Id) + assert.Equal(t, originalJob.Priority, deserializedJob.Priority) + +} diff --git a/issue-1_test.go b/issue-1_test.go new file mode 100644 index 0000000000000000000000000000000000000000..9669fbc7183a565458cdf49a994fdd9415b1e307 --- /dev/null +++ b/issue-1_test.go @@ -0,0 +1,135 @@ +package jobqueue + +import ( + "errors" + "testing" +) + +// TestJobQueueWithDependencies +// Test if jobs are sorted by priority and dependencies +// +// graph TD +// +// A[Job1-PriorityDefault] +// B[Job2-PriorityHigh] +// C[Job3-PriorityLow] +// D[Job4-PriorityCritical] +// +// A --> B +// A --> C +// C --> D +func TestRunJobQueueWithDependencies(t *testing.T) { + jq := NewJobs() + if jq == nil { + t.Errorf("NewJobs returned nil") + } + + // create new jobs + job1 := newJob(JobSpecification{Id: "1"}) // default priority is PriorityDefault + job2 := newJob(JobSpecification{Id: "2"}) + job2.JobSpecification.Priority = PriorityHigh + job3 := newJob(JobSpecification{Id: "3"}) + job3.JobSpecification.Priority = PriorityLow + job4 := newJob(JobSpecification{Id: "4"}) + job4.JobSpecification.Priority = PriorityCritical + + job3.JobSpecification.Dependencies = []JobIDType{"1"} + job4.JobSpecification.Dependencies = []JobIDType{"3"} + job2.JobSpecification.Dependencies = []JobIDType{"1"} + + _ = job1 + +} + +func TestIssue1NewJobQueue(t *testing.T) { + jq := NewJobs() + if jq == nil { + t.Errorf("NewJobs returned nil") + } + + // create new jobs + job1 := JobSpecification{Id: "1"} // default priority is PriorityDefault + job2 := JobSpecification{Id: "2", Priority: PriorityHigh} + job3 := JobSpecification{Id: "3", Priority: PriorityLow} + job4 := JobSpecification{Id: "4", Priority: PriorityCritical} + + // add jobs to jobs + if err := jq.AddJob(job1, nil); err != nil { + t.Errorf("Failed to add job: %v", err) + } + + if err := jq.AddJob(job1, nil); err == nil || !errors.Is(err, ErrJobAlreadyExists) { + t.Errorf("Expected ErrJobAlreadyExists, got %v", err) + } + + if err := jq.AddJob(job2, nil); err != nil { + t.Errorf("Failed to add job: %v", err) + } + + // add job3 and job4 to jobs + if err := jq.AddJob(job3, nil); err != nil { + t.Errorf("Failed to add job: %v", err) + } + + if err := jq.AddJob(job4, nil); err != nil { + t.Errorf("Failed to add job: %v", err) + } + + // check if jobs are in jobs + if len(jq.jobs) != 4 { + t.Errorf("Failed to add all jobs to jobs") + } + +} + +// TestJobQueueWithDependencies +// Test if jobs are sorted by priority and dependencies +// +// graph TD +// +// A[Job1-PriorityDefault] +// B[Job2-PriorityHigh] +// C[Job3-PriorityLow] +// D[Job4-PriorityCritical] +// +// A --> B +// A --> C +// C --> D +func TestJobQueueWithDependencies(t *testing.T) { + jq := NewJobs() + if jq == nil { + t.Errorf("NewJobs returned nil") + } + + job1 := JobSpecification{Id: "1"} // default priority is PriorityDefault + job2 := JobSpecification{Id: "2", Priority: PriorityHigh, Dependencies: []JobIDType{"1"}} + job3 := JobSpecification{Id: "3", Priority: PriorityLow, Dependencies: []JobIDType{"1"}} + job4 := JobSpecification{Id: "4", Priority: PriorityCritical, Dependencies: []JobIDType{"3"}} + + // set dependencies + // job1 depends on nothing + // job2 depends on job1 + // job3 depends on job1 + // job4 depends on job3 + // add jobs to jobs + if err := jq.AddJob(job1, nil); err != nil { + t.Errorf("Failed to add job: %v", err) + } + + if err := jq.AddJob(job2, nil); err != nil { + t.Errorf("Failed to add job: %v", err) + } + + if err := jq.AddJob(job3, nil); err != nil { + t.Errorf("Failed to add job: %v", err) + } + if err := jq.AddJob(job4, nil); err != nil { + t.Errorf("Failed to add job: %v", err) + } + + // check if jobs are in jobs + if len(jq.jobs) != 4 { + t.Errorf("Failed to add all jobs to jobs") + } + +} diff --git a/job-log.go b/job-log.go new file mode 100644 index 0000000000000000000000000000000000000000..a0be1c7457572cce2927c943ffa8240bd4a158da --- /dev/null +++ b/job-log.go @@ -0,0 +1,26 @@ +package jobqueue + +import ( + "time" +) + +type JobLog struct { + ProcessID int `json:"process_id"` + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time"` + ExitCode int `json:"exit_code"` + Result any `json:"output"` + ResourceUsage struct { + Memory uint64 `json:"memory"` + CPU struct { + Percentage float64 `json:"percentage"` + } `json:"cpu"` + } `json:"resource_usage"` + IO struct { + Disk int64 `json:"disk"` + Network int64 `json:"network"` + } `json:"io"` + ErrorMsg string `json:"error_msg"` + IsSuccessful bool `json:"is_successful"` + Metadata map[string]string `json:"metadata"` +} diff --git a/job-run.go b/job-run.go new file mode 100644 index 0000000000000000000000000000000000000000..c468404a40f79c9dbc4cbdece05d0a0364e88bd9 --- /dev/null +++ b/job-run.go @@ -0,0 +1,202 @@ +package jobqueue + +import ( + "context" + "fmt" + "os" + "time" +) + +func (j *job) handleFailover() error { + if j.failover != nil { + return j.failover() + } + return nil +} + +func (j *job) Run(ctx context.Context) { + + var err error + + defer func() { + if j.Status == JobRunning { + j.Status = JobPending + } + }() + + j.mu.Lock() + defer j.mu.Unlock() + + // Check for resource limits + if j.exceedsResourceLimits() { + j.updateStats(0, ErrResourceLimitExceeded, 0) + return + } + + // Execute fail-over logic if specified + err = j.handleFailover() + if err != nil { + j.updateStats(0, err, 0) + return + } + + // Add timeout if specified + if j.Timeout > 0 { + var cancelFunc context.CancelFunc + ctx, cancelFunc = context.WithTimeout(ctx, j.Timeout) + defer cancelFunc() + } + + maxRetries := j.Retries + if maxRetries == 0 { + maxRetries = 1 + } + + // Run job + err = nil + for i := 0; i < maxRetries; i++ { + err = j.singleRun(ctx) + if err == nil { + break + } + + if j.RetryDelay > 0 { + time.Sleep(j.RetryDelay) + } + + } + + // Update job status + j.LastRun = time.Now() + + // calculate next run + if j.scheduleImpl != nil { + j.NextRun = j.scheduleImpl.Next(j.LastRun) + } + + if err != nil { + j.Status = JobFailed + return + } + + if j.MaxRuns > 0 && j.Stats.RunCount >= j.MaxRuns { + j.Status = JobFinished + return + } + + j.Status = JobPending + +} + +func (j *job) singleRun(ctx context.Context) (err error) { + startTime := time.Now() + + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("job runnable resulted in panic: %v", r) + j.Status = JobFailed + } + + }() + + if j.runnable == nil { + return ErrNoRunDefined + } + + j.Status = JobRunning + exitCode, result, err := j.runnable.Run(ctx) + + // Log and stats update + logEntry := JobLog{ + ProcessID: os.Getpid(), + StartTime: startTime, + EndTime: time.Now(), + ExitCode: exitCode, + ErrorMsg: "", + Result: result, + IsSuccessful: true, + Metadata: nil, + } + + if exitCode != 0 { + logEntry.IsSuccessful = false + logEntry.ExitCode = exitCode + } + + if err != nil { + logEntry.ErrorMsg = err.Error() + logEntry.IsSuccessful = false + } + + j.addLogEntry(logEntry) + j.updateStats(exitCode, err, time.Since(startTime)) + + return +} + +func (j *job) addLogEntry(logEntry JobLog) { + j.Logs = append(j.Logs, logEntry) + + for _, hook := range j.telemetryHooks { + go hook(&logEntry) + } + + if j.MaxLogEntries == 0 { + return + } + + // Überprüfen, ob die maximale Anzahl von Log-Einträgen überschritten wurde + if len(j.Logs) > j.MaxLogEntries { + // Log-Einträge rotieren und an das Logger-Objekt senden + for i := 0; i < len(j.Logs)-j.MaxLogEntries; i++ { + if j.Logger != nil { + _, err := (*j.Logger).Write(j.Logs[i]) + if err != nil { + continue + } + } + } + + j.Logs = j.Logs[len(j.Logs)-j.MaxLogEntries:] + } +} + +func (j *job) exceedsResourceLimits() bool { + + currentCPU := GetCpuUsage() + currentMemory := GetMemoryUsage() + + if j.ResourceLimits.CPULimit != 0 && currentCPU > j.ResourceLimits.CPULimit { + return true + } + if j.ResourceLimits.MemoryLimit != 0 && currentMemory > j.ResourceLimits.MemoryLimit { + return true + } + return false +} + +func (j *job) updateStats(exitCode int, err error, duration time.Duration) { + j.Stats.RunCount++ + if err == nil { + j.Stats.SuccessCount++ + } else { + j.Stats.ErrorCount++ + j.Stats.LastErrorCode = exitCode + } + + // Aktualisieren der Zeitmetriken + j.Stats.TimeMetrics.TotalRunTime += duration + if j.Stats.RunCount == 1 { + j.Stats.TimeMetrics.MinRunTime = duration + j.Stats.TimeMetrics.MaxRunTime = duration + } else { + if duration < j.Stats.TimeMetrics.MinRunTime { + j.Stats.TimeMetrics.MinRunTime = duration + } + if duration > j.Stats.TimeMetrics.MaxRunTime { + j.Stats.TimeMetrics.MaxRunTime = duration + } + } + + j.Stats.TimeMetrics.AvgRunTime = j.Stats.TimeMetrics.TotalRunTime / time.Duration(j.Stats.RunCount) +} diff --git a/job-stat.go b/job-stat.go new file mode 100644 index 0000000000000000000000000000000000000000..136ff9918a9a881fbecaab3a9b49742d7900fcab --- /dev/null +++ b/job-stat.go @@ -0,0 +1,40 @@ +package jobqueue + +import ( + "time" +) + +type JobStats struct { + RunCount int `json:"run_count"` + SuccessCount int `json:"success_count"` + ErrorCount int `json:"error_count"` + TimeMetrics struct { + AvgRunTime time.Duration `json:"avg"` + MaxRunTime time.Duration `json:"max"` + MinRunTime time.Duration `json:"min"` + TotalRunTime time.Duration `json:"total"` + } `json:"time_metrics"` + LastErrorCode int `json:"last_error_code"` + LastSuccessCode int `json:"last_success_code"` + PriorityEscalates int `json:"priority_escalates"` + ResourceUsage struct { + CPU struct { + Avg float64 `json:"avg"` + StdDev float64 `json:"std_dev"` + } `json:"cpu"` + Memory struct { + Avg int `json:"avg"` + StdDev int `json:"std_dev"` + } `json:"memory"` + IO struct { + Disk struct { + Avg int64 `json:"avg"` + StdDev int64 `json:"std_dev"` + } `json:"disk"` + Network struct { + Avg int64 `json:"avg"` + StdDev int64 `json:"std_dev"` + } `json:"network"` + } `json:"io"` + } `json:"resource_usage"` +} diff --git a/job-status.go b/job-status.go new file mode 100644 index 0000000000000000000000000000000000000000..03756fe05125630e1f8eab7dd851805d021c1c79 --- /dev/null +++ b/job-status.go @@ -0,0 +1,17 @@ +package jobqueue + +// JobStatus is the status of a job +type JobStatus int + +const ( + JobPending JobStatus = iota + JobScheduled + JobRunning + JobFailed + JobFinished +) + +// String returns the string representation of a JobStatus +func (js JobStatus) String() string { + return [...]string{"Pending", "Scheduled", "Running", "Failed", "Finished"}[js] +} diff --git a/job.go b/job.go new file mode 100644 index 0000000000000000000000000000000000000000..d380aaf5a72fd37cd338ada8f29e1410c7dbf848 --- /dev/null +++ b/job.go @@ -0,0 +1,192 @@ +package jobqueue + +import ( + "context" + "github.com/robfig/cron/v3" + "sync" + "time" +) + +type JobIDType string + +func (j JobIDType) String() string { + return string(j) +} + +func newJob(spec JobSpecification) *job { + j := &job{ + JobSpecification: spec, + mu: sync.Mutex{}, + } + + if spec.Schedule != "" { + schedule, err := cron.ParseStandard(spec.Schedule) + if err != nil { + panic(err) + } + + j.scheduleImpl = schedule + } + + if spec.Priority == 0 { + j.Priority = PriorityDefault + } + + return j +} + +type ResourceLimits struct { + CPULimit float64 `json:"cpu_limit,omitempty"` + MemoryLimit uint64 `json:"memory_limit,omitempty"` +} + +type JobSpecification struct { + Id JobIDType `json:"id,omitempty"` + Priority int `json:"priority,omitempty"` + MaxRuns int `json:"max_runs,omitempty"` + Concurrency int `json:"concurrency,omitempty"` + Schedule string `json:"schedule,omitempty"` + Timeout time.Duration `json:"timeout,omitempty"` + Retries int `json:"retries,omitempty"` + RetryDelay time.Duration `json:"retry_delay,omitempty"` + ResourceLimits ResourceLimits `json:"resource_limits"` + Dependencies []JobIDType `json:"dependencies,omitempty"` + Tags []string `json:"tags,omitempty"` + Metadata map[string]interface{} `json:"metadata,omitempty"` +} + +type JobData struct { + Status JobStatus `json:"status,omitempty"` + + LastRun time.Time `json:"last_run"` + NextRun time.Time `json:"next_run"` + + Stats JobStats `json:"stats"` + Logs []JobLog `json:"logs,omitempty"` + + Logger *Logger + MaxLogEntries int +} + +// job contains both serializable data and functional fields +type job struct { + JobData `json:"data,omitempty"` + JobSpecification `json:"spec,omitempty"` + + runnable Runnable + scheduleImpl cron.Schedule + telemetryHooks []func(*JobLog) + failover func() error + mu sync.Mutex +} + +type ReadOnlyJob interface { + GetId() JobIDType + GetPriority() int + //GetExclusive() bool + GetMaxRuns() int + GetConcurrency() int + GetLastRun() time.Time + GetNextRun() time.Time + GetRunnable() Runnable + GetSchedule() cron.Schedule + GetDependencies() []JobIDType + GetDependents() []JobIDType + GetRuns() int + + GetLogs() []JobLog + + GetStats() JobStats + + Run(context.Context) +} + +func (j *job) GetStats() JobStats { + j.mu.Lock() + defer j.mu.Unlock() + + return j.Stats +} + +func (j *job) GetLogs() []JobLog { + j.mu.Lock() + defer j.mu.Unlock() + return j.Logs +} + +func (j *job) GetId() JobIDType { + j.mu.Lock() + defer j.mu.Unlock() + return j.Id +} + +func (j *job) GetPriority() int { + j.mu.Lock() + defer j.mu.Unlock() + return j.Priority +} + +func (j *job) GetMaxRuns() int { + j.mu.Lock() + defer j.mu.Unlock() + + return j.MaxRuns +} + +func (j *job) GetConcurrency() int { + j.mu.Lock() + defer j.mu.Unlock() + + return j.Concurrency +} + +func (j *job) GetLastRun() time.Time { + j.mu.Lock() + defer j.mu.Unlock() + + return j.LastRun +} + +func (j *job) GetNextRun() time.Time { + j.mu.Lock() + defer j.mu.Unlock() + + return j.NextRun +} + +func (j *job) GetRunnable() Runnable { + j.mu.Lock() + defer j.mu.Unlock() + + return j.runnable +} + +func (j *job) GetSchedule() cron.Schedule { + j.mu.Lock() + defer j.mu.Unlock() + + return j.scheduleImpl +} + +func (j *job) GetDependencies() []JobIDType { + j.mu.Lock() + defer j.mu.Unlock() + + return j.Dependencies +} + +func (j *job) GetDependents() []JobIDType { + + j.mu.Lock() + defer j.mu.Unlock() + + return j.Dependencies +} + +func (j *job) GetRuns() int { + + j.mu.Lock() + defer j.mu.Unlock() + + return j.Stats.RunCount +} diff --git a/job_test.go b/job_test.go new file mode 100644 index 0000000000000000000000000000000000000000..597cbe0f2d3101ee0077aa24df4673e803863f51 --- /dev/null +++ b/job_test.go @@ -0,0 +1,164 @@ +package jobqueue + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +type mockRunnable struct { + shouldFail int + sleep time.Duration +} + +func (r *mockRunnable) Run(ctx context.Context) (int, any, error) { + // Create a new context with a timeout + + done := make(chan struct{}) + var err error + + go func() { + if r.sleep > 0 { + // Simulate long-running job + time.Sleep(r.sleep) + } + if r.shouldFail > 0 { + r.shouldFail-- + err = errors.New("failed") + } + time.Sleep(1 * time.Millisecond) + close(done) + }() + + // Wait until either the job is done or the timeout expires + select { + case <-done: + if err != nil { + return DefaultErrorExitCode, nil, err + } + return SuccessExitCode, nil, nil + case <-ctx.Done(): + + if ctx.Err() == context.DeadlineExceeded { + // It was a timeout + return TimeoutExitCode, nil, fmt.Errorf("timeout") + } + + return DefaultErrorExitCode, nil, ctx.Err() + } +} + +func TestJobResourceLimitExceeded(t *testing.T) { + + _ = StartResourceMonitoring(1 * time.Second) + defer resetResourceStatsForTesting() + + j := &job{ + //ctx: context.Background(), + runnable: &mockRunnable{shouldFail: 0}, + JobSpecification: JobSpecification{ + ResourceLimits: ResourceLimits{ + CPULimit: 0.1, + MemoryLimit: 1, + }, + }, + } + j.Run(context.Background()) + assert.NotNil(t, j.GetLastRun()) +} + +func TestJobSuccessful(t *testing.T) { + j := &job{ + runnable: &mockRunnable{shouldFail: 0}, + // ctx: context.Background(), + } + + j.Run(context.Background()) + assert.NotNil(t, j.GetLastRun()) +} + +func TestJobFailed(t *testing.T) { + j := &job{runnable: &mockRunnable{shouldFail: 1}} + j.Run(context.Background()) + assert.NotNil(t, j.GetLastRun()) +} + +func TestJobRetry(t *testing.T) { + j := &job{ + runnable: &mockRunnable{shouldFail: 1}, + JobSpecification: JobSpecification{ + Retries: 1, + }, + } + j.Run(context.Background()) + + assert.NotNil(t, j.GetLastRun()) + assert.Equal(t, 1, j.Stats.ErrorCount) +} + +func TestJobTimeout(t *testing.T) { + j := &job{ + runnable: &mockRunnable{ + shouldFail: 0, + sleep: 4 * time.Millisecond, + }, + JobSpecification: JobSpecification{ + Timeout: 1 * time.Millisecond, + }, + } + j.Run(context.Background()) + assert.NotNil(t, j.GetLastRun()) +} + +func TestNewJobFromJSON(t *testing.T) { + jsonStr := `{"id":"testJob","Priority":1}` + job, err := NewJobFromJSON(jsonStr) + + assert.Nil(t, err) + assert.Equal(t, "testJob", job.Id.String()) + assert.Equal(t, 1, job.Priority) +} + +func TestToFromJSON(t *testing.T) { + j := job{ + JobSpecification: JobSpecification{ + Id: "testJob", + Priority: 1, + }, + //ctx: context.Background(), + } + + jsonStr, err := j.ToJSON() + assert.Nil(t, err) + + var job2 job + err = job2.FromJSON(jsonStr) + assert.Nil(t, err) + + assert.Equal(t, "testJob", job2.Id.String()) + +} + +func TestUnmarshalJSON(t *testing.T) { + jsonStr := `{"data":{"last_run":"0001-01-01T00:00:00Z","next_run":"0001-01-01T00:00:00Z","stats":{"run_count":0,"success_count":0,"error_count":0,"time_metrics":{"avg":0,"max":0,"min":0,"total":0},"last_error_code":0,"last_success_code":0,"priority_escalates":0,"resource_usage":{"cpu":{"avg":0,"std_dev":0},"memory":{"avg":0,"std_dev":0},"io":{"disk":{"avg":0,"std_dev":0},"network":{"avg":0,"std_dev":0}}}}},"spec":{"id":"testJob","priority":1,"resource_limits":{}}}` + var job job + err := json.Unmarshal([]byte(jsonStr), &job) + + assert.Nil(t, err) + assert.Equal(t, "testJob", job.Id.String()) + assert.Equal(t, 1, job.Priority) +} + +func TestNewJob(t *testing.T) { + job := newJob(JobSpecification{ + Id: "testJob", + }) + assert.NotNil(t, job) + assert.Equal(t, "testJob", job.Id.String()) + assert.Equal(t, PriorityDefault, job.Priority) + +} diff --git a/jobs.go b/jobs.go new file mode 100644 index 0000000000000000000000000000000000000000..0a22b53764be89c3c1ec7a129b5e3d23ea310fc1 --- /dev/null +++ b/jobs.go @@ -0,0 +1,240 @@ +package jobqueue + +import ( + "sync" + "time" +) + +type JobsInterface interface { + GetJobs() map[JobIDType]ReadOnlyJob + + GetExecutableJobs() map[JobIDType]ReadOnlyJob + + AddJob(jobSpec JobSpecification, runnable Runnable) error + + RemoveJob(id JobIDType) (bool, error) + + GetJobStatus(id JobIDType) (JobStatus, error) + + Cleanup() + + GetFinishedJobs() map[JobIDType]ReadOnlyJob + + GetFinishedJob(id JobIDType) ReadOnlyJob + + RemoveFinishedJob(id JobIDType) (bool, error) + + JobExists(id JobIDType) bool + + GetJob(id JobIDType) ReadOnlyJob + + GetJobsCount() int +} + +type jobs struct { + jobs map[JobIDType]*job + finishedJobs map[JobIDType]*job + mutex sync.Mutex +} + +// compile time check if jobs implements JobsInterface +var _ JobsInterface = (*jobs)(nil) + +func (jq *jobs) GetJobsCount() int { + jq.mutex.Lock() + defer jq.mutex.Unlock() + + return len(jq.jobs) +} + +func (jq *jobs) Cleanup() { + jq.mutex.Lock() + defer jq.mutex.Unlock() + + for id, job := range jq.jobs { + if job.Status == JobFinished { + jq.finishedJobs[id] = job + + delete(jq.jobs, id) + } + } +} + +func (jq *jobs) GetFinishedJobs() map[JobIDType]ReadOnlyJob { + jq.mutex.Lock() + defer jq.mutex.Unlock() + + jobs := make(map[JobIDType]ReadOnlyJob) + for id, job := range jq.finishedJobs { + jobs[id] = job // Implizites Casting zu ReadOnlyJob + } + + return jobs +} + +func (jq *jobs) GetFinishedJob(id JobIDType) ReadOnlyJob { + jq.mutex.Lock() + defer jq.mutex.Unlock() + + if _, exists := jq.finishedJobs[id]; !exists { + return nil + } + + return jq.finishedJobs[id] + +} + +func (jq *jobs) RemoveFinishedJob(id JobIDType) (bool, error) { + jq.mutex.Lock() + defer jq.mutex.Unlock() + + if _, exists := jq.finishedJobs[id]; !exists { + return false, ErrJobNotFound + } + + // Update internal data structures. + delete(jq.finishedJobs, id) + return true, nil +} + +// GetJobs returns a map of all jobs. +func (jq *jobs) GetJobs() map[JobIDType]ReadOnlyJob { + jq.mutex.Lock() + defer jq.mutex.Unlock() + + jobs := make(map[JobIDType]ReadOnlyJob) + for id, job := range jq.jobs { + jobs[id] = job // Implizites Casting zu ReadOnlyJob + } + + return jobs +} + +// GetJobs returns a map of all jobs. +func (jq *jobs) GetExecutableJobs() map[JobIDType]ReadOnlyJob { + jq.mutex.Lock() + defer jq.mutex.Unlock() + + jobs := make(map[JobIDType]ReadOnlyJob) + + tempJobs := make(map[JobIDType]*job) + for _, job := range jq.jobs { + + if job.Status != JobPending { + continue + } + + if job.NextRun.After(time.Now()) { + continue + } + + tempJobs[job.Id] = job + } + + sortedJobIDs, err := topologicalSortJobs(tempJobs) + if err != nil { + return nil + } + + for _, id := range sortedJobIDs { + job := jq.jobs[id] + job.Status = JobScheduled + jobs[id] = jq.jobs[id] + } + + return jobs +} + +func (jq *jobs) JobExists(id JobIDType) bool { + jq.mutex.Lock() + defer jq.mutex.Unlock() + + if _, exists := jq.jobs[id]; !exists { + return false + } + + return true +} + +func (jq *jobs) GetJob(id JobIDType) ReadOnlyJob { + jq.mutex.Lock() + defer jq.mutex.Unlock() + + if _, exists := jq.jobs[id]; !exists { + return nil + } + + return jq.jobs[id] + +} + +// NewJobs creates a new job queue. +func NewJobs() *jobs { + + jq := &jobs{ + jobs: make(map[JobIDType]*job), + finishedJobs: make(map[JobIDType]*job), + mutex: sync.Mutex{}, + } + + return jq +} + +// AddJob adds a new job to the queue. +func (jq *jobs) AddJob(jobSpec JobSpecification, runnable Runnable) error { + jq.mutex.Lock() + defer jq.mutex.Unlock() + + job := newJob(jobSpec) + job.runnable = runnable + + if _, exists := jq.jobs[job.Id]; exists { + return ErrJobAlreadyExists + } + + for _, dep := range job.Dependencies { + if _, exists := jq.jobs[dep]; !exists { + return ErrUnknownDependency + } + } + + jq.jobs[job.Id] = job + + return nil +} + +// RemoveJob removes a job from the queue. +func (jq *jobs) RemoveJob(id JobIDType) (bool, error) { + jq.mutex.Lock() + defer jq.mutex.Unlock() + + if _, exists := jq.jobs[id]; !exists { + return false, ErrJobNotFound + } + + // check if job is a dependency of another job + for _, job := range jq.jobs { + for _, dep := range job.Dependencies { + if dep == id { + return false, ErrJobIsDependency + } + } + } + + // Update internal data structures. + delete(jq.jobs, id) + return true, nil +} + +// GetJobStatus returns the status of a job. +func (jq *jobs) GetJobStatus(id JobIDType) (JobStatus, error) { + jq.mutex.Lock() + defer jq.mutex.Unlock() + + if _, exists := jq.jobs[id]; !exists { + return JobStatus(0), ErrJobNotFound + } + + return jq.jobs[id].Status, nil + +} diff --git a/jobs_test.go b/jobs_test.go new file mode 100644 index 0000000000000000000000000000000000000000..ef74250dc8d933934a1d126d5bf8dcaf806a1dd8 --- /dev/null +++ b/jobs_test.go @@ -0,0 +1,44 @@ +package jobqueue + +import ( + "testing" +) + +func TestNewJobQueue(t *testing.T) { + jq := NewJobs() + if jq == nil { + t.Errorf("NewJobs returned nil") + } +} + +func TestAddJob(t *testing.T) { + jq := NewJobs() + + err := jq.AddJob(JobSpecification{ + Id: "1", + Priority: 1, + //Exclusive: true, + MaxRuns: 3, + Concurrency: 2, + }, nil) + if err != nil { + t.Errorf("Failed to add job: %v", err) + } +} + +func TestRemoveJob(t *testing.T) { + jq := NewJobs() + + err := jq.AddJob(JobSpecification{ + Id: "1", + // Set other fields + }, nil) + if err != nil { + t.Errorf("Failed to add job: %v", err) + } + + removed, _ := jq.RemoveJob("1") + if !removed { + t.Errorf("Failed to remove job") + } +} diff --git a/json.go b/json.go new file mode 100644 index 0000000000000000000000000000000000000000..b946a33826922224073ac420dd5b9591a82366d4 --- /dev/null +++ b/json.go @@ -0,0 +1,65 @@ +package jobqueue + +import "github.com/robfig/cron/v3" +import "encoding/json" + +// UnmarshalJSON unmarshals a job from json. +func (j *job) UnmarshalJSON(data []byte) error { + type Alias job + aux := &struct { + Schedule string `json:"schedule"` + *Alias + }{ + Alias: (*Alias)(j), + } + + if err := json.Unmarshal(data, &aux); err != nil { + return err + } + + if aux.Schedule != "" { + schedule, err := cron.ParseStandard(aux.Schedule) + if err != nil { + return err + } + j.scheduleImpl = schedule + } + + return nil +} + +// MarshalJSON marshals a job to json. +func (j *job) MarshalJSON() ([]byte, error) { + type Alias job + aux := &struct { + *Alias + }{ + Alias: (*Alias)(j), + } + + return json.Marshal(aux) +} + +// NewJobFromJSON creates a new job from a json string. +func NewJobFromJSON(jsonStr string) (*job, error) { + var job JobSpecification + err := json.Unmarshal([]byte(jsonStr), &job) + if err != nil { + return nil, err + } + return newJob(job), nil +} + +// ToJSON marshals a job to a json string. +func (j *job) ToJSON() (string, error) { + data, err := json.Marshal(j) + if err != nil { + return "", err + } + return string(data), nil +} + +// FromJSON unmarshals a job from a json string. +func (j *job) FromJSON(jsonStr string) error { + return json.Unmarshal([]byte(jsonStr), j) +} diff --git a/json_test.go b/json_test.go new file mode 100644 index 0000000000000000000000000000000000000000..36a348f83bf783fa4b04566219b459271a0ffcf4 --- /dev/null +++ b/json_test.go @@ -0,0 +1 @@ +package jobqueue diff --git a/log-writer.go b/log-writer.go new file mode 100644 index 0000000000000000000000000000000000000000..8879499b0cb3ca43856c1c37dd81f8f2b3861974 --- /dev/null +++ b/log-writer.go @@ -0,0 +1,108 @@ +package jobqueue + +import ( + "bytes" + "fmt" + "os" + "sync" + "text/template" +) + +// Logger ist ein generisches Log-Interface, das verschiedene Ausgabeziele unterstützt. +type Logger interface { + // Write schreibt einen Log-Eintrag. + Write(entry JobLog) (n int, err error) + + // Close schließt den Logger. + Close() error +} + +// FileLogger is a logger that writes to a file. +type FileLogger struct { + mu sync.Mutex + logDir string + maxLogSize int64 + maxLogFiles int + currentLog *os.File + currentSize int64 + logFileIndex int +} + +// NewFileLogger creates a new FileLogger. +func NewFileLogger(logDir string, maxLogSize int64, maxLogFiles int) (*FileLogger, error) { + if err := os.MkdirAll(logDir, 0755); err != nil { + return nil, err + } + + lw := &FileLogger{ + logDir: logDir, + maxLogSize: maxLogSize, + maxLogFiles: maxLogFiles, + currentSize: 0, + logFileIndex: 1, + } + + if err := lw.rotateLog(); err != nil { + return nil, err + } + + return lw, nil +} + +// Write the log entry to the current log file. +func (lw *FileLogger) Write(entry JobLog) (n int, err error) { + lw.mu.Lock() + defer lw.mu.Unlock() + + logTemplate := "{{.Timestamp}} {{.JobID}} {{.JobName}} {{.JobStatus}} {{.ExitCode}} {{.Duration}} {{.Message}}" + tmpl := template.New("log") + tmpl, err = tmpl.Parse(logTemplate) + if err != nil { + return 0, err + } + + buffer := new(bytes.Buffer) + tmpl.Execute(buffer, entry) + + n, err = lw.currentLog.Write(buffer.Bytes()) + lw.currentSize += int64(n) + + if lw.currentSize >= lw.maxLogSize { + _ = lw.rotateLog() + } + + return n, err +} + +// Close closes the current log file. +func (lw *FileLogger) Close() error { + lw.mu.Lock() + defer lw.mu.Unlock() + return lw.currentLog.Close() +} + +// rotateLog closes the current log file and opens a new one. +func (lw *FileLogger) rotateLog() error { + lw.currentSize = 0 + + if lw.currentLog != nil { + if err := lw.currentLog.Close(); err != nil { + return err + } + } + + logFileName := fmt.Sprintf("%s/job_log_%d.log", lw.logDir, lw.logFileIndex) + lw.logFileIndex++ + + if lw.logFileIndex > lw.maxLogFiles { + lw.logFileIndex = 1 + } + + f, err := os.Create(logFileName) + if err != nil { + return err + } + + lw.currentLog = f + return nil +} diff --git a/manager.go b/manager.go new file mode 100644 index 0000000000000000000000000000000000000000..ccf83ecec80155b4400fa23d3119d49db887b1da --- /dev/null +++ b/manager.go @@ -0,0 +1,152 @@ +package jobqueue + +import ( + "context" + "errors" + "sync" + "time" +) + +type JobManager struct { + queue *jobs + executor *jobExecutor + mutex sync.Mutex + maxConcurrency int +} + +// NewJobManager creates a new job manager with configurable concurrency and interval. +func NewJobManager(maxConcurrency int, interval time.Duration, stopOnEmpty bool) *JobManager { + + jq := NewJobs() + instance := NewJobExecutor(jq, maxConcurrency, interval, + func(executor *jobExecutor) bool { + if executor.Ctx.Err() != nil { + return true + } + + if stopOnEmpty && executor.Queue.GetJobsCount() == 0 { + return true + } + + return false + }) + + return &JobManager{ + queue: jq, + executor: instance, + maxConcurrency: maxConcurrency, + } +} + +func (jm *JobManager) GetJobs() map[JobIDType]ReadOnlyJob { + return jm.queue.GetJobs() +} + +func (jm *JobManager) AddJob(jobSpec JobSpecification, runnable Runnable) error { + jm.mutex.Lock() + defer jm.mutex.Unlock() + return jm.queue.AddJob(jobSpec, runnable) +} + +func (jm *JobManager) GetJob(id JobIDType) ReadOnlyJob { + return jm.queue.GetJob(id) +} + +func (jm *JobManager) JobExists(id JobIDType) bool { + return jm.queue.JobExists(id) +} + +func (jm *JobManager) RemoveJob(id JobIDType) (bool, error) { + jm.mutex.Lock() + defer jm.mutex.Unlock() + return jm.queue.RemoveJob(id) +} + +func (jm *JobManager) Start() error { + jm.mutex.Lock() + defer jm.mutex.Unlock() + return jm.executor.Start() +} + +func (jm *JobManager) Stop() error { + jm.mutex.Lock() + defer jm.mutex.Unlock() + return jm.executor.Stop() +} + +func (jm *JobManager) Pause() error { + jm.mutex.Lock() + defer jm.mutex.Unlock() + return jm.executor.Pause() +} + +func (jm *JobManager) Resume() error { + jm.mutex.Lock() + defer jm.mutex.Unlock() + return jm.executor.Resume() +} + +func (jm *JobManager) GetFinishedJobs() map[JobIDType]ReadOnlyJob { + jm.mutex.Lock() + defer jm.mutex.Unlock() + return jm.queue.GetFinishedJobs() +} + +func (jm *JobManager) GetFinishedJob(id JobIDType) ReadOnlyJob { + jm.mutex.Lock() + defer jm.mutex.Unlock() + return jm.queue.GetFinishedJob(id) +} + +func (jm *JobManager) RemoveFinishedJob(id JobIDType) (bool, error) { + jm.mutex.Lock() + defer jm.mutex.Unlock() + return jm.queue.RemoveFinishedJob(id) +} + +func (jm *JobManager) OnStarted(hook func()) error { + + timeout := 5 * time.Second + startTime := time.Now() + + for { + if jm.executor.IsRunning() { + hook() + return nil + } + + if time.Since(startTime) >= timeout { + return ErrTimeout + } + + <-time.After(1 * time.Millisecond) + } + +} + +func (jm *JobManager) Wait() error { + + if jm.executor.Ctx.Err() != nil { + return jm.executor.Ctx.Err() + } + + if jm.executor.IsRunning() { + <-jm.executor.Ctx.Done() + + if errors.Is(jm.executor.Ctx.Err(), context.Canceled) { + return nil + } + + return jm.executor.Ctx.Err() + + } + + return ErrNotRunning + +} + +func (jm *JobManager) IsRunning() bool { + jm.mutex.Lock() + defer jm.mutex.Unlock() + return jm.executor.IsRunning() +} diff --git a/manger_test.go b/manger_test.go new file mode 100644 index 0000000000000000000000000000000000000000..d7ee287dfbf4b5f2d26b13631d7a4012558ca844 --- /dev/null +++ b/manger_test.go @@ -0,0 +1,158 @@ +package jobqueue + +import ( + "github.com/stretchr/testify/assert" + //_ "net/http/pprof" + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestRepeatNewJobManagerPauseAndResume(t *testing.T) { + numRepeats := 10 // Anzahl der Wiederholungen + + for i := 0; i < numRepeats; i++ { + t.Logf("Repeat %d\n", i+1) + dewJobManagerPauseAndResume(t) + } +} + +func dewJobManagerPauseAndResume(t *testing.T) { + + var wg sync.WaitGroup + + tickerTime := 1 * time.Microsecond + waitBeforePause := 5 * tickerTime + doPause := 5 * tickerTime + maxRuns := 3 + + jm := NewJobManager(1, tickerTime, false) + if jm == nil { + t.Errorf("NewJobManager returned nil") + } + + err := jm.AddJob(JobSpecification{ + Id: "1", + MaxRuns: maxRuns, + }, &ExternalProcessRunner{ + Command: "sleep", + Args: []string{"1"}, + }) + + assert.Nil(t, err) + + job := jm.GetJob("1") + assert.NotNil(t, job) + + var isPaused int32 + + err = jm.Start() + assert.Nil(t, err) + + // Timer für 5 Sekunden + timer := time.NewTimer(10 * time.Second) + + // anonymous function to stop the job manager after the timer has expired + go func() { + <-timer.C // wait for the timer to expire + err := jm.Stop() + assert.Nil(t, err) + }() + + defer func() { + err := jm.Stop() + if err != nil { + assert.ErrorIs(t, err, ErrAlreadyStopped) + } + }() + + // Go routine to pause and resume the job manager + go func() { + time.Sleep(waitBeforePause) + err := jm.Pause() + assert.Nil(t, err) + atomic.StoreInt32(&isPaused, 1) + + time.Sleep(doPause) + + err = jm.Resume() + assert.Nil(t, err) + atomic.StoreInt32(&isPaused, 0) + }() + + wg.Add(1) + err = jm.OnStarted(func() { + defer wg.Done() + err := jm.Wait() + assert.Nil(t, err) + + finishedJob := jm.GetFinishedJob("1") + assert.NotNil(t, finishedJob) + + paused := atomic.LoadInt32(&isPaused) == 1 + assert.False(t, paused, "Job manager should not be paused") + + }) + + assert.Nil(t, err) + wg.Wait() + +} + +func TestNewJobManager(t *testing.T) { + + doRuns := 5 + + jm := NewJobManager(10, 1*time.Microsecond, true) + if jm == nil { + t.Errorf("NewJobManager returned nil") + } + + err := jm.AddJob(JobSpecification{ + Id: "1", + MaxRuns: doRuns, + }, &ExternalProcessRunner{ + Command: "echo", + Args: []string{"hello"}, + }) + assert.Nil(t, err) + + err = jm.Start() + + err = jm.OnStarted(func() { + + assert.Nil(t, err) + assert.True(t, jm.IsRunning()) + + job := jm.GetJob("1") + assert.NotNil(t, job) + + defer func() { + err := jm.Stop() + if err != nil { + assert.ErrorIs(t, err, ErrAlreadyStopped) + } + }() + + err = jm.Wait() + assert.Nil(t, err) + + finishedJob := jm.GetFinishedJob("1") + assert.NotNil(t, finishedJob) + + runs := finishedJob.GetRuns() + assert.Equal(t, doRuns, runs) + + logs := finishedJob.GetLogs() + assert.Equal(t, doRuns, len(logs)) + + stats := finishedJob.GetStats() + assert.Equal(t, doRuns, stats.SuccessCount) + assert.Equal(t, 0, stats.ErrorCount) + + }) + + assert.Nil(t, err) + +} diff --git a/prority.go b/prority.go new file mode 100644 index 0000000000000000000000000000000000000000..9efa113623893215032fc709c1dd2c4bcbfa5102 --- /dev/null +++ b/prority.go @@ -0,0 +1,9 @@ +package jobqueue + +const ( + _ int = iota * 10 + PriorityLow + PriorityDefault + PriorityHigh + PriorityCritical +) diff --git a/release.json b/release.json new file mode 100644 index 0000000000000000000000000000000000000000..d2bad7dcaebc46331fc002036758c7447c0b679a --- /dev/null +++ b/release.json @@ -0,0 +1 @@ +{"version":"0.1.0"} diff --git a/runnable.go b/runnable.go new file mode 100644 index 0000000000000000000000000000000000000000..0e99d16a163240b181f79ff688ab9afff0e6bf85 --- /dev/null +++ b/runnable.go @@ -0,0 +1,88 @@ +package jobqueue + +import ( + "bytes" + "context" + "fmt" + "os/exec" +) + +type Runnable interface { + Run(ctx context.Context) (int, any, error) +} + +type GoFunctionRunner struct { + Func func() (int, any, error) +} + +type Result struct { + Code int + Result any + Err error +} + +func (g *GoFunctionRunner) Run(ctx context.Context) (int, any, error) { + done := make(chan Result) + + go func() { + var res Result + defer func() { + if r := recover(); r != nil { + res.Err = fmt.Errorf("Command panicked: %w", fmt.Errorf("%v", r)) + } + done <- res + }() + res.Code, res.Result, res.Err = g.Func() + }() + + select { + case res := <-done: + return res.Code, res.Result, res.Err + case <-ctx.Done(): + return RunnableTerminatedExitCode, nil, ctx.Err() + } +} + +type ExternalProcessRunner struct { + Command string + Args []string +} + +func (e *ExternalProcessRunner) Run(ctx context.Context) (int, any, error) { + var stdout, stderr bytes.Buffer + cmd := exec.CommandContext(ctx, e.Command, e.Args...) + cmd.Stdout = &stdout + cmd.Stderr = &stderr + + done := make(chan Result) + + go func() { + var res Result + defer func() { + if r := recover(); r != nil { + res.Err = fmt.Errorf("Command panicked: %w", fmt.Errorf("%v", r)) + } + done <- res + }() + err := cmd.Run() + res.Result = stdout.String() + stderr.String() + if err != nil { + if exitErr, ok := err.(*exec.ExitError); ok { + res.Code = exitErr.ExitCode() + } + res.Err = err + } else { + res.Code = cmd.ProcessState.ExitCode() + } + }() + + select { + case res := <-done: + return res.Code, res.Result, res.Err + case <-ctx.Done(): + if cmd.Process != nil { + _ = cmd.Process.Kill() + } + return RunnableTerminatedExitCode, nil, ctx.Err() + } +} diff --git a/runnable_test.go b/runnable_test.go new file mode 100644 index 0000000000000000000000000000000000000000..168e5c02ac75d6dcab710d80e29c5bf7edb4cc70 --- /dev/null +++ b/runnable_test.go @@ -0,0 +1,131 @@ +package jobqueue + +import ( + "context" + "testing" + "time" +) + +func TestGoFunctionRunner(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + runner := &GoFunctionRunner{ + Func: func() (int, any, error) { + return 42, nil, nil + }, + } + + result, _, err := runner.Run(ctx) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if result != 42 { + t.Errorf("Expected 42, got %v", result) + } +} + +func TestGoFunctionRunnerTimeout(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) + defer cancel() + + runner := &GoFunctionRunner{ + Func: func() (int, any, error) { + time.Sleep(10 * time.Millisecond) + return 42, nil, nil + }, + } + + _, _, err := runner.Run(ctx) + if err == nil { + t.Errorf("Expected timeout error, got nil") + } +} + +func TestExternalProcessRunner(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Hour) + defer cancel() + + runner := &ExternalProcessRunner{ + Command: "echo", + Args: []string{"hello"}, + } + + _, _, err := runner.Run(ctx) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } +} + +func TestExternalProcessRunnerFail(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + runner := &ExternalProcessRunner{ + Command: "nonexistentcommand", + } + + _, _, err := runner.Run(ctx) + if err == nil { + t.Errorf("Expected error, got nil") + } +} + +func TestGoFunctionRunnerNilFunc(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + runner := &GoFunctionRunner{} + + _, _, err := runner.Run(ctx) + if err == nil { + t.Errorf("Expected error, got nil") + } +} + +func TestGoFunctionRunnerPanic(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + runner := &GoFunctionRunner{ + Func: func() (int, any, error) { + panic("Test panic") + }, + } + + _, _, err := runner.Run(ctx) + if err == nil { + t.Errorf("Expected error, got nil") + } +} + +func TestGoFunctionRunnerExpiredContext(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond) + defer cancel() + time.Sleep(2 * time.Nanosecond) + + runner := &GoFunctionRunner{ + Func: func() (int, any, error) { + return 42, nil, nil + }, + } + + _, _, err := runner.Run(ctx) + if err == nil { + t.Errorf("Expected context expired error, got nil") + } +} + +func TestExternalProcessRunnerInvalidCommand(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + runner := &ExternalProcessRunner{ + Command: "", + } + + _, _, err := runner.Run(ctx) + if err == nil { + t.Errorf("Expected error for invalid command, got nil") + } +} diff --git a/stat.go b/stat.go new file mode 100644 index 0000000000000000000000000000000000000000..ca0f5fafff1d5a827889de86b785cf277341a86c --- /dev/null +++ b/stat.go @@ -0,0 +1,153 @@ +package jobqueue + +import ( + "context" + "github.com/shirou/gopsutil/v3/cpu" + "math" + "runtime" + "sync" + "sync/atomic" + "time" +) + +var mainResourceStats *ResourceStats + +func StartResourceMonitoring(interval time.Duration) error { + mainResourceStats = NewResourceStats() + return mainResourceStats.StartMonitoring(interval) +} + +func StopResourceMonitoring() { + if mainResourceStats != nil { + mainResourceStats.StopMonitoring() + } +} + +func resetResourceStatsForTesting() { + if mainResourceStats != nil { + StopResourceMonitoring() + } +} + +func GetCpuUsage() float64 { + + if mainResourceStats != nil { + return mainResourceStats.GetCpuUsage() + } + return 0 +} + +func GetMemoryUsage() uint64 { + if mainResourceStats != nil { + return mainResourceStats.GetMemoryUsage() + } + return 0 +} + +type ResourceStats struct { + cpuUsage uint64 + memoryUsage uint64 + context context.Context + cancel context.CancelFunc + mu sync.Mutex +} + +func NewResourceStats() *ResourceStats { + return &ResourceStats{} +} + +func (stats *ResourceStats) getMemoryUsage() uint64 { + var m runtime.MemStats + runtime.ReadMemStats(&m) + return m.Alloc +} + +func (stats *ResourceStats) getCPUPercentage() (float64, error) { + percentages, err := cpu.Percent(100*time.Millisecond, false) + if err != nil { + return 0, err + } + + if len(percentages) == 0 { + return 0, ErrCPUPercentage + } + + return percentages[0], nil +} + +func (stats *ResourceStats) assignResourceStats() { + mem := stats.getMemoryUsage() + cpuP, err := stats.getCPUPercentage() + if err != nil { + return + } + cpuPBits := math.Float64bits(cpuP) + atomic.StoreUint64(&stats.cpuUsage, cpuPBits) + atomic.StoreUint64(&stats.memoryUsage, mem) +} + +func (stats *ResourceStats) MonitorResources(interval time.Duration) { + stats.mu.Lock() + ctx := stats.context + stats.mu.Unlock() + + if ctx == nil { + return + } + + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + stats.assignResourceStats() + case <-ctx.Done(): + return + } + } +} + +func (stats *ResourceStats) StartMonitoring(interval time.Duration) error { + stats.mu.Lock() + defer stats.mu.Unlock() + + if stats.context != nil && stats.context.Err() == nil { + return nil + } + + ctx, cancel := context.WithCancel(context.Background()) + stats.context = ctx + stats.cancel = cancel + + if interval == 0 { + return ErrIntervalIsZero + } + + stats.assignResourceStats() + go stats.MonitorResources(interval) + + return nil +} + +func (stats *ResourceStats) StopMonitoring() { + stats.mu.Lock() + defer stats.mu.Unlock() + + if stats.cancel != nil { + ctx := stats.context // save for later + stats.context = nil // set to nil first + stats.cancel() // then cancel + ctx.Done() // ensure channel is closed if needed + stats.cancel = nil + } +} + +func (stats *ResourceStats) GetCpuUsage() float64 { + bits := atomic.LoadUint64(&stats.cpuUsage) + return math.Float64frombits(bits) +} + +func (stats *ResourceStats) GetMemoryUsage() uint64 { + return atomic.LoadUint64(&stats.memoryUsage) +} diff --git a/stat_test.go b/stat_test.go new file mode 100644 index 0000000000000000000000000000000000000000..72c77419fb80aeaaad0a3703bb553e8b2303ffa8 --- /dev/null +++ b/stat_test.go @@ -0,0 +1,45 @@ +package jobqueue + +import ( + "github.com/stretchr/testify/assert" + "math" + "sync/atomic" + "testing" + "time" +) + +func TestResourceStats(t *testing.T) { + stats := NewResourceStats() + + expectedCPUUsage := 75.5 + expectedMemoryUsage := uint64(1024 * 1024) // 1 MB + + cpuBits := math.Float64bits(expectedCPUUsage) + atomic.StoreUint64(&stats.cpuUsage, cpuBits) + atomic.StoreUint64(&stats.memoryUsage, expectedMemoryUsage) + + actualCPUUsage := stats.GetCpuUsage() + actualMemoryUsage := stats.GetMemoryUsage() + + assert.InDelta(t, expectedCPUUsage, actualCPUUsage, 0.001) // Überprüfe auf Genauigkeit bis auf 0,001 + assert.Equal(t, expectedMemoryUsage, actualMemoryUsage) +} + +func TestResourceMonitoringStartStop(t *testing.T) { + stats := NewResourceStats() + + err := stats.StartMonitoring(1 * time.Second) + assert.Nil(t, err) + defer stats.StopMonitoring() + + time.Sleep(100 * time.Millisecond) + + assert.NotNil(t, stats.context) + assert.Nil(t, stats.context.Err()) + + stats.StopMonitoring() + + time.Sleep(100 * time.Millisecond) + + assert.Nil(t, stats.context) +} diff --git a/topological-sort.go b/topological-sort.go new file mode 100644 index 0000000000000000000000000000000000000000..6961b7f66bf20077913ab005a474fd8aefa48883 --- /dev/null +++ b/topological-sort.go @@ -0,0 +1,98 @@ +package jobqueue + +import "container/heap" + +// JobIDPriority is a type that holds a JobID and its Priority +type JobIDPriority struct { + ID JobIDType + Priority int +} + +// JobIDPriorityQueue is a priority jobs for JobIDPriority +type JobIDPriorityQueue []JobIDPriority + +// Len implements heap.Interface.Len +func (pq JobIDPriorityQueue) Len() int { return len(pq) } + +// Less implements heap.Interface.Less +func (pq JobIDPriorityQueue) Less(i, j int) bool { + return pq[i].Priority > pq[j].Priority +} + +// Swap implements heap.Interface.Swap +func (pq JobIDPriorityQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] +} + +// Push implements heap.Interface.Push +func (pq *JobIDPriorityQueue) Push(x interface{}) { + item := x.(JobIDPriority) + *pq = append(*pq, item) +} + +// Pop implements heap.Interface.Pop +func (pq *JobIDPriorityQueue) Pop() interface{} { + old := *pq + n := len(old) + item := old[n-1] + *pq = old[0 : n-1] + return item +} + +// topologicalSortJobs returns a topologically sorted list of job IDs +func topologicalSortJobs(jobs map[JobIDType]*job) ([]JobIDType, error) { + // Initialize in-degrees + inDegrees := make(map[JobIDType]int) + for id := range jobs { + inDegrees[id] = 0 + } + + for _, job := range jobs { + for _, dependency := range job.Dependencies { + // check if dependency exists + if _, ok := jobs[dependency]; !ok { + return nil, ErrMissingDependency + } + + inDegrees[dependency]++ + } + } + + // Create a priority jobs + pq := make(JobIDPriorityQueue, 0) + heap.Init(&pq) + + // Add jobs with zero in-degree to priority jobs + for id, inDegree := range inDegrees { + if inDegree == 0 { + heap.Push(&pq, JobIDPriority{ID: id, Priority: jobs[id].Priority}) + } + } + + result := make([]JobIDType, 0) + + for len(pq) > 0 { + + jobIDPriority := heap.Pop(&pq).(JobIDPriority) + jobID := jobIDPriority.ID + + result = append(result, jobID) + + for _, dependentJobID := range jobs[jobID].Dependencies { + inDegrees[dependentJobID]-- + if inDegrees[dependentJobID] == 0 { + heap.Push(&pq, JobIDPriority{ID: dependentJobID, Priority: jobs[dependentJobID].Priority}) + } + } + + } + + // Check for cycles + for _, inDegree := range inDegrees { + if inDegree > 0 { + return nil, ErrCycleDetected + } + } + + return result, nil +} diff --git a/topological-sort_test.go b/topological-sort_test.go new file mode 100644 index 0000000000000000000000000000000000000000..565ce522568213d2b67ffc50529fda5ef0d4d49f --- /dev/null +++ b/topological-sort_test.go @@ -0,0 +1,233 @@ +package jobqueue + +import ( + "reflect" + "testing" +) + +func TestTopologicalSortJobs(t *testing.T) { + // Create a sample set of jobs with dependencies and priorities + + job1 := &job{JobSpecification: JobSpecification{Id: "1", Priority: PriorityHigh}} + + job2 := &job{JobSpecification: JobSpecification{Id: "2", Priority: PriorityHigh, Dependencies: []JobIDType{"1"}}} + job3 := &job{JobSpecification: JobSpecification{Id: "3", Priority: PriorityLow, Dependencies: []JobIDType{"1"}}} + job4 := &job{JobSpecification: JobSpecification{Id: "4", Priority: PriorityCritical, Dependencies: []JobIDType{"3"}}} + job5 := &job{JobSpecification: JobSpecification{Id: "5", Dependencies: []JobIDType{"2", "4"}}} + + jobs := map[JobIDType]*job{ + "1": job1, + "2": job2, + "3": job3, + "4": job4, + "5": job5, + } + + // Call the function to get the sorted job IDs + sortedJobIDs, err := topologicalSortJobs(jobs) + if err != nil { + t.Errorf("Error in sorting jobs: %v", err) + } + + // Define the expected order + expectedOrder := []JobIDType{"5", "4", "2", "3", "1"} + + // Check if the result matches the expected order + if !reflect.DeepEqual(sortedJobIDs, expectedOrder) { + t.Errorf("Expected order %v, but got %v", expectedOrder, sortedJobIDs) + } +} + +func TestTopologicalSortJobs2(t *testing.T) { + // Create a sample set of jobs with dependencies and priorities + + job1 := &job{JobSpecification: JobSpecification{Id: "1", Priority: PriorityHigh}} + + job2 := &job{JobSpecification: JobSpecification{Id: "2", Priority: PriorityHigh, Dependencies: []JobIDType{"1"}}} + job3 := &job{JobSpecification: JobSpecification{Id: "3", Priority: PriorityLow, Dependencies: []JobIDType{"1"}}} + job4 := &job{JobSpecification: JobSpecification{Id: "4", Priority: PriorityCritical, Dependencies: []JobIDType{"3"}}} + job5 := &job{JobSpecification: JobSpecification{Id: "5", Dependencies: []JobIDType{"2", "4"}}} + + jobs := map[JobIDType]*job{ + "1": job1, + "2": job2, + "3": job3, + "4": job4, + "5": job5, + } + + // Call the function to get the sorted job IDs + sortedJobIDs, err := topologicalSortJobs(jobs) + if err != nil { + t.Errorf("Error in sorting jobs: %v", err) + } + + // Define the expected order + expectedOrder := []JobIDType{"5", "4", "2", "3", "1"} + + // Check if the result matches the expected order + if !reflect.DeepEqual(sortedJobIDs, expectedOrder) { + t.Errorf("Expected order %v, but got %v", expectedOrder, sortedJobIDs) + } +} + +func TestTopologicalSortJobsNoDependencies(t *testing.T) { + // Create a sample set of jobs with no dependencies + job1 := &job{JobSpecification: JobSpecification{Id: "1"}} + job2 := &job{JobSpecification: JobSpecification{Id: "2"}} + job3 := &job{JobSpecification: JobSpecification{Id: "3"}} + + jobs := map[JobIDType]*job{ + "1": job1, + "2": job2, + "3": job3, + } + + // Call the function to get the sorted job IDs + sortedJobIDs, err := topologicalSortJobs(jobs) + if err != nil { + t.Errorf("Error in sorting jobs: %v", err) + } + + // Define the expected order (in any order because they have no dependencies) + expectedOrder := []JobIDType{"1", "2", "3"} + + // Check if the result contains the same elements as the expected order + if len(sortedJobIDs) != len(expectedOrder) { + t.Errorf("Expected order %v, but got %v", expectedOrder, sortedJobIDs) + } +} + +func TestTopologicalSortJobs_EmptyMap(t *testing.T) { + jobs := map[JobIDType]*job{} + sortedJobIDs, err := topologicalSortJobs(jobs) + if err != nil { + t.Errorf("Error in sorting jobs: %v", err) + } + if len(sortedJobIDs) != 0 { + t.Errorf("Expected empty slice, got %v", sortedJobIDs) + } +} + +func TestTopologicalSortJobs_CycleDetected(t *testing.T) { + // Creating a cycle 1 -> 2 -> 3 -> 1 + job1 := &job{JobSpecification: JobSpecification{Id: "1", Dependencies: []JobIDType{"3"}}} + job2 := &job{JobSpecification: JobSpecification{Id: "2", Dependencies: []JobIDType{"1"}}} + job3 := &job{JobSpecification: JobSpecification{Id: "3", Dependencies: []JobIDType{"2"}}} + + jobs := map[JobIDType]*job{ + "1": job1, + "2": job2, + "3": job3, + } + + _, err := topologicalSortJobs(jobs) + if err != ErrCycleDetected { + t.Errorf("Expected ErrCycleDetected, got %v", err) + } +} + +func TestTopologicalSortJobs_SingleNode(t *testing.T) { + job1 := &job{JobSpecification: JobSpecification{Id: "1"}} + + jobs := map[JobIDType]*job{ + "1": job1, + } + + sortedJobIDs, err := topologicalSortJobs(jobs) + if err != nil { + t.Errorf("Error in sorting jobs: %v", err) + } + if !reflect.DeepEqual(sortedJobIDs, []JobIDType{"1"}) { + t.Errorf("Expected [\"1\"], got %v", sortedJobIDs) + } +} + +func TestTopologicalSortJobs_MissingDependency(t *testing.T) { + job1 := &job{JobSpecification: JobSpecification{Id: "1"}} + job2 := &job{JobSpecification: JobSpecification{Id: "2", Dependencies: []JobIDType{"3"}}} + + jobs := map[JobIDType]*job{ + "1": job1, + "2": job2, + } + + _, err := topologicalSortJobs(jobs) + if err != nil && err != ErrMissingDependency { + t.Errorf("Expected ErrMissingDependency, got %v", err) + } +} + +func TestTopologicalSortJobs_SelfDependency(t *testing.T) { + // job 1 depends on itself + job1 := &job{JobSpecification: JobSpecification{Id: "1", Dependencies: []JobIDType{"1"}}} + + jobs := map[JobIDType]*job{ + "1": job1, + } + + _, err := topologicalSortJobs(jobs) + if err != ErrCycleDetected { + t.Errorf("Expected ErrCycleDetected, got %v", err) + } +} + +func TestTopologicalSortJobs_MultipleEdges(t *testing.T) { + // job 3 and job 4 both depend on job 2 + job1 := &job{JobSpecification: JobSpecification{Id: "1"}} + job2 := &job{JobSpecification: JobSpecification{Id: "2", Dependencies: []JobIDType{"1"}}} + job3 := &job{JobSpecification: JobSpecification{Id: "3", Dependencies: []JobIDType{"2"}}} + job4 := &job{JobSpecification: JobSpecification{Id: "4", Dependencies: []JobIDType{"2"}}} + + jobs := map[JobIDType]*job{ + "1": job1, + "2": job2, + "3": job3, + "4": job4, + } + + sortedJobIDs, err := topologicalSortJobs(jobs) + if err != nil { + t.Errorf("Error in sorting jobs: %v", err) + } + if !reflect.DeepEqual(sortedJobIDs, []JobIDType{"4", "3", "2", "1"}) && !reflect.DeepEqual(sortedJobIDs, []JobIDType{"3", "4", "2", "1"}) { + t.Errorf("Unexpected order: %v", sortedJobIDs) + } +} + +func TestTopologicalSortJobs_MultipleDependencies(t *testing.T) { + // job 3 depends on both job 1 and job 2 + job1 := &job{JobSpecification: JobSpecification{Id: "1"}} + job2 := &job{JobSpecification: JobSpecification{Id: "2"}} + job3 := &job{JobSpecification: JobSpecification{Id: "3", Dependencies: []JobIDType{"1", "2"}}} + + jobs := map[JobIDType]*job{ + "1": job1, + "2": job2, + "3": job3, + } + + sortedJobIDs, err := topologicalSortJobs(jobs) + if err != nil { + t.Errorf("Error in sorting jobs: %v", err) + } + if !reflect.DeepEqual(sortedJobIDs, []JobIDType{"3", "2", "1"}) && !reflect.DeepEqual(sortedJobIDs, []JobIDType{"3", "1", "2"}) { + t.Errorf("Unexpected order: %v", sortedJobIDs) + } +} + +func TestTopologicalSortJobs_PriorityIgnoredInCycle(t *testing.T) { + // Cycle exists even if one job has high priority + job1 := &job{JobSpecification: JobSpecification{Id: "1", Priority: PriorityHigh, Dependencies: []JobIDType{"2"}}} + job2 := &job{JobSpecification: JobSpecification{Id: "2", Dependencies: []JobIDType{"1"}}} + + jobs := map[JobIDType]*job{ + "1": job1, + "2": job2, + } + + _, err := topologicalSortJobs(jobs) + if err != ErrCycleDetected { + t.Errorf("Expected ErrCycleDetected, got %v", err) + } +}