filed
Job queue using FUSE
git clone git://mccd.space/filed
| Log | Files | Refs | README | LICENSE |
commit d5a1e860aee4cc9b674836bc34b89d61c7c9cde2 parent 30f39f8223c0df6daa0f800b430b264c3bc08d70 Author: Marc Coquand <marc@coquand.email> Date: Mon, 15 Dec 2025 17:03:07 +0100 Clean up main Diffstat:
| M | README.md | | | 4 | ++-- |
| A | jobdir.go | | | 127 | +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ |
| M | main.go | | | 286 | ------------------------------------------------------------------------------- |
| A | pendingdir.go | | | 183 | +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ |
4 files changed, 312 insertions(+), 288 deletions(-)
diff --git a/README.md b/README.md
@@ -67,8 +67,8 @@ $ mv /tmp/qj-jobs/failed/1 /tmp/qj-jobs/pending
- [x] Support chmod and chown
- [x] State is configured via environment variable
- [ ] Customizable backoff and timeout before retries
-- [ ] Last modified and created at are correctly rendered
-- [ ] "Landlock"-mode, or sandboxed jobs
+- [x] Last modified and created at are correctly rendered for jobs
+- [ ] "Landlock"-mode, or sandboxed jobs - Requires a design
## Alternatives
diff --git a/jobdir.go b/jobdir.go
@@ -0,0 +1,127 @@
+package main
+
+import (
+ "context"
+ "log/slog"
+ "os"
+ "os/exec"
+ "qj/store"
+ "runtime"
+ "syscall"
+
+ "bazil.org/fuse"
+ "bazil.org/fuse/fs"
+)
+
+type JobDir struct {
+ state string
+ manager *JobManager
+ inode uint64
+}
+
+func (jd JobDir) Attr(ctx context.Context, a *fuse.Attr) error {
+ fileMeta, err := jd.manager.store.GetFileMeta(jd.inode)
+ if err != nil {
+ slog.Error("Could not retrieve file metadata", "error", err)
+ return syscall.EIO
+ } else if fileMeta != nil {
+ a.Mode = os.FileMode(fileMeta.Mode)
+ a.Gid = fileMeta.GID
+ a.Uid = fileMeta.UID
+ a.Inode = jd.inode
+
+ } else {
+ a.Mode = os.ModeDir | 0o750
+ a.Gid = uint32(os.Getgid())
+ a.Uid = uint32(os.Getuid())
+ a.Inode = jd.inode
+ }
+ return nil
+}
+
+func (jd JobDir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
+ slog.Warn("FUSE: Changing access permissions")
+ defaultMode, err := jd.manager.store.GetFileMeta(jd.inode)
+ if defaultMode == nil {
+ defaultMode = &store.FileMeta{
+ Inode: jd.inode,
+ Mode: uint32(os.ModeDir | 0o750),
+ UID: uint32(os.Getgid()),
+ GID: uint32(os.Getuid()),
+ }
+
+ }
+ err = Setattr(jd.manager.store, defaultMode, ctx, req, resp)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (jd JobDir) ReadDirAll(ctx context.Context) (entries []fuse.Dirent, err error) {
+ jobs, err := jd.manager.store.ListJobsByState(jd.state)
+ if err != nil {
+ slog.Error("Could not find jobs", "error", err)
+ return entries, nil
+ }
+ for _, job := range jobs {
+ entries = append(entries, fuse.Dirent{Name: job.ID, Type: fuse.DT_File})
+ }
+ return entries, nil
+}
+
+func (jd JobDir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
+ slog.Info("Removing job", "id", req.Name)
+ return jd.manager.store.DeleteJob(req.Name)
+}
+
+func (d JobDir) Rename(ctx context.Context, req *fuse.RenameRequest, newDir fs.Node) error {
+ if d.state == store.StateRunning {
+ slog.Warn("Refusing to restart a currently running job", "id", req.OldName)
+ return syscall.EPERM
+ }
+
+ if _, ok := newDir.(PendingDir); !ok {
+ slog.Warn("Jobs can only be moved to 'pending' (to restart) or deleted", "dest", newDir)
+ return syscall.EPERM
+ }
+
+ if req.OldName != req.NewName {
+ slog.Warn("Renaming job IDs is not supported", "old", req.OldName, "new", req.NewName)
+ return syscall.EPERM
+ }
+
+ slog.Info("Restarting job", "id", req.OldName)
+
+ err := d.manager.store.ResetJob(req.OldName)
+ if err != nil {
+ slog.Error("Failed to restart job in DB", "error", err)
+ return syscall.ENOENT
+ }
+
+ return nil
+}
+
+func (jd JobDir) Lookup(ctx context.Context, name string) (fs.Node, error) {
+ slog.Info("FUSE: Jobdir Lookup", "name", name)
+ job, err := jd.manager.store.GetJob(name)
+ if err != nil {
+ slog.Warn("FUSE: Not found", "name", name)
+ return nil, syscall.ENOENT
+ }
+ if job.State == jd.state {
+ slog.Info("FUSE: Found job", "id", job.ID)
+ return &File{job, jd.manager}, nil
+ } else {
+ return nil, syscall.ENOENT
+ }
+}
+
+func Unmount(dir string) error {
+ if runtime.GOOS == "linux" {
+ cmd := exec.Command("fusermount", "-u", "-z", dir)
+ return cmd.Run()
+ }
+ return fuse.Unmount(dir)
+}
diff --git a/main.go b/main.go
@@ -1,15 +1,12 @@
package main
import (
- "bytes"
"context"
"flag"
"fmt"
"log/slog"
"os"
- "os/exec"
"qj/store"
- "runtime"
"syscall"
"bazil.org/fuse"
@@ -130,286 +127,3 @@ var rootEntries = []fuse.Dirent{
func (RootDir) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
return rootEntries, nil
}
-
-type PendingDir struct {
- manager *JobManager
- inode uint64
-}
-
-func (pd PendingDir) Attr(ctx context.Context, a *fuse.Attr) error {
- fileMeta, err := pd.manager.store.GetFileMeta(pd.inode)
- if err != nil {
- slog.Error("Could not retrieve file metadata", "error", err)
- return syscall.EIO
- } else if fileMeta != nil {
- a.Mode = os.FileMode(fileMeta.Mode)
- a.Gid = fileMeta.GID
- a.Uid = fileMeta.UID
- a.Inode = pd.inode
-
- } else {
- a.Mode = os.ModeDir | 0o750
- a.Gid = uint32(os.Getgid())
- a.Uid = uint32(os.Getuid())
- a.Inode = pd.inode
- }
- return nil
-}
-
-func (pd *PendingDir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
- slog.Warn("FUSE: Changing access permissions")
- defaultMode, err := pd.manager.store.GetFileMeta(pd.inode)
- if defaultMode == nil {
- defaultMode = &store.FileMeta{
- Inode: pd.inode,
- Mode: uint32(os.ModeDir | 0o750),
- UID: uint32(os.Getgid()),
- GID: uint32(os.Getuid()),
- }
-
- }
- err = Setattr(pd.manager.store, defaultMode, ctx, req, resp)
- if err != nil {
- return err
- }
-
- return nil
-}
-
-func (pd PendingDir) ReadDirAll(ctx context.Context) (entries []fuse.Dirent, err error) {
- jobs, err := pd.manager.store.ListJobsByState(store.StatePending)
- if err != nil {
- slog.Error("FUSE: Could not find jobs", "error", err)
- return entries, nil
- }
- for _, job := range jobs {
- entries = append(entries, fuse.Dirent{Name: job.ID, Type: fuse.DT_File})
- }
- return entries, nil
-}
-func (pd PendingDir) Create(ctx context.Context, req *fuse.CreateRequest, resp *fuse.CreateResponse) (fs.Node, fs.Handle, error) {
- slog.Info("Creating job file", "name", req.Name)
-
- f := &JobCreationFile{
- id: req.Name,
- store: pd.manager.store,
- uid: req.Uid,
- gid: req.Gid,
- mode: req.Mode,
- }
- return f, f, nil
-}
-func (pd PendingDir) Lookup(ctx context.Context, name string) (fs.Node, error) {
- slog.Debug("FUSE: Lookup", "name", name)
- job, err := pd.manager.store.GetJob(name)
- if err != nil {
- return nil, syscall.ENOENT
- }
- if job.State == store.StatePending {
- slog.Debug("FUSE: Found job", "id", job.ID)
- return &File{job, pd.manager}, nil
- } else {
- return nil, syscall.ENOENT
- }
-}
-
-type File struct {
- job *store.Job
- manager *JobManager
-}
-
-func (f File) Attr(ctx context.Context, a *fuse.Attr) error {
- // Append 20 to avoid collission with static files
- a.Inode = uint64(f.job.INode + 20)
- slog.Info("FUSE", "inode", a.Inode)
- a.Mode = 0o775
- a.Gid = uint32(os.Getgid())
- a.Uid = uint32(os.Getuid())
- a.Mtime = f.job.UpdatedAt
- a.Ctime = f.job.CreatedAt
- res, err := f.readContent()
- if err != nil {
- a.Size = 0
- return nil
- }
- a.Size = uint64(len(res))
- return nil
-}
-
-func (f *File) ReadAll(ctx context.Context) ([]byte, error) {
- slog.Debug("FUSE: Read file content")
- return f.readContent()
-}
-
-// Combines Command + Output (Live or DB)
-func (f *File) readContent() ([]byte, error) {
- var output []byte
-
- liveLog := f.manager.GetLiveLog(f.job.ID)
-
- if liveLog != nil {
- slog.Info("Using live output")
- output = liveLog
- } else {
- slog.Info("Using job output")
- output = f.job.Output
- }
-
- var buf bytes.Buffer
- buf.WriteString(">>> ")
- buf.WriteString(f.job.Command)
- buf.WriteString("\n")
- buf.Write(output)
-
- return buf.Bytes(), nil
-}
-
-// Used before adding to the database. Lives in memory until the file is closed.
-type JobCreationFile struct {
- id string
- store *store.Store
- buf bytes.Buffer
- uid uint32
- gid uint32
- mode os.FileMode
-}
-
-func (f *JobCreationFile) Attr(ctx context.Context, a *fuse.Attr) error {
- a.Mode = f.mode
- a.Size = uint64(f.buf.Len())
- a.Uid = f.uid
- a.Gid = f.gid
- return nil
-}
-
-func (f *JobCreationFile) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
- n, err := f.buf.Write(req.Data)
- resp.Size = n
- return err
-}
-
-func (f *JobCreationFile) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
- command := f.buf.String()
-
- slog.Info("New job", "id", f.id, "command", command)
-
- _, err := f.store.CreateJob(f.id, command)
- if err != nil {
- slog.Error("Failed to create job. Is the job name unique? I.E. no pending/completed/failed jobs with same name.", "error", err)
- return syscall.EIO
- }
- return nil
-}
-
-type JobDir struct {
- state string
- manager *JobManager
- inode uint64
-}
-
-func (jd JobDir) Attr(ctx context.Context, a *fuse.Attr) error {
- fileMeta, err := jd.manager.store.GetFileMeta(jd.inode)
- if err != nil {
- slog.Error("Could not retrieve file metadata", "error", err)
- return syscall.EIO
- } else if fileMeta != nil {
- a.Mode = os.FileMode(fileMeta.Mode)
- a.Gid = fileMeta.GID
- a.Uid = fileMeta.UID
- a.Inode = jd.inode
-
- } else {
- a.Mode = os.ModeDir | 0o750
- a.Gid = uint32(os.Getgid())
- a.Uid = uint32(os.Getuid())
- a.Inode = jd.inode
- }
- return nil
-}
-
-func (jd JobDir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
- slog.Warn("FUSE: Changing access permissions")
- defaultMode, err := jd.manager.store.GetFileMeta(jd.inode)
- if defaultMode == nil {
- defaultMode = &store.FileMeta{
- Inode: jd.inode,
- Mode: uint32(os.ModeDir | 0o750),
- UID: uint32(os.Getgid()),
- GID: uint32(os.Getuid()),
- }
-
- }
- err = Setattr(jd.manager.store, defaultMode, ctx, req, resp)
- if err != nil {
- return err
- }
-
- return nil
-}
-
-func (jd JobDir) ReadDirAll(ctx context.Context) (entries []fuse.Dirent, err error) {
- jobs, err := jd.manager.store.ListJobsByState(jd.state)
- if err != nil {
- slog.Error("Could not find jobs", "error", err)
- return entries, nil
- }
- for _, job := range jobs {
- entries = append(entries, fuse.Dirent{Name: job.ID, Type: fuse.DT_File})
- }
- return entries, nil
-}
-
-func (jd JobDir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
- slog.Info("Removing job", "id", req.Name)
- return jd.manager.store.DeleteJob(req.Name)
-}
-
-func (d JobDir) Rename(ctx context.Context, req *fuse.RenameRequest, newDir fs.Node) error {
- if d.state == store.StateRunning {
- slog.Warn("Refusing to restart a currently running job", "id", req.OldName)
- return syscall.EPERM
- }
-
- if _, ok := newDir.(PendingDir); !ok {
- slog.Warn("Jobs can only be moved to 'pending' (to restart) or deleted", "dest", newDir)
- return syscall.EPERM
- }
-
- if req.OldName != req.NewName {
- slog.Warn("Renaming job IDs is not supported", "old", req.OldName, "new", req.NewName)
- return syscall.EPERM
- }
-
- slog.Info("Restarting job", "id", req.OldName)
-
- err := d.manager.store.ResetJob(req.OldName)
- if err != nil {
- slog.Error("Failed to restart job in DB", "error", err)
- return syscall.ENOENT
- }
-
- return nil
-}
-
-func (jd JobDir) Lookup(ctx context.Context, name string) (fs.Node, error) {
- slog.Info("FUSE: Jobdir Lookup", "name", name)
- job, err := jd.manager.store.GetJob(name)
- if err != nil {
- slog.Warn("FUSE: Not found", "name", name)
- return nil, syscall.ENOENT
- }
- if job.State == jd.state {
- slog.Info("FUSE: Found job", "id", job.ID)
- return &File{job, jd.manager}, nil
- } else {
- return nil, syscall.ENOENT
- }
-}
-
-func Unmount(dir string) error {
- if runtime.GOOS == "linux" {
- cmd := exec.Command("fusermount", "-u", "-z", dir)
- return cmd.Run()
- }
- return fuse.Unmount(dir)
-}
diff --git a/pendingdir.go b/pendingdir.go
@@ -0,0 +1,183 @@
+package main
+
+import (
+ "bytes"
+ "context"
+ "log/slog"
+ "os"
+ "qj/store"
+ "syscall"
+
+ "bazil.org/fuse"
+ "bazil.org/fuse/fs"
+)
+
+type PendingDir struct {
+ manager *JobManager
+ inode uint64
+}
+
+func (pd PendingDir) Attr(ctx context.Context, a *fuse.Attr) error {
+ fileMeta, err := pd.manager.store.GetFileMeta(pd.inode)
+ if err != nil {
+ slog.Error("Could not retrieve file metadata", "error", err)
+ return syscall.EIO
+ } else if fileMeta != nil {
+ a.Mode = os.FileMode(fileMeta.Mode)
+ a.Gid = fileMeta.GID
+ a.Uid = fileMeta.UID
+ a.Inode = pd.inode
+
+ } else {
+ a.Mode = os.ModeDir | 0o750
+ a.Gid = uint32(os.Getgid())
+ a.Uid = uint32(os.Getuid())
+ a.Inode = pd.inode
+ }
+ return nil
+}
+
+func (pd *PendingDir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
+ slog.Warn("FUSE: Changing access permissions")
+ defaultMode, err := pd.manager.store.GetFileMeta(pd.inode)
+ if defaultMode == nil {
+ defaultMode = &store.FileMeta{
+ Inode: pd.inode,
+ Mode: uint32(os.ModeDir | 0o750),
+ UID: uint32(os.Getgid()),
+ GID: uint32(os.Getuid()),
+ }
+
+ }
+ err = Setattr(pd.manager.store, defaultMode, ctx, req, resp)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (pd PendingDir) ReadDirAll(ctx context.Context) (entries []fuse.Dirent, err error) {
+ jobs, err := pd.manager.store.ListJobsByState(store.StatePending)
+ if err != nil {
+ slog.Error("FUSE: Could not find jobs", "error", err)
+ return entries, nil
+ }
+ for _, job := range jobs {
+ entries = append(entries, fuse.Dirent{Name: job.ID, Type: fuse.DT_File})
+ }
+ return entries, nil
+}
+func (pd PendingDir) Create(ctx context.Context, req *fuse.CreateRequest, resp *fuse.CreateResponse) (fs.Node, fs.Handle, error) {
+ slog.Info("Creating job file", "name", req.Name)
+
+ f := &JobCreationFile{
+ id: req.Name,
+ store: pd.manager.store,
+ uid: req.Uid,
+ gid: req.Gid,
+ mode: req.Mode,
+ }
+ return f, f, nil
+}
+func (pd PendingDir) Lookup(ctx context.Context, name string) (fs.Node, error) {
+ slog.Debug("FUSE: Lookup", "name", name)
+ job, err := pd.manager.store.GetJob(name)
+ if err != nil {
+ return nil, syscall.ENOENT
+ }
+ if job.State == store.StatePending {
+ slog.Debug("FUSE: Found job", "id", job.ID)
+ return &File{job, pd.manager}, nil
+ } else {
+ return nil, syscall.ENOENT
+ }
+}
+
+type File struct {
+ job *store.Job
+ manager *JobManager
+}
+
+func (f File) Attr(ctx context.Context, a *fuse.Attr) error {
+ // Append 20 to avoid collission with static files
+ a.Inode = uint64(f.job.INode + 20)
+ slog.Info("FUSE", "inode", a.Inode)
+ a.Mode = 0o775
+ a.Gid = uint32(os.Getgid())
+ a.Uid = uint32(os.Getuid())
+ a.Mtime = f.job.UpdatedAt
+ a.Ctime = f.job.CreatedAt
+ res, err := f.readContent()
+ if err != nil {
+ a.Size = 0
+ return nil
+ }
+ a.Size = uint64(len(res))
+ return nil
+}
+
+func (f *File) ReadAll(ctx context.Context) ([]byte, error) {
+ slog.Debug("FUSE: Read file content")
+ return f.readContent()
+}
+
+// Combines Command + Output (Live or DB)
+func (f *File) readContent() ([]byte, error) {
+ var output []byte
+
+ liveLog := f.manager.GetLiveLog(f.job.ID)
+
+ if liveLog != nil {
+ slog.Info("Using live output")
+ output = liveLog
+ } else {
+ slog.Info("Using job output")
+ output = f.job.Output
+ }
+
+ var buf bytes.Buffer
+ buf.WriteString(">>> ")
+ buf.WriteString(f.job.Command)
+ buf.WriteString("\n")
+ buf.Write(output)
+
+ return buf.Bytes(), nil
+}
+
+// Used before adding to the database. Lives in memory until the file is closed.
+type JobCreationFile struct {
+ id string
+ store *store.Store
+ buf bytes.Buffer
+ uid uint32
+ gid uint32
+ mode os.FileMode
+}
+
+func (f *JobCreationFile) Attr(ctx context.Context, a *fuse.Attr) error {
+ a.Mode = f.mode
+ a.Size = uint64(f.buf.Len())
+ a.Uid = f.uid
+ a.Gid = f.gid
+ return nil
+}
+
+func (f *JobCreationFile) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
+ n, err := f.buf.Write(req.Data)
+ resp.Size = n
+ return err
+}
+
+func (f *JobCreationFile) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
+ command := f.buf.String()
+
+ slog.Info("New job", "id", f.id, "command", command)
+
+ _, err := f.store.CreateJob(f.id, command)
+ if err != nil {
+ slog.Error("Failed to create job. Is the job name unique? I.E. no pending/completed/failed jobs with same name.", "error", err)
+ return syscall.EIO
+ }
+ return nil
+}