filed

Job queue using FUSE

git clone git://mccd.space/filed

commit f6476d60af5d20a453d8c18e794e0e870225b733
parent edc802d64c527862c4629b562001f3491db4ba4f
Author: Marc Coquand <marc@coquand.email>
Date:   Sun, 14 Dec 2025 19:28:24 +0100

Add config

Diffstat:
Aconfig.go | 60++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mmain.go | 3+++
Mmanager.go | 9+++++++--
Mnewid.go | 2+-
Mqj.1.scd | 13+++++++++----
Astore/config.go | 25+++++++++++++++++++++++++
Mstore/store.go | 6++++++
7 files changed, 111 insertions(+), 7 deletions(-)
diff --git a/config.go b/config.go
@@ -0,0 +1,60 @@
+package main
+
+import (
+	"context"
+	"encoding/json"
+	"log/slog"
+	"os"
+	"qj/store"
+	"syscall"
+
+	"bazil.org/fuse"
+)
+
+var ConfigName = "config.json"
+
+type ConfigFile struct {
+	manager *JobManager
+	inode   uint64
+}
+
+func (f *ConfigFile) Attr(ctx context.Context, a *fuse.Attr) error {
+	config := f.manager.store.GetConfig()
+	a.Mode = 0o744
+	a.Gid = uint32(os.Getgid())
+	a.Uid = uint32(os.Getuid())
+	a.Inode = f.inode
+	confJson, err := json.Marshal(config)
+	if err != nil {
+		slog.Error("FUSE: Could not marshal conf", "error", err)
+		return nil
+	}
+
+	a.Size = uint64(len(confJson))
+	return nil
+}
+func (f *ConfigFile) ReadAll(ctx context.Context) (jsonConf []byte, err error) {
+	slog.Info("FUSE: Read file content")
+	config := f.manager.store.GetConfig()
+	jsonConf, err = json.Marshal(config)
+	if err != nil {
+		slog.Error("FUSE: Could not marshal conf", "error", err)
+		return nil, nil
+	}
+	return jsonConf, nil
+}
+
+func (f *ConfigFile) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
+	var newContent store.Config
+	if err := json.Unmarshal(req.Data, &newContent); err != nil {
+		return syscall.EINVAL
+	}
+	slog.Info("FUSE: Updating config", "config", newContent)
+	err := f.manager.store.UpdateConfig(newContent)
+	if err != nil {
+		slog.Error("FUSE: Could not update config", "config", newContent)
+		return syscall.EIO
+	}
+	resp.Size = len(req.Data)
+	return nil
+}
diff --git a/main.go b/main.go
@@ -96,6 +96,8 @@ func (rd RootDir) Lookup(ctx context.Context, name string) (fs.Node, error) {
 		return JobDir{state: name, manager: rd.manager, inode: 5}, nil
 	case NewIdName:
 		return &NewIdFile{manager: rd.manager, inode: 6}, nil
+	case ConfigName:
+		return &ConfigFile{manager: rd.manager, inode: 7}, nil
 	default:
 		return nil, syscall.ENOENT
 	}
@@ -107,6 +109,7 @@ var rootEntries = []fuse.Dirent{
 	{Name: store.StateFailed, Type: fuse.DT_Dir},
 	{Name: store.StateRunning, Type: fuse.DT_Dir},
 	{Name: NewIdName, Type: fuse.DT_File},
+	{Name: ConfigName, Type: fuse.DT_File},
 }
 
 func (RootDir) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
diff --git a/manager.go b/manager.go
@@ -40,6 +40,7 @@ func (jm *JobManager) StartWorker(ctx context.Context) {
 }
 
 func (jm *JobManager) processPendingJobs() {
+	conf := jm.store.GetConfig()
 	jobs, err := jm.store.ListJobsByState(store.StatePending)
 	if err != nil {
 		slog.Error("Worker: Failed to list jobs", "error", err)
@@ -47,7 +48,7 @@ func (jm *JobManager) processPendingJobs() {
 	}
 
 	for _, job := range jobs {
-		if job.Attempts >= 2 {
+		if job.Attempts >= conf.MaxAttempts {
 			slog.Warn("Worker: job exceeded max attempts. Moving to failed", "Job", job.ID)
 			jm.store.FailJob(job.ID)
 			continue
@@ -62,13 +63,17 @@ func (jm *JobManager) processPendingJobs() {
 }
 
 func (jm *JobManager) runJob(id, commandStr string) {
+	timeout_seconds := jm.store.GetConfig().TimeoutSec
 	slog.Info("Worker: Starting job", "id", id, "cmd", commandStr)
 
+	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout_seconds)*time.Second)
+	defer cancel()
+
 	active := &ActiveJob{}
 	jm.activeJobs.Store(id, active)
 	defer jm.activeJobs.Delete(id)
 
-	cmd := exec.Command("sh", "-c", commandStr)
+	cmd := exec.CommandContext(ctx, "sh", "-c", commandStr)
 
 	writer := &SafeBuffer{target: active}
 	cmd.Stdout = writer
diff --git a/newid.go b/newid.go
@@ -11,7 +11,7 @@ import (
 	"bazil.org/fuse"
 )
 
-var NewIdName = "get-id"
+var NewIdName = "new-id"
 
 // Helper function to get a 4 character ID that hasn'nt been used before
 func randomJobId(store *store.Store, characters int) []byte {
diff --git a/qj.1.scd b/qj.1.scd
@@ -14,15 +14,20 @@ qj is a simple job queue that operates using files by setting up a virtual
 file system on _MOUNTPOINT_.
 
 qj exposes 4 directories: *pending*, *active*, *failed*, *completed*. Files
-in the directories correspond to jobs. To create a job, simply add it to *pending*. 
-See examples.
+in the directories correspond to jobs. To create a job, simply add it to
+*pending*. Each job name must be unique globally across all four directories. 
+
+qj exposes 3 files: 
+
+	_new-id_ generate a unique id that can be used for job names
+	_config.json_ provides various settings such as max-jobs, timeouts. 
 
 # EXAMPLES
 
 Create a new job with $JOBID that calculates 1+1:
 
-	$ JOBID="Big Number $(head -c4 /dev/urandom | base64)"
-	$ echo "echo 1+1 | bc -s" >> "/var/qj/pending/$JOBID"
+	$ JOBID="$(cat /var/qj/new-id)"
+	$ printf "echo hello-world" >> "/var/qj/pending/$JOBID"
 
 Periodic jobs can be set up using _CRON(8)_.
 
diff --git a/store/config.go b/store/config.go
@@ -0,0 +1,25 @@
+package store
+
+type Config struct {
+	MaxAttempts int `json:"max_attempts"`
+	TimeoutSec  int `json:"timeout_sec"`
+}
+
+func (st *Store) UpdateConfig(conf Config) error {
+	query := `
+		UPDATE config
+		SET max_attempts = ?, timeout_sec = ?
+		WHERE id = 0
+		`
+	_, err := st.db.Exec(query, conf.MaxAttempts, conf.TimeoutSec)
+	return err
+}
+
+func (st *Store) GetConfig() (conf Config) {
+	query := `SELECT max_attempts, timeout_sec FROM config WHERE id = 0`
+	err := st.db.QueryRow(query).Scan(&conf.MaxAttempts, &conf.TimeoutSec)
+	if err != nil {
+		panic(err)
+	}
+	return conf
+}
diff --git a/store/store.go b/store/store.go
@@ -65,6 +65,12 @@ func (s *Store) initSchema() error {
 		created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
 		updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
 	);
+	CREATE TABLE IF NOT EXISTS config (
+		id INTEGER PRIMARY KEY CHECK (id = 0),
+		max_attempts INTEGER DEFAULT 3,
+		timeout_sec INTEGER DEFAULT 180
+	);
+	INSERT INTO config (id) VALUES (0) ON CONFLICT DO NOTHING;
 	CREATE TRIGGER IF NOT EXISTS auto_increment_inode_trigger
 		AFTER INSERT ON jobs
 		WHEN new.inode IS NULL