filed
Job queue using FUSE
git clone git://mccd.space/filed
| Log | Files | Refs | README | LICENSE |
jobdir.go (3412B)
1 package main
2
3 import (
4 "context"
5 "git.sr.ht/~marcc/filed/store"
6 "log/slog"
7 "os"
8 "os/exec"
9 "runtime"
10 "syscall"
11
12 "bazil.org/fuse"
13 "bazil.org/fuse/fs"
14 )
15
16 type JobDir struct {
17 state string
18 manager *JobManager
19 inode uint64
20 }
21
22 func (jd JobDir) Attr(ctx context.Context, a *fuse.Attr) error {
23 fileMeta, err := jd.manager.store.GetFileMeta(jd.inode)
24 if err != nil {
25 slog.Error("Could not retrieve file metadata", "error", err)
26 return syscall.EIO
27 } else if fileMeta != nil {
28 a.Mode = os.FileMode(fileMeta.Mode)
29 a.Gid = fileMeta.GID
30 a.Uid = fileMeta.UID
31 a.Inode = jd.inode
32
33 } else {
34 a.Mode = os.ModeDir | 0o750
35 a.Gid = uint32(os.Getgid())
36 a.Uid = uint32(os.Getuid())
37 a.Inode = jd.inode
38 }
39 return nil
40 }
41
42 func (jd JobDir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
43 slog.Warn("FUSE: Changing access permissions")
44 defaultMode, err := jd.manager.store.GetFileMeta(jd.inode)
45 if defaultMode == nil {
46 defaultMode = &store.FileMeta{
47 Inode: jd.inode,
48 Mode: uint32(os.ModeDir | 0o750),
49 UID: uint32(os.Getgid()),
50 GID: uint32(os.Getuid()),
51 }
52
53 }
54 err = Setattr(jd.manager.store, defaultMode, ctx, req, resp)
55 if err != nil {
56 return err
57 }
58
59 return nil
60 }
61
62 func (jd JobDir) ReadDirAll(ctx context.Context) (entries []fuse.Dirent, err error) {
63 jobs, err := jd.manager.store.ListJobsByState(jd.state)
64 if err != nil {
65 slog.Error("Could not find jobs", "error", err)
66 return entries, nil
67 }
68 for _, job := range jobs {
69 entries = append(entries, fuse.Dirent{Name: job.ID, Type: fuse.DT_File})
70 }
71 return entries, nil
72 }
73
74 func (jd JobDir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
75 slog.Info("Removing job", "id", req.Name)
76 if jd.state == store.StateRunning {
77 val, ok := jd.manager.activeJobs.Load(req.Name)
78 if !ok {
79 // Assume that job is already complete
80 return jd.manager.store.DeleteJob(req.Name)
81 }
82 activeJob := val.(*ActiveJob)
83 activeJob.cancel()
84 }
85 return jd.manager.store.DeleteJob(req.Name)
86 }
87
88 func (d JobDir) Rename(ctx context.Context, req *fuse.RenameRequest, newDir fs.Node) error {
89 if d.state == store.StateRunning {
90 slog.Warn("Refusing to restart a currently running job", "id", req.OldName)
91 return syscall.EPERM
92 }
93
94 if _, ok := newDir.(PendingDir); !ok {
95 slog.Warn("Jobs can only be moved to 'pending' (to restart) or deleted", "dest", newDir)
96 return syscall.EPERM
97 }
98
99 if req.OldName != req.NewName {
100 // XXX Maybe support?
101 slog.Warn("Renaming job IDs is not supported", "old", req.OldName, "new", req.NewName)
102 return syscall.EPERM
103 }
104
105 slog.Info("Restarting job", "id", req.OldName)
106
107 err := d.manager.store.ResetJob(req.OldName)
108 d.manager.notify()
109
110 if err != nil {
111 slog.Error("Failed to restart job in DB", "error", err)
112 return syscall.ENOENT
113 }
114
115 return nil
116 }
117
118 func (jd JobDir) Lookup(ctx context.Context, name string) (fs.Node, error) {
119 slog.Debug("FUSE: Jobdir Lookup", "name", name)
120 job, err := jd.manager.store.GetJob(name)
121 if err != nil {
122 slog.Warn("FUSE: Not found", "name", name)
123 return nil, syscall.ENOENT
124 }
125 if job.State == jd.state {
126 slog.Debug("FUSE: Found job", "id", job.ID)
127 return &File{job, jd.manager}, nil
128 } else {
129 return nil, syscall.ENOENT
130 }
131 }
132
133 func Unmount(dir string) error {
134 if runtime.GOOS == "linux" {
135 cmd := exec.Command("fusermount", "-u", "-z", dir)
136 return cmd.Run()
137 }
138 return fuse.Unmount(dir)
139 }