filed

Job queue using FUSE

git clone git://mccd.space/filed

commit bce898b76d81bd28fc76eb0716a1822d2f8558bb
parent dad5f8415e5ab7912e774f73844e0760d3a07bfa
Author: Marc Coquand <marc@coquand.email>
Date:   Sun, 14 Dec 2025 13:32:37 +0100

Working with retries

Diffstat:
Mmain.go | 172+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------
Amanager.go | 113+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mstore/store.go | 36+++++++++++++++++++++++++++++-------
Mstore/store_test.go | 4++--
4 files changed, 277 insertions(+), 48 deletions(-)
diff --git a/main.go b/main.go
@@ -5,6 +5,7 @@ import (
 	"context"
 	"flag"
 	"fmt"
+	"hash/fnv"
 	"log/slog"
 	"mccd/tbd/store"
 	"os"
@@ -33,7 +34,7 @@ func main() {
 	mountpoint := flag.Arg(0)
 	dbPath := flag.Arg(1)
 	if err := Unmount(mountpoint); err != nil {
-		slog.Info("FUSE: Pre-start unmount failed (this is usually okay)", "error", err)
+		slog.Debug("FUSE: Pre-start unmount failed (this is usually okay)", "error", err)
 	}
 
 	store, err := store.NewStore(dbPath)
@@ -44,8 +45,8 @@ func main() {
 	slog.Info("Mounting filesystem", "mountpoint", mountpoint)
 	c, err := fuse.Mount(
 		mountpoint,
-		fuse.FSName("helloworld"),
-		fuse.Subtype("hellofs"),
+		fuse.FSName("tbd"),
+		fuse.Subtype("tbdfs"),
 		fuse.AllowOther(),
 		fuse.DefaultPermissions(),
 	)
@@ -54,7 +55,12 @@ func main() {
 	}
 	defer c.Close()
 
-	err = fs.Serve(c, FS{store})
+	jobManager := NewJobManager(store)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	jobManager.StartWorker(ctx)
+
+	err = fs.Serve(c, FS{jobManager})
 	if err != nil {
 		panic(err)
 	}
@@ -62,15 +68,15 @@ func main() {
 }
 
 type FS struct {
-	store *store.Store
+	manager *JobManager
 }
 
 func (fs FS) Root() (fs.Node, error) {
-	return RootDir{fs.store}, nil
+	return RootDir{fs.manager}, nil
 }
 
 type RootDir struct {
-	store *store.Store
+	manager *JobManager
 }
 
 func (RootDir) Attr(ctx context.Context, a *fuse.Attr) error {
@@ -79,26 +85,26 @@ func (RootDir) Attr(ctx context.Context, a *fuse.Attr) error {
 }
 
 func (rd RootDir) Lookup(ctx context.Context, name string) (fs.Node, error) {
-	slog.Info("Lookup", "name", name)
+	slog.Info("FUSE: Lookup", "name", name)
 	switch name {
 	case "pending":
-		return PendingDir{store: rd.store}, nil
+		return PendingDir{manager: rd.manager}, nil
 	case "complete":
-		return JobDir{state: name, store: rd.store}, nil
+		return JobDir{state: name, manager: rd.manager, inode: 2}, nil
 	case "failed":
-		return JobDir{state: name, store: rd.store}, nil
+		return JobDir{state: name, manager: rd.manager, inode: 3}, nil
 	case "active":
-		return JobDir{state: name, store: rd.store}, nil
+		return JobDir{state: name, manager: rd.manager, inode: 4}, nil
 	default:
 		return nil, syscall.ENOENT
 	}
 }
 
 var rootEntries = []fuse.Dirent{
-	{Name: "pending", Type: fuse.DT_Dir},
-	{Name: "complete", Type: fuse.DT_Dir},
-	{Name: "failed", Type: fuse.DT_Dir},
-	{Name: "active", Type: fuse.DT_Dir},
+	{Name: store.StatePending, Type: fuse.DT_Dir},
+	{Name: store.StateCompleted, Type: fuse.DT_Dir},
+	{Name: store.StateFailed, Type: fuse.DT_Dir},
+	{Name: store.StateRunning, Type: fuse.DT_Dir},
 }
 
 func (RootDir) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
@@ -106,18 +112,21 @@ func (RootDir) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
 }
 
 type PendingDir struct {
-	store *store.Store
+	manager *JobManager
 }
 
 func (PendingDir) Attr(ctx context.Context, a *fuse.Attr) error {
-	a.Mode = os.ModeDir | 0o555
+	a.Mode = os.ModeDir | 0o775
+	a.Gid = uint32(os.Getgid())
+	a.Uid = uint32(os.Getuid())
+	a.Inode = 1
 	return nil
 }
 
 func (jd PendingDir) ReadDirAll(ctx context.Context) (entries []fuse.Dirent, err error) {
-	jobs, err := jd.store.ListJobsByState(store.StatePending)
+	jobs, err := jd.manager.store.ListJobsByState(store.StatePending)
 	if err != nil {
-		slog.Error("Could not find jobs", "error", err)
+		slog.Error("FUSE: Could not find jobs", "error", err)
 		return entries, nil
 	}
 	for _, job := range jobs {
@@ -130,7 +139,7 @@ func (d PendingDir) Create(ctx context.Context, req *fuse.CreateRequest, resp *f
 
 	f := &JobCreationFile{
 		id:    req.Name,
-		store: d.store,
+		store: d.manager.store,
 		uid:   req.Uid,
 		gid:   req.Gid,
 		mode:  req.Mode,
@@ -138,27 +147,65 @@ func (d PendingDir) Create(ctx context.Context, req *fuse.CreateRequest, resp *f
 	return f, f, nil
 }
 func (jd PendingDir) Lookup(ctx context.Context, name string) (fs.Node, error) {
-	slog.Info("Lookup", "name", name)
-	job, err := jd.store.GetJob(name)
+	slog.Info("FUSE: Lookup", "name", name)
+	job, err := jd.manager.store.GetJob(name)
 	if err != nil {
 		return nil, syscall.ENOENT
 	}
-	return &File{content: job.Command}, nil
+	if job.State == store.StatePending {
+		slog.Info("FUSE: Found job", "id", job.ID)
+		return &File{job, jd.manager}, nil
+	} else {
+		return nil, syscall.ENOENT
+	}
 }
 
 type File struct {
-	content string
+	job     *store.Job
+	manager *JobManager
 }
 
 func (f File) Attr(ctx context.Context, a *fuse.Attr) error {
-	a.Inode = 2
-	a.Mode = 0o444
-	a.Size = uint64(len(f.content))
+	a.Inode = InodeFromJobID(f.job.ID)
+	slog.Info("FUSE", "inode", a.Inode)
+	a.Mode = 0o775
+	a.Gid = uint32(os.Getgid())
+	a.Uid = uint32(os.Getuid())
+	res, err := f.readContent()
+	if err != nil {
+		a.Size = 0
+		return nil
+	}
+	a.Size = uint64(len(res))
 	return nil
 }
 
-func (f File) ReadAll(ctx context.Context) ([]byte, error) {
-	return []byte(f.content), nil
+// ReadAll logs the read and returns the rendered file body produced by
+// readContent.
+func (f *File) ReadAll(ctx context.Context) ([]byte, error) {
+	slog.Info("FUSE: Read file content")
+	content, err := f.readContent()
+	return content, err
+}
+
+// readContent renders the file body: a ">>> <command>" header line followed
+// by the job's output. The output is the manager's live log when GetLiveLog
+// returns a non-nil slice, otherwise the Output stored on the job. The
+// returned error is always nil.
+func (f *File) readContent() ([]byte, error) {
+	body := f.manager.GetLiveLog(f.job.ID)
+	if body == nil {
+		slog.Info("Using job output")
+		body = f.job.Output
+	} else {
+		slog.Info("Using live output")
+	}
+
+	const prompt = ">>> "
+	rendered := make([]byte, 0, len(prompt)+len(f.job.Command)+1+len(body))
+	rendered = append(rendered, prompt...)
+	rendered = append(rendered, f.job.Command...)
+	rendered = append(rendered, '\n')
+	rendered = append(rendered, body...)
+
+	return rendered, nil
+}
 
 // Used before adding to the database. Lives in memory until the file is closed.
@@ -180,7 +227,6 @@ func (f *JobCreationFile) Attr(ctx context.Context, a *fuse.Attr) error {
 }
 
 func (f *JobCreationFile) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
-	// Append data to our memory buffer
 	n, err := f.buf.Write(req.Data)
 	resp.Size = n
 	return err
@@ -193,24 +239,29 @@ func (f *JobCreationFile) Release(ctx context.Context, req *fuse.ReleaseRequest)
 
 	_, err := f.store.CreateJob(f.id, command)
 	if err != nil {
-		slog.Error("Failed to create job. Is the job name unique? I.E. no pending/completed/failed jobs with same name", "error", err)
+		slog.Error("Failed to create job. Is the job name unique? I.E. no pending/completed/failed jobs with same name.", "error", err)
 		return syscall.EIO
 	}
 	return nil
 }
 
 type JobDir struct {
-	state string
-	store *store.Store
+	state   string
+	manager *JobManager
+	inode   uint64
 }
 
-func (JobDir) Attr(ctx context.Context, a *fuse.Attr) error {
-	a.Mode = 0o444
+func (jd JobDir) Attr(ctx context.Context, a *fuse.Attr) error {
+	a.Mode = os.ModeDir | 0o775
+
+	a.Gid = uint32(os.Getgid())
+	a.Uid = uint32(os.Getuid())
+	a.Inode = jd.inode
 	return nil
 }
 
 func (jd JobDir) ReadDirAll(ctx context.Context) (entries []fuse.Dirent, err error) {
-	jobs, err := jd.store.ListJobsByState(jd.state)
+	jobs, err := jd.manager.store.ListJobsByState(jd.state)
 	if err != nil {
 		slog.Error("Could not find jobs", "error", err)
 		return entries, nil
@@ -221,9 +272,52 @@ func (jd JobDir) ReadDirAll(ctx context.Context) (entries []fuse.Dirent, err err
 	return entries, nil
 }
 
+// Rename handles a request to move a job file out of this state directory.
+// The only accepted move is into the pending directory under the same name;
+// it calls store.ResetJob for that job. Moves out of the running-state
+// directory, moves to any other directory, and name changes are rejected
+// with EPERM. A failing ResetJob is reported as ENOENT.
+func (d JobDir) Rename(ctx context.Context, req *fuse.RenameRequest, newDir fs.Node) error {
+	_, toPending := newDir.(PendingDir)
+
+	switch {
+	case d.state == store.StateRunning:
+		slog.Warn("Refusing to restart a currently running job", "id", req.OldName)
+		return syscall.EPERM
+	case !toPending:
+		slog.Warn("Jobs can only be moved to 'pending' (to restart) or deleted", "dest", newDir)
+		return syscall.EPERM
+	case req.OldName != req.NewName:
+		slog.Warn("Renaming job IDs is not supported", "old", req.OldName, "new", req.NewName)
+		return syscall.EPERM
+	}
+
+	slog.Info("Restarting job", "id", req.OldName)
+
+	if err := d.manager.store.ResetJob(req.OldName); err != nil {
+		slog.Error("Failed to restart job in DB", "error", err)
+		return syscall.ENOENT
+	}
+	return nil
+}
+
+// InodeFromJobID derives an inode number from a job ID by taking the 64-bit
+// FNV-1a hash of the ID's bytes.
+func InodeFromJobID(id string) uint64 {
+	hasher := fnv.New64a()
+	// hash.Hash documents that Write never returns an error.
+	_, _ = hasher.Write([]byte(id))
+	return hasher.Sum64()
+}
+
 func (jd JobDir) Lookup(ctx context.Context, name string) (fs.Node, error) {
-	slog.Info("Lookup", "name", name)
-	return nil, syscall.ENOENT
+	slog.Info("FUSE: Jobdir Lookup", "name", name)
+	job, err := jd.manager.store.GetJob(name)
+	if err != nil {
+		slog.Warn("FUSE: Not found", "name", name)
+		return nil, syscall.ENOENT
+	}
+	if job.State == jd.state {
+		slog.Info("FUSE: Found job", "id", job.ID)
+		return &File{job, jd.manager}, nil
+	} else {
+		return nil, syscall.ENOENT
+	}
 }
 
 func Unmount(dir string) error {
diff --git a/manager.go b/manager.go
@@ -0,0 +1,113 @@
+package main
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"log/slog"
+	"mccd/tbd/store"
+	"os/exec"
+	"sync"
+	"time"
+)
+
+// JobManager runs pending jobs from the store and tracks the jobs that are
+// currently executing so their output can be read while they run.
+type JobManager struct {
+	// store is the job database handle used to list, claim and update jobs.
+	store      *store.Store
+	// activeJobs maps a job ID (string) to its *ActiveJob for as long as
+	// runJob is executing that job.
+	activeJobs sync.Map
+}
+
+// ActiveJob holds the output captured so far from a running job.
+type ActiveJob struct {
+	// mu guards output; SafeBuffer.Write and JobManager.GetLiveLog both
+	// take it before touching the buffer.
+	mu     sync.Mutex
+	// output accumulates the command's combined stdout and stderr.
+	output bytes.Buffer
+}
+
+// NewJobManager returns a JobManager that uses s as its job store and has
+// no active jobs registered.
+func NewJobManager(s *store.Store) *JobManager {
+	jm := new(JobManager)
+	jm.store = s
+	return jm
+}
+
+// StartWorker launches a background goroutine that calls processPendingJobs
+// once per second. The goroutine exits, and its ticker is stopped, when ctx
+// is cancelled. StartWorker itself returns immediately.
+func (jm *JobManager) StartWorker(ctx context.Context) {
+	ticker := time.NewTicker(1 * time.Second)
+	go func() {
+		// Stop the ticker when the loop ends so it does not keep firing
+		// after the worker has shut down.
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+				jm.processPendingJobs()
+			}
+		}
+	}()
+}
+
+// processPendingJobs lists every job in the pending state and, for each one,
+// either marks it failed (when it has already been attempted more than four
+// times) or claims it via store.AttemptJob and starts runJob in a new
+// goroutine. Store errors are logged and the affected job is skipped.
+//
+// NOTE(review): the ListJobsByState query visible in store.go selects only
+// id, command, created_at and updated_at, so job.Attempts may always be zero
+// here and the retry limit may never trigger — confirm against the store.
+func (jm *JobManager) processPendingJobs() {
+	jobs, err := jm.store.ListJobsByState(store.StatePending)
+	if err != nil {
+		slog.Error("Worker: Failed to list jobs", "error", err)
+		return
+	}
+
+	for _, job := range jobs {
+		if job.Attempts > 4 {
+			// Retry budget exhausted: mark the job failed and do NOT fall
+			// through to AttemptJob, which would run it yet again.
+			if err := jm.store.FailJob(job.ID); err != nil {
+				slog.Error("Worker: Failed to mark job as failed", "id", job.ID, "error", err)
+			}
+			continue
+		}
+		if err := jm.store.AttemptJob(job.ID); err != nil {
+			slog.Error("Worker: Failed to claim job", "id", job.ID, "error", err)
+			continue
+		}
+
+		go jm.runJob(job.ID, job.Command)
+	}
+}
+
+// runJob executes commandStr through "sh -c" and records the result for the
+// job identified by id.
+//
+// While the command runs, its combined stdout/stderr is collected in an
+// ActiveJob registered in jm.activeJobs so GetLiveLog can serve partial
+// output; the entry is removed when runJob returns. On failure the output,
+// with a "[System Error]" trailer appended, is passed to store.RestartJob;
+// on success it is passed to store.CompleteJob. Errors from either store
+// call are logged rather than dropped.
+func (jm *JobManager) runJob(id, commandStr string) {
+	slog.Info("Worker: Starting job", "id", id, "cmd", commandStr)
+
+	active := &ActiveJob{}
+	jm.activeJobs.Store(id, active)
+	defer jm.activeJobs.Delete(id)
+
+	cmd := exec.Command("sh", "-c", commandStr)
+
+	// Both streams share one mutex-guarded writer so their output interleaves
+	// into a single buffer.
+	writer := &SafeBuffer{target: active}
+	cmd.Stdout = writer
+	cmd.Stderr = writer
+
+	err := cmd.Run()
+
+	jobOutput := active.output.Bytes()
+
+	exitCode := cmd.ProcessState.ExitCode()
+
+	if err != nil || exitCode != 0 {
+		slog.Error("Worker: Job failed", "id", id, "error", err, "exitCode", exitCode)
+		errMsg := fmt.Sprintf("\n\n[System Error]: %v\n", err)
+		jobOutput = append(jobOutput, []byte(errMsg)...)
+		if dbErr := jm.store.RestartJob(id, jobOutput); dbErr != nil {
+			slog.Error("Worker: Failed to requeue job", "id", id, "error", dbErr)
+		}
+		return
+	}
+
+	slog.Info("Worker: Job completed", "id", id, "Output", jobOutput, "exitCode", exitCode)
+	if dbErr := jm.store.CompleteJob(id, jobOutput); dbErr != nil {
+		slog.Error("Worker: Failed to record job completion", "id", id, "error", dbErr)
+	}
+}
+
+// SafeBuffer is a writer that appends to an ActiveJob's output buffer while
+// holding that job's mutex.
+type SafeBuffer struct {
+	// target is the job whose output buffer receives the writes.
+	target *ActiveJob
+}
+
+// Write appends p to the target job's output buffer under the job's mutex
+// and returns the buffer's own write result.
+func (sb *SafeBuffer) Write(p []byte) (n int, err error) {
+	job := sb.target
+	job.mu.Lock()
+	defer job.mu.Unlock()
+	n, err = job.output.Write(p)
+	return n, err
+}
+
+// GetLiveLog returns a copy of the output captured so far for the job with
+// the given id, or nil when no job with that id is registered as active.
+// The copy is taken under the job's mutex, so the caller owns the result.
+func (jm *JobManager) GetLiveLog(id string) []byte {
+	entry, found := jm.activeJobs.Load(id)
+	if !found {
+		return nil
+	}
+
+	active := entry.(*ActiveJob)
+	active.mu.Lock()
+	defer active.mu.Unlock()
+
+	// Start from a non-nil, pre-sized slice so an active job with no output
+	// yet still yields an empty (not nil) result.
+	snapshot := make([]byte, 0, active.output.Len())
+	return append(snapshot, active.output.Bytes()...)
+}
diff --git a/store/store.go b/store/store.go
@@ -12,7 +12,7 @@ import (
 const (
 	StatePending   = "pending"
 	StateRunning   = "active"
-	StateCompleted = "completed"
+	StateCompleted = "complete"
 	StateFailed    = "failed"
 )
 
@@ -122,7 +122,7 @@ func (s *Store) DeleteJob(id string) error {
 }
 
 func (s *Store) ListJobsByState(state string) ([]Job, error) {
-	rows, err := s.db.Query("SELECT id, created_at,updated_at FROM jobs WHERE state = ?", state)
+	rows, err := s.db.Query("SELECT id, command, created_at,updated_at FROM jobs WHERE state = ?", state)
 	if err != nil {
 		return nil, err
 	}
@@ -131,7 +131,7 @@ func (s *Store) ListJobsByState(state string) ([]Job, error) {
 	var jobs []Job
 	for rows.Next() {
 		var j Job
-		if err := rows.Scan(&j.ID, &j.CreatedAt, &j.UpdatedAt); err != nil {
+		if err := rows.Scan(&j.ID, &j.Command, &j.CreatedAt, &j.UpdatedAt); err != nil {
 			return nil, err
 		}
 		j.State = state
@@ -168,13 +168,13 @@ func (s *Store) AttemptJob(id string) error {
 }
 
 // Moves a job back to 'pending'.
-func (s *Store) RestartJob(id string) error {
+func (s *Store) RestartJob(id string, output []byte) error {
 	query := `
 		UPDATE jobs 
-		SET state = ?, attempts = ?, updated_at = ?
+		SET state = ?, output = ?, updated_at = ?
 		WHERE id = ?
 	`
-	res, err := s.db.Exec(query, StatePending, 0, time.Now(), id)
+	res, err := s.db.Exec(query, output, StatePending, time.Now(), id)
 	if err != nil {
 		return err
 	}
@@ -185,11 +185,33 @@ func (s *Store) RestartJob(id string) error {
 	return nil
 }
 
-func (s *Store) CompleteJob(id string, output string) error {
+// ResetJob moves the job with the given id back to 'pending' and resets its
+// attempt counter to zero, refreshing updated_at. It returns os.ErrNotExist
+// when no row matches id, or the underlying database error otherwise.
+func (s *Store) ResetJob(id string) error {
+	// The SET keyword is required; without it the UPDATE statement is a
+	// syntax error and every call fails.
+	query := `
+		UPDATE jobs
+		SET state = ?, attempts = 0, updated_at = ?
+		WHERE id = ?
+	`
+	res, err := s.db.Exec(query, StatePending, time.Now(), id)
+	if err != nil {
+		return err
+	}
+	rows, err := res.RowsAffected()
+	if err != nil {
+		return err
+	}
+	if rows == 0 {
+		return os.ErrNotExist
+	}
+	return nil
+}
+
+func (s *Store) CompleteJob(id string, output []byte) error {
 	_, err := s.db.Exec("UPDATE jobs SET state = ?, output = ?, updated_at = ? WHERE id = ?", StateCompleted, output, time.Now(), id)
 	return err
 }
 
+// WriteOutput overwrites the stored output of the job with the given id and
+// refreshes its updated_at timestamp. The job's state is left unchanged.
+func (s *Store) WriteOutput(id string, output []byte) error {
+	const query = "UPDATE jobs SET output = ?, updated_at = ? WHERE id = ?"
+	if _, err := s.db.Exec(query, output, time.Now(), id); err != nil {
+		return err
+	}
+	return nil
+}
+
 func (s *Store) FailJob(id string) error {
 	_, err := s.db.Exec("UPDATE jobs SET state = ?, updated_at = ? WHERE id = ?", StateFailed, time.Now(), id)
 	return err
diff --git a/store/store_test.go b/store/store_test.go
@@ -88,7 +88,7 @@ func TestLifecycleTransitions(t *testing.T) {
 	}
 
 	// 2. Running -> Completed
-	if err := s.CompleteJob(jobID, "hello"); err != nil {
+	if err := s.CompleteJob(jobID, []byte("hello")); err != nil {
 		t.Fatalf("CompleteJob failed: %v", err)
 	}
 
@@ -115,7 +115,7 @@ func TestListJobsByState(t *testing.T) {
 	s.CreateJob("p2", "cmd2")
 
 	s.CreateJob("c1", "cmd3")
-	s.CompleteJob("c1", "out")
+	s.CompleteJob("c1", []byte("out"))
 
 	// List Pending
 	pending, err := s.ListJobsByState(StatePending)