filed
Job queue using FUSE
git clone git://mccd.space/filed
| Log | Files | Refs | README | LICENSE |
commit f45a9a53e03cefb6c24493d4e12f9ea86070bd18 parent 1cf873b22d027eb29c636fc84ce2b94c90b310a0 Author: Marc Coquand <marc@coquand.email> Date: Sun, 14 Dec 2025 14:50:22 +0100 Support generating new ids from a file Diffstat:
| M | README.md | | | 30 | +++++++++++++++++++++++++++++- |
| M | main.go | | | 26 | ++++++++++++++------------ |
| M | manager.go | | | 5 | ++--- |
| A | newid.go | | | 49 | +++++++++++++++++++++++++++++++++++++++++++++++++ |
4 files changed, 94 insertions(+), 16 deletions(-)
diff --git a/README.md b/README.md
@@ -22,6 +22,34 @@ cd tbd
go build
```
-## Usage
+## Quick start
+### Creating a job
+```
+$ # In terminal one
+$ mkdir /tmp/tbd-jobs
+$ tbd /tmp/tbd-jobs /tmp/tbd-state.db
+```
+
+In terminal two:
+
+```
+$ printf "echo 'Running job'" >> /tmp/tbd-jobs/pending/1
+```
+
+Note: Each job name must be unique. See man pages for examples to generate an ID.
+
+### Restarting a job
+
+By default, a job retries 3 times.
+
+```
+$ mv /tmp/tbd-jobs/failed/1 /tmp/tbd-jobs/pending
+```
+
+### Inspect job logs
+
+```
+$ cat /tmp/tbd-jobs/active/1
+```
diff --git a/main.go b/main.go
@@ -5,7 +5,6 @@ import (
"context"
"flag"
"fmt"
- "hash/fnv"
"log/slog"
"mccd/tbd/store"
"os"
@@ -87,14 +86,16 @@ func (RootDir) Attr(ctx context.Context, a *fuse.Attr) error {
func (rd RootDir) Lookup(ctx context.Context, name string) (fs.Node, error) {
slog.Info("FUSE: Lookup", "name", name)
switch name {
- case "pending":
+ case store.StatePending:
return PendingDir{manager: rd.manager}, nil
- case "complete":
+ case store.StateCompleted:
return JobDir{state: name, manager: rd.manager, inode: 3}, nil
- case "failed":
+ case store.StateFailed:
return JobDir{state: name, manager: rd.manager, inode: 4}, nil
- case "active":
+ case store.StateRunning:
return JobDir{state: name, manager: rd.manager, inode: 5}, nil
+ case NewIdName:
+ return &NewIdFile{manager: rd.manager, inode: 6}, nil
default:
return nil, syscall.ENOENT
}
@@ -105,6 +106,7 @@ var rootEntries = []fuse.Dirent{
{Name: store.StateCompleted, Type: fuse.DT_Dir},
{Name: store.StateFailed, Type: fuse.DT_Dir},
{Name: store.StateRunning, Type: fuse.DT_Dir},
+ {Name: NewIdName, Type: fuse.DT_File},
}
func (RootDir) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
@@ -166,7 +168,8 @@ type File struct {
}
func (f File) Attr(ctx context.Context, a *fuse.Attr) error {
- a.Inode = InodeFromJobID(f.job.ID)
+ // Append 20 to avoid collission with static files
+ a.Inode = uint64(f.job.INode + 20)
slog.Info("FUSE", "inode", a.Inode)
a.Mode = 0o775
a.Gid = uint32(os.Getgid())
@@ -272,6 +275,11 @@ func (jd JobDir) ReadDirAll(ctx context.Context) (entries []fuse.Dirent, err err
return entries, nil
}
+func (jd JobDir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
+ slog.Info("Removing job", "id", req.Name)
+ return jd.manager.store.DeleteJob(req.Name)
+}
+
func (d JobDir) Rename(ctx context.Context, req *fuse.RenameRequest, newDir fs.Node) error {
if d.state == store.StateRunning {
slog.Warn("Refusing to restart a currently running job", "id", req.OldName)
@@ -299,12 +307,6 @@ func (d JobDir) Rename(ctx context.Context, req *fuse.RenameRequest, newDir fs.N
return nil
}
-func InodeFromJobID(id string) uint64 {
- hash := fnv.New64a()
- hash.Write([]byte(id))
- return hash.Sum64()
-}
-
func (jd JobDir) Lookup(ctx context.Context, name string) (fs.Node, error) {
slog.Info("FUSE: Jobdir Lookup", "name", name)
job, err := jd.manager.store.GetJob(name)
diff --git a/manager.go b/manager.go
@@ -47,9 +47,8 @@ func (jm *JobManager) processPendingJobs() {
}
for _, job := range jobs {
- slog.Info("Worker", "j", job)
if job.Attempts >= 2 {
- slog.Warn("WORK job exceeded failure rate.", "Job", job.ID)
+ slog.Warn("Worker: job exceeded max attempts. Moving to failed", "Job", job.ID)
jm.store.FailJob(job.ID)
continue
}
@@ -82,7 +81,7 @@ func (jm *JobManager) runJob(id, commandStr string) {
exitCode := cmd.ProcessState.ExitCode()
if err != nil || exitCode != 0 {
- slog.Error("Worker: Job failed", "id", id, "error", err, "exitCode", exitCode)
+ slog.Warn("Worker: Job failed", "id", id, "error", err, "exitCode", exitCode)
errMsg := fmt.Sprintf("\n\n[System Error]: %v\n", err)
jobOutput = append(jobOutput, []byte(errMsg)...)
jm.store.RestartJob(id, jobOutput)
diff --git a/newid.go b/newid.go
@@ -0,0 +1,49 @@
+package main
+
+import (
+ "context"
+ "log/slog"
+ "math/rand"
+ "mccd/tbd/store"
+ "os"
+ "time"
+
+ "bazil.org/fuse"
+)
+
+var NewIdName = "get-id"
+
+// Helper function to get a 4 character ID that hasn'nt been used before
+func randomJobId(store *store.Store, characters int) []byte {
+ const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
+ rand.New(rand.NewSource(time.Now().UnixNano()))
+ result := make([]byte, characters)
+ for i := range result {
+ result[i] = charset[rand.Intn(len(charset))]
+ }
+ _, err := store.GetJob(string(result))
+ if err == os.ErrNotExist {
+ return result
+ } else {
+ // In the unlikely event
+ return randomJobId(store, characters+1)
+ }
+}
+
+type NewIdFile struct {
+ manager *JobManager
+ inode uint64
+}
+
+func (f NewIdFile) Attr(ctx context.Context, a *fuse.Attr) error {
+ a.Mode = 0o444
+ a.Gid = uint32(os.Getgid())
+ a.Uid = uint32(os.Getuid())
+ a.Inode = f.inode
+ a.Size = uint64(4)
+ return nil
+}
+func (f *NewIdFile) ReadAll(ctx context.Context) ([]byte, error) {
+ slog.Info("FUSE: Read file content")
+ return randomJobId(f.manager.store, 4), nil
+}