filed
Job queue using FUSE
git clone git://mccd.space/filed
| Log | Files | Refs | README | LICENSE |
commit 3762623c577c98849326b9086c7325aed71135c6 parent 932744ca1ba093470273aecc562727ae41e80f26 Author: Marc Coquand <marc@coquand.email> Date: Mon, 15 Dec 2025 21:22:46 +0100 Add support for backoff retries Diffstat:
| M | README.md | | | 2 | +- |
| M | main.go | | | 14 | +++++++------- |
| M | manager.go | | | 26 | +++++++++++++++++++------- |
| M | newid.go | | | 3 | ++- |
| M | qj.1.scd | | | 30 | +++++++++++++++++++++++++++--- |
| M | store/config.go | | | 10 | ++++++---- |
| M | store/jobs.go | | | 4 | +++- |
7 files changed, 65 insertions(+), 24 deletions(-)
diff --git a/README.md b/README.md
@@ -66,7 +66,7 @@ $ mv /tmp/qj-jobs/failed/1 /tmp/qj-jobs/pending
- [x] Support chmod and chown
- [x] State is configured via environment variable
-- [ ] Customizable backoff and timeout before retries
+- [x] Customizable backoff and timeout before retries
- [x] Last modified and created at are correctly rendered for jobs
- [ ] "Landlock"-mode, or sandboxed jobs - Requires a design
diff --git a/main.go b/main.go
@@ -31,7 +31,7 @@ func main() {
if dbPath == "" {
xdg_home := os.Getenv("XDG_DATA_HOME")
if xdg_home == "" {
- fmt.Fprintf(os.Stderr, "QJ_DATA_FILE environment variable needs to be set.\n")
+ fmt.Fprintf(os.Stderr, "QJ_STATE_FILE environment variable needs to be set.\n")
usage()
os.Exit(1)
}
@@ -99,17 +99,17 @@ func (rd RootDir) Lookup(ctx context.Context, name string) (fs.Node, error) {
slog.Debug("FUSE: Lookup", "name", name)
switch name {
case store.StatePending:
- return PendingDir{manager: rd.manager, inode: 2}, nil
+ return &PendingDir{manager: rd.manager, inode: 2}, nil
case store.StateCompleted:
- return JobDir{state: name, manager: rd.manager, inode: 3}, nil
+ return &JobDir{state: name, manager: rd.manager, inode: 3}, nil
case store.StateFailed:
- return JobDir{state: name, manager: rd.manager, inode: 4}, nil
+ return &JobDir{state: name, manager: rd.manager, inode: 4}, nil
case store.StateRunning:
- return JobDir{state: name, manager: rd.manager, inode: 5}, nil
+ return &JobDir{state: name, manager: rd.manager, inode: 5}, nil
case NewIdName:
- return NewIdFile{manager: rd.manager, inode: 6}, nil
+ return &NewIdFile{manager: rd.manager, inode: 6}, nil
case ConfigName:
- return ConfigFile{manager: rd.manager, inode: 7}, nil
+ return &ConfigFile{manager: rd.manager, inode: 7}, nil
default:
return nil, syscall.ENOENT
}
diff --git a/manager.go b/manager.go
@@ -5,6 +5,7 @@ import (
"context"
"fmt"
"log/slog"
+ "math"
"os/exec"
"qj/store"
"sync"
@@ -78,17 +79,28 @@ func (jm *JobManager) processPendingJobs() {
jm.store.FailJob(job.ID)
continue
}
- slog.Info("Worker: Starting job", "id", job.ID, "cmd", job.Command, "attempt", job.Attempts)
- if err := jm.store.AttemptJob(job.ID); err != nil {
- slog.Error("Worker: Failed to claim job", "id", job.ID, "error", err)
- continue
- }
+ if jm.isReady(&conf, &job) {
+ slog.Info("Worker: Starting job", "id", job.ID, "cmd", job.Command, "attempt", job.Attempts)
+ if err := jm.store.AttemptJob(job.ID); err != nil {
+ slog.Error("Worker: Failed to claim job", "id", job.ID, "error", err)
+ continue
+ }
- go jm.runJob(job.ID, job.Command)
- activeJobCount++
+ go jm.runJob(job.ID, job.Command)
+ activeJobCount++
+ }
}
}
+func (jm *JobManager) isReady(config *store.Config, job *store.Job) bool {
+ currentTime := time.Now().Unix()
+ lastModified := job.UpdatedAt.Unix()
+ multiplier := float64(config.BackoffMult)
+ waitTime := int64(float64(config.BackoffBase) * math.Pow(multiplier, float64(job.Attempts)))
+ readyAt := lastModified + waitTime
+ return currentTime >= readyAt
+}
+
func (jm *JobManager) runJob(id, commandStr string) {
timeout_seconds := jm.store.GetConfig().TimeoutSec
diff --git a/newid.go b/newid.go
@@ -42,6 +42,7 @@ func (f NewIdFile) Attr(ctx context.Context, a *fuse.Attr) error {
slog.Error("Could not retrieve file metadata", "error", err)
return syscall.EIO
} else if fileMeta != nil {
+ slog.Info("Found file meta")
a.Mode = os.FileMode(fileMeta.Mode)
a.Gid = fileMeta.GID
a.Uid = fileMeta.UID
@@ -75,6 +76,6 @@ func (f *NewIdFile) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp
}
func (f *NewIdFile) ReadAll(ctx context.Context) ([]byte, error) {
- slog.Debug("FUSE: Read file content")
+ slog.Info("FUSE: Read file content")
return randomJobId(f.manager.store, 4), nil
}
diff --git a/qj.1.scd b/qj.1.scd
@@ -10,7 +10,8 @@ qj - queue jobs utility
# DESCRIPTION
-qj is a job queue that operates on files mounted to _mountpoint_.
+qj is a job queue that operates on files. It mounts a directory to
+_mountpoint_ that is used to inspect and run jobs.
qj exposes 4 directories, where each directory contains zero or more _jobs_.
Job names must be unique across all four directories. The directories are:
@@ -32,8 +33,7 @@ qj exposes 2 files:
*new-id* contains a short unique id that can be sampled for job
name entropy. The id is guaranteed to be unique at creation.
- *config.json* provides settings for max-retries, timeouts, max
- concurrent jobs. When changed the settings are applied immediately.
+ *config.json* provides various settings. See more in section *CONFIG*.
# SECURITY
@@ -66,6 +66,30 @@ look like this, for example:
The above cronjob looks for jobs older than 1 week and removes them.
+# CONFIG
+
+Changes made to config.json are applied immediately.
+
+## Max retries
+
+Maximum amount of retries before moving the job to failed.
+
+## Max job count
+
+Maximum amount of concurrent jobs
+
+## Backoff mult and backoff base
+
+The time to wait before retrying. The formula is
+
+ ```
+ base * mult^attempts
+ ```
+
+## Timeout
+
+Time before the job will be killed by a signal.
+
# ENVIRONMENT
## QJ_STATE_FILE
diff --git a/store/config.go b/store/config.go
@@ -5,21 +5,23 @@ type Config struct {
TimeoutSec int `json:"timeout_sec"`
// Max amount of concurrent jobs
MaxJobCount int `json:"max_job_count"`
+ BackoffMult int `json:"backoff_mult"`
+ BackoffBase int `json:"backoff_base"`
}
func (st *Store) UpdateConfig(conf Config) error {
query := `
UPDATE config
- SET max_attempts = ?, timeout_sec = ?, max_job_count = ?
+ SET max_attempts = ?, timeout_sec = ?, max_job_count = ?, backoff_mult = ?, backoff_base = ?
WHERE id = 0
`
- _, err := st.db.Exec(query, conf.MaxAttempts, conf.TimeoutSec, conf.MaxJobCount)
+ _, err := st.db.Exec(query, conf.MaxAttempts, conf.TimeoutSec, conf.MaxJobCount, conf.BackoffMult, conf.BackoffBase)
return err
}
func (st *Store) GetConfig() (conf Config) {
- query := `SELECT max_attempts, timeout_sec, max_job_count FROM config WHERE id = 0`
- err := st.db.QueryRow(query).Scan(&conf.MaxAttempts, &conf.TimeoutSec, &conf.MaxJobCount)
+ query := `SELECT max_attempts, timeout_sec, max_job_count, backoff_mult, backoff_base FROM config WHERE id = 0`
+ err := st.db.QueryRow(query).Scan(&conf.MaxAttempts, &conf.TimeoutSec, &conf.MaxJobCount, &conf.BackoffMult, &conf.BackoffBase)
if err != nil {
panic(err)
}
diff --git a/store/jobs.go b/store/jobs.go
@@ -70,7 +70,9 @@ func (s *Store) initSchema() error {
id INTEGER PRIMARY KEY CHECK (id = 0),
max_attempts INTEGER DEFAULT 3,
timeout_sec INTEGER DEFAULT 180,
- max_job_count INTEGER DEFAULT 20
+ max_job_count INTEGER DEFAULT 20,
+ backoff_mult INTEGER DEFAULT 2,
+ backoff_base INTEGER DEFAULT 2
);
INSERT INTO config (id) VALUES (0) ON CONFLICT DO NOTHING;
CREATE TRIGGER IF NOT EXISTS auto_increment_inode_trigger