filed

Job queue using FUSE

git clone git://mccd.space/filed

commit c783693c6b340a534aded9167a4ee1a70ec32c53
parent 4d71c9004e063a8779f42ab72eb3eb92f15a3538
Author: Marc Coquand <marc@coquand.email>
Date:   Sat, 13 Dec 2025 19:01:05 +0100

Add pending jobs

Diffstat:
AREADME.md | 13+++++++++++++
Mmain.go | 233++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
Mstore/store.go | 6++++--
3 files changed, 248 insertions(+), 4 deletions(-)
diff --git a/README.md b/README.md
@@ -0,0 +1,13 @@
+# tbd - Minimal job queue
+
+`tbd` is a simple job queue that is manipulated using files. It is great for single-server level workloads and features:
+
+- Retries
+- Output logs
+- Timeout
+- Job limits
+
+Other features are enabled by using built-in Unix features:
+
+- Periodic jobs using cron
+- Access rights using unix users
diff --git a/main.go b/main.go
@@ -1,7 +1,236 @@
 package main
 
-import "fmt"
+import (
+	"bytes"
+	"context"
+	"flag"
+	"fmt"
+	"log/slog"
+	"mccd/tbd/store"
+	"os"
+	"os/exec"
+	"runtime"
+	"syscall"
+
+	"bazil.org/fuse"
+	"bazil.org/fuse/fs"
+	_ "bazil.org/fuse/fs/fstestutil"
+)
+
+func usage() {
+	fmt.Fprintf(os.Stderr, "Usage: %s MOUNTPOINT DBPATH\n", os.Args[0])
+	flag.PrintDefaults()
+}
 
 func main() {
-    fmt.Println("Hello, World!")
+	flag.Usage = usage
+	flag.Parse()
+
+	if flag.NArg() != 2 {
+		usage()
+		os.Exit(2)
+	}
+	mountpoint := flag.Arg(0)
+	dbPath := flag.Arg(1)
+	if err := Unmount(mountpoint); err != nil {
+		slog.Debug("FUSE: Pre-start unmount failed (this is usually okay)", "error", err)
+	}
+
+	store, err := store.NewStore(dbPath)
+	if err != nil {
+		panic(err)
+	}
+
+	slog.Info("Mounting filesystem", "mountpoint", mountpoint)
+	c, err := fuse.Mount(
+		mountpoint,
+		fuse.FSName("helloworld"),
+		fuse.Subtype("hellofs"),
+		fuse.AllowOther(),
+		fuse.DefaultPermissions(),
+	)
+	if err != nil {
+		panic(err)
+	}
+	defer c.Close()
+
+	err = fs.Serve(c, FS{store})
+	if err != nil {
+		panic(err)
+	}
+
+}
+
+type FS struct {
+	store *store.Store
+}
+
+func (fs FS) Root() (fs.Node, error) {
+	return RootDir{fs.store}, nil
+}
+
+type RootDir struct {
+	store *store.Store
+}
+
+func (RootDir) Attr(ctx context.Context, a *fuse.Attr) error {
+	a.Mode = os.ModeDir | 0o555
+	return nil
+}
+
+func (rd RootDir) Lookup(ctx context.Context, name string) (fs.Node, error) {
+	slog.Info("Lookup", "name", name)
+	switch name {
+	case "pending":
+		return PendingDir{store: rd.store}, nil
+	case "complete":
+		return JobDir{state: name, store: rd.store}, nil
+	case "failed":
+		return JobDir{state: name, store: rd.store}, nil
+	case "active":
+		return JobDir{state: name, store: rd.store}, nil
+	default:
+		return nil, syscall.ENOENT
+	}
+}
+
+var rootEntries = []fuse.Dirent{
+	{Name: "pending", Type: fuse.DT_Dir},
+	{Name: "complete", Type: fuse.DT_Dir},
+	{Name: "failed", Type: fuse.DT_Dir},
+	{Name: "active", Type: fuse.DT_Dir},
+}
+
+func (RootDir) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
+	return rootEntries, nil
+}
+
+type PendingDir struct {
+	store *store.Store
+}
+
+func (PendingDir) Attr(ctx context.Context, a *fuse.Attr) error {
+	a.Mode = os.ModeDir | 0o555
+	return nil
+}
+
+func (jd PendingDir) ReadDirAll(ctx context.Context) (entries []fuse.Dirent, err error) {
+	jobs, err := jd.store.ListJobsByState(store.StatePending)
+	if err != nil {
+		slog.Error("Could not find jobs", "error", err)
+		return entries, nil
+	}
+	for _, job := range jobs {
+		entries = append(entries, fuse.Dirent{Name: job.ID, Type: fuse.DT_File})
+	}
+	return entries, nil
+}
+func (d PendingDir) Create(ctx context.Context, req *fuse.CreateRequest, resp *fuse.CreateResponse) (fs.Node, fs.Handle, error) {
+	slog.Info("Creating job file", "name", req.Name)
+
+	f := &JobCreationFile{
+		id:    req.Name,
+		store: d.store,
+		uid:   req.Uid,
+		gid:   req.Gid,
+		mode:  req.Mode,
+	}
+	return f, f, nil
+}
+func (jd PendingDir) Lookup(ctx context.Context, name string) (fs.Node, error) {
+	slog.Info("Lookup", "name", name)
+	job, err := jd.store.GetJob(name)
+	if err != nil {
+		return nil, syscall.ENOENT
+	}
+	return &File{content: job.Command}, nil
+}
+
+type File struct {
+	content string
+}
+
+func (f File) Attr(ctx context.Context, a *fuse.Attr) error {
+	a.Inode = 2
+	a.Mode = 0o444
+	a.Size = uint64(len(f.content))
+	return nil
+}
+
+func (f File) ReadAll(ctx context.Context) ([]byte, error) {
+	return []byte(f.content), nil
+}
+
+// Before adding to the database. Lives in memory until the file is closed.
+type JobCreationFile struct {
+	id    string
+	store *store.Store
+	buf   bytes.Buffer
+	uid   uint32
+	gid   uint32
+	mode  os.FileMode
+}
+
+func (f *JobCreationFile) Attr(ctx context.Context, a *fuse.Attr) error {
+	a.Mode = f.mode
+	a.Size = uint64(f.buf.Len())
+	a.Uid = f.uid
+	a.Gid = f.gid
+	return nil
+}
+
+func (f *JobCreationFile) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
+	// Append data to our memory buffer
+	n, err := f.buf.Write(req.Data)
+	resp.Size = n
+	return err
+}
+
+func (f *JobCreationFile) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
+	command := f.buf.String()
+
+	slog.Info("Committing new job", "id", f.id, "command", command)
+
+	_, err := f.store.CreateJob(f.id, command)
+	if err != nil {
+		slog.Error("Failed to create job", "error", err)
+		slog.Info("Each job has to have a unique id.")
+		return syscall.EIO
+	}
+	return nil
+}
+
+type JobDir struct {
+	state string
+	store *store.Store
+}
+
+func (JobDir) Attr(ctx context.Context, a *fuse.Attr) error {
+	a.Mode = 0o444
+	return nil
+}
+
+func (jd JobDir) ReadDirAll(ctx context.Context) (entries []fuse.Dirent, err error) {
+	jobs, err := jd.store.ListJobsByState(jd.state)
+	if err != nil {
+		slog.Error("Could not find jobs", "error", err)
+		return entries, nil
+	}
+	for _, job := range jobs {
+		entries = append(entries, fuse.Dirent{Name: job.ID, Type: fuse.DT_File})
+	}
+	return entries, nil
+}
+
+func (jd JobDir) Lookup(ctx context.Context, name string) (fs.Node, error) {
+	slog.Info("Lookup", "name", name)
+	return nil, syscall.ENOENT
+}
+
+func Unmount(dir string) error {
+	if runtime.GOOS == "linux" {
+		cmd := exec.Command("fusermount", "-u", "-z", dir)
+		return cmd.Run()
+	}
+	return fuse.Unmount(dir)
 }
diff --git a/store/store.go b/store/store.go
@@ -2,6 +2,7 @@ package store
 
 import (
 	"database/sql"
+	"log/slog"
 	"os"
 	"time"
 
@@ -10,7 +11,7 @@ import (
 
 const (
 	StatePending   = "pending"
-	StateRunning   = "running"
+	StateRunning   = "active"
 	StateCompleted = "completed"
 	StateFailed    = "failed"
 )
@@ -32,6 +33,7 @@ type Store struct {
 }
 
 func NewStore(filepath string) (*Store, error) {
+	slog.Info("STORE: Connecting", "filepath", filepath)
 	db, err := sql.Open("sqlite", filepath)
 	if err != nil {
 		return nil, err
@@ -138,7 +140,7 @@ func (s *Store) ListJobsByState(state string) ([]Job, error) {
 	return jobs, nil
 }
 
-// Picks up a pending job and moves it to 'running'.
+// Picks up a pending job and moves it to 'active'.
 func (s *Store) AttemptJob(id string) error {
 	tx, err := s.db.Begin()
 	if err != nil {