filed
Job queue using FUSE
git clone git://mccd.space/filed
| Log | Files | Refs | README | LICENSE |
pendingdir.go (4468B)
1 package main
2
3 import (
4 "bytes"
5 "context"
6 "git.sr.ht/~marcc/filed/store"
7 "log/slog"
8 "os"
9 "syscall"
10
11 "bazil.org/fuse"
12 "bazil.org/fuse/fs"
13 )
14
15 type PendingDir struct {
16 manager *JobManager
17 inode uint64
18 }
19
20 func (pd PendingDir) Attr(ctx context.Context, a *fuse.Attr) error {
21 fileMeta, err := pd.manager.store.GetFileMeta(pd.inode)
22 if err != nil {
23 slog.Error("Could not retrieve file metadata", "error", err)
24 return syscall.EIO
25 } else if fileMeta != nil {
26 a.Mode = os.FileMode(fileMeta.Mode)
27 a.Gid = fileMeta.GID
28 a.Uid = fileMeta.UID
29 a.Inode = pd.inode
30
31 } else {
32 a.Mode = os.ModeDir | 0o750
33 a.Gid = uint32(os.Getgid())
34 a.Uid = uint32(os.Getuid())
35 a.Inode = pd.inode
36 }
37 return nil
38 }
39
40 func (pd *PendingDir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
41 slog.Warn("FUSE: Changing access permissions")
42 defaultMode, err := pd.manager.store.GetFileMeta(pd.inode)
43 if defaultMode == nil {
44 defaultMode = &store.FileMeta{
45 Inode: pd.inode,
46 Mode: uint32(os.ModeDir | 0o750),
47 UID: uint32(os.Getgid()),
48 GID: uint32(os.Getuid()),
49 }
50
51 }
52 err = Setattr(pd.manager.store, defaultMode, ctx, req, resp)
53 if err != nil {
54 return err
55 }
56
57 return nil
58 }
59
60 func (pd PendingDir) ReadDirAll(ctx context.Context) (entries []fuse.Dirent, err error) {
61 jobs, err := pd.manager.store.ListJobsByState(store.StatePending)
62 if err != nil {
63 slog.Error("FUSE: Could not find jobs", "error", err)
64 return entries, nil
65 }
66 for _, job := range jobs {
67 entries = append(entries, fuse.Dirent{Name: job.ID, Type: fuse.DT_File})
68 }
69 return entries, nil
70 }
71 func (pd PendingDir) Create(ctx context.Context, req *fuse.CreateRequest, resp *fuse.CreateResponse) (fs.Node, fs.Handle, error) {
72 slog.Info("Creating job file", "name", req.Name)
73
74 f := &JobCreationFile{
75 id: req.Name,
76 manager: pd.manager,
77 uid: req.Uid,
78 gid: req.Gid,
79 mode: req.Mode,
80 }
81 return f, f, nil
82 }
83 func (pd PendingDir) Lookup(ctx context.Context, name string) (fs.Node, error) {
84 slog.Debug("FUSE: Lookup", "name", name)
85 job, err := pd.manager.store.GetJob(name)
86 if err != nil {
87 return nil, syscall.ENOENT
88 }
89 if job.State == store.StatePending {
90 slog.Debug("FUSE: Found job", "id", job.ID)
91 return &File{job, pd.manager}, nil
92 } else {
93 return nil, syscall.ENOENT
94 }
95 }
96
97 type File struct {
98 job *store.Job
99 manager *JobManager
100 }
101
102 func (f File) Attr(ctx context.Context, a *fuse.Attr) error {
103 // Append 20 to avoid collission with static files
104 a.Inode = uint64(f.job.INode + 20)
105 slog.Debug("FUSE", "inode", a.Inode)
106 a.Mode = 0o750
107 a.Gid = uint32(os.Getgid())
108 a.Uid = uint32(os.Getuid())
109 a.Mtime = f.job.UpdatedAt
110 a.Ctime = f.job.CreatedAt
111 res, err := f.readContent()
112 if err != nil {
113 a.Size = 0
114 return nil
115 }
116 a.Size = uint64(len(res))
117 return nil
118 }
119
120 func (f *File) ReadAll(ctx context.Context) ([]byte, error) {
121 slog.Debug("FUSE: Read file content")
122 return f.readContent()
123 }
124
125 // Combines Command + Output (Live or DB)
126 func (f *File) readContent() ([]byte, error) {
127 var output []byte
128
129 liveLog := f.manager.GetLiveLog(f.job.ID)
130
131 if liveLog != nil {
132 slog.Debug("FUSE: Using live output")
133 output = liveLog
134 } else {
135 slog.Debug("FUSE: Using job output")
136 output = f.job.Output
137 }
138
139 var buf bytes.Buffer
140 buf.WriteString(">>> ")
141 buf.WriteString(f.job.Command)
142 buf.WriteString("\n")
143 buf.Write(output)
144
145 return buf.Bytes(), nil
146 }
147
148 // Used before adding to the database. Lives in memory until the file is closed.
149 type JobCreationFile struct {
150 id string
151 manager *JobManager
152 buf bytes.Buffer
153 uid uint32
154 gid uint32
155 mode os.FileMode
156 }
157
158 func (f *JobCreationFile) Attr(ctx context.Context, a *fuse.Attr) error {
159 a.Mode = f.mode
160 a.Size = uint64(f.buf.Len())
161 a.Uid = f.uid
162 a.Gid = f.gid
163 return nil
164 }
165
166 func (f *JobCreationFile) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
167 n, err := f.buf.Write(req.Data)
168 resp.Size = n
169 return err
170 }
171
172 func (f *JobCreationFile) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
173 command := f.buf.String()
174
175 slog.Info("New job", "id", f.id, "command", command)
176
177 _, err := f.manager.store.CreateJob(f.id, command)
178 f.manager.notify()
179 if err != nil {
180 // XXX Should check specifically for file exist error
181 slog.Error("Failed to create job. Is the job name unique? I.E. no pending/completed/failed jobs with same name.", "error", err)
182 return syscall.EEXIST
183 }
184 return nil
185 }