filed

Job queue using FUSE

git clone git://mccd.space/filed

commit 137b406e800fecdcefd851bbc6f0f4f105bbd632
Author: Marc Coquand <marc@coquand.email>
Date:   Sat, 13 Dec 2025 17:22:08 +0100

Initial commit

Diffstat:
A.gitignore | 1+
Ago.mod | 18++++++++++++++++++
Ago.sum | 27+++++++++++++++++++++++++++
Amain.go | 7+++++++
Astore/store.go | 197+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Astore/store_test.go | 161+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
6 files changed, 411 insertions(+), 0 deletions(-)
diff --git a/.gitignore b/.gitignore
@@ -0,0 +1 @@
+tbd
diff --git a/go.mod b/go.mod
@@ -0,0 +1,18 @@
+module mccd/tbd
+
+go 1.24.4
+
+require (
+	bazil.org/fuse v0.0.0-20230120002735-62a210ff1fd5 // indirect
+	github.com/dustin/go-humanize v1.0.1 // indirect
+	github.com/google/uuid v1.6.0 // indirect
+	github.com/mattn/go-isatty v0.0.20 // indirect
+	github.com/ncruces/go-strftime v0.1.9 // indirect
+	github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
+	golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect
+	golang.org/x/sys v0.36.0 // indirect
+	modernc.org/libc v1.66.10 // indirect
+	modernc.org/mathutil v1.7.1 // indirect
+	modernc.org/memory v1.11.0 // indirect
+	modernc.org/sqlite v1.40.1 // indirect
+)
diff --git a/go.sum b/go.sum
@@ -0,0 +1,27 @@
+bazil.org/fuse v0.0.0-20230120002735-62a210ff1fd5 h1:A0NsYy4lDBZAC6QiYeJ4N+XuHIKBpyhAVRMHRQZKTeQ=
+bazil.org/fuse v0.0.0-20230120002735-62a210ff1fd5/go.mod h1:gG3RZAMXCa/OTes6rr9EwusmR1OH1tDDy+cg9c5YliY=
+github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
+github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
+github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
+github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
+github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
+github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
+github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
+github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
+github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
+golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o=
+golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8=
+golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18=
+golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k=
+golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
+modernc.org/libc v1.66.10 h1:yZkb3YeLx4oynyR+iUsXsybsX4Ubx7MQlSYEw4yj59A=
+modernc.org/libc v1.66.10/go.mod h1:8vGSEwvoUoltr4dlywvHqjtAqHBaw0j1jI7iFBTAr2I=
+modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU=
+modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg=
+modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI=
+modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw=
+modernc.org/sqlite v1.40.1 h1:VfuXcxcUWWKRBuP8+BR9L7VnmusMgBNNnBYGEe9w/iY=
+modernc.org/sqlite v1.40.1/go.mod h1:9fjQZ0mB1LLP0GYrp39oOJXx/I2sxEnZtzCmEQIKvGE=
diff --git a/main.go b/main.go
@@ -0,0 +1,7 @@
+package main
+
+import "fmt"
+
+func main() {
+    fmt.Println("Hello, World!")
+}
diff --git a/store/store.go b/store/store.go
@@ -0,0 +1,197 @@
+package store
+
+import (
+	"database/sql"
+	"os"
+	"time"
+
+	_ "modernc.org/sqlite"
+)
+
+const (
+	StatePending   = "pending"
+	StateRunning   = "running"
+	StateCompleted = "completed"
+	StateFailed    = "failed"
+)
+
+// A single unit of work
+type Job struct {
+	ID      string
+	State   string
+	Command string
+	// Command output on completion
+	Output    []byte
+	Attempts  int
+	CreatedAt time.Time
+	UpdatedAt time.Time
+}
+
+type Store struct {
+	db *sql.DB
+}
+
+func NewStore(filepath string) (*Store, error) {
+	db, err := sql.Open("sqlite", filepath)
+	if err != nil {
+		return nil, err
+	}
+
+	if _, err := db.Exec("PRAGMA journal_mode=WAL;"); err != nil {
+		return nil, err
+	}
+
+	s := &Store{db: db}
+	if err := s.initSchema(); err != nil {
+		db.Close()
+		return nil, err
+	}
+
+	return s, nil
+}
+
+func (s *Store) initSchema() error {
+	query := `
+	CREATE TABLE IF NOT EXISTS jobs (
+		id TEXT PRIMARY KEY,
+		state TEXT NOT NULL,
+		command TEXT NOT NULL,
+		output BLOB,
+		attempts INTEGER DEFAULT 0,
+		created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
+		updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
+	);
+	
+	CREATE INDEX IF NOT EXISTS idx_state ON jobs(state);
+	`
+	_, err := s.db.Exec(query)
+	return err
+}
+
+func (s *Store) Close() error {
+	return s.db.Close()
+}
+
+// Adds a new job with the 'pending' state.
+func (s *Store) CreateJob(id string, command string) (*Job, error) {
+	query := `
+		INSERT INTO jobs (id, state, command, created_at, updated_at)
+		VALUES (?, ?, ?, ?, ?)
+	`
+	now := time.Now()
+	j := &Job{
+		ID:        id,
+		State:     StatePending,
+		Command:   command,
+		CreatedAt: now,
+		UpdatedAt: now,
+		Attempts:  0,
+	}
+
+	_, err := s.db.Exec(query, id, StatePending, command, now, now)
+	if err != nil {
+		return nil, err
+	}
+	return j, nil
+}
+
+// GetJob retrieves a job by ID.
+// Used by FUSE Read/GetAttr operations.
+func (s *Store) GetJob(id string) (*Job, error) {
+	query := `SELECT id, state, command, output, attempts, created_at FROM jobs WHERE id = ?`
+	j := &Job{}
+	err := s.db.QueryRow(query, id).Scan(
+		&j.ID, &j.State, &j.Command, &j.Output, &j.Attempts, &j.CreatedAt,
+	)
+	if err == sql.ErrNoRows {
+		return nil, os.ErrNotExist
+	}
+	return j, err
+}
+
+// DeleteJob removes a job.
+func (s *Store) DeleteJob(id string) error {
+	res, err := s.db.Exec("DELETE FROM jobs WHERE id = ?", id)
+	if err != nil {
+		return err
+	}
+	rows, _ := res.RowsAffected()
+	if rows == 0 {
+		return os.ErrNotExist
+	}
+	return nil
+}
+
+func (s *Store) ListJobsByState(state string) ([]Job, error) {
+	rows, err := s.db.Query("SELECT id, created_at,updated_at FROM jobs WHERE state = ?", state)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	var jobs []Job
+	for rows.Next() {
+		var j Job
+		if err := rows.Scan(&j.ID, &j.CreatedAt, &j.UpdatedAt); err != nil {
+			return nil, err
+		}
+		j.State = state
+		jobs = append(jobs, j)
+	}
+	return jobs, nil
+}
+
+// Picks up a pending job and moves it to 'running'.
+func (s *Store) AttemptJob(id string) error {
+	tx, err := s.db.Begin()
+	if err != nil {
+		return err
+	}
+	defer tx.Rollback()
+
+	var currentState string
+	err = tx.QueryRow("SELECT state FROM jobs WHERE id = ?", id).Scan(&currentState)
+	if err != nil {
+		return err
+	}
+
+	query := `
+		UPDATE jobs 
+		SET state = ?, attempts = attempts + 1, updated_at = ? 
+		WHERE id = ?
+	`
+	_, err = tx.Exec(query, StateRunning, time.Now(), id)
+	if err != nil {
+		return err
+	}
+
+	return tx.Commit()
+}
+
+// Moves a job back to 'pending'.
+func (s *Store) RestartJob(id string) error {
+	query := `
+		UPDATE jobs 
+		SET state = ?, attempts = ?, updated_at = ?
+		WHERE id = ?
+	`
+	res, err := s.db.Exec(query, StatePending, 0, time.Now(), id)
+	if err != nil {
+		return err
+	}
+	rows, _ := res.RowsAffected()
+	if rows == 0 {
+		return os.ErrNotExist
+	}
+	return nil
+}
+
+func (s *Store) CompleteJob(id string) error {
+	_, err := s.db.Exec("UPDATE jobs SET state = ?, updated_at = ? WHERE id = ?", StateCompleted, time.Now(), id)
+	return err
+}
+
+func (s *Store) FailJob(id string) error {
+	_, err := s.db.Exec("UPDATE jobs SET state = ?, updated_at = ? WHERE id = ?", StateFailed, time.Now(), id)
+	return err
+}
diff --git a/store/store_test.go b/store/store_test.go
@@ -0,0 +1,161 @@
+package store
+
+import (
+	"path/filepath"
+	"testing"
+)
+
+// setupTestDB creates a new store in a temporary directory for each test.
+func setupTestDB(t *testing.T) *Store {
+	t.Helper()
+	// Create a temporary directory that is automatically cleaned up
+	tmpDir := t.TempDir()
+	dbPath := filepath.Join(tmpDir, "jobs.db")
+
+	store, err := NewStore(dbPath)
+	if err != nil {
+		t.Fatalf("Failed to create store: %v", err)
+	}
+
+	// Ensure we close the store after the test finishes
+	t.Cleanup(func() {
+		store.Close()
+	})
+
+	return store
+}
+
+func TestCreateJob(t *testing.T) {
+	s := setupTestDB(t)
+
+	jobID := "job-123"
+	cmd := "echo hello"
+
+	// Test Creation
+	job, err := s.CreateJob(jobID, cmd)
+	if err != nil {
+		t.Fatalf("Failed to create job: %v", err)
+	}
+
+	if job.ID != jobID {
+		t.Errorf("Expected ID %s, got %s", jobID, job.ID)
+	}
+	if job.State != StatePending {
+		t.Errorf("Expected state %s, got %s", StatePending, job.State)
+	}
+	if job.Command != cmd {
+		t.Errorf("Expected command %s, got %s", cmd, job.Command)
+	}
+}
+
+func TestGetJob(t *testing.T) {
+	s := setupTestDB(t)
+	jobID := "test-job-get"
+
+	_, err := s.CreateJob(jobID, "sleep 1")
+	if err != nil {
+		t.Fatalf("CreateJob failed: %v", err)
+	}
+
+	// Retrieve it
+	job, err := s.GetJob(jobID)
+	if err != nil {
+		t.Fatalf("GetJob failed: %v", err)
+	}
+
+	if job.ID != jobID {
+		t.Errorf("Expected retrieved ID to match")
+	}
+}
+
+func TestLifecycleTransitions(t *testing.T) {
+	s := setupTestDB(t)
+	jobID := "lifecycle-1"
+
+	_, err := s.CreateJob(jobID, "ffmpeg -i ...")
+	if err != nil {
+		t.Fatalf("CreateJob failed: %v", err)
+	}
+
+	// 1. Pending -> Running (AttemptJob)
+	if err := s.AttemptJob(jobID); err != nil {
+		t.Fatalf("AttemptJob failed: %v", err)
+	}
+
+	j, _ := s.GetJob(jobID)
+	if j.State != StateRunning {
+		t.Errorf("Expected state Running, got %s", j.State)
+	}
+	if j.Attempts != 1 {
+		t.Errorf("Expected attempts 1, got %d", j.Attempts)
+	}
+
+	// 2. Running -> Completed
+	if err := s.CompleteJob(jobID); err != nil {
+		t.Fatalf("CompleteJob failed: %v", err)
+	}
+
+	j, _ = s.GetJob(jobID)
+	if j.State != StateCompleted {
+		t.Errorf("Expected state Completed, got %s", j.State)
+	}
+
+	// 3. Restart (Completed -> Pending)
+	if err := s.RestartJob(jobID); err != nil {
+		t.Fatalf("RestartJob failed: %v", err)
+	}
+
+	j, _ = s.GetJob(jobID)
+	if j.State != StatePending {
+		t.Errorf("Expected state Pending after restart, got %s", j.State)
+	}
+}
+
+func TestListJobsByState(t *testing.T) {
+	s := setupTestDB(t)
+
+	// Create 2 pending jobs and 1 completed job
+	s.CreateJob("p1", "cmd1")
+	s.CreateJob("p2", "cmd2")
+
+	s.CreateJob("c1", "cmd3")
+	s.CompleteJob("c1")
+
+	// List Pending
+	pending, err := s.ListJobsByState(StatePending)
+	if err != nil {
+		t.Fatalf("ListJobsByState failed: %v", err)
+	}
+
+	if len(pending) != 2 {
+		t.Errorf("Expected 2 pending jobs, got %d", len(pending))
+	}
+
+	// List Completed
+	completed, err := s.ListJobsByState(StateCompleted)
+	if err != nil {
+		t.Fatalf("ListJobsByState failed: %v", err)
+	}
+	if len(completed) != 1 {
+		t.Errorf("Expected 1 completed job, got %d", len(completed))
+	}
+	if completed[0].ID != "c1" {
+		t.Errorf("Expected completed job to be c1")
+	}
+}
+
+func TestDeleteJob(t *testing.T) {
+	s := setupTestDB(t)
+	jobID := "del-1"
+
+	s.CreateJob(jobID, "cmd")
+
+	if err := s.DeleteJob(jobID); err != nil {
+		t.Fatalf("DeleteJob failed: %v", err)
+	}
+
+	_, err := s.GetJob(jobID)
+	if err == nil {
+		t.Error("Expected error getting deleted job, got nil")
+	}
+}