filed

Job queue using FUSE

git clone git://mccd.space/filed

commit 8cff7fd5a86082360056ffc4bfb8a8d2b3529c18
parent 9cae8bc18a6b75a7fc795b35340dd3b5eba04cc8
Author: Marc Coquand <marc@coquand.email>
Date:   Mon, 22 Dec 2025 10:54:09 +0100

Rearchitecture to use signals instead of polling technique

Diffstat:
Mjobdir.go | 2++
Mmain.go | 1-
Mmanager.go | 69+++++++++++++++++++++++++++++++++++++++++++++++++++++++--------------
Mpendingdir.go | 25+++++++++++++------------
Mstore/config.go | 16++++++++++++++--
Mstore/jobs.go | 9++++++++-
6 files changed, 92 insertions(+), 30 deletions(-)
diff --git a/jobdir.go b/jobdir.go
@@ -105,6 +105,8 @@ func (d JobDir) Rename(ctx context.Context, req *fuse.RenameRequest, newDir fs.N
 	slog.Info("Restarting job", "id", req.OldName)
 
 	err := d.manager.store.ResetJob(req.OldName)
+	d.manager.notify()
+
 	if err != nil {
 		slog.Error("Failed to restart job in DB", "error", err)
 		return syscall.ENOENT
diff --git a/main.go b/main.go
@@ -123,7 +123,6 @@ func main() {
 		roDirPathsForFiled := append(restrictions.roDir, "/proc")
 		rules = append(rules, landlock.RODirs(roDirPathsForFiled...))
 
-		// mountpoint might not technically be necessarily? Probably good to include either way...
 		rwPathsForFiled := append(restrictions.rwFiles, mountpoint)
 		rules = append(rules, landlock.RWDirs(rwPathsForFiled...))
 
diff --git a/manager.go b/manager.go
@@ -27,6 +27,7 @@ type JobManager struct {
 	// Command to use for executing jobs in pending
 	execCmd      string
 	restrictions *RestrictionsArg
+	notifyChan   chan struct{}
 }
 
 type ActiveJob struct {
@@ -46,7 +47,26 @@ func NewJobManager(s *store.Store, filedLaunchExecutablePath string, restriction
 	}
 	slog.Info("restrictions", "rest", argRestrictions)
 
-	return &JobManager{store: s, execCmd: filedLaunchExecutablePath, restrictions: &argRestrictions}
+	return &JobManager{
+		store:        s,
+		execCmd:      filedLaunchExecutablePath,
+		restrictions: &argRestrictions,
+		notifyChan:   make(chan struct{}, 1),
+	}
+}
+
+type signal struct{}
+
+// Notifies to rerun jobs
+//
+// Buffer has size 1, so if it's full, nothing will happen.
+// This means that for concurrent jobs, a signal can be sent
+// multiple times but only be processed once.
+func (jm *JobManager) notify() {
+	select {
+	case jm.notifyChan <- signal{}:
+	default:
+	}
 }
 
 func toArg(lst []string, argParam string) []string {
@@ -73,37 +93,47 @@ func (jm *JobManager) StartWorker(ctx context.Context) {
 		}
 	}
 
-	ticker := time.NewTicker(1 * time.Second)
 	go func() {
+		jm.notify()
 		for {
+			nextRetryIn := jm.processPendingJobs()
+			var timerChan <-chan time.Time
+			if nextRetryIn > 0 {
+				timerChan = time.After(nextRetryIn)
+			}
+
 			select {
 			case <-ctx.Done():
 				return
-			case <-ticker.C:
-				jm.processPendingJobs()
+			case <-jm.notifyChan:
+				// Wake up because a job was added or finished
+			case <-timerChan:
+				// Wake up because a backoff period ended
 			}
+
 		}
 	}()
 }
 
-func (jm *JobManager) processPendingJobs() {
+func (jm *JobManager) processPendingJobs() time.Duration {
+	var minWait time.Duration = 0
 	conf := jm.store.GetConfig()
 	activeJobs, err := jm.store.ListJobsByState(store.StateRunning)
 	if err != nil {
 		slog.Error("Worker: Failed to list jobs", "error", err)
-		return
+		return minWait
 	}
 	activeJobCount := len(activeJobs)
 
 	jobs, err := jm.store.ListJobsByState(store.StatePending)
 	if err != nil {
 		slog.Error("Worker: Failed to list jobs", "error", err)
-		return
+		return minWait
 	}
 
 	for _, job := range jobs {
 		if activeJobCount >= conf.MaxJobCount {
-			return
+			break
 		}
 
 		if job.Attempts >= conf.MaxAttempts {
@@ -111,7 +141,8 @@ func (jm *JobManager) processPendingJobs() {
 			jm.store.FailJob(job.ID)
 			continue
 		}
-		if jm.isReady(&conf, &job) {
+		ready, waitTime := jm.isReadyWithWait(&conf, &job)
+		if ready {
 			slog.Info("Worker: Starting job", "id", job.ID, "cmd", job.Command, "attempt", job.Attempts+1)
 			if err := jm.store.AttemptJob(job.ID); err != nil {
 				slog.Error("Worker: Failed to claim job", "id", job.ID, "error", err)
@@ -120,20 +151,30 @@ func (jm *JobManager) processPendingJobs() {
 
 			go jm.runJob(job.ID, job.Command)
 			activeJobCount++
+		} else {
+			if minWait == 0 || waitTime < minWait {
+				minWait = waitTime
+			}
 		}
 	}
+	return minWait
 }
 
-func (jm *JobManager) isReady(config *store.Config, job *store.Job) bool {
+func (jm *JobManager) isReadyWithWait(config *store.Config, job *store.Job) (bool, time.Duration) {
 	currentTime := time.Now().Unix()
 	lastModified := job.UpdatedAt.Unix()
-	multiplier := float64(config.BackoffMult)
-	waitTime := int64(float64(config.BackoffBase) * math.Pow(multiplier, float64(job.Attempts)))
-	readyAt := lastModified + waitTime
-	return currentTime >= readyAt
+	waitTimeSec := int64(float64(config.BackoffBase) * math.Pow(float64(config.BackoffMult), float64(job.Attempts)))
+	readyAt := lastModified + waitTimeSec
+
+	remaining := readyAt - currentTime
+	if remaining <= 0 {
+		return true, 0
+	}
+	return false, time.Duration(remaining) * time.Second
 }
 
 func (jm *JobManager) runJob(id, commandStr string) {
+	defer jm.notify()
 	timeout_seconds := jm.store.GetConfig().TimeoutSec
 
 	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout_seconds)*time.Second)
diff --git a/pendingdir.go b/pendingdir.go
@@ -72,11 +72,11 @@ func (pd PendingDir) Create(ctx context.Context, req *fuse.CreateRequest, resp *
 	slog.Info("Creating job file", "name", req.Name)
 
 	f := &JobCreationFile{
-		id:    req.Name,
-		store: pd.manager.store,
-		uid:   req.Uid,
-		gid:   req.Gid,
-		mode:  req.Mode,
+		id:      req.Name,
+		manager: pd.manager,
+		uid:     req.Uid,
+		gid:     req.Gid,
+		mode:    req.Mode,
 	}
 	return f, f, nil
 }
@@ -147,12 +147,12 @@ func (f *File) readContent() ([]byte, error) {
 
 // Used before adding to the database. Lives in memory until the file is closed.
 type JobCreationFile struct {
-	id    string
-	store *store.Store
-	buf   bytes.Buffer
-	uid   uint32
-	gid   uint32
-	mode  os.FileMode
+	id      string
+	manager *JobManager
+	buf     bytes.Buffer
+	uid     uint32
+	gid     uint32
+	mode    os.FileMode
 }
 
 func (f *JobCreationFile) Attr(ctx context.Context, a *fuse.Attr) error {
@@ -174,7 +174,8 @@ func (f *JobCreationFile) Release(ctx context.Context, req *fuse.ReleaseRequest)
 
 	slog.Info("New job", "id", f.id, "command", command)
 
-	_, err := f.store.CreateJob(f.id, command)
+	_, err := f.manager.store.CreateJob(f.id, command)
+	f.manager.notify()
 	if err != nil {
 		// XXX Should check specifically for file exist error
 		slog.Error("Failed to create job. Is the job name unique? I.E. no pending/completed/failed jobs with same name.", "error", err)
diff --git a/store/config.go b/store/config.go
@@ -16,14 +16,26 @@ func (st *Store) UpdateConfig(conf Config) error {
 		WHERE id = 0
 		`
 	_, err := st.db.Exec(query, conf.MaxAttempts, conf.TimeoutSec, conf.MaxJobCount, conf.BackoffMult, conf.BackoffBase)
+	st.mu.Lock()
+	st.config = conf
+	st.mu.Unlock()
 	return err
 }
 
-func (st *Store) GetConfig() (conf Config) {
+func (st *Store) PopulateConfig() {
+	conf := Config{}
 	query := `SELECT max_attempts, timeout_sec, max_job_count, backoff_mult, backoff_base FROM config WHERE id = 0`
 	err := st.db.QueryRow(query).Scan(&conf.MaxAttempts, &conf.TimeoutSec, &conf.MaxJobCount, &conf.BackoffMult, &conf.BackoffBase)
 	if err != nil {
 		panic(err)
 	}
-	return conf
+	st.mu.Lock()
+	st.config = conf
+	st.mu.Unlock()
+}
+
+func (st *Store) GetConfig() (conf Config) {
+	st.mu.RLock()
+	defer st.mu.RUnlock()
+	return st.config
 }
diff --git a/store/jobs.go b/store/jobs.go
@@ -4,6 +4,7 @@ import (
 	"database/sql"
 	"log/slog"
 	"os"
+	"sync"
 	"time"
 
 	_ "modernc.org/sqlite"
@@ -30,7 +31,9 @@ type Job struct {
 }
 
 type Store struct {
-	db *sql.DB
+	db     *sql.DB
+	mu     sync.RWMutex
+	config Config
 }
 
 func NewStore(filepath string) (*Store, error) {
@@ -40,6 +43,9 @@ func NewStore(filepath string) (*Store, error) {
 		return nil, err
 	}
 
+	// Otherwise there are strange race conditions
+	db.SetMaxOpenConns(1)
+
 	if _, err := db.Exec("PRAGMA journal_mode=WAL;PRAGMA busy_timeout=20000;"); err != nil {
 		return nil, err
 	}
@@ -49,6 +55,7 @@ func NewStore(filepath string) (*Store, error) {
 		db.Close()
 		return nil, err
 	}
+	s.PopulateConfig()
 
 	return s, nil
 }